nodejs функциональное программирование с генераторами и обещаниями

двойная кавычка пользователя, чтобы получить точное значение. например:

echo "${var}"

, и он правильно прочитает ваше значение.

4
задан Muayyad Alsadi 18 January 2019 в 13:47
поделиться

2 ответа

Я думаю, что, возможно, я разработал для вас ответ некоторое время назад, и он называется scramjet . Он легкий (нет тысяч зависимостей в node_modules), прост в использовании и делает ваш код очень простым для понимания и чтения.

Давайте начнем с вашего случая:

DataStream
    .from(getDocs(10000))
    .use(stream => {
        let counter = 0;

        const items = new DataStream();
        const out = new DataStream();

        stream
            .peek(1, async ([first]) => out.whenWrote(first))
            .batch(100)
            .reduce(async (acc, result) => {
                await items.whenWrote(result);

                return result[result.length - 1];
            }, null)
            .then((last) => out.whenWrote(last))
            .then(() => items.end());

        items
            .setOptions({ maxParallel: 1 })
            .do(arr => counter += arr.length)
            .each(batch => writeDataToSocketIo(batch))
            .run()
            .then(() => (out.end(counter)))
        ;

        return out;
    })
    .toArray()
    .then(([first, last, count]) => ({ first, count, last }))
    .then(console.log)
;

Так что я не совсем согласен, что javascript FRP является антипаттерном, и я не думаю, что у меня есть единственный ответ на этот вопрос, но при разработке первого коммиты Я обнаружил, что синтаксис стрелок ES6 и async / await, написанные в цепочке, делают код легко понятным.

Вот еще один пример кода scramjet из OpenAQ , в частности, этой строки в процессе их выборки :

return DataStream.fromArray(Object.values(sources))
  // flatten the sources
  .flatten()
  // set parallel limits
  .setOptions({maxParallel: maxParallelAdapters})
  // filter sources - if env is set then choose only matching source,
  //   otherwise filter out inactive sources.
  // * inactive sources will be run if called by name in env.
  .use(chooseSourcesBasedOnEnv, env, runningSources)
  // mark sources as started
  .do(markSourceAs('started', runningSources))
  // get measurements object from given source
  // all error handling should happen inside this call
  .use(fetchCorrectedMeasurementsFromSourceStream, env)
  // perform streamed save to DB and S3 on each source.
  .use(streamMeasurementsToDBAndStorage, env)
  // mark sources as finished
  .do(markSourceAs('finished', runningSources))
  // convert to measurement report format for storage
  .use(prepareCompleteResultsMessage, fetchReport, env)
  // aggregate to Array
  .toArray()
  // save fetch log to DB and send a webhook if necessary.
  .then(
    reportAndRecordFetch(fetchReport, sources, env, apiURL, webhookKey)
  );

Он описывает все, что происходит с каждым источником данных. , Итак, вот мое предложение для допроса. :)

0
ответ дан Michał Kapracki 18 January 2019 в 13:47
поделиться

Я не уверен, что было бы справедливо подразумевать, что функциональное программирование могло предложить какие-либо преимущества перед императивным программированием с точки зрения производительности при работе с огромным количеством данных.

Я думаю, вам нужно добавить еще один инструмент в ваш инструментарий, и это может быть RxJS .

RxJS - это библиотека для составления асинхронных и событийных программ с использованием наблюдаемых последовательностей.

Если вы не знакомы с RxJS или реактивным программированием в целом, мои примеры определенно будут выглядеть странно, но я думаю, что было бы неплохо познакомиться с этими концепциями [1114 ]

В вашем случае наблюдаемая последовательность - это ваш экземпляр MongoDB, который со временем генерирует записи.

Я собираюсь подделать вашу базу данных:

var db = range(1, 5);

Функция range - это функция RxJS, которая будет выдавать значение в указанном диапазоне.

db.subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 2
//=> record 3
//=> record 4
//=> record 5

Теперь меня интересуют только первая и последняя запись.

Я могу создать наблюдаемую, которая будет излучать только первую запись, и создать другую, которая будет излучать только последнюю:

var db = range(1, 5);
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});
//=> record 1
//=> record 5

Однако мне также нужно обрабатывать все записи партиями: (в в этом примере я собираюсь создать партии по 10 записей в каждой)

var db = range(1, 100);
var batches = db.pipe(bufferCount(10))
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, batches, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 1,2,3,4,5,6,7,8,9,10
//=> record 11,12,13,14,15,16,17,18,19,20
//=> record 21,22,23,24,25,26,27,28,29,30
//=> record 31,32,33,34,35,36,37,38,39,40
//=> record 41,42,43,44,45,46,47,48,49,50
//=> record 51,52,53,54,55,56,57,58,59,60
//=> record 61,62,63,64,65,66,67,68,69,70
//=> record 71,72,73,74,75,76,77,78,79,80
//=> record 81,82,83,84,85,86,87,88,89,90
//=> record 91,92,93,94,95,96,97,98,99,100
//=> record 100

Как вы можете видеть в выводе, он выпустил:

  1. Первая запись
  2. [ 118] Десять партий по 10 записей в каждой
  3. Последняя запись

Я не буду пытаться решить ваше упражнение для вас, и я не слишком знаком с RxJS, чтобы расширять тоже много об этом.

Я просто хотел показать вам другой способ и дать вам понять, что это можно совместить с функциональным программированием.

Надеюсь, это поможет.

0
ответ дан customcommander 18 January 2019 в 13:47
поделиться
Другие вопросы по тегам:

Похожие вопросы: