Source code for amqpstorm.management.exception

from __future__ import annotations

from typing import Any

from amqpstorm.exception import AMQPError
from amqpstorm.exception import AMQP_ERROR_MAPPING


[docs] class ApiError(AMQPError): """Management Api Error"""
[docs] def __init__( self, message: str | None = None, *args: Any, **kwargs: Any, ) -> None: self._message = message self._error_code = kwargs.pop('reply_code', None) super(AMQPError, self).__init__(*args, **kwargs) if self._error_code not in AMQP_ERROR_MAPPING: return self._error_type = AMQP_ERROR_MAPPING[self._error_code][0] self._documentation = AMQP_ERROR_MAPPING[self._error_code][1]
def __str__(self) -> str: if self._error_code in AMQP_ERROR_MAPPING: documentation = self.documentation if isinstance(documentation, bytes): documentation = documentation.decode('utf-8', errors='replace') return f'{self.error_type} - {documentation}' return self._message or ''
[docs] class ApiConnectionError(AMQPError): """Management Api Connection Error""" pass