본문으로 건너뛰기

asyncio

EventLoop in Thread

import asyncio
import threading

# 메인 스레드에서 자식 스레드의 이벤트 루프를 제어
new_loop = asyncio.new_event_loop()

def run_loop(loop: asyncio.AbstractEventLoop) -> None:
# 현재 스레드에 이벤트 루프 등록
asyncio.set_event_loop(loop)

try:
# 현재 스레드에 등록된 이벤트 루프 가져오기
# loop = asyncio.get_event_loop()
loop.run_forever()
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
# 현재 스레드에 등록된 이벤트 루프 제거
asyncio.set_event_loop(None)
loop.close()

t = threading.Thread(target=run_loop, args=(new_loop,))
t.start()

# new_loop에 작업 요청
# ...

# loop.run_forever()를 종료해달라는 요청
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()

EventLoop

  • 이벤트 루프 실행

    • loop.run_until_complete(future)
      • future가 완료될 때까지 블로킹 됩니다.
      • 실행전에 등록된 콜백이 있는 경우 해당 콜백들이 실행됩니다.
      • 코루틴을 인자로 전달한 경우 asyncio.Task로 감싸서 실행합니다.
      • future.result()를 반환합니다.
    • loop.run_forever(): 이벤트 루프를 실행시키고 루프가 종료될 때까지 블로킹 됩니다.
    • loop.stop()
      • 루프에 등록된 모든 코루틴을 종료시키고 루프를 종료합니다.
      • 새로 등록한 콜백은 실행되지 않고, 다음 이벤트 루프를 실행시킬 때 실행됩니다.
      • 콜백 내에서 새로운 콜백을 등록한 경우 새로운 콜백이므로 실행되지 않기 때문에 await에 걸려있는 경우 프로그램이 멈추게 됩니다.
  • 이벤트 루프에 콜백 등록

    • loop.call_soon(callback, *args, context=None)
      • 콜백은 callback(*args)로 실행됩니다.
      • 컨텍스트를 지정하지 않으면 현재 컨택스트가 사용됩니다.
      • asyncio.Handle를 반환합니다.
    • loop.call_soon_threadsafe(callback, *args, context=None)
      • 스레드 안전한 loop.call_soon()입니다.
      • 다른 스레드에서 콜백을 등록할 때 사용해야합니다.
    • loop.call_later(delay, callback, *args, context=None)
      • delay초 후에 콜백을 등록합니다.
      • asyncio.TimerHandle를 반환합니다.
    • loop.call_at(when, callback, *args, context=None)
      • when == loop.time()에 콜백을 등록합니다.
      • asyncio.TimerHandle를 반환합니다.
    • loop.run_in_executor(executor, callback, *args)
      • executor에서 콜백을 실행합니다.
        • None(기본 실행기), concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor
      • asyncio.Future를 반환합니다.
    • loop.add_signal_handler(sig, callback, *args)
      • signal.SIGINT, signal.SIGTERM, ... 등의 signal이 발생하면 콜백을 실행합니다.
      • 콜백은 callback(*args)로 실행됩니다.
      • 메인 스레드에서만 사용할 수 있습니다.
      • loop.remove_signal_handler(sig)로 콜백을 제거할 수 있습니다.
    • asyncio.run_coroutine_threadsafe(coroutine, loop)
      • 루프에 코루틴을 등록합니다.
      • concurrent.futures.Future를 반환합니다.
    • Keywords 인자를 사용해야하는 경우 functools.partial()을 사용합니다.
      • Ex) loop.call_soon(functools.partial(callback, arg1, key=value))
  • 이벤트 루프 상태 확인

    • loop.is_running(): 루프가 실행중이면 True를 반환합니다.
    • loop.is_closed(): 루프가 닫혔으면 True를 반환합니다.
  • 이벤트 루프 닫기

    • loop.shutdown_asyncgens()
      • 현재 열려있는 비동기 제너레이터 객체의 aclose()를 호출하여 닫습니다.
      • 이후에 비동기 제너레이터가 열리면 경고가 발생합니다.
    • loop.shutdown_default_executor()
      • 기본 실행기의 종료 콜백을 등록하고, 현재 열려있는 ThreadPoolExecutor의 스레드들을 조인합니다.
      • 이후에 loop.run_in_executor()에 기본 실행기를 사용하려는 경우 RuntimeError를 발생시킵니다.
    • loop.close()
      • 루프를 닫습니다.
      • 등록된 모든 콜백이 삭제 됩니다.