Source code for amqpy.channel

"""AMQP Channels
"""
from __future__ import absolute_import, division, print_function

__metaclass__ = type
import logging
import six

if six.PY2:
    from Queue import Queue
else:
    from queue import Queue

from .proto import Method
from .concurrency import synchronized_connection
from .abstract_channel import AbstractChannel
from .exceptions import ChannelError, ConsumerCancelled, error_for_code
from .spec import basic_return_t, queue_declare_ok_t, method_t
from .serialization import AMQPWriter
from . import spec

__all__ = ['Channel']

log = logging.getLogger('amqpy')


[docs]class Channel(AbstractChannel): """ The channel class provides methods for a client to establish and operate an AMQP channel. All public members are fully thread-safe. """ ### constants #: Default channel mode CH_MODE_NONE = 0 #: Transaction mode CH_MODE_TX = 1 #: Publisher confirm mode (RabbitMQ extension) CH_MODE_CONFIRM = 2
[docs] def __init__(self, connection, channel_id=None, auto_decode=True): """Create a channel bound to a connection and using the specified numeric channel_id, and open on the server If `auto_decode` is enabled (default), incoming Message bodies will be automatically decoded to `str` if possible. :param connection: the channel's associated Connection :param channel_id: the channel's assigned channel ID :param auto_decode: enable auto decoding of message bodies :type connection: amqpy.connection.Connection :type channel_id: int or None :type auto_decode: bool """ if channel_id: # noinspection PyProtectedMember connection._claim_channel_id(channel_id) else: # noinspection PyProtectedMember channel_id = connection._get_free_channel_id() super(Channel, self).__init__(connection, channel_id) # auto decode received messages self.auto_decode = auto_decode ### channel state variables: #: Current channel open/closed state #: #: :type: bool self.is_open = False #: Current channel active state (flow control) #: #: :type: bool self.active = True #: Channel mode state (default, transactional, publisher confirm) #: #: :type: int self.mode = 0 #: Returned messages that the server was unable to deliver #: #: :type: queue.Queue self.returned_messages = Queue() # consumer callbacks dict[consumer_tag str: callable] self.callbacks = {} # consumer cancel callbacks dict dict[consumer_tag str: callable] self.cancel_callbacks = {} # set of consumers that have opted for `no_ack` delivery (server will not expect an ack # for delivered messages) self.no_ack_consumers = set() # open the channel self._open()
def _close(self): """Tear down this object, after we've agreed to close with the server """ log.debug('Channel close #{}'.format(self.channel_id)) self.is_open = False channel_id, self.channel_id = self.channel_id, None connection, self.connection = self.connection, None if connection: connection.channels.pop(channel_id, None) # noinspection PyProtectedMember connection._avail_channel_ids.append(channel_id) self.callbacks.clear() self.cancel_callbacks.clear() self.no_ack_consumers.clear() def _open(self): """Open the channel """ if self.is_open: return self._send_open() def _revive(self): self.is_open = False self.mode = self.CH_MODE_NONE self._send_open() @synchronized_connection()
[docs] def close(self, reply_code=0, reply_text='', method_type=method_t(0, 0)): """Request a channel close This method indicates that the sender wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. :param reply_code: the reply code :param reply_text: localized reply text :param method_type: if close is triggered by a failing method, this is the method that caused it :type reply_code: int :type reply_text: str :type method_type: amqpy.spec.method_t """ try: if not self.is_open or self.connection is None: return args = AMQPWriter() args.write_short(reply_code) args.write_shortstr(reply_text) args.write_short(method_type.class_id) args.write_short(method_type.method_id) self._send_method(Method(spec.Channel.Close, args)) return self.wait_any([spec.Channel.Close, spec.Channel.CloseOk]) finally: self.connection = None
def _cb_close(self, method): """Respond to a channel close sent by the server This method indicates that the sender (server) wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. This method sends a "close-ok" to the server, then re-opens the channel. """ args = method.args reply_code = args.read_short() reply_text = args.read_shortstr() class_id = args.read_short() method_id = args.read_short() self._send_method(Method(spec.Channel.CloseOk)) self.is_open = False # re-open the channel self._revive() # get information about the method which caused the server to close the channel method_type = method_t(class_id, method_id) raise error_for_code(reply_code, reply_text, method_type, ChannelError, self.channel_id) def _cb_close_ok(self, method): """Confirm a channel close This method confirms a Channel.Close method and tells the recipient that it is safe to release resources for the channel and close the socket. """ assert method self._close() @synchronized_connection()
[docs] def flow(self, active): """Enable/disable flow from peer This method asks the peer to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives a Flow restart method. :param active: True: peer starts sending content frames; False: peer stops sending content frames :type active: bool """ args = AMQPWriter() args.write_bit(active) self._send_method(Method(spec.Channel.Flow, args)) return self.wait_any([spec.Channel.FlowOk, self._cb_flow_ok])
def _cb_flow(self, method): """Enable/disable flow from peer This method asks the peer to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives a Flow restart method. """ args = method.args self.active = args.read_bit() self._send_flow_ok(self.active) def _send_flow_ok(self, active): """Confirm a flow method Confirms to the peer that a flow command was received and processed. :param active: True: peer starts sending content frames; False: peer stops sending content frames :type active: bool """ args = AMQPWriter() args.write_bit(active) self._send_method(Method(spec.Channel.FlowOk, args)) def _cb_flow_ok(self, method): """Confirm a flow method Confirms to the peer that a flow command was received and processed. """ args = method.args return args.read_bit() def _send_open(self): """Open a channel This method opens a channel. """ args = AMQPWriter() args.write_shortstr('') # reserved self._send_method(Method(spec.Channel.Open, args)) return self.wait(spec.Channel.OpenOk) def _cb_open_ok(self, method): """Handle received "open-ok" The server sends this method to signal to the client that this channel is ready for use. """ assert method self.is_open = True log.debug('Channel open') @synchronized_connection()
[docs] def exchange_declare(self, exchange, exch_type, passive=False, durable=False, auto_delete=True, nowait=False, arguments=None): """Declare exchange, create if needed * Exchanges cannot be redeclared with different types. The client MUST not attempt to redeclare an existing exchange with a different type than used in the original Exchange.Declare method. * This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class. * The server must ignore the `durable` field if the exchange already exists. * The server must ignore the `auto_delete` field if the exchange already exists. * If `nowait` is enabled and the server could not complete the method, it will raise a channel or connection exception. * `arguments` is ignored if passive is True. :param str exchange: exchange name :param str exch_type: exchange type (direct, fanout, etc.) :param bool passive: do not create exchange; client can use this to check whether an exchange exists :param bool durable: mark exchange as durable (remain active after server restarts) :param bool auto_delete: auto-delete exchange when all queues have finished using it :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :param dict arguments: exchange declare arguments :raise AccessRefused: if attempting to declare an exchange with a reserved name (amq.*) :raise NotFound: if `passive` is enabled and the exchange does not exist :return: None """ arguments = arguments or {} args = AMQPWriter() args.write_short(0) # reserved-1 args.write_shortstr(exchange) # exchange name args.write_shortstr(exch_type) # exchange type args.write_bit(passive) # passive args.write_bit(durable) # durable args.write_bit(auto_delete) # auto-delete args.write_bit(False) # internal args.write_bit(nowait) args.write_table(arguments) self._send_method(Method(spec.Exchange.Declare, args)) if not nowait: return self.wait(spec.Exchange.DeclareOk)
def _cb_exchange_declare_ok(self, method): """Confirms an exchange declaration The server sends this method to confirm a Declare method and confirms the name of the exchange, essential for automatically-named exchanges. """ pass @synchronized_connection()
[docs] def exchange_delete(self, exchange, if_unused=False, nowait=False): """Delete an exchange This method deletes an exchange. * If the exchange does not exist, the server must raise a channel exception. When an exchange is deleted, all queue bindings on the exchange are cancelled. * If `if_unused` is set, and the exchange has queue bindings, the server must raise a channel exception. :param str exchange: exchange name :param bool if_unused: delete only if unused (has no queue bindings) :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :raise NotFound: if exchange with `exchange` does not exist :raise PreconditionFailed: if attempting to delete a queue with bindings and `if_unused` is set :return: None """ args = AMQPWriter() args.write_short(0) args.write_shortstr(exchange) args.write_bit(if_unused) args.write_bit(nowait) self._send_method(Method(spec.Exchange.Delete, args)) if not nowait: return self.wait(spec.Exchange.DeleteOk)
def _cb_exchange_delete_ok(self, method): """Confirm deletion of an exchange The server sends this method to confirm that the deletion of an exchange was successful. """ pass @synchronized_connection()
[docs] def exchange_bind(self, dest_exch, source_exch='', routing_key='', nowait=False, arguments=None): """Bind an exchange to an exchange * Both the `dest_exch` and `source_exch` must already exist. Blank exchange names mean the default exchange. * A server MUST allow and ignore duplicate bindings - that is, two or more bind methods for a specific exchanges, with identical arguments - without treating these as an error. * A server MUST allow cycles of exchange bindings to be created including allowing an exchange to be bound to itself. * A server MUST not deliver the same message more than once to a destination exchange, even if the topology of exchanges and bindings results in multiple (even infinite) routes to that exchange. :param str dest_exch: name of destination exchange to bind :param str source_exch: name of source exchange to bind :param str routing_key: routing key for the binding (note: not all exchanges use a routing key) :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :param dict arguments: binding arguments, specific to the exchange class """ arguments = {} if arguments is None else arguments args = AMQPWriter() args.write_short(0) args.write_shortstr(dest_exch) args.write_shortstr(source_exch) args.write_shortstr(routing_key) args.write_bit(nowait) args.write_table(arguments) self._send_method(Method(spec.Exchange.Bind, args)) if not nowait: return self.wait(spec.Exchange.BindOk)
@synchronized_connection()
[docs] def exchange_unbind(self, dest_exch, source_exch='', routing_key='', nowait=False, arguments=None): """Unbind an exchange from an exchange * If the unbind fails, the server must raise a connection exception. The server must not attempt to unbind an exchange that does not exist from an exchange. * Blank exchange names mean the default exchange. :param str dest_exch: destination exchange name :param str source_exch: source exchange name :param str routing_key: routing key to unbind :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :param dict arguments: binding arguments, specific to the exchange class """ arguments = {} if arguments is None else arguments args = AMQPWriter() args.write_short(0) args.write_shortstr(dest_exch) args.write_shortstr(source_exch) args.write_shortstr(routing_key) args.write_bit(nowait) args.write_table(arguments) self._send_method(Method(spec.Exchange.Unbind, args)) if not nowait: return self.wait(spec.Exchange.UnbindOk)
def _cb_exchange_bind_ok(self, method): """Confirm bind successful The server sends this method to confirm that the bind was successful. """ pass def _cb_exchange_unbind_ok(self, method): """Confirm unbind successful The server sends this method to confirm that the unbind was successful. """ pass @synchronized_connection()
[docs] def queue_bind(self, queue, exchange='', routing_key='', nowait=False, arguments=None): """Bind queue to an exchange This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange. * The server must allow and ignore duplicate bindings without treating these as an error. * If a bind fails, the server must raise a connection exception. * The server must not allow a durable queue to bind to a transient exchange. If a client attempts this, the server must raise a channel exception. * The server should support at least 4 bindings per queue, and ideally, impose no limit except as defined by available resources. * If the client did not previously declare a queue, and the `queue` is empty, the server must raise a connection exception with reply code 530 (not allowed). * If `queue` does not exist, the server must raise a channel exception with reply code 404 (not found). * If `exchange` does not exist, the server must raise a channel exception with reply code 404 (not found). :param str queue: name of queue to bind; blank refers to the last declared queue for this channel :param str exchange: name of exchange to bind to :param str routing_key: routing key for the binding :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :param dict arguments: binding arguments, specific to the exchange class """ arguments = {} if arguments is None else arguments args = AMQPWriter() args.write_short(0) args.write_shortstr(queue) args.write_shortstr(exchange) args.write_shortstr(routing_key) args.write_bit(nowait) args.write_table(arguments) self._send_method(Method(spec.Queue.Bind, args)) if not nowait: return self.wait(spec.Queue.BindOk)
def _cb_queue_bind_ok(self, method): """Confirm bind successful The server sends this method to confirm that the bind was successful. """ pass @synchronized_connection()
[docs] def queue_unbind(self, queue, exchange, routing_key='', nowait=False, arguments=None): """Unbind a queue from an exchange This method unbinds a queue from an exchange. * If a unbind fails, the server MUST raise a connection exception. * The client must not attempt to unbind a queue that does not exist. * The client must not attempt to unbind a queue from an exchange that does not exist. :param str queue: name of queue to unbind, leave blank to refer to the last declared queue on this channel :param str exchange: name of exchange to unbind, leave blank to refer to default exchange :param str routing_key: routing key of binding :param dict arguments: binding arguments, specific to the exchange class """ arguments = {} if arguments is None else arguments args = AMQPWriter() args.write_short(0) args.write_shortstr(queue) args.write_shortstr(exchange) args.write_shortstr(routing_key) # args.write_bit(nowait) args.write_table(arguments) self._send_method(Method(spec.Queue.Unbind, args)) if not nowait: return self.wait(spec.Queue.UnbindOk)
def _cb_queue_unbind_ok(self, method): """Confirm unbind successful This method confirms that the unbind was successful. """ pass @synchronized_connection()
[docs] def queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None): """Declare queue, create if needed This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue. A tuple containing the queue name, message count, and consumer count is returned, which is essential for declaring automatically named queues. * If `passive` is specified, the server state is not modified (a queue will not be declared), and the server only checks if the specified queue exists and returns its properties. If the queue does not exist, the server must raise a 404 NOT FOUND channel exception. * The server must create a default binding for a newly-created queue to the default exchange, which is an exchange of type 'direct'. * Queue names starting with 'amq.' are reserved for use by the server. If an attempt is made to declare a queue with such a name, and the `passive` flag is disabled, the server must raise a 403 ACCESS REFUSED connection exception. * The server must raise a 405 RESOURCE LOCKED channel exception if an attempt is made to access a queue declared as exclusive by another open connection. * The server must ignore the `auto_delete` flag if the queue already exists. RabbitMQ supports the following useful additional arguments: * x-max-length (int): maximum queue size * Queue length is a measure that takes into account ready messages, ignoring unacknowledged messages and message size. Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached. :param str queue: queue name; leave blank to let the server generate a name automatically :param bool passive: do not create queue; client can use this to check whether a queue exists :param bool durable: mark as durable (remain active after server restarts) :param bool exclusive: mark as exclusive (can only be consumed from by this connection); implies `auto_delete` :param bool auto_delete: auto-delete queue when all consumers have finished using it :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :param dict arguments: exchange declare arguments :raise NotFound: if `passive` is enabled and the queue does not exist :raise AccessRefused: if an attempt is made to declare a queue with a reserved name :raise ResourceLocked: if an attempt is made to access an exclusive queue declared by another open connection :return: queue_declare_ok_t(queue, message_count, consumer_count), or None if `nowait` :rtype: queue_declare_ok_t or None """ arguments = arguments or {} args = AMQPWriter() args.write_short(0) args.write_shortstr(queue) args.write_bit(passive) args.write_bit(durable) args.write_bit(exclusive) args.write_bit(auto_delete) args.write_bit(nowait) args.write_table(arguments) self._send_method(Method(spec.Queue.Declare, args)) if not nowait: return self.wait(spec.Queue.DeclareOk)
def _cb_queue_declare_ok(self, method): """Confirm a queue declare This method is called when the server responds to a `queue.declare`. :return: queue_declare_ok_t(queue, message_count, consumer_count), or None if `nowait` :rtype: queue_declare_ok_t or None """ args = method.args return queue_declare_ok_t(args.read_shortstr(), args.read_long(), args.read_long()) @synchronized_connection()
[docs] def queue_delete(self, queue='', if_unused=False, if_empty=False, nowait=False): """Delete a queue This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled. :param str queue: name of queue to delete, empty string refers to last declared queue on this channel :param bool if_unused: delete only if unused (has no consumers); raise a channel exception otherwise :param bool if_empty: delete only if empty; raise a channel exception otherwise :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :raise NotFound: if `queue` does not exist :raise PreconditionFailed: if `if_unused` or `if_empty` conditions are not met :return: number of messages deleted :rtype: int """ args = AMQPWriter() args.write_short(0) args.write_shortstr(queue) args.write_bit(if_unused) args.write_bit(if_empty) args.write_bit(nowait) self._send_method(Method(spec.Queue.Delete, args)) if not nowait: return self.wait(spec.Queue.DeleteOk)
def _cb_queue_delete_ok(self, method): """Confirm deletion of a queue This method confirms the deletion of a queue. PARAMETERS: message_count: long number of messages purged Reports the number of messages purged. """ args = method.args return args.read_long() @synchronized_connection()
[docs] def queue_purge(self, queue='', nowait=False): """Purge a queue This method removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal "undo" mechanism. * On transacted channels the server MUST not purge messages that have already been sent to a client but not yet acknowledged. * If nowait is False, this method returns a message count. :param str queue: queue name to purge; leave blank to refer to last declared queue for this channel :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :return: message count (if nowait is False) :rtype: int or None """ args = AMQPWriter() args.write_short(0) args.write_shortstr(queue) args.write_bit(nowait) self._send_method(Method(spec.Queue.Purge, args)) if not nowait: return self.wait(spec.Queue.PurgeOk)
def _cb_queue_purge_ok(self, method): """Confirms a queue purge This method confirms the purge of a queue. PARAMETERS: message_count: long number of messages purged Reports the number of messages purged. """ args = method.args return args.read_long() @synchronized_connection()
[docs] def basic_ack(self, delivery_tag, multiple=False): """Acknowledge one or more messages This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message. * The delivery tag is valid only within the same channel that the message was received. * Set `delivery_tag` to `0` and `multiple` to `True` to acknowledge all outstanding messages. * If the `delivery_tag` is invalid, the server must raise a channel exception. :param int delivery_tag: server-assigned delivery tag; 0 means "all messages received so far" :param bool multiple: if set, the `delivery_tag` is treated as "all messages up to and including" """ args = AMQPWriter() args.write_longlong(delivery_tag) args.write_bit(multiple) self._send_method(Method(spec.Basic.Ack, args))
@synchronized_connection()
[docs] def basic_cancel(self, consumer_tag, nowait=False): """End a queue consumer This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. * If the queue no longer exists when the client sends a cancel command, or the consumer has been cancelled for other reasons, this command has no effect. :param str consumer_tag: consumer tag, valid only within the current connection and channel :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply """ if self.connection is not None: self.no_ack_consumers.discard(consumer_tag) args = AMQPWriter() args.write_shortstr(consumer_tag) args.write_bit(nowait) self._send_method(Method(spec.Basic.Cancel, args)) return self.wait(spec.Basic.CancelOk)
def _cb_basic_cancel_notify(self, method): """Consumer cancelled by server. Most likely the queue was deleted. """ args = method.args consumer_tag = args.read_shortstr() callback = self._on_cancel(consumer_tag) if callback: callback(consumer_tag) else: raise ConsumerCancelled(consumer_tag, spec.Basic.Cancel) def _cb_basic_cancel_ok(self, method): """Confirm a cancelled consumer This method confirms that the cancellation was completed. PARAMETERS: consumer_tag: shortstr consumer tag Identifier for the consumer, valid within the current connection. RULE: The consumer tag is valid only within the channel from which the consumer was created. I.e. a client MUST NOT create a consumer in one channel and then use it in another. """ args = method.args consumer_tag = args.read_shortstr() self._on_cancel(consumer_tag) def _on_cancel(self, consumer_tag): """ :param consumer_tag: :return: callback, if any :rtype: callable or None """ self.callbacks.pop(consumer_tag, None) return self.cancel_callbacks.pop(consumer_tag, None) @synchronized_connection()
[docs] def basic_consume(self, queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, callback=None, arguments=None, on_cancel=None): """Start a queue consumer This method asks the server to start a "consumer", which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them. * The `consumer_tag` is local to a connection, so two clients can use the same consumer tags. But on the same connection, the `consumer_tag` must be unique, or the server must raise a 530 NOT ALLOWED connection exception. * If `no_ack` is set, the server automatically acknowledges each message on behalf of the client. * If `exclusive` is set, the client asks for this consumer to have exclusive access to the queue. If the server cannot grant exclusive access to the queue because there are other consumers active, it must raise a 403 ACCESS REFUSED channel exception. * `callback` must be a `Callable(message)` which is called for each messaged delivered by the broker. If no callback is specified, messages are quietly discarded; `no_ack` should probably be set to True in that case. :param str queue: name of queue; if None, refers to last declared queue for this channel :param str consumer_tag: consumer tag, local to the connection :param bool no_local: if True: do not deliver own messages :param bool no_ack: server will not expect an ack for each message :param bool exclusive: request exclusive access :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :param Callable callback: a callback callable(message) for each delivered message :param dict arguments: AMQP method arguments :param Callable on_cancel: a callback callable :return: consumer tag :rtype: str """ args = AMQPWriter() args.write_short(0) args.write_shortstr(queue) args.write_shortstr(consumer_tag) args.write_bit(no_local) args.write_bit(no_ack) args.write_bit(exclusive) args.write_bit(nowait) args.write_table(arguments or {}) self._send_method(Method(spec.Basic.Consume, args)) if not nowait: consumer_tag = self.wait(spec.Basic.ConsumeOk) self.callbacks[consumer_tag] = callback if on_cancel: self.cancel_callbacks[consumer_tag] = on_cancel if no_ack: self.no_ack_consumers.add(consumer_tag) return consumer_tag
def _cb_basic_consume_ok(self, method): """Confirm a new consumer The server provides the client with a consumer tag, which is used by the client for methods called on the consumer at a later stage. PARAMETERS: consumer_tag: shortstr Holds the consumer tag specified by the client or provided by the server. """ args = method.args return args.read_shortstr() def _cb_basic_deliver(self, method): """Notify the client of a consumer message This method delivers a message to the client, via a consumer. In the asynchronous message delivery model, the client starts a consumer using the Consume method, then the server responds with Deliver methods as and when messages arrive for that consumer. This method can be called in a "classmethod" style static-context and is done so by :meth:`~amqpy.connection.Connection.drain_events()`. RULE: The server SHOULD track the number of times a message has been delivered to clients and when a message is redelivered a certain number of times - e.g. 5 times - without being acknowledged, the server SHOULD consider the message to be unprocessable (possibly causing client applications to abort), and move the message to a dead letter queue. PARAMETERS: consumer_tag: shortstr consumer tag Identifier for the consumer, valid within the current connection. RULE: The consumer tag is valid only within the channel from which the consumer was created. I.e. a client MUST NOT create a consumer in one channel and then use it in another. delivery_tag: longlong server-assigned delivery tag The server-assigned and channel-specific delivery tag RULE: The delivery tag is valid only within the channel from which the message was received I.e. a client MUST NOT receive a message on one channel and then acknowledge it on another. RULE: The server MUST NOT use a zero value for delivery tags Zero is reserved for client use, meaning "all messages so far received". redelivered: boolean message is being redelivered This indicates that the message has been previously delivered to this or another client. exchange: shortstr Specifies the name of the exchange that the message was originally published to. routing_key: shortstr Message routing key Specifies the routing key name specified when the message was published. """ args = method.args msg = method.content consumer_tag = args.read_shortstr() delivery_tag = args.read_longlong() redelivered = args.read_bit() exchange = args.read_shortstr() routing_key = args.read_shortstr() msg.channel = self msg.delivery_info = { 'consumer_tag': consumer_tag, 'delivery_tag': delivery_tag, 'redelivered': redelivered, 'exchange': exchange, 'routing_key': routing_key, } callback = self.callbacks.get(consumer_tag) if callback: callback(msg) else: raise Exception('No callback available for consumer tag: {}'.format(consumer_tag)) @synchronized_connection()
[docs] def basic_get(self, queue='', no_ack=False): """Directly get a message from the `queue` This method is non-blocking. If no messages are available on the queue, `None` is returned. :param str queue: queue name; leave blank to refer to last declared queue for the channel :param bool no_ack: if enabled, the server automatically acknowledges the message :return: message, or None if no messages are available on the queue :rtype: amqpy.message.Message or None """ args = AMQPWriter() args.write_short(0) args.write_shortstr(queue) args.write_bit(no_ack) self._send_method(Method(spec.Basic.Get, args)) return self.wait_any([spec.Basic.GetOk, spec.Basic.GetEmpty])
def _cb_basic_get_empty(self, method): """Indicate no messages available This method tells the client that the queue has no messages available for the client. """ args = method.args args.read_shortstr() def _cb_basic_get_ok(self, method): """Provide client with a message This method delivers a message to the client following a get method. A message delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the get method. PARAMETERS: delivery_tag: longlong server-assigned delivery tag The server-assigned and channel-specific delivery tag RULE: The delivery tag is valid only within the channel from which the message was received I.e. a client MUST NOT receive a message on one channel and then acknowledge it on another. RULE: The server MUST NOT use a zero value for delivery tags Zero is reserved for client use, meaning "all messages so far received". redelivered: boolean message is being redelivered This indicates that the message has been previously delivered to this or another client. exchange: shortstr Specifies the name of the exchange that the message was originally published to. If empty, the message was published to the default exchange. routing_key: shortstr Message routing key Specifies the routing key name specified when the message was published. message_count: long number of messages pending This field reports the number of messages pending on the queue, excluding the message being delivered. Note that this figure is indicative, not reliable, and can change arbitrarily as messages are added to the queue and removed by other clients. """ args = method.args msg = method.content delivery_tag = args.read_longlong() redelivered = args.read_bit() exchange = args.read_shortstr() routing_key = args.read_shortstr() message_count = args.read_long() msg.channel = self msg.delivery_info = { 'delivery_tag': delivery_tag, 'redelivered': redelivered, 'exchange': exchange, 'routing_key': routing_key, 'message_count': message_count } return msg def _basic_publish(self, msg, exchange='', routing_key='', mandatory=False, immediate=False): args = AMQPWriter() args.write_short(0) args.write_shortstr(exchange) args.write_shortstr(routing_key) args.write_bit(mandatory) args.write_bit(immediate) self._send_method(Method(spec.Basic.Publish, args, msg)) @synchronized_connection()
[docs] def basic_publish(self, msg, exchange='', routing_key='', mandatory=False, immediate=False): """Publish a message This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed. If publisher confirms are enabled, this method will automatically wait to receive an "ack" from the server. .. note:: Returned messages are sent back from the server and loaded into the `returned_messages` queue of the channel that sent them. In order to receive all returned messages, call `loop(0)` on the connection object before checking the channel's `returned_messages` queue. :param msg: message :param str exchange: exchange name, empty string means default exchange :param str routing_key: routing key :param bool mandatory: True: deliver to at least one queue, or return it; False: drop the unroutable message :param bool immediate: request immediate delivery :type msg: amqpy.Message """ self._basic_publish(msg, exchange, routing_key, mandatory, immediate) if self.mode == self.CH_MODE_CONFIRM: self.wait(spec.Basic.Ack)
@synchronized_connection()
[docs] def basic_qos(self, prefetch_size=0, prefetch_count=0, a_global=False): """Specify quality of service This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server. * The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. This field specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning "no specific limit", although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set. * The server must ignore `prefetch_size` setting when the client is not processing any messages - i.e. the prefetch size does not limit the transfer of single messages to a client, only the sending in advance of more messages while the client still has one or more unacknowledged messages. * The `prefetch_count` specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set. * The server may send less data in advance than allowed by the client's specified prefetch windows but it must not send more. :param int prefetch_size: prefetch window in octets :param int prefetch_count: prefetch window in messages :param bool a_global: apply to entire connection (default is for current channel only) """ args = AMQPWriter() args.write_long(prefetch_size) args.write_short(prefetch_count) args.write_bit(a_global) self._send_method(Method(spec.Basic.Qos, args)) return self.wait(spec.Basic.QosOk)
def _cb_basic_qos_ok(self, method): """Confirm the requested qos This method tells the client that the requested QoS levels could be handled by the server. The requested QoS applies to all active consumers until a new QoS is defined. """ pass @synchronized_connection()
[docs] def basic_recover(self, requeue=False): """Redeliver unacknowledged messages This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels. * The server MUST set the redelivered flag on all messages that are resent. * The server MUST raise a channel exception if this is called on a transacted channel. :param bool requeue: if set, the server will attempt to requeue the message, potentially then delivering it to a different subscriber """ args = AMQPWriter() args.write_bit(requeue) self._send_method(Method(spec.Basic.Recover, args))
@synchronized_connection()
[docs] def basic_recover_async(self, requeue=False): """Redeliver unacknowledged messages (async) This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels. * The server MUST set the redelivered flag on all messages that are resent. * The server MUST raise a channel exception if this is called on a transacted channel. :param bool requeue: if set, the server will attempt to requeue the message, potentially then delivering it to a different subscriber """ args = AMQPWriter() args.write_bit(requeue) self._send_method(Method(spec.Basic.RecoverAsync, args))
def _cb_basic_recover_ok(self, method): """In 0-9-1 the deprecated recover solicits a response """ pass @synchronized_connection()
[docs] def basic_reject(self, delivery_tag, requeue): """Reject an incoming message This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. * The server SHOULD be capable of accepting and process the Reject method while sending message content with a Deliver or Get-Ok method I.e. the server should read and process incoming methods while sending output frames. To cancel a partially-send content, the server sends a content body frame of size 1 (i.e. with no data except the frame-end octet). * The server SHOULD interpret this method as meaning that the client is unable to process the message at this time. * A client MUST NOT use this method as a means of selecting messages to process A rejected message MAY be discarded or dead-lettered, not necessarily passed to another client. * The server MUST NOT deliver the message to the same client within the context of the current channel. The recommended strategy is to attempt to deliver the message to an alternative consumer, and if that is not possible, to move the message to a dead-letter queue. The server MAY use more sophisticated tracking to hold the message on the queue and redeliver it to the same client at a later stage. :param int delivery_tag: server-assigned channel-specific delivery tag :param bool requeue: True: requeue the message; False: discard the message """ args = AMQPWriter() args.write_longlong(delivery_tag) args.write_bit(requeue) self._send_method(Method(spec.Basic.Reject, args))
def _cb_basic_return(self, method): """Return a failed message This method returns an undeliverable message that was published with the `immediate` flag set, or an unroutable message published with the `mandatory` flag set. The reply code and text provide information about the reason that the message was undeliverable. """ args = method.args msg = method.content self.returned_messages.put(basic_return_t( args.read_short(), args.read_shortstr(), args.read_shortstr(), args.read_shortstr(), msg, )) @synchronized_connection()
[docs] def tx_commit(self): """Commit the current transaction This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit. """ self._send_method(Method(spec.Tx.Commit)) return self.wait(spec.Tx.CommitOk)
def _cb_tx_commit_ok(self, method): """Confirm a successful commit This method confirms to the client that the commit succeeded. Note that if a commit fails, the server raises a channel exception. """ pass @synchronized_connection()
[docs] def tx_rollback(self): """Abandon the current transaction This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback. """ self._send_method(Method(spec.Tx.Rollback)) return self.wait(spec.Tx.RollbackOk)
def _cb_tx_rollback_ok(self, method): """Confirm a successful rollback This method confirms to the client that the rollback succeeded. Note that if an rollback fails, the server raises a channel exception. """ pass @synchronized_connection()
[docs] def tx_select(self): """Select standard transaction mode This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods. The channel must not be in publish acknowledge mode. If it is, the server raises a :exc:`PreconditionFailed` exception and closes the channel. Note that amqpy will automatically reopen the channel, at which point this method can be called again successfully. :raise PreconditionFailed: if the channel is in publish acknowledge mode """ self._send_method(Method(spec.Tx.Select)) #self.wait(spec.Tx.SelectOk) self.wait(spec.Tx.SelectOk) self.mode = self.CH_MODE_TX
def _cb_tx_select_ok(self, method): """Confirm transaction mode This method confirms to the client that the channel was successfully set to use standard transactions. """ pass @synchronized_connection()
[docs] def confirm_select(self, nowait=False): """Enable publisher confirms for this channel (RabbitMQ extension) The channel must not be in transactional mode. If it is, the server raises a :exc:`PreconditionFailed` exception and closes the channel. Note that amqpy will automatically reopen the channel, at which point this method can be called again successfully. :param bool nowait: if set, the server will not respond to the method and the client should not wait for a reply :raise PreconditionFailed: if the channel is in transactional mode """ args = AMQPWriter() args.write_bit(nowait) self._send_method(Method(spec.Confirm.Select, args)) if not nowait: self.wait(spec.Confirm.SelectOk) self.mode = self.CH_MODE_CONFIRM
def _cb_confirm_select_ok(self, method): """With this method, the broker confirms to the client that the channel is now using publisher confirms """ pass def _cb_basic_ack_recv(self, method): """Callback for receiving a `spec.Basic.Ack` This will be called when the server acknowledges a published message (RabbitMQ extension). """ # args = method.args # delivery_tag = args.read_longlong() # multiple = args.read_bit() METHOD_MAP = { spec.Channel.OpenOk: _cb_open_ok, spec.Channel.Flow: _cb_flow, spec.Channel.FlowOk: _cb_flow_ok, spec.Channel.Close: _cb_close, spec.Channel.CloseOk: _cb_close_ok, spec.Exchange.DeclareOk: _cb_exchange_declare_ok, spec.Exchange.DeleteOk: _cb_exchange_delete_ok, spec.Exchange.BindOk: _cb_exchange_bind_ok, spec.Exchange.UnbindOk: _cb_exchange_unbind_ok, spec.Queue.DeclareOk: _cb_queue_declare_ok, spec.Queue.BindOk: _cb_queue_bind_ok, spec.Queue.PurgeOk: _cb_queue_purge_ok, spec.Queue.DeleteOk: _cb_queue_delete_ok, spec.Queue.UnbindOk: _cb_queue_unbind_ok, spec.Basic.QosOk: _cb_basic_qos_ok, spec.Basic.ConsumeOk: _cb_basic_consume_ok, spec.Basic.Cancel: _cb_basic_cancel_notify, spec.Basic.CancelOk: _cb_basic_cancel_ok, spec.Basic.Return: _cb_basic_return, spec.Basic.Deliver: _cb_basic_deliver, spec.Basic.GetOk: _cb_basic_get_ok, spec.Basic.GetEmpty: _cb_basic_get_empty, spec.Basic.Ack: _cb_basic_ack_recv, spec.Basic.RecoverOk: _cb_basic_recover_ok, spec.Confirm.SelectOk: _cb_confirm_select_ok, spec.Tx.SelectOk: _cb_tx_select_ok, spec.Tx.CommitOk: _cb_tx_commit_ok, spec.Tx.RollbackOk: _cb_tx_rollback_ok, }