Буферизированный фон реализации InputStream

Я записал фон InputStreamOutputStream) реализации, которые переносят другие потоки и читают вперед на фоновом потоке, прежде всего, позволяя, чтобы распаковка/сжатие произошла в различных потоках от обработки распакованного потока.

Это - довольно стандартная модель производителя/потребителя.

Это походит на простой способ хорошо использовать многоядерные центральные процессоры с простыми процессами, которые читают, обрабатывают, и данные записи, допуская более эффективное использование и ЦП и дисковых ресурсов. Возможно, 'эффективный' не лучшее слово, но оно обеспечивает более высокое использование, и более интересный для меня, уменьшенного времени выполнения, по сравнению с чтением непосредственно из a ZipInputStream и запись непосредственно в a ZipOutputStream.

Я рад отправить код, но мой вопрос состоит в том, переосмысливаю ли я что-то легко доступное в существующем (и в большей степени осуществленный) библиотеки?

Редактирование - отправляющий код...

Мой код для BackgroundInputStream ниже ( BackgroundOutputStream очень похоже), но существуют аспекты его, что я хотел бы улучшиться.

  1. Похоже, что я слишком упорно работаю для пасования назад буферов и вперед.
  2. Если код вызова выбрасывает ссылки на BackgroundInputStream, backgroundReaderThread будет бродить вокруг навсегда.
  3. Передача сигналов eof улучшение потребностей.
  4. Исключения должны быть распространены к приоритетному потоку.
  5. Я хотел бы позволить использовать поток от обеспеченного Executor.
  6. close() метод должен сигнализировать о фоновом потоке и не должен закрывать перенесенный поток, поскольку перенесенный поток должен принадлежать фоновому потоку, который читает из него.
  7. При выполнении глупых вещей как чтение после того, как закрытие должно быть обслужено соответственно.

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) {
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    }

    @Override
    public int read() throws IOException {
        if (bufferQueue == null) {
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        }
        if (currentBuffer == null) {
            try {
                if ((!eof) || (bufferQueue.size() > 0)) {
                    currentBuffer = bufferQueue.take();
                    pos = 0;
                } else {
                    return -1;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int b = currentBuffer[pos++];
        if (pos == currentBuffer.length) {
            freeBuffer = currentBuffer;
            currentBuffer = null;
        }
        return b;
    }

    @Override
    public int available() throws IOException {
        if (currentBuffer == null) return 0;
        return currentBuffer.length;
    }

    @Override
    public void close() throws IOException {
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    }

    class BackgroundReader implements Runnable {

        @Override
        public void run() {
            try {
                while (!eof) {
                    byte[] newBuffer;
                    if (freeBuffer != null) {
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                    } else {
                        newBuffer = new byte[bufferSize];
                    }
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
                        writtenToBuffer += bytesRead;
                    }
                    if (writtenToBuffer > 0) {
                        if (writtenToBuffer < bufferSize) {
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        }
                        bufferQueue.put(newBuffer);
                    }
                    if (bytesRead == -1) {
                        eof = true;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
8
задан Stephen Denne 2 February 2010 в 10:33
поделиться

2 ответа

Звучит интересно. Я никогда не сталкивался ни с чем, что делает это из коробки, но имеет смысл попробовать использовать незадействованное ядро для сжатия, если оно доступно.

Возможно, вы могли бы использовать Commons I/O - это хорошо протестированная библиотека, которая может помочь справиться с некоторыми более скучными вещами и позволить вам сосредоточиться на расширении крутых параллельных частей. Может быть, вы даже могли бы внести свой код в проект Commons ;-)

.
3
ответ дан 6 December 2019 в 00:57
поделиться

Мне было бы интересно. Я продумывал аналогичный проект, но не мог понять, как обращаться с частями, которые завершают сжатие не в порядке.

0
ответ дан 6 December 2019 в 00:57
поделиться
Другие вопросы по тегам:

Похожие вопросы: