Я не слышал термин «эксплуатационные испытания» раньше. Но, учитывая, что вы используете Ghost Inspector, я предполагаю, что вы будете выполнять «сквозное» тестирование. Вот некоторые вещи, которые необходимо учитывать перед созданием и комплексным набором тестов:
Первое, что нужно сделать, это понять различные потоки через приложение и ранжировать эти потоки в порядке влияния на бизнес. Автоматизируйте наиболее критические потоки в первую очередь. Затем постепенно продвигайтесь вниз по списку в порядке влияния на бизнес.
Имейте в виду, что сквозные тесты дороги в обслуживании и обслуживании. Не поддавайтесь искушению проверить все. Для этого и нужны юнит и интеграционные тесты. Просто автоматизируйте достаточно, чтобы чувствовать себя комфортно, чтобы критические потоки через систему были здоровы.
В качестве примера рассмотрим сайт электронной коммерции. Каковы наиболее важные потоки?
Может быть достаточно протестировать только потоки оформления заказа и корзины покупок, хотя можно привести аргумент для функции получения электронной почты, так как электронный маркетинг является источником электронной коммерции.
Но отображение товара и инвентарь явно второстепенные проблемы, как и логин и история заказов. Пока пользователи могут давать вам деньги, сайт способен приносить доход.
Не нужно отправлять количество чанков каждому процессу, просто используйте get_nowait () и обработайте возможную очередь . Пустое исключение. Каждый процесс будет получать разное количество процессорного времени, и это должно держать их всех занятыми.
import multiprocessing, Queue
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output):
self.input = input
self.output = output
super(Worker, self).__init__()
def run(self):
try:
while True:
self.output.put(self.process(self.input.get_nowait()))
except Queue.Empty:
pass
if name == 'main':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
Worker(input, output).start()
for i in range(len(dataset)):
print output.get()
Вы не изменили метод run
. Есть два способа с процессами (или потоками) заставить его выполнять код:
run
. Переопределение __ init__
просто означает, что все ваши процессы одеты и некуда идти. Он должен использоваться для придания ему атрибутов, необходимых для выполнения того, что ему нужно, но не должен указывать задачу, которая должна быть выполнена.
В вашем коде все тяжелые работы выполняются в этой строке:
exec('worker'+str(i)+' = Worker(tmp)')
и здесь ничего не делается:
exec('worker'+str(i)+'.start()')
Поэтому проверка результатов с помощью exec ('print worker' + str (i) + '. result [0]')
должна дать вам что-то значимое, но только потому, что код, который вы хотите выполнить , был выполнен , но при построении процесса, а не при запуске процесса.
Попробуйте это:
class Worker(Process):
# example data transform
def process(self, x): return (x * 2) / 3
def __init__(self, list):
self.data = list
self.result = []
super(Worker, self).__init__()
def run(self):
self.result = map(self.process, self.data)
РЕДАКТИРОВАТЬ:
Хорошо ... так что я просто летал, основываясь на моих инстинктах потоков, и все они были не правы. Что мы оба не поняли о процессах, так это то, что вы не можете напрямую делиться переменными. Все, что вы передаете новому процессу, читается, копируется и исчезает навсегда. Если вы не используете один из двух стандартных способов обмена данными: очереди и каналы . Я немного поиграл, пытаясь заставить твой код работать, но пока не повезло. Я думаю, что это поставит вас на правильный путь.
Все, что вы передаете новому процессу, читается, копируется и исчезает навсегда. Если вы не используете один из двух стандартных способов обмена данными: очереди и каналы . Я немного поиграл, пытаясь заставить твой код работать, но пока не повезло. Я думаю, что это поставит вас на правильный путь. Все, что вы передаете новому процессу, читается, копируется и исчезает навсегда. Если вы не используете один из двух стандартных способов обмена данными: очереди и каналы . Я немного поиграл, пытаясь заставить твой код работать, но пока не повезло. Я думаю, что это поставит вас на правильный путь.Хорошо, похоже, список не был потокобезопасным, и я перешел к использованию очереди (хотя она кажется намного медленнее). Этот код по существу выполняет то, что я пытался сделать:
import math, multiprocessing
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output, chunksize):
self.input = input
self.output = output
self.chunksize = chunksize
super(Worker, self).__init__()
def run(self):
for x in range(self.chunksize):
self.output.put(self.process(self.input.get()))
if __name__ == '__main__':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
chunk = int(math.floor(len(dataset) / float(processes)))
if i + 1 == processes:
remainder = len(dataset) % processes
else: remainder = 0
Worker(input, output, chunk + remainder).start()
for i in range(len(dataset)):
print output.get()