John Fremlin's blog: Exactly-Once in multithreaded async Python

Posted 2020-07-31 22:00:00 GMT

Python's built-in greenlet-style coroutines with asyncio are complex to use, and their relationship to other threading mechanisms is unclear. Here's an attempt at exactly-once semantics: a one-time setup coroutine that runs once no matter how many threads and event loops ask for it.

Suppose there are multiple OS threads in the same Python process, each with its own asyncio event loop. Under CPython's global interpreter lock, at most one thread, and therefore at most one event loop, may execute Python code at any time. If your code is IO-heavy or spends most of its time outside Python, the convenience of a single consistent Python environment may outweigh the cost of serializing all Python execution.
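To make the setup concrete, here is a minimal sketch of two OS threads that each run their own event loop. The names `report`, `worker`, `loops` and `loops_lock` are made up for illustration.

```python
import asyncio
import threading

loops = []                      # keep references so the loops can be compared later
loops_lock = threading.Lock()   # protects the list across threads

async def report():
    # While this coroutine sleeps, its loop is idle and the other thread may run.
    await asyncio.sleep(0.01)
    with loops_lock:
        loops.append(asyncio.get_running_loop())

def worker():
    # asyncio.run creates a fresh event loop owned by the calling thread.
    asyncio.run(report())

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each thread ends up with a distinct loop object, which is why a future created on one of them cannot simply be awaited on the other.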

One nuance is that under asyncio every future is bound to one event loop. You can't await a future that belongs to one event loop from a coroutine running on another, or else you get an error like

RuntimeError: Task ... got Future ... attached to a different loop
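The error is easy to reproduce even in a single thread. The sketch below creates a future on one loop and awaits it from a task running on a second loop; `wait_on` and `reproduce` are illustrative names, not part of the demo.

```python
import asyncio

async def wait_on(fut):
    # Awaiting a pending future owned by another loop is rejected by the task.
    return await fut

def reproduce():
    other_loop = asyncio.new_event_loop()
    foreign = other_loop.create_future()   # bound to other_loop, still pending
    try:
        asyncio.run(wait_on(foreign))      # runs on a brand-new loop
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return None

message = reproduce()
print(message)
```

Note that the check only applies to a pending future. One that is already done returns its result immediately without consulting the loop.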

Having loops wait for each other is complex to debug. However, it is quite easy to achieve: have the future you're waiting for call back into the loop you're on via call_soon_threadsafe. This primitive schedules a callback on a loop from any thread, so it is safe to use across loops.
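As a small illustration of call_soon_threadsafe on its own, the sketch below has a plain thread (no event loop of its own) complete a future that lives on the main loop. The `producer` function and the "ready" value are placeholders.

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def producer():
        # Runs on another thread: never touch fut directly from here,
        # ask its loop to do it instead.
        loop.call_soon_threadsafe(fut.set_result, "ready")

    thread = threading.Thread(target=producer)
    thread.start()
    value = await fut        # woken up by the scheduled callback
    thread.join()
    return value

result = asyncio.run(main())
```

Calling `fut.set_result` straight from the producer thread would bypass the loop's wake-up mechanism, so the waiter might not notice until something else wakes the loop.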

The future publishes the loop it is on (via get_loop()). If that is not the current loop, create a future on the current loop that completes when the original one does. We could create at most one such future per loop by caching them.
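The caching idea could look something like the following sketch. `SharedFuture`, `for_current_loop` and `_copy` are hypothetical names, and the sketch assumes the owning loop is still running when a proxy is requested.

```python
import asyncio
import threading

def _copy(done, proxy):
    """Mirror a finished future's outcome onto a proxy on another loop."""
    if proxy.done():
        return
    if done.cancelled():
        proxy.cancel()
    elif done.exception() is not None:
        proxy.set_exception(done.exception())
    else:
        proxy.set_result(done.result())

class SharedFuture:
    """Hypothetical wrapper handing out one proxy future per waiting loop."""

    def __init__(self, source):
        self._source = source            # owned by source.get_loop()
        self._proxies = {}               # loop -> proxy future on that loop
        self._lock = threading.Lock()

    def for_current_loop(self):
        loop = asyncio.get_running_loop()
        owner = self._source.get_loop()
        if loop is owner:
            return self._source
        with self._lock:
            proxy = self._proxies.get(loop)
            if proxy is None:
                proxy = self._proxies[loop] = loop.create_future()
                # Register on the owner's thread; when the source finishes,
                # hop back to the waiting loop to complete the proxy.
                owner.call_soon_threadsafe(
                    self._source.add_done_callback,
                    lambda done: loop.call_soon_threadsafe(_copy, done, proxy),
                )
        return proxy

async def waiter(shared, owner_loop, source):
    first = shared.for_current_loop()
    second = shared.for_current_loop()   # served from the cache
    owner_loop.call_soon_threadsafe(source.set_result, 42)
    value = await first
    return value, first is second

def run_demo():
    owner_loop = asyncio.new_event_loop()
    source = owner_loop.create_future()
    owner = threading.Thread(target=owner_loop.run_forever)
    owner.start()
    try:
        return asyncio.run(waiter(SharedFuture(source), owner_loop, source))
    finally:
        owner_loop.call_soon_threadsafe(owner_loop.stop)
        owner.join()
        owner_loop.close()

value, same_proxy = run_demo()
```

Any number of coroutines on the same loop can await the one cached proxy, so the owning future only ever gets a single callback per waiting loop.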

Here's a demo implementation.

import asyncio
import threading
from contextlib import contextmanager
 
async def one_time_setup():
    with p('one_time_setup'):
        await asyncio.sleep(1)
 
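future = None
future_lock = threading.Lock()  # guards creation of the shared task across threads

def copy_outcome(done, local_future):
    """Mirror the finished task's result or exception onto the local future."""
    if local_future.done():
        return
    if done.exception() is not None:
        local_future.set_exception(done.exception())
    else:
        local_future.set_result(done.result())

async def demo_exactly_once():
    global future
    with future_lock:
        if future is None:
            future = asyncio.create_task(one_time_setup())
    with p('waiting'):
        local_loop = asyncio.get_running_loop()
        if future.done() or future.get_loop() is local_loop:
            return await future
        else:
            local_future = local_loop.create_future()
            # Done callbacks receive the finished future, not its result, and
            # run on the owning loop's thread, so hop back to our loop first.
            future.add_done_callback(
                lambda done: local_loop.call_soon_threadsafe(
                    copy_outcome, done, local_future)
            )
            return await local_future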
 
@contextmanager
def p(msg):
    """Simple reporting mechanism including loop and thread information"""
    def event(event):
        print(f"{msg} {event} from thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}")
    event('start')
    try:
        yield
    finally:
        event('end')
 
def worker():
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    new_loop.run_until_complete(demo_exactly_once())
for thread in [threading.Thread(target=worker) for _ in range(2)]:
    thread.start()
