Обычно ошибка, возникающая при сбое в работе базы данных базы данных, поэтому не забудьте подключить вашу базу данных или включить файл базы данных.
include_once(db_connetc.php');
ИЛИ
// Create a connection
$connection = mysql_connect("localhost", "root", "") or die(mysql_error());
//Select database
mysql_select_db("db_name", $connection) or die(mysql_error());
$employee_query = "SELECT * FROM employee WHERE `id` ='".$_POST['id']."';
$employee_data = mysql_query($employee_query);
if (mysql_num_rows($employee_data) > 0) {
while ($row = mysql_fetch_array($employee_data)){
echo $row['emp_name'];
} // end of while loop
} // end of if
mysql_query($query_variable);
. Прежде всего, попробуйте избежать оконных функций, если вы не можете предоставить предложение PARTITION BY
. Он перемещает данные в один раздел, поэтому большую часть времени это просто невозможно.
Что вы можете сделать, это заполнить пробелы на RDD
, используя mapPartitionsWithIndex
. Поскольку вы не представили пример данных или ожидаемого вывода, считайте, что это псевдокод не является настоящей программой Scala:
DataFrame
по дате и преобразовать в RDD
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = ???
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
.mapPartitionsWithIndex{ case (i, iter) =>
Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
Map
в трансляцию val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
// If it is the beginning of partition and value is missing
// extract value to fill from toCarryBd.value
// Remember to correct for empty / only missing partitions
// otherwise take last not-null from the current partition
}
val imputed: RDD[Row] = rows
.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
дьявол находится в деталях. Если ваши данные разбиты на разделы, тогда вся проблема может быть решена с помощью groupBy
. Предположим, что вы просто разделяете по столбцу «v» типа T
, а Date
- это целая временная метка:
def fill(iter: List[Row]): List[Row] = {
// Just go row by row and fill with last non-empty value
???
}
val groupedAndSorted = df.rdd
.groupBy(_.getAs[T]("k"))
.mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)
Таким образом вы можете одновременно заполнять все столбцы.
Можно ли это сделать с помощью DataFrames вместо преобразования взад и вперед в RDD?
blockquote>Это зависит, хотя вряд ли это будет эффективно. Если максимальный зазор относительно невелик, вы можете сделать что-то вроде этого:
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.{WindowSpec, Window} import org.apache.spark.sql.Column val maxGap: Int = ??? // Maximum gap between observations val columnsToFill: List[String] = ??? // List of columns to fill val suffix: String = "_" // To disambiguate between original and imputed // Take lag 1 to maxGap and coalesce def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = { // Generate lag values between 1 and maxGap val lags = (1 to maxGap).map(lag(col(c), _)over(w)) // Add current, coalesce and set alias coalesce(col(c) +: lags: _*).alias(s"$c$suffix") } // For each column you want to fill nulls apply makeCoalesce val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_")) // Finally select val dfImputed = df.select($"*" :: lags: _*)
Его можно легко настроить, чтобы использовать разный максимальный зазор на столбе.
Простейший способ добиться аналогичного результат последней версии Spark заключается в использовании
last
сignoreNulls
:import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"k").orderBy($"Date") .rowsBetween(Window.unboundedPreceding, -1) df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
. Хотя можно отказаться от предложения
partitionBy
и применить этот метод по всему миру, это будет чрезмерно дорогостоящим с большими наборами данных .