Считайте данные текстового файла с помощью Spark и разделите данные с помощью запятой

Я всегда использую: https://www.mail-tester.com/

Он дает мне отзыв о технической части отправки электронной почты. Как SPF-записи, DKIM, счет Spamassassin и так далее. Несмотря на то, что я знаю, что требуется, я постоянно делаю ошибки, а mail-tester.com позволяет легко понять, что может быть неправильным.

-2
задан SHALIN PATEL 18 March 2019 в 15:48
поделиться

3 ответа

Ваши данные не в формате CSV. CSV означает разделенный запятыми текстовый файл с фиксированной схемой. CSV для ваших данных будет выглядеть следующим образом:

abc,x1,x2,x3,,
def,x1,x3,x4,x8,x9
ghi,x7,x10,x11,,

Обратите внимание на запятые в строках 1 & amp; 3, которых нет в ваших данных.

Поскольку у вас есть текстовый файл, который не является CSV, способ получить нужную схему в Spark - это прочитать весь файл в Python, проанализировать то, что вы хотите, и затем использовать spark.crateDataFrame(). В качестве альтернативы, если у вас есть несколько таких файлов в каталоге, используйте SparkContext.wholeTextFiles, а затем flatMap свою функцию синтаксического анализа.

Если вы уже сделали что-то вроде open("Your File.txt").readlines, остальное просто:

import re
from pyspark.sql import *

lines = [
  "abc, x1, x2, x3",
  "def, x1, x3, x4,x8,x9",
  "ghi, x7, x10, x11"
]

split = re.compile("\s*,\s*")
Line = Row("id", "first", "rest")

def parse_line(id, line):
  tokens = split.split(line.strip)
  return Line(id, tokens[0], tokens.pop(0))

def parse_lines(lines):
  return [parse_line(i, x) for i,x in enumerate(lines)]

spark.createDataFrame(parse_lines(lines))
0
ответ дан Sim 18 March 2019 в 15:48
поделиться

Что вы можете сделать, это сгенерировать сначала идентификатор, используя zipWithIndex, а затем внутри функции map взять первую часть строки с r[0].split(",")[0], а вторую с r[0].split(",")[1:].

Вот код, как описано выше:

from pyspark.sql.types import StringType

lines = ["abc, x1, x2, x3",
        "def, x1, x3, x4,x8,x9",
        "ghi, x7, x10, x11"]

df = spark.createDataFrame(lines, StringType())
df = df.rdd.zipWithIndex() \
           .map(lambda (r, indx): (indx, r[0].split(",")[0], r[0].split(",")[1:])) \
           .toDF(["id", "name", "x_col"])

df.show(10, False)

И вывод:

+---+----+-----------------------+
|id |name|x_col                  |
+---+----+-----------------------+
|0  |abc |[ x1,  x2,  x3]        |
|1  |def |[ x1,  x3,  x4, x8, x9]|
|2  |ghi |[ x7,  x10,  x11]      |
+---+----+-----------------------+
0
ответ дан Alexandros Biratsis 18 March 2019 в 15:48
поделиться

Если данные поступают в файл, могут быть реализованы таким образом:

  1. Считать файл как CSV;
  2. Добавить столбец индекса с помощью «monotonically_increasing_id»
  3. Сначала выберите столбец, а все остальные столбцы как массив.

На Scala можно реализовать таким образом:

val df = spark.read.option("header", "false").csv("non-csv.txt")
val remainingColumns = df.columns.tail
df.withColumn("id", monotonically_increasing_id).
  select(
    col("id"),
    col(df.columns(0)),
    array(remainingColumns.head, remainingColumns.tail: _*)
  ).show(false)

Вывод:

+---+---+--------------------+
|id |_c0|array(_c1, _c2, _c3)|
+---+---+--------------------+
|0  |abc|[ x1,  x2,  x3]     |
|1  |def|[ x1,  x3,  x4]     |
|2  |ghi|[ x7,  x10,  x11]   |
+---+---+--------------------+
0
ответ дан pasha701 18 March 2019 в 15:48
поделиться
Другие вопросы по тегам:

Похожие вопросы: