Резюме
blockquote>
blob:
для Chrome 8+, Firefox 6+, Safari 6.0+, Opera 15 +data:application/javascript
для Opera 10.60 - 12eval
в противном случае (IE 10 +)
URL.createObjectURL(
можно использовать для создания веб-рабочего из строки. Блаб может быть создан с использованием) BlobBuilder
API устарел или конструкторBlob
.Демонстрация: http: // jsfiddle .net / uqcFM / 49 /
// URL.createObjectURL window.URL = window.URL || window.webkitURL; // "Server response", used in all examples var response = "self.onmessage=function(e){postMessage('Worker: '+e.data);}"; var blob; try { blob = new Blob([response], {type: 'application/javascript'}); } catch (e) { // Backwards-compatibility window.BlobBuilder = window.BlobBuilder || window.WebKitBlobBuilder || window.MozBlobBuilder; blob = new BlobBuilder(); blob.append(response); blob = blob.getBlob(); } var worker = new Worker(URL.createObjectURL(blob)); // Test, used in all examples: worker.onmessage = function(e) { alert('Response: ' + e.data); }; worker.postMessage('Test');
Совместимость
Работники Web поддерживаются в следующих браузерах source :
- Chrome 3
- Firefox 3.5
- IE 10
- Opera 10.60
- Safari 4
Поддержка этого метода основана на поддержке API
Blob
и методаURL.createObjectUrl
.Blob
Совместимость :
- Chrome 8+ (
WebKitBlobBuilder
), конструктор 20+ (Blob
)- Firefox 6+ (
MozBlobBuilder
), конструктор 13+ (Blob
)- Конструктор Safari 6+ (
Blob
)IE10 поддерживает
MSBlobBuilder
иURL.createObjectURL
. Тем не менее, попытка создания веб-рабочего изblob:
-URL выдает SecurityError.Opera 12 не поддерживает API
URL
. Некоторые пользователи могут иметь поддельную версию объектаURL
, благодаря этому взлому вbrowser.js
.Падение 1: data-URI
Opera поддерживает URI-данные в качестве аргумента для конструктора
Worker
. Примечание. Не забудьте, чтобы выбрали специальные символы (такие как#
и%
).// response as defined in the first example var worker = new Worker('data:application/javascript,' + encodeURIComponent(response) ); // ... Test as defined in the first example
Демо: http://jsfiddle.net / uqcFM / 37 /
Fallback 2: Eval
eval
может использоваться как резерв для Safari (& lt; 6) и IE 10.// Worker-helper.js self.onmessage = function(e) { self.onmessage = null; // Clean-up eval(e.data); }; // Usage: var worker = new Worker('Worker-helper.js'); // `response` as defined in the first example worker.postMessage(response); // .. Test as defined in the first example
Я бы предложил добавить контрольный код к вашим двум SourceFunctions
и использовать MiniClusterWithClientResource
. Это может выглядеть следующим образом:
public class JobITCase {
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS = 2;
private static final int PARALLELISM = NUM_SLOTS * NUM_TMS;
@ClassRule
public final static MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(NUM_SLOTS)
.setNumberTaskManagers(NUM_TMS)
.build());
@Test
public void testJob() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
final MyControllableSourceFunction source1 = new MyControllableSourceFunction("source1");
final MyControllableSourceFunction source2 = new MyControllableSourceFunction("source2");
final DataStreamSource<Integer> input1 = env.addSource(source1);
final DataStreamSource<Integer> input2 = env.addSource(source2);
input1.connect(input2).map(new CoMapFunction<Integer, Integer, Integer>() {
@Override
public Integer map1(Integer integer) {
System.out.println("Input 1: " + integer);
return integer;
}
@Override
public Integer map2(Integer integer) {
System.out.println("Input 2: " + integer);
return integer;
}
}).print();
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().submitJob(jobGraph).get();
final CompletableFuture<JobResult> jobResultFuture = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().requestJobResult(jobGraph.getJobID());
final ArrayList<CompletableFuture<Void>> finishedFutures = new ArrayList<>(PARALLELISM);
for (int i = 0; i < PARALLELISM; i++) {
MyControllableSourceFunction.startExecution(source1, i);
finishedFutures.add(MyControllableSourceFunction.getFinishedFuture(source1, i));
}
FutureUtils.waitForAll(finishedFutures).join();
for (int i = 0; i < PARALLELISM; i++) {
MyControllableSourceFunction.startExecution(source2, i);
}
jobResultFuture.join();
}
private static class MyControllableSourceFunction extends RichParallelSourceFunction<Integer> {
private static final ConcurrentMap<String, CountDownLatch> startLatches = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, CompletableFuture<Void>> finishedFutures = new ConcurrentHashMap<>();
private final String name;
private boolean running = true;
private MyControllableSourceFunction(String name) {
this.name = name;
}
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
final int index = getRuntimeContext().getIndexOfThisSubtask();
final CountDownLatch startLatch = startLatches.computeIfAbsent(getId(index), ignored -> new CountDownLatch(1));
final CompletableFuture<Void> finishedFuture = finishedFutures.computeIfAbsent(getId(index), ignored -> new CompletableFuture<>());
startLatch.await();
int counter = 0;
while (running && counter < 10) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.collect(counter++);
}
}
finishedFuture.complete(null);
}
@Override
public void cancel() {
running = false;
}
private String getId(int index) {
return name + '_' + index;
}
static void startExecution(MyControllableSourceFunction source, int index) {
final CountDownLatch startLatch = startLatches.computeIfAbsent(source.getId(index), ignored -> new CountDownLatch(1));
startLatch.countDown();
}
static CompletableFuture<Void> getFinishedFuture(MyControllableSourceFunction source, int index) {
return finishedFutures.computeIfAbsent(source.getId(index), ignored -> new CompletableFuture<>());
}
}
}