"""AMQPStorm Channel.Basic."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Iterable
from pamqp import body as pamqp_body
from pamqp import header as pamqp_header
from pamqp import commands
from amqpstorm import compatibility
from amqpstorm.base import BaseMessage
from amqpstorm.base import Handler
from amqpstorm.base import MAX_FRAME_SIZE
from amqpstorm.exception import AMQPChannelError
from amqpstorm.exception import AMQPInvalidArgument
from amqpstorm.message import Message
if TYPE_CHECKING:
from amqpstorm.channel import Channel
LOGGER = logging.getLogger(__name__)
[docs]
class Basic(Handler):
"""RabbitMQ Basic Operations."""
__slots__ = ['_max_frame_size']
[docs]
def __init__(self, channel: Channel, max_frame_size: int | None = None) -> None:
super().__init__(channel)
self._max_frame_size = max_frame_size or MAX_FRAME_SIZE
[docs]
def qos(
self,
prefetch_count: int = 0,
prefetch_size: int = 0,
global_: bool = False,
) -> dict[str, Any]:
"""Specify quality of service.
:param int prefetch_count: Prefetch window in messages
:param int/long prefetch_size: Prefetch window in octets
:param bool global_: Apply to entire connection
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_integer(prefetch_count):
raise AMQPInvalidArgument('prefetch_count should be an integer')
elif not compatibility.is_integer(prefetch_size):
raise AMQPInvalidArgument('prefetch_size should be an integer')
elif not isinstance(global_, bool):
raise AMQPInvalidArgument('global_ should be a boolean')
qos_frame = commands.Basic.Qos(prefetch_count=prefetch_count,
prefetch_size=prefetch_size,
global_=global_)
return self._channel.rpc_request(qos_frame)
[docs]
def get(
self,
queue: str = '',
no_ack: bool = False,
to_dict: bool = False,
auto_decode: bool = True,
message_impl: type[BaseMessage] | None = None,
) -> Message | BaseMessage | dict[str, Any] | None:
"""Fetch a single message.
:param str queue: Queue name
:param bool no_ack: No acknowledgement needed
:param bool to_dict: Should incoming messages be converted to a
dictionary before delivery.
:param bool auto_decode: Auto-decode strings when possible.
:param class message_impl: Message implementation based on BaseMessage
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:returns: Returns a single message, as long as there is a message in
the queue. If no message is available, returns None.
:rtype: amqpstorm.Message,dict,None
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(no_ack, bool):
raise AMQPInvalidArgument('no_ack should be a boolean')
elif self._channel.consumer_tags:
raise AMQPChannelError("Cannot call 'get' when channel is "
"set to consume")
if message_impl:
if not issubclass(message_impl, BaseMessage):
raise AMQPInvalidArgument(
'message_impl should be derived from BaseMessage'
)
else:
message_impl = Message
get_frame = commands.Basic.Get(queue=queue,
no_ack=no_ack)
with self._channel.lock:
message = self._get_message(get_frame, auto_decode=auto_decode,
message_impl=message_impl)
if message and to_dict:
return message.to_dict()
return message
[docs]
def recover(self, requeue: bool = False) -> dict[str, Any]:
"""Redeliver unacknowledged messages.
:param bool requeue: Re-queue the messages
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
recover_frame = commands.Basic.Recover(requeue=requeue)
return self._channel.rpc_request(recover_frame)
[docs]
def consume(
self,
callback: Callable[..., Any] | None = None,
queue: str = '',
consumer_tag: str = '',
exclusive: bool = False,
no_ack: bool = False,
no_local: bool = False,
arguments: dict[str, Any] | None = None,
) -> str:
"""Start a queue consumer.
:param typing.Callable callback: Message callback
:param str queue: Queue name
:param str consumer_tag: Consumer tag
:param bool no_local: Do not deliver own messages
:param bool no_ack: No acknowledgement needed
:param bool exclusive: Request exclusive access
:param dict arguments: Consume key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:returns: Consumer tag
:rtype: str
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
elif not isinstance(exclusive, bool):
raise AMQPInvalidArgument('exclusive should be a boolean')
elif not isinstance(no_ack, bool):
raise AMQPInvalidArgument('no_ack should be a boolean')
elif not isinstance(no_local, bool):
raise AMQPInvalidArgument('no_local should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
with self._channel.lock:
consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag,
exclusive, no_ack,
no_local, queue)
tag = self._consume_add_and_get_tag(consume_rpc_result)
self._channel._consumer_callbacks[tag] = callback
return tag
[docs]
def cancel(self, consumer_tag: str = '') -> dict[str, Any]:
"""Cancel a queue consumer.
:param str consumer_tag: Consumer tag
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
cancel_frame = commands.Basic.Cancel(consumer_tag=consumer_tag)
with self._channel.lock:
result = self._channel.rpc_request(cancel_frame)
self._channel.remove_consumer_tag(consumer_tag)
return result
[docs]
def publish(
self,
body: bytes | str,
routing_key: str,
exchange: str = '',
properties: dict[str, Any] | None = None,
mandatory: bool = False,
immediate: bool = False,
) -> bool | None:
"""Publish a Message.
:param bytes,str,unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param dict properties: Message properties
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: bool,None
"""
self._validate_publish_parameters(body, exchange, immediate, mandatory,
properties, routing_key)
properties = properties or {}
encoded_body = self._handle_utf8_payload(body, properties)
properties_frame = commands.Basic.Properties(**properties)
method_frame = commands.Basic.Publish(exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate)
header_frame = pamqp_header.ContentHeader(body_size=len(encoded_body),
properties=properties_frame)
frames_out: list[Any] = [method_frame, header_frame]
frames_out.extend(self._create_content_body(encoded_body))
if self._channel.confirming_deliveries:
with self._channel.rpc.lock:
return self._publish_confirm(frames_out, mandatory)
self._channel.write_frames(frames_out)
return None
[docs]
def ack(self, delivery_tag: int = 0, multiple: bool = False) -> None:
"""Acknowledge Message.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool multiple: Acknowledge multiple messages
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(multiple, bool):
raise AMQPInvalidArgument('multiple should be a boolean')
ack_frame = commands.Basic.Ack(delivery_tag=delivery_tag,
multiple=multiple)
self._channel.write_frame(ack_frame)
[docs]
def nack(
self,
delivery_tag: int = 0,
multiple: bool = False,
requeue: bool = True,
) -> None:
"""Negative Acknowledgement.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool multiple: Negative acknowledge multiple messages
:param bool requeue: Re-queue the message
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(multiple, bool):
raise AMQPInvalidArgument('multiple should be a boolean')
elif not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
nack_frame = commands.Basic.Nack(delivery_tag=delivery_tag,
multiple=multiple,
requeue=requeue)
self._channel.write_frame(nack_frame)
[docs]
def reject(self, delivery_tag: int = 0, requeue: bool = True) -> None:
"""Reject Message.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool requeue: Re-queue the message
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
reject_frame = commands.Basic.Reject(delivery_tag=delivery_tag,
requeue=requeue)
self._channel.write_frame(reject_frame)
def _consume_add_and_get_tag(self, consume_rpc_result: dict[str, Any]) -> str:
"""Add the tag to the channel and return it.
:param dict consume_rpc_result:
:rtype: str
"""
consumer_tag = consume_rpc_result['consumer_tag']
self._channel.add_consumer_tag(consumer_tag)
return consumer_tag
def _consume_rpc_request(
self,
arguments: dict[str, Any] | None,
consumer_tag: str,
exclusive: bool,
no_ack: bool,
no_local: bool,
queue: str,
) -> dict[str, Any]:
"""Create a Consume Frame and execute a RPC request.
:param str queue: Queue name
:param str consumer_tag: Consumer tag
:param bool no_local: Do not deliver own messages
:param bool no_ack: No acknowledgement needed
:param bool exclusive: Request exclusive access
:param dict arguments: Consume key/value arguments
:rtype: dict
"""
consume_frame = commands.Basic.Consume(queue=queue,
consumer_tag=consumer_tag,
exclusive=exclusive,
no_local=no_local,
no_ack=no_ack,
arguments=arguments)
return self._channel.rpc_request(consume_frame)
@staticmethod
def _validate_publish_parameters(
body: Any,
exchange: Any,
immediate: Any,
mandatory: Any,
properties: Any,
routing_key: Any,
) -> None:
"""Validate Publish Parameters.
:param bytes,str,unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param dict properties: Message properties
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:raises AMQPInvalidArgument: Invalid Parameters
:return:
"""
if not compatibility.is_string(body):
raise AMQPInvalidArgument('body should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif properties is not None and not isinstance(properties, dict):
raise AMQPInvalidArgument('properties should be a dict or None')
elif not isinstance(mandatory, bool):
raise AMQPInvalidArgument('mandatory should be a boolean')
elif not isinstance(immediate, bool):
raise AMQPInvalidArgument('immediate should be a boolean')
@staticmethod
def _handle_utf8_payload(body: bytes | str, properties: dict[str, Any]) -> bytes:
"""Update the Body and Properties to the appropriate encoding.
:param bytes,str,unicode body: Message payload
:param dict properties: Message properties
:return:
"""
if 'content_encoding' not in properties:
properties['content_encoding'] = 'utf-8'
encoding = properties['content_encoding']
if isinstance(body, str):
body = bytes(body, encoding=encoding)
return body
def _get_message(
self,
get_frame: Any,
auto_decode: bool,
message_impl: type[BaseMessage],
) -> BaseMessage | None:
"""Get and return a message using a Basic.Get frame.
:param Basic.Get get_frame:
:param bool auto_decode: Auto-decode strings when possible.
:param class message_impl: Message implementation based on BaseMessage
:rtype: Message
"""
with self._channel.rpc.lock:
message_uuid = self._channel.rpc.register_request(
get_frame.valid_responses + ['ContentHeader', 'ContentBody']
)
try:
self._channel.write_frame(get_frame)
get_ok_frame: Any = self._channel.rpc.get_request(
message_uuid, raw=True, multiple=True,
)
if isinstance(get_ok_frame, commands.Basic.GetEmpty):
return None
content_header: Any = self._channel.rpc.get_request(
message_uuid, raw=True, multiple=True,
)
body = self._get_content_body(message_uuid,
content_header.body_size)
finally:
self._channel.rpc.remove(message_uuid)
return message_impl(channel=self._channel,
body=body,
method=dict(get_ok_frame),
properties=dict(content_header.properties),
auto_decode=auto_decode)
def _publish_confirm(self, frames_out: list[Any], mandatory: bool) -> bool:
"""Confirm that message was published successfully.
:param list frames_out:
:rtype: bool
"""
confirm_uuid = self._channel.rpc.register_request(['Basic.Ack',
'Basic.Nack'])
self._channel.write_frames(frames_out)
result = self._channel.rpc.get_request(confirm_uuid, raw=True)
if mandatory:
self._channel.check_for_exceptions()
if isinstance(result, commands.Basic.Ack):
return True
return False
def _create_content_body(self, body: bytes) -> Iterable[pamqp_body.ContentBody]:
"""Split body based on the maximum frame size.
This function is based on code from Rabbitpy.
https://github.com/gmr/rabbitpy
:param bytes,str,unicode body: Message payload
:rtype: collections.Iterable
"""
body_len = len(body)
max_frame_size = self._max_frame_size
for start_frame in range(0, body_len, max_frame_size):
end_frame = min(start_frame + max_frame_size, body_len)
yield pamqp_body.ContentBody(body[start_frame:end_frame])
def _get_content_body(self, message_uuid: str, body_size: int) -> bytes:
"""Get Content Body using RPC requests.
:param str uuid_body: Rpc Identifier.
:param int body_size: Content Size.
:rtype: str
"""
body_parts: list[bytes] = []
body_len = 0
while body_len < body_size:
body_piece: Any = self._channel.rpc.get_request(
message_uuid, raw=True, multiple=True,
)
if not body_piece.value:
break
body_parts.append(body_piece.value)
body_len += len(body_piece.value)
return b''.join(body_parts)