"""AMQPStorm Channel.Queue."""
from __future__ import annotations
import logging
from typing import Any
from pamqp.commands import Queue as pamqp_queue
from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPInvalidArgument
LOGGER = logging.getLogger(__name__)
[docs]
class Queue(Handler):
"""RabbitMQ Queue Operations."""
__slots__ = []
[docs]
def declare(
self,
queue: str = '',
passive: bool = False,
durable: bool = False,
exclusive: bool = False,
auto_delete: bool = False,
arguments: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Declare a Queue.
:param str queue: Queue name
:param bool passive: Do not create
:param bool durable: Durable queue
:param bool exclusive: Request exclusive access
:param bool auto_delete: Automatically delete when not in use
:param dict arguments: Queue key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(passive, bool):
raise AMQPInvalidArgument('passive should be a boolean')
elif not isinstance(durable, bool):
raise AMQPInvalidArgument('durable should be a boolean')
elif not isinstance(exclusive, bool):
raise AMQPInvalidArgument('exclusive should be a boolean')
elif not isinstance(auto_delete, bool):
raise AMQPInvalidArgument('auto_delete should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
declare_frame = pamqp_queue.Declare(queue=queue,
passive=passive,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
arguments=arguments)
return self._channel.rpc_request(declare_frame)
[docs]
def delete(
self, queue: str = '', if_unused: bool = False, if_empty: bool = False,
) -> dict[str, Any]:
"""Delete a Queue.
:param str queue: Queue name
:param bool if_unused: Delete only if unused
:param bool if_empty: Delete only if empty
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(if_unused, bool):
raise AMQPInvalidArgument('if_unused should be a boolean')
elif not isinstance(if_empty, bool):
raise AMQPInvalidArgument('if_empty should be a boolean')
delete_frame = pamqp_queue.Delete(queue=queue, if_unused=if_unused,
if_empty=if_empty)
return self._channel.rpc_request(delete_frame)
[docs]
def purge(self, queue: str) -> dict[str, Any]:
"""Purge a Queue.
:param str queue: Queue name
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
purge_frame = pamqp_queue.Purge(queue=queue)
return self._channel.rpc_request(purge_frame)
[docs]
def bind(
self,
queue: str = '',
exchange: str = '',
routing_key: str = '',
arguments: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Bind a Queue.
:param str queue: Queue name
:param str exchange: Exchange name
:param str routing_key: The routing key to use
:param dict arguments: Bind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
bind_frame = pamqp_queue.Bind(queue=queue,
exchange=exchange,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(bind_frame)
[docs]
def unbind(
self,
queue: str = '',
exchange: str = '',
routing_key: str = '',
arguments: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Unbind a Queue.
:param str queue: Queue name
:param str exchange: Exchange name
:param str routing_key: The routing key used
:param dict arguments: Unbind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
unbind_frame = pamqp_queue.Unbind(queue=queue,
exchange=exchange,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(unbind_frame)