Вот окончательная конфигурация, которую я разработал, которая использует комбинацию синонов и насмешек:
// Dependencies
var expect = require('chai').expect;
var sinon = require('sinon');
var mockery = require('mockery');
var reloadStub = require('../../../spec/utils/reloadStub');
describe('UNIT: userController.js', function() {
var reportErrorStub;
var controller;
var userModel;
before(function() {
// mock the error reporter
mockery.enable({
warnOnReplace: false,
warnOnUnregistered: false,
useCleanCache: true
});
// load controller and model
controller = require('./userController');
userModel = require('./userModel');
});
after(function() {
// disable mock after tests complete
mockery.disable();
});
describe('#createUser', function() {
var req;
var res;
var status;
var end;
var json;
// Stub `#save` for all these tests
before(function() {
sinon.stub(userModel.prototype, 'save');
});
// Stub out req and res
beforeEach(function() {
req = {
body: {
username: 'Andrew',
userID: 1
}
};
status = sinon.stub();
end = sinon.stub();
json = sinon.stub();
res = { status: status.returns({ end: end, json: json }) };
});
// Reset call count after each test
afterEach(function() {
userModel.prototype.save.reset();
});
// Restore after all tests finish
after(function() {
userModel.prototype.save.restore();
});
it('should call `User.save`', function(done) {
controller.createUser(req, res);
/**
* Since Mongoose's `new` is asynchronous, run our expectations on the
* next cycle of the event loop.
*/
setTimeout(function() {
expect(userModel.prototype.save.callCount).to.equal(1);
done();
}, 0);
});
}
}
Вы уверены, что все строки имеют не менее 2 столбцов? Вы можете попробовать что-то вроде, просто чтобы проверить?:
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
В качестве альтернативы вы можете распечатать виновника (если есть):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
import pandas as pd
data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")
Это соответствует тому, что JP Mercier изначально предложил об использовании Pandas, но с серьезной модификацией: если вы читаете данные в Pandas в кусках, это должно быть более податливым. Смысл, что вы можете анализировать гораздо больший файл, чем Pandas может фактически обрабатывать как единый кусок и передавать его Spark в меньших размерах. (Это также отвечает на комментарий о том, почему хотелось бы использовать Spark, если они все равно могут загружать все в Pandas.)
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
StringIO
. csv
может использовать любой итеративный b) __next__
не следует использовать напрямую и сбой в пустой строке. Взгляните на flatMap c) Было бы гораздо эффективнее использовать mapPartitions
вместо инициализации считывателя на каждой строке :)
– zero323
22 April 2016 в 05:21
rdd.mapPartitions(lambda x: csv.reader(x))
работает, а rdd.map(lambda x: csv.reader(x))
выдает ошибку? Я ожидал, что оба бросят то же самое TypeError: can't pickle _csv.reader objects
. Похоже, mapPartitions
автоматически называет некоторый эквивалент «readlines», на объекте csv.reader
, где с map
я должен был явно вызвать __next__
, чтобы получить списки из csv.reader
. 2) Куда входит flatMap
? Только вызов mapPartitions
работал для меня.
– Galen Long
22 April 2016 в 19:11
rdd.mapPartitions(lambda x: csv.reader(x))
работает, потому что mapPartitions
ожидает объект Iterable
. Если вы хотите быть явным, вы могли бы понять или выразить выражение. map
не работает, потому что он не перебирает объект. Поэтому мое предложение использовать flatMap(lambda x: csv.reader([x]))
, которое будет перебирать читателя. Но mapPartitions
здесь намного лучше.
– zero323
23 April 2016 в 04:30
Если ваши данные csv не содержат строк в любом из полей, вы можете загрузить свои данные с помощью textFile()
и проанализировать его
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name1", "name2"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
Если вы хотите загрузить csv в качестве фреймворка данных, вы можете сделать следующее:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load('sampleFile.csv') # this is your csv file
Это сработало для меня.
И еще одна опция, состоящая в чтении CSV-файла с использованием Pandas, а затем импортировании Pandas DataFrame в Spark.
Например:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
Теперь есть еще один вариант для любого общего файла csv: https://github.com/seahboonsiew/pyspark-csv следующим образом:
Предположим, что мы имеем следующее context
sc = SparkContext
sqlCtx = SQLContext or HiveContext
Сначала распределите pyspark-csv.py исполнителям, использующим SparkContext
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
Чтение данных csv через SparkContext и преобразование его в DataFrame
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");
print(df.collect())
Spark 2.0.0 +
Вы можете напрямую использовать встроенный источник данных csv:
spark.read.csv(
"some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
или
(spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv"))
без включения каких-либо внешние зависимости.
Spark & lt; 2.0.0:
Вместо ручного разбора, который в общем случае далек от тривиального, я бы рекомендовал spark-csv
:
Удостоверьтесь, что Spark CSV включен в путь (--packages
, --jars
, --driver-class-path
)
И загрузите ваши данные следующим образом:
(df = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
Он может обрабатывать загрузку, схему вывода, отбрасывания неверных строк и не требует передачи данных из Python в JVM.
Примечание:
Если вам известна схема, лучше избегать вывода схемы и передать ее к DataFrameReader
. Предполагая, что у вас есть три столбца - целое, двойное и строковое:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(обязательно замените версии databricks / spark на те, которые вы установили).
– Galen Long
22 April 2016 в 19:54
csv
, чтобы обрабатывать все экранирование, потому что просто разделение запятой не будет работать, если, скажем, есть запятые в значениях. – sudo 17 April 2017 в 23:16","
. – Alceu Costa 19 July 2018 в 13:16