Объедините Pool.map с Массивом общей памяти в многопроцессорной обработке Python

У меня есть очень большой массив (только для чтения) данных, что я хочу быть обработанным несколькими процессами параллельно.

Я люблю функцию Pool.map и хотел бы использовать ее для вычисления функций на те данные параллельно.

Я видел, что можно использовать класс Значения или Массива для использования данных общей памяти между процессами. Но когда я пытаюсь использовать это, я получаю RuntimeError: 'Объекты SynchronizedString должны только быть совместно использованы процессами посредством наследования при использовании функции Pool.map:

Вот упрощенный пример того, что я пытаюсь сделать:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Кто-либо может сказать мне, что я делаю неправильно здесь?

Таким образом, то, что я хотел бы сделать, передать информацию о выделенном массиве недавно созданной общей памяти к процессам после того, как они были созданы в пуле процесса.

52
задан Acumenus 3 October 2019 в 04:54
поделиться

5 ответов

Попробую еще раз, как я только что видел щедрость;)

В основном я думаю, что сообщение об ошибке означает то, о чем говорилось - многопроцессорная разделяемая память Массивы нельзя передавать в качестве аргументов (путем травления). Нет смысла сериализовать данные - дело в том, что данные являются разделяемой памятью. Итак, вам нужно сделать общий массив глобальным. Я думаю это' Было бы проще использовать его как атрибут модуля, как в моем первом ответе, но просто оставить его как глобальную переменную в вашем примере также хорошо. Принимая во внимание вашу точку зрения о нежелании устанавливать данные перед форком, вот модифицированный пример. Если вы хотите иметь более одного возможного разделяемого массива (и именно поэтому вы хотели передать toShare в качестве аргумента), вы могли бы аналогичным образом создать глобальный список общих массивов и просто передать индекс в count_it (который станет для c в toShare [i]: ).

s, почему вы хотели передать toShare в качестве аргумента), вы могли бы аналогичным образом создать глобальный список разделяемых массивов и просто передать индекс в count_it (который стал бы для c в toShare [i]: ).

s, почему вы хотели передать toShare в качестве аргумента), вы могли бы аналогичным образом создать глобальный список разделяемых массивов и просто передать индекс в count_it (который стал бы для c в toShare [i]: ).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[РЕДАКТИРОВАТЬ: Вышеупомянутое не работает в Windows, потому что не используется fork. Тем не менее, приведенное ниже работает в Windows, все еще используя Pool, поэтому я думаю, что это наиболее близко к тому, что вы хотите:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Не уверен, почему карта не обрабатывает массив, а Process и Pool будут - я думаю, возможно, это было передается в момент инициализации подпроцесса в windows. Обратите внимание, что данные все еще устанавливаются после вилки.

42
ответ дан 7 November 2019 в 09:35
поделиться

Если Вы видите RuntimeError: Synchronized objects should only be shared between processes through inheritance ошибка, рассматриваете использование multiprocessing.Manager , поскольку она не имеет этого ограничения. Менеджер работает, полагая, что это, по-видимому, работает в отдельном процессе в целом.

import ctypes
import multiprocessing

manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
0
ответ дан 7 November 2019 в 09:35
поделиться

Если данные читаются только для чтения, просто сделайте их переменной в модуле перед вилкой из Pool. Тогда все дочерние процессы должны иметь к нему доступ, и он не будет скопирован, если вы не напишете в него.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Если вы действительно хотите попробовать использовать Array, вы можете попробовать с блокировкой = False аргумент ключевого слова (по умолчанию истинно).

2
ответ дан 7 November 2019 в 09:35
поделиться

Проблема, которую я вижу, заключается в том, что пул не поддерживает сбор общих данных через свой список аргументов. Это то, что сообщение об ошибке означает, что «объекты должны совместно использоваться между процессами только посредством наследования». Общие данные должны быть унаследованы, т. Е. Глобальными, если вы хотите поделиться ими с помощью класса Pool.

Если вам нужно передать их явно, вам, возможно, придется использовать multiprocessing.Process. Вот ваш переработанный пример:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Вывод: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

Порядок элементов очереди может варьироваться.

Чтобы сделать это более общим и похожим на Pool, вы можете создать фиксированное количество процессов N, разделив список ключей на N частей, а затем использовать функцию-оболочку в качестве цели процесса, которая будет вызывать count_it для каждого ключа в переданном списке, например:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
5
ответ дан 7 November 2019 в 09:35
поделиться

The multiprocessing.sharedctypes module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes.

So your usage of sharedctypes is wrong. Do you wish to inherit this array from parent process or you prefer to pass it explicitly? In the former case you have to create a global variable as other answers suggest. But you don't need to use sharedctypes to pass it explicitly, just pass original testData.

BTW, your usage of Pool.map() is wrong. It has the same interface as builtin map() function (did you messed it with starmap()?). Below is working example with, passing array explicitly:

from multiprocessing import Pool

def count_it( (arr, key) ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
-1
ответ дан 7 November 2019 в 09:35
поделиться
Другие вопросы по тегам:

Похожие вопросы: