Предложение else
выполняется, если вы выходите из блока в обычном режиме, нажав на условие цикла или опустив нижнюю часть блока try. Это not выполняется, если вы break
или return
вышли из блока или вызвали исключение. Он работает не только во время и за циклами, но и пытается блокировать.
Обычно вы находите его в местах, где обычно вы выходите из цикла раньше, а отключение конца цикла является неожиданным / необычным повод. Например, если вы просматриваете список, ищущий значение:
for value in values:
if value == 5:
print "Found it!"
break
else:
print "Nowhere to be found. :-("
Не поддерживается. DataFrameWriter
может либо добавить, либо перезаписать существующую таблицу. Если ваше приложение требует более сложной логики, вам придется иметь дело с этим вручную.
Один из вариантов - использовать действие (foreach
, foreachPartition
) со стандартным соединением JDBC. Еще один - записать во временное и обработать остальное непосредственно в базе данных.
KrisP имеет право на это. Лучший способ сделать upsert - это не подготовленное заявление. Важно отметить, что этот метод будет вставлять один за раз с таким количеством разделов, как количество работающих у вас. Если вы хотите сделать это в пакетном режиме, вы также можете
import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection("JDBCURL")
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
batch.grouped("# Of Rows you want per batch").foreach { session =>
session.foreach { x =>
st.setDouble(1, x.getDouble(1))
st.addBatch()
}
st.executeBatch()
}
dbc.close()
}
Это будет выполнять партии для каждого рабочего и закрыть соединение с БД. Это дает вам контроль над количеством рабочих, количеством партий и позволяет работать в этих пределах.
Если вы собираетесь делать это вручную и с помощью опции 1, упомянутой zero323, вы должны посмотреть исходный код Spark для инструкции insert здесь
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
val columns = rddSchema.fields.map(_.name).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
conn.prepareStatement(sql)
}
PreparedStatement
является частью java.sql
и имеет методы, подобные execute()
и executeUpdate()
. Разумеется, вам все равно придется изменить sql
.
Чтобы вставить JDBC, вы можете использовать
dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)
Кроме того, Dataframe.write предоставляет вам DataFrameWriter и имеет некоторые методы для вставки блока данных.
def insertInto(tableName: String): Unit
Вставляет содержимое DataFrame в указанную таблицу. Это требует, чтобы схема DataFrame была такой же, как схема таблицы.
Поскольку она вставляет данные в существующую таблицу, формат или параметры будут игнорироваться.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
Пока ничего не удалось обновить отдельные записи из коробки из искры, хотя