Обновления аккумуляторов отправляются обратно в драйвер, когда задача успешно завершена. Таким образом, результаты вашего аккумулятора гарантированы, если вы уверены, что каждая задача будет выполнена ровно один раз, и каждая задача будет выполнена так, как вы ожидали.
Я предпочитаю полагаться на reduce
и aggregate
вместо
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)
Будет ли это гарантировано правильным (не имеют дубликатов)?
blockquote >Да, если спекулятивное выполнение отключено.
map
иcount
будут на одном этапе, так что, как вы говорите, нет возможности успешно выполнить задачу более одного раза.Но аккумулятор обновляется как боковое, эффект. Поэтому вы должны быть очень осторожны, думая о том, как будет выполняться код. Рассмотрим это вместо
accumulating.count
:// Same setup as before. accumulating.mapPartitions(p => Iterator(p.next)).collect assert(acc == 2)
Это также создаст одну задачу для каждого раздела, и каждая задача будет гарантирована выполнить ровно один раз. Но код в
map
не будет выполнен для всех элементов, только первый в каждом разделе.Аккумулятор похож на глобальную переменную. Если вы разделяете ссылку на RDD, которая может наращивать аккумулятор, тогда другой код (другие потоки) может также привести к его увеличению.
// Same setup as before. val x = new X(accumulating) // We don't know what X does. // It may trigger the calculation // any number of times. accumulating.count assert(acc >= 10)
Для этого вам нужно сделать две вещи:
См. Ниже, как это сделать:
Вызов Thread.setDefaultUncaughtExceptionHandler ()
, чтобы перехватить все неперехваченные исключения, и в этом случае будет вызван метод uncaughtException ()
. "Принудительное закрытие" не появится, и приложение не будет отвечать, что не очень хорошо.
Чтобы перезапустить приложение в случае сбоя, вы должны сделать следующее:
В методе onCreate
в вашем основном действии инициализировать член PendingIntent
:
Intent intent = PendingIntent.getActivity (
YourApplication.getInstance (). GetBaseContext (),
0,
новое намерение (getIntent ()),
getIntent (). getFlags ());
Затем поместите в свой метод uncaughtException ()
следующее:
AlarmManager mgr = (AlarmManager) getSystemService(Context.ALARM_SERVICE);
mgr.set(AlarmManager.RTC, System.currentTimeMillis() + 2000, intent);
System.exit(2);
Вы также должны вызвать System.exit ()
, иначе работать не будет.
Таким образом, ваше приложение перезапустится через 2 секунды.
В конце концов вы можете установить какой-нибудь флаг в своем намерении, что приложение разбилось, и в своем методе onCreate ()
вы можете показать диалог «Извините, приложение разбилось, надеюсь, больше никогда :)» .
Хитрость в том, чтобы в первую очередь не было принудительного закрытия.
Если вы используете метод Thread.setDefaultUncaughtExceptionHandler ()
, вы можете перехватить исключения, из-за которых ваше приложение принудительно закрывается.
Посмотрите на этот вопрос , чтобы увидеть пример использования UncaughtExceptionHandler
для регистрации Исключений, вызванных приложением.