Python concurrent.futures
ProcessPoolExecutor
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")
if index % 3 == 0:
raise RuntimeError(f"task {index} error")
time.sleep(3 - index % 3)
return index
with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures = [executor.submit(task, *args) for args in [(i,) for i in range(10)]]
print("submit done")
# 완료된 작업을 가져옴
for future in as_completed(futures):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
result = future.result()
print(f"task {result} done")
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor, wait
def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")
if index % 3 == 0:
raise RuntimeError(f"task {index} error")
time.sleep(3 - index % 3)
return index
def callback(fut: Future[int]):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
result = fut.result()
print(f"task {result} done")
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")
with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures: list[Future[int]] = []
for args in [(i,) for i in range(10)]:
future = executor.submit(task, *args)
future.add_done_callback(callback)
futures.append(future)
print("submit done")
# 모든 작업이 완료될 때까지 대기
wait(futures)
경고
ProcessPoolExecutor
를 사용할 때, timeout
기능을 제공하는 Future
의 메서드나, 이를 인자로 받는 함수를 사용하면 프로그램 상에서는 timeout된 것으로 보이지만 실제로는 작업이 계속 진행되는 문제가 있습니다.
timeout
이 필요한 경우, ProcessPoolExecutor._processes
를 직접 사용하여 Process
를 종료시키거나 multiprocessing
모듈로 프로그램을 작성해야합니다.
ProcessPoolExecutor with asyncio
asyncio로 프로그래밍을 하다보면 CPU-bound 작업에 의해 이벤트 루프가 블로킹되어 다른 작업이 실행되 지 않는 경우가 있습니다. 이를 해결하기위해 ProcessPoolExecutor만 사용하는 경우에도 이벤트 루프가 블로킹되기 때문에, loop.run_in_executor
를 사용하여 함수 실행을 awaitable로 변경하는 작업을 해줘야합니다.
import asyncio
from concurrent.futures import ProcessPoolExecutor
def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")
if index % 3 == 0:
raise RuntimeError(f"task {index} error")
time.sleep(10 - index % 3)
return index
async def cpu_bound_task() -> list[int]:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures = [
loop.run_in_executor(executor, task, *args) for args in [(i,) for i in range(10)]
]
print("submit done")
results = []
for future in asyncio.as_completed(futures):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
# 기다리는 동안 다른 async 작업이 실행될 수 있음
result = await future
print(f"task {result} done")
results.append(result)
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")
return results
자식 프로세스에서 signal 핸들링
프로세스가 생성될 때,
- fork의 경우 자식 프로세스는 부모의 모든 자원을 상속받습니다. 따라서 생성되는 시점의 부모 프로세스의 signal handler를 상속받습니다.
- spawn의 경우 자식 프로세스는 필요한 자원만 상속받습니다.
자식 프로세스가 시그널에 의해 종료되는 것을 막아야하는 경우가 있는데, 이를 위해 Process 시작 시 signal handler를 무시하도록 설정해야합니다.
import signal
from concurrent.futures import ProcessPoolExecutor
def ignore_signal() -> None:
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
with ProcessPoolExecutor(5, initializer=ignore_signal) as executor:
...