Вот мой сценарий:
У меня есть главный актер, который получает сообщения от нескольких дочерних актеров. Эти сообщения содержат данные для агрегирования. В этой логике агрегации нужно ли мне заботиться о проблемах синхронизации, если я использую общую структуру данных для сбора агрегации?
else if(arg0 instanceof ReducedMsg){
ReducedMsg reduced = (ReducedMsg)arg0;
counter.decrementAndGet();
synchronized(finalResult){
finalResult.add((KeyValue<K, V>) reduced.getReduced());
if(counter.get() == 0){
if(checkAndReduce(finalResult)){
finalResult.clear();
}
else{
stop();
latch.countDown();
}
}
}
}
Итак, как вы можете видеть, у меня есть finalResult, к которому будет агрегировано каждое сообщение, и после логики обработки коллекция также должна быть очищена.
На самом деле то, что я пытаюсь реализовать, — это рекурсивная (ассоциативная )редукция mapreduce. Итак, мне нужно сохранить синхронизированный блок, как я полагаю? Или случайно Akka выполняет onReceive по одному потоку за раз?
Эта логика дает точный и предсказуемый результат на небольшом наборе данных. Моя проблема в том, что когда мой набор входных данных немного велик, код зависает. Я хочу быть уверен, что это из-за переключения контекста для моего блока синхронизации, чтобы я мог углубиться в другой дизайн.