Source code for amqpstorm.uri_connection

"""AMQPStorm Uri wrapper for Connection."""
from __future__ import annotations

import logging
from typing import Any
from urllib.parse import ParseResult

from amqpstorm import compatibility
from amqpstorm.compatibility import ssl
from amqpstorm.compatibility import urlparse
from amqpstorm.connection import Connection
from amqpstorm.connection import DEFAULT_HEARTBEAT_TIMEOUT
from amqpstorm.connection import DEFAULT_SOCKET_TIMEOUT
from amqpstorm.connection import DEFAULT_VIRTUAL_HOST
from amqpstorm.exception import AMQPConnectionError

LOGGER = logging.getLogger(__name__)


[docs] class UriConnection(Connection): """RabbitMQ Connection that takes a Uri string. e.g. :: import amqpstorm connection = amqpstorm.UriConnection( 'amqp://guest:guest@localhost:5672/%2F?heartbeat=60' ) Using a SSL Context: :: import ssl import amqpstorm ssl_options = { 'context': ssl.create_default_context(cafile='ca_certificate.pem'), 'server_hostname': 'rmq.eandersson.net' } connection = amqpstorm.UriConnection( 'amqps://guest:guest@rmq.eandersson.net:5671/%2F?heartbeat=60', ssl_options=ssl_options ) :param str uri: AMQP Connection string :param dict ssl_options: SSL kwargs :param dict client_properties: None or dict of client properties :param bool lazy: Lazy initialize the connection :raises TypeError: Raises on invalid uri. :raises ValueError: Raises on invalid uri. :raises AttributeError: Raises on invalid uri. :raises AMQPConnectionError: Raises if the connection encountered an error. """ __slots__ = []
[docs] def __init__( self, uri: str, ssl_options: dict[str, Any] | None = None, client_properties: dict[str, Any] | None = None, lazy: bool = False, ) -> None: uri = compatibility.patch_uri(uri) parsed_uri = urlparse.urlparse(uri) use_ssl = parsed_uri.scheme == 'amqps' or parsed_uri.scheme == 'https' hostname = parsed_uri.hostname or 'localhost' port = parsed_uri.port or (5671 if use_ssl else 5672) username = urlparse.unquote(parsed_uri.username or 'guest') password = urlparse.unquote(parsed_uri.password or 'guest') kwargs = self._parse_uri_options(parsed_uri, use_ssl, ssl_options) super().__init__( hostname, username, password, port, client_properties=client_properties, lazy=lazy, **kwargs )
def _parse_uri_options( self, parsed_uri: ParseResult, use_ssl: bool = False, ssl_options: dict[str, Any] | None = None, ) -> dict[str, Any]: """Parse the uri options. :param parsed_uri: :param bool use_ssl: :return: """ ssl_options = ssl_options or {} kwargs = urlparse.parse_qs(parsed_uri.query) vhost = urlparse.unquote(parsed_uri.path[1:]) or DEFAULT_VIRTUAL_HOST options = { 'ssl': use_ssl, 'virtual_host': vhost, 'heartbeat': int(kwargs.pop('heartbeat', [DEFAULT_HEARTBEAT_TIMEOUT])[0]), 'poller': kwargs.pop('poller', ['select'])[0], 'locale': kwargs.pop('locale', ['en_US'])[0], 'timeout': int(kwargs.pop('timeout', [DEFAULT_SOCKET_TIMEOUT])[0]) } if use_ssl: if not compatibility.SSL_SUPPORTED: raise AMQPConnectionError( 'Python not compiled with support ' 'for TLSv1 or higher' ) ssl_options.update(self._parse_ssl_options(kwargs)) options['ssl_options'] = ssl_options return options def _parse_ssl_options(self, ssl_kwargs: dict[str, Any]) -> dict[str, Any]: """Parse TLS Options. :param ssl_kwargs: :rtype: dict """ ssl_options = {} for key in ssl_kwargs: if key not in compatibility.SSL_OPTIONS: LOGGER.warning('invalid option: %s', key) continue elif 'cert_reqs' in key: value = self._get_ssl_validation(ssl_kwargs[key][0]) else: value = ssl_kwargs[key][0] ssl_options[key] = value return ssl_options def _get_ssl_validation(self, value: str) -> Any: """Get the TLS Validation option. :param str value: :return: TLS Certificate Options """ return self._get_ssl_attribute( value, compatibility.SSL_CERT_MAP, ssl.CERT_REQUIRED, "ssl_options: cert_reqs %r not recognised; falling back to " 'CERT_REQUIRED.', ) @staticmethod def _get_ssl_attribute( value: str, mapping: dict[str, Any], default_value: Any, warning_message: str, ) -> Any: """Get the TLS attribute based on the compatibility mapping. Looks up ``'cert_<value>'`` (case-insensitive) in the mapping. If no valid attribute can be found, fall back on the default and emit a warning. :param str value: :param dict mapping: Dictionary based mapping :param default_value: Default fall-back value :param str warning_message: Warning message :return: """ needle = value.strip().lower() for candidate in (needle, 'cert_' + needle): if candidate in mapping: return mapping[candidate] LOGGER.warning(warning_message, value) return default_value