Возможно, это - глупый вопрос, но я, может казаться, не нахожу очевидный ответ.
Мне нужна параллельная очередь FIFO, которая содержит только уникальные значения. Попытка добавить значение, которое уже существует в очереди просто, игнорирует то значение. Который, если бы не потокобезопасность было бы тривиально. Существует ли структура данных в Java или возможно коде snipit в межсетях, который показывает это поведение?
Если вам нужен более высокий параллелизм, чем полная синхронизация, есть один известный мне способ сделать это, используя ConcurrentHashMap в качестве опорной карты. Ниже приведен только набросок.
public final class ConcurrentHashSet<E> extends ForwardingSet<E>
implements Set<E>, Queue<E> {
private enum Dummy { VALUE }
private final ConcurrentMap<E, Dummy> map;
ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
super(map.keySet());
this.map = Preconditions.checkNotNull(map);
}
@Override public boolean add(E element) {
return map.put(element, Dummy.VALUE) == null;
}
@Override public boolean addAll(Collection<? extends E> newElements) {
// just the standard implementation
boolean modified = false;
for (E element : newElements) {
modified |= add(element);
}
return modified;
}
@Override public boolean offer(E element) {
return add(element);
}
@Override public E remove() {
E polled = poll();
if (polled == null) {
throw new NoSuchElementException();
}
return polled;
}
@Override public E poll() {
for (E element : this) {
// Not convinced that removing via iterator is viable (check this?)
if (map.remove(element) != null) {
return element;
}
}
return null;
}
@Override public E element() {
return iterator().next();
}
@Override public E peek() {
Iterator<E> iterator = iterator();
return iterator.hasNext() ? iterator.next() : null;
}
}
При таком подходе не все так радужно. У нас нет достойного способа выбрать головной элемент, кроме использования entrySet().iterator().next()
опорной карты, в результате чего карта со временем становится все более несбалансированной. Эта несбалансированность является проблемой как из-за большего количества столкновений ведер, так и из-за большего количества споров между сегментами.
Примечание: этот код использует Guava в нескольких местах.
Нет встроенного в коллекции, которая делает это. Есть несколько параллельных реализаций Set
, которые могут использоваться вместе с параллельной Queue
.
Например, элемент добавляется в очередь только после того, как он был успешно добавлен в набор, и каждый элемент, удаленный из очереди, удаляется из набора. В этом случае логически содержимое очереди действительно то, что есть в наборе, а очередь используется только для отслеживания порядка и обеспечения эффективных take ()
и poll ()
операции найдены только для BlockingQueue
.
java.util.concurrent.ConcurrentLinkedQueue поможет вам в большинстве случаев.
Оберните ConcurrentLinkedQueue вашим собственным классом, который проверяет уникальность добавления. Ваш код должен быть потокобезопасным.
Я бы использовал синхронизированный LinkedHashSet до тех пор, пока не появилось достаточно оснований для рассмотрения альтернатив. Основное преимущество, которое может предложить более параллельное решение, - это разделение блокировок.
Простейшим параллельным подходом будет ConcurrentHashMap (действующий как набор) и ConcurrentLinkedQueue. Порядок операций обеспечит желаемое ограничение. Offer () сначала выполнит CHM # putIfAbsent () и, в случае успеха, вставит в CLQ. Poll () берет из CLQ, а затем удаляет его из CHM. Это означает, что мы рассматриваем запись в нашей очереди, если она есть на карте, и CLQ обеспечивает порядок. Затем производительность можно было бы отрегулировать, увеличив уровень параллелизма карты. Если вы терпимы к дополнительной колоритности, тогда дешевый CHM # get () может выступить в качестве разумного предварительного условия (но он может пострадать, будучи немного устаревшим представлением).
Что вы подразумеваете под параллельной очередью с семантикой Set? Если вы имеете в виду действительно параллельную структуру (в отличие от потокобезопасной), то я бы сказал, что вы просите пони.
Что произойдет, например, если вы вызовете put (element)
и обнаружите, что что-то уже есть, что немедленно удаляется? Например, что в вашем случае означает, если offer (element) || queue.contains (element)
возвращает false
?
Такие вещи часто нужно рассматривать немного иначе в параллельном мире, поскольку часто все не так, как кажется, если вы не остановите мир (lock это вниз). В противном случае вы обычно смотрите на что-то в прошлом. Итак, что вы на самом деле пытаетесь сделать?