Чтобы обновить таблицу базы данных, используя SparkSQL [duplicate]

Ну, как насчет этого:

for (var key in myStringArray) {
    console.log(myStringArray[key]);
}
9
задан void 6 January 2016 в 22:33
поделиться

4 ответа

Не поддерживается. DataFrameWriter может либо добавить, либо перезаписать существующую таблицу. Если ваше приложение требует более сложной логики, вам придется иметь дело с этим вручную.

Один из вариантов - использовать действие (foreach, foreachPartition) со стандартным соединением JDBC. Еще один - записать во временное и обработать остальное непосредственно в базе данных.

10
ответ дан zero323 5 September 2018 в 11:11
поделиться

KrisP имеет право на это. Лучший способ сделать upsert - это не подготовленное заявление. Важно отметить, что этот метод будет вставлять один за раз с таким количеством разделов, как количество работающих у вас. Если вы хотите сделать это в пакетном режиме, вы также можете

import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection("JDBCURL")
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")

batch.grouped("# Of Rows you want per batch").foreach { session =>
  session.foreach { x =>
    st.setDouble(1, x.getDouble(1)) 
    st.addBatch()
  }
  st.executeBatch()
}
dbc.close()
  }

Это будет выполнять партии для каждого рабочего и закрыть соединение с БД. Это дает вам контроль над количеством рабочих, количеством партий и позволяет работать в этих пределах.

8
ответ дан jstuartmill 5 September 2018 в 11:11
поделиться

Если вы собираетесь делать это вручную и с помощью опции 1, упомянутой zero323, вы должны посмотреть исходный код Spark для инструкции insert здесь

  def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
    val columns = rddSchema.fields.map(_.name).mkString(",")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
    conn.prepareStatement(sql)
  }

PreparedStatement является частью java.sql и имеет методы, подобные execute() и executeUpdate(). Разумеется, вам все равно придется изменить sql.

7
ответ дан KrisP 5 September 2018 в 11:11
поделиться

Чтобы вставить JDBC, вы можете использовать

dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)

Кроме того, Dataframe.write предоставляет вам DataFrameWriter и имеет некоторые методы для вставки блока данных.

def insertInto(tableName: String): Unit

Вставляет содержимое DataFrame в указанную таблицу. Это требует, чтобы схема DataFrame была такой же, как схема таблицы.

Поскольку она вставляет данные в существующую таблицу, формат или параметры будут игнорироваться.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

Пока ничего не удалось обновить отдельные записи из коробки из искры, хотя

2
ответ дан Soumitra 5 September 2018 в 11:11
поделиться
Другие вопросы по тегам:

Похожие вопросы: