Создание соединения с БД и поддержка нескольких процессов (многопроцессорность)

Подобно другому сообщению, которое я сделал, это отвечает на это сообщение и создает новый вопрос.

Резюме: мне нужно обновить каждую запись в пространственной базе данных, в которой у меня есть набор данных точек, которые перекрывают набор данных полигонов. Для каждого точечного объекта я хочу назначить ключ, чтобы связать его с полигональным объектом, внутри которого он находится. Итак, если моя точка «Нью-Йорк» находится в пределах многоугольника США, а для многоугольника США «GID = 1» я назначу «gid_fkey = 1» для моей точки Нью-Йорк.

Хорошо, это было достигнуто с помощью многопроцессорной обработки. Я заметил прирост скорости на 150%, так что он действительно работает. Но я думаю, что есть куча ненужных накладных расходов, поскольку для каждой записи требуется одно соединение с БД.

Вот код:

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self):        
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')    
    temp = pyCursorX.fetchall()    
    num_job = temp[0]
    num_jobs = num_job[0]

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')    
    cityIdListTuple = pyCursorX.fetchall()    

    cityIdListList = []

    for x in cityIdListTuple:
        cityIdList.append(x[0])


    for i in xrange(num_jobs):
        tasks.put(Task(cityIdList[i - 1]))

    for i in xrange(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print result
        num_jobs -= 1

Похоже, что на одно соединение от 0,3 до 1,5 секунд, поскольку я измерил его с помощью модуля «время».

Есть ли способ установить соединение с БД для каждого процесса, а затем просто использовать информацию city_id в качестве переменной, которую я могу использовать в запросе курсора в этом открытии? Таким образом, я делаю четыре процесса, каждый из которых имеет соединение с БД, а затем добавляю мне city_id как-то для обработки.

21
задан Cédric Julien 26 September 2011 в 13:41
поделиться