The proper way to create a message queue

Hi,

I love using crossbar.io router for the architecture of my async python projects, but I still havn’t figured out a way to create a message queue that ensures each message gets consumed and holds them in the meantime.

Any example or hint about how to achieve this ?

Hi there,

a way to create a message queue that ensures each message gets consumed and holds them in the meantime

for the latter, there is a feature called “event history” with both in-memory ephemeral and LMDB database backed stores:

then, you can use the WAMP meta API to query the history (“queue”) on topics.

rgd the former: Crossbar.io has no support for transactional message publication, or transactional message delivery.

if you need transactional messaging, and with no more infos/constraints about your specific use case, I’d use PostgreSQL with Crossbar.io in front with RPCs.

Hope this helps,
Cheers,
/Tobias

Thank you oberstet,

I’ll look into the event history but what i ended up doing (for now) is a simple “in memory queue” implementation which works like this (with autobahn-python):

        queue = []
        async def on_event(event_data):
            queue.append(event_data)
        await self.subscribe(on_event, 'my.subscribe.route')
    
        async def process_next_event():
            event_data = queue.pop(0)
            #do something with event_data
            await asyncio.sleep(0.1)
            asyncio.get_event_loop().create_task(process_next_event())
            
        #initial call to trigger the recursive-ish queue-pop mechanic
        await process_next_event()

This way, when an event is received through pub/sub, I store it in the queue and I’m able to pop them at a choosen pace, here: asyncio.sleep(0.1).

This does the trick for now and I’m posting since it might help someone else trying to do a simple queue with crossbar. Suggestions for improvements welcome