Wednesday, April 9, 2014

Listen to OpenStack Neutron Messages from RabbitMQ using Kombu messaging library

As I continue to investigate how to write a plugin, or more precisely a mechanism driver, for the Neutron ML2 plugin, I would like to look at the interaction among Nova, Neutron and its agents. I found out that some of the communication goes through RabbitMQ messaging, and I understand that Neutron uses the Python kombu messaging library. So I am trying to write a few lines of code to listen to the messages.

I've modified the sample code of worker.py from here to suit my needs. Here is my initial code.

from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu import Queue, Exchange

logger = get_logger(__name__)


class Worker(ConsumerMixin):
    task_queue = Queue('notifications.info', Exchange('neutron', 'topic'))

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.task_queue],
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()

if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging
    # setup root logger
    setup_logging(loglevel='DEBUG', loggers=[''])

    with Connection('amqp://guest:supersecrete@localhost:5672//') as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')


The changes from the sample are the queue name, the exchange and the connection URL. Make sure you use the correct queue name and exchange, and the 'guest' password that you set in your setup.

To find out which exchanges and queue names are available, I use

sudo rabbitmqctl list_exchanges

and

sudo rabbitmqctl list_queues

To find out more about rabbitmqctl, read the man page here.

I picked the 'notifications.info' queue because I am interested in the 'port.create.start' and 'port.create.end' events, which are useful for my current work.

Then I ran the above program, which printed the connection:
<Connection: amqp://guest@localhost:5672// at 0x2396050>

Everything seemed fine but I did not receive any events when the port creation was triggered by instance creation.

So what went wrong?

After poking around in a few places, I saw the following message in the RabbitMQ log. The log is located at /var/log/rabbitmq.

=ERROR REPORT==== 8-Apr-2014::17:47:06 ===
connection <0.25614.29>, channel 1 - soft error:
{amqp_error,precondition_failed,
            "cannot redeclare exchange 'neutron' in vhost '/' with different type, durable, internal or autodelete value",
            'exchange.declare'}

So the default settings of a kombu exchange differ from the ones the Neutron ML2 plugin uses in OpenStack. RabbitMQ thought I was trying to redeclare the existing 'neutron' exchange with different attributes. Since the sample code uses the default settings of the Exchange class, I checked the defaults in the API doc and compared them with the actual settings of the exchange using 'rabbitmqctl list_exchanges'.

Note: The default output of 'sudo rabbitmqctl list_exchanges' only shows the name and type attributes. To list additional attributes, you need to specify them as arguments. For example, 'sudo rabbitmqctl list_exchanges name type durable auto_delete' lists the name, type, durable and auto_delete attributes. Please see the man page for details.
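The comparison described above can also be scripted. The following is only a minimal sketch: the helper names (parse_exchanges, mismatches, list_exchanges) are made up for illustration, and it assumes that rabbitmqctl prints one tab-separated line per exchange and that kombu's Exchange defaults are durable=True and auto_delete=False.

```python
import subprocess

# Columns requested from rabbitmqctl, in this order.
COLUMNS = ["name", "type", "durable", "auto_delete"]

# Assumption: kombu's Exchange defaults, written the way rabbitmqctl prints them.
KOMBU_EXCHANGE_DEFAULTS = {"durable": "true", "auto_delete": "false"}


def parse_exchanges(output):
    """Parse tab-separated `rabbitmqctl list_exchanges` output into dicts keyed by name."""
    exchanges = {}
    for line in output.splitlines():
        fields = line.split("\t")
        if len(fields) != len(COLUMNS) or fields == COLUMNS:
            continue  # skip banner, footer and header lines
        row = dict(zip(COLUMNS, fields))
        exchanges[row["name"]] = row
    return exchanges


def mismatches(exchange, expected=KOMBU_EXCHANGE_DEFAULTS):
    """Return {attribute: (broker_value, expected_value)} for every attribute that differs."""
    return {key: (exchange[key], value)
            for key, value in expected.items()
            if exchange[key] != value}


def list_exchanges():
    """Run rabbitmqctl (needs sudo rights) and return the parsed exchanges."""
    out = subprocess.check_output(["sudo", "rabbitmqctl", "list_exchanges"] + COLUMNS)
    return parse_exchanges(out.decode())
```

Calling mismatches(list_exchanges()['neutron']) on the RabbitMQ host would then show which attributes of the existing exchange disagree with the defaults, which is what triggers the precondition_failed error.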

I found that the durable attribute of both the exchange and the queue needs to be set to False, because kombu declares them as durable by default. Here is the source code with the change on the task_queue line.

from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu import Queue, Exchange

logger = get_logger(__name__)


class Worker(ConsumerMixin):
    task_queue = Queue('notifications.info', Exchange('neutron', 'topic', durable=False), durable=False)

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.task_queue],
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()

if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging
    # setup root logger
    setup_logging(loglevel='DEBUG', loggers=[''])

    with Connection('amqp://guest:supersecrete@localhost:5672//') as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')

After making the changes, I can now receive the messages.
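Since I only care about the 'port.create.start' and 'port.create.end' events, the callback could filter on the event type. This is a rough sketch, not part of the program above: the helper names are made up, and it assumes that each notification is a dict with an 'event_type' key, possibly wrapped in an 'oslo.message' JSON envelope depending on the OpenStack release.

```python
import json

# The events of interest mentioned earlier in this post.
WANTED_EVENTS = {"port.create.start", "port.create.end"}


def unwrap(body):
    """Return the notification, decoding JSON and an 'oslo.message' envelope if present."""
    if isinstance(body, (bytes, str)):
        body = json.loads(body)
    if isinstance(body, dict) and "oslo.message" in body:
        # Assumption: the envelope carries the real notification as a JSON string.
        body = json.loads(body["oslo.message"])
    return body


def is_port_event(body, wanted=WANTED_EVENTS):
    """True if the notification's event_type is one of the wanted events."""
    notification = unwrap(body)
    return isinstance(notification, dict) and notification.get("event_type") in wanted
```

Inside process_task, the print call could then be guarded with 'if is_port_event(body):' while message.ack() is still called for every message, so that uninteresting notifications do not pile up in the queue.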

2 comments:

  1. Hi Fred,

    I would appreciate a little help with this listener.
    I don't know Python, so my basic question is: in which directory do you put this worker.py, and how do you configure RabbitMQ to use it?

    Thank you

  2. You can put the Python file in any directory you like. Just run 'python worker.py'.

    Assuming you have RabbitMQ installed, the exchange and queue can be created programmatically. You may want to start with the examples at this link, http://kombu.readthedocs.org/en/latest/userguide/examples.html.

