Обновления:
2016-07-04
Поскольку последнее обновление MongoDB Spark Connector созрело довольно много. Он предоставляет обновленные бинарные файлы и API на основе источника данных, но использует конфигурацию SparkConf
, поэтому он субъективно менее гибкий, чем Stratio / Spark-MongoDB.
2016-03-30
После первоначального ответа я нашел два разных способа подключения к MongoDB от Spark:
В то время как первый кажется относительно незрелым, последний выглядит гораздо лучше, чем разъем Mongo-Hadoop, и предоставляет API-интерфейс Spark SQL.
# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
.format("com.stratio.datasource.mongodb")
.options(host="mongo:27017", database="foo", collection="bar")
.load())
df.show()
## +---+----+--------------------+
## | x| y| _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+
Кажется, он намного более стабилен, чем mongo-hadoop-spark
, поддерживает предикат pushdown без статической конфигурации и просто
Оригинальный ответ:
Действительно, здесь есть немало движущихся частей. Я попытался сделать это немного более управляемым, построив простое изображение Docker, которое примерно соответствует описанной конфигурации (хотя для краткости я опутал библиотеки Hadoop). Вы можете найти полный источник на GitHub
( DOI 10.5281 / zenodo.47882 ) и построить его с нуля:
git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .
или загрузить image Я нажал на Docker Hub , чтобы вы могли просто docker pull zero323/mongo-spark
):
Запустить изображения:
docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash
Запустить прохождение оболочки PySpark --jars
и --driver-class-path
:
pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}
И, наконец, посмотрите, как это работает:
import pymongo
import pymongo_spark
mongo_url = 'mongodb://mongo:27017/'
client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
{"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()
pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
.map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()
## [(1.0, -1.0), (0.0, 4.0)]
Обратите внимание, что mongo-hadoop, похоже, закрывает соединение после первого действия. Итак, вызов, например, rdd.count()
после того, как сбор будет вызывать исключение.
Исходя из различных проблем, с которыми я столкнулся при создании этого изображения, я склонен полагать, что передача mongo-hadoop-1.5.0-SNAPSHOT.jar
и mongo-hadoop-spark-1.5.0-SNAPSHOT.jar
в оба --jars
и --driver-class-path
является единственным жестким требованием.
Примечания:
--packages
, скорее всего, лучший вариант. Нашел!
Мне нужно отредактировать существующий конвейер сборки и добавить задачу с версией Python 2.x.
Конвейер -> Правка -> Значок плюса в задании агента -> Выбрать версию Python -> Использовать 2.x.