Improving data workflows with Airflow and PySpark

Within the Technical Research team, we have developed many data workflows across a variety of projects. These workflows typically run on a schedule, contain multiple tasks, and have a network of data dependencies to manage. We also need to monitor each run to make sure every task succeeds and, when something fails, to locate the problem quickly and resume the workflow later.

We found that Apache Airflow meets our needs for managing workflows. It's an open-source platform for describing, executing, and monitoring workflows, originally built by Airbnb and now widely used by many companies. This post is not meant to be an extensive Airflow tutorial; instead, we'll use the Zone Scan data processing as an example to show how Airflow improves workflow management.

We also wanted to speed up our big data analysis by migrating Hive queries to Apache Spark. We'll also show how we use PySpark in an Airflow task for this purpose.

Current workflow management

We didn't have a common framework for managing workflows, so workflows created at different times by different authors were designed in different ways. For example, the Zone Scan processing used a Makefile to organize jobs and dependencies. Make is originally a build-automation tool for software, and it is not very intuitive for people who are unfamiliar with it.

Migrating to Airflow

Airflow is a modern system designed specifically for workflow management, with a web-based user interface. We explored it by migrating the Zone Scan processing workflows to Airflow.

An Airflow workflow is described as a DAG (Directed Acyclic Graph): a set of tasks and the dependencies between them, with no cycles. The structure of a DAG can be viewed on the web UI, as in the following screenshot of portal-upload-dag (one of the workflows in the Zone Scan processing).
[Screenshot: graph view of portal-upload-dag]

portal-upload-dag is a workflow to generate reports from the Zone Scan data and upload them to the Internet Data Portal (IDP). We can clearly see the three main tasks and their dependencies (running in the order indicated by the arrows):

  1. getdata-subdag: extracts all the data needed for the reports
  2. prepare-task: prepares the data in a format ready for uploading
  3. upload-task: uploads the reports to the IDP

To keep a complex DAG easy to maintain, a nested workflow can be factored out into a sub-DAG, such as getdata-subdag. A sub-DAG can be zoomed into on the web UI to show the tasks it contains. Below is the graph view after zooming into getdata-subdag:
[Screenshot: graph view of getdata-subdag]

The status of each task in the latest run is indicated by colour, making it very easy to see what's happening at a glance.

You can also interact with a task through the web UI. This is often useful when debugging: you may want to manually run an individual task while ignoring its dependencies. The actions that can be performed on a task instance are shown in the following screenshot:
[Screenshot: task instance actions]

The Airflow UI contains many other views catering for different needs: inspecting DAG status across runs, a Gantt chart showing the order in which tasks ran and which tasks took a long time (as in the following screenshot), historical graphs of task duration, and the ability to drill into a task's metadata and logs, which is extremely convenient for troubleshooting.
[Screenshot: Gantt chart view]

Multiple workflows can be monitored in Airflow through the following view:
[Screenshot: DAG list view]

A list of DAGs in your environment is shown with summarized information and shortcuts to useful pages. You can see how many tasks succeeded, failed, or are currently running at a glance.

To use Airflow, you write Python scripts to describe workflows, which adds flexibility. For example, a batch of similar tasks can be created in a loop, and dynamic workflows can be generated in various ways. Different types of operators are available to execute tasks: BashOperator to run a Bash command, PythonOperator to call a Python function, specialized operators such as HiveOperator and S3FileTransformOperator, and many more built by the community. Tasks can be configured with a set of arguments, such as schedule, retries, timeout, catchup, and trigger rule.

Airflow also has more advanced features that make it very powerful, such as branching within a workflow, hooks to external platforms and databases like Hive, S3, Postgres and HDFS, and running tasks in parallel, locally or on a cluster, using task queues such as Celery.

Airflow can also be integrated with many well-known platforms such as Google Cloud Platform (GCP) and Amazon Web Services (AWS).

Running PySpark in an Airflow task

We use many Hive queries running on Hadoop in our data analysis, and wanted to migrate them to Spark, a faster big data processing engine. As we use Python in most of our projects, PySpark (the Spark Python API) was the natural choice.

With the Spark SQL module and HiveContext, we wrote Python scripts to run the existing Hive queries and UDFs (user-defined functions) on the Spark engine.
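The pattern looks roughly like the sketch below, using the Spark 1.x-era HiveContext API. The table, column, UDF, and output path are hypothetical stand-ins, not our actual queries:

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import IntegerType

sc = SparkContext(appName="zone-scan-queries")  # hypothetical app name
hive = HiveContext(sc)

# Register a Python UDF so it can be called from inside SQL,
# just as a Hive UDF would be (illustrative).
hive.registerFunction("label_len", lambda s: len(s), IntegerType())

# Run an existing Hive query unchanged on the Spark engine.
df = hive.sql("SELECT domain, label_len(domain) AS n FROM zone_records")
df.write.parquet("/tmp/zone_records_out")  # placeholder output path
```

Because HiveContext reads the same metastore, most queries ran unchanged; only the surrounding driver script is new.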

To embed the PySpark scripts in Airflow tasks, we used Airflow's BashOperator to run the spark-submit command, which launches the scripts on Spark.

After migrating the Zone Scan processing workflows to Airflow and Spark, we ran some tests and verified the results: the workflows completed much faster and produced the expected output. Moreover, task progress can be easily monitored, and the workflows are more maintainable and manageable.

Future work

We explored Apache Airflow with the Zone Scan processing, and it proved to be a great tool for improving our workflow management. We also succeeded in integrating PySpark scripts into Airflow tasks, which sped up our data analysis jobs.

We plan to use Airflow as a tool in all our projects across the team. In addition, a centralized platform can be established for all our workflows, which will bring our workflow management to a new level.