Source code for amqpstorm.exchange

"""AMQPStorm Channel.Exchange."""
from __future__ import annotations

import logging
from typing import Any

from pamqp.commands import Exchange as pamqp_exchange

from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPInvalidArgument

LOGGER = logging.getLogger(__name__)


class Exchange(Handler):
    """RabbitMQ Exchange Operations.

    Every public method validates its arguments, builds the matching
    pamqp ``Exchange`` frame and hands it to the channel's
    ``rpc_request``, returning whatever that call returns.
    """
    __slots__ = []

    def declare(
        self,
        exchange: str = '',
        exchange_type: str = 'direct',
        passive: bool = False,
        durable: bool = False,
        auto_delete: bool = False,
        arguments: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Declare an Exchange.

        :param str exchange: Exchange name
        :param str exchange_type: Exchange type
        :param bool passive: Do not create
        :param bool durable: Durable exchange
        :param bool auto_delete: Automatically delete when not in use
        :param dict arguments: Exchange key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        # Guard clauses: the first invalid argument (in signature order)
        # is the one reported.
        if not compatibility.is_string(exchange):
            raise AMQPInvalidArgument('exchange should be a string')
        if not compatibility.is_string(exchange_type):
            raise AMQPInvalidArgument('exchange_type should be a string')
        if not isinstance(passive, bool):
            raise AMQPInvalidArgument('passive should be a boolean')
        if not isinstance(durable, bool):
            raise AMQPInvalidArgument('durable should be a boolean')
        if not isinstance(auto_delete, bool):
            raise AMQPInvalidArgument('auto_delete should be a boolean')
        if arguments is not None and not isinstance(arguments, dict):
            raise AMQPInvalidArgument('arguments should be a dict or None')

        frame = pamqp_exchange.Declare(
            exchange=exchange,
            exchange_type=exchange_type,
            passive=passive,
            durable=durable,
            auto_delete=auto_delete,
            arguments=arguments,
        )
        return self._channel.rpc_request(frame)

    def delete(self, exchange: str = '',
               if_unused: bool = False) -> dict[str, Any]:
        """Delete an Exchange.

        :param str exchange: Exchange name
        :param bool if_unused: Delete only if unused

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        if not compatibility.is_string(exchange):
            raise AMQPInvalidArgument('exchange should be a string')
        # Validate the flag the same way declare() validates its boolean
        # flags, so a wrong type is rejected before a frame is built.
        if not isinstance(if_unused, bool):
            raise AMQPInvalidArgument('if_unused should be a boolean')

        frame = pamqp_exchange.Delete(exchange=exchange, if_unused=if_unused)
        return self._channel.rpc_request(frame)

    def bind(
        self,
        destination: str = '',
        source: str = '',
        routing_key: str = '',
        arguments: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Bind an Exchange.

        :param str destination: Exchange name
        :param str source: Exchange to bind to
        :param str routing_key: The routing key to use
        :param dict arguments: Bind key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        self._validate_binding(destination, source, routing_key, arguments)

        frame = pamqp_exchange.Bind(
            destination=destination,
            source=source,
            routing_key=routing_key,
            arguments=arguments,
        )
        return self._channel.rpc_request(frame)

    def unbind(
        self,
        destination: str = '',
        source: str = '',
        routing_key: str = '',
        arguments: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Unbind an Exchange.

        :param str destination: Exchange name
        :param str source: Exchange to unbind from
        :param str routing_key: The routing key used
        :param dict arguments: Unbind key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        self._validate_binding(destination, source, routing_key, arguments)

        frame = pamqp_exchange.Unbind(
            destination=destination,
            source=source,
            routing_key=routing_key,
            arguments=arguments,
        )
        return self._channel.rpc_request(frame)

    @staticmethod
    def _validate_binding(
        destination: Any,
        source: Any,
        routing_key: Any,
        arguments: Any,
    ) -> None:
        """Validate the arguments shared by bind and unbind.

        Checks run in signature order, so the first invalid argument is
        the one reported.

        :param destination: Value that must be a string
        :param source: Value that must be a string
        :param routing_key: Value that must be a string
        :param arguments: Value that must be a dict or None

        :raises AMQPInvalidArgument: Invalid Parameters
        """
        if not compatibility.is_string(destination):
            raise AMQPInvalidArgument('destination should be a string')
        if not compatibility.is_string(source):
            raise AMQPInvalidArgument('source should be a string')
        if not compatibility.is_string(routing_key):
            raise AMQPInvalidArgument('routing_key should be a string')
        if arguments is not None and not isinstance(arguments, dict):
            raise AMQPInvalidArgument('arguments should be a dict or None')