Source code for amqpstorm.management.virtual_host

from amqpstorm.compatibility import quote
from amqpstorm.management.base import ManagementHandler

API_VIRTUAL_HOST = 'vhosts/%s'
API_VIRTUAL_HOSTS = 'vhosts'
API_VIRTUAL_HOSTS_PERMISSION = 'vhosts/%s/permissions'


[docs]class VirtualHost(ManagementHandler):
[docs] def get(self, virtual_host): """Get Virtual Host details. :param str virtual_host: Virtual host name :raises ApiError: Raises if the remote server encountered an error. :raises ApiConnectionError: Raises if there was a connectivity issue. :rtype: dict """ virtual_host = quote(virtual_host, '') return self.http_client.get(API_VIRTUAL_HOST % virtual_host)
[docs] def list(self): """List all Virtual Hosts. :raises ApiError: Raises if the remote server encountered an error. :raises ApiConnectionError: Raises if there was a connectivity issue. :rtype: list """ return self.http_client.get(API_VIRTUAL_HOSTS)
[docs] def create(self, virtual_host): """Create a Virtual Host. :param str virtual_host: Virtual host name :raises ApiError: Raises if the remote server encountered an error. :raises ApiConnectionError: Raises if there was a connectivity issue. :rtype: dict """ virtual_host = quote(virtual_host, '') return self.http_client.put(API_VIRTUAL_HOST % virtual_host)
[docs] def delete(self, virtual_host): """Delete a Virtual Host. :param str virtual_host: Virtual host name :raises ApiError: Raises if the remote server encountered an error. :raises ApiConnectionError: Raises if there was a connectivity issue. :rtype: dict """ virtual_host = quote(virtual_host, '') return self.http_client.delete(API_VIRTUAL_HOST % virtual_host)
[docs] def get_permissions(self, virtual_host): """Get all Virtual hosts permissions. :raises ApiError: Raises if the remote server encountered an error. :raises ApiConnectionError: Raises if there was a connectivity issue. :rtype: dict """ virtual_host = quote(virtual_host, '') return self.http_client.get(API_VIRTUAL_HOSTS_PERMISSION % ( virtual_host ))