Лучше вы можете запросить соответствующий элемент массива с помощью $slice
, полезно ли возвращать значимый объект в массиве.
db.test.find({"shapes.color" : "blue"}, {"shapes.$" : 1})
$slice
полезно, когда вы знаете индекс элемента , но иногда вы хотите, какой бы элемент массива не соответствовал вашим критериям. Вы можете вернуть соответствующий элемент с помощью оператора $
.
Вы получаете эти ошибки, потому что схема, определенная sqlType
, никогда не отображается и не предназначена для прямого доступа. Он просто предоставляет способ выражения сложных типов данных с использованием собственных типов Spark SQL.
Вы можете получить доступ к отдельным атрибутам с помощью UDF, но сначала покажите, что внутренняя структура действительно не отображается:
dataFrame.printSchema
// root
// |-- person_id: integer (nullable = true)
// |-- person: mockperso (nullable = true)
Для создания UDF нужны функции, которые принимают в качестве аргумента объект типа, представленного данным UDT:
import org.apache.spark.sql.functions.udf
val getFirstName = (person: MockPerson) => person.getFirstName
val getLastName = (person: MockPerson) => person.getLastName
val getAge = (person: MockPerson) => person.getAge
, который можно обернуть с помощью функции udf
:
val getFirstNameUDF = udf(getFirstName)
val getLastNameUDF = udf(getLastName)
val getAgeUDF = udf(getAge)
dataFrame.select(
getFirstNameUDF($"person").alias("first_name"),
getLastNameUDF($"person").alias("last_name"),
getAgeUDF($"person").alias("age")
).show()
// +----------+---------+---+
// |first_name|last_name|age|
// +----------+---------+---+
// | First1| Last1| 1|
// | First2| Last2| 2|
// +----------+---------+---+
Чтобы использовать их с необработанным SQL, у вас есть функции регистрации через SQLContext
:
sqlContext.udf.register("first_name", getFirstName)
sqlContext.udf.register("last_name", getLastName)
sqlContext.udf.register("age", getAge)
sqlContext.sql("""
SELECT first_name(person) AS first_name, last_name(person) AS last_name
FROM person
WHERE age(person) < 100""").show
// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// | First1| Last1|
// | First2| Last2|
// +----------+---------+
К сожалению, он поставляется с прикрепленной ценой. Прежде всего, для каждой операции требуется десериализация. Это также существенно ограничивает способы оптимизации запроса. В частности, для любой операции join
в одном из этих полей требуется декартово произведение.
На практике, если вы хотите кодировать сложную структуру, которая содержит атрибуты, которые могут быть выражены с помощью встроенных типов, лучше использовать StructType
:
case class Person(first_name: String, last_name: String, age: Int)
val df = sc.parallelize(
(1 to 2).map(i => (i, Person(s"First$i", s"Last$i", i)))).toDF("id", "person")
df.printSchema
// root
// |-- id: integer (nullable = false)
// |-- person: struct (nullable = true)
// | |-- first_name: string (nullable = true)
// | |-- last_name: string (nullable = true)
// | |-- age: integer (nullable = false)
df
.where($"person.age" < 100)
.select($"person.first_name", $"person.last_name")
.show
// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// | First1| Last1|
// | First2| Last2|
// +----------+---------+
и зарезервировать UDT для фактических расширений типа, подобных встроенным VectorUDT
, или вещам, которые могут извлечь выгоду из определенного представления , как перечисления .