Если вы используете Spark 1.4+, это стало намного проще, благодаря API DataFrame . (DataFrames были введены в Spark 1.3, но partitionBy()
, который нам нужен, был введен в 1.4 .)
Если вы начинаете с RDD, вы будете сначала нужно преобразовать его в DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
В Python этот же код:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
Как только у вас есть DataFrame, запись на несколько выходов на основе конкретный ключ прост. Более того, и это красота API DataFrame - код почти одинаковый для Python, Scala, Java и R:
people_df.write.partitionBy("number").text("people")
И вы можете легко использовать другие форматы вывода, если вы хотите:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
В каждом из этих примеров Spark создаст подкаталог для каждого из ключей, которые мы разделили DataFrame на:
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh
Как насчет того, чтобы добавить что-то вроде этого в Вашу модель User?
has_many :active_events, :through => :event_users,
:class_name => "Event",
:source => :event,
:conditions => ['event_users.active = ?',true]
После этого необходимо быть в состоянии получить активные события для пользователя только путем вызова:
User.first.active_events
Даже при том, что Ваш u.events не явно вызов user_events таблицы, та таблица все еще включена в SQL неявно из-за необходимых соединений. Так, можно все еще использовать ту таблицу в условиях находки:
u.events.find(:all, :conditions => ["user_events.active = ?", true])
, Конечно, если Вы планируете сделать этот поиск, много тогда уверенный, дайте ему отдельную ассоциацию, как Milan Novota предполагает, но нет никакого требование для Вас, чтобы сделать это тот путь