параллелизм Java: много устройств записи, одно средство чтения

Я должен собрать некоторую статистику в своем программном обеспечении, и я пытаюсь сделать его быстро и корректный, который не легок (для меня!)

сначала мой код до сих пор с двумя классами, StatsService и StatsHarvester

public class StatsService
{
private Map   stats   = new HashMap(1000);

public void notify ( String key )
{
    Long value = 1l;
    synchronized (stats)
    {
        if (stats.containsKey(key))
        {
            value = stats.get(key) + 1;
        }
        stats.put(key, value);
    }
}

public Map getStats ( )
{
    Map copy;
    synchronized (stats)
    {
        copy = new HashMap(stats);
        stats.clear();
    }
    return copy;
}
}

это - мой второй класс, комбайн, который время от времени собирает статистику и пишет им в базу данных.

public class StatsHarvester implements Runnable
{
private StatsService    statsService;
private Thread          t;

public void init ( )
{
    t = new Thread(this);
    t.start();
}

public synchronized void run ( )
{
    while (true)
    {
        try
        {
            wait(5 * 60 * 1000); // 5 minutes
            collectAndSave();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private void collectAndSave ( )
{
    Map stats = statsService.getStats();
    // do something like:
    // saveRecords(stats);
}
}

Во времени выполнения это будет иметь приблизительно 30 параллельных рабочих потоков каждым вызовом notify(key) приблизительно 100 раз. Только один StatsHarvester звонит statsService.getStats()

Таким образом, у меня есть много устройств записи и только одно средство чтения. было бы хорошо иметь точную статистику, но я не забочусь, потеряны ли некоторые записи на высоком параллелизме.

Читатель должен работать каждые 5 минут или независимо от того, что разумно.

Запись должна быть максимально быстро. Чтение должно быть быстрым, но если оно блокирует приблизительно для 300 мс каждые 5 минут, его штраф.

Я прочитал много документов (параллелизм Java на практике, эффективный Java и так далее), но у меня есть сильное чувство, что мне нужен Ваш совет разобраться в нем.

Я надеюсь, что заявил свою проблему, ясную и достаточно короткую для получения ценной справки.


Править

Благодаря всем для Ваших подробных и полезных ответов. Поскольку я ожидал, что существует больше чем один способ сделать это.

Я протестировал большинство Ваших предложений (те, которых я понял), и загрузил тестовый проект погуглить код для дальнейшей ссылки (проект знатока)

http://code.google.com/p/javastats/

Я протестировал различные реализации своего StatsService

  • HashMapStatsService (HMSS)
  • ConcurrentHashMapStatsService (CHMSS)
  • LinkedQueueStatsService (LQSS)
  • GoogleStatsService (GSS)
  • ExecutorConcurrentHashMapStatsService (ECHMSS)
  • ExecutorHashMapStatsService (EHMSS)

и я протестировал их с x количество Потоков каждый вызов уведомляет y времена, результаты находятся в мс

         10,100   10,1000  10,5000  50,100   50,1000  50,5000  100,100  100,1000 100,5000 
GSS       1        5        17       7        21       117      7        37       254       Summe: 466
ECHMSS    1        6        21       5        32       132      8        54       249       Summe: 508
HMSS      1        8        45       8        52       233      11       103      449       Summe: 910
EHMSS     1        5        24       7        31       113      8        67       235       Summe: 491
CHMSS     1        2        9        3        11       40       7        26       72        Summe: 171
LQSS      0        3        11       3        16       56       6        27       144       Summe: 266

В данный момент я думаю, что буду использовать ConcurrentHashMap, поскольку он предлагает хорошую производительность, в то время как довольно легко понять.

Спасибо за весь Ваш вход! Janning

17
задан Janning 31 March 2010 в 09:49
поделиться

7 ответов

Поскольку Джек ускользнул от вас можно использовать библиотеку java.util.concurrent, которая включает ConcurrentHashMap и AtomicLong. Вы можете вставить AtomicLong, если его нет, вы можете увеличить значение. Поскольку AtomicLong является потокобезопасным, вы сможете увеличивать переменную, не беспокоясь о проблемах параллелизма.

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}

Это должно быть как быстрым, так и потокобезопасным.

Править: Немного отредактирован, так что есть не более двух поисков.

16
ответ дан 30 November 2019 в 11:43
поделиться

Ответ Криса Дейла выглядит хорошим подходом.

Другой альтернативой может быть использование параллельного Multiset . Один из них находится в библиотеке Google Collections . Вы можете использовать это следующим образом:

private Multiset<String> stats = ConcurrentHashMultiset.create();

public void notify ( String key )
{
    stats.add(key, 1);
}

Если посмотреть на источник , это реализовано с помощью ConcurrentHashMap и с использованием putIfAbsent и версии с тремя аргументами замените , чтобы обнаружить одновременные изменения и повторить попытку.

4
ответ дан 30 November 2019 в 11:43
поделиться

Почему бы вам не использовать java.util.concurrent.ConcurrentHashMap ? Он обрабатывает все внутренне, избегая бесполезных блокировок на карте и экономя вам много работы: вам не придется заботиться о синхронизации при получении и размещении ..

Из документации:

Хеш-таблица, поддерживающая полный параллелизм получение и регулируемый ожидаемый параллелизм для обновлений. Этот класс подчиняется той же функциональной спецификации, что и Hashtable, и включает версии методов, соответствующие каждому методу Hashtable. Однако, несмотря на то, что все операции являются потокобезопасными, операции извлечения не влекут за собой блокировку , и нет никакой поддержки для блокировки всей таблицы таким образом, чтобы предотвратить любой доступ.

Вы можете указать его уровень параллелизма :

Разрешенный параллелизм между операциями обновления определяется необязательным аргументом конструктора concurrencyLevel (по умолчанию 16), который используется в качестве подсказки для внутреннего изменения размера . Таблица внутренне разделена на разделы, чтобы попытаться разрешить указанное количество одновременных обновлений без конкуренции. Поскольку размещение в хеш-таблицах по существу случайное, фактический параллелизм будет отличаться. В идеале, вы должны выбрать значение, позволяющее разместить столько потоков, сколько когда-либо будет одновременно изменять таблицу . Использование значительно большего значения, чем нужно, может привести к потере места и времени, а значительно меньшее значение может привести к конфликту потоков. Но завышенные и недооцененные в пределах порядка величины обычно не оказывают заметного влияния.Значение один подходит, когда известно, что только один поток будет изменять, а все остальные будут только читать. Кроме того, изменение размера этой или любой другой хэш-таблицы является относительно медленной операцией, поэтому, когда это возможно, рекомендуется предоставлять оценки ожидаемых размеров таблиц в конструкторах.

Как указано в комментариях, внимательно прочтите документацию ConcurrentHashMap , особенно когда в ней говорится об атомарных или неатомарных операциях.

Чтобы получить гарантию атомарности, вы должны учитывать, какие операции являются атомарными. Из интерфейса ConcurrentMap вы узнаете, что:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)

можно использовать безопасно.

8
ответ дан 30 November 2019 в 11:43
поделиться

Я бы предложил взглянуть на библиотеку util.concurrent в Java. Я думаю, вы сможете реализовать это решение намного чище. Я не думаю, что здесь вообще нужна карта. Я бы рекомендовал реализовать это с помощью ConcurrentLinkedQueue. Каждый "производитель" может свободно писать в эту очередь, не беспокоясь о других. Он может поместить в очередь объект с данными для своей статистики.

Сборщик может потреблять очередь, постоянно извлекая данные и обрабатывая их. Затем он может хранить их так, как ему нужно.

5
ответ дан 30 November 2019 в 11:43
поделиться

Другой подход к проблеме заключается в использовании (тривиальной) безопасности потоков с помощью ограничения потоков. По сути, создайте один фоновый поток, который заботится как о чтении, так и о записи. Имеет довольно неплохие характеристики с точки зрения масштабируемости и простоты.

Идея состоит в том, что вместо того, чтобы все потоки пытались обновить данные напрямую, они создают задачу «обновления» для обработки фоновым потоком. Этот же поток также может выполнять задачу чтения, если допустимо некоторое отставание в обработке обновлений.

Этот дизайн довольно хорош, потому что потокам больше не придется конкурировать за блокировку для обновления данных, а поскольку карта ограничена одним потоком, вы можете просто использовать простой HashMap для получения / размещения и т. Д. С точки зрения реализации, это будет означать создание однопоточного исполнителя и отправку задач записи, которые также могут выполнять необязательную операцию «collectAndSave».

Набросок кода может выглядеть следующим образом:

public class StatsService {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String,Long> stats = new HashMap<String,Long>();

    public void notify(final String key) {
        Runnable r = new Runnable() {
            public void run() {
                Long value = stats.get(key);
                if (value == null) {
                    value = 1L;
                } else {
                    value++;
                }
                stats.put(key, value);
                // do the optional collectAndSave periodically
                if (timeToDoCollectAndSave()) {
                    collectAndSave();
                }
            }
        };
        executor.execute(r);
    }
}

Существует BlockingQueue, связанный с исполнителем, и каждый поток, который создает задачу для StatsService, использует BlockingQueue. Ключевой момент заключается в следующем: продолжительность блокировки для этой операции должна быть намного короче , чем продолжительность блокировки в исходном коде, поэтому конкуренция должна быть намного меньше. В целом это должно привести к гораздо большей пропускной способности и задержке.

Еще одно преимущество состоит в том, что, поскольку только один поток читает и записывает в карту, можно использовать простой HashMap и примитивный длинный тип (без использования ConcurrentHashMap или атомарных типов). Это также значительно упрощает код, который на самом деле его обрабатывает.

Надеюсь, это поможет.

3
ответ дан 30 November 2019 в 11:43
поделиться

Если мы проигнорируем часть сбора урожая и сосредоточимся на записи, основным узким местом программы является то, что статистика фиксируется на очень грубом уровне детализации. Если два потока хотят обновить разные ключи, они должны ждать.

Если вы заранее знаете набор ключей и можете предварительно инициализировать карту так, чтобы к моменту прихода потока обновления ключ гарантированно существовал, вы могли бы сделать блокировку переменной accumulator вместо всей карты, или использовать безопасный для потока объект accumulator.

Вместо того, чтобы реализовывать это самостоятельно, существуют реализации map, разработанные специально для параллелизма и выполняющие эту более тонкую блокировку для вас.

Однако есть одно предостережение - это статистика, поскольку вам придется получать блокировки на все аккумуляторы примерно в одно и то же время. Если вы используете существующую карту с поддержкой параллелизма, возможно, существует конструкция для получения моментального снимка.

0
ответ дан 30 November 2019 в 11:43
поделиться

Вы изучали ScheduledThreadPoolExecutor ? Вы можете использовать это для планирования ваших писателей, которые все могут писать в параллельную коллекцию, такую ​​как ConcurrentLinkedQueue , упомянутую @Chris Dail. У вас может быть отдельно запланированное задание для чтения из очереди по мере необходимости, и Java SDK должен обрабатывать практически все ваши проблемы параллелизма, не требуя ручной блокировки.

1
ответ дан 30 November 2019 в 11:43
поделиться
Другие вопросы по тегам:

Похожие вопросы: