Параллельная очередь набора

Возможно, это - глупый вопрос, но я, может казаться, не нахожу очевидный ответ.

Мне нужна параллельная очередь FIFO, которая содержит только уникальные значения. Попытка добавить значение, которое уже существует в очереди просто, игнорирует то значение. Который, если бы не потокобезопасность было бы тривиально. Существует ли структура данных в Java или возможно коде snipit в межсетях, который показывает это поведение?

33
задан Steve Skrla 30 June 2010 в 21:45
поделиться

5 ответов

Если вам нужен более высокий параллелизм, чем полная синхронизация, есть один известный мне способ сделать это, используя ConcurrentHashMap в качестве опорной карты. Ниже приведен только набросок.

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

При таком подходе не все так радужно. У нас нет достойного способа выбрать головной элемент, кроме использования entrySet().iterator().next() опорной карты, в результате чего карта со временем становится все более несбалансированной. Эта несбалансированность является проблемой как из-за большего количества столкновений ведер, так и из-за большего количества споров между сегментами.

Примечание: этот код использует Guava в нескольких местах.

6
ответ дан 27 November 2019 в 19:34
поделиться

Нет встроенного в коллекции, которая делает это. Есть несколько параллельных реализаций Set , которые могут использоваться вместе с параллельной Queue .

Например, элемент добавляется в очередь только после того, как он был успешно добавлен в набор, и каждый элемент, удаленный из очереди, удаляется из набора. В этом случае логически содержимое очереди действительно то, что есть в наборе, а очередь используется только для отслеживания порядка и обеспечения эффективных take () и poll () операции найдены только для BlockingQueue .

5
ответ дан 27 November 2019 в 19:34
поделиться

java.util.concurrent.ConcurrentLinkedQueue поможет вам в большинстве случаев.

Оберните ConcurrentLinkedQueue вашим собственным классом, который проверяет уникальность добавления. Ваш код должен быть потокобезопасным.

4
ответ дан 27 November 2019 в 19:34
поделиться

Я бы использовал синхронизированный LinkedHashSet до тех пор, пока не появилось достаточно оснований для рассмотрения альтернатив. Основное преимущество, которое может предложить более параллельное решение, - это разделение блокировок.

Простейшим параллельным подходом будет ConcurrentHashMap (действующий как набор) и ConcurrentLinkedQueue. Порядок операций обеспечит желаемое ограничение. Offer () сначала выполнит CHM # putIfAbsent () и, в случае успеха, вставит в CLQ. Poll () берет из CLQ, а затем удаляет его из CHM. Это означает, что мы рассматриваем запись в нашей очереди, если она есть на карте, и CLQ обеспечивает порядок. Затем производительность можно было бы отрегулировать, увеличив уровень параллелизма карты. Если вы терпимы к дополнительной колоритности, тогда дешевый CHM # get () может выступить в качестве разумного предварительного условия (но он может пострадать, будучи немного устаревшим представлением).

4
ответ дан 27 November 2019 в 19:34
поделиться

Что вы подразумеваете под параллельной очередью с семантикой Set? Если вы имеете в виду действительно параллельную структуру (в отличие от потокобезопасной), то я бы сказал, что вы просите пони.

Что произойдет, например, если вы вызовете put (element) и обнаружите, что что-то уже есть, что немедленно удаляется? Например, что в вашем случае означает, если offer (element) || queue.contains (element) возвращает false ?

Такие вещи часто нужно рассматривать немного иначе в параллельном мире, поскольку часто все не так, как кажется, если вы не остановите мир (lock это вниз). В противном случае вы обычно смотрите на что-то в прошлом. Итак, что вы на самом деле пытаетесь сделать?

2
ответ дан 27 November 2019 в 19:34
поделиться
Другие вопросы по тегам:

Похожие вопросы: