Вы должны прочитать немного о том, как работает потребление сообщений в Kafka. Вот ссылка на потребительский раздел официальных документов Kafka: https://kafka.apache.org/documentation/#theconsumer
По сути, в Kafka сообщения удаляются только после того, как достаточно время прошло, и это настроено с помощью log.retention.hours
, log.retention.minutes
и log.retention.ms
, как сказал @Amin.
В Kafka любое количество потребителей может начать получать сообщения из любой темы в любой момент, независимо от того, потребляют ли другие потребители эту же тему. Кафка отслеживает, где каждый потребитель в каждой теме / разделе использует смещения, которые хранятся в самой Кафке. Таким образом, если вашему потребителю нужно принять сообщение 100, как вы описали в своем вопросе, вы можете просто «перемотать» назад на желаемое сообщение и снова начать нормально потреблять. Не имеет значения, употребляли ли вы его ранее, или другие пользователи читают эту тему или нет.
Из официальных документов Кафки:
Потребитель может сознательно вернуться назад к старому смещению и повторно использовать данные. Это нарушает общий контракт очереди, но оказывается важной особенностью для многих потребителей. Например, если код потребителя содержит ошибку и обнаруживается после использования некоторых сообщений, потребитель может повторно использовать эти сообщения после исправления ошибки.
BLOCKQUOTE>
Это уже находится в платформе: Счетный. Диапазон .
Для других типов, Вы могли бы интересоваться классами диапазона в моем библиотека MiscUtil.
Вот идея, которая позволяет классу диапазона работать с обеими вещами, которые дискретны и те, которые не являются:
class Range<T> where T: IComparable<T>
{
public T From { get; set; }
public T To { get; set; }
public Range(T from, T to) { this.From = from; this.To = to; }
public IEnumerable<T> Enumerate(Func<T, T> next)
{
for (T t = this.From; t.CompareTo(this.To) < 0; t = next(t))
{
yield return t;
}
}
static void Example()
{
new Range<int> (0, 100).Enumerate(i => i+1)
}
}
Поочередно, быстрый интерфейс из дополнительных методов:
public static IEnumerable<int> To(this int start, int end)
{
return start.To(end, i => i + 1);
}
public static IEnumerable<int> To(this int start, int end, Func<int, int> next)
{
int current = start;
while (current < end)
{
yield return current;
current = next(current);
}
}
используемый как:
1.To(100)
И если Вы думаете, что, предоставляя перечислитель каждый раз является раздражающим, вот производный класс:
class EnumerableRange<T> : Range<T>, IEnumerable<T>
where T : IComparable<T>
{
readonly Func<T, T> _next;
public EnumerableRange(T from, T to, Func<T, T> next)
: base(from, to)
{
this._next = next;
}
public IEnumerator<T> GetEnumerator()
{
return Enumerate(this._next).GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
}