"""AMQPStorm Channel.Queue."""
import logging
from pamqp.specification import Queue as pamqp_queue
from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPInvalidArgument
LOGGER = logging.getLogger(__name__)
[docs]class Queue(Handler):
"""RabbitMQ Queue Operations."""
__slots__ = []
[docs] def declare(self, queue='', passive=False, durable=False,
exclusive=False, auto_delete=False, arguments=None):
"""Declare a Queue.
:param str queue: Queue name
:param bool passive: Do not create
:param bool durable: Durable queue
:param bool exclusive: Request exclusive access
:param bool auto_delete: Automatically delete when not in use
:param dict arguments: Queue key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(passive, bool):
raise AMQPInvalidArgument('passive should be a boolean')
elif not isinstance(durable, bool):
raise AMQPInvalidArgument('durable should be a boolean')
elif not isinstance(exclusive, bool):
raise AMQPInvalidArgument('exclusive should be a boolean')
elif not isinstance(auto_delete, bool):
raise AMQPInvalidArgument('auto_delete should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
declare_frame = pamqp_queue.Declare(queue=queue,
passive=passive,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
arguments=arguments)
return self._channel.rpc_request(declare_frame)
[docs] def delete(self, queue='', if_unused=False, if_empty=False):
"""Delete a Queue.
:param str queue: Queue name
:param bool if_unused: Delete only if unused
:param bool if_empty: Delete only if empty
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(if_unused, bool):
raise AMQPInvalidArgument('if_unused should be a boolean')
elif not isinstance(if_empty, bool):
raise AMQPInvalidArgument('if_empty should be a boolean')
delete_frame = pamqp_queue.Delete(queue=queue, if_unused=if_unused,
if_empty=if_empty)
return self._channel.rpc_request(delete_frame)
[docs] def purge(self, queue):
"""Purge a Queue.
:param str queue: Queue name
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
purge_frame = pamqp_queue.Purge(queue=queue)
return self._channel.rpc_request(purge_frame)
[docs] def bind(self, queue='', exchange='', routing_key='', arguments=None):
"""Bind a Queue.
:param str queue: Queue name
:param str exchange: Exchange name
:param str routing_key: The routing key to use
:param dict arguments: Bind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
bind_frame = pamqp_queue.Bind(queue=queue,
exchange=exchange,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(bind_frame)
[docs] def unbind(self, queue='', exchange='', routing_key='', arguments=None):
"""Unbind a Queue.
:param str queue: Queue name
:param str exchange: Exchange name
:param str routing_key: The routing key used
:param dict arguments: Unbind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
unbind_frame = pamqp_queue.Unbind(queue=queue,
exchange=exchange,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(unbind_frame)