asyncio
Event Loop in Thread
import asyncio
import threading
new_loop = asyncio.new_event_loop()
def run_loop(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
asyncio.set_event_loop(None)
loop.close()
t = threading.Thread(target=run_loop, args=(new_loop,))
t.start()
# new_loop에 작업 요청
# ...
# loop.run_forever()를 종료해달라는 요청
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()