How can I open a connection to crossbar and publish without using run() loop?

I have a simple python application where I would like to open a connection to the crossbar server, and then publish info messages from time to time.

If I call autobahn.asyncio.component.run() to open the connection, control of the main thread is handed to that loop and I cannot run my own functions.

So I would like to essentially have Component() join the crossbar and then be able to get my hands on the session in my main thread and call session.publish whenever I need to.

When I run the code below I get an error. How can I make sure an event loop exists for this thread? Or, alternatively, how can I manually connect to the crossbar server and publish to it without requiring the asyncio event loop?

  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/events.py", line 602, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-1'.


from autobahn.asyncio.component import Component, run
import threading

# Declaration for purpose of documentation to say these are globals
global this_session
global crossbar

crossbar = Component(
   transports=[
       {
           "type": "websocket",
           "url": u"wss://localhost:1964/ws",
           "endpoint": {
               "type": "tcp",
               "host": "localhost",
               "port": 1964,
#                "tls": context,
           },
           "options": {
               "open_handshake_timeout": 100,
           }
       },
   ],
   realm=u"leela",
)

@crossbar.on_join
async def join(session, details):
   global this_session
   print("joined {}".format(details))
   this_session = session

# 
def publish_something(data):
   global this_session
   this_session.publish("my.topic.render.json", data)


t1 = threading.Thread(target=run, args=([crossbar],))
t1.start()

Well, you do need an event-loop to do event-based networking.

Autobahn lets you choose either asyncio or Twisted for that. The autobahn.asyncio.component.run method is only a convenience, though; if you arrange to have the loop running elsewhere, just call component.start(). You should be able to run the Twisted reactor in a thread. I believe that should also work for asyncio … but in both cases you can only call asyncio/Twisted methods in that thread. So, you would have to arrange for your “publish” call to occur on that same thread. Twisted has the reactor.callFromThread helper; asyncio has call_soon_threadsafe (see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe).
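As a rough illustration of the same-thread rule, here is a minimal sketch that uses only the standard library. FakeSession is a hypothetical stand-in for a joined session (it is not an Autobahn class), and the helper names are made up for this example. The worker thread calls asyncio.set_event_loop, which is what makes asyncio.get_event_loop() work in a non-main thread, and the publish is handed over to the loop's thread with call_soon_threadsafe.

```python
import asyncio
import threading


class FakeSession:
    """Hypothetical stand-in for a joined WAMP session; it only records publishes."""

    def __init__(self):
        self.published = []

    def publish(self, topic, payload):
        self.published.append((topic, payload))


def start_loop_in_thread():
    """Create an event loop and run it forever in a daemon thread."""
    loop = asyncio.new_event_loop()

    def runner():
        # Make this loop the current loop for the worker thread.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return loop, thread


def publish_from_any_thread(loop, session, topic, payload):
    """Schedule session.publish on the loop's thread and wait until it has run."""
    done = threading.Event()

    def do_publish():
        session.publish(topic, payload)
        done.set()

    loop.call_soon_threadsafe(do_publish)
    return done.wait(timeout=5)


def demo():
    loop, thread = start_loop_in_thread()
    session = FakeSession()
    ok = publish_from_any_thread(loop, session, "my.topic.render.json", {"n": 1})
    # Stop the loop from its own thread, then clean up.
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()
    return ok, session.published


if __name__ == "__main__":
    print(demo())
```

With a real Component, the session captured in on_join would take the place of FakeSession, and the loop passed to component.start() would be the one running in the worker thread.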

Is there a reason you want to use threads for this? If you’re writing new code, it would be best if it was all event-based.

I am also trying to figure out the same problem as @hqm.
Running component.start blocks (in this code example crossbar.start). It never exits. It connects and blocks.

Example usage:

import asyncio
from autobahn.asyncio.component import Component

class Client:
    session = None
    details = None

    def __init__(self, url, realm):
        self.url = url
        self.realm = realm
        self.comp = Component(transports=url, realm=realm)
        self.comp.on('join', self.on_join)

    async def on_join(self, session, details):
        self.session = session
        self.details = details
        print('joined')

    async def init(self):
        await self.comp.start(asyncio.get_running_loop())

in ipython:

>>> client = Client('ws://router:port', 'example_realm')
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(client.start())  <-- this gets stuck here
>>> await client.session.call("com.app.func_name", 1, 2, 3) <--- this doesn't happen, but I expected (or hoped) it would

In this setting, how can I perform a call without using run and without blocking?

It isn’t “blocking” but the future that start() returns doesn’t fire until your Component is “done”. So, just don’t await that future. That is, instead of wrapping client.start() in run_until_complete just start your loop, call start() then do your call.
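To make the "don't await that future" point concrete, here is a small sketch with a made-up FakeComponent whose start() behaves the way described above: the returned future only resolves once the component is done. The class, its joined event, and its stop() method are assumptions for illustration, not Autobahn's API.

```python
import asyncio


class FakeComponent:
    """Hypothetical stand-in: start() returns a future that resolves only on stop()."""

    def start(self, loop):
        self.joined = asyncio.Event()
        self._done = loop.create_future()
        # Pretend the join happens shortly after starting.
        loop.call_later(0.01, self.joined.set)
        return self._done

    def stop(self):
        if not self._done.done():
            self._done.set_result(None)


async def main():
    loop = asyncio.get_running_loop()
    comp = FakeComponent()
    done = comp.start(loop)        # keep the future, but do not await it yet
    await comp.joined.wait()       # wait only until "joined"
    before = done.done()           # still pending: the component is not "done"
    # ... this is where calls or publishes would go ...
    comp.stop()
    await done                     # completes now that the component is done
    return before, done.done()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting the start future right away would park the coroutine until shutdown, which is why it looks like blocking.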

You may also like the main= argument to Component (which receives a connected session).

A big part of the reason you provide “things which are called with a session object” (instead of e.g. await component.get_session() or similar) is because of re-connects and failures: on_join may run many times. So, if you do your set up in there it will run each time your client re-connects. Similar for a “main”-style program: if it fails, the client will re-connect and re-start your “main”.

Thank you for taking the time to respond.

I updated the client:

import asyncio

from autobahn.asyncio.component import Component


class Client:
    session = None
    details = None

    def __init__(self, url, realm):
        self.url = url
        self.realm = realm
        self.comp = Component(transports=url, realm=realm)
        self.comp.on('join', self.on_join)

    async def on_join(self, session, details):
        self.session = session
        self.details = details
        print('joined')

    async def on_stop(self, loop, arg):
        print("stopping", arg)

    def init(self, loop):
        self.comp.start(loop)

in ipython:

In [18]: async def do_call():
    ...:     client = Client("ws://router:port", "example_realm")
    ...:     client.init(asyncio.get_running_loop())
    ...:     res = client.session.call("com.app.fun_name", 1, 2, 3)
    ...:     print(res)
In [19]: loop.create_task(do_call())
Out[19]: <Task pending name='Task-3888' coro=<do_call() running at <ipython-input-18-380f93d29516>:1>>

In [20]: loop.run_forever()
joined

print(res) never happened.
Is this what you mean?
Can you please provide a minimal example that doesn’t use run?

It’s hard to understand what you’re trying to accomplish, exactly. Something is going to have to be your “main”; is that what “do_call” is above? I think the immediate problem above is that nothing is awaiting your Task … but of course something has to be responsible for starting and stopping the event-loop (that’s most of what the run convenience method does).

I would generally think of having Client be constructed by the Component (instead of “having a” Component) because now you have an object that’s “sometimes not ready to go”. That is, session will be None whenever you’re not connected to the router. Thinking the other way around, if your Component constructs a Client then the session is never None. So that’s a second problem above: client.session won’t be valid until some arbitrary amount of time passes (or until on_join runs).

So, why not put your session.call("com.app.fun_name", 1, 2, 3) inside the on-join? Do you want to run it once? Do you want to run it every time you connect to the router?

There are roughly two styles / ways to use Component. For “server-ish” things, you’d want to do setup in on_join. For “client-ish” things, you can pass a main= kwarg to Component … this will run that coroutine and then disconnect and consider itself “done”. You don’t have to write things that way (e.g. you can accomplish everything using on_join / on_leave) but that’s the intent.

But, okay, here’s your example re-worked to function. As I said above, though, I’d structure this differently, so I’d like to learn: why do you want to grab the session outside of the Client class?

import asyncio

from autobahn.asyncio.component import Component


class Client:
    session = None
    details = None

    def __init__(self, url, realm, ready):
        self.url = url
        self.realm = realm
        self.ready = ready
        self.comp = Component(transports=url, realm=realm)
        self.comp.on('join', self.on_join)

    async def on_join(self, session, details):
        self.session = session
        self.details = details
        print('joined')
        # on_join can run again after a re-connect; only resolve the future once
        if not self.ready.done():
            self.ready.set_result(None)

    async def on_stop(self, loop, arg):
        print("stopping", arg)

    def init(self, loop):
        self.comp.start(loop)


async def main(loop):
    ready = loop.create_future()
    client = Client("ws://localhost:8080/ws", "crossbardemo", ready)
    client.init(loop)
    # client.session will be None here .. so we need to wait until
    # "on_join" runs..
    await ready
    res = await client.session.call("com.app.fun_name", 1, 2, 3)
    print(res)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

Thank you again! I really appreciate your help. I had been working at it for two days with hardly any breakthrough.

I have component A (a server Component, run via component.run) that manages connections/access to multiple services.
Component B is a Tornado web server that needs to talk to different services via component A.
To avoid repeating the open/join/call/leave cycle for every RPC, I want a single client that connects to the router, joins, and then performs calls as needed (many calls are expected, from different handlers). On server shutdown it will leave gracefully.

Thank you for sharing the code. That future bit was a missing piece in my asyncio knowledge.

Great!

Yeah, asyncio really tries to hide “Future” these days, but sometimes that’s what you need. Part of the reason Component makes the join/leave stuff explicit and only gives you the session in join is that, for your case, you’ll have to think about what the webserver should do if its WAMP connection is down at the moment (maybe it just started and hasn’t connected yet, or maybe some “networking stuff” happened and it got disconnected). Maybe it wants to just wait, maybe it wants to give an error to that client and hope things improve next time, etc …

So you probably want to do something sensible in “on_leave” (like self.session = None or self.web_server.set_wamp_session(None), etc). The important point here is that once you call Component.start, join can be called any number of times after that (but won’t be called twice in a row without leave being called first).
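A minimal sketch of that idea, under some assumptions: WampLink and FakeSession are made-up names, the handlers are assumed to receive (session, details) like the on_join handlers earlier in this thread, and raising ConnectionError is just one possible policy (waiting for a re-join would be another).

```python
import asyncio


class WampLink:
    """Hypothetical holder that tracks whichever session is currently joined."""

    def __init__(self):
        self.session = None

    async def on_join(self, session, details):
        self.session = session

    async def on_leave(self, session, details):
        # Forget the session so callers cannot use a dead connection.
        self.session = None

    async def call(self, procedure, *args):
        if self.session is None:
            # Policy choice for this sketch: fail fast instead of waiting.
            raise ConnectionError("WAMP session is not connected")
        return await self.session.call(procedure, *args)


class FakeSession:
    """Stand-in session whose call() just sums its arguments."""

    async def call(self, procedure, *args):
        return sum(args)


async def demo():
    link = WampLink()
    results = []
    try:
        await link.call("com.app.func_name", 1, 2, 3)
    except ConnectionError as exc:
        results.append(str(exc))
    await link.on_join(FakeSession(), None)
    results.append(await link.call("com.app.func_name", 1, 2, 3))
    await link.on_leave(None, None)
    results.append(link.session)
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A web handler would then go through link.call() and turn the ConnectionError into whatever response makes sense for its client.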

This is our use case:

We have an application that executes batches of tests, and we just want to monitor its progress by having it periodically publish its status.

The problem is, how do we get our code to execute our series of tests in our “main” app, if the autobahn event loop has taken control via the “run()” entry point?

Autobahn doesn’t have an event-loop of its own; the run() convenience function merely uses either asyncio’s run_forever or Twisted’s reactor.run(). As per the above, the .start() method can be used to start a Component while you retain control over the program. You still need to start the underlying event-loop of course (but now you do that instead of run()).
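For the test-batch use case, one possible shape (a sketch, not the only way) is to let your own coroutine own the program flow, push each blocking batch to a worker thread with run_in_executor, and publish from the loop's thread between batches. FakeSession and run_batch are hypothetical stand-ins; in a real program the session would be the one handed to on_join after calling component.start().

```python
import asyncio
import time


class FakeSession:
    """Hypothetical stand-in for a joined session; it only records publishes."""

    def __init__(self):
        self.published = []

    def publish(self, topic, payload):
        self.published.append((topic, payload))


def run_batch(n):
    """Blocking stand-in for one batch of tests."""
    time.sleep(0.01)
    return "batch {} ok".format(n)


async def run_all(session, batches):
    loop = asyncio.get_running_loop()
    for n in range(batches):
        # Run the blocking batch in a worker thread so the loop stays responsive.
        status = await loop.run_in_executor(None, run_batch, n)
        # Back on the loop's thread, so using the session here is safe.
        session.publish("my.topic.render.json", status)
    return session.published


if __name__ == "__main__":
    print(asyncio.run(run_all(FakeSession(), 2)))
```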