API documentation:
AMQP Connections
amqpy.connection.
Connection
(amqpy.abstract_channel.AbstractChannel)[source]¶Bases: amqpy.abstract_channel.AbstractChannel
The connection class provides methods for a client to establish a network connection to a server, and for both peers to operate the connection thereafter
connected
¶@property
Check if connection is connected
Returns: | True if connected, else False |
---|---|
Return type: | bool |
server_capabilities
¶@property
Get server capabilities
These properties are set only after successfully connecting.
Returns: | server capabilities |
---|---|
Return type: | dict |
sock
¶@property
Access underlying TCP socket
Returns: | socket |
---|---|
Return type: | socket.socket |
channels
= None¶Map of {channel_id: Channel} for all active channels
Type: | dict[int, Channel] |
---|
transport
= None¶Type: | amqpy.transport.Transport |
---|
__init__
(host='localhost', port=5672, ssl=None, connect_timeout=None, userid='guest', password='guest', login_method='AMQPLAIN', virtual_host='/', locale='en_US', channel_max=65535, frame_max=131072, heartbeat=0, client_properties=None, on_blocked=None, on_unblocked=None)[source]¶Create a connection to the specified host
If you are using SSL, make sure the correct port number is specified (usually 5671), as the default of 5672 is for non-SSL connections.
Parameters: |
|
---|
channel
(channel_id=None) → amqpy.channel.Channel[source]¶Create a new channel, or fetch the channel associated with channel_id if specified
Parameters: | channel_id (int or None) – channel ID number |
---|---|
Returns: | Channel |
Return type: | amqpy.channel.Channel |
close
(reply_code=0, reply_text='', method_type=method_t(class_id=0, method_id=0)) → None[source]¶Close connection to the server
This method performs a connection close handshake with the server, then closes the underlying connection.
If this connection close is due to a client error, the client may provide a reply_code, reply_text, and method_type to indicate to the server the reason for closing the connection.
Parameters: |
|
---|
connect
() → None[source]¶Connect using saved connection parameters
This method does not need to be called explicitly; it is called by the constructor during initialization.
Note: reconnecting invalidates all declarations (channels, queues, consumers, delivery tags, etc.).
drain_events
(timeout=None) → None[source]¶Wait for an event on all channels
This method should be called after creating consumers in order to receive delivered messages and execute consumer callbacks.
Parameters: | timeout (float or None) – maximum allowed time to wait for an event |
---|---|
Raises: | amqpy.exceptions.Timeout – if the operation times out |
is_alive
() → bool[source]¶Check if connection is alive
This method is the primary way to check if the connection is alive.
Side effects: This method may send a heartbeat as a last resort to check if the connection is alive.
Returns: | True if connection is alive, else False |
---|---|
Return type: | bool |
loop
(timeout=None) → None[source]¶Call drain_events()
continuously
Parameters: | timeout (float or None) – maximum allowed time to wait for an event |
---|
AMQP Channels
amqpy.channel.
Channel
(amqpy.abstract_channel.AbstractChannel)[source]¶Bases: amqpy.abstract_channel.AbstractChannel
The channel class provides methods for a client to establish and operate an AMQP channel. All public members are fully thread-safe.
CH_MODE_CONFIRM
= 2¶Publisher confirm mode (RabbitMQ extension)
CH_MODE_NONE
= 0¶Default channel mode
CH_MODE_TX
= 1¶Transaction mode
active
= None¶Current channel active state (flow control)
Type: | bool |
---|
is_open
= None¶Current channel open/closed state
Type: | bool |
---|
mode
= None¶Channel mode state (default, transactional, publisher confirm)
Type: | int |
---|
returned_messages
= None¶Returned messages that the server was unable to deliver
Type: | queue.Queue |
---|
__init__
(connection, channel_id=None, auto_decode=True)[source]¶Create a channel bound to a connection and using the specified numeric channel_id, and open on the server
If auto_decode is enabled (default), incoming Message bodies will be automatically decoded to str if possible.
Parameters: |
|
---|
basic_ack
(delivery_tag, multiple=False) → None[source]¶Acknowledge one or more messages
This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.
Parameters: |
---|
basic_cancel
(consumer_tag, nowait=False) → None[source]¶End a queue consumer
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.
Parameters: |
---|
basic_consume
(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, callback=None, arguments=None, on_cancel=None) → str[source]¶Start a queue consumer
This method asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them.
Parameters: |
|
---|---|
Returns: | consumer tag |
Return type: |
basic_get
(queue='', no_ack=False) → amqpy.message.Message or None[source]¶Directly get a message from the queue
This method is non-blocking. If no messages are available on the queue, None is returned.
Parameters: | |
---|---|
Returns: | message, or None if no messages are available on the queue |
Return type: | amqpy.message.Message or None |
basic_publish
(msg, exchange='', routing_key='', mandatory=False, immediate=False) → None[source]¶Publish a message
This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
If publisher confirms are enabled, this method will automatically wait to receive an “ack” from the server.
Note
Returned messages are sent back from the server and loaded into the returned_messages queue of the channel that sent them. In order to receive all returned messages, call loop(0) on the connection object before checking the channel’s returned_messages queue.
Parameters: |
---|
basic_qos
(prefetch_size=0, prefetch_count=0, a_global=False) → None[source]¶Specify quality of service
This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.
Parameters: |
---|
basic_recover
(requeue=False) → None[source]¶Redeliver unacknowledged messages
This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels.
Parameters: | requeue (bool) – if set, the server will attempt to requeue the message, potentially then delivering it to a different subscriber |
---|
basic_recover_async
(requeue=False) → None[source]¶Redeliver unacknowledged messages (async)
This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels.
Parameters: | requeue (bool) – if set, the server will attempt to requeue the message, potentially then delivering it to a different subscriber |
---|
basic_reject
(delivery_tag, requeue) → None[source]¶Reject an incoming message
This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
Parameters: |
---|
close
(reply_code=0, reply_text='', method_type=method_t(class_id=0, method_id=0)) → None[source]¶Request a channel close
This method indicates that the sender wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception When a close is due to an exception, the sender provides the class and method id of the method which caused the exception.
Parameters: |
|
---|
confirm_select
(nowait=False) → None[source]¶Enable publisher confirms for this channel (RabbitMQ extension)
The channel must not be in transactional mode. If it is, the server raises a
PreconditionFailed
exception and closes the channel. Note that amqpy will
automatically reopen the channel, at which point this method can be called again
successfully.
Parameters: | nowait (bool) – if set, the server will not respond to the method and the client should not wait for a reply |
---|---|
Raises: | PreconditionFailed – if the channel is in transactional mode |
exchange_bind
(dest_exch, source_exch='', routing_key='', nowait=False, arguments=None) → None[source]¶Bind an exchange to an exchange
Parameters: |
|
---|
exchange_declare
(exchange, exch_type, passive=False, durable=False, auto_delete=True, nowait=False, arguments=None) → None[source]¶Declare exchange, create if needed
Parameters: |
|
---|---|
Raises: |
|
Returns: | None |
exchange_delete
(exchange, if_unused=False, nowait=False) → None[source]¶Delete an exchange
This method deletes an exchange.
Parameters: | |
---|---|
Raises: |
|
Returns: | None |
exchange_unbind
(dest_exch, source_exch='', routing_key='', nowait=False, arguments=None) → None[source]¶Unbind an exchange from an exchange
attempt to unbind an exchange that does not exist from an exchange.
Blank exchange names mean the default exchange.
Parameters: |
|
---|
flow
(active) → None[source]¶Enable/disable flow from peer
This method asks the peer to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives a Flow restart method.
Parameters: | active (bool) – True: peer starts sending content frames; False: peer stops sending content frames |
---|
queue_bind
(queue, exchange='', routing_key='', nowait=False, arguments=None) → None[source]¶Bind queue to an exchange
This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange.
Parameters: |
|
---|
queue_declare
(queue='', passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None) → queue_declare_ok_t or None[source]¶Declare queue, create if needed
This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue. A tuple containing the queue name, message count, and consumer count is returned, which is essential for declaring automatically named queues.
RabbitMQ supports the following useful additional arguments:
Parameters: |
|
---|---|
Raises: |
|
Returns: | queue_declare_ok_t(queue, message_count, consumer_count), or None if nowait |
Return type: | queue_declare_ok_t or None |
queue_delete
(queue='', if_unused=False, if_empty=False, nowait=False) → int[source]¶Delete a queue
This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.
Parameters: |
|
---|---|
Raises: |
|
Returns: | number of messages deleted |
Return type: |
queue_purge
(queue='', nowait=False) → int or None[source]¶Purge a queue
This method removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal “undo” mechanism.
Parameters: | |
---|---|
Returns: | message count (if nowait is False) |
Return type: | int or None |
queue_unbind
(queue, exchange, routing_key='', nowait=False, arguments=None) → None[source]¶Unbind a queue from an exchange
This method unbinds a queue from an exchange.
Parameters: |
|
---|
tx_commit
() → None[source]¶Commit the current transaction
This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.
tx_rollback
() → None[source]¶Abandon the current transaction
This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
tx_select
() → None[source]¶Select standard transaction mode
This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
The channel must not be in publish acknowledge mode. If it is, the server raises a
PreconditionFailed
exception and closes the channel. Note that amqpy will
automatically reopen the channel, at which point this method can be called again
successfully.
Raises: | PreconditionFailed – if the channel is in publish acknowledge mode |
---|
Messages for AMQP
amqpy.message.
Message
(amqpy.message.GenericContent)[source]¶Bases: amqpy.message.GenericContent
A Message for use with the Channel.basic_* methods
body
¶Message body (bytes or str or unicode)
channel
¶Associated channel, set after receiving a message (amqpy.channel.Channel)
delivery_info
¶Delivery info, set after receiving a message (dict)
__init__
(body='', channel=None, **properties)[source]¶If body is a str, then content_encoding will automatically be set to ‘UTF-8’, unless explicitly specified.
Example:
msg = Message('hello world', content_type='text/plain', application_headers={'foo': 7})
Parameters: |
|
---|
amqpy.spec.
FRAME_MIN_SIZE
= 4096¶The default, minimum frame size that both the client and server must be able to handle
amqpy.spec.
FrameType
[source]¶Bases: object
This class contains frame-related constants
METHOD, HEADER, BODY, and HEARTBEAT are all frame type constants which make up the first byte of every frame. The END constant is the termination value which is the last byte of every frame.
High-level representations of AMQP protocol objects
amqpy.proto.
Frame
[source]¶Bases: object
AMQP frame
A Frame represents the lowest-level packet of data specified by the AMQP 0.9.1 wire-level protocol. All methods and messages are packed into one or more frames before being sent to the peer.
The format of the AMQP frame is as follows:
offset: 0 1 3 7 size+7 size+8
+------+---------+---------+-------------------+-----------+
| type | channel | size | --- payload --- | frame-end |
+------+---------+---------+-------------------+-----------+
size (bytes) 1 2 4 size 1
data
¶raw frame data; can be manually manipulated at any time
Type: | bytearray |
---|
amqpy.proto.
Method
[source]¶Bases: object
AMQP method
The AMQP 0.9.1 protocol specifies communication as sending and receiving “methods”. Methods consist of a “class-id” and “method-id” and are represented by a method_t namedtuple in amqpy. Methods are packed into the payload of a FrameType.METHOD frame, and most methods can be fully sent in a single frame. If the method specified to be carrying content (such as a message), the method frame is followed by additional frames: a FrameType.HEADER frame, then zero or more FrameType.BODY frames.
The format of the FrameType.METHOD frame’s payload is as follows:
offset: 0 2 4
+----------+-----------+-------------- - -
| class-id | method-id | arguments...
+----------+-----------+-------------- - -
size (bytes): 2 2 variable
The format of the FrameType.HEADER frame’s payload is as follows:
offset: 0 2 4 12 14
+----------+--------+-----------+----------------+------------------- - -
| class-id | weight | body size | property flags | property list...
+----------+--------+-----------+----------------+------------------- - -
size (bytes): 2 2 8 2 variable
The format of the FrameType.BODY frame’s payload is simply raw binary data of the message body.
channel_id
¶Type: | int |
---|
complete
¶@property
Check if the message that is carried by this method has been completely assembled, i.e. the expected number of bytes have been loaded
This method is intended to be called when constructing a Method from incoming data.
Returns: | True if method is complete, else False |
---|---|
Return type: | bool |
content
¶Type: | Message or None |
---|
method_type
¶Type: | amqpy.spec.method_t |
---|
__init__
(method_type=None, args=None, content=None, channel_id=None)[source]¶Parameters: |
|
---|
dump_body_frame
(chunk_size) → generator[amqpy.proto.Frame][source]¶Create a body frame
This method is intended to be called when sending frames for an already-completed Method.
Parameters: | chunk_size (int) – body chunk size in bytes; this is typically the maximum frame size - 8 |
---|---|
Returns: | generator of FrameType.BODY frames |
Return type: | generator[amqpy.proto.Frame] |
dump_header_frame
() → amqpy.proto.Frame[source]¶Create a header frame
This method is intended to be called when sending frames for an already-completed Method.
Returns: | FrameType.HEADER frame |
---|---|
Return type: | amqpy.proto.Frame |
dump_method_frame
() → amqpy.proto.Frame[source]¶Create a method frame
This method is intended to be called when sending frames for an already-completed Method.
Returns: | FrameType.METHOD frame |
---|---|
Return type: | amqpy.proto.Frame |
load_body_frame
(frame) → None[source]¶Add content to partial method
This method is intended to be called when constructing a Method from incoming data.
Parameters: | frame (amqpy.proto.Frame) – FrameType.BODY frame |
---|
load_header_frame
(frame) → None[source]¶Add header to partial method
This method is intended to be called when constructing a Method from incoming data.
Parameters: | frame (amqpy.proto.Frame) – FrameType.HEADER frame |
---|
load_method_frame
(frame) → None[source]¶Load method frame payload data
This method is intended to be called when constructing a Method from incoming data.
After calling, self.method_type, self.args, and self.channel_id will be loaded with data from the frame.
Parameters: | frame (amqpy.proto.Frame) – FrameType.METHOD frame |
---|
AMQP uses exceptions to handle errors:
According to the AMQP specification, an exception closes the associated channel or connection, and returns a reply code and reply text to the client. However, amqpy will automatically re-open the channel after a channel error.
amqpy.exceptions.
Timeout
[source]¶Bases: TimeoutError
General AMQP operation timeout
amqpy.exceptions.
ContentTooLarge
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.RecoverableChannelError
The client attempted to transfer content larger than the server could accept at the present time. The client may retry at a later time.
amqpy.exceptions.
NoConsumers
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.RecoverableChannelError
When the exchange cannot deliver to a consumer when the immediate flag is set. As a result of pending data on the queue or the absence of any consumers of the queue.
amqpy.exceptions.
ConnectionForced
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.RecoverableConnectionError
An operator intervened to close the connection for some reason. The client may retry at some later date.
amqpy.exceptions.
InvalidPath
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The client tried to work with an unknown virtual host.
amqpy.exceptions.
AccessRefused
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableChannelError
The client attempted to work with a server entity to which it has no access due to security settings.
amqpy.exceptions.
NotFound
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableChannelError
The client attempted to work with a server entity that does not exist.
amqpy.exceptions.
ResourceLocked
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.RecoverableChannelError
The client attempted to work with a server entity to which it has no access because another client is working with it.
amqpy.exceptions.
PreconditionFailed
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableChannelError
The client requested a method that was not allowed because some precondition failed.
amqpy.exceptions.
FrameError
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The sender sent a malformed frame that the recipient could not decode. This strongly implies a programming error in the sending peer.
amqpy.exceptions.
FrameSyntaxError
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The sender sent a frame that contained illegal values for one or more fields. This strongly implies a programming error in the sending peer.
amqpy.exceptions.
InvalidCommand
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The client sent an invalid sequence of frames, attempting to perform an operation that was considered invalid by the server. This usually implies a programming error in the client.
amqpy.exceptions.
ChannelNotOpen
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The client attempted to work with a channel that had not been correctly opened. This most likely indicates a fault in the client layer.
amqpy.exceptions.
UnexpectedFrame
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The peer sent a frame that was not expected, usually in the context of a content header and body. This strongly indicates a fault in the peer’s content processing.
amqpy.exceptions.
ResourceError
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.RecoverableConnectionError
The server could not complete the method because it lacked sufficient resources. This may be due to the client creating too many of some type of entity.
amqpy.exceptions.
NotAllowed
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The client tried to work with some entity in a manner that is prohibited by the server, due to security settings or by some other criteria.
amqpy.exceptions.
AMQPNotImplementedError
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The client tried to use functionality that is not implemented in the server.
amqpy.exceptions.
InternalError
(reply_text=None, method_type=None, method_name=None, reply_code=None, channel_id=None)[source]¶Bases: amqpy.exceptions.IrrecoverableConnectionError
The server could not complete the method because of an internal error. The server may require intervention by an operator in order to resume normal operations.
amqpy is a pure-Python AMQP 0.9.1 client library for Python >= 3.2.0 (including PyPy3) with a focus on:
It has very good performance, as AMQP 0.9.1 is a very efficient binary protocol, but does not sacrifice clean design and testability to save a few extra CPU cycles.
This library is actively maintained and has a zero bug policy. Please submit issues and pull requests, and bugs will be fixed immediately.
The current API is not final, but will progressively get more stable as version 1.0.0 is approached.
This library makes the following guarantees:
amqpy is easy to install, and there are no dependencies:
pip install amqpy
amqpy is easy to use:
from amqpy import Connection, Message, AbstractConsumer, Timeout
conn = Connection() # connect to guest:guest@localhost:5672 by default
ch = conn.channel()
# declare an exchange and queue, and bind the queue to the exchange
ch.exchange_declare('test.exchange', 'direct')
ch.queue_declare('test.q')
ch.queue_bind('test.q', exchange='test.exchange', routing_key='test.q')
# publish a few messages, which will get routed to the queue bound to the routing key "test.q"
ch.basic_publish(Message('hello world 1'), exchange='test.exchange', routing_key='test.q')
ch.basic_publish(Message('hello world 2'), exchange='test.exchange', routing_key='test.q')
ch.basic_publish(Message('hello world 3'), exchange='test.exchange', routing_key='test.q')
# get a message from the queue
msg = ch.basic_get('test.q')
# don't forget to acknowledge it
msg.ack()
Let’s create a consumer:
class Consumer(AbstractConsumer):
def run(self, msg: Message):
print('Received a message: {}'.format(msg.body))
msg.ack()
consumer = Consumer(ch, 'test.q')
consumer.declare()
# wait for events, which will receive delivered messages and call any consumer callbacks
while True:
conn.drain_events(timeout=None)
Any AMQP 0.9.1-compliant server is supported, but RabbitMQ is our primary target. Apache Qpid is confirmed to work, but only with “anonymous” authentication. A CRAM-MD5 auth mechanism is currently being developed and will be released shortly.
Connection.drain_events()
Supports RabbitMQ extensions:
Channel.confirm_select()
, then use
Channel.basic_publish_confirm
Channel.exchange_bind()
and Channel.exchange_unbind()
ChannelError
being raised,
but not if an on_cancel
callback is passed to basic_consume
amqpy uses the excellent tox and pytest frameworks. To run all tests, simply install a local RabbitMQ server. No additional configuration is necessary for RabbitMQ. Then run in the project root:
$ pip install pytest
$ py.test