java BlockingQueue не имеет блокирующегося быстрого взгляда?

Различные макеты groupby

df.index.to_series().groupby([df['A'],df['B']]).apply(list)
Out[449]: 
A  B
1  3    [0, 1, 2]
2  4       [3, 4]
   5          [5]
dtype: object
31
задан rouble 24 February 2019 в 06:56
поделиться

7 ответов

Вы можете использовать LinkedBlockingDeque и физически удалить элемент из очереди (используя takeLast () ), но снова заменить его на конец очереди , если обработка не удалась, с использованием putLast (E e) . Тем временем ваши «производители» будут добавлять элементы в перед очереди, используя putFirst (E e) .

Вы всегда можете инкапсулировать это поведение в своей собственной Queue и предоставить метод blockingPeek () , который выполняет takeLast () , за которым следует putLast () за кулисами базового LinkedBlockingDeque ]. Следовательно, с точки зрения вызывающего клиента элемент никогда не удаляется из вашей очереди.

14
ответ дан 27 November 2019 в 22:49
поделиться

'Самое простое' решение

не обрабатывает следующий элемент до предыдущий , элемент обрабатывается успешно.

public void run() {

Object lastSuccessfullyProcessedElement = null;

    while (!exit) {
        Object obj =  lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking

        boolean successful = process(obj);

        if(!successful) {
            lastSuccessfullyProcessedElement = obj;
        } else {
            lastSuccessfullyProcessedElement = null;
        }
    }
}
  1. Вызов peek() и проверка, если значение является нулевым, не является эффективным ЦП.

я видел, что использование ЦП идет в 10% в моей системе, когда очередь пуста для следующей программы.

while (true) {
   Object o = queue.peek();
   if(o == null) continue;
   // omitted for the sake of brevity
}
  1. Добавление sleep() добавляет замедление.

  2. Добавление его назад очереди, использующей putLast, нарушит порядок. Кроме того, это - операция блокирования, которая требует блокировок.

0
ответ дан 27 November 2019 в 22:49
поделиться

Быстрый ответ: на самом деле нет способа получить блокирующий просмотр, панель, самостоятельно реализующая блокирующую очередь с блокирующим peek ().

Я что-то упускаю?

peek () может вызвать проблемы с параллелизмом -

  • Если вы не можете обработать свое сообщение peek () 'd - оно останется в очереди, если только у вас не будет нескольких потребителей.
  • Кто это получит. объект вне очереди, если вы не можете его обработать?
  • Если у вас несколько потребителей, вы получаете состояние гонки между вами peek () 'ing и другим потоком, также обрабатывающим элементы, что приводит к дублированию обработки или хуже.

Похоже, вам лучше удалить элемент и обработать его с помощью Схема цепочки ответственности

Правка: re: ваш последний пример: если у вас только 1 потребитель, вы никогда не избавитесь от объекта в очереди - если он не обновится за это время - в этом случае вам лучше быть очень осторожными с безопасностью потоков и, вероятно, в любом случае не следовало помещать элемент в очередь.

1
ответ дан 27 November 2019 в 22:49
поделиться

Похоже, что сама BlockingQueue не имеет функций, которые вы указываете.

Я мог бы попытаться немного переформулировать проблему: что бы вы сделали с объектами, которые нельзя «правильно обработать»? Если вы просто оставите их в очереди, вы В какой-то момент придется их вытащить и разобраться с ними. Я бы рекомендовал либо выяснить, как их обрабатывать (обычно, если queue.get () дает какое-либо недопустимое или неправильное значение, вы, вероятно, можете просто бросить его на пол), либо выбрать другую структуру данных, чем FIFO.

0
ответ дан 27 November 2019 в 22:49
поделиться

Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто peek (), а не удалять объект. Я хочу удалить объект только в том случае, если смогу успешно его обработать.

В общем, он не является потокобезопасным. Что если после того, как вы peek () определите, что объект может быть успешно обработан, но до того, как вы take () его для удаления и обработки, другой поток возьмет этот объект?

]
6
ответ дан 27 November 2019 в 22:49
поделиться

Может вы также просто добавляете очередь слушателя событий в свою очередь блокировки, а затем, когда что-то добавляется в очередь (блокирующую), отправляете событие своим слушателям? Вы можете заблокировать поток до тех пор, пока не будет вызван его метод actionPerformed.

1
ответ дан 27 November 2019 в 22:49
поделиться

Единственное, что я знаю, это BlockingBuffer в Коллекции Apache Commons :

Если вызывается либо get, либо remove на пустой буфер, вызывающий поток ожидает уведомления о добавлении или Операция addAll завершена.

get () эквивалентно peek () , а буфер может действовать как BlockingQueue , украшение UnboundedFifoBuffer с помощью BlockingBuffer

2
ответ дан 27 November 2019 в 22:49
поделиться
Другие вопросы по тегам:

Похожие вопросы: