Исполнители Java: как получить уведомление, не блокируя, когда задача завершена?

Ниже перечислены любые глобальные данные Python, которые вы не хотите воссоздавать для каждого запроса, а не только rserve, а не только данные, которые уникальны для каждого пользователя.

Нам нужно некоторое общее местоположение создать соединение rserve для каждого пользователя. Самый простой способ сделать это - запустить multiprocessing.Manager в качестве отдельного процесса.

import atexit
from multiprocessing import Lock
from multiprocessing.managers import BaseManager
import pyRserve

connections = {}
lock = Lock()


def get_connection(user_id):
    with lock:
        if user_id not in connections:
            connections[user_id] = pyRserve.connect()

        return connections[user_id]


@atexit.register
def close_connections():
    for connection in connections.values():
        connection.close()


manager = BaseManager(('', 37844), b'password')
manager.register('get_connection', get_connection)
server = manager.get_server()
server.serve_forever()

Запустить его перед запуском приложения, чтобы менеджер был доступен:

python rserve_manager.py

Мы можем получить доступ к этому менеджеру из приложения во время запросов, используя простую функцию. Это предполагает, что у вас есть значение для «user_id» в сеансе (что, например, делает Flask-Login). Это приведет к уникальному уникальному соединению rserve для каждого пользователя, а не к сеансу.

from multiprocessing.managers import BaseManager
from flask import g, session

def get_rserve():
    if not hasattr(g, 'rserve'):
        manager = BaseManager(('', 37844), b'password')
        manager.register('get_connection')
        manager.connect()
        g.rserve = manager.get_connection(session['user_id'])

    return g.rserve

Доступ к нему внутри представления:

result = get_rserve().eval('3 + 5')

Это должно помочь вам начать, хотя есть много возможностей, которые можно улучшить, например, не жестко кодировать адрес и пароль, а не отбрасывать соединения с менеджером. Это было написано с помощью Python 3, но должно работать с Python 2.

138
задан Michael Myers 5 May 2009 в 18:18
поделиться

4 ответа

Определите интерфейс обратного вызова для получения любых параметров, которые вы хотите передать в уведомление о завершении. Затем вызовите его в конце задачи.

Вы могли бы даже написать общую оболочку для Runnable задач и отправить их в ExecutorService . Или см. Ниже механизм, встроенный в Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

С CompletableFuture Java 8 включала более сложные средства для составления конвейеров, где процессы могут выполняться асинхронно и условно. Вот' Придуманный, но полный пример уведомления.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
135
ответ дан 23 November 2019 в 23:20
поделиться

Если вы хотите убедиться, что никакие задачи не будут выполняться одновременно, используйте SingleThreadedExecutor . Задачи будут обрабатываться в порядке их отправки. Вам даже не нужно держать задачи, просто отправьте их исполнительному директору.

5
ответ дан 23 November 2019 в 23:20
поделиться

ThreadPoolExecutor также имеет beforeExecute и afterExecute методы перехвата, которые вы можете переопределить и использовать. Вот описание из ThreadPoolExecutor Javadocs .

Методы подключения

Этот класс предоставляет защищенный переопределяемый beforeExecute (java.lang.Thread, java.lang). Runnable) и afterExecute (java.lang.Runnable, java.lang.Throwable) методы, которые вызываются до и после выполнения каждой задачи. Их можно использовать для манипулирования средой исполнения; например, повторная инициализация ThreadLocals , сбор статистики или добавление записей журнала. Дополнительно, Метод terminated () может быть переопределен для выполнения любой специальной обработки, которая должна быть выполнена после полного завершения Executor . Если методы перехвата или обратного вызова генерируют исключения, внутренние рабочие потоки могут, в свою очередь, завершиться с ошибкой и внезапно прерваться.

14
ответ дан 23 November 2019 в 23:20
поделиться

Использовать CountDownLatch .

Он из java.util.concurrent , и это именно тот способ, которым несколько потоков ожидают завершения выполнения, прежде чем продолжение.

Чтобы добиться эффекта обратного вызова, за которым вы ухаживаете, это требует немного дополнительной дополнительной работы. А именно, обрабатывая это самостоятельно в отдельном потоке, который использует CountDownLatch и действительно ожидает его, а затем продолжает уведомлять обо всем, что вам нужно уведомить. Отсутствует встроенная поддержка обратного вызова или чего-либо подобного этому эффекту.


РЕДАКТИРОВАТЬ: Теперь, когда я еще больше понимаю ваш вопрос, я думаю, что вы слишком далеко зашли, излишне. Если вы возьмете обычный SingleThreadExecutor , задайте ему все задачи, и он будет выполнять очередность изначально.

6
ответ дан 23 November 2019 в 23:20
поделиться
Другие вопросы по тегам:

Похожие вопросы: