раскол по запятой искра RDD [дубликат]

Вот окончательная конфигурация, которую я разработал, которая использует комбинацию синонов и насмешек:

// Dependencies
var expect = require('chai').expect;
var sinon = require('sinon');
var mockery = require('mockery');
var reloadStub = require('../../../spec/utils/reloadStub');

describe('UNIT: userController.js', function() {

  var reportErrorStub;
  var controller;
  var userModel;

  before(function() {
    // mock the error reporter
    mockery.enable({
      warnOnReplace: false,
      warnOnUnregistered: false,
      useCleanCache: true
    });

    // load controller and model
    controller = require('./userController');
    userModel = require('./userModel');
  });

  after(function() {
    // disable mock after tests complete
    mockery.disable();
  });

  describe('#createUser', function() {
    var req;
    var res;
    var status;
    var end;
    var json;

    // Stub `#save` for all these tests
    before(function() {
      sinon.stub(userModel.prototype, 'save');
    });

    // Stub out req and res
    beforeEach(function() {
      req = {
        body: {
          username: 'Andrew',
          userID: 1
        }
      };

      status = sinon.stub();
      end = sinon.stub();
      json = sinon.stub();

      res = { status: status.returns({ end: end, json: json }) };
    });

    // Reset call count after each test
    afterEach(function() {
      userModel.prototype.save.reset();
    });

    // Restore after all tests finish
    after(function() {
      userModel.prototype.save.restore();
    });

    it('should call `User.save`', function(done) {
      controller.createUser(req, res);
      /**
       * Since Mongoose's `new` is asynchronous, run our expectations on the
       * next cycle of the event loop.
       */
      setTimeout(function() {
        expect(userModel.prototype.save.callCount).to.equal(1);
        done();
      }, 0);
    });
  }
}
69
задан Kernael 28 February 2015 в 15:41
поделиться

10 ответов

Вы уверены, что все строки имеют не менее 2 столбцов? Вы можете попробовать что-то вроде, просто чтобы проверить?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

В качестве альтернативы вы можете распечатать виновника (если есть):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
42
ответ дан 030 23 August 2018 в 01:23
поделиться
  • 1
    Вот и все, одна строка с одним столбцом, спасибо. – Kernael 28 February 2015 в 18:01
  • 2
    Лучше анализировать, используя встроенную библиотеку csv, чтобы обрабатывать все экранирование, потому что просто разделение запятой не будет работать, если, скажем, есть запятые в значениях. – sudo 17 April 2017 в 23:16
  • 3
    Существует множество инструментов для разбора csv, не изобретайте колесо – Stephen 14 September 2017 в 16:29
  • 4
    Этот код будет разбит, если в кавычках есть запятая. Разбор csv более сложный, чем просто расщепление на ",". – Alceu Costa 19 July 2018 в 13:16
import pandas as pd

data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")
-4
ответ дан Baum mit Augen 23 August 2018 в 01:23
поделиться
  • 1
    Это Панды, а не Спарк. Оригинальный автор хочет, чтобы данные загружались в кластер Spark с распределенной памятью, а не на одну машину. – ZakJ 8 November 2017 в 18:45

Это соответствует тому, что JP Mercier изначально предложил об использовании Pandas, но с серьезной модификацией: если вы читаете данные в Pandas в кусках, это должно быть более податливым. Смысл, что вы можете анализировать гораздо больший файл, чем Pandas может фактически обрабатывать как единый кусок и передавать его Spark в меньших размерах. (Это также отвечает на комментарий о том, почему хотелось бы использовать Spark, если они все равно могут загружать все в Pandas.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
10
ответ дан Community 23 August 2018 в 01:23
поделиться
  • 1
    Вам не нужно, чтобы Hive использовал DataFrames. Что касается вашего решения: a) Нет необходимости в StringIO. csv может использовать любой итеративный b) __next__ не следует использовать напрямую и сбой в пустой строке. Взгляните на flatMap c) Было бы гораздо эффективнее использовать mapPartitions вместо инициализации считывателя на каждой строке :) – zero323 22 April 2016 в 05:21
  • 2
    Большое спасибо за исправления! Прежде чем отредактировать свой ответ, я хочу убедиться, что полностью понимаю. 1) Почему rdd.mapPartitions(lambda x: csv.reader(x)) работает, а rdd.map(lambda x: csv.reader(x)) выдает ошибку? Я ожидал, что оба бросят то же самое TypeError: can't pickle _csv.reader objects. Похоже, mapPartitions автоматически называет некоторый эквивалент «readlines», на объекте csv.reader, где с map я должен был явно вызвать __next__, чтобы получить списки из csv.reader. 2) Куда входит flatMap? Только вызов mapPartitions работал для меня. – Galen Long 22 April 2016 в 19:11
  • 3
    rdd.mapPartitions(lambda x: csv.reader(x)) работает, потому что mapPartitions ожидает объект Iterable. Если вы хотите быть явным, вы могли бы понять или выразить выражение. map не работает, потому что он не перебирает объект. Поэтому мое предложение использовать flatMap(lambda x: csv.reader([x])), которое будет перебирать читателя. Но mapPartitions здесь намного лучше. – zero323 23 April 2016 в 04:30
  • 4
    обратите внимание, что это будет читать заголовок как строку данных, а не как заголовок – muon 13 February 2017 в 20:30

Если ваши данные csv не содержат строк в любом из полей, вы можете загрузить свои данные с помощью textFile() и проанализировать его

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
2
ответ дан iec2011007 23 August 2018 в 01:23
поделиться

Если вы хотите загрузить csv в качестве фреймворка данных, вы можете сделать следующее:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

Это сработало для меня.

0
ответ дан Jeril 23 August 2018 в 01:23
поделиться
  • 1
    Я отказался, потому что этот ответ уже существует. – Galen Long 31 March 2018 в 05:00

И еще одна опция, состоящая в чтении CSV-файла с использованием Pandas, а затем импортировании Pandas DataFrame в Spark.

Например:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
9
ответ дан JP Mercier 23 August 2018 в 01:23
поделиться
  • 1
    Почему OP хотел бы сделать на искру, если он сможет загружать данные в пандах – WoodChopper 14 November 2015 в 15:57
  • 2
    Академические цели – bluerubez 15 December 2015 в 01:53
  • 3
    Не желая устанавливать или указывать зависимости для каждого искрового блока .... – SummerEla 15 June 2016 в 00:59
  • 4
    Panda позволяет записывать файлы при чтении, поэтому здесь используется прецедент, поскольку Pandas обрабатывает начальный синтаксический анализ файлов. См. Мой ответ ниже для кода. – abby sobh 6 October 2016 в 18:07
  • 5
    Внимание: Pandas также обрабатывает путь схемы столбцов иначе, чем искра, особенно когда задействованы пробелы. Безопаснее просто загружать csv как строки для каждого столбца. – Donald Vetal 2 August 2017 в 22:17

Теперь есть еще один вариант для любого общего файла csv: https://github.com/seahboonsiew/pyspark-csv следующим образом:

Предположим, что мы имеем следующее context

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Сначала распределите pyspark-csv.py исполнителям, использующим SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Чтение данных csv через SparkContext и преобразование его в DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
3
ответ дан optimist 23 August 2018 в 01:23
поделиться
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");

print(df.collect())
8
ответ дан tar 23 August 2018 в 01:23
поделиться

Spark 2.0.0 +

Вы можете напрямую использовать встроенный источник данных csv:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

или

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

без включения каких-либо внешние зависимости.

Spark & ​​lt; 2.0.0:

Вместо ручного разбора, который в общем случае далек от тривиального, я бы рекомендовал spark-csv :

Удостоверьтесь, что Spark CSV включен в путь (--packages, --jars, --driver-class-path)

И загрузите ваши данные следующим образом:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Он может обрабатывать загрузку, схему вывода, отбрасывания неверных строк и не требует передачи данных из Python в JVM.

Примечание:

Если вам известна схема, лучше избегать вывода схемы и передать ее к DataFrameReader. Предполагая, что у вас есть три столбца - целое, двойное и строковое:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
111
ответ дан zero323 23 August 2018 в 01:23
поделиться
  • 1
    Если вы это сделаете, не забудьте включить пакет csv databricks, когда вы открываете оболочку pyspark или используете spark-submit. Например, pyspark --packages com.databricks:spark-csv_2.11:1.4.0 (обязательно замените версии databricks / spark на те, которые вы установили). – Galen Long 22 April 2016 в 19:54
  • 2
    Это csvContext или sqlContext в pyspark? Потому что в scala вам нужен csvContext – Geoffrey Anderson 24 May 2018 в 14:40
10
ответ дан Community 5 November 2018 в 23:05
поделиться
Другие вопросы по тегам:

Похожие вопросы: