Есть много предложений, но никто из них не использует concurrent.futures, который, по моему мнению, является наиболее понятным для этого способом.
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
Супер просто читать и поддерживать.
Мы создаем пул, отправляем один процесс, а затем дождаемся до 5 секунд, прежде чем поднимать TimeoutError, который вы могли бы поймать и обработать, но вам нужно.
Родной для python 3.2+ и backported до 2.7 (
Переключение между потоками и процессами так же просто, как замена ProcessPoolExecutor
на ThreadPoolExecutor
.
Если вы хотите завершить процесс по таймауту, я бы предложил посмотреть в Pebble .