Management Api

class amqpstorm.management.ManagementApi(api_url, username, password, timeout=10, verify=None, cert=None)[source]

RabbitMQ Management Api

e.g.

from amqpstorm.management import ManagementApi
client = ManagementApi('http://localhost:15672', 'guest', 'guest')
client.user.create('my_user', 'password')
client.user.set_permission(
    'my_user',
    virtual_host='/',
    configure_regex='.*',
    write_regex='.*',
    read_regex='.*'
)
basic

RabbitMQ Basic Operations.

e.g.

client.basic.publish('Hello RabbitMQ', routing_key='my_queue')
Return type:amqpstorm.management.basic.Basic
channel

RabbitMQ Channel Operations.

e.g.

client.channel.list()
Return type:amqpstorm.management.channel.Channel
connection

RabbitMQ Connection Operations.

e.g.

client.connection.list()
Return type:amqpstorm.management.connection.Connection
exchange

RabbitMQ Exchange Operations.

e.g.

client.exchange.declare('my_exchange')
Return type:amqpstorm.management.exchange.Exchange
queue

RabbitMQ Queue Operations.

e.g.

client.queue.declare('my_queue', virtual_host='/')
Return type:amqpstorm.management.queue.Queue
user

RabbitMQ User Operations.

e.g.

client.user.create('my_user', 'password')
Return type:amqpstorm.management.user.User
aliveness_test(virtual_host='/')[source]

Aliveness Test.

e.g.

from amqpstorm.management import ManagementApi
client = ManagementApi('http://localhost:15672', 'guest', 'guest')
result = client.aliveness_test('/')
if result['status'] == 'ok':
    print("RabbitMQ is alive!")
else:
    print("RabbitMQ is not alive! :(")
Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

overview()[source]

Get Overview.

Raises:
Return type:

dict

nodes()[source]

Get Nodes.

Raises:
Return type:

dict

top()[source]

Top Processes.

Raises:
Return type:

list

whoami()[source]

Who am I?

Raises:
Return type:

dict

class amqpstorm.management.basic.Basic(http_client)[source]
publish(body, routing_key, exchange='amq.default', virtual_host='/', properties=None, payload_encoding='string')[source]

Publish a Message.

Parameters:
  • body (bytes,str,unicode) – Message payload
  • routing_key (str) – Message routing key
  • exchange (str) – The exchange to publish the message to
  • virtual_host (str) – Virtual host name
  • properties (dict) – Message properties
  • payload_encoding (str) – Payload encoding.
Raises:
Return type:

dict

get(queue, virtual_host='/', requeue=False, to_dict=False, count=1, truncate=50000, encoding='auto')[source]

Get Messages.

Parameters:
  • queue (str) – Queue name
  • virtual_host (str) – Virtual host name
  • requeue (bool) – Re-queue message
  • to_dict (bool) – Should incoming messages be converted to a dictionary before delivery.
  • count (int) – How many messages should we try to fetch.
  • truncate (int) – The maximum length in bytes, beyond that the server will truncate the message.
  • encoding (str) – Message encoding.
Raises:
Return type:

list

class amqpstorm.management.channel.Channel(http_client)[source]
get(channel)[source]

Get Connection details.

Parameters:

channel – Channel name

Raises:
Return type:

dict

list()[source]

List all Channels.

Raises:
Return type:

list

class amqpstorm.management.connection.Connection(http_client)[source]
get(connection)[source]

Get Connection details.

Parameters:

connection (str) – Connection name

Raises:
Return type:

dict

list()[source]

Get Connections.

Raises:
Return type:

list

close(connection, reason='Closed via management api')[source]

Close Connection.

Parameters:
  • connection (str) – Connection name
  • reason (str) – Reason for closing connection.
Raises:
Return type:

None

class amqpstorm.management.exchange.Exchange(http_client)[source]
get(exchange, virtual_host='/')[source]

Get Exchange details.

Parameters:
  • exchange (str) – Exchange name
  • virtual_host (str) – Virtual host name
Raises:
Return type:

dict

list(virtual_host='/', show_all=False)[source]

List Exchanges.

Parameters:
  • virtual_host (str) – Virtual host name
  • show_all (bool) – List all Exchanges
Raises:
Return type:

list

declare(exchange='', exchange_type='direct', virtual_host='/', passive=False, durable=False, auto_delete=False, internal=False, arguments=None)[source]

Declare an Exchange.

Parameters:
  • exchange (str) – Exchange name
  • exchange_type (str) – Exchange type
  • virtual_host (str) – Virtual host name
  • passive (bool) – Do not create
  • durable (bool) – Durable exchange
  • auto_delete (bool) – Automatically delete when not in use
  • internal (bool) – Is the exchange for use by the broker only.
  • arguments (dict,None) – Exchange key/value arguments
Raises:
Return type:

None

delete(exchange, virtual_host='/')[source]

Delete an Exchange.

Parameters:
  • exchange (str) – Exchange name
  • virtual_host (str) – Virtual host name
Raises:
Return type:

dict

bindings(exchange, virtual_host='/')[source]

Get Exchange bindings.

Parameters:
  • exchange (str) – Exchange name
  • virtual_host (str) – Virtual host name
Raises:
Return type:

list

bind(destination='', source='', routing_key='', virtual_host='/', arguments=None)[source]

Bind an Exchange.

Parameters:
  • source (str) – Source Exchange name
  • destination (str) – Destination Exchange name
  • routing_key (str) – The routing key to use
  • virtual_host (str) – Virtual host name
  • arguments (dict,None) – Bind key/value arguments
Raises:
Return type:

None

unbind(destination='', source='', routing_key='', virtual_host='/', properties_key=None)[source]

Unbind an Exchange.

Parameters:
  • source (str) – Source Exchange name
  • destination (str) – Destination Exchange name
  • routing_key (str) – The routing key to use
  • virtual_host (str) – Virtual host name
  • properties_key (str) –
Raises:
Return type:

None

class amqpstorm.management.queue.Queue(http_client)[source]
get(queue, virtual_host='/')[source]

Get Queue details.

Parameters:
  • queue – Queue name
  • virtual_host (str) – Virtual host name
Raises:
Return type:

dict

list(virtual_host='/', show_all=False)[source]

List Queues.

Parameters:
  • virtual_host (str) – Virtual host name
  • show_all (bool) – List all Queues
Raises:
Return type:

list

declare(queue='', virtual_host='/', passive=False, durable=False, auto_delete=False, arguments=None)[source]

Declare a Queue.

Parameters:
  • queue (str) – Queue name
  • virtual_host (str) – Virtual host name
  • passive (bool) – Do not create
  • durable (bool) – Durable queue
  • auto_delete (bool) – Automatically delete when not in use
  • arguments (dict,None) – Queue key/value arguments
Raises:
Return type:

dict

delete(queue, virtual_host='/')[source]

Delete a Queue.

Parameters:
  • queue (str) – Queue name
  • virtual_host (str) – Virtual host name
Raises:
Return type:

dict

purge(queue, virtual_host='/')[source]

Purge a Queue.

Parameters:
  • queue (str) – Queue name
  • virtual_host (str) – Virtual host name
Raises:
Return type:

None

bindings(queue, virtual_host='/')[source]

Get Queue bindings.

Parameters:
  • queue (str) – Queue name
  • virtual_host (str) – Virtual host name
Raises:
Return type:

list

bind(queue='', exchange='', routing_key='', virtual_host='/', arguments=None)[source]

Bind a Queue.

Parameters:
  • queue (str) – Queue name
  • exchange (str) – Exchange name
  • routing_key (str) – The routing key to use
  • virtual_host (str) – Virtual host name
  • arguments (dict,None) – Bind key/value arguments
Raises:
Return type:

None

unbind(queue='', exchange='', routing_key='', virtual_host='/', properties_key=None)[source]

Unbind a Queue.

Parameters:
  • queue (str) – Queue name
  • exchange (str) – Exchange name
  • routing_key (str) – The routing key to use
  • virtual_host (str) – Virtual host name
  • properties_key (str) –
Raises:
Return type:

None

class amqpstorm.management.user.User(http_client)[source]
get(username)[source]

Get User details.

Parameters:username (str) – Username
Return type:dict
list()[source]

List all Users.

Return type:list
create(username, password, tags='')[source]

Create User.

Parameters:
  • username (str) – Username
  • password (str) – Password
  • tags (str) – Comma-separate list of tags (e.g. monitoring)
Return type:

None

delete(username)[source]

Delete User.

Parameters:username (str) – Username
Return type:dict
get_permission(username, virtual_host)[source]

Get User permissions for the configured virtual host.

Parameters:
  • username (str) – Username
  • virtual_host (str) – Virtual host name
Raises:
Return type:

dict

get_permissions(username)[source]

Get all Users permissions.

Parameters:

username (str) – Username

Raises:
Return type:

dict

set_permission(username, virtual_host, configure_regex='.*', write_regex='.*', read_regex='.*')[source]

Set User permissions for the configured virtual host.

Parameters:
  • username (str) – Username
  • virtual_host (str) – Virtual host name
  • configure_regex (str) – Permission pattern for configuration operations for this user.
  • write_regex (str) – Permission pattern for write operations for this user.
  • read_regex (str) – Permission pattern for read operations for this user.
Raises:
Return type:

dict

delete_permission(username, virtual_host)[source]

Delete User permissions for the configured virtual host.

Parameters:
  • username (str) – Username
  • virtual_host (str) – Virtual host name
Raises:
Return type:

dict

class amqpstorm.management.virtual_host.VirtualHost(http_client)[source]
get(virtual_host)[source]

Get Virtual Host details.

Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

list()[source]

List all Virtual Hosts.

Raises:
Return type:

list

create(virtual_host)[source]

Create a Virtual Host.

Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

delete(virtual_host)[source]

Delete a Virtual Host.

Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

get_permissions(virtual_host)[source]

Get all Virtual hosts permissions.

Raises:
Return type:

dict