Различные макеты groupby
df.index.to_series().groupby([df['A'],df['B']]).apply(list)
Out[449]:
A B
1 3 [0, 1, 2]
2 4 [3, 4]
5 [5]
dtype: object
Вы можете использовать LinkedBlockingDeque и физически удалить элемент из очереди (используя takeLast ()
), но снова заменить его на конец очереди , если обработка не удалась, с использованием putLast (E e)
. Тем временем ваши «производители» будут добавлять элементы в перед очереди, используя putFirst (E e)
.
Вы всегда можете инкапсулировать это поведение в своей собственной Queue
и предоставить метод blockingPeek ()
, который выполняет takeLast ()
, за которым следует putLast ()
за кулисами базового LinkedBlockingDeque
]. Следовательно, с точки зрения вызывающего клиента элемент никогда не удаляется из вашей очереди.
не обрабатывает следующий элемент до предыдущий , элемент обрабатывается успешно.
public void run() {
Object lastSuccessfullyProcessedElement = null;
while (!exit) {
Object obj = lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
boolean successful = process(obj);
if(!successful) {
lastSuccessfullyProcessedElement = obj;
} else {
lastSuccessfullyProcessedElement = null;
}
}
}
peek()
и проверка, если значение является нулевым, не является эффективным ЦП. я видел, что использование ЦП идет в 10% в моей системе, когда очередь пуста для следующей программы.
while (true) {
Object o = queue.peek();
if(o == null) continue;
// omitted for the sake of brevity
}
Добавление sleep()
добавляет замедление.
Добавление его назад очереди, использующей putLast
, нарушит порядок. Кроме того, это - операция блокирования, которая требует блокировок.
Быстрый ответ: на самом деле нет способа получить блокирующий просмотр, панель, самостоятельно реализующая блокирующую очередь с блокирующим peek ().
Я что-то упускаю?
peek () может вызвать проблемы с параллелизмом -
Похоже, вам лучше удалить элемент и обработать его с помощью Схема цепочки ответственности
Правка: re: ваш последний пример: если у вас только 1 потребитель, вы никогда не избавитесь от объекта в очереди - если он не обновится за это время - в этом случае вам лучше быть очень осторожными с безопасностью потоков и, вероятно, в любом случае не следовало помещать элемент в очередь.
Похоже, что сама BlockingQueue не имеет функций, которые вы указываете.
Я мог бы попытаться немного переформулировать проблему: что бы вы сделали с объектами, которые нельзя «правильно обработать»? Если вы просто оставите их в очереди, вы В какой-то момент придется их вытащить и разобраться с ними. Я бы рекомендовал либо выяснить, как их обрабатывать (обычно, если queue.get () дает какое-либо недопустимое или неправильное значение, вы, вероятно, можете просто бросить его на пол), либо выбрать другую структуру данных, чем FIFO.
Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто peek (), а не удалять объект. Я хочу удалить объект только в том случае, если смогу успешно его обработать.
В общем, он не является потокобезопасным. Что если после того, как вы peek ()
определите, что объект может быть успешно обработан, но до того, как вы take ()
его для удаления и обработки, другой поток возьмет этот объект?
Может вы также просто добавляете очередь слушателя событий в свою очередь блокировки, а затем, когда что-то добавляется в очередь (блокирующую), отправляете событие своим слушателям? Вы можете заблокировать поток до тех пор, пока не будет вызван его метод actionPerformed.
Единственное, что я знаю, это BlockingBuffer в Коллекции Apache Commons :
Если вызывается либо get, либо remove на пустой буфер, вызывающий поток ожидает уведомления о добавлении или Операция addAll завершена.
get ()
эквивалентно peek ()
, а буфер
может действовать как BlockingQueue
, украшение UnboundedFifoBuffer с помощью BlockingBuffer