"""AMQPStorm Channel.Exchange."""
from __future__ import annotations
import logging
from typing import Any
from pamqp.commands import Exchange as pamqp_exchange
from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPInvalidArgument
LOGGER = logging.getLogger(__name__)
[docs]
class Exchange(Handler):
    """RabbitMQ Exchange Operations.

    Every public method validates its arguments, builds the matching
    pamqp ``Exchange`` frame and sends it through the channel as an RPC
    request, returning whatever that request returns.
    """
    __slots__ = []

    def declare(
        self,
        exchange: str = '',
        exchange_type: str = 'direct',
        passive: bool = False,
        durable: bool = False,
        auto_delete: bool = False,
        arguments: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Declare an Exchange.

        :param str exchange: Exchange name
        :param str exchange_type: Exchange type
        :param bool passive: Do not create
        :param bool durable: Durable exchange
        :param bool auto_delete: Automatically delete when not in use
        :param dict arguments: Exchange key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        # Guard clauses run in parameter order so the first offending
        # argument is the one reported.
        if not compatibility.is_string(exchange):
            raise AMQPInvalidArgument('exchange should be a string')
        if not compatibility.is_string(exchange_type):
            raise AMQPInvalidArgument('exchange_type should be a string')
        if not isinstance(passive, bool):
            raise AMQPInvalidArgument('passive should be a boolean')
        if not isinstance(durable, bool):
            raise AMQPInvalidArgument('durable should be a boolean')
        if not isinstance(auto_delete, bool):
            raise AMQPInvalidArgument('auto_delete should be a boolean')
        if arguments is not None and not isinstance(arguments, dict):
            raise AMQPInvalidArgument('arguments should be a dict or None')

        declare_frame = pamqp_exchange.Declare(
            exchange=exchange,
            exchange_type=exchange_type,
            passive=passive,
            durable=durable,
            auto_delete=auto_delete,
            arguments=arguments,
        )
        return self._channel.rpc_request(declare_frame)

    def delete(self, exchange: str = '',
               if_unused: bool = False) -> dict[str, Any]:
        """Delete an Exchange.

        :param str exchange: Exchange name
        :param bool if_unused: Delete only if unused

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        if not compatibility.is_string(exchange):
            raise AMQPInvalidArgument('exchange should be a string')
        # Validate the flag the same way declare() validates its boolean
        # flags, so a bad value is reported as AMQPInvalidArgument before
        # any frame is built.
        if not isinstance(if_unused, bool):
            raise AMQPInvalidArgument('if_unused should be a boolean')

        delete_frame = pamqp_exchange.Delete(
            exchange=exchange,
            if_unused=if_unused,
        )
        return self._channel.rpc_request(delete_frame)

    def bind(
        self,
        destination: str = '',
        source: str = '',
        routing_key: str = '',
        arguments: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Bind an Exchange.

        :param str destination: Exchange name
        :param str source: Exchange to bind to
        :param str routing_key: The routing key to use
        :param dict arguments: Bind key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        self._validate_binding(destination, source, routing_key, arguments)

        bind_frame = pamqp_exchange.Bind(
            destination=destination,
            source=source,
            routing_key=routing_key,
            arguments=arguments,
        )
        return self._channel.rpc_request(bind_frame)

    def unbind(
        self,
        destination: str = '',
        source: str = '',
        routing_key: str = '',
        arguments: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Unbind an Exchange.

        :param str destination: Exchange name
        :param str source: Exchange to unbind from
        :param str routing_key: The routing key used
        :param dict arguments: Unbind key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        self._validate_binding(destination, source, routing_key, arguments)

        unbind_frame = pamqp_exchange.Unbind(
            destination=destination,
            source=source,
            routing_key=routing_key,
            arguments=arguments,
        )
        return self._channel.rpc_request(unbind_frame)

    @staticmethod
    def _validate_binding(
        destination: str,
        source: str,
        routing_key: str,
        arguments: dict[str, Any] | None,
    ) -> None:
        """Validate the arguments shared by bind() and unbind().

        Checks run in parameter order and stop at the first failure.

        :raises AMQPInvalidArgument: Invalid Parameters
        """
        if not compatibility.is_string(destination):
            raise AMQPInvalidArgument('destination should be a string')
        if not compatibility.is_string(source):
            raise AMQPInvalidArgument('source should be a string')
        if not compatibility.is_string(routing_key):
            raise AMQPInvalidArgument('routing_key should be a string')
        if arguments is not None and not isinstance(arguments, dict):
            raise AMQPInvalidArgument('arguments should be a dict or None')