Ну, как насчет этого:
for (var key in myStringArray) {
console.log(myStringArray[key]);
}
Не поддерживается. DataFrameWriter
может либо добавить, либо перезаписать существующую таблицу. Если ваше приложение требует более сложной логики, вам придется иметь дело с этим вручную.
Один из вариантов - использовать действие (foreach
, foreachPartition
) со стандартным соединением JDBC. Еще один - записать во временное и обработать остальное непосредственно в базе данных.
KrisP имеет право на это. Лучший способ сделать upsert - это не подготовленное заявление. Важно отметить, что этот метод будет вставлять один за раз с таким количеством разделов, как количество работающих у вас. Если вы хотите сделать это в пакетном режиме, вы также можете
import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection("JDBCURL")
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
batch.grouped("# Of Rows you want per batch").foreach { session =>
session.foreach { x =>
st.setDouble(1, x.getDouble(1))
st.addBatch()
}
st.executeBatch()
}
dbc.close()
}
Это будет выполнять партии для каждого рабочего и закрыть соединение с БД. Это дает вам контроль над количеством рабочих, количеством партий и позволяет работать в этих пределах.
Если вы собираетесь делать это вручную и с помощью опции 1, упомянутой zero323, вы должны посмотреть исходный код Spark для инструкции insert здесь
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
val columns = rddSchema.fields.map(_.name).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
conn.prepareStatement(sql)
}
PreparedStatement
является частью java.sql
и имеет методы, подобные execute()
и executeUpdate()
. Разумеется, вам все равно придется изменить sql
.
Чтобы вставить JDBC, вы можете использовать
dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)
Кроме того, Dataframe.write предоставляет вам DataFrameWriter и имеет некоторые методы для вставки блока данных.
def insertInto(tableName: String): Unit
Вставляет содержимое DataFrame в указанную таблицу. Это требует, чтобы схема DataFrame была такой же, как схема таблицы.
Поскольку она вставляет данные в существующую таблицу, формат или параметры будут игнорироваться.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
Пока ничего не удалось обновить отдельные записи из коробки из искры, хотя