I. Importing

from carrot import connection as carrot_connection
from carrot import messaging

II. Creating a Connection object

params = dict(hostname=FLAGS.rabbit_host,
              port=FLAGS.rabbit_port,
              ssl=FLAGS.rabbit_use_ssl,
              userid=FLAGS.rabbit_userid,
              password=FLAGS.rabbit_password,
              virtual_host=FLAGS.rabbit_virtual_host)

# BrokerConnection takes keyword arguments, so unpack the dict with **
connection = carrot_connection.BrokerConnection(**params)

III. Creating a Consumer

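# self is presumably the connection object here
# (the call sits inside a method of the connection class)
consumer = messaging.Consumer(connection=self,
                              topic=topic,
                              proxy=proxy)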

IV. Sample (http://nathanborror.com/posts/2009/may/20/working-django-and-rabbitmq/)

########## Set global variables ###################

AMQP_SERVER = 'localhost'
AMQP_PORT = 5672
AMQP_USER = 'guest'
AMQP_PASSWORD = 'guest'
AMQP_VHOST = '/'

########## Create a consumer ###################

>>> from flopsy import Connection, Consumer
>>> consumer = Consumer(connection=Connection())
>>> consumer.declare(queue='books', exchange='readernaut', routing_key='importer', auto_delete=False)

>>> def message_callback(message):
...     print('Received: ' + message.body)
...     consumer.channel.basic_ack(message.delivery_tag)
...
>>> consumer.register(message_callback)
>>> consumer.wait()


########## Create a publisher ###################

>>> from flopsy import Connection, Publisher
>>> publisher = Publisher(connection=Connection(), exchange='readernaut', routing_key='importer')
>>> publisher.publish('Test message!')
>>> publisher.close()



[ Using Pools ]

1. When a Publisher sends a cast, use pools.Pool (the Connections are pooled)

from eventlet import pools

class Pool(pools.Pool):
    def create(self):
        LOG.debug('Pool creating new connection')
        return Connection.instance(new=True)

ConnectionPool = Pool(
    max_size=FLAGS.rpc_conn_pool_size,
    order_as_stack=True)

with ConnectionPool.item() as conn:
    publisher = DirectPublisher(connection=conn, msg_id=msg_id)
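
The effect of order_as_stack=True can be sketched with the standard library alone. This is not eventlet's implementation: StackPool, its create argument, and the "conn-N" strings are hypothetical stand-ins that only illustrate that the most recently returned item is the first one handed out again.

```python
import contextlib
import queue


class StackPool:
    """Stdlib stand-in for a pool with order_as_stack=True (hypothetical)."""

    def __init__(self, create, max_size=4):
        self._create = create            # factory that builds a new item
        self._max_size = max_size
        self._created = 0
        self._free = queue.LifoQueue()   # LIFO: last returned, first reused

    def get(self):
        try:
            return self._free.get_nowait()
        except queue.Empty:
            if self._created < self._max_size:
                self._created += 1
                return self._create()
            # Pool is exhausted: block until another user returns an item.
            return self._free.get()

    def put(self, item):
        self._free.put(item)

    @contextlib.contextmanager
    def item(self):
        # Borrow an item and always hand it back, even on error.
        conn = self.get()
        try:
            yield conn
        finally:
            self.put(conn)


counter = iter(range(1, 100))
pool = StackPool(create=lambda: "conn-%d" % next(counter), max_size=2)

with pool.item() as first:
    with pool.item() as second:
        pass                 # second is returned first, then first
with pool.item() as reused:
    pass                     # the most recently returned item comes back
```

With a round-robin (FIFO) free list the last block would have received the other connection instead; the stack ordering keeps a small set of recently used connections busy.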

2. When a Consumer receives a message, use GreenPool (the handler method runs in the pool)

from eventlet import greenpool

self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)

def _process_data(self, msg_id, ctxt, method, args):
    ...
 
3. To turn the Consumer into a (green) thread and branch off its execution, use eventlet.spawn
    - the calling code keeps running without blocking on wait()
    - the spawned thread is handed back as the return value

import eventlet

self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
 
4. To branch off an internal LoopingCall, such as the periodic report, use eventlet.greenthread

from eventlet import greenthread

def _inner():
    ...
    greenthread.sleep(interval)
    ...

greenthread.spawn(_inner)
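
The loop-and-sleep pattern above can be sketched as a small LoopingCall class. This version swaps in the standard library's threading module in place of eventlet.greenthread so that it runs without extra packages; the start/stop/wait method names and the report_state function are assumptions made for illustration, not the original implementation.

```python
import threading


class LoopingCall:
    """Calls func repeatedly in a background thread (stdlib stand-in)."""

    def __init__(self, func):
        self.func = func
        self._stop = threading.Event()
        self._thread = None

    def start(self, interval):
        def _inner():
            while not self._stop.is_set():
                self.func()
                # Sleep for interval seconds, but wake early on stop().
                self._stop.wait(interval)

        self._thread = threading.Thread(target=_inner)
        self._thread.daemon = True
        self._thread.start()
        return self._thread   # the caller continues immediately

    def stop(self):
        self._stop.set()

    def wait(self):
        # Block until the loop has finished.
        self._thread.join()


ticks = []


def report_state():
    # Hypothetical periodic job: record a tick, stop after three.
    ticks.append(len(ticks) + 1)
    if len(ticks) == 3:
        timer.stop()


timer = LoopingCall(report_state)
timer.start(interval=0.01)
timer.wait()
```

The caller only blocks when it explicitly calls wait(); until then the loop runs on its own, which is the same division of work that greenthread.spawn(_inner) gives in the eventlet version.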




[ Glossary ]
- queue         : the name of the Queue
- exchange      : the name of the exchange the Queue is bound to
- exchange_type :
    . direct : the routing_key must match exactly
    . topic  : the routing_key can be matched as a pattern
    . fanout : no routing_key is needed; every bound Queue receives the message
- routing_key   : a message is delivered only to the Queues bound with the matching key
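
The three exchange types can be compared with a small pure-Python sketch. This is not broker code: route and topic_matches are hypothetical helpers, and the "audit" and "all" queues are made up for illustration. The topic rule follows AMQP, where "*" stands for exactly one dot-separated word and "#" for zero or more words.

```python
def topic_matches(pattern, key):
    """Return True if an AMQP-style topic pattern matches the routing key."""
    p = pattern.split(".")
    k = key.split(".")

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words of the key.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        # '*' matches exactly one word; anything else must be equal.
        return (p[i] == "*" or p[i] == k[j]) and match(i + 1, j + 1)

    return match(0, 0)


def route(exchange_type, bindings, routing_key):
    """Return the queues that receive a message.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        return [q for q, _ in bindings]
    if exchange_type == "direct":
        return [q for q, b in bindings if b == routing_key]
    if exchange_type == "topic":
        return [q for q, b in bindings if topic_matches(b, routing_key)]
    raise ValueError("unknown exchange type: %s" % exchange_type)


bindings = [("books", "importer"), ("audit", "importer.*"), ("all", "#")]

direct_hits = route("direct", bindings, "importer")
topic_hits = route("topic", bindings, "importer.csv")
fanout_hits = route("fanout", bindings, "ignored")
```

Here direct_hits contains only "books", topic_hits contains "audit" and "all", and fanout_hits contains every bound queue regardless of the key.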




Posted by Seungkyu Ahn, OpenStack Korea vice representative (seungkyua@gmail.com)