python input () ждет какое-то время, и если никакой ввод не указан, выполните команду default [duplicate]

Для тех, кто не хочет использовать внешние почтовые программы и хочет отправлять почту () на выделенный Linux-сервер.

Способ, как php-сообщения, описаны в php.ini в разделе [mail function] , Параметр sendmail-path описывает, как вызывается sendmail. Значение по умолчанию - sendmail -t -i, поэтому, если вы заработаете sendmail -t -i < message.txt в консоли linux, вы сделаете это. Вы также можете добавить mail.log для отладки и убедиться, что почта действительно вызвана.

Различные MTA могут реализовать sendmail, они просто делают символическую ссылку на свои двоичные файлы на это имя. Например, в debian default используется postfix. Настройте свой MTA для отправки почты и протестируйте ее с консоли с помощью sendmail -v -t -i < message.txt. Файл message.txt должен содержать все заголовки сообщения и тело, адресаты назначения для конверта будут взяты из заголовка To:. Пример:

From: myapp@example.com
To: mymail@example.com
Subject: Test mail via sendmail.

Text body.

Я предпочитаю использовать ssmtp как MTA, потому что он прост и не требует запуска демона с открытыми портами. ssmtp подходит только для отправки почты с локального хоста, он также может отправлять аутентифицированную электронную почту через вашу учетную запись в общедоступной почтовой службе. Установите ssmtp и отредактируйте конфигурацию /etc/ssmtp/ssmtp.conf. Чтобы иметь возможность также получать локальную системную почту для учетных записей unix (например, оповещения для root из заданий cron), настройте файл /etc/ssmtp/revaliases.

Вот моя конфигурация для моей учетной записи в почте Yandex:

root=mymail@example.com
mailhub=smtp.yandex.ru:465
FromLineOverride=YES
UseTLS=YES
AuthUser=abcde@yandex.ru
AuthPass=password
190
задан martineau 23 May 2016 в 20:52
поделиться

13 ответов

Вы можете использовать пакет signal , если вы работаете в UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

Через 10 секунд после вызова alarm.alarm(10) вызывается обработчик. Это вызывает исключение, которое вы можете перехватить из обычного кода Python.

Этот модуль плохо воспроизводится с потоками (но тогда кто это делает?)

Обратите внимание, что поскольку мы поднимаем исключение, когда происходит тайм-аут, он может оказаться пойманным и проигнорированным внутри функции, например, одной из таких функций:

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue
155
ответ дан satoru 23 August 2018 в 00:48
поделиться
  • 1
    Я использую Python 2.5.4. Существует такая ошибка: Traceback (последний последний вызов): File & quot; aa.py & quot ;, строка 85, в func signal.signal (signal.SIGALRM, обработчик) AttributeError: объект 'module' не имеет атрибута 'SIGALRM' – flypen 13 May 2011 в 02:59
  • 2
    @flypen это потому, что signal.alarm и связанные с ним SIGALRM недоступны на платформах Windows. – Double AA 19 August 2011 в 17:20
  • 3
    Если есть много процессов, и каждый вызов signal.signal --- все ли они будут работать правильно? Не будет ли каждый signal.signal отменить «одновременный» один? – brownian 10 May 2012 в 09:28
  • 4
    Предупреждение для тех, кто хочет использовать это с расширением C: обработчик сигнала Python не будет вызываться до тех пор, пока функция C не вернет управление интерпретатору Python. Для этого варианта использования используйте ответ ATOzTOA: stackoverflow.com/a/14924210/1286628 – wkschwartz 20 February 2014 в 22:25
  • 5
    Второе предупреждение о потоках. signal.alarm работает только в основном потоке. Я попытался использовать это в представлениях Django - немедленный сбой с формулировкой только основного потока. – JL Peyret 2 April 2015 в 06:51

Мы можем использовать сигналы для одного и того же. Я думаю, что приведенный ниже пример будет полезен для вас. Это очень просто по сравнению с потоками.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
0
ответ дан A R 23 August 2018 в 00:48
поделиться
  • 1
    Лучше выбрать конкретное исключение и поймать только его. Bare try: ... except: ... всегда плохая идея. – hivert 23 July 2013 в 12:28
  • 2

У меня есть другое предложение, которое является чистой функцией (с тем же API, что и предложение потоковой передачи) и, кажется, работает нормально (на основе предложений по этой теме)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
43
ответ дан Alex 23 August 2018 в 00:48
поделиться
  • 1
    Вы также должны восстановить исходный обработчик сигнала. См. stackoverflow.com/questions/492519/… – Martin Konecny 11 June 2013 в 16:21
  • 2
    Еще одно замечание: метод сигнала Unix работает только в том случае, если вы применяете его в основном потоке. Применение его в подполе вызывает исключение и не будет работать. – Martin Konecny 12 June 2013 в 21:23
  • 3
    Это не лучшее решение, потому что оно работает только с Linux. – max 13 March 2014 в 22:10
  • 4
    Макс, не верно - работает с любым совместимым с POSIX unix. Я думаю, что ваш комментарий должен быть более точным, не работает в Windows. – Chris Johnson 16 November 2015 в 20:41
  • 5
    Вам следует избегать установки kwargs в пустой dict. Общий доступ к Python - это то, что аргументы по умолчанию для функций изменяемы. Таким образом, словарь будет использоваться для всех вызовов timeout. Гораздо лучше установить значение по умолчанию None, а в первой строке функции добавить kwargs = kwargs or {}. Args в порядке, потому что кортежи не изменяемы. – scottmrogowski 12 August 2016 в 17:13

Есть много предложений, но никто из них не использует concurrent.futures, который, по моему мнению, является наиболее понятным для этого способом.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Супер просто читать и поддерживать.

Мы создаем пул, отправляем один процесс, а затем дождаемся до 5 секунд, прежде чем поднимать TimeoutError, который вы могли бы поймать и обработать, но вам нужно.

Родной для python 3.2+ и backported до 2.7 (

Переключение между потоками и процессами так же просто, как замена ProcessPoolExecutor на ThreadPoolExecutor.

Если вы хотите завершить процесс по таймауту, я бы предложил посмотреть в Pebble .

10
ответ дан Brian 23 August 2018 в 00:48
поделиться
  • 1
    Лучше, чем верхний ответ, но почему параметр self является параметром? Не могли бы вы дать образец вызова? – Elliott Beach 22 October 2017 в 03:20
  • 2
    @ElliottBeach Это была опечатка. Исправлена. – Brian 23 October 2017 в 21:14
  • 3
    Что такое & quot; Предупреждение: это не прерывает функцию, если тайм-аут & quot; имею в виду? – Scott Stafford 8 December 2017 в 17:25
  • 4
    @ScottStafford Процессы / потоки не заканчиваются только потому, что TimeoutError был поднят. Таким образом, процесс или поток по-прежнему будут пытаться выполнить до конца и не будут автоматически давать вам контроль за вашим таймаутом. – Brian 11 December 2017 в 08:59
  • 5
    Позволит ли я сохранить результаты, промежуточные в то время? например если у меня есть рекурсивная функция, я устанавливаю тайм-аут на 5, и за это время у меня есть частичные результаты, как мне написать функцию для возврата частичных результатов при тайм-ауте? – SumNeuron 16 March 2018 в 12:49

Как мне вызвать функцию или как ее обернуть, чтобы, если она занимает больше 5 секунд, скрипт отменяет ее?

Я отправил gist , который решает этот вопрос / проблему с декоратором и threading.Timer. Здесь он с разбивкой.

Импорт и настройки для совместимости

Он был протестирован с Python 2 и 3. Он также должен работать под Unix / Linux и Windows.

Сначала импорт. Эти попытки сохранить код совместимым независимо от версии Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Использовать независимый от версии код:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Теперь мы импортировали наши функции из стандартной библиотеки.

exit_after decorator

Далее нам нужна функция для завершения main() из дочернего потока:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

И вот сам декоратор :

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Использование

И вот использование, которое напрямую отвечает на ваш вопрос об окончании через 5 секунд!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Демонстрация:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Второй вызов функции не будет завершен, вместо этого процесс должен выйти с трассировкой!

KeyboardInterrupt не всегда останавливает спальный поток

Обратите внимание, что спящий режим не всегда прерывается прерыванием клавиатуры на Python 2 в Windows, например:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

, и он не может прервать работу кода в расширениях, если он явно не проверяет наличие PyErr_CheckSignals(), см. Cython, Python и KeyboardInterrupt игнорируются

Я бы избежал спящего потока более секунды, в любом случае - это эон времени процессора.

] Как мне вызвать функцию или как ее обернуть, чтобы, если она занимает больше 5 секунд, скрипт отменяет ее и делает Что-то еще?

Чтобы поймать его и сделать что-то еще, вы можете поймать KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
44
ответ дан Community 23 August 2018 в 00:48
поделиться

Ниже приведено небольшое улучшение для данного решения на основе потоков.

Код ниже поддерживает исключения:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Вызывает его с 5-секундным таймаутом:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
0
ответ дан diemacht 23 August 2018 в 00:48
поделиться
  • 1
    Это вызовет новое исключение, скрывающее исходную трассировку. Смотрите мою версию ниже ... – Meitham 14 December 2012 в 13:20
  • 2
    Это также небезопасно, как будто внутри runFunctionCatchExceptions() вызываются определенные функции Python, получающие GIL. Например. следующее никогда бы или очень долгое время не возвращалось, если вызвано внутри функции: eval(2**9999999999**9999999999). См. stackoverflow.com/questions/22138190/… – Mikko Ohtamaa 27 October 2014 в 13:53

Пакет stopit, найденный на pypi, кажется хорошо обрабатывает тайм-ауты.

Мне нравится декоратор @stopit.threading_timeoutable, который добавляет параметр timeout к украшенной функции, что делает то, что вы ожидать, он останавливает функцию.

Проверьте это на pypi: https://pypi.python.org/pypi/stopit

9
ответ дан egeland 23 August 2018 в 00:48
поделиться

Отличный, простой в использовании и надежный PyPi тайм-аут проекта (g0] https://pypi.org/project/timeout-decorator/ )

:

pip install timeout-decorator

Использование:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
0
ответ дан Gil 23 August 2018 в 00:48
поделиться
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
2
ответ дан Hal Canary 23 August 2018 в 00:48
поделиться
  • 1
    Хотя этот код может ответить на вопрос, предоставление дополнительного контекста относительно того, как и / или почему оно решает проблему, улучшит долгосрочную ценность ответа – Dan Cornilescu 27 April 2016 в 13:48

У меня возникла необходимость в вложенных прерываниях времени (которые SIGALARM не может сделать), которые не будут блокироваться по времени. sleep (который ниточный подход не может сделать). Я закончил копирование и слегка изменяющий код отсюда: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Сам код:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

и пример использования:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
1
ответ дан James 23 August 2018 в 00:48
поделиться

Я просмотрел этот поток при поиске тайм-аута на модульных тестах. Я не нашел ничего простого в ответах или сторонних пакетах, поэтому я написал декоратор ниже, вы можете перейти прямо в код:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Тогда это так просто, как это, чтобы пропустить тест или любую функцию вам нравится:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
26
ответ дан Matt Tardiff 23 August 2018 в 00:48
поделиться
  • 1
    Будьте осторожны, так как это не завершает функцию после достижения таймаута! – Sylvain 16 September 2016 в 10:35
  • 2
    Обратите внимание, что в Windows это порождает совершенно новый процесс - который будет потреблять время до таймаута, возможно, много, если зависимости потребуют много времени для настройки. – Aaron Hall♦ 11 January 2017 в 18:59
  • 3
    Да, это требует некоторой настройки. Он оставляет потоки навсегда. – sudo 28 January 2017 в 22:15
  • 4
    IDK, если это лучший способ, но вы можете попробовать / поймать Exception внутри func_wrapper и сделать pool.close() после улова, чтобы гарантировать, что нить всегда умирает впоследствии независимо от того, что. Затем вы можете бросить TimeoutError или все, что захотите. Кажется, работает для меня. – sudo 28 January 2017 в 22:23
  • 5
    Это полезно, но как только я это делал много раз, я получаю RuntimeError: can't start new thread. Будет ли он работать, если я его игнорирую или есть что-то еще, что я могу сделать, чтобы обойти это? Заранее спасибо! – Benjie 26 July 2017 в 12:39

Вы можете использовать multiprocessing.Process для выполнения именно этого.

Код

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
101
ответ дан rtrwalker 23 August 2018 в 00:48
поделиться
  • 1
    Как получить возвращаемое значение целевого метода? – bad_keypoints 11 August 2015 в 07:05
  • 2
    Это не работает, если вызываемая функция застревает в блоке ввода / вывода. – sudo 29 July 2016 в 18:35
  • 3
    @bad_keypoints См. этот ответ: stackoverflow.com/a/10415215/1384471 В принципе, вы передаете список, по которому вы вставляете ответ. – Peter 15 December 2016 в 11:19
  • 4
  • 5
    @ATOzTOA проблема с этим решением, по крайней мере для моих целей, заключается в том, что она потенциально не позволяет детям прокладывать чистку после себя. Из документации функции завершения terminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned. – abalcerek 10 May 2017 в 14:03
0
ответ дан as - if 5 November 2018 в 22:31
поделиться
Другие вопросы по тегам:

Похожие вопросы: