“выбрать” на нескольких Очередях многопроцессорной обработки Python?

Как сказанный Мир, Вы, вероятно, не собираетесь добираться немного быстрее, чем это. Причина состоит в том, что нет почти никаких файловых систем, которые поддерживают усечение с начала файла, таким образом, это будет O (n) операция, где n размер файла. Что можно сделать очень быстрее, хотя перезапись первая строка с тем же числом байтов (возможно, с пробелами или комментарием), который мог бы работать на Вас в зависимости от точно, что Вы пытаетесь сделать (что это между прочим?).

27
задан cdleary 14 July 2009 в 06:58
поделиться

4 ответа

Это не похоже, есть официальный способ справиться с этим. Или, по крайней мере, не основываясь на этом:

Вы можете попробовать что-то вроде того, что делает этот пост - доступ к указателям файлов основного канала:

14
ответ дан 28 November 2019 в 05:21
поделиться

Вы можете использовать что-то вроде шаблона Observer , в котором подписчики очереди уведомляются об изменениях состояния.

В этом случае ваш рабочий поток может быть обозначен как слушатель в каждой очереди, и всякий раз, когда он получает сигнал готовности, он может работать с новым элементом, в противном случае - спать.

1
ответ дан 28 November 2019 в 05:21
поделиться

Похоже, использование потоков, которые перенаправляют входящие элементы в одну очередь, которую вы затем ожидаете, является практическим выбором при использовании многопроцессорной обработки независимым от платформы способом.

Чтобы избежать потоков, необходимо либо обрабатывать низкоуровневые каналы / FD, что и зависит от платформы, и нелегко обрабатывать согласованно с высокоуровневым API.

Или вам понадобятся очереди с возможностью установки обратных вызовов, которые, как мне кажется, являются подходящим интерфейсом более высокого уровня. Т.е. вы могли бы написать что-то вроде:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Может быть, пакет многопроцессорной обработки может расширить этот API, но его еще нет. Эта концепция хорошо работает с py.execnet, который использует термин «канал» вместо «очереди», см. Здесь http://tinyurl.com/nmtr4w

2
ответ дан 28 November 2019 в 05:21
поделиться

NSITRACEABLECHANNEL действительно способом идти сюда. Сообщения в блоге Jan Odvarko (Softwareishard.com) и SOA (ASHITA.ORG) показывают, как это сделать. Вы также можете увидеть http://www.ashita.org/implementing-an-xpcom-firefox-interface-and-creating-observers/ , однако, на самом деле не нужно делать это в компонент XPCOM.

Шаги в основном:

  1. создают объектный прототип, реализующий NSITRACEABLECHANNEL; И создать наблюдатель, чтобы прослушать http-on-modify-requestion и http-on-examine-agtress
  2. реестр наблюдателя
  3. наблюдателя, прослушивание двух типов запросов, добавляет наш объект NSITRACEALECHANNEL в цепочку слушателей и убедитесь, что наш NSITC знает, кто следующий в цепочке
  4. NSITC Object предоставляет три обратных вызова, и каждый будет вызываться на соответствующем этапе: OnStartrequest, ondata_available и onstoprequest
  5. в каждом из вышеперечисленных вызовов, наш объект NSITC должен пройти на данные К следующему пункту в цепочке

ниже


ниже - это фактический код из дополнительного дополнения, который я писал, что ведет себя очень похожи на вас от того, что я могу сказать.

function TracingListener() {
    //this.receivedData = [];
}

TracingListener.prototype =
{
    originalListener: null,
    receivedData: null,   // array for incoming data.

    onDataAvailable: function(request, context, inputStream, offset, count)
    {
        var binaryInputStream = CCIN("@mozilla.org/binaryinputstream;1", "nsIBinaryInputStream");
        var storageStream = CCIN("@mozilla.org/storagestream;1", "nsIStorageStream");
        binaryInputStream.setInputStream(inputStream);
        storageStream.init(8192, count, null);

        var binaryOutputStream = CCIN("@mozilla.org/binaryoutputstream;1",
                "nsIBinaryOutputStream");

        binaryOutputStream.setOutputStream(storageStream.getOutputStream(0));

        // Copy received data as they come.
        var data = binaryInputStream.readBytes(count);
        //var data = inputStream.readBytes(count);

        this.receivedData.push(data);

        binaryOutputStream.writeBytes(data, count);
        this.originalListener.onDataAvailable(request, context,storageStream.newInputStream(0), offset, count);
    },

    onStartRequest: function(request, context) {
        this.receivedData = [];
        this.originalListener.onStartRequest(request, context);
    },

    onStopRequest: function(request, context, statusCode)
    {
        try 
        {
            request.QueryInterface(Ci.nsIHttpChannel);

            if (request.originalURI && piratequesting.baseURL == request.originalURI.prePath && request.originalURI.path.indexOf("/index.php?ajax=") == 0) 
            {

                var data = null;
                if (request.requestMethod.toLowerCase() == "post") 
                {
                    var postText = this.readPostTextFromRequest(request, context);
                    if (postText) 
                        data = ((String)(postText)).parseQuery();

                }
                var date = Date.parse(request.getResponseHeader("Date"));
                var responseSource = this.receivedData.join('');

                //fix leading spaces bug
                responseSource = responseSource.replace(/^\s+(\S[\s\S]+)/, "$1");

                piratequesting.ProcessRawResponse(request.originalURI.spec, responseSource, date, data);
            }
        } 
        catch (e) 
        {
            dumpError(e);
        }
        this.originalListener.onStopRequest(request, context, statusCode);
    },

    QueryInterface: function (aIID) {
        if (aIID.equals(Ci.nsIStreamListener) ||
            aIID.equals(Ci.nsISupports)) {
            return this;
        }
        throw Components.results.NS_NOINTERFACE;
    },
    readPostTextFromRequest : function(request, context) {
        try
        {
            var is = request.QueryInterface(Ci.nsIUploadChannel).uploadStream;
            if (is)
            {
                var ss = is.QueryInterface(Ci.nsISeekableStream);
                var prevOffset;
                if (ss)
                {
                    prevOffset = ss.tell();
                    ss.seek(Ci.nsISeekableStream.NS_SEEK_SET, 0);
                }

                // Read data from the stream..
                var charset = "UTF-8";
                var text = this.readFromStream(is, charset, true);

                // Seek locks the file so, seek to the beginning only if necko hasn't read it yet,
                // since necko doesn't seek to 0 before reading (at lest not till 459384 is fixed).
                if (ss && prevOffset == 0) 
                    ss.seek(Ci.nsISeekableStream.NS_SEEK_SET, 0);

                return text;
            }
            else {
                dump("Failed to Query Interface for upload stream.\n");
            }
        }
        catch(exc)
        {
            dumpError(exc);
        }

        return null;
    },
    readFromStream : function(stream, charset, noClose) {

        var sis = CCSV("@mozilla.org/binaryinputstream;1", "nsIBinaryInputStream");
        sis.setInputStream(stream);

        var segments = [];
        for (var count = stream.available(); count; count = stream.available())
            segments.push(sis.readBytes(count));

        if (!noClose)
            sis.close();

        var text = segments.join("");
        return text;
    }

}


hRO = {

    observe: function(request, aTopic, aData){
        try {
            if (typeof Cc == "undefined") {
                var Cc = Components.classes;
            }
            if (typeof Ci == "undefined") {
                var Ci = Components.interfaces;
            }
            if (aTopic == "http-on-examine-response") {
                request.QueryInterface(Ci.nsIHttpChannel);

                if (request.originalURI && piratequesting.baseURL == request.originalURI.prePath && request.originalURI.path.indexOf("/index.php?ajax=") == 0) {
                    var newListener = new TracingListener();
                    request.QueryInterface(Ci.nsITraceableChannel);
                    newListener.originalListener = request.setNewListener(newListener);
                }
            } 
        } catch (e) {
            dump("\nhRO error: \n\tMessage: " + e.message + "\n\tFile: " + e.fileName + "  line: " + e.lineNumber + "\n");
        }
    },

    QueryInterface: function(aIID){
        if (typeof Cc == "undefined") {
            var Cc = Components.classes;
        }
        if (typeof Ci == "undefined") {
            var Ci = Components.interfaces;
        }
        if (aIID.equals(Ci.nsIObserver) ||
        aIID.equals(Ci.nsISupports)) {
            return this;
        }

        throw Components.results.NS_NOINTERFACE;

    },
};


var observerService = Cc["@mozilla.org/observer-service;1"]
    .getService(Ci.nsIObserverService);

observerService.addObserver(hRO,
    "http-on-examine-response", false);

В приведенном выше коде Originallistener - слушатель, который мы вставляем себя раньше в цепочке. Крайне важно, чтобы вы сохраняли эту информацию при создании прослушивателя отслеживания и передаем данные во всех трех обратных вызовах. В противном случае ничего не будет работать (страницы даже не загружаются. Сам Firefox в последний раз в цепочке).

Примечание. В коде выше, что в коде есть некоторые функции, которые являются частью дополнения Piratequesting, например: по парамеру () и Dumperror ()

-121--4349530-

На самом деле вы можете использовать многопроцессорные объекты в выборе. Выберите. I.E.

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

будет выбрать Que только в том случае, если он готов к чтению.

Нет документации об этом, хотя. Я читал исходный код мультипроцессирующей библиотеки (в Linux, как правило, sth, как /usr/lib/python2.6/multibrocessing/queue.py), чтобы узнать его.

С Queue.Queue я не нашел никакого умного способа сделать это (и я бы очень хотел).

26
ответ дан 28 November 2019 в 05:21
поделиться
Другие вопросы по тегам:

Похожие вопросы: