from __future__ import annotations
from typing import Any
from amqpstorm.compatibility import quote
from amqpstorm.management.basic import Basic
from amqpstorm.management.channel import Channel
from amqpstorm.management.connection import Connection
from amqpstorm.management.exchange import Exchange
from amqpstorm.management.healthchecks import HealthChecks
from amqpstorm.management.http_client import HTTPClient
from amqpstorm.management.queue import Queue
from amqpstorm.management.user import User
from amqpstorm.management.virtual_host import VirtualHost
API_ALIVENESS_TEST = 'aliveness-test/%s'
API_CLUSTER_NAME = 'cluster-name'
API_NODE = 'nodes/%s'
API_NODES = 'nodes'
API_OVERVIEW = 'overview'
API_TOP = 'top/%s'
API_WHOAMI = 'whoami'
[docs]
class ManagementApi:
"""RabbitMQ Management Api
e.g.
::
from amqpstorm.management import ManagementApi
client = ManagementApi('https://localhost:15671', 'guest', 'guest', verify=True)
client.user.create('my_user', 'password', tags='administrator')
client.user.set_permission(
'my_user',
virtual_host='/',
configure_regex='.*',
write_regex='.*',
read_regex='.*'
)
:param str api_url: RabbitMQ Management url (e.g. https://rmq.eandersson.net:15671)
:param str username: Username (e.g. guest)
:param str password: Password (e.g. guest)
:param int,float timeout: TCP Timeout
:param None,str,bool verify: Requests session verify (e.g. True, False or path to CA bundle)
:param None,str,tuple cert: Requests session cert
"""
[docs]
def __init__(
self,
api_url: str,
username: str,
password: str,
timeout: float = 10,
verify: bool | str | None = None,
cert: str | tuple[str, str] | None = None,
) -> None:
self.http_client = HTTPClient(
api_url, username, password,
timeout=timeout, verify=verify, cert=cert
)
self._basic = Basic(self.http_client)
self._channel = Channel(self.http_client)
self._connection = Connection(self.http_client)
self._exchange = Exchange(self.http_client)
self._healthchecks = HealthChecks(self.http_client)
self._queue = Queue(self.http_client)
self._user = User(self.http_client)
self._virtual_host = VirtualHost(self.http_client)
def __enter__(self) -> ManagementApi:
return self
def __exit__(self, *_: Any) -> None:
pass
def __del__(self) -> None:
self.http_client.session.close()
@property
def basic(self) -> Basic:
"""RabbitMQ Basic Operations.
e.g.
::
client.basic.publish('Hello RabbitMQ', routing_key='my_queue')
:rtype: amqpstorm.management.basic.Basic
"""
return self._basic
@property
def channel(self) -> Channel:
"""RabbitMQ Channel Operations.
e.g.
::
client.channel.list()
:rtype: amqpstorm.management.channel.Channel
"""
return self._channel
@property
def connection(self) -> Connection:
"""RabbitMQ Connection Operations.
e.g.
::
client.connection.list()
:rtype: amqpstorm.management.connection.Connection
"""
return self._connection
@property
def exchange(self) -> Exchange:
"""RabbitMQ Exchange Operations.
e.g.
::
client.exchange.declare('my_exchange')
:rtype: amqpstorm.management.exchange.Exchange
"""
return self._exchange
@property
def healthchecks(self) -> HealthChecks:
"""RabbitMQ Healthchecks.
e.g.
::
client.healthchecks.get()
:rtype: amqpstorm.management.healthchecks.HealthChecks
"""
return self._healthchecks
@property
def queue(self) -> Queue:
"""RabbitMQ Queue Operations.
e.g.
::
client.queue.declare('my_queue', virtual_host='/')
:rtype: amqpstorm.management.queue.Queue
"""
return self._queue
@property
def user(self) -> User:
"""RabbitMQ User Operations.
e.g.
::
client.user.create('my_user', 'password')
:rtype: amqpstorm.management.user.User
"""
return self._user
@property
def virtual_host(self) -> VirtualHost:
"""RabbitMQ VirtualHost Operations.
:rtype: amqpstorm.management.virtual_host.VirtualHost
"""
return self._virtual_host
[docs]
def aliveness_test(self, virtual_host: str = '/') -> dict[str, Any]:
"""Aliveness Test.
e.g.
::
from amqpstorm.management import ManagementApi
client = ManagementApi('http://localhost:15672', 'guest', 'guest')
result = client.aliveness_test('/')
if result['status'] == 'ok':
print("RabbitMQ is alive!")
else:
print("RabbitMQ is not alive! :(")
:param str virtual_host: Virtual host name
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
virtual_host = quote(virtual_host, '')
return self.http_client.get(API_ALIVENESS_TEST %
virtual_host)
[docs]
def cluster_name(self) -> dict[str, Any]:
"""Get Cluster Name.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
return self.http_client.get(API_CLUSTER_NAME)
[docs]
def node(self, name: str) -> dict[str, Any]:
"""Get Nodes.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
return self.http_client.get(API_NODE % name)
[docs]
def nodes(self) -> list[dict[str, Any]]:
"""Get Nodes.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
return self.http_client.get(API_NODES)
[docs]
def overview(self) -> dict[str, Any]:
"""Get Overview.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
return self.http_client.get(API_OVERVIEW)
[docs]
def top(self) -> list[dict[str, Any]]:
"""Top Processes.
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: list
"""
nodes = []
for node in self.nodes():
nodes.append(self.http_client.get(API_TOP % node['name']))
return nodes
[docs]
def whoami(self) -> dict[str, Any]:
"""Who am I?
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
return self.http_client.get(API_WHOAMI)