Я записал фон InputStream
(и OutputStream
) реализации, которые переносят другие потоки и читают вперед на фоновом потоке, прежде всего, позволяя, чтобы распаковка/сжатие произошла в различных потоках от обработки распакованного потока.
Это - довольно стандартная модель производителя/потребителя.
Это походит на простой способ хорошо использовать многоядерные центральные процессоры с простыми процессами, которые читают, обрабатывают, и данные записи, допуская более эффективное использование и ЦП и дисковых ресурсов. Возможно, 'эффективный' не лучшее слово, но оно обеспечивает более высокое использование, и более интересный для меня, уменьшенного времени выполнения, по сравнению с чтением непосредственно из a ZipInputStream
и запись непосредственно в a ZipOutputStream
.
Я рад отправить код, но мой вопрос состоит в том, переосмысливаю ли я что-то легко доступное в существующем (и в большей степени осуществленный) библиотеки?
Редактирование - отправляющий код...
Мой код для BackgroundInputStream
ниже ( BackgroundOutputStream
очень похоже), но существуют аспекты его, что я хотел бы улучшиться.
BackgroundInputStream
, backgroundReaderThread
будет бродить вокруг навсегда.eof
улучшение потребностей.Executor
.close()
метод должен сигнализировать о фоновом потоке и не должен закрывать перенесенный поток, поскольку перенесенный поток должен принадлежать фоновому потоку, который читает из него.package nz.co.datacute.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
public class BackgroundInputStream extends InputStream {
private static final int DEFAULT_QUEUE_SIZE = 1;
private static final int DEFAULT_BUFFER_SIZE = 64*1024;
private final int queueSize;
private final int bufferSize;
private volatile boolean eof = false;
private LinkedBlockingQueue<byte[]> bufferQueue;
private final InputStream wrappedInputStream;
private byte[] currentBuffer;
private volatile byte[] freeBuffer;
private int pos;
public BackgroundInputStream(InputStream wrappedInputStream) {
this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
}
public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
this.wrappedInputStream = wrappedInputStream;
this.queueSize = queueSize;
this.bufferSize = bufferSize;
}
@Override
public int read() throws IOException {
if (bufferQueue == null) {
bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
BackgroundReader backgroundReader = new BackgroundReader();
Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
backgroundReaderThread.start();
}
if (currentBuffer == null) {
try {
if ((!eof) || (bufferQueue.size() > 0)) {
currentBuffer = bufferQueue.take();
pos = 0;
} else {
return -1;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int b = currentBuffer[pos++];
if (pos == currentBuffer.length) {
freeBuffer = currentBuffer;
currentBuffer = null;
}
return b;
}
@Override
public int available() throws IOException {
if (currentBuffer == null) return 0;
return currentBuffer.length;
}
@Override
public void close() throws IOException {
wrappedInputStream.close();
currentBuffer = null;
freeBuffer = null;
}
class BackgroundReader implements Runnable {
@Override
public void run() {
try {
while (!eof) {
byte[] newBuffer;
if (freeBuffer != null) {
newBuffer = freeBuffer;
freeBuffer = null;
} else {
newBuffer = new byte[bufferSize];
}
int bytesRead = 0;
int writtenToBuffer = 0;
while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
writtenToBuffer += bytesRead;
}
if (writtenToBuffer > 0) {
if (writtenToBuffer < bufferSize) {
newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
}
bufferQueue.put(newBuffer);
}
if (bytesRead == -1) {
eof = true;
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Звучит интересно. Я никогда не сталкивался ни с чем, что делает это из коробки, но имеет смысл попробовать использовать незадействованное ядро для сжатия, если оно доступно.
Возможно, вы могли бы использовать Commons I/O - это хорошо протестированная библиотека, которая может помочь справиться с некоторыми более скучными вещами и позволить вам сосредоточиться на расширении крутых параллельных частей. Может быть, вы даже могли бы внести свой код в проект Commons ;-)
.Мне было бы интересно. Я продумывал аналогичный проект, но не мог понять, как обращаться с частями, которые завершают сжатие не в порядке.