Я согласен с тем, что тесты не могут быть заказаны. В некоторых случаях это помогает (проще, черт возьми!) Иметь их в порядке ... в конце концов, это причина для «единицы» в UnitTest.
Тем не менее, одна из альтернатив - использовать макетные объекты для mockout и исправлять элементы, которые должны выполняться до того, как тестируется этот конкретный код. Вы также можете поместить туда фиктивную функцию, чтобы обезопасить свой код. Для получения дополнительной информации ознакомьтесь с Mock, который теперь является частью стандартной библиотеки. Mock
Вот несколько видеороликов YouTube, если вы еще не использовали Mock раньше.
Подробнее, попробуйте использовать методы класса для структурирования вашего кода, затем поместите все методы класса в одном основном методе тестирования.
import unittest
import sqlite3
class MyOrderedTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.create_db()
cls.setup_draft()
cls.draft_one()
cls.draft_two()
cls.draft_three()
@classmethod
def create_db(cls):
cls.conn = sqlite3.connect(":memory:")
@classmethod
def setup_draft(cls):
cls.conn.execute("CREATE TABLE players ('draftid' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, 'first', 'last')")
@classmethod
def draft_one(cls):
player = ("Hakeem", "Olajuwon")
cls.conn.execute("INSERT INTO players (first, last) VALUES (?, ?)", player)
@classmethod
def draft_two(cls):
player = ("Sam", "Bowie")
cls.conn.execute("INSERT INTO players (first, last) VALUES (?, ?)", player)
@classmethod
def draft_three(cls):
player = ("Michael", "Jordan")
cls.conn.execute("INSERT INTO players (first, last) VALUES (?, ?)", player)
def test_unordered_one(self):
cur = self.conn.execute("SELECT * from players")
draft = [(1, u'Hakeem', u'Olajuwon'), (2, u'Sam', u'Bowie'), (3, u'Michael', u'Jordan')]
query = cur.fetchall()
print query
self.assertListEqual(query, draft)
def test_unordered_two(self):
cur = self.conn.execute("SELECT first, last FROM players WHERE draftid=3")
result = cur.fetchone()
third = " ".join(result)
print third
self.assertEqual(third, "Michael Jordan")
Я бы рекомендовал использовать функцию бокового вывода Флинка для сбора исключений, а затем выводить их в тему Кафки.
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(outputTag);
DataStream<String> exceptions2 = task2.getSideOutput(outputTag);
DataStream<String> exceptions3 = task3.getSideOutput(outputTag);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));
Вы также можете заключить свой результат в Left
и исключения в Right
типа Either
. В конце вашего конвейера вам нужно разделить поток на полезную нагрузку и исключения с помощью функции split/select
.
DataStream<Either<Payload, Exception>> stage2 = stage1.flatMap(...);
DataStream<Either<Payload2, Exception>> stage3 = stage2.flatMap((Either<Payload, Exception> payload, Collector out) -> {
if (payload.isLeft()) {
out.collect(Left.of(map(payload.left)));
} else {
out.collect(Right.of(payload.right()));
}
});
SplitStream<Either<Payload2, Exception>> split = stage3.split((Either<Payload2, Exception> value) -> {
if (value.isLeft()) {
return Colletions.singleton("left");
} else {
return Collections.singleton("right");
}
});
DataStream<Either<Payload2, Exception>> payloads = split.select("left");
DataStream<Either<Payload2, Exception>> exceptions = split.select("right");