Robust ConsumerΒΆ

"""
Robust Consumer that will automatically re-connect on failure.
"""
import logging
import time

import amqpstorm
from amqpstorm import Connection

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger()


class Consumer(object):
    def __init__(self, max_retries=None):
        self.max_retries = max_retries
        self.connection = None

    def create_connection(self):
        """Create a connection.

        :return:
        """
        attempts = 0
        while True:
            attempts += 1
            try:
                self.connection = Connection('localhost', 'guest', 'guest')
                break
            except amqpstorm.AMQPError as why:
                LOGGER.exception(why)
                if self.max_retries and attempts > self.max_retries:
                    break
                time.sleep(min(attempts * 2, 30))
            except KeyboardInterrupt:
                break

    def start(self):
        """Start the Consumers.

        :return:
        """
        if not self.connection:
            self.create_connection()
        while True:
            try:
                channel = self.connection.channel()
                channel.queue.declare('simple_queue')
                channel.basic.consume(self, 'simple_queue', no_ack=False)
                channel.start_consuming()
                if not channel.consumer_tags:
                    channel.close()
            except amqpstorm.AMQPError as why:
                LOGGER.exception(why)
                self.create_connection()
            except KeyboardInterrupt:
                self.connection.close()
                break

    def __call__(self, message):
        print("Message:", message.body)

        # Acknowledge that we handled the message without any issues.
        message.ack()

        # Reject the message.
        # message.reject()

        # Reject the message, and put it back in the queue.
        # message.reject(requeue=True)


if __name__ == '__main__':
    CONSUMER = Consumer()
    CONSUMER.start()