"""Messages for AMQP
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import six
from . import spec
from amqpy.serialization import AMQPReader, AMQPWriter
import logging
log = logging.getLogger('amqpy')
__all__ = ['Message']
class GenericContent:
"""Base class for AMQP content
Subclasses should override :attr:`PROPERTIES`.
"""
__slots__ = ['properties']
PROPERTIES = []
def __init__(self, properties):
"""Save the properties appropriate to this AMQP content type in a 'properties' dictionary
:param dict properties: content properties
"""
#: Content properties
#:
#: :type: dict[str|unicode, str|dict]
self.properties = properties
def __eq__(self, other):
"""Check if this object has the same properties as another content object
"""
return self.properties == other.properties
def load_properties(self, raw_bytes):
"""Load raw bytes into :attr:`self.properties`
The `raw_bytes` are the payload of a `FrameType.HEADER` frame, starting at a byte-offset
of 12.
"""
reader = AMQPReader(raw_bytes)
# read 16-bit shorts until we get one with a low bit set to zero
flags = []
while True:
flag_bits = reader.read_short()
flags.append(flag_bits)
if flag_bits & 1 == 0:
break
shift = 0
d = {}
flag_bits = None
for prop_name, data_type in self.PROPERTIES:
if shift == 0:
if not flags:
break
flag_bits, flags = flags[0], flags[1:]
shift = 15
if flag_bits & (1 << shift):
d[prop_name] = getattr(reader, 'read_' + data_type)()
shift -= 1
self.properties = d
def serialize_properties(self):
"""Serialize :attr:`self.properties` into raw bytes suitable to append
to the payload of `FrameType.HEADER` frames
"""
# write
shift = 15
flag_bits = 0
flags = []
prop_writer = AMQPWriter()
for prop_name, data_type in self.PROPERTIES:
val = self.properties.get(prop_name)
if val is not None:
if shift == 0:
flags.append(flag_bits)
flag_bits = 0
shift = 15
flag_bits |= (1 << shift)
if data_type != 'bit':
getattr(prop_writer, 'write_' + data_type)(val)
shift -= 1
flags.append(flag_bits)
# write final data
writer = AMQPWriter()
for flag_bits in flags:
writer.write_short(flag_bits)
writer.write(prop_writer.getvalue())
return writer.getvalue()
[docs]class Message(GenericContent):
"""A Message for use with the `Channel.basic_*` methods
"""
__slots__ = ['body', 'channel', 'delivery_info']
CLASS_ID = spec.Basic.CLASS_ID
# instances of this class have these attributes, which are passed back and
# forth as message properties between client and server
PROPERTIES = [
('content_type', 'shortstr'),
('content_encoding', 'shortstr'),
('application_headers', 'table'),
('delivery_mode', 'octet'),
('priority', 'octet'),
('correlation_id', 'shortstr'),
('reply_to', 'shortstr'),
('expiration', 'shortstr'),
('message_id', 'shortstr'),
('timestamp', 'timestamp'),
('type', 'shortstr'),
('user_id', 'shortstr'),
('app_id', 'shortstr'),
('cluster_id', 'shortstr')
]
[docs] def __init__(self, body='', channel=None, **properties):
"""
If `body` is a `str`, then `content_encoding` will automatically be set to 'UTF-8', unless
explicitly specified.
Example::
msg = Message('hello world', content_type='text/plain', application_headers={'foo': 7})
:param body: message body
:type body: bytes or str or unicode
:param channel: associated channel
:type channel: amqpy.channel.Channel
:param properties:
* content_type (shortstr): MIME content type
* content_encoding (shortstr): MIME content encoding
* application_headers: (table): Message header field table:
dict[str, str|int|Decimal|datetime|dict]
* delivery_mode: (octet): Non-persistent (1) or persistent (2)
* priority (octet): The message priority, 0 to 9
* correlation_id (shortstr) The application correlation identifier
* reply_to (shortstr) The destination to reply to
* expiration (shortstr): Message expiration specification
* message_id (shortstr): The application message identifier
* timestamp (datetime.datetime): The message timestamp
* type (shortstr): The message type name
* user_id (shortstr): The creating user id
* app_id (shortstr): The creating application id
* cluster_id (shortstr): Intra-cluster routing identifier
"""
super(Message, self).__init__(properties)
#: Message body (bytes or str or unicode)
self.body = body
#: Associated channel, set after receiving a message (amqpy.channel.Channel)
self.channel = channel
#: Delivery info, set after receiving a message (dict)
self.delivery_info = {}
if isinstance(body, six.string_types):
# if the `body` is a string, automatically set the content_encoding
# to UTF-8 if it hasn't already been set
self.properties['content_encoding'] = properties.get('content_encoding', 'UTF-8')
def __eq__(self, other):
"""Check if the properties and bodies of this Message and another Message are the same
Received messages may contain a 'delivery_info' attribute, which isn't compared.
"""
try:
return super(Message, self).__eq__(other) and self.body == other.body
except AttributeError:
return False
@property
def application_headers(self):
"""Get application headers
:return: application headers
:rtype: dict
"""
return self.properties.get('application_headers')
@property
def delivery_tag(self):
"""Get delivery tag
:return: delivery tag
:rtype: int
"""
return self.delivery_info.get('delivery_tag')
[docs] def ack(self):
"""Acknowledge message
This is a convenience method which calls :meth:`self.channel.basic_ack()`
"""
dt = self.delivery_tag
if dt is not None:
self.channel.basic_ack(dt)
else:
raise Exception('No delivery tag')
[docs] def reject(self, requeue):
"""Reject message
This is a convenience method which calls :meth:`self.channel.basic_reject()`
:param bool requeue: requeue if True else discard the message
"""
dt = self.delivery_tag
if dt is not None:
self.channel.basic_reject(dt, requeue)
else:
raise Exception('No delivery tag')