Azizi Othman’s Post

AI & Robotics | IT Professional | Exploring the Impact of Emerging Technologies on Society & Business | "the idiot that doesn't know when to shut up" - My Mom

3 Surprising Use-cases for Branching in Airflow you’ve not seen before

Your Data Pipelines can have as many branches as this nice tree. Photo by Andrew Svk on Unsplash

Branching Conditionality is an important feature of many DAGs

Introduction

How often is it that you’re writing a Data Pipeline and wish you could do something contingently, something that only happens if a set of conditions is satisfied? Hopefully, not that often!

Airflow has supported this type of functionality via the BranchPythonOperator. Many other workflow orchestration tools have followed suit: Prefect has Conditional Flows, Dagster has DynamicOutput, and in Orchestra we facilitate branching based on status.

This leads us to the most important question: why? Why bother with branching at all, making your pipeline more complicated than it needs to be? We’ll see there are actually some pretty incredible use-cases, especially for folks who are looking for more automation in their lives.

A quick example of Branching in Airflow

Before diving into use-cases, we’ll use the code below as a reference to understand how branching works in practice.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # successor to the deprecated DummyOperator (Airflow 2.3+)
from airflow.operators.python import BranchPythonOperator, PythonOperator


def choose_branch(**kwargs):
    # Read the value returned by the check_value task (pushed to XCom automatically)
    value = kwargs['ti'].xcom_pull(task_ids='check_value')
    # Return the task_id of the branch to follow; the other branch is skipped
    if value > 10:
        return 'path_a'
    return 'path_b'


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG('example_branching', default_args=default_args, schedule='@daily')

start = EmptyOperator(task_id='start', dag=dag)

check_value = PythonOperator(
    task_id='check_value',
    python_callable=lambda: 15,  # example condition value
    dag=dag,
)

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,  # Airflow 2 passes the task context (including 'ti') automatically
    dag=dag,
)

path_a = EmptyOperator(task_id='path_a', dag=dag)
path_b = EmptyOperator(task_id='path_b', dag=dag)

# One branch is always skipped, so 'end' must not wait for every upstream task to succeed
end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success', dag=dag)

start >> check_value >> branch_task >> [path_a, path_b] >> end

The choose_branch function returns a different task_id depending on a value stored in an XCom (a small store in Airflow's metadata database that lets tasks pass values to each other). The branch_task is a separate task that invokes a Python callable (in this case the choose_branch function). Because path_a and path_b are set as the direct downstream tasks of branch_task (passed as a list), Airflow follows whichever task_id choose_branch returns and skips the other. The end task uses a non-default trigger rule: under the default all_success rule, it would be skipped along with the unchosen branch.

Automating Model Training and Deployment

Branching is really powerful in the Machine Learning and Data Science world. Suppose you have a Machine Learning model that needs to be trained every week, because every week yo...
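As a hypothetical illustration of the same pattern applied to weekly retraining, a branch callable can decide whether a freshly trained model gets deployed. This is a minimal sketch, assuming an upstream task named evaluate_model that returns a dict with new_score and current_score; the task ids deploy_model and keep_current_model, the MIN_IMPROVEMENT threshold, and the FakeTaskInstance stub are all illustrative names, not part of the article.

```python
"""Hypothetical branch callable that gates deployment of a weekly-trained model.

Assumes an upstream task 'evaluate_model' returns a dict such as
{"new_score": 0.93, "current_score": 0.90}; all names are illustrative.
"""

MIN_IMPROVEMENT = 0.01  # hypothetical: required score gain before redeploying


def choose_deployment(**kwargs):
    # Airflow passes the TaskInstance as 'ti'; xcom_pull reads the upstream return value
    scores = kwargs["ti"].xcom_pull(task_ids="evaluate_model")
    if scores["new_score"] >= scores["current_score"] + MIN_IMPROVEMENT:
        return "deploy_model"  # task_id of the branch to follow
    return "keep_current_model"  # the unchosen branch is skipped


class FakeTaskInstance:
    """Minimal stand-in for Airflow's TaskInstance, for testing without a scheduler."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


if __name__ == "__main__":
    ti = FakeTaskInstance({"evaluate_model": {"new_score": 0.93, "current_score": 0.90}})
    print(choose_deployment(ti=ti))  # prints: deploy_model
```

In a DAG, choose_deployment would be the python_callable of a BranchPythonOperator placed after evaluate_model, with deploy_model and keep_current_model as its direct downstream tasks. Keeping the decision in a plain function means it can be checked with a stub like FakeTaskInstance instead of a full Airflow run.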


towardsdatascience.com
