Wednesday, April 9, 2014

Listen to OpenStack Neutron Messages from RabbitMQ using Kombu messaging library

As I continue to investigate how to write a plugin, or more precisely a mechanism driver for the Neutron ML2 plugin, I would like to look at the interaction among nova, neutron and its agents. I found out that some of the communication goes through RabbitMQ messaging, and I understand that neutron uses the Python kombu messaging library. So I am trying to write a few lines of code to listen to the messages.

I've modified the sample code of worker.py from here to suit my needs. Here is my initial code.

from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu import Queue, Exchange

logger = get_logger(__name__)


class Worker(ConsumerMixin):
    task_queue = Queue('notifications.info', Exchange('neutron', 'topic'))

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.task_queue],
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()

if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging
    # setup root logger
    setup_logging(loglevel='DEBUG', loggers=[''])

    with Connection('amqp://guest:supersecrete@localhost:5672//') as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')


The changes from the sample are the queue and exchange names and the connection URL. Make sure you use the correct queue name and exchange, and the 'guest' password from your own setup.

To find out which queue names and exchanges are available, I use

sudo rabbitmqctl list_exchanges

and

sudo rabbitmqctl list_queues

To find out more about rabbitmqctl, read the man page here.

I picked the 'notifications.info' queue because I am interested in the 'port.create.start' and 'port.create.end' events, which are useful for my current work.

Then I ran the above program:
<Connection: amqp://guest@localhost:5672// at 0x2396050>

Everything seemed fine, but I did not receive any events when a port creation was triggered by an instance creation.

So what went wrong?

After poking around in a few places, I saw this message in the RabbitMQ log. The log is located in /var/log/rabbitmq.

=ERROR REPORT==== 8-Apr-2014::17:47:06 ===
connection <0.25614.29>, channel 1 - soft error:
{amqp_error,precondition_failed,
            "cannot redeclare exchange 'neutron' in vhost '/' with different type, durable, internal or autodelete value",
            'exchange.declare'}

So the default settings of a kombu exchange are different from those used by the Neutron ML2 plugin in OpenStack. RabbitMQ thought I was trying to redeclare the exchange with different attributes. Since the sample code uses the default settings of the Exchange class, I checked those defaults in the API doc and compared them with the settings of the existing exchange using 'rabbitmqctl list_exchanges'.

Note: The default output of 'sudo rabbitmqctl list_exchanges' only shows the name and type attributes. To list additional attributes, you need to specify them as arguments. For example, 'sudo rabbitmqctl list_exchanges name type durable auto_delete' lists the name, type, durable and auto_delete attributes. Please see the man page for details.
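To make the comparison less error-prone, here is a minimal sketch that parses the tab-separated output of 'sudo rabbitmqctl list_exchanges name type durable auto_delete' and reports which attributes differ from what kombu's Exchange declares by default (durable=True, auto_delete=False). The function names and the sample text are my own illustration, not captured from a real broker, and the banner lines are an assumption about what rabbitmqctl prints around the rows.

```python
# Attributes kombu's Exchange declares when none are passed explicitly.
KOMBU_EXCHANGE_DEFAULTS = {"durable": True, "auto_delete": False}


def parse_exchanges(output):
    """Parse tab-separated `name type durable auto_delete` rows into dicts."""
    exchanges = {}
    for line in output.splitlines():
        fields = line.split("\t")
        if len(fields) != 4:
            # Skip banner lines such as "Listing exchanges ..." and "...done."
            continue
        name, ex_type, durable, auto_delete = fields
        exchanges[name] = {
            "type": ex_type,
            "durable": durable == "true",
            "auto_delete": auto_delete == "true",
        }
    return exchanges


def mismatches(existing, declared):
    """Return the attribute names whose declared value differs from the broker's."""
    return sorted(k for k, v in declared.items() if existing.get(k) != v)


# Hypothetical sample output, written by hand for illustration.
SAMPLE = (
    "Listing exchanges ...\n"
    "neutron\ttopic\tfalse\tfalse\n"
    "nova\ttopic\tfalse\tfalse\n"
    "...done.\n"
)

if __name__ == "__main__":
    neutron = parse_exchanges(SAMPLE)["neutron"]
    print(mismatches(neutron, KOMBU_EXCHANGE_DEFAULTS))
```

Any attribute name it prints is one that has to be passed explicitly to Exchange so that the declaration matches the exchange that already exists on the broker.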

I found that the durable attribute of both the exchange and the queue needs to be set to False. Here is the source code; the only change is the task_queue line.

from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu import Queue, Exchange

logger = get_logger(__name__)


class Worker(ConsumerMixin):
    task_queue = Queue('notifications.info', Exchange('neutron', 'topic', durable=False), durable=False)

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.task_queue],
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()

if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging
    # setup root logger
    setup_logging(loglevel='DEBUG', loggers=[''])

    with Connection('amqp://guest:supersecrete@localhost:5672//') as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')

After making the changes, I can now receive the messages.
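Since I only care about the port creation events, the callback can filter on the event type. The sketch below is a guess at how that could look: it assumes the notification body is a dict with an 'event_type' key, and that some setups wrap the real message as a JSON string under an 'oslo.message' key. Both are assumptions about the message layout, so print a real message first and adjust. The helper names are hypothetical.

```python
import json

# Event types named in this post; everything else is ignored.
WANTED_EVENTS = ("port.create.start", "port.create.end")


def unwrap(body):
    """Return the notification dict, unwrapping an 'oslo.message' envelope if present."""
    if isinstance(body, dict) and "oslo.message" in body:
        # Assumption: the envelope carries the real message as a JSON string.
        return json.loads(body["oslo.message"])
    return body


def is_port_create(body):
    """True when the notification is one of the port-creation events."""
    message = unwrap(body)
    return isinstance(message, dict) and message.get("event_type") in WANTED_EVENTS
```

Inside process_task, this would be used as 'if is_port_create(body): print(...)' before calling message.ack(), so that other notifications are acknowledged without being printed.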

Saturday, April 5, 2014

Devstack with Oracle Enterprise Linux

Currently I am working on the development of an OpenStack Neutron plugin for a network switch. One of the operating systems I need to deploy on is Oracle Enterprise Linux. Oracle announced support for OpenStack last December; the announcement is here. However, if you try to use devstack to set up your environment, it still complains that it is not a supported platform.

The fix is a two-line change to functions-common in devstack, shown below.

--- a/functions-common
+++ b/functions-common
@@ -364,6 +364,8 @@ function GetOSVersion {
             fi
         elif [[ $os_VENDOR == "openSUSE project" ]]; then
             os_VENDOR="openSUSE"
+        elif [[ $os_VENDOR == "OracleServer" ]]; then
+            os_VENDOR="Red Hat"
         elif [[ $os_VENDOR =~ Red.*Hat ]]; then
             os_VENDOR="Red Hat"
         fi

Run stack.sh again and you can deploy without any issue.

In the next article, I will talk about using Vagrant, PyCharm and VirtualBox to set up your development environment.