Вы можете ввести новый массив столбцов - когда значение = 2, затем Array (-1,0) или Array (0), затем взорвать этот столбец и добавить его с отметкой времени в секундах. Ниже следует работать для вас. Проверьте это:
scala> val df = Seq((1,"15:00:01",3),(1,"17:04:02",2)).toDF("id","timestamp","value")
df: org.apache.spark.sql.DataFrame = [id: int, timestamp: string ... 1 more field]
scala> val df2 = df.withColumn("timestamp",'timestamp.cast("timestamp"))
df2: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 1 more field]
scala> df2.show(false)
+---+-------------------+-----+
|id |timestamp |value|
+---+-------------------+-----+
|1 |2019-03-04 15:00:01|3 |
|1 |2019-03-04 17:04:02|2 |
+---+-------------------+-----+
scala> val df3 = df2.withColumn("newc", when($"value"===lit(2),lit(Array(-1,0))).otherwise(lit(Array(0))))
df3: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 2 more fields]
scala> df3.show(false)
+---+-------------------+-----+-------+
|id |timestamp |value|newc |
+---+-------------------+-----+-------+
|1 |2019-03-04 15:00:01|3 |[0] |
|1 |2019-03-04 17:04:02|2 |[-1, 0]|
+---+-------------------+-----+-------+
scala> val df4 = df3.withColumn("c_explode",explode('newc)).withColumn("timestamp2",to_timestamp(unix_timestamp('timestamp)+'c_explode))
df4: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 4 more fields]
scala> df4.select($"id",$"timestamp2",$"value").show(false)
+---+-------------------+-----+
|id |timestamp2 |value|
+---+-------------------+-----+
|1 |2019-03-04 15:00:01|3 |
|1 |2019-03-04 17:04:01|2 |
|1 |2019-03-04 17:04:02|2 |
+---+-------------------+-----+
scala>
Если вы хотите, чтобы часть времени оставалась одна, то вы можете сделать как
scala> df4.withColumn("timestamp",from_unixtime(unix_timestamp('timestamp2),"HH:mm:ss")).select($"id",$"timestamp",$"value").show(false)
+---+---------+-----+
|id |timestamp|value|
+---+---------+-----+
|1 |15:00:01 |3 |
|1 |17:04:01 |2 |
|1 |17:04:02 |2 |
+---+---------+-----+
На самом деле это очень возможно. использовать SQL CE вместо полноценного SQL Server, изменяя только параметры конфигурации: измените строку подключения и по возможности используйте интерфейсы семейства IDbXXX
вместо платформо-зависимых SqlXXX
и SqlCeXXX
. См. DbProviderFactories .
Однако следует помнить о различиях в диалектах SQL этих двух платформ.
Все связанные с SQL объекты, которые вам нужны для любой БД, наследуются от базового абстрактного Db ... (т. Е. DbConnection, DbDataAdapter и т. Д.). Поэтому вы можете написать некоторый класс DatabaseManager, который при создании экземпляра должен знать, имеете ли вы дело с Sql или Sql Ce. Затем с этого момента вы имеете дело только с объектами базового класса (DbConnection и т. Д.). Таким образом, все, что вам нужно каждый раз менять, - это создание экземпляра вашего класса менеджера.
Еще один плюс к тому, чтобы сделать это таким образом, если вы позже решите переключиться на другого провайдера, не нужно много кода менять.