Management Api

class amqpstorm.management.ManagementApi[source]

RabbitMQ Management Api

e.g.

from amqpstorm.management import ManagementApi
client = ManagementApi('https://localhost:15671', 'guest', 'guest', verify=True)
client.user.create('my_user', 'password', tags='administrator')
client.user.set_permission(
    'my_user',
    virtual_host='/',
    configure_regex='.*',
    write_regex='.*',
    read_regex='.*'
)
Parameters:
  • api_url (str) – RabbitMQ Management url (e.g. https://rmq.eandersson.net:15671)

  • username (str) – Username (e.g. guest)

  • password (str) – Password (e.g. guest)

  • timeout (int,float) – TCP Timeout

  • verify (None,str,bool) – Requests session verify (e.g. True, False or path to CA bundle)

  • cert (None,str,tuple) – Requests session cert

__init__(api_url: str, username: str, password: str, timeout: float = 10, verify: bool | str | None = None, cert: str | tuple[str, str] | None = None) None[source]
property basic: Basic

RabbitMQ Basic Operations.

e.g.

client.basic.publish('Hello RabbitMQ', routing_key='my_queue')
Return type:

Basic

property channel: Channel

RabbitMQ Channel Operations.

e.g.

client.channel.list()
Return type:

Channel

property connection: Connection

RabbitMQ Connection Operations.

e.g.

client.connection.list()
Return type:

Connection

property exchange: Exchange

RabbitMQ Exchange Operations.

e.g.

client.exchange.declare('my_exchange')
Return type:

Exchange

property healthchecks: HealthChecks

RabbitMQ Healthchecks.

e.g.

client.healthchecks.get()
Return type:

HealthChecks

property queue: Queue

RabbitMQ Queue Operations.

e.g.

client.queue.declare('my_queue', virtual_host='/')
Return type:

Queue

property user: User

RabbitMQ User Operations.

e.g.

client.user.create('my_user', 'password')
Return type:

User

property virtual_host: VirtualHost

RabbitMQ VirtualHost Operations.

Return type:

VirtualHost

aliveness_test(virtual_host: str = '/') dict[str, Any][source]

Aliveness Test.

e.g.

from amqpstorm.management import ManagementApi
client = ManagementApi('http://localhost:15672', 'guest', 'guest')
result = client.aliveness_test('/')
if result['status'] == 'ok':
    print("RabbitMQ is alive!")
else:
    print("RabbitMQ is not alive! :(")
Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

cluster_name() dict[str, Any][source]

Get Cluster Name.

Raises:
Return type:

dict

node(name: str) dict[str, Any][source]

Get Nodes.

Raises:
Return type:

dict

nodes() list[dict[str, Any]][source]

Get Nodes.

Raises:
Return type:

dict

overview() dict[str, Any][source]

Get Overview.

Raises:
Return type:

dict

top() list[dict[str, Any]][source]

Top Processes.

Raises:
Return type:

list

whoami() dict[str, Any][source]

Who am I?

Raises:
Return type:

dict

classmethod __new__(*args, **kwargs)
class amqpstorm.management.basic.Basic[source]
publish(body: str, routing_key: str, exchange: str = 'amq.default', virtual_host: str = '/', properties: dict[str, Any] | None = None, payload_encoding: str = 'string') dict[str, Any][source]

Publish a Message.

Parameters:
  • body (bytes,str,unicode) – Message payload

  • routing_key (str) – Message routing key

  • exchange (str) – The exchange to publish the message to

  • virtual_host (str) – Virtual host name

  • properties (dict) – Message properties

  • payload_encoding (str) – Payload encoding.

Raises:
Return type:

dict

get(queue: str, virtual_host: str = '/', requeue: bool = False, to_dict: bool = False, count: int = 1, truncate: int = 50000, encoding: str = 'auto') list[Message] | list[dict[str, Any]][source]

Get Messages.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

  • requeue (bool) – Re-queue message

  • to_dict (bool) – Should incoming messages be converted to a dictionary before delivery.

  • count (int) – How many messages should we try to fetch.

  • truncate (int) – The maximum length in bytes, beyond that the server will truncate the message.

  • encoding (str) – Message encoding.

Raises:
Return type:

list

class amqpstorm.management.channel.Channel[source]
get(channel: str) dict[str, Any][source]

Get Channel details.

Parameters:

channel – Channel name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the channel cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list(name: str | None = None, page_size: int | None = None, use_regex: bool = False) List[dict[str, Any]][source]

List all Channels.

Parameters:
  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
Return type:

list

class amqpstorm.management.connection.Connection[source]
get(connection: str) dict[str, Any][source]

Get Connection details.

Parameters:

connection (str) – Connection name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the connection cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list(name: str | None = None, page_size: int = 100, use_regex: bool = False) List[dict[str, Any]][source]

Get Connections.

Parameters:
  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
Return type:

list

close(connection: str, reason: str = 'Closed via management api') None[source]

Close Connection.

Parameters:
  • connection (str) – Connection name

  • reason (str) – Reason for closing connection.

Raises:
Return type:

None

class amqpstorm.management.exchange.Exchange[source]
get(exchange: str, virtual_host: str = '/') dict[str, Any][source]

Get Exchange details.

Parameters:
  • exchange (str) – Exchange name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

list(virtual_host: str = '/', show_all: bool = False, name: str | None = None, page_size: int = 100, use_regex: bool = False) List[dict[str, Any]][source]

List Exchanges.

Parameters:
  • virtual_host (str) – Virtual host name

  • show_all (bool) – List Exchanges across all virtual hosts

  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the exchange cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

list

declare(exchange: str = '', exchange_type: str = 'direct', virtual_host: str = '/', passive: bool = False, durable: bool = False, auto_delete: bool = False, internal: bool = False, arguments: dict[str, Any] | None = None) dict[str, Any] | None[source]

Declare an Exchange.

Parameters:
  • exchange (str) – Exchange name

  • exchange_type (str) – Exchange type

  • virtual_host (str) – Virtual host name

  • passive (bool) – Do not create

  • durable (bool) – Durable exchange

  • auto_delete (bool) – Automatically delete when not in use

  • internal (bool) – Is the exchange for use by the broker only.

  • arguments (dict,None) – Exchange key/value arguments

Raises:
Return type:

None

delete(exchange: str, virtual_host: str = '/') dict[str, Any][source]

Delete an Exchange.

Parameters:
  • exchange (str) – Exchange name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

bindings(exchange: str, virtual_host: str = '/') List[dict[str, Any]][source]

Get Exchange bindings.

Parameters:
  • exchange (str) – Exchange name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

list

bind(destination: str = '', source: str = '', routing_key: str = '', virtual_host: str = '/', arguments: dict[str, Any] | None = None) None[source]

Bind an Exchange.

Parameters:
  • source (str) – Source Exchange name

  • destination (str) – Destination Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • arguments (dict,None) – Bind key/value arguments

Raises:
Return type:

None

unbind(destination: str = '', source: str = '', routing_key: str = '', virtual_host: str = '/', properties_key: str | None = None) None[source]

Unbind an Exchange.

Parameters:
  • source (str) – Source Exchange name

  • destination (str) – Destination Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • properties_key (str)

Raises:
Return type:

None

class amqpstorm.management.healthchecks.HealthChecks[source]
get(node: str | None = None) dict[str, Any][source]

Run basic healthchecks against the current node, or against a given node.

Example response:

> {“status”:”ok”} > {“status”:”failed”,”reason”:”string”}

Parameters:

node – Node name

Raises:
Return type:

dict

class amqpstorm.management.queue.Queue[source]
get(queue: str, virtual_host: str = '/') dict[str, Any][source]

Get Queue details.

Parameters:
  • queue – Queue name

  • virtual_host (str) – Virtual host name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the queue cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list(virtual_host: str = '/', show_all: bool = False, name: str | None = None, page_size: int = 100, use_regex: bool = False) List[dict[str, Any]][source]

List Queues.

Parameters:
  • virtual_host (str) – Virtual host name

  • show_all (bool) – List Queues across all virtual hosts

  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
Return type:

list

declare(queue: str = '', virtual_host: str = '/', passive: bool = False, durable: bool = False, auto_delete: bool = False, arguments: dict[str, Any] | None = None) dict[str, Any][source]

Declare a Queue.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

  • passive (bool) – Do not create

  • durable (bool) – Durable queue

  • auto_delete (bool) – Automatically delete when not in use

  • arguments (dict,None) – Queue key/value arguments

Raises:
Return type:

dict

delete(queue: str, virtual_host: str = '/') dict[str, Any][source]

Delete a Queue.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

purge(queue: str, virtual_host: str = '/') None[source]

Purge a Queue.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

None

bindings(queue: str, virtual_host: str = '/') List[dict[str, Any]][source]

Get Queue bindings.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

list

bind(queue: str = '', exchange: str = '', routing_key: str = '', virtual_host: str = '/', arguments: dict[str, Any] | None = None) None[source]

Bind a Queue.

Parameters:
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • arguments (dict,None) – Bind key/value arguments

Raises:
Return type:

None

unbind(queue: str = '', exchange: str = '', routing_key: str = '', virtual_host: str = '/', properties_key: str | None = None) None[source]

Unbind a Queue.

Parameters:
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • properties_key (str)

Raises:
Return type:

None

class amqpstorm.management.user.User[source]
get(username: str) dict[str, Any][source]

Get User details.

Parameters:

username (str) – Username

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the user cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list() List[dict[str, Any]][source]

List all Users.

Return type:

list

create(username: str, password: str, tags: str | List[str] = '') None[source]

Create User.

Parameters:
  • username (str) – Username

  • password (str) – Password

  • tags (str,list) – Comma-separate list of tags (e.g. monitoring)

Raises:
Return type:

None

delete(username: str | List[str]) dict[str, Any][source]

Delete User or a list of Users.

Parameters:

username (str,list) – Username or a list of Usernames

Raises:
Return type:

dict

get_permission(username: str, virtual_host: str) dict[str, Any][source]

Get User permissions for the configured virtual host.

Parameters:
  • username (str) – Username

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

get_permissions(username: str) dict[str, Any][source]

Get all Users permissions.

Parameters:

username (str) – Username

Raises:
Return type:

dict

set_permission(username: str, virtual_host: str, configure_regex: str = '.*', write_regex: str = '.*', read_regex: str = '.*') dict[str, Any][source]

Set User permissions for the configured virtual host.

Parameters:
  • username (str) – Username

  • virtual_host (str) – Virtual host name

  • configure_regex (str) – Permission pattern for configuration operations for this user.

  • write_regex (str) – Permission pattern for write operations for this user.

  • read_regex (str) – Permission pattern for read operations for this user.

Raises:
Return type:

dict

delete_permission(username: str, virtual_host: str) dict[str, Any][source]

Delete User permissions for the configured virtual host.

Parameters:
  • username (str) – Username

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

class amqpstorm.management.virtual_host.VirtualHost[source]
get(virtual_host: str) dict[str, Any][source]

Get Virtual Host details.

Parameters:

virtual_host (str) – Virtual host name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the virtual host cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list() List[dict[str, Any]][source]

List all Virtual Hosts.

Raises:
Return type:

list

create(virtual_host: str) dict[str, Any][source]

Create a Virtual Host.

Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

delete(virtual_host: str) dict[str, Any][source]

Delete a Virtual Host.

Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

get_permissions(virtual_host: str) dict[str, Any][source]

Get all Virtual hosts permissions.

Raises:
Return type:

dict