Вы можете попробовать с моим предложением - с помощью switchMap
pipe
let isAlreadyExist = false; // "global" variable :)
s$.pipe(
map((x) => x + 1),
switchMap(async (x) => {
isAlreadyExist = await alreadyExist(x); // make sure alreadyExist is a promise func
return x; // "do" do nothing with your data
}),
filter((x) => isAlreadyExist)) // isAlreadyExist is value of alreadyExist func
.subscribe(...)
или вы можете прочитать о defer
из RxJS Observable
.