Многопроцессорная обработка Python и общий счетчик

Инстинктивно я удалил бы обещания и использовал бы функцию async.eachSeries , чтобы связать ваши пинги.

Вот непроверенный фрагмент, адаптированный из вашего кода:

var net = require('net');
var client = new net.Socket();
const fs = require('fs');
const async = require("async");

var serverPingList = [
  {
    port:'80',
    domain:'domainexample'
  },
  {
    port:'8089',
    domain:'domainexample'
  },
  {
    port:'80',
    domain:'domainexample'
  },
  {
    port:'80',
    domain:'domainexample'
  },
  {
    port:'80',
    domain:'domainexample'
  },
]

async.eachSeries(serverPingList, function(server, callback){

  pingPlatforms(server, function(err){
    if(err){
      console.log("Error on pinging server", server.domain, err);
    }else{
      console.log("Success on pinging server",server.domain);
    }
    return callback();
  });
}, function(){
  console.log("All pings have been sent");
});

function pingPlatforms(server, cb){
  var logInit = `server ${server.domain} and port ${server.port} Connecting...`
  var log;
  fs.appendFile('ping.log', logInit + '\n', (err) => {
      if(err)
        console.log('Unable to append to server.log.');
  });

  //console.log(server.port);
  //console.log(server.domain);
  console.log('Connecting...')

  //https://stackoverflow.com/questions/8588689/node-js-socket-io-client-connect-failed-connect-error-event
  //https://nodejs.org/api/net.html#net_socket_connect
  var socket = client.connect(server.port, server.domain);
  //var socket = ns_news.socket;

  //the ping failed
  socket.on('error', function(){
      var now = new Date().toString();
      log = `${now}: server ${server.domain} and port ${server.port} Connection Failed`;
      fs.appendFile('ping.log', log + '\n', (err) => {
          if(err)
            console.log('Unable to append to ping.log.');
      });
      cb(null);
      //client.destroy();
  });
  // the ping is successful
  socket.on('connect', function(){
      console.log('Connected');

      var now = new Date().toString();
      log = `${now}: server ${server.domain} and port ${server.port} Connection Successful`;
      fs.appendFile('ping.log', log + '\n', (err) => {
          if(err)
            console.log('Unable to append to ping.log.');
      });
      cb(null);
      //client.destroy();
  });

  socket.on('disconnect', function () {
    console.log('Disconnected');
    cb("Error");
  });
  socket.on('close', function () {
    console.log('socket closed');
    //client.destroy();
  });
}

56
задан Benjamin 12 December 2013 в 02:06
поделиться

3 ответа

Проблема состоит в том, что счетчик переменная не передается между вашими процессами: каждый отдельный процесс создает свой локальный экземпляр и увеличивая это.

См. В этом разделе документации для некоторых методов вы можете использовать, чтобы поделиться состоянием между вашими процессами. В вашем случае вы можете поделиться экземпляром значение между вашими работниками

вот рабочая версия вашего примера (с некоторыми фиктивными входными данными). Обратите внимание, что он использует глобальные значения, которые я бы действительно старался избегать на практике:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print i.get()
61
ответ дан 7 November 2019 в 06:43
поделиться

Чрезвычайно простой пример, измененный из ответа jkp:

from multiprocessing import Pool, Value
from time import sleep

counter = Value('i', 0)
def f(x):
    global counter
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    sleep(1)
    return x

with Pool(4) as p:
    r = p.map(f, range(1000*1000))
0
ответ дан 7 November 2019 в 06:43
поделиться

Я работаю над панелью процесса в PyQT5, таким образом, я использую поток и объединяю вместе

import threading
import multiprocessing as mp
from queue import Queue

def multi(x):
    return x*x

def pooler(q):
    with mp.Pool() as pool:
    count = 0
    for i in pool.imap_unordered(ggg, range(100)):
        print(count, i)
        count += 1
        q.put(count)

def main():
    q = Queue()
    t = threading.Thread(target=thr, args=(q,))
    t.start()
    print('start')
    process = 0
    while process < 100:
        process = q.get()
        print('p',process)
if __name__ == '__main__':
    main()

это, я вставил рабочего Qthread, и это работает с приемлемой задержкой

0
ответ дан 7 November 2019 в 06:43
поделиться
Другие вопросы по тегам:

Похожие вопросы: