Я думаю, что, возможно, я разработал для вас ответ некоторое время назад, и он называется scramjet
. Он легкий (нет тысяч зависимостей в node_modules
), прост в использовании и делает ваш код очень простым для понимания и чтения.
Давайте начнем с вашего случая:
DataStream
.from(getDocs(10000))
.use(stream => {
let counter = 0;
const items = new DataStream();
const out = new DataStream();
stream
.peek(1, async ([first]) => out.whenWrote(first))
.batch(100)
.reduce(async (acc, result) => {
await items.whenWrote(result);
return result[result.length - 1];
}, null)
.then((last) => out.whenWrote(last))
.then(() => items.end());
items
.setOptions({ maxParallel: 1 })
.do(arr => counter += arr.length)
.each(batch => writeDataToSocketIo(batch))
.run()
.then(() => (out.end(counter)))
;
return out;
})
.toArray()
.then(([first, last, count]) => ({ first, count, last }))
.then(console.log)
;
Так что я не совсем согласен, что javascript FRP является антипаттерном, и я не думаю, что у меня есть единственный ответ на этот вопрос, но при разработке первого коммиты Я обнаружил, что синтаксис стрелок ES6 и async / await, написанные в цепочке, делают код легко понятным.
Вот еще один пример кода scramjet из OpenAQ , в частности, этой строки в процессе их выборки :
return DataStream.fromArray(Object.values(sources))
// flatten the sources
.flatten()
// set parallel limits
.setOptions({maxParallel: maxParallelAdapters})
// filter sources - if env is set then choose only matching source,
// otherwise filter out inactive sources.
// * inactive sources will be run if called by name in env.
.use(chooseSourcesBasedOnEnv, env, runningSources)
// mark sources as started
.do(markSourceAs('started', runningSources))
// get measurements object from given source
// all error handling should happen inside this call
.use(fetchCorrectedMeasurementsFromSourceStream, env)
// perform streamed save to DB and S3 on each source.
.use(streamMeasurementsToDBAndStorage, env)
// mark sources as finished
.do(markSourceAs('finished', runningSources))
// convert to measurement report format for storage
.use(prepareCompleteResultsMessage, fetchReport, env)
// aggregate to Array
.toArray()
// save fetch log to DB and send a webhook if necessary.
.then(
reportAndRecordFetch(fetchReport, sources, env, apiURL, webhookKey)
);
Он описывает все, что происходит с каждым источником данных. , Итак, вот мое предложение для допроса. :)
Вот то, как я выяснил, как перечислить все файлы Проекта TFS:
Добавьте Microsoft. TeamFoundation. Клиент и Microsoft. TeamFoundation. VersionControl. Клиент как ссылка на Ваш проект.
Добавьте использование для Microsoft. TeamFoundation. Клиент и Microsoft. TeamFoundation. VersionControl. Клиент
TeamFoundationServer server = new TeamFoundationServer("server");
VersionControlServer version = server.GetService(typeof(VersionControlServer)) as VersionControlServer;
ItemSet items = version.GetItems(@"$\ProjectName", RecursionType.Full);
ItemSet items = version.GetItems(@"$\ProjectName\FileName.cs", RecursionType.Full);
foreach (Item item in items.Items)
{
System.Console.WriteLine(item.ServerItem);
}
Второй GetItems ограничит объекты, найденные теми из определенного имени файла. У меня просто есть этот образец, производящий путь сервера для всех найденных файлов.