Pregunta Forma correcta de crear flujos de trabajo dinámicos en Airflow


Problema

¿Hay alguna manera en Airflow de crear un flujo de trabajo tal que el número de tareas B. * se desconozca hasta la finalización de la Tarea A? He visto subdags, pero parece que solo puede funcionar con un conjunto estático de tareas que deben determinarse en la creación de Dag.

¿Funcionarían los triggers? Y si es así, podrían dar un ejemplo.

Tengo un problema donde es imposible saber la cantidad de tareas B que se necesitarán para calcular la Tarea C hasta que se haya completado la Tarea A. Cada tarea B. * tardará varias horas en computarse y no se puede combinar.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idea n. ° 1

No me gusta esta solución porque tengo que crear un ExternalTaskSensor de bloqueo y toda la Tarea B. * tardará entre 2-24 horas en completarse. Entonces no considero que esta sea una solución viable. Seguramente hay una manera más fácil? ¿O Airflow no fue diseñado para esto?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Editar 1:

A partir de ahora esta pregunta todavía no tiene una gran respuesta. Varias personas me han contactado para buscar una solución.


32
2018-01-07 04:32


origen


Respuestas:


Así es como lo hice con una solicitud similar sin subdags:

Primero crea un método que devuelva los valores que quieras

def values_function():
     return values

A continuación, cree el método que generará los trabajos dinámicamente:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

Y luego combinarlos:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete

9
2018-01-13 02:37



OA: "¿Hay alguna manera en Airflow de crear un flujo de trabajo tal que el número de tareas B. * se desconozca hasta que se complete la Tarea A?"

La respuesta corta es no. Airflow construirá el flujo DAG antes de comenzar a ejecutarlo.

Dicho esto, llegamos a una conclusión simple, es decir, no tenemos esa necesidad. Cuando desee paralelizar algún trabajo, debe evaluar los recursos que tiene disponibles y no la cantidad de elementos que debe procesar.

Lo hicimos así: generamos dinámicamente un número fijo de tareas, digamos 10, que dividirán el trabajo. Por ejemplo, si tenemos que procesar 100 archivos, cada tarea procesará 10 de ellos. Voy a publicar el código más tarde hoy.


1
2018-06-14 13:44