Вы можете попробовать ниже реализацию, мы создаем 3 операции с помощью цикла for
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
"hello_world_0",
description="Starting tutorial",
schedule_interval=None,
start_date=datetime(2019, 1, 1),
catchup=False
)
chain_operators = []
max_attempt = 3
for attempt in range(max_attempt):
data_pull = BashOperator(
task_id='attempt_{}'.format(attempt),
bash_command='echo "Hello World - {}!"'.format(attempt),
dag=dag
)
chain_operators.append(data_pull)
data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)
chain_operators.append(data_validation)
# Add downstream
for i,val in enumerate(chain_operators[:-1]):
val.set_downstream(chain_operators[i+1])
Я изменил schedule_interval на None, потому что с '* * * * *'
задание будет запускаться непрерывно
Мне кажется, вы можете заменить его, используя член $ res-> content ($ bytes).
Кстати, я нашел этот материал ищем источник LWP :: UserAgent, затем HTTP :: Response, затем HTTP :: Message .