Обрабатывать исключение глобально для конвейера данных

Я согласен с тем, что тесты не могут быть заказаны. В некоторых случаях это помогает (проще, черт возьми!) Иметь их в порядке ... в конце концов, это причина для «единицы» в UnitTest.

Тем не менее, одна из альтернатив - использовать макетные объекты для mockout и исправлять элементы, которые должны выполняться до того, как тестируется этот конкретный код. Вы также можете поместить туда фиктивную функцию, чтобы обезопасить свой код. Для получения дополнительной информации ознакомьтесь с Mock, который теперь является частью стандартной библиотеки. Mock

Вот несколько видеороликов YouTube, если вы еще не использовали Mock раньше.

Видео 1

Видео 2

Видео 3

Подробнее, попробуйте использовать методы класса для структурирования вашего кода, затем поместите все методы класса в одном основном методе тестирования.

import unittest
import sqlite3

class MyOrderedTest(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.create_db()
        cls.setup_draft()
        cls.draft_one()
        cls.draft_two()
        cls.draft_three()

    @classmethod
    def create_db(cls):
        cls.conn = sqlite3.connect(":memory:")

    @classmethod
    def setup_draft(cls):
        cls.conn.execute("CREATE TABLE players ('draftid' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, 'first', 'last')")

    @classmethod
    def draft_one(cls):
        player = ("Hakeem", "Olajuwon")
        cls.conn.execute("INSERT INTO players (first, last) VALUES (?, ?)", player)

    @classmethod
    def draft_two(cls):
        player = ("Sam", "Bowie")
        cls.conn.execute("INSERT INTO players (first, last) VALUES (?, ?)", player)

    @classmethod
    def draft_three(cls):
        player = ("Michael", "Jordan")
        cls.conn.execute("INSERT INTO players (first, last) VALUES (?, ?)", player)

    def test_unordered_one(self):
        cur = self.conn.execute("SELECT * from players")
        draft = [(1, u'Hakeem', u'Olajuwon'), (2, u'Sam', u'Bowie'), (3, u'Michael', u'Jordan')]
        query = cur.fetchall()
        print query
        self.assertListEqual(query, draft)

    def test_unordered_two(self):
        cur = self.conn.execute("SELECT first, last FROM players WHERE draftid=3")
        result = cur.fetchone()
        third = " ".join(result)
        print third
        self.assertEqual(third, "Michael Jordan")

0
задан Selva 17 January 2019 в 14:01
поделиться

1 ответ

Я бы рекомендовал использовать функцию бокового вывода Флинка для сбора исключений, а затем выводить их в тему Кафки.

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(outputTag);
DataStream<String> exceptions2 = task2.getSideOutput(outputTag);
DataStream<String> exceptions3 = task3.getSideOutput(outputTag);

DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));

Обновление

Вы также можете заключить свой результат в Left и исключения в Right типа Either. В конце вашего конвейера вам нужно разделить поток на полезную нагрузку и исключения с помощью функции split/select.

DataStream<Either<Payload, Exception>> stage2 = stage1.flatMap(...);
DataStream<Either<Payload2, Exception>> stage3 = stage2.flatMap((Either<Payload, Exception> payload, Collector out) -> {
    if (payload.isLeft()) {
        out.collect(Left.of(map(payload.left)));
    } else {
        out.collect(Right.of(payload.right()));
    }   
});

SplitStream<Either<Payload2, Exception>> split = stage3.split((Either<Payload2, Exception> value) -> {
    if (value.isLeft()) {
        return Colletions.singleton("left");
    } else {
        return Collections.singleton("right");
    }
});

DataStream<Either<Payload2, Exception>> payloads = split.select("left");
DataStream<Either<Payload2, Exception>> exceptions = split.select("right");
0
ответ дан Till Rohrmann 17 January 2019 в 14:01
поделиться