Вам нужно будет внедрить какой-нибудь Mutex. В основном класс, который ставит в очередь вещи для синхронного выполнения. Пример
var Mutex = function() {
this.queue = [];
this.locked = false;
};
Mutex.prototype.enqueue = function(task) {
this.queue.push(task);
if (!this.locked) {
this.dequeue();
}
};
Mutex.prototype.dequeue = function() {
this.locked = true;
const task = this.queue.shift();
if (task) {
this.execute(task);
} else {
this.locked = false;
}
};
Mutex.prototype.execute = async function(task) {
try { await task(); } catch (err) { }
this.dequeue();
}
Для того, чтобы это работало, ваша функция applyMessage
(в зависимости от того, что обрабатывает сообщения Kafka) должна вернуть Promise
- обратите внимание, что асинхронность переместилась из родительской функции в возвращенное Promise. function:
function applyMessage(message: kafka.Message) {
return new Promise(async function(resolve,reject) {
try {
const usersCount = await db('users').count()
// just assume we ABSOLUTELY need to calculate a number of users,
// so we need previous state
await db('users').insert(inferUserFromMessage(message))
resolve();
} catch (err) {
reject(err);
}
});
}
Наконец, каждый вызов applyMessage
должен добавляться в очередь Mutex, а не вызываться напрямую:
var mutex = new Mutex();
consumer.on('message', message => mutex.enqueue(function() { return applyMessage(message); }))
Хорошо, это - хитрое. Я только что закончил проект, где мы пользовались этой библиотекой с открытым исходным кодом для отображения HTML: http://code.google.com/p/flex-htmlfilter
Это обрабатывает списки, таблицы, и т.д., и довольно легко работать с. Но, я не думаю, что это поддерживает фирменный символ. Это действительно поддерживает некоторых из других, все же.
Я использую его, чтобы вытянуть содержание от CMS и отобразиться в моем фильме флэш-памяти. Это работало отлично и позволяет мне всю гибкость моделирования флэш-памяти, в которой я нуждаюсь.