Python concurrent.futures
ProcessPoolExecutor
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")
if index % 3 == 0:
raise RuntimeError(f"task {index} error")
time.sleep(3 - index % 3)
return index
with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures = [executor.submit(task, *args) for args in [(i,) for i in range(10)]]
print("submit done")
# 완료된 작업을 가져옴
for future in as_completed(futures):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
result = future.result()
print(f"task {result} done")
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor, wait
def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")
if index % 3 == 0:
raise RuntimeError(f"task {index} error")
time.sleep(3 - index % 3)
return index
def callback(fut: Future[int]):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
result = fut.result()
print(f"task {result} done")
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")
with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures: list[Future[int]] = []
for args in [(i,) for i in range(10)]:
future = executor.submit(task, *args)
future.add_done_callback(callback)
futures.append(future)
print("submit done")
# 모든 작업이 완료될 때까지 대기
wait(futures)