from __future__ import annotations
from typing import Any
from amqpstorm.management.base import ManagementHandler
HEALTHCHECKS = 'healthchecks/node/'
HEALTHCHECKS_NODE = 'healthchecks/node/%s'
[docs]
class HealthChecks(ManagementHandler):
[docs]
def get(self, node: str | None = None) -> dict[str, Any]:
"""Run basic healthchecks against the current node, or against a given
node.
Example response:
> {"status":"ok"}
> {"status":"failed","reason":"string"}
:param node: Node name
:raises ApiError: Raises if the remote server encountered an error.
:raises ApiConnectionError: Raises if there was a connectivity issue.
:rtype: dict
"""
if not node:
return self.http_client.get(HEALTHCHECKS)
return self.http_client.get(HEALTHCHECKS_NODE % node)