Source code for amqpstorm.basic

"""AMQPStorm Channel.Basic."""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Iterable

from pamqp import body as pamqp_body
from pamqp import header as pamqp_header
from pamqp import commands

from amqpstorm import compatibility
from amqpstorm.base import BaseMessage
from amqpstorm.base import Handler
from amqpstorm.base import MAX_FRAME_SIZE
from amqpstorm.exception import AMQPChannelError
from amqpstorm.exception import AMQPInvalidArgument
from amqpstorm.message import Message

if TYPE_CHECKING:
    from amqpstorm.channel import Channel

LOGGER = logging.getLogger(__name__)


[docs] class Basic(Handler): """RabbitMQ Basic Operations.""" __slots__ = ['_max_frame_size']
[docs] def __init__(self, channel: Channel, max_frame_size: int | None = None) -> None: super().__init__(channel) self._max_frame_size = max_frame_size or MAX_FRAME_SIZE
[docs] def qos( self, prefetch_count: int = 0, prefetch_size: int = 0, global_: bool = False, ) -> dict[str, Any]: """Specify quality of service. :param int prefetch_count: Prefetch window in messages :param int/long prefetch_size: Prefetch window in octets :param bool global_: Apply to entire connection :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: dict """ if not compatibility.is_integer(prefetch_count): raise AMQPInvalidArgument('prefetch_count should be an integer') elif not compatibility.is_integer(prefetch_size): raise AMQPInvalidArgument('prefetch_size should be an integer') elif not isinstance(global_, bool): raise AMQPInvalidArgument('global_ should be a boolean') qos_frame = commands.Basic.Qos(prefetch_count=prefetch_count, prefetch_size=prefetch_size, global_=global_) return self._channel.rpc_request(qos_frame)
[docs] def get( self, queue: str = '', no_ack: bool = False, to_dict: bool = False, auto_decode: bool = True, message_impl: type[BaseMessage] | None = None, ) -> Message | BaseMessage | dict[str, Any] | None: """Fetch a single message. :param str queue: Queue name :param bool no_ack: No acknowledgement needed :param bool to_dict: Should incoming messages be converted to a dictionary before delivery. :param bool auto_decode: Auto-decode strings when possible. :param class message_impl: Message implementation based on BaseMessage :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :returns: Returns a single message, as long as there is a message in the queue. If no message is available, returns None. :rtype: amqpstorm.Message,dict,None """ if not compatibility.is_string(queue): raise AMQPInvalidArgument('queue should be a string') elif not isinstance(no_ack, bool): raise AMQPInvalidArgument('no_ack should be a boolean') elif self._channel.consumer_tags: raise AMQPChannelError("Cannot call 'get' when channel is " "set to consume") if message_impl: if not issubclass(message_impl, BaseMessage): raise AMQPInvalidArgument( 'message_impl should be derived from BaseMessage' ) else: message_impl = Message get_frame = commands.Basic.Get(queue=queue, no_ack=no_ack) with self._channel.lock: message = self._get_message(get_frame, auto_decode=auto_decode, message_impl=message_impl) if message and to_dict: return message.to_dict() return message
[docs] def recover(self, requeue: bool = False) -> dict[str, Any]: """Redeliver unacknowledged messages. :param bool requeue: Re-queue the messages :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: dict """ if not isinstance(requeue, bool): raise AMQPInvalidArgument('requeue should be a boolean') recover_frame = commands.Basic.Recover(requeue=requeue) return self._channel.rpc_request(recover_frame)
[docs] def consume( self, callback: Callable[..., Any] | None = None, queue: str = '', consumer_tag: str = '', exclusive: bool = False, no_ack: bool = False, no_local: bool = False, arguments: dict[str, Any] | None = None, ) -> str: """Start a queue consumer. :param typing.Callable callback: Message callback :param str queue: Queue name :param str consumer_tag: Consumer tag :param bool no_local: Do not deliver own messages :param bool no_ack: No acknowledgement needed :param bool exclusive: Request exclusive access :param dict arguments: Consume key/value arguments :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :returns: Consumer tag :rtype: str """ if not compatibility.is_string(queue): raise AMQPInvalidArgument('queue should be a string') elif not compatibility.is_string(consumer_tag): raise AMQPInvalidArgument('consumer_tag should be a string') elif not isinstance(exclusive, bool): raise AMQPInvalidArgument('exclusive should be a boolean') elif not isinstance(no_ack, bool): raise AMQPInvalidArgument('no_ack should be a boolean') elif not isinstance(no_local, bool): raise AMQPInvalidArgument('no_local should be a boolean') elif arguments is not None and not isinstance(arguments, dict): raise AMQPInvalidArgument('arguments should be a dict or None') with self._channel.lock: consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag, exclusive, no_ack, no_local, queue) tag = self._consume_add_and_get_tag(consume_rpc_result) self._channel._consumer_callbacks[tag] = callback return tag
[docs] def cancel(self, consumer_tag: str = '') -> dict[str, Any]: """Cancel a queue consumer. :param str consumer_tag: Consumer tag :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: dict """ if not compatibility.is_string(consumer_tag): raise AMQPInvalidArgument('consumer_tag should be a string') cancel_frame = commands.Basic.Cancel(consumer_tag=consumer_tag) with self._channel.lock: result = self._channel.rpc_request(cancel_frame) self._channel.remove_consumer_tag(consumer_tag) return result
[docs] def publish( self, body: bytes | str, routing_key: str, exchange: str = '', properties: dict[str, Any] | None = None, mandatory: bool = False, immediate: bool = False, ) -> bool | None: """Publish a Message. :param bytes,str,unicode body: Message payload :param str routing_key: Message routing key :param str exchange: The exchange to publish the message to :param dict properties: Message properties :param bool mandatory: Requires the message is published :param bool immediate: Request immediate delivery :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: bool,None """ self._validate_publish_parameters(body, exchange, immediate, mandatory, properties, routing_key) properties = properties or {} encoded_body = self._handle_utf8_payload(body, properties) properties_frame = commands.Basic.Properties(**properties) method_frame = commands.Basic.Publish(exchange=exchange, routing_key=routing_key, mandatory=mandatory, immediate=immediate) header_frame = pamqp_header.ContentHeader(body_size=len(encoded_body), properties=properties_frame) frames_out: list[Any] = [method_frame, header_frame] frames_out.extend(self._create_content_body(encoded_body)) if self._channel.confirming_deliveries: with self._channel.rpc.lock: return self._publish_confirm(frames_out, mandatory) self._channel.write_frames(frames_out) return None
[docs] def ack(self, delivery_tag: int = 0, multiple: bool = False) -> None: """Acknowledge Message. :param int/long delivery_tag: Server-assigned delivery tag :param bool multiple: Acknowledge multiple messages :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :return: """ if not compatibility.is_integer(delivery_tag): raise AMQPInvalidArgument('delivery_tag should be an integer') elif not isinstance(multiple, bool): raise AMQPInvalidArgument('multiple should be a boolean') ack_frame = commands.Basic.Ack(delivery_tag=delivery_tag, multiple=multiple) self._channel.write_frame(ack_frame)
[docs] def nack( self, delivery_tag: int = 0, multiple: bool = False, requeue: bool = True, ) -> None: """Negative Acknowledgement. :param int/long delivery_tag: Server-assigned delivery tag :param bool multiple: Negative acknowledge multiple messages :param bool requeue: Re-queue the message :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :return: """ if not compatibility.is_integer(delivery_tag): raise AMQPInvalidArgument('delivery_tag should be an integer') elif not isinstance(multiple, bool): raise AMQPInvalidArgument('multiple should be a boolean') elif not isinstance(requeue, bool): raise AMQPInvalidArgument('requeue should be a boolean') nack_frame = commands.Basic.Nack(delivery_tag=delivery_tag, multiple=multiple, requeue=requeue) self._channel.write_frame(nack_frame)
[docs] def reject(self, delivery_tag: int = 0, requeue: bool = True) -> None: """Reject Message. :param int/long delivery_tag: Server-assigned delivery tag :param bool requeue: Re-queue the message :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :return: """ if not compatibility.is_integer(delivery_tag): raise AMQPInvalidArgument('delivery_tag should be an integer') elif not isinstance(requeue, bool): raise AMQPInvalidArgument('requeue should be a boolean') reject_frame = commands.Basic.Reject(delivery_tag=delivery_tag, requeue=requeue) self._channel.write_frame(reject_frame)
def _consume_add_and_get_tag(self, consume_rpc_result: dict[str, Any]) -> str: """Add the tag to the channel and return it. :param dict consume_rpc_result: :rtype: str """ consumer_tag = consume_rpc_result['consumer_tag'] self._channel.add_consumer_tag(consumer_tag) return consumer_tag def _consume_rpc_request( self, arguments: dict[str, Any] | None, consumer_tag: str, exclusive: bool, no_ack: bool, no_local: bool, queue: str, ) -> dict[str, Any]: """Create a Consume Frame and execute a RPC request. :param str queue: Queue name :param str consumer_tag: Consumer tag :param bool no_local: Do not deliver own messages :param bool no_ack: No acknowledgement needed :param bool exclusive: Request exclusive access :param dict arguments: Consume key/value arguments :rtype: dict """ consume_frame = commands.Basic.Consume(queue=queue, consumer_tag=consumer_tag, exclusive=exclusive, no_local=no_local, no_ack=no_ack, arguments=arguments) return self._channel.rpc_request(consume_frame) @staticmethod def _validate_publish_parameters( body: Any, exchange: Any, immediate: Any, mandatory: Any, properties: Any, routing_key: Any, ) -> None: """Validate Publish Parameters. :param bytes,str,unicode body: Message payload :param str routing_key: Message routing key :param str exchange: The exchange to publish the message to :param dict properties: Message properties :param bool mandatory: Requires the message is published :param bool immediate: Request immediate delivery :raises AMQPInvalidArgument: Invalid Parameters :return: """ if not compatibility.is_string(body): raise AMQPInvalidArgument('body should be a string') elif not compatibility.is_string(routing_key): raise AMQPInvalidArgument('routing_key should be a string') elif not compatibility.is_string(exchange): raise AMQPInvalidArgument('exchange should be a string') elif properties is not None and not isinstance(properties, dict): raise AMQPInvalidArgument('properties should be a dict or None') elif not isinstance(mandatory, bool): raise AMQPInvalidArgument('mandatory should be a boolean') elif not isinstance(immediate, bool): raise AMQPInvalidArgument('immediate should be a boolean') @staticmethod def _handle_utf8_payload(body: bytes | str, properties: dict[str, Any]) -> bytes: """Update the Body and Properties to the appropriate encoding. :param bytes,str,unicode body: Message payload :param dict properties: Message properties :return: """ if 'content_encoding' not in properties: properties['content_encoding'] = 'utf-8' encoding = properties['content_encoding'] if isinstance(body, str): body = bytes(body, encoding=encoding) return body def _get_message( self, get_frame: Any, auto_decode: bool, message_impl: type[BaseMessage], ) -> BaseMessage | None: """Get and return a message using a Basic.Get frame. :param Basic.Get get_frame: :param bool auto_decode: Auto-decode strings when possible. :param class message_impl: Message implementation based on BaseMessage :rtype: Message """ with self._channel.rpc.lock: message_uuid = self._channel.rpc.register_request( get_frame.valid_responses + ['ContentHeader', 'ContentBody'] ) try: self._channel.write_frame(get_frame) get_ok_frame: Any = self._channel.rpc.get_request( message_uuid, raw=True, multiple=True, ) if isinstance(get_ok_frame, commands.Basic.GetEmpty): return None content_header: Any = self._channel.rpc.get_request( message_uuid, raw=True, multiple=True, ) body = self._get_content_body(message_uuid, content_header.body_size) finally: self._channel.rpc.remove(message_uuid) return message_impl(channel=self._channel, body=body, method=dict(get_ok_frame), properties=dict(content_header.properties), auto_decode=auto_decode) def _publish_confirm(self, frames_out: list[Any], mandatory: bool) -> bool: """Confirm that message was published successfully. :param list frames_out: :rtype: bool """ confirm_uuid = self._channel.rpc.register_request(['Basic.Ack', 'Basic.Nack']) self._channel.write_frames(frames_out) result = self._channel.rpc.get_request(confirm_uuid, raw=True) if mandatory: self._channel.check_for_exceptions() if isinstance(result, commands.Basic.Ack): return True return False def _create_content_body(self, body: bytes) -> Iterable[pamqp_body.ContentBody]: """Split body based on the maximum frame size. This function is based on code from Rabbitpy. https://github.com/gmr/rabbitpy :param bytes,str,unicode body: Message payload :rtype: collections.Iterable """ body_len = len(body) max_frame_size = self._max_frame_size for start_frame in range(0, body_len, max_frame_size): end_frame = min(start_frame + max_frame_size, body_len) yield pamqp_body.ContentBody(body[start_frame:end_frame]) def _get_content_body(self, message_uuid: str, body_size: int) -> bytes: """Get Content Body using RPC requests. :param str uuid_body: Rpc Identifier. :param int body_size: Content Size. :rtype: str """ body_parts: list[bytes] = [] body_len = 0 while body_len < body_size: body_piece: Any = self._channel.rpc.get_request( message_uuid, raw=True, multiple=True, ) if not body_piece.value: break body_parts.append(body_piece.value) body_len += len(body_piece.value) return b''.join(body_parts)