Самый простой способ добиться того, что вы хотите (где s - ConfigurationBuilder):
s.Global(a => a.UseModules());
s.AddImport("breeze", "breeze-client");
var mySpecialTypes = typeof(IBreezeEntity).Assembly.GetTypes()
.Where(d => typeof(IBreezeEntity).IsAssignableFrom(d));
foreach (var type in mySpecialTypes)
{
s.Substitute(type, new RtSimpleTypeName($"I{type.Name} & breeze.Entity"));
}
Reinforced.Typings также сохраняет наследование. Подумайте о том, как получить ваши сущности из общего типа / интерфейса и экспортировать его.
По моему опыту df.rdd.getNumPartitions
очень быстро, я никогда не сталкивался с тем, чтобы это занимало больше секунды или около того.
Кроме того, вы также можете попробовать
val numPartitions: Long = df
.select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()
, что позволит избежать использования .rdd
В rdd.getNumPartitions
нет внутренней цены компонента rdd
, потому что возвращенный RDD
никогда не оценивается.
Хотя вы можете легко определить это эмпирически, используя отладчик (я оставлю это как упражнение для читателя), или установив, что никакие задания не запускаются в базовом сценарии
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]
scala> ds.rdd.getNumPartitions
res0: Int = 1
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
[1141 ] этого может быть недостаточно, чтобы убедить вас. Итак, давайте подойдем к этому более систематическим образом:
rdd
возвращает MapPartitionRDD
(ds
, как определено выше):
scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
RDD.getNumPartitions
вызывает RDD.partitions
.
RDD.partitions
вызывает getPartitions
(не стесняйтесь также отслеживать путь контрольной точки). RDD.getPartitions
является абстрактным . MapPartitionsRDD.getPartitions
, которая просто делегирует вызов родителю . Есть только MapPartitionsRDD
между rdd
и источником.
scala> ds.rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[3] at rdd at <console>:26 []
| MapPartitionsRDD[2] at rdd at <console>:26 []
| MapPartitionsRDD[1] at rdd at <console>:26 []
| FileScanRDD[0] at rdd at <console>:26 []
Точно так же, если бы Dataset
содержал обмен, мы следовали бы за родителями до ближайшего шаффла:
scala> ds.orderBy("value").rdd.toDebugString
res4: String =
(67) MapPartitionsRDD[13] at rdd at <console>:26 []
| MapPartitionsRDD[12] at rdd at <console>:26 []
| MapPartitionsRDD[11] at rdd at <console>:26 []
| ShuffledRowRDD[10] at rdd at <console>:26 []
+-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
| MapPartitionsRDD[5] at rdd at <console>:26 []
| FileScanRDD[4] at rdd at <console>:26 []
Обратите внимание, что этот случай особенно интересен, потому что мы фактически вызвали работу: [1146 ]
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
res5: Boolean = false
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
res6: Array[Int] = Array(0)
Это потому, что мы столкнулись как сценарий, в котором разделы не могут быть определены статически (см. Количество разделов в фрейме данных после сортировки? и Почему преобразование sortBy запускает задание Spark? [тысяча сто тридцать два]).
В таком сценарии getNumPartitions
также будет запускаться задание:
scala> ds.orderBy("value").rdd.getNumPartitions
res7: Int = 67
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
res8: Array[Int] = Array(1, 0)
, однако это не означает, что наблюдаемая стоимость так или иначе связана с вызовом .rdd
. Вместо этого это внутренняя стоимость поиска partitions
в случае, когда нет статической формулы (например, в некоторых форматах ввода Hadoop, где требуется полное сканирование данных).
Обратите внимание, что отмеченные здесь пункты не должны быть экстраполированы на другие приложения из Dataset.rdd
. Например, ds.rdd.count
было бы действительно дорого и расточительно.