amqpy.channel module

AMQP Channels

class amqpy.channel.Channel(amqpy.abstract_channel.AbstractChannel)[source]

Bases: amqpy.abstract_channel.AbstractChannel

The channel class provides methods for a client to establish and operate an AMQP channel. All public members are fully thread-safe.

CH_MODE_CONFIRM = 2

Publisher confirm mode (RabbitMQ extension)

CH_MODE_NONE = 0

Default channel mode

CH_MODE_TX = 1

Transaction mode

active = None

Current channel active state (flow control)

Type: bool
is_open = None

Current channel open/closed state

Type: bool
mode = None

Channel mode state (default, transactional, publisher confirm)

Type: int
returned_messages = None

Returned messages that the server was unable to deliver

Type: queue.Queue
__init__(connection, channel_id=None, auto_decode=True)[source]

Create a channel bound to a connection, using the specified numeric channel_id, and open it on the server.

If auto_decode is enabled (default), incoming Message bodies will be automatically decoded to str if possible.

Parameters:
  • connection (amqpy.connection.Connection) – the channel’s associated Connection
  • channel_id (int or None) – the channel’s assigned channel ID
  • auto_decode (bool) – enable auto decoding of message bodies
basic_ack(delivery_tag, multiple=False) → None[source]

Acknowledge one or more messages

This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.

  • The delivery tag is valid only within the channel on which the message was received.
  • Set delivery_tag to 0 and multiple to True to acknowledge all outstanding messages.
  • If the delivery_tag is invalid, the server must raise a channel exception.
Parameters:
  • delivery_tag (int) – server-assigned delivery tag; 0 means “all messages received so far”
  • multiple (bool) – if set, the delivery_tag is treated as “all messages up to and including”
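The “up to and including” behaviour of multiple makes it possible to acknowledge deliveries in batches. The following is a minimal sketch and not part of amqpy: BatchAcker is a hypothetical helper that assumes only a channel object exposing basic_ack(delivery_tag, multiple=...) as documented above, and that delivery tags are handed to it in the order they were received.

```python
class BatchAcker:
    """Hypothetical helper: acknowledge deliveries in batches of `batch_size`."""

    def __init__(self, channel, batch_size=10):
        self.channel = channel
        self.batch_size = batch_size
        self.pending = 0      # deliveries seen since the last ack
        self.last_tag = None  # most recent delivery tag seen

    def on_delivery(self, delivery_tag):
        """Record one delivery and ack the whole batch once it is full."""
        self.pending += 1
        self.last_tag = delivery_tag
        if self.pending >= self.batch_size:
            self.flush()

    def flush(self):
        """Ack everything up to and including the last seen delivery tag."""
        if self.pending:
            self.channel.basic_ack(self.last_tag, multiple=True)
            self.pending = 0
```

Calling flush() before closing the channel acknowledges any partial batch that is still outstanding.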
basic_cancel(consumer_tag, nowait=False) → None[source]

End a queue consumer

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.

  • If the queue no longer exists when the client sends a cancel command, or the consumer has been cancelled for other reasons, this command has no effect.
Parameters:
  • consumer_tag (str) – consumer tag, valid only within the current connection and channel
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
basic_consume(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, callback=None, arguments=None, on_cancel=None) → str[source]

Start a queue consumer

This method asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them.

  • The consumer_tag is local to a connection, so two clients can use the same consumer tags. But on the same connection, the consumer_tag must be unique, or the server must raise a 530 NOT ALLOWED connection exception.
  • If no_ack is set, the server automatically acknowledges each message on behalf of the client.
  • If exclusive is set, the client asks for this consumer to have exclusive access to the queue. If the server cannot grant exclusive access to the queue because there are other consumers active, it must raise a 403 ACCESS REFUSED channel exception.
  • callback must be a Callable(message) which is called for each message delivered by the broker. If no callback is specified, messages are quietly discarded; no_ack should probably be set to True in that case.
Parameters:
  • queue (str) – name of queue; if blank, refers to the last declared queue for this channel
  • consumer_tag (str) – consumer tag, local to the connection
  • no_local (bool) – if True: do not deliver own messages
  • no_ack (bool) – server will not expect an ack for each message
  • exclusive (bool) – request exclusive access
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
  • callback (Callable) – a callback callable(message) for each delivered message
  • arguments (dict) – AMQP method arguments
  • on_cancel (Callable) – a callback callable
Returns:

consumer tag

Return type:

str
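As an illustration of the callback and consumer tag handling described above, here is a minimal sketch. The helpers consume_into and stop_consuming are hypothetical names, not amqpy functions; they assume only a channel object with the basic_consume and basic_cancel signatures documented on this page.

```python
def consume_into(channel, queue_name, sink):
    """Start a consumer that appends each delivered message to `sink`.

    Returns the consumer tag so the caller can cancel the consumer later.
    """
    def on_message(message):
        # Called once for each message delivered by the broker.
        sink.append(message)

    # no_ack=True: the server does not expect an ack for these messages.
    return channel.basic_consume(queue=queue_name, no_ack=True, callback=on_message)


def stop_consuming(channel, consumer_tag):
    """Cancel the consumer; messages already in flight may still arrive."""
    channel.basic_cancel(consumer_tag)
```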

basic_get(queue='', no_ack=False) → amqpy.message.Message or None[source]

Directly get a message from the queue

This method is non-blocking. If no messages are available on the queue, None is returned.

Parameters:
  • queue (str) – queue name; leave blank to refer to last declared queue for the channel
  • no_ack (bool) – if enabled, the server automatically acknowledges the message
Returns:

message, or None if no messages are available on the queue

Return type:

amqpy.message.Message or None
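Because basic_get returns None when the queue is empty, it can be polled in a loop. The sketch below is illustrative only: drain_queue and its limit parameter are hypothetical, and the code assumes nothing beyond the basic_get signature documented above.

```python
def drain_queue(channel, queue_name, limit=100):
    """Fetch up to `limit` messages with basic_get, stopping when the queue is empty."""
    messages = []
    while len(messages) < limit:
        # no_ack=True: the server acknowledges each message automatically.
        message = channel.basic_get(queue=queue_name, no_ack=True)
        if message is None:
            break  # nothing left on the queue
        messages.append(message)
    return messages
```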

basic_publish(msg, exchange='', routing_key='', mandatory=False, immediate=False) → None[source]

Publish a message

This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

If publisher confirms are enabled, this method will automatically wait to receive an “ack” from the server.

Note

Returned messages are sent back from the server and loaded into the returned_messages queue of the channel that sent them. In order to receive all returned messages, call loop(0) on the connection object before checking the channel’s returned_messages queue.

Parameters:
  • msg (amqpy.Message) – message
  • exchange (str) – exchange name, empty string means default exchange
  • routing_key (str) – routing key
  • mandatory (bool) – True: deliver to at least one queue, or return it; False: drop the unroutable message
  • immediate (bool) – request immediate delivery
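The note on returned messages can be sketched as follows. publish_mandatory is a hypothetical helper, not an amqpy function; it assumes a connection object with the loop(0) call mentioned in the note and a channel whose returned_messages attribute is a queue.Queue, as documented above.

```python
import queue


def publish_mandatory(connection, channel, messages, exchange='', routing_key=''):
    """Publish `messages` with mandatory=True and return any that came back."""
    for msg in messages:
        channel.basic_publish(msg, exchange=exchange, routing_key=routing_key,
                              mandatory=True)

    # Per the note above: process pending events so returned messages are loaded.
    connection.loop(0)

    returned = []
    while True:
        try:
            returned.append(channel.returned_messages.get_nowait())
        except queue.Empty:
            return returned
```

An empty result means no message had been returned by the time the queue was checked.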
basic_qos(prefetch_size=0, prefetch_count=0, a_global=False) → None[source]

Specify quality of service

This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.

  • The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. prefetch_size specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls within the other prefetch limits). It may be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch_size is ignored if the no-ack option is set.
  • The server must ignore the prefetch_size setting when the client is not processing any messages, i.e. the prefetch size does not limit the transfer of single messages to a client, only the sending in advance of more messages while the client still has one or more unacknowledged messages.
  • The prefetch_count specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set.
  • The server may send less data in advance than allowed by the client’s specified prefetch windows but it must not send more.
Parameters:
  • prefetch_size (int) – prefetch window in octets
  • prefetch_count (int) – prefetch window in messages
  • a_global (bool) – apply to entire connection (default is for current channel only)
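The interaction of the two prefetch windows can be modelled in a few lines. This is a simplified sketch of the rules listed above, not broker or amqpy code: may_send_in_advance is a hypothetical function, the “available prefetch size” is modelled as prefetch_size minus the bytes currently unacknowledged, and a prefetch_count of zero is assumed to mean “no limit” in the same way as prefetch_size.

```python
def may_send_in_advance(msg_size, unacked_count, unacked_bytes,
                        prefetch_size=0, prefetch_count=0):
    """Model whether a server may send one more message to a client."""
    # With nothing outstanding, the windows never block a single message.
    if unacked_count == 0:
        return True
    # Message-count window (assumption: 0 means no limit).
    if prefetch_count and unacked_count >= prefetch_count:
        return False
    # Octet window: the message must fit in the remaining prefetch size.
    if prefetch_size and msg_size > prefetch_size - unacked_bytes:
        return False
    return True
```

Both windows must allow the message, which is why either check alone is enough to refuse it.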
basic_recover(requeue=False) → None[source]

Redeliver unacknowledged messages

This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels.

  • The server MUST set the redelivered flag on all messages that are resent.
  • The server MUST raise a channel exception if this is called on a transacted channel.
Parameters:
  • requeue (bool) – if set, the server will attempt to requeue the message, potentially then delivering it to a different subscriber
basic_recover_async(requeue=False) → None[source]

Redeliver unacknowledged messages (async)

This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels.

  • The server MUST set the redelivered flag on all messages that are resent.
  • The server MUST raise a channel exception if this is called on a transacted channel.
Parameters:
  • requeue (bool) – if set, the server will attempt to requeue the message, potentially then delivering it to a different subscriber
basic_reject(delivery_tag, requeue) → None[source]

Reject an incoming message

This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

  • The server SHOULD be capable of accepting and processing the Reject method while sending message content with a Deliver or Get-Ok method, i.e. the server should read and process incoming methods while sending output frames. To cancel partially sent content, the server sends a content body frame of size 1 (i.e. with no data except the frame-end octet).
  • The server SHOULD interpret this method as meaning that the client is unable to process the message at this time.
  • A client MUST NOT use this method as a means of selecting messages to process. A rejected message MAY be discarded or dead-lettered, not necessarily passed to another client.
  • The server MUST NOT deliver the message to the same client within the context of the current channel. The recommended strategy is to attempt to deliver the message to an alternative consumer, and if that is not possible, to move the message to a dead-letter queue. The server MAY use more sophisticated tracking to hold the message on the queue and redeliver it to the same client at a later stage.
Parameters:
  • delivery_tag (int) – server-assigned channel-specific delivery tag
  • requeue (bool) – True: requeue the message; False: discard the message
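A common pattern is to acknowledge a message when processing succeeds and reject it when processing fails. The following sketch is one way to express that; process_delivery, handler, and requeue_on_error are hypothetical names, and the code relies only on the basic_ack and basic_reject signatures documented on this page.

```python
def process_delivery(channel, delivery_tag, body, handler, requeue_on_error=False):
    """Run `handler(body)`; ack on success, reject on failure.

    Returns True if the message was acknowledged, False if it was rejected.
    """
    try:
        handler(body)
    except Exception:
        # The client could not process the message at this time.
        channel.basic_reject(delivery_tag, requeue=requeue_on_error)
        return False
    channel.basic_ack(delivery_tag)
    return True
```

Leaving requeue_on_error as False discards the message (or lets the server dead-letter it), which avoids repeatedly requeueing a message that always fails.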
close(reply_code=0, reply_text='', method_type=method_t(class_id=0, method_id=0)) → None[source]

Request a channel close

This method indicates that the sender wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception.

Parameters:
  • reply_code (int) – the reply code
  • reply_text (str) – localized reply text
  • method_type (amqpy.spec.method_t) – if close is triggered by a failing method, this is the method that caused it
confirm_select(nowait=False) → None[source]

Enable publisher confirms for this channel (RabbitMQ extension)

The channel must not be in transactional mode. If it is, the server raises a PreconditionFailed exception and closes the channel. Note that amqpy will automatically reopen the channel, at which point this method can be called again successfully.

Parameters:
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
Raises:
  • PreconditionFailed – if the channel is in transactional mode
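To avoid triggering PreconditionFailed, the channel's mode attribute can be checked before enabling confirms. This is a minimal sketch under the assumption that mode always reflects the channel's current state; enable_confirms is a hypothetical helper and uses only the attributes and constants documented above.

```python
def enable_confirms(channel):
    """Enable publisher confirms unless the channel mode makes that invalid.

    Returns True if confirm_select() was called, False if confirms were
    already enabled. Raises RuntimeError for a transactional channel.
    """
    if channel.mode == channel.CH_MODE_TX:
        raise RuntimeError('channel is transactional; cannot enable publisher confirms')
    if channel.mode == channel.CH_MODE_CONFIRM:
        return False  # nothing to do
    channel.confirm_select()
    return True
```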
exchange_bind(dest_exch, source_exch='', routing_key='', nowait=False, arguments=None) → None[source]

Bind an exchange to an exchange

  • Both the dest_exch and source_exch must already exist. Blank exchange names mean the default exchange.
  • A server MUST allow and ignore duplicate bindings, that is, two or more bind methods for the same pair of exchanges with identical arguments, without treating these as an error.
  • A server MUST allow cycles of exchange bindings to be created including allowing an exchange to be bound to itself.
  • A server MUST NOT deliver the same message more than once to a destination exchange, even if the topology of exchanges and bindings results in multiple (even infinite) routes to that exchange.
Parameters:
  • dest_exch (str) – name of destination exchange to bind
  • source_exch (str) – name of source exchange to bind
  • routing_key (str) – routing key for the binding (note: not all exchanges use a routing key)
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
  • arguments (dict) – binding arguments, specific to the exchange class
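The requirement that cycles are allowed but a message reaches each destination exchange at most once can be illustrated with a small graph traversal. This is a toy model, not how any broker is implemented: destinations is a hypothetical function, routing keys and binding arguments are ignored, and the source exchange is treated as already visited.

```python
from collections import deque


def destinations(bindings, source):
    """Return every exchange reachable from `source`, each listed once.

    `bindings` is an iterable of (source_exch, dest_exch) pairs.
    """
    graph = {}
    for src, dest in bindings:
        graph.setdefault(src, []).append(dest)

    seen = {source}  # the message has already visited the source exchange
    order = []
    pending = deque(graph.get(source, []))
    while pending:
        exch = pending.popleft()
        if exch in seen:
            continue  # duplicate route or cycle: do not deliver again
        seen.add(exch)
        order.append(exch)
        pending.extend(graph.get(exch, []))
    return order
```

The seen set is what keeps the traversal finite when the bindings contain a cycle or a self-binding.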
exchange_declare(exchange, exch_type, passive=False, durable=False, auto_delete=True, nowait=False, arguments=None) → None[source]

Declare exchange, create if needed

  • Exchanges cannot be redeclared with different types. The client MUST NOT attempt to redeclare an existing exchange with a different type than used in the original Exchange.Declare method.
  • This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.
  • The server must ignore the durable field if the exchange already exists.
  • The server must ignore the auto_delete field if the exchange already exists.
  • If nowait is enabled and the server could not complete the method, it will raise a channel or connection exception.
  • arguments is ignored if passive is True.
Parameters:
  • exchange (str) – exchange name
  • exch_type (str) – exchange type (direct, fanout, etc.)
  • passive (bool) – do not create exchange; client can use this to check whether an exchange exists
  • durable (bool) – mark exchange as durable (remain active after server restarts)
  • auto_delete (bool) – auto-delete exchange when all queues have finished using it
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
  • arguments (dict) – exchange declare arguments
Raises:
  • AccessRefused – if attempting to declare an exchange with a reserved name (amq.*)
  • NotFound – if passive is enabled and the exchange does not exist
Returns:

None

exchange_delete(exchange, if_unused=False, nowait=False) → None[source]

Delete an exchange

This method deletes an exchange.

  • If the exchange does not exist, the server must raise a channel exception. When an exchange is deleted, all queue bindings on the exchange are cancelled.
  • If if_unused is set, and the exchange has queue bindings, the server must raise a channel exception.
Parameters:
  • exchange (str) – exchange name
  • if_unused (bool) – delete only if unused (has no queue bindings)
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
Raises:
  • NotFound – if the exchange named by exchange does not exist
  • PreconditionFailed – if attempting to delete an exchange with bindings and if_unused is set
Returns:

None

exchange_unbind(dest_exch, source_exch='', routing_key='', nowait=False, arguments=None) → None[source]

Unbind an exchange from an exchange

  • If the unbind fails, the server must raise a connection exception. The server must not attempt to unbind an exchange that does not exist from an exchange.

  • Blank exchange names mean the default exchange.

Parameters:
  • dest_exch (str) – destination exchange name
  • source_exch (str) – source exchange name
  • routing_key (str) – routing key to unbind
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
  • arguments (dict) – binding arguments, specific to the exchange class
flow(active) → None[source]

Enable/disable flow from peer

This method asks the peer to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives a Flow restart method.

Parameters:
  • active (bool) – True: peer starts sending content frames; False: peer stops sending content frames
queue_bind(queue, exchange='', routing_key='', nowait=False, arguments=None) → None[source]

Bind queue to an exchange

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.

  • The server must allow and ignore duplicate bindings without treating these as an error.
  • If a bind fails, the server must raise a connection exception.
  • The server must not allow a durable queue to bind to a transient exchange. If a client attempts this, the server must raise a channel exception.
  • The server should support at least 4 bindings per queue, and ideally, impose no limit except as defined by available resources.
  • If the client did not previously declare a queue, and the queue name is empty, the server must raise a connection exception with reply code 530 (not allowed).
  • If queue does not exist, the server must raise a channel exception with reply code 404 (not found).
  • If exchange does not exist, the server must raise a channel exception with reply code 404 (not found).
Parameters:
  • queue (str) – name of queue to bind; blank refers to the last declared queue for this channel
  • exchange (str) – name of exchange to bind to
  • routing_key (str) – routing key for the binding
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
  • arguments (dict) – binding arguments, specific to the exchange class
queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None) → queue_declare_ok_t or None[source]

Declare queue, create if needed

This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue. A tuple containing the queue name, message count, and consumer count is returned, which is essential for declaring automatically named queues.

  • If passive is specified, the server state is not modified (a queue will not be declared), and the server only checks if the specified queue exists and returns its properties. If the queue does not exist, the server must raise a 404 NOT FOUND channel exception.
  • The server must create a default binding for a newly-created queue to the default exchange, which is an exchange of type ‘direct’.
  • Queue names starting with ‘amq.’ are reserved for use by the server. If an attempt is made to declare a queue with such a name, and the passive flag is disabled, the server must raise a 403 ACCESS REFUSED connection exception.
  • The server must raise a 405 RESOURCE LOCKED channel exception if an attempt is made to access a queue declared as exclusive by another open connection.
  • The server must ignore the auto_delete flag if the queue already exists.

RabbitMQ supports the following useful additional arguments:

  • x-max-length (int): maximum queue size
    • Queue length is a measure that takes into account ready messages, ignoring unacknowledged messages and message size. Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached.
Parameters:
  • queue (str) – queue name; leave blank to let the server generate a name automatically
  • passive (bool) – do not create queue; client can use this to check whether a queue exists
  • durable (bool) – mark as durable (remain active after server restarts)
  • exclusive (bool) – mark as exclusive (can only be consumed from by this connection); implies auto_delete
  • auto_delete (bool) – auto-delete queue when all consumers have finished using it
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
  • arguments (dict) – queue declare arguments
Raises:
  • NotFound – if passive is enabled and the queue does not exist
  • AccessRefused – if an attempt is made to declare a queue with a reserved name
  • ResourceLocked – if an attempt is made to access an exclusive queue declared by another open connection
Returns:

queue_declare_ok_t(queue, message_count, consumer_count), or None if nowait

Return type:

queue_declare_ok_t or None
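The returned tuple matters most for server-named queues, since it is the only way to learn the generated name. The sketch below shows one way to use it; declare_private_queue is a hypothetical helper that assumes only the queue_declare and queue_bind signatures documented on this page and the (queue, message_count, consumer_count) tuple layout described above.

```python
def declare_private_queue(channel, exchange, routing_key):
    """Declare a server-named exclusive queue and bind it to `exchange`.

    Returns the queue name chosen by the server.
    """
    # Blank name: the server generates one and reports it in the reply tuple.
    queue_name, _message_count, _consumer_count = channel.queue_declare(
        queue='', exclusive=True)
    channel.queue_bind(queue_name, exchange=exchange, routing_key=routing_key)
    return queue_name
```

nowait is left at its default here because the reply tuple would otherwise be None and the generated name would be unknown.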

queue_delete(queue='', if_unused=False, if_empty=False, nowait=False) → int[source]

Delete a queue

This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.

Parameters:
  • queue (str) – name of queue to delete, empty string refers to last declared queue on this channel
  • if_unused (bool) – delete only if unused (has no consumers); raise a channel exception otherwise
  • if_empty (bool) – delete only if empty; raise a channel exception otherwise
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
Raises:
  • NotFound – if queue does not exist
  • PreconditionFailed – if if_unused or if_empty conditions are not met
Returns:

number of messages deleted

Return type:

int

queue_purge(queue='', nowait=False) → int or None[source]

Purge a queue

This method removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal “undo” mechanism.

  • On transacted channels the server MUST NOT purge messages that have already been sent to a client but not yet acknowledged.
  • If nowait is False, this method returns a message count.
Parameters:
  • queue (str) – queue name to purge; leave blank to refer to last declared queue for this channel
  • nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply
Returns:

message count (if nowait is False)

Return type:

int or None

queue_unbind(queue, exchange, routing_key='', nowait=False, arguments=None) → None[source]

Unbind a queue from an exchange

This method unbinds a queue from an exchange.

  • If an unbind fails, the server MUST raise a connection exception.
  • The client must not attempt to unbind a queue that does not exist.
  • The client must not attempt to unbind a queue from an exchange that does not exist.
Parameters:
  • queue (str) – name of queue to unbind, leave blank to refer to the last declared queue on this channel
  • exchange (str) – name of exchange to unbind, leave blank to refer to default exchange
  • routing_key (str) – routing key of binding
  • arguments (dict) – binding arguments, specific to the exchange class
tx_commit() → None[source]

Commit the current transaction

This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.

tx_rollback() → None[source]

Abandon the current transaction

This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.

tx_select() → None[source]

Select standard transaction mode

This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.

The channel must not be in publish acknowledge mode. If it is, the server raises a PreconditionFailed exception and closes the channel. Note that amqpy will automatically reopen the channel, at which point this method can be called again successfully.

Raises:
  • PreconditionFailed – if the channel is in publish acknowledge mode
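The commit and rollback methods pair naturally with a context manager. The following is a minimal sketch, not something amqpy is documented to provide: transaction is a hypothetical helper that assumes tx_select() has already been called on the channel and uses only tx_commit() and tx_rollback() as described above.

```python
from contextlib import contextmanager


@contextmanager
def transaction(channel):
    """Commit on normal exit; roll back and re-raise if the block fails.

    Assumes tx_select() has already been called on `channel`.
    """
    try:
        yield channel
    except BaseException:
        channel.tx_rollback()
        raise
    else:
        channel.tx_commit()
```

Publishes and acknowledgements issued inside a `with transaction(ch):` block are then committed together, or abandoned together if the block raises.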