Я не совсем уверен, что вы пытаетесь сделать в IdlingWrapper, но я думаю, что текущая реализация очень хрупкая.
Я думаю, что самое важное, что должно произойти, - это гарантировать наблюдаемый может быть вызван только один раз.
Вот краткая реализация, чтобы продемонстрировать это, а также мою реализацию wrapObservable.
public class Test {
private static int counter = 0;
private static final List<Observable<?>> list = Collections.synchronizedList(new ArrayList<>());
protected static <T> Observable<T> wrapObservable(final Observable<T> original) {
// run atleast once???
synchronized (list) {
list.add(original);
}
return Observable.create(new Observable.OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
synchronized (list) {
counter++;
if (!list.contains(original)) {
subscriber.onError(new Exception("You can only subscribe once!"));
return;
}
list.remove(original);
}
// Sleep to make it easier to see things happening...
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
}
subscriber.onCompleted();
}
}).flatMap(new Func1<Void, Observable<? extends T>>() {
@Override
public Observable<? extends T> call(Void o) {
return original;
}
}).finallyDo(new Action0() {
@Override
public void call() {
synchronized (list) {
counter--;
if (list.size() == 0 && counter == 0) {
System.err.println("finally");
}
}
}
});
}
public static void main(String[] args) throws InterruptedException {
for(int i = 0; i < 10; i++) {
// running in io thread for simulating async call.
Observable<String> test = wrapObservable(Observable.from("TEST!!!!!!")).subscribeOn(Schedulers.io());
test.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.err.println("completed");
}
@Override
public void onError(Throwable e) {
System.err.println("error");
}
@Override
public void onNext(String s) {
System.err.println("next");
}
});
// example of calling the same observable twice.
test.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.err.println("completed");
}
@Override
public void onError(Throwable e) {
System.err.println("error");
}
@Override
public void onNext(String s) {
System.err.println("next");
}
});
}
Thread.sleep(10000);
}
}
Я знаю, что несколько человек столкнутся с этим и найдут, что ничто здесь не помогает. Самым легким путем я нашел, чтобы сделать это, похож на это:
dict_count = len(dict_list)
df = pd.DataFrame(dict_list[0], index=[0])
for i in range(1,dict_count-1):
df = df.append(dict_list[i], ignore_index=True)
Hope это помогает кому-то!