I modified the worker.py sample code from here to suit my needs. Here is my initial code.
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu import Queue, Exchange

logger = get_logger(__name__)


class Worker(ConsumerMixin):
    # Changed: queue name and exchange used by my setup
    task_queue = Queue('notifications.info', Exchange('neutron', 'topic'))

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.task_queue],
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()


if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # setup root logger
    setup_logging(loglevel='DEBUG', loggers=[''])

    # Changed: password of the 'guest' user from my setup
    with Connection('amqp://guest:supersecrete@localhost:5672//') as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')
The changes from the sample are the queue name, the exchange, and the connection URL. Make sure you use the correct queue name and exchange, and the password for the 'guest' user that you set in your setup.
To find out which queues and exchanges are available, I use
sudo rabbitmqctl list_exchanges
and
sudo rabbitmqctl list_queues
To find out more info about rabbitmqctl, read the man page here.
I picked the 'notifications.info' queue because I am interested in the 'port.create.start' and 'port.create.end' events, which are useful for my current work.
Then I ran the above program:
<Connection: amqp://guest@localhost:5672// at 0x2396050>
Everything seemed fine but I did not receive any events when the port creation was triggered by instance creation.
So what went wrong?
After poking around in a few places, I found this message in the RabbitMQ log. The log is located under /var/log/rabbitmq.
=ERROR REPORT==== 8-Apr-2014::17:47:06 ===
connection <0.25614.29>, channel 1 - soft error:
{amqp_error,precondition_failed,
"cannot redeclare exchange 'neutron' in vhost '/' with different type, durable, internal or autodelete value",
'exchange.declare'}
So the default settings of a kombu Exchange differ from those of the exchange created by the Neutron ML2 plugin in OpenStack. RabbitMQ thought I was trying to redeclare the exchange with different attributes. Since the sample code uses the default settings of the Exchange class, I checked those defaults in the API doc and compared them with the actual settings of the exchange using 'rabbitmqctl list_exchanges'.
Note: The default output of 'sudo rabbitmqctl list_exchanges' only shows the name and type attributes. To list additional attributes, you need to specify them as arguments. For example, 'sudo rabbitmqctl list_exchanges name type durable auto_delete' lists the name, type, durable and auto_delete attributes. Please see the man page for details.
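The comparison described above can also be scripted. The following is only a minimal sketch, not part of the worker: it assumes the tab-separated output of 'sudo rabbitmqctl list_exchanges name type durable auto_delete' has been captured as text, and checks it against kombu's documented Exchange defaults (durable=True, auto_delete=False). The function names and the sample output are made up for illustration.

```python
# kombu's documented defaults for Exchange(...) when no keyword is given,
# written the way rabbitmqctl prints booleans.
KOMBU_EXCHANGE_DEFAULTS = {"durable": "true", "auto_delete": "false"}

# Column order requested on the rabbitmqctl command line (an assumption).
COLUMNS = ("name", "type", "durable", "auto_delete")


def parse_exchanges(text):
    """Parse tab-separated rabbitmqctl output into a dict keyed by exchange name."""
    exchanges = {}
    for line in text.splitlines():
        fields = line.split("\t")
        if len(fields) != len(COLUMNS):
            continue  # skip banner lines such as "Listing exchanges ..."
        row = dict(zip(COLUMNS, fields))
        exchanges[row["name"]] = row
    return exchanges


def find_mismatches(exchange, defaults=KOMBU_EXCHANGE_DEFAULTS):
    """Return {attribute: (broker_value, kombu_default)} for differing attributes."""
    return {
        key: (exchange[key], expected)
        for key, expected in defaults.items()
        if exchange[key] != expected
    }


# Hypothetical sample output, for illustration only.
SAMPLE_OUTPUT = (
    "Listing exchanges ...\n"
    "neutron\ttopic\tfalse\tfalse\n"
    "amq.topic\ttopic\ttrue\tfalse\n"
    "...done.\n"
)

if __name__ == "__main__":
    neutron = parse_exchanges(SAMPLE_OUTPUT)["neutron"]
    print(find_mismatches(neutron))
```

Any attribute reported here is one that has to be passed explicitly to Exchange(...) so that the declaration matches what already exists on the broker.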
I found that the durable attribute needs to be set to False for both the exchange and the queue. Here is the source code with the changes (durable=False on the Exchange and on the Queue).
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu import Queue, Exchange

logger = get_logger(__name__)


class Worker(ConsumerMixin):
    # Changed: durable=False on both the exchange and the queue,
    # to match the declarations that already exist on the broker
    task_queue = Queue('notifications.info',
                       Exchange('neutron', 'topic', durable=False),
                       durable=False)

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.task_queue],
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()


if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # setup root logger
    setup_logging(loglevel='DEBUG', loggers=[''])

    with Connection('amqp://guest:supersecrete@localhost:5672//') as conn:
        try:
            print(conn)
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')
After making the changes, I can now receive the messages.
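Since I only care about the 'port.create.start' and 'port.create.end' events, the callback could filter on the event type instead of printing everything. Here is a rough sketch of such a filter. It assumes the decoded body is a dict with a top-level 'event_type' key, which I have not verified for every OpenStack release; the name is_wanted is hypothetical.

```python
# Event types of interest, as named in the text above.
WANTED_EVENTS = frozenset(["port.create.start", "port.create.end"])


def is_wanted(body, wanted=WANTED_EVENTS):
    """Return True if the decoded notification has one of the wanted event types."""
    # Assumption: body is a dict with a top-level 'event_type' key.
    if not isinstance(body, dict):
        return False
    return body.get("event_type") in wanted


if __name__ == "__main__":
    # Hypothetical notification bodies, for illustration only.
    for body in ({"event_type": "port.create.end"},
                 {"event_type": "network.create.end"}):
        print(body["event_type"], is_wanted(body))
```

In process_task, the print call would then be wrapped in 'if is_wanted(body):', while message.ack() still runs for every message so that unwanted notifications do not pile up in the queue.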