Posted 2020-07-31 22:00:00 GMT
Python's builtin greenlets with asyncio are complex to use and the relationship with other threading mechanisms are unclear. Here's an example of an attempt to create only once semantics.
With multiple OS threads in the same Python process, each with its own asyncio event loop, then at most one thread and therefore at most one event loop may execute Python code at any time under CPython. If you have IO heavy code or code that can run outside Python, the convenience of a single consistent Python environment may outweigh the costs of having to serialize all Python execution.
One nuance is that under asyncio all futures are bound to one event loop. You can't wait for another event loop from another, or else you get an error like
RuntimeError: Task ... got Future ... attached to a different loop
Having loops wait for each other is complex to debug. However, it is possible to achieve quite easily by having the future you're waiting for call back to the loop you're in via call_soon_threadsafe. This primitive allows calling across loops safely.
The future publishes the loop it is on. If it's not on the current loop, create a future on the current loop that waits for the original one. We could create at most one future per loop by caching these.
Here's a demo implementation.
import asyncio import threading from contextlib import contextmanager async def one_time_setup(): with p('one_time_setup'): await asyncio.sleep(1) future = None async def demo_exactly_once(): global future if not future: future = asyncio.create_task(one_time_setup()) with p('waiting'): local_loop = asyncio.get_event_loop() if future.done() or future.get_loop() == local_loop: return await future else: local_future = local_loop.create_future() future.add_done_callback( lambda result: local_loop.call_soon_threadsafe( lambda: local_future.set_result(result)) ) return await local_future @contextmanager def p(msg): """Simple reporting mechanism including loop and thread information""" def event(event): print(f"{msg} {event} from thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}") event('start') try: yield finally: event('end') def worker(): new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) new_loop.run_until_complete(demo_exactly_once()) for thread in [threading.Thread(target=worker) for _ in range(2)]: thread.start()
Post a comment