Avro и Kafka, используя SchemaBuilder

Вышеупомянутое решение не работает, поскольку оно не имеет значения «30». Здесь представлено решение с развернутым стилем, которое создает список, а не последовательность.

def unfoldRange(i: Int, j: Int, s: Int): List[Int] = {
  if (i >= j) List(j)
  else i :: unfoldRange(i+s,j,s)
}
3
задан Giorgos Myrianthous 15 January 2019 в 11:21
поделиться

1 ответ

У вас не будет доступа к объекту TestFile, поскольку схема создается во время выполнения, а не предварительно компилируется. Если вы хотите сохранить этот POJO, вам понадобится конструктор для public TestFile(GenericRecord avroRecord)

. Вам нужно будет создать GenericRecord, используя этот объект Schema, так же, как если бы вы анализировали его из Строка или файл.

Например,

Schema schema = SchemaBuilder.record("TestFile")
            .namespace("com.example.kafka.data.ingestion.model")
            .fields()
            .requiredLong("date")
            .requiredInt("counter")
            .requiredString("mc")
            .endRecord();

GenericRecord entry1 = new GenericData.Record(schema);
entry1.put("date", 1L);
entry1.put("counter", 2);
entry1.put("mc", "3");

// producer.send(new ProducerRecord<>(topic, entry1);

Полный пример Кафки доступен из Confluent

Если вы не указали обязательное поле, оно Я выдам ошибку, и значения типов не проверяются (я мог бы поставить "counter", "2", и он отправил бы строковое значение (мне кажется, это ошибка). По сути, GenericRecord == HashMap<String, Object> с дополнительным преимуществом требуемых / обнуляемых полей.

И вам нужно будет настроить сериализатор Avro, такой как Confluent, для которого требуется запуск реестра реестра, или версию, подобную Cloudera показывает

[1117 В противном случае вам необходимо преобразовать объект Avro в byte[] (как показано в вашей ссылке и просто использовать ByteArraySerializer

0
ответ дан cricket_007 15 January 2019 в 11:21
поделиться
Другие вопросы по тегам:

Похожие вопросы: