Решение этой проблемы было дано миллион раз в Интернете. Задача называется Задача об изменении монет . Можно найти решения в http://rosettacode.org/wiki/Count_the_coins и математическую модель этого в http://jaqm.ro/issues/volume-5,issue-2/ pdf / patterson_harmel.pdf (или проблема с изменением монеты Google ).
Кстати, решение Scala от Tsagadai интересно. В этом примере получается 1 или 0. В качестве побочного эффекта он перечисляет на консоли все возможные решения. Он отображает решение, но не делает его пригодным для использования каким-либо образом.
Чтобы быть как можно полезнее, код должен возвращать List[List[Int]]
, чтобы разрешить получать количество решений (длина списка списков), «лучшее» решение (самый короткий список) или все возможные решения.
Вот пример. Это очень неэффективно, но это легко понять.
object Sum extends App {
def sumCombinations(total: Int, numbers: List[Int]): List[List[Int]] = {
def add(x: (Int, List[List[Int]]), y: (Int, List[List[Int]])): (Int, List[List[Int]]) = {
(x._1 + y._1, x._2 ::: y._2)
}
def sumCombinations(resultAcc: List[List[Int]], sumAcc: List[Int], total: Int, numbers: List[Int]): (Int, List[List[Int]]) = {
if (numbers.isEmpty || total < 0) {
(0, resultAcc)
} else if (total == 0) {
(1, sumAcc :: resultAcc)
} else {
add(sumCombinations(resultAcc, sumAcc, total, numbers.tail), sumCombinations(resultAcc, numbers.head :: sumAcc, total - numbers.head, numbers))
}
}
sumCombinations(Nil, Nil, total, numbers.sortWith(_ > _))._2
}
println(sumCombinations(15, List(1, 2, 5, 10)) mkString "\n")
}
При запуске он отображает:
List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2)
List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2)
List(1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2)
List(1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2)
List(1, 1, 1, 1, 1, 2, 2, 2, 2, 2)
List(1, 1, 1, 2, 2, 2, 2, 2, 2)
List(1, 2, 2, 2, 2, 2, 2, 2)
List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 5)
List(1, 1, 1, 1, 1, 1, 1, 1, 2, 5)
List(1, 1, 1, 1, 1, 1, 2, 2, 5)
List(1, 1, 1, 1, 2, 2, 2, 5)
List(1, 1, 2, 2, 2, 2, 5)
List(2, 2, 2, 2, 2, 5)
List(1, 1, 1, 1, 1, 5, 5)
List(1, 1, 1, 2, 5, 5)
List(1, 2, 2, 5, 5)
List(5, 5, 5)
List(1, 1, 1, 1, 1, 10)
List(1, 1, 1, 2, 10)
List(1, 2, 2, 10)
List(5, 10)
Функция sumCombinations()
может использоваться сама по себе и результат может быть дополнительно проанализирован, чтобы отобразить «лучшее» решение (самый короткий список) или количество решений (количество списков).
Обратите внимание, что даже так, требования могут быть не полностью доволен. Может случиться так, что порядок каждого списка в решении будет значительным. В таком случае каждый список должен дублироваться столько раз, сколько есть комбинация его элементов. Или нас могут интересовать только разные комбинации.
Например, мы могли бы подумать, что List(5, 10)
должен дать две комбинации: List(5, 10)
и List(10, 5)
. Для List(5, 5, 5)
он может давать только три комбинации или один, в зависимости от требований. Для целых чисел три перестановки эквивалентны, но если мы имеем дело с монетами, например, в «проблеме смены монет», это не так.
Также не указано в требованиях, является ли вопрос о том, (или монету) можно использовать только один или несколько раз. Мы могли бы (и должны!) Обобщить проблему на список списков вхождений каждого числа. Это переводит в реальной жизни в «каковы возможные способы сделать определенную сумму денег с помощью набора монет (а не набора значений монет)». Первоначальная проблема - только частный случай этого, где у нас столько же вхождений каждой монеты, сколько необходимо, чтобы сделать общую сумму с каждым значением одной монеты.
Вы можете использовать пакет signal , если вы работаете в UNIX:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print "Forever is over!"
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print "sec"
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print exc
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
Через 10 секунд после вызова alarm.alarm(10)
вызывается обработчик. Это вызывает исключение, которое вы можете перехватить из обычного кода Python.
Этот модуль плохо воспроизводится с потоками (но тогда кто это делает?)
Обратите внимание, что поскольку мы поднимаем исключение, когда происходит тайм-аут, он может оказаться пойманным и проигнорированным внутри функции, например, одной из таких функций:
def loop_forever():
while 1:
print 'sec'
try:
time.sleep(10)
except:
continue
Мы можем использовать сигналы для одного и того же. Я думаю, что приведенный ниже пример будет полезен для вас. Это очень просто по сравнению с потоками.
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
try: ... except: ...
всегда плохая идея.
– hivert
23 July 2013 в 12:28
У меня есть другое предложение, которое является чистой функцией (с тем же API, что и предложение потоковой передачи) и, кажется, работает нормально (на основе предложений по этой теме)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
timeout
. Гораздо лучше установить значение по умолчанию None
, а в первой строке функции добавить kwargs = kwargs or {}
. Args в порядке, потому что кортежи не изменяемы.
– scottmrogowski
12 August 2016 в 17:13
Есть много предложений, но никто из них не использует concurrent.futures, который, по моему мнению, является наиболее понятным для этого способом.
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
Супер просто читать и поддерживать.
Мы создаем пул, отправляем один процесс, а затем дождаемся до 5 секунд, прежде чем поднимать TimeoutError, который вы могли бы поймать и обработать, но вам нужно.
Родной для python 3.2+ и backported до 2.7 (
Переключение между потоками и процессами так же просто, как замена ProcessPoolExecutor
на ThreadPoolExecutor
.
Если вы хотите завершить процесс по таймауту, я бы предложил посмотреть в Pebble .
self
является параметром? Не могли бы вы дать образец вызова?
– Elliott Beach
22 October 2017 в 03:20
Как мне вызвать функцию или как ее обернуть, чтобы, если она занимает больше 5 секунд, скрипт отменяет ее?
Я отправил gist , который решает этот вопрос / проблему с декоратором и
threading.Timer
. Здесь он с разбивкой.Импорт и настройки для совместимости
Он был протестирован с Python 2 и 3. Он также должен работать под Unix / Linux и Windows.
Сначала импорт. Эти попытки сохранить код совместимым независимо от версии Python:
from __future__ import print_function import sys import threading from time import sleep try: import thread except ImportError: import _thread as thread
Использовать независимый от версии код:
try: range, _print = xrange, print def print(*args, **kwargs): flush = kwargs.pop('flush', False) _print(*args, **kwargs) if flush: kwargs.get('file', sys.stdout).flush() except NameError: pass
Теперь мы импортировали наши функции из стандартной библиотеки.
exit_after
decoratorДалее нам нужна функция для завершения
main()
из дочернего потока:def quit_function(fn_name): # print to stderr, unbuffered in Python 2. print('{0} took too long'.format(fn_name), file=sys.stderr) sys.stderr.flush() # Python 3 stderr is likely buffered. thread.interrupt_main() # raises KeyboardInterrupt
И вот сам декоратор :
def exit_after(s): ''' use as decorator to exit process if function takes longer than s seconds ''' def outer(fn): def inner(*args, **kwargs): timer = threading.Timer(s, quit_function, args=[fn.__name__]) timer.start() try: result = fn(*args, **kwargs) finally: timer.cancel() return result return inner return outer
Использование
И вот использование, которое напрямую отвечает на ваш вопрос об окончании через 5 секунд!:
@exit_after(5) def countdown(n): print('countdown started', flush=True) for i in range(n, -1, -1): print(i, end=', ', flush=True) sleep(1) print('countdown finished')
Демонстрация:
>>> countdown(3) countdown started 3, 2, 1, 0, countdown finished >>> countdown(10) countdown started 10, 9, 8, 7, 6, countdown took too long Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 11, in inner File "<stdin>", line 6, in countdown KeyboardInterrupt
Второй вызов функции не будет завершен, вместо этого процесс должен выйти с трассировкой!
KeyboardInterrupt
не всегда останавливает спальный потокОбратите внимание, что спящий режим не всегда прерывается прерыванием клавиатуры на Python 2 в Windows, например:
@exit_after(1) def sleep10(): sleep(10) print('slept 10 seconds') >>> sleep10() sleep10 took too long # Note that it hangs here about 9 more seconds Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 11, in inner File "<stdin>", line 3, in sleep10 KeyboardInterrupt
, и он не может прервать работу кода в расширениях, если он явно не проверяет наличие
PyErr_CheckSignals()
, см. Cython, Python и KeyboardInterrupt игнорируютсяЯ бы избежал спящего потока более секунды, в любом случае - это эон времени процессора.
] Как мне вызвать функцию или как ее обернуть, чтобы, если она занимает больше 5 секунд, скрипт отменяет ее и делает Что-то еще?
Чтобы поймать его и сделать что-то еще, вы можете поймать KeyboardInterrupt.
>>> try: ... countdown(10) ... except KeyboardInterrupt: ... print('do something else') ... countdown started 10, 9, 8, 7, 6, countdown took too long do something else
Ниже приведено небольшое улучшение для данного решения на основе потоков.
Код ниже поддерживает исключения:
def runFunctionCatchExceptions(func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception, message:
return ["exception", message]
return ["RESULT", result]
def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
import threading
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = runFunctionCatchExceptions(func, *args, **kwargs)
it = InterruptableThread()
it.start()
it.join(timeout_duration)
if it.isAlive():
return default
if it.result[0] == "exception":
raise it.result[1]
return it.result[1]
Вызывает его с 5-секундным таймаутом:
result = timeout(remote_calculate, (myarg,), timeout_duration=5)
runFunctionCatchExceptions()
вызываются определенные функции Python, получающие GIL. Например. следующее никогда бы или очень долгое время не возвращалось, если вызвано внутри функции: eval(2**9999999999**9999999999)
. См. stackoverflow.com/questions/22138190/…
– Mikko Ohtamaa
27 October 2014 в 13:53
Пакет stopit
, найденный на pypi, кажется хорошо обрабатывает тайм-ауты.
Мне нравится декоратор @stopit.threading_timeoutable
, который добавляет параметр timeout
к украшенной функции, что делает то, что вы ожидать, он останавливает функцию.
Проверьте это на pypi: https://pypi.python.org/pypi/stopit
Отличный, простой в использовании и надежный PyPi тайм-аут проекта (g0] https://pypi.org/project/timeout-decorator/ )
:
pip install timeout-decorator
Использование:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
У меня возникла необходимость в вложенных прерываниях времени (которые SIGALARM не может сделать), которые не будут блокироваться по времени. sleep (который ниточный подход не может сделать). Я закончил копирование и слегка изменяющий код отсюда: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
Сам код:
#!/usr/bin/python
# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
"""alarm.py: Permits multiple SIGALRM events to be queued.
Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""
import heapq
import signal
from time import time
__version__ = '$Revision: 2539 $'.split()[1]
alarmlist = []
__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
class TimeoutError(Exception):
def __init__(self, message, id_=None):
self.message = message
self.id_ = id_
class Timeout:
''' id_ allows for nested timeouts. '''
def __init__(self, id_=None, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
self.id_ = id_
def handle_timeout(self):
raise TimeoutError(self.error_message, self.id_)
def __enter__(self):
self.this_alarm = alarm(self.seconds, self.handle_timeout)
def __exit__(self, type, value, traceback):
try:
cancel(self.this_alarm)
except ValueError:
pass
def __clear_alarm():
"""Clear an existing alarm.
If the alarm signal was set to a callable other than our own, queue the
previous alarm settings.
"""
oldsec = signal.alarm(0)
oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
if oldsec > 0 and oldfunc != __alarm_handler:
heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
def __alarm_handler(*zargs):
"""Handle an alarm by calling any due heap entries and resetting the alarm.
Note that multiple heap entries might get called, especially if calling an
entry takes a lot of time.
"""
try:
nextt = __next_alarm()
while nextt is not None and nextt <= 0:
(tm, func, args, keys) = heapq.heappop(alarmlist)
func(*args, **keys)
nextt = __next_alarm()
finally:
if alarmlist: __set_alarm()
def alarm(sec, func, *args, **keys):
"""Set an alarm.
When the alarm is raised in `sec` seconds, the handler will call `func`,
passing `args` and `keys`. Return the heap entry (which is just a big
tuple), so that it can be cancelled by calling `cancel()`.
"""
__clear_alarm()
try:
newalarm = __new_alarm(sec, func, args, keys)
heapq.heappush(alarmlist, newalarm)
return newalarm
finally:
__set_alarm()
def cancel(alarm):
"""Cancel an alarm by passing the heap entry returned by `alarm()`.
It is an error to try to cancel an alarm which has already occurred.
"""
__clear_alarm()
try:
alarmlist.remove(alarm)
heapq.heapify(alarmlist)
finally:
if alarmlist: __set_alarm()
и пример использования:
import alarm
from time import sleep
try:
with alarm.Timeout(id_='a', seconds=5):
try:
with alarm.Timeout(id_='b', seconds=2):
sleep(3)
except alarm.TimeoutError as e:
print 'raised', e.id_
sleep(30)
except alarm.TimeoutError as e:
print 'raised', e.id_
else:
print 'nope.'
Я просмотрел этот поток при поиске тайм-аута на модульных тестах. Я не нашел ничего простого в ответах или сторонних пакетах, поэтому я написал декоратор ниже, вы можете перейти прямо в код:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
Тогда это так просто, как это, чтобы пропустить тест или любую функцию вам нравится:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
Exception
внутри func_wrapper и сделать pool.close()
после улова, чтобы гарантировать, что нить всегда умирает впоследствии независимо от того, что. Затем вы можете бросить TimeoutError
или все, что захотите. Кажется, работает для меня.
– sudo
28 January 2017 в 22:23
RuntimeError: can't start new thread
. Будет ли он работать, если я его игнорирую или есть что-то еще, что я могу сделать, чтобы обойти это? Заранее спасибо!
– Benjie
26 July 2017 в 12:39
Вы можете использовать multiprocessing.Process
для выполнения именно этого.
Код
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate
p.terminate()
p.join()
join()
. что делает ваш x число одновременных подпроцессов запущенным до тех пор, пока они не закончат свою работу, или сумму, определенную в join(10)
. Дело в том, что у вас есть блокирующий ввод-вывод для 10 процессов, используя join (10), вы установили их для ожидания всех из них максимум 10 для начального процесса EACH. Используйте флаг демона, например, этот пример stackoverflow.com/a/27420072/2480481 . Конечно, u может передать флаг daemon=True
непосредственно функции multiprocessing.Process()
.
– erm3nda
2 January 2017 в 12:35
terminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
– abalcerek
10 May 2017 в 14:03
signal.alarm
и связанные с нимSIGALRM
недоступны на платформах Windows. – Double AA 19 August 2011 в 17:20signal.signal
--- все ли они будут работать правильно? Не будет ли каждыйsignal.signal
отменить «одновременный» один? – brownian 10 May 2012 в 09:28