Flink - интеграционное тестирование с несколькими источниками

Резюме

  • blob: для Chrome 8+, Firefox 6+, Safari 6.0+, Opera 15 +
  • data:application/javascript для Opera 10.60 - 12
  • eval в противном случае (IE 10 +)
blockquote>

URL.createObjectURL() можно использовать для создания веб-рабочего из строки. Блаб может быть создан с использованием BlobBuilder API устарел или конструктор Blob .

Демонстрация: http: // jsfiddle .net / uqcFM / 49 /

// URL.createObjectURL
window.URL = window.URL || window.webkitURL;

// "Server response", used in all examples
var response = "self.onmessage=function(e){postMessage('Worker: '+e.data);}";

var blob;
try {
    blob = new Blob([response], {type: 'application/javascript'});
} catch (e) { // Backwards-compatibility
    window.BlobBuilder = window.BlobBuilder || window.WebKitBlobBuilder || window.MozBlobBuilder;
    blob = new BlobBuilder();
    blob.append(response);
    blob = blob.getBlob();
}
var worker = new Worker(URL.createObjectURL(blob));

// Test, used in all examples:
worker.onmessage = function(e) {
    alert('Response: ' + e.data);
};
worker.postMessage('Test');

Совместимость

Работники Web поддерживаются в следующих браузерах source :

  • Chrome 3
  • Firefox 3.5
  • IE 10
  • Opera 10.60
  • Safari 4

Поддержка этого метода основана на поддержке API Blob и метода URL.createObjectUrl. Blob Совместимость :

  • Chrome 8+ (WebKitBlobBuilder), конструктор 20+ (Blob)
  • Firefox 6+ ( MozBlobBuilder), конструктор 13+ (Blob)
  • Конструктор Safari 6+ (Blob)

IE10 поддерживает MSBlobBuilder и URL.createObjectURL. Тем не менее, попытка создания веб-рабочего из blob: -URL выдает SecurityError.

Opera 12 не поддерживает API URL. Некоторые пользователи могут иметь поддельную версию объекта URL, благодаря этому взлому в browser.js .

Падение 1: data-URI

Opera поддерживает URI-данные в качестве аргумента для конструктора Worker. Примечание. Не забудьте, чтобы выбрали специальные символы (такие как # и %).

// response as defined in the first example
var worker = new Worker('data:application/javascript,' +
                        encodeURIComponent(response) );
// ... Test as defined in the first example

Демо: http://jsfiddle.net / uqcFM / 37 /

Fallback 2: Eval

eval может использоваться как резерв для Safari (& lt; 6) и IE 10.

// Worker-helper.js
self.onmessage = function(e) {
    self.onmessage = null; // Clean-up
    eval(e.data);
};
// Usage:
var worker = new Worker('Worker-helper.js');
// `response` as defined in the first example
worker.postMessage(response);
// .. Test as defined in the first example
3
задан Andrew Briggs 27 February 2019 в 15:05
поделиться

1 ответ

Я бы предложил добавить контрольный код к вашим двум SourceFunctions и использовать MiniClusterWithClientResource. Это может выглядеть следующим образом:

public class JobITCase {

    private static final int NUM_TMS = 2;
    private static final int NUM_SLOTS = 2;
    private static final int PARALLELISM = NUM_SLOTS * NUM_TMS;

    @ClassRule
    public final static MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(NUM_SLOTS)
                .setNumberTaskManagers(NUM_TMS)
                .build());

    @Test
    public void testJob() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        final MyControllableSourceFunction source1 = new MyControllableSourceFunction("source1");
        final MyControllableSourceFunction source2 = new MyControllableSourceFunction("source2");

        final DataStreamSource<Integer> input1 = env.addSource(source1);
        final DataStreamSource<Integer> input2 = env.addSource(source2);

        input1.connect(input2).map(new CoMapFunction<Integer, Integer, Integer>() {
            @Override
            public Integer map1(Integer integer) {
                System.out.println("Input 1: " + integer);
                return integer;
            }

            @Override
            public Integer map2(Integer integer) {
                System.out.println("Input 2: " + integer);
                return integer;
            }
        }).print();

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().requestJobResult(jobGraph.getJobID());

        final ArrayList<CompletableFuture<Void>> finishedFutures = new ArrayList<>(PARALLELISM);

        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source1, i);
            finishedFutures.add(MyControllableSourceFunction.getFinishedFuture(source1, i));
        }

        FutureUtils.waitForAll(finishedFutures).join();

        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source2, i);
        }

        jobResultFuture.join();
    }

    private static class MyControllableSourceFunction extends RichParallelSourceFunction<Integer> {

        private static final ConcurrentMap<String, CountDownLatch> startLatches = new ConcurrentHashMap<>();
        private static final ConcurrentMap<String, CompletableFuture<Void>> finishedFutures = new ConcurrentHashMap<>();

        private final String name;

        private boolean running = true;

        private MyControllableSourceFunction(String name) {
            this.name = name;
        }

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            final int index = getRuntimeContext().getIndexOfThisSubtask();

            final CountDownLatch startLatch = startLatches.computeIfAbsent(getId(index), ignored -> new CountDownLatch(1));
            final CompletableFuture<Void> finishedFuture = finishedFutures.computeIfAbsent(getId(index), ignored -> new CompletableFuture<>());

            startLatch.await();
            int counter = 0;

            while (running && counter < 10) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(counter++);
                }
            }

            finishedFuture.complete(null);
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String getId(int index) {
            return name + '_' + index;
        }

        static void startExecution(MyControllableSourceFunction source, int index) {
            final CountDownLatch startLatch = startLatches.computeIfAbsent(source.getId(index), ignored -> new CountDownLatch(1));
            startLatch.countDown();
        }

        static CompletableFuture<Void> getFinishedFuture(MyControllableSourceFunction source, int index) {
            return finishedFutures.computeIfAbsent(source.getId(index), ignored -> new CompletableFuture<>());
        }
    }
}
0
ответ дан Till Rohrmann 27 February 2019 в 15:05
поделиться
Другие вопросы по тегам:

Похожие вопросы: