Spark Streaming с защищенной IBM MQ [дубликат]

Я создал небольшую утилиту, которая может генерировать интерфейсы TypeScript из классов C #. Доступно как пакет NuGet . Подробную документацию можно найти на веб-странице проекта .

21
задан user7337271 14 January 2017 в 23:28
поделиться

6 ответов

При создании и развертывании приложений Spark все зависимости требуют совместимых версий.

  • версия Scala. Все пакеты должны использовать одну и ту же версию (2.10, 2.11, 2.12) Scala. Рассмотрим следующий (неверный) build.sbt:
    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    Мы используем spark-streaming для Scala 2.10, а оставшиеся пакеты для Scala 2.11. Допустимым файлом может быть
    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    , но лучше указать версию по всему миру и использовать %%:
    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.0.1",
       "org.apache.spark" %% "spark-streaming" % "2.0.1",
       "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )
    
    Аналогично в Maven:
    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
  • Исправленная версия Все пакеты должны использовать тот же самый главный Spark версии (1.6, 2.0, 2.1, ...). Рассмотрим следующий (неверный) build.sbt:
    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "1.6.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    Мы используем spark-core 1.6, в то время как остальные компоненты находятся в Spark 2.0. Допустимым файлом может быть
    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    
    , но лучше использовать переменную:
    name := "Simple Project"
    
    version := "1.0"
    
    val sparkVersion = "2.0.1"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % sparkVersion,
       "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )
    
    Аналогично в Maven:
    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
  • Исправленная версия, используемая в зависимостях Spark, должна соответствовать версии Spark установки Spark. Например, если вы используете 1.6.1 в кластере, вы должны использовать 1.6.1 для сборки банок. Неправильное рассогласование не всегда принимается.
  • Версия Scala, используемая для сборки jar, должна соответствовать версии Scala, используемой для создания развернутого Spark. По умолчанию (загружаемые двоичные файлы и сборки по умолчанию): Spark 1.x -> Scala 2.10 Spark 2.x -> Scala 2.11
  • Дополнительные пакеты должны быть доступны на рабочих узлах, если они включены в жирную банку. Существует множество опций, включая: --jars аргумент для spark-submit - для распространения локальных файлов jar. --packages для spark-submit - для получения зависимостей из репозитория Maven. При отправке в узел кластера вы должны включить приложение jar в --jars.
14
ответ дан user7337271 17 August 2018 в 23:41
поделиться

Классы зависимостей вашего приложения должны быть указаны в параметре application-jar вашей команды запуска.

Более подробную информацию можно найти в документации Spark

. Из документации:

application-jar: Путь к объединенной банке, включая ваше приложение и все зависимости. URL-адрес должен быть глобально видимым внутри вашего кластера, например, путь hdfs: // или путь к файлу: //, который присутствует на всех узлах

3
ответ дан clearlight 17 August 2018 в 23:41
поделиться

Я думаю, что эта проблема должна решить плагин сборки. Вам нужно построить жирную банку. Например, в sbt:

  • добавьте файл $PROJECT_ROOT/project/assembly.sbt с кодом addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  • в build.sbt added some libraries libraryDependencies ++ = Seq ("com.some. компания "%%" some-lib "%" 1.0.0 ")`
  • в консоли sbt введите «сборка» и разверните сборку jar

. Если вам нужно дополнительную информацию, перейдите к https://github.com/sbt/sbt-assembly

0
ответ дан Idriss Neumann 17 August 2018 в 23:41
поделиться
  • 1
    Хотя эта ссылка может ответить на вопрос, лучше включить здесь основные части ответа и предоставить ссылку для справки. Ответные ссылки могут стать недействительными, если связанная страница изменится. - Из обзора – Max Vollmer 20 June 2018 в 13:35

Путь классов Apache Spark построен динамически (для размещения кода пользователя для каждого приложения), что делает его уязвимым для таких проблем. @ user7337271 ответ правильный, но есть еще несколько проблем, в зависимости от используемого вами менеджера кластера («мастер»).

Во-первых, приложение Spark состоит из из этих компонентов (каждый из них является отдельным JVM, поэтому потенциально содержит разные классы в его пути к классам):

  1. Драйвер: это ваше приложение, создающее SparkSession (или SparkContext) и подключение к диспетчеру кластера для выполнения фактической работы
  2. Диспетчер кластеров: служит в качестве «точки входа» в кластер, которому поручено выделить исполнителей для каждого заявление. В Spark есть несколько разных типов: автономный, YARN и Mesos, которые мы опишем ниже.
  3. Исполнители: это процессы на узлах кластера, выполняющие фактическую работу (запуск Spark Задачи )

Соотношение между ними описано на этой диаграмме из обзора режима кластера Apache Spark :

Теперь - какие классы должны находиться в каждом из этих компонентов?

На это может ответить следующая диаграмма:

Давайте разберем это медленно:

  1. Spark Code - это библиотеки Spark. Они должны существовать в ALL трех компонентах, так как они включают клей, который позволяет Spark выполнять связь между ними. Кстати, авторы Spark внесли дизайнерское решение включить код для ВСЕХ компонентов во ВСЕХ компонентах (например, включить код, который должен запускаться только в Executor в драйвере), чтобы упростить это - так что «толстая банка Spark» (в версиях до 1.6 ) или «архив» (в версии 2.0, ниже) содержат необходимый код для всех компонентов и должны быть доступны во всех них.
  2. Код только для драйверов - это код пользователя, который не содержит ничего, что должно используется для исполнителей, то есть кода, который не используется при каких-либо преобразованиях в RDD / DataFrame / Dataset. Это необязательно должно быть отделено от распределенного кода пользователя, но это может быть.
  3. Распределенный код - это код пользователя, который скомпилирован с кодом драйвера, но также должен выполняться на исполнителях - все используемые фактические преобразования должны быть включены в эту банку.

Теперь, когда мы получили это прямо, как мы получаем, чтобы классы правильно загружались в каждом компоненте, и какими правилами они должны следовать?

  1. Spark Code : как утверждают предыдущие ответы, вы должны использовать те же версии Scala и Spark для всех компонентов. 1.1 В автономном режиме существует «ранее существовавшая» установка Spark, к которой могут подключаться приложения (драйверы). Это означает, что все драйверы должны использовать ту же версию Spark, что и на главном и исполнительном устройствах. 1.2 В YARN / Mesos каждое приложение может использовать другую версию Spark, но все компоненты одного и того же приложения должны использовать один и тот же. Это означает, что если вы использовали версию X для компиляции и упаковки вашего приложения-драйвера, вы должны предоставить такую ​​же версию при запуске SparkSession (например, через spark.yarn.archive или spark.yarn.jars параметры при использовании YARN). Банки / архив, которые вы предоставляете, должны включать все зависимости Spark (включая транзитивные зависимости), и они будут отправляться диспетчером кластера каждому исполнителю при запуске приложения.
  2. Код драйвера: это полностью до - драйвер код может быть отправлен в виде кучи банок или «толстой банки», если он включает все зависимости Spark + весь код пользователя
  3. Распределенный код: помимо присутствия в драйвере этот код должен быть отправлены исполнителям (опять же, вместе со всеми его транзитивными зависимостями). Это делается с использованием параметра spark.jars.

Подводя итог, рассмотрим предлагаемый подход к созданию и развертыванию Spark Application (в данном случае - с использованием YARN):

  • Создайте библиотеку с вашим распределенным кодом, упакуйте ее как «регулярную» банку (с файлом .pom, описывающим ее зависимости), так и как «живую банку» (со всеми включенными транзитными зависимостями).
  • Создайте приложение драйвера с зависимостями компиляции в вашей распределенной библиотеке кодов и Apache Spark (с определенной версией).
  • Пакет приложения драйвера в жирную банку, которая будет развернута в driver
  • Передайте правильную версию вашего распределенного кода в качестве значения параметра spark.jars при запуске SparkSession
  • Передайте местоположение файла архива (например, gzip), содержащего все банки в папке lib/ загруженных двоичных файлов Spark в качестве значения spark.yarn.archive
17
ответ дан Kenny John Jacob 17 August 2018 в 23:41
поделиться
3
ответ дан clearlight 6 September 2018 в 15:12
поделиться
3
ответ дан clearlight 29 October 2018 в 21:26
поделиться
Другие вопросы по тегам:

Похожие вопросы: