Rapid Cancelling Of Tasks Can Cause InvalidStateError

I have a program that creates tasks which can be cancelled at any time so that a new task can be created in their place. Sometimes, cancelling one of these tasks causes an “InvalidStateError” to be raised, which crashes the program.
I have written a short script that reproduces this error:

import asyncio
from autobahn.asyncio.component import Component, run

tasks = list()

def do_something(count):
    print(count)

async def cancel_tasks():
    # Every 0.3 s, cancel the oldest main() task, often while its call is still in flight
    while True:
        await asyncio.sleep(0.3)
        if tasks:
            task = tasks.pop(0)
            if not task.done():
                task.cancel()

async def main(session):
    # Issue calls back to back, so a call is almost always awaiting its result
    count = 0
    while True:
        await session.call("com.test", count)
        count += 1

transports = [{
    "type": "websocket",
    "url": "ws://localhost:8080/ws",
    "endpoint": {
        "type": "tcp",
        "host": "localhost",
        "port": 8080
    }
}]
component = Component(transports=transports, realm="realm1")

@component.on_join
async def on_join(session, details):
    print("joined")
    await session.register(do_something, "com.test")
    asyncio.create_task(cancel_tasks())
    while True:
        tasks.append(asyncio.create_task(main(session)))
        await asyncio.sleep(1)

@component.on_disconnect
async def on_disconnect(session, was_clean):
    print("stopping...")
    loop = asyncio.get_event_loop()
    loop.stop()

run([component])

Error:

2022-10-25T15:18:13 Traceback (most recent call last):
  File "<radacted>/venv/lib/python3.9/site-packages/autobahn/wamp/websocket.py", line 106, in onMessage
    self._session.onMessage(msg)
  File "<radacted>/venv/lib/python3.9/site-packages/autobahn/wamp/protocol.py", line 957, in onMessage
    txaio.resolve(on_reply, msg.args[0])
  File "<radacted>/venv/lib/python3.9/site-packages/txaio/aio.py", line 450, in resolve
    future.set_result(result)
asyncio.exceptions.InvalidStateError: invalid state

Based on this test and some poking around in the code, here’s my theory of what is going wrong: a Task is created and a call is made inside that Task. Before the call receives a response, the Task is cancelled, but the call is still waiting for a response. When the response arrives, autobahn tries to set the result on the future the Task was awaiting (aio.py line 450), unaware that this future was cancelled along with the Task, which raises “InvalidStateError”.
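The mechanism can be reproduced with plain asyncio, without a router. In this sketch, `reply` is a hypothetical stand-in for the future that `session.call()` returns (it is not autobahn's code): cancelling the Task that awaits it also cancels the future, and a late `set_result()` then raises `InvalidStateError`, just as `txaio.resolve()` does in the traceback.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    # Hypothetical stand-in for the future returned by session.call()
    reply = loop.create_future()

    async def caller():
        # Plays the role of main(): blocks until the "call result" arrives
        return await reply

    task = asyncio.create_task(caller())
    await asyncio.sleep(0)  # let caller() start and block on `reply`

    # Plays the role of cancel_tasks(): cancel while the call is in flight
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # Cancelling the Task also cancelled the future it was awaiting
    reply_cancelled = reply.cancelled()

    # A result that arrives late now fails the same way txaio.resolve() does
    try:
        reply.set_result("late result")
        raised = False
    except asyncio.InvalidStateError:
        raised = True
    return reply_cancelled, raised


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, True)
```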

With this in mind, I added a check in protocol.py just above where the response is resolved (line 957):

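if asyncio.isfuture(on_reply) and on_reply.done():
    # The Task awaiting this call was cancelled, which also cancelled
    # on_reply; resolving it now would raise InvalidStateError
    print("result of Task cannot be processed as it is done")
# above might already have rejected, so we guard ..
elif enc_err: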
    txaio.reject(on_reply, enc_err)
else:
    if msg.kwargs or (call_request.options and call_request.options.details):
        kwargs = msg.kwargs or {}
        if msg.args:
            res = types.CallResult(*msg.args,
                                   callee=msg.callee,
                                   callee_authid=msg.callee_authid,
                                   callee_authrole=msg.callee_authrole,
                                   forward_for=msg.forward_for,
                                   **kwargs)
        else:
            res = types.CallResult(callee=msg.callee,
                                   callee_authid=msg.callee_authid,
                                   callee_authrole=msg.callee_authrole,
                                   forward_for=msg.forward_for,
                                   **kwargs)
        txaio.resolve(on_reply, res)
    else:
        if msg.args:
            if len(msg.args) > 1:
                res = types.CallResult(*msg.args)
                txaio.resolve(on_reply, res)
            else:
                txaio.resolve(on_reply, msg.args[0])
        else:
            txaio.resolve(on_reply, None)

This change fixed the issue both in the reproduction script and in the original program where the error was occurring.
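The guard in that change boils down to "only resolve a future that is still pending". A minimal plain-asyncio sketch of the pattern follows; `resolve_if_pending` is a hypothetical helper, and in protocol.py the result would go through `txaio.resolve()` rather than `set_result()` directly.

```python
import asyncio


def resolve_if_pending(fut, result):
    """Set `result` on `fut` unless it is already done (e.g. cancelled).

    Returns True if the result was delivered, False if it was dropped.
    Hypothetical helper illustrating the done() guard from the fix.
    """
    if fut.done():
        # The awaiting Task was cancelled (or the future was already
        # resolved), so there is nobody left to deliver the result to
        return False
    fut.set_result(result)
    return True


async def demo():
    loop = asyncio.get_running_loop()
    pending = loop.create_future()
    cancelled = loop.create_future()
    cancelled.cancel()
    delivered = resolve_if_pending(pending, 42)
    dropped = not resolve_if_pending(cancelled, 42)
    return delivered, pending.result(), dropped


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, 42, True)
```

Because there is no `await` between the `done()` check and `set_result()`, nothing else on the event loop can change the future's state in between.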

Is my theory of the problem correct and is the suggested fix suitable for a PR?

yeah, your theory sounds convincing. pls find my thoughts below …

It is a race condition in the client library between sending a CANCEL for a call and receiving that call's actual RESULT.

Currently, receiving a call result when “there is no call” is fatal.

Since “the network” has latency > 0, the RESULT may already be in flight when the CANCEL is sent, so the race can’t be avoided.

The problem is that “there is no call” actually has two subcases:

a) there never was any call issued in the first place
b) a call was indeed issued previously, but it was cancelled

Regarding the right fix, IMO this should behave like this:

a) should still trigger an InvalidStateError
b) should be silently ignored

IOW: once a client has sent a cancel for a call, any call result received after that shouldn't raise an InvalidStateError but should be silently ignored.
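The proposed behaviour could be sketched as follows, assuming the client remembers which request IDs it has cancelled. `CallTracker`, its methods, and raising `InvalidStateError` for the unknown-ID case are hypothetical illustrations, not autobahn's actual API or data structures.

```python
import asyncio


class CallTracker:
    """Hypothetical bookkeeping for outstanding calls (not autobahn code)."""

    def __init__(self):
        self._pending = {}       # request_id -> future awaiting the result
        self._cancelled = set()  # request_ids we have sent a cancel for

    def register(self, request_id, fut):
        self._pending[request_id] = fut

    def cancel(self, request_id):
        # In a real client, this is also where the CANCEL message would be sent
        fut = self._pending.pop(request_id, None)
        if fut is not None:
            fut.cancel()
            self._cancelled.add(request_id)

    def on_result(self, request_id, result):
        if request_id in self._pending:
            self._pending.pop(request_id).set_result(result)
        elif request_id in self._cancelled:
            # Case b): the call was cancelled and the result raced the cancel,
            # so ignore it silently (assumes at most one late reply per call)
            self._cancelled.discard(request_id)
        else:
            # Case a): no such call was ever issued, which is still an error
            raise asyncio.InvalidStateError(f"result for unknown request {request_id}")


async def demo():
    loop = asyncio.get_running_loop()
    tracker = CallTracker()

    normal = loop.create_future()
    tracker.register(3, normal)
    tracker.on_result(3, "ok")         # normal path: delivered

    cancelled = loop.create_future()
    tracker.register(1, cancelled)
    tracker.cancel(1)
    tracker.on_result(1, "late")       # case b): ignored

    try:
        tracker.on_result(2, "bogus")  # case a): never issued
        unknown_raised = False
    except asyncio.InvalidStateError:
        unknown_raised = True
    return normal.result(), cancelled.cancelled(), unknown_raised


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('ok', True, True)
```

A real client would also need to route ERROR replies through the same check and decide when it is safe to forget a cancelled ID.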

what do you think?

in any case: yeah, this is definitely an issue worth submitting and a PR would be great!

Thank you for your comments.
I have changed the fix to catch and log the error instead of preventing resolve from being called.
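A minimal sketch of the catch-and-log approach, using plain asyncio and the standard `logging` module. The helper name `resolve_and_log` is hypothetical, and it calls `set_result()` where protocol.py would call `txaio.resolve()`; this is not the code from the PR.

```python
import asyncio
import logging

log = logging.getLogger(__name__)


def resolve_and_log(fut, result):
    """Resolve `fut`, logging instead of crashing if it is no longer pending."""
    try:
        fut.set_result(result)
    except asyncio.InvalidStateError:
        # Most likely the awaiting Task was cancelled and the result raced the cancel
        log.warning("dropping call result: future already done (cancelled=%s)",
                    fut.cancelled())
        return False
    return True


async def demo():
    loop = asyncio.get_running_loop()
    ok = loop.create_future()
    gone = loop.create_future()
    gone.cancel()
    return resolve_and_log(ok, 1), resolve_and_log(gone, 2)


if __name__ == "__main__":
    logging.basicConfig()
    print(asyncio.run(demo()))  # (True, False)
```

Compared with the `done()` check, catching the exception leaves the normal path unchanged, and the log line records whether the future had been cancelled or resolved some other way.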

Issue: Rapid Cancelling Of Tasks Can Cause InvalidStateError, Issue #1600, crossbario/autobahn-python
PR: Fix for #1600 by Skully17, Pull Request #1601, crossbario/autobahn-python