Scala / Spark вперед заполняют условия [дубликат]

Обычно ошибка, возникающая при сбое в работе базы данных базы данных, поэтому не забудьте подключить вашу базу данных или включить файл базы данных.

include_once(db_connetc.php');

ИЛИ

// Create a connection
$connection = mysql_connect("localhost", "root", "") or die(mysql_error());

//Select database
mysql_select_db("db_name", $connection) or die(mysql_error());

$employee_query = "SELECT * FROM employee WHERE `id` ='".$_POST['id']."';

$employee_data = mysql_query($employee_query);

if (mysql_num_rows($employee_data) > 0) {

    while ($row = mysql_fetch_array($employee_data)){
        echo $row['emp_name'];
    } // end of while loop
} // end of if
  • Лучшей практикой является запуск запроса в sqlyog, а затем его копирование в код страницы.
  • Всегда сохраняйте свой запрос в переменной и затем повторяйте эту переменную. Затем перейдите к mysql_query($query_variable);.
20
задан MrE 11 November 2015 в 02:36
поделиться

1 ответ

Первоначальный ответ (одно предположение о временном ряду):

Прежде всего, попробуйте избежать оконных функций, если вы не можете предоставить предложение PARTITION BY. Он перемещает данные в один раздел, поэтому большую часть времени это просто невозможно.

Что вы можете сделать, это заполнить пробелы на RDD, используя mapPartitionsWithIndex. Поскольку вы не представили пример данных или ожидаемого вывода, считайте, что это псевдокод не является настоящей программой Scala:

  • сначала позволяет упорядочить DataFrame по дате и преобразовать в RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
  • next позволяет найти последнее не нулевое наблюдение на каждый раздел
    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
  • и преобразовать это Map в трансляцию
    val toCarryBd = sc.broadcast(toCarry)
    
  • окончательно на несколько разбиений по разделам снова заполняя пробелы:
    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
  • , наконец, преобразуется обратно в DataFrame

Редактирование (секционированный / временной ряд для групповых данных):

дьявол находится в деталях. Если ваши данные разбиты на разделы, тогда вся проблема может быть решена с помощью groupBy. Предположим, что вы просто разделяете по столбцу «v» типа T, а Date - это целая временная метка:

def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}

val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

val dfFilled = sqlContext.createDataFrame(rows, df.schema)

Таким образом вы можете одновременно заполнять все столбцы.

Можно ли это сделать с помощью DataFrames вместо преобразования взад и вперед в RDD?

Это зависит, хотя вряд ли это будет эффективно. Если максимальный зазор относительно невелик, вы можете сделать что-то вроде этого:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column

val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed 

// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _)over(w))
  // Add current, coalesce and set alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}


// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))


// Finally select
val dfImputed = df.select($"*" :: lags: _*)

Его можно легко настроить, чтобы использовать разный максимальный зазор на столбе.

Простейший способ добиться аналогичного результат последней версии Spark заключается в использовании last с ignoreNulls:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

. Хотя можно отказаться от предложения partitionBy и применить этот метод по всему миру, это будет чрезмерно дорогостоящим с большими наборами данных .

14
ответ дан user6910411 26 August 2018 в 01:50
поделиться
Другие вопросы по тегам:

Похожие вопросы: