Я запускаю тест, но я хочу выполнить 2 функции одновременно. У меня есть камера, и я говорю ей перемещаться через пену, я затем вхожу в камеру через SSH для проверки скорости, на которую установлена камера. Когда я проверяю скорость, камера остановилась так, никакая скорость не доступна. Есть ли способ, которым я могу заставить эти функции работать одновременно для тестирования скорости камеры. Пример кода ниже:
class VerifyPan(TestAbsoluteMove):
def runTest(self):
self.dest.PanTilt._x=350
# Runs soap move command
threading.Thread(target = SudsMove).start()
self.command = './ptzpanposition -c 0 -u degx10'
# Logs into camera and checks speed
TestAbsoluteMove.Ssh(self)
# Position of the camera verified through Ssh (No decimal point added to the Ssh value)
self.assertEqual(self.Value, '3500')
Я теперь попробовал модуль поточной обработки, как упомянуто ниже. Поток не работает в синхронизации с функциональным TestAbsoluteMove. Ssh (). Есть ли любой другой код, я должен сделать эту работу.
Я посмотрел на помещение аргументов в оператор потока, которые указывают выполнения потока, когда Ssh () функционируют. Кто-либо знает, что войти в этот оператор?
Извините, если я не объяснил правильно. Функция 'SudsMove' перемещает камеру, и функция 'Ssh' входит в камеру и проверяет скорость, в которую в настоящее время перемещается камера. Проблема состоит в том, что к тому времени, когда функция 'Ssh' входит в систему, камера остановилась. Мне нужны обе функции для выполнения параллельно, таким образом, я могу проверить скорость камеры, в то время как она все еще перемещается.
Спасибо
Если вы хотите использовать общую реализацию Python (CPYthon), вы, безусловно, можете использовать многопроцессорную модуль , который Чудеса (вы можете пройти небразжимые аргументы на подпроцессы, убивать задач, ...), предлагает интерфейс, похожий на темы, и не страдает от блокировки глобального интерпретатора.
Недостатком является то, что подпроцессы порождают, что требует больше времени, чем создание нитей; Это должно быть только проблема, если у вас много, много коротких задач. Кроме того, поскольку передаются данные (через сериализацию) между процессами, большие данные занимают много времени для прохождения и в конечном итоге имеют большую площадь памяти (как оно дублируется между каждым процессом). В ситуациях, когда каждая задача занимает «длинное» время, и данные в и из каждой задачи не слишком велики, многопроцессор модуль должен быть великолепен.
Импортируйте модуль Threading
модуль и запустить
SUDSMove ()
, как так:
threading.Thread(target = SudsMove).start()
, который создаст и начнет фоновый поток, который делает движение.
Ответ на отредактированный вопрос:
Насколько я понимаю это, Testabsolutemove.ssh (Self)
Опрашивает скорость один раз и хранит результат в Self.value
?! И вы проверяете ожидаемый конец наклона / вращения / положение с Self.assertequal (Self.value, '3500')
?!
Если это правильно, вы должны дождаться камеры, чтобы начать свое движение. Возможно, вы можете опросить скорость определенного интервала:
# Move camera in background thread
threading.Thread(target = SudsMove).start()
# What does this do?
self.command = './ptzpanposition -c 0 -u degx10'
# Poll the current speed in an interval of 250 ms
import time
measuredSpeedsList = []
for i in xrange(20):
# Assuming that this call will put the result in self.Value
TestAbsoluteMove.Ssh(self)
measuredSpeedsList.append(self.Value)
time.sleep(0.25)
print "Measured movement speeds: ", measuredSpeedsList
Скорость движения будет самым большим значением в измеренной комиссии
(I.E. MAX (измерение SPEEDSLIST)
). Надеюсь, что имеет смысл ...
Там может быть только один поток, работающий одновременно. Это было отвечено широко здесь . Одним из решений будет использоваться два отдельных процесса. Вышеуказанный ответ предоставляет несколько советов.
Если вы можете заставить свой код работать под Jython или IronPython, то вы можете запускать несколько потоков одновременно; у них нет этой дурацкой "Global Interpreter Lock" штуки CPython.