Source code for amqpstorm.tx

"""AMQPStorm Channel.Tx."""
from __future__ import annotations

import logging
from types import TracebackType
from typing import TYPE_CHECKING
from typing import Any

from pamqp import commands

from amqpstorm.base import Handler

if TYPE_CHECKING:
    from amqpstorm.channel import Channel

LOGGER = logging.getLogger(__name__)


[docs] class Tx(Handler): """RabbitMQ Transactions. Server local transactions, in which the server will buffer published messages until the client commits (or rollback) the messages. """ __slots__ = ['_tx_active']
[docs] def __init__(self, channel: Channel) -> None: self._tx_active = True super().__init__(channel)
def __enter__(self) -> Tx: self.select() return self def __exit__( self, exception_type: type[BaseException] | None, exception_value: BaseException | None, _: TracebackType | None, ) -> None: if exception_type: LOGGER.warning( 'Leaving Transaction on exception: %s', exception_value ) if self._tx_active: self.rollback() return if self._tx_active: self.commit()
[docs] def select(self) -> dict[str, Any] | None: """Enable standard transaction mode. This will enable transaction mode on the channel. Meaning that messages will be kept in the remote server buffer until such a time that either commit or rollback is called. :return: """ self._tx_active = True return self._channel.rpc_request(commands.Tx.Select())
[docs] def commit(self) -> dict[str, Any] | None: """Commit the current transaction. Commit all messages published during the current transaction session to the remote server. A new transaction session starts as soon as the command has been executed. :return: """ self._tx_active = False return self._channel.rpc_request(commands.Tx.Commit())
[docs] def rollback(self) -> dict[str, Any] | None: """Abandon the current transaction. Rollback all messages published during the current transaction session to the remote server. Note that all messages published during this transaction session will be lost, and will have to be published again. A new transaction session starts as soon as the command has been executed. :return: """ self._tx_active = False return self._channel.rpc_request(commands.Tx.Rollback())