Хм ... Я удивлен, что компилирует (это, я проверил). Я не знаю никаких гарантий, которые сделают это безопасным. Использовать статический конструктор ...
Изменить: я принимаю (см. лучший ответ выше ), что он будет работать; но моя идея с кодом заключается в том, чтобы сделать его максимально простым и очевидным. Если не очевидно, что он будет работать (и не может быть, если вам нужно спросить), тогда не пишите его таким образом ...
В частности, проблемы с использованием поля order:
partial
Мой совет остается: использовать статический конструктор для этого сценария.
Это невозможно. На данный момент (Spark 1.6.0 / 2.2.0 SNAPSHOT) Spark DataFrameWriter
поддерживает только четыре режима записи:
blockquote>
SaveMode.Overwrite
: перезаписать существующие данные.SaveMode.Append
: добавьте данные.SaveMode.Ignore
: проигнорируйте операцию (т. е. нет-op).SaveMode.ErrorIfExists
: параметр по умолчанию, выдайте исключение в runtime.Вы можете вставить вручную, например, с помощью
mapPartitions
(так как вы хотите, чтобы операция UPSERT была идемпотентной и как такая простая в использовании), напишите во временную таблицу и выполнить перезагрузку вручную или использовать триггеры.В целом достижение поведения при перезагрузке для пакетных операций и поддержание достойной производительности далеко не тривиально. Вы должны помнить, что в общем случае будет выполняться несколько параллельных транзакций (по одному на каждый раздел), поэтому вы должны убедиться, что конфликты записи не будут (как правило, с использованием конкретного раздела) или предоставить соответствующие процедуры восстановления. На практике может быть лучше выполнить и пакетную запись во временную таблицу и разрешить часть upsert непосредственно в базе данных.
Жаль, что в Spark нет режима SaveMode.Upsert
для таких довольно распространенных случаев, как upserting.
zero322 в целом прав, но я думаю, что это должно быть возможным (с компромиссом в производительности) предложите такую функцию замены.
Я также хотел предоставить некоторый Java-код для этого случая. Конечно, это не то, что исполнитель, как встроенный, из искры - но это должно быть хорошей основой для ваших требований. Просто измените его по своему усмотрению:
myDF.repartition(20); //one connection per partition, see below
myDF.foreachPartition((Iterator<Row> t) -> {
Connection conn = DriverManager.getConnection(
Constants.DB_JDBC_CONN,
Constants.DB_JDBC_USER,
Constants.DB_JDBC_PASS);
conn.setAutoCommit(true);
Statement statement = conn.createStatement();
final int batchSize = 100000;
int i = 0;
while (t.hasNext()) {
Row row = t.next();
try {
// better than REPLACE INTO, less cycles
statement.addBatch(("INSERT INTO mytable " + "VALUES ("
+ "'" + row.getAs("_id") + "',
+ "'" + row.getStruct(1).get(0) + "'
+ "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';"));
//conn.commit();
if (++i % batchSize == 0) {
statement.executeBatch();
}
} catch (SQLIntegrityConstraintViolationException e) {
//should not occur, nevertheless
//conn.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
//conn.commit();
statement.executeBatch();
}
}
int[] ret = statement.executeBatch();
System.out.println("Ret val: " + Arrays.toString(ret));
System.out.println("Update count: " + statement.getUpdateCount());
conn.commit();
statement.close();
conn.close();
Ответ на нуль323 прав, я просто хотел добавить, что вы можете использовать пакет JayDeBeApi для обхода этого: https://pypi.python.org/pypi/JayDeBeApi/
для обновления данных в вашей таблице mysql. Это может быть плохо висящий плод, так как у вас уже установлен драйвер mysql jdbc.
Модуль JayDeBeApi позволяет вам подключаться из кода Python к базам данных с помощью Java JDBC. Он предоставляет Python DB-API v2.0 для этой базы данных.
Мы используем дистрибутив Anaconda для Python, а пакет python для JayDeBeApi является стандартным.
См. Примеры в этом ссылка выше.