"""AMQPStorm Message."""
import json
import uuid
from datetime import datetime
from amqpstorm.base import BaseMessage
from amqpstorm.compatibility import try_utf8_decode
from amqpstorm.exception import AMQPMessageError
class Message(BaseMessage):
    """RabbitMQ Message.

    e.g.
    ::

        # Message Properties.
        properties = {
            'content_type': 'text/plain',
            'expiration': '3600',
            'headers': {'key': 'value'},
        }

        # Create a new message.
        message = Message.create(channel, 'Hello RabbitMQ!', properties)

        # Publish the message to a queue called, 'my_queue'.
        message.publish('my_queue')

    Decoded views of the body, method and properties are cached per
    message, so repeated attribute access does not decode again.

    :param Channel channel: AMQPStorm Channel
    :param bytes,str,unicode body: Message payload
    :param dict method: Message method
    :param dict properties: Message properties
    :param bool auto_decode: Auto-decode strings when possible. Does not
                             apply to to_dict, or to_tuple.
    """
    __slots__ = [
        '_decode_cache'
    ]

    def __init__(self, channel, body=None, method=None, properties=None,
                 auto_decode=True):
        super(Message, self).__init__(
            channel, body, method, properties, auto_decode
        )
        # Holds the decoded 'body', 'method' and 'properties' once they
        # have been requested with auto_decode enabled.
        self._decode_cache = dict()

    @staticmethod
    def create(channel, body, properties=None):
        """Create a new Message.

        Missing ``correlation_id``, ``message_id`` and ``timestamp``
        properties are filled in automatically. The returned message has
        auto_decode disabled.

        :param Channel channel: AMQPStorm Channel
        :param bytes,str,unicode body: Message payload
        :param dict properties: Message properties

        :rtype: Message
        """
        # Work on a copy so the caller's dictionary is never mutated.
        properties = dict(properties or {})
        if 'correlation_id' not in properties:
            properties['correlation_id'] = str(uuid.uuid4())
        if 'message_id' not in properties:
            properties['message_id'] = str(uuid.uuid4())
        if 'timestamp' not in properties:
            # NOTE(review): utcnow() yields a naive datetime and is
            # deprecated since Python 3.12 -- kept as-is because a
            # timezone-aware replacement is not available on Python 2.
            properties['timestamp'] = datetime.utcnow()

        return Message(channel, auto_decode=False,
                       body=body, properties=properties)

    @property
    def body(self):
        """Return the Message Body.

        If auto_decode is enabled, the body will automatically be
        decoded using decode('utf-8') if possible.

        :rtype: bytes,str,unicode
        """
        if not self._auto_decode:
            return self._body
        if 'body' in self._decode_cache:
            return self._decode_cache['body']
        body = try_utf8_decode(self._body)
        self._decode_cache['body'] = body
        return body

    @property
    def channel(self):
        """Return the Channel used by this message.

        :rtype: Channel
        """
        return self._channel

    @property
    def method(self):
        """Return the Message Method.

        If auto_decode is enabled, all strings will automatically be
        decoded using decode('utf-8') if possible.

        :rtype: dict
        """
        return self._try_decode_utf8_content(self._method, 'method')

    @property
    def properties(self):
        """Returns the Message Properties.

        If auto_decode is enabled, all strings will automatically be
        decoded using decode('utf-8') if possible.

        :rtype: dict
        """
        return self._try_decode_utf8_content(self._properties, 'properties')

    def ack(self):
        """Acknowledge Message.

        :raises AMQPMessageError: Raises if the message has no method
                                  frame, i.e. it is not an incoming
                                  message.
        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :return:
        """
        if not self._method:
            raise AMQPMessageError(
                'Message.ack only available on incoming messages'
            )
        self._channel.basic.ack(delivery_tag=self.delivery_tag)

    def nack(self, requeue=True):
        """Negative Acknowledgement.

        :raises AMQPMessageError: Raises if the message has no method
                                  frame, i.e. it is not an incoming
                                  message.
        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :param bool requeue: Re-queue the message
        """
        if not self._method:
            raise AMQPMessageError(
                'Message.nack only available on incoming messages'
            )
        self._channel.basic.nack(delivery_tag=self.delivery_tag,
                                 requeue=requeue)

    def reject(self, requeue=True):
        """Reject Message.

        :raises AMQPMessageError: Raises if the message has no method
                                  frame, i.e. it is not an incoming
                                  message.
        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :param bool requeue: Re-queue the message
        """
        if not self._method:
            raise AMQPMessageError(
                'Message.reject only available on incoming messages'
            )
        self._channel.basic.reject(delivery_tag=self.delivery_tag,
                                   requeue=requeue)

    def publish(self, routing_key, exchange='', mandatory=False,
                immediate=False):
        """Publish Message.

        The stored (un-decoded) body and properties are what get
        published.

        :param str routing_key: Message routing key
        :param str exchange: The exchange to publish the message to
        :param bool mandatory: Requires the message is published
        :param bool immediate: Request immediate delivery

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: bool,None
        """
        return self._channel.basic.publish(body=self._body,
                                           routing_key=routing_key,
                                           exchange=exchange,
                                           properties=self._properties,
                                           mandatory=mandatory,
                                           immediate=immediate)

    @property
    def app_id(self):
        """Get AMQP Message attribute: app_id.

        :return: Value of ``app_id`` in the message properties, or None
                 if it is not set.
        """
        return self.properties.get('app_id')

    @app_id.setter
    def app_id(self, value):
        """Set AMQP Message attribute: app_id.

        :param value: New ``app_id`` value.
        """
        self._update_properties('app_id', value)

    @property
    def message_id(self):
        """Get AMQP Message attribute: message_id.

        :return: Value of ``message_id`` in the message properties, or
                 None if it is not set.
        """
        return self.properties.get('message_id')

    @message_id.setter
    def message_id(self, value):
        """Set AMQP Message attribute: message_id.

        :param value: New ``message_id`` value.
        """
        self._update_properties('message_id', value)

    @property
    def content_encoding(self):
        """Get AMQP Message attribute: content_encoding.

        :return: Value of ``content_encoding`` in the message properties,
                 or None if it is not set.
        """
        return self.properties.get('content_encoding')

    @content_encoding.setter
    def content_encoding(self, value):
        """Set AMQP Message attribute: content_encoding.

        :param value: New ``content_encoding`` value.
        """
        self._update_properties('content_encoding', value)

    @property
    def content_type(self):
        """Get AMQP Message attribute: content_type.

        :return: Value of ``content_type`` in the message properties, or
                 None if it is not set.
        """
        return self.properties.get('content_type')

    @content_type.setter
    def content_type(self, value):
        """Set AMQP Message attribute: content_type.

        :param value: New ``content_type`` value.
        """
        self._update_properties('content_type', value)

    @property
    def correlation_id(self):
        """Get AMQP Message attribute: correlation_id.

        :return: Value of ``correlation_id`` in the message properties,
                 or None if it is not set.
        """
        return self.properties.get('correlation_id')

    @correlation_id.setter
    def correlation_id(self, value):
        """Set AMQP Message attribute: correlation_id.

        :param value: New ``correlation_id`` value.
        """
        self._update_properties('correlation_id', value)

    @property
    def delivery_mode(self):
        """Get AMQP Message attribute: delivery_mode.

        :return: Value of ``delivery_mode`` in the message properties, or
                 None if it is not set.
        """
        return self.properties.get('delivery_mode')

    @delivery_mode.setter
    def delivery_mode(self, value):
        """Set AMQP Message attribute: delivery_mode.

        :param value: New ``delivery_mode`` value.
        """
        self._update_properties('delivery_mode', value)

    @property
    def timestamp(self):
        """Get AMQP Message attribute: timestamp.

        :return: Value of ``timestamp`` in the message properties, or
                 None if it is not set.
        """
        return self.properties.get('timestamp')

    @timestamp.setter
    def timestamp(self, value):
        """Set AMQP Message attribute: timestamp.

        :param value: New ``timestamp`` value.
        """
        self._update_properties('timestamp', value)

    @property
    def priority(self):
        """Get AMQP Message attribute: priority.

        :return: Value of ``priority`` in the message properties, or None
                 if it is not set.
        """
        return self.properties.get('priority')

    @priority.setter
    def priority(self, value):
        """Set AMQP Message attribute: priority.

        :param value: New ``priority`` value.
        """
        self._update_properties('priority', value)

    @property
    def reply_to(self):
        """Get AMQP Message attribute: reply_to.

        :return: Value of ``reply_to`` in the message properties, or None
                 if it is not set.
        """
        return self.properties.get('reply_to')

    @reply_to.setter
    def reply_to(self, value):
        """Set AMQP Message attribute: reply_to.

        :param value: New ``reply_to`` value.
        """
        self._update_properties('reply_to', value)

    @property
    def message_type(self):
        """Get AMQP Message attribute: message_type.

        :return: Value of ``message_type`` in the message properties, or
                 None if it is not set.
        """
        return self.properties.get('message_type')

    @message_type.setter
    def message_type(self, value):
        """Set AMQP Message attribute: message_type.

        :param value: New ``message_type`` value.
        """
        self._update_properties('message_type', value)

    @property
    def expiration(self):
        """Get AMQP Message attribute: expiration.

        :return: Value of ``expiration`` in the message properties, or
                 None if it is not set.
        """
        return self.properties.get('expiration')

    @expiration.setter
    def expiration(self, value):
        """Set AMQP Message attribute: expiration.

        :param value: New ``expiration`` value.
        """
        self._update_properties('expiration', value)

    @property
    def user_id(self):
        """Get AMQP Message attribute: user_id.

        :return: Value of ``user_id`` in the message properties, or None
                 if it is not set.
        """
        return self.properties.get('user_id')

    @user_id.setter
    def user_id(self, value):
        """Set AMQP Message attribute: user_id.

        :param value: New ``user_id`` value.
        """
        self._update_properties('user_id', value)

    @property
    def redelivered(self):
        """Indicates if this message may have been delivered before (but not
        acknowledged).

        Returns None when the message has no method frame.

        :rtype: bool,None
        """
        if not self._method:
            return None
        return self._method.get('redelivered')

    @property
    def delivery_tag(self):
        """Server-assigned delivery tag.

        Returns None when the message has no method frame.

        :rtype: int,None
        """
        if not self._method:
            return None
        return self._method.get('delivery_tag')

    def json(self):
        """Deserialize the message body, if it is JSON.

        :raises ValueError: Raised by ``json.loads`` if the body is not
                            valid JSON.

        :return: The deserialized body.
        """
        return json.loads(self.body)

    def _update_properties(self, name, value):
        """Update properties, and keep cache up-to-date if auto decode is
        enabled.

        The raw value is always stored in the message properties. When a
        decoded copy of the properties is already cached, the decoded
        form of the value is stored there, so a getter returns the same
        result whether or not the properties were read before the update.

        :param str name: Key
        :param obj value: Value

        :return:
        """
        if self._auto_decode and 'properties' in self._decode_cache:
            self._decode_cache['properties'][name] = (
                self._try_decode_value(value)
            )
        self._properties[name] = value

    def _try_decode_utf8_content(self, content, content_type):
        """Generic function to decode content.

        Returns the content untouched when auto decode is disabled or the
        content is empty; otherwise returns the decoded content, which is
        cached under ``content_type``.

        :param object content: Raw content to decode.
        :param str content_type: Cache key for the decoded result
                                 (e.g. 'method' or 'properties').

        :return: The decoded (or original) content.
        """
        if not self._auto_decode or not content:
            return content
        if content_type in self._decode_cache:
            return self._decode_cache[content_type]
        if isinstance(content, dict):
            content = self._try_decode_dict(content)
        else:
            content = try_utf8_decode(content)
        self._decode_cache[content_type] = content
        return content

    def _try_decode_dict(self, content):
        """Decode content of a dictionary.

        Keys are decoded, and values are decoded recursively through any
        nested dictionaries, lists and tuples. A new dictionary is
        returned; the input is not modified.

        :param dict content:

        :return: A new dictionary with decoded keys and values.
        """
        return dict(
            (try_utf8_decode(key), self._try_decode_value(value))
            for key, value in content.items()
        )

    @staticmethod
    def _try_decode_list(content):
        """Decode content of a list.

        Elements are decoded recursively, so containers nested inside the
        list (e.g. the list of tables in RabbitMQ's ``x-death`` header)
        are decoded as well.

        :param list,tuple content:

        :return: A new list with decoded elements.
        """
        return [Message._try_decode_value(value) for value in content]

    @staticmethod
    def _try_decode_tuple(content):
        """Decode content of a tuple.

        :param tuple content:

        :return: A new tuple with decoded elements.
        """
        return tuple(Message._try_decode_list(content))

    @staticmethod
    def _try_decode_value(value):
        """Decode a single value, recursing into containers.

        Dictionaries, lists and tuples are rebuilt with every nested
        key/element decoded; any other value is passed to
        ``try_utf8_decode``.

        :param object value: Value to decode.

        :return: The decoded value.
        """
        decode = Message._try_decode_value
        if isinstance(value, dict):
            return dict(
                (try_utf8_decode(key), decode(item))
                for key, item in value.items()
            )
        if isinstance(value, list):
            return [decode(item) for item in value]
        if isinstance(value, tuple):
            return tuple(decode(item) for item in value)
        return try_utf8_decode(value)