Вышеупомянутое решение не работает, поскольку оно не имеет значения «30». Здесь представлено решение с развернутым стилем, которое создает список, а не последовательность.
def unfoldRange(i: Int, j: Int, s: Int): List[Int] = {
if (i >= j) List(j)
else i :: unfoldRange(i+s,j,s)
}
У вас не будет доступа к объекту TestFile
, поскольку схема создается во время выполнения, а не предварительно компилируется. Если вы хотите сохранить этот POJO, вам понадобится конструктор для public TestFile(GenericRecord avroRecord)
. Вам нужно будет создать GenericRecord
, используя этот объект Schema
, так же, как если бы вы анализировали его из Строка или файл.
Например,
Schema schema = SchemaBuilder.record("TestFile")
.namespace("com.example.kafka.data.ingestion.model")
.fields()
.requiredLong("date")
.requiredInt("counter")
.requiredString("mc")
.endRecord();
GenericRecord entry1 = new GenericData.Record(schema);
entry1.put("date", 1L);
entry1.put("counter", 2);
entry1.put("mc", "3");
// producer.send(new ProducerRecord<>(topic, entry1);
Полный пример Кафки доступен из Confluent
Если вы не указали обязательное поле, оно Я выдам ошибку, и значения типов не проверяются (я мог бы поставить "counter", "2"
, и он отправил бы строковое значение (мне кажется, это ошибка). По сути, GenericRecord == HashMap<String, Object>
с дополнительным преимуществом требуемых / обнуляемых полей.
И вам нужно будет настроить сериализатор Avro, такой как Confluent, для которого требуется запуск реестра реестра, или версию, подобную Cloudera показывает
[1117 В противном случае вам необходимо преобразовать объект Avro вbyte[]
(как показано в вашей ссылке и просто использовать ByteArraySerializer