Skip to main content

asyncio


Event Loop in Thread

import asyncio
import threading


# Create a fresh event loop object; it is not running yet. It is passed to
# run_loop() as the thread argument further down in this file.
new_loop = asyncio.new_event_loop()

def _cancel_pending_tasks(loop: asyncio.AbstractEventLoop) -> None:
    """Cancel every task still registered on *loop* and wait for them to end."""
    pending = asyncio.all_tasks(loop)
    if not pending:
        return

    for task in pending:
        task.cancel()

    # return_exceptions=True keeps one failing task from aborting the wait
    # for the remaining ones.
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

    # Report tasks that ended with a real error instead of a cancellation,
    # so the failure is not silently dropped.
    for task in pending:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler(
                {
                    "message": "unhandled exception during event loop shutdown",
                    "exception": task.exception(),
                    "task": task,
                }
            )


def run_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Run *loop* in the calling thread until it is stopped, then close it.

    Intended as a ``threading.Thread`` target. The call blocks in
    ``loop.run_forever()`` until ``loop.stop()`` is executed on the loop
    (from another thread, schedule it with
    ``loop.call_soon_threadsafe(loop.stop)``).

    Once the loop stops, cleanup runs in this order:

    1. tasks that are still pending are cancelled and awaited, so futures
       handed out by ``asyncio.run_coroutine_threadsafe`` are resolved as
       cancelled instead of staying pending forever;
    2. asynchronous generators are finalized;
    3. the default executor is shut down;
    4. the thread's current event loop is reset to ``None`` and the loop
       is closed (this step always runs, even if an earlier one raises).

    Args:
        loop: The event loop to run. It must not be running or closed.
    """
    asyncio.set_event_loop(loop)

    try:
        loop.run_forever()
    finally:
        try:
            _cancel_pending_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

# Run the event loop in a separate thread; run_loop() blocks inside
# loop.run_forever() there until the loop is asked to stop.
t = threading.Thread(target=run_loop, args=(new_loop,))
t.start()

# Submit work to new_loop
# (e.g. with asyncio.run_coroutine_threadsafe(coro, new_loop))
# ...

# Request that loop.run_forever() terminate. loop.stop is scheduled with
# call_soon_threadsafe because this code runs outside the loop's thread.
new_loop.call_soon_threadsafe(new_loop.stop)
# Wait for the loop thread to finish.
t.join()