Pyspark - обновление определенных столбцов в таблице mysql [дубликат]

Хм ... Я удивлен, что компилирует (это, я проверил). Я не знаю никаких гарантий, которые сделают это безопасным. Использовать статический конструктор ...


Изменить: я принимаю (см. лучший ответ выше ), что он будет работать; но моя идея с кодом заключается в том, чтобы сделать его максимально простым и очевидным. Если не очевидно, что он будет работать (и не может быть, если вам нужно спросить), тогда не пишите его таким образом ...

В частности, проблемы с использованием поля order:

  • он может сломаться, если вы перемещаете код (что я часто делаю)
  • он может сломаться, если вы разделите код на классы partial

Мой совет остается: использовать статический конструктор для этого сценария.

12
задан nicola 25 February 2016 в 19:02
поделиться

3 ответа

Это невозможно. На данный момент (Spark 1.6.0 / 2.2.0 SNAPSHOT) Spark DataFrameWriter поддерживает только четыре режима записи:

  • SaveMode.Overwrite: перезаписать существующие данные.
  • SaveMode.Append: добавьте данные.
  • SaveMode.Ignore: проигнорируйте операцию (т. е. нет-op).
  • SaveMode.ErrorIfExists: параметр по умолчанию, выдайте исключение в runtime.

Вы можете вставить вручную, например, с помощью mapPartitions (так как вы хотите, чтобы операция UPSERT была идемпотентной и как такая простая в использовании), напишите во временную таблицу и выполнить перезагрузку вручную или использовать триггеры.

В целом достижение поведения при перезагрузке для пакетных операций и поддержание достойной производительности далеко не тривиально. Вы должны помнить, что в общем случае будет выполняться несколько параллельных транзакций (по одному на каждый раздел), поэтому вы должны убедиться, что конфликты записи не будут (как правило, с использованием конкретного раздела) или предоставить соответствующие процедуры восстановления. На практике может быть лучше выполнить и пакетную запись во временную таблицу и разрешить часть upsert непосредственно в базе данных.

16
ответ дан zero323 19 August 2018 в 05:19
поделиться

Жаль, что в Spark нет режима SaveMode.Upsert для таких довольно распространенных случаев, как upserting.

zero322 в целом прав, но я думаю, что это должно быть возможным (с компромиссом в производительности) предложите такую ​​функцию замены.

Я также хотел предоставить некоторый Java-код для этого случая. Конечно, это не то, что исполнитель, как встроенный, из искры - но это должно быть хорошей основой для ваших требований. Просто измените его по своему усмотрению:

myDF.repartition(20); //one connection per partition, see below

myDF.foreachPartition((Iterator<Row> t) -> {
            Connection conn = DriverManager.getConnection(
                    Constants.DB_JDBC_CONN,
                    Constants.DB_JDBC_USER,
                    Constants.DB_JDBC_PASS);

            conn.setAutoCommit(true);
            Statement statement = conn.createStatement();

            final int batchSize = 100000;
            int i = 0;
            while (t.hasNext()) {
                Row row = t.next();
                try {
                    // better than REPLACE INTO, less cycles
                    statement.addBatch(("INSERT INTO mytable " + "VALUES ("
                            + "'" + row.getAs("_id") + "', 
                            + "'" + row.getStruct(1).get(0) + "'
                            + "')  ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';"));
                    //conn.commit();

                    if (++i % batchSize == 0) {
                        statement.executeBatch();
                    }
                } catch (SQLIntegrityConstraintViolationException e) {
                    //should not occur, nevertheless
                    //conn.commit();
                } catch (SQLException e) {
                    e.printStackTrace();
                } finally {
                    //conn.commit();
                    statement.executeBatch();
                }
            }
            int[] ret = statement.executeBatch();

            System.out.println("Ret val: " + Arrays.toString(ret));
            System.out.println("Update count: " + statement.getUpdateCount());
            conn.commit();

            statement.close();
            conn.close();
0
ответ дан Aydin K. 19 August 2018 в 05:19
поделиться

Ответ на нуль323 прав, я просто хотел добавить, что вы можете использовать пакет JayDeBeApi для обхода этого: https://pypi.python.org/pypi/JayDeBeApi/

для обновления данных в вашей таблице mysql. Это может быть плохо висящий плод, так как у вас уже установлен драйвер mysql jdbc.

Модуль JayDeBeApi позволяет вам подключаться из кода Python к базам данных с помощью Java JDBC. Он предоставляет Python DB-API v2.0 для этой базы данных.

Мы используем дистрибутив Anaconda для Python, а пакет python для JayDeBeApi является стандартным.

См. Примеры в этом ссылка выше.

0
ответ дан Tagar 19 August 2018 в 05:19
поделиться
Другие вопросы по тегам:

Похожие вопросы: