
Airflow EMR Operators/Hooks

Project Description

Overview

This is a set of Airflow operators and hooks for working with AWS EMR clusters. They are a work in progress, exploring how Airflow can provide a high-level overview of EMR batch workloads, and (at least at present) are shared for discussion and feedback.

The overall approach is to use the boto3 EMR methods as directly as possible and to externalise the EMR configuration into YAML files that describe the EMR cluster and the EMR steps.

A sample EMR cluster definition might be…

---
Name: airflow-{{ task.task_id }}
LogUri: s3://my-logs-bucket/logs
ServiceRole: EMR_DefaultRole
JobFlowRole: EMR_EC2_DefaultRole
VisibleToAllUsers: true
ReleaseLabel: emr-4.4.0
Applications:
- Name: Hadoop
- Name: Spark
- Name: Hue
- Name: Hive
Instances:
  MasterInstanceType: {{ task.instance_type }}
  SlaveInstanceType: {{ task.instance_type }}
  KeepJobFlowAliveWhenNoSteps: false
  InstanceCount: 1
  Ec2KeyName: my-key-name
  Ec2SubnetId: subnet-1234567

and a sample set of steps might be

---

### Read from RAW and write into INPUT
- Name: CreateSchema
  ActionOnFailure: TERMINATE_JOB_FLOW
  HadoopJarStep:
    Jar: command-runner.jar
    Args:
    - hive-script
    - '--run-hive-script'
    - '--args'
    - '-f'
    - 's3://my-emr-bucket/schema.sql'
    - '-d'
    - INPUT={{ params.warehouse_read }}
    - '-d'
    - OUTPUT={{ params.transform_write }}

- Name: RunStep1
  ActionOnFailure: TERMINATE_JOB_FLOW
  HadoopJarStep:
    Jar: command-runner.jar
    Args:
    - hive-script
    - '--run-hive-script'
    - '--args'
    - '-f'
    - 's3://my-emr-bucket/step1.sql'
    - '-d'
    - YEAR={{ params.year }}
    - '-d'
    - MONTH={{ params.month }}

NB: I chose YAML as the representation because it maps easily onto boto3 kwargs while remaining easy to read and allowing inline comments.
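
To make the mapping concrete, here is a minimal sketch (not the operator code itself; the region is an example value) of loading an already rendered cluster definition and passing it straight to boto3 as keyword arguments:

import boto3
import yaml

# Load the (already templated) cluster definition; the resulting dict is keyed
# exactly like the kwargs expected by boto3's run_job_flow.
with open('./resources/emr/cluster/single.yaml') as f:
    cluster_config = yaml.safe_load(f)

emr = boto3.client('emr', region_name='eu-west-1')  # region is an example value
response = emr.run_job_flow(**cluster_config)
print(response['JobFlowId'])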

EMR Usage Patterns

1. EMR Cluster per Task

In the simplest form, you can use the EmrSensor to create an EMR cluster for each task. This operator subclasses BaseSensor and implements the poke() method to create the EMR cluster and monitor its state, returning normally on completion or raising AirflowException if the EMR cluster terminates with errors. NB: the expectation is that in this use case you would define your cluster to auto-terminate on completion.

task = EmrSensor(
    config='./resources/emr/cluster/single.yaml',
    steps='./resources/emr/steps/export_to_postgres.yaml',
    task_id='my_task',
    params={'db_host': "{{ task_instance.xcom_pull(task_ids='create_db') }}"},
    dag=dag
)
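
For reference, the monitoring logic inside poke() looks roughly like the following sketch, based on boto3's describe_cluster; this is illustrative only and not the package's actual implementation:

import boto3
from airflow.exceptions import AirflowException

def cluster_finished(cluster_id):
    """Return True once the cluster has terminated; raise if it failed."""
    emr = boto3.client('emr')
    state = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
    if state == 'TERMINATED_WITH_ERRORS':
        raise AirflowException('EMR cluster %s terminated with errors' % cluster_id)
    return state == 'TERMINATED'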

2. CreateCluster / EmrRunSteps / DeleteCluster with mutex queue

If you have numerous short-lived tasks it may be more advantageous to create a cluster up front and reuse it for subsequent tasks. In this case, you call EmrSensor with auto-termination disabled and probably with no steps defined. Subsequent tasks then use the EmrRunSteps operator, which adds job steps to the EMR cluster and monitors the status of those specific steps. At the end you should use the EmrDeleteCluster operator to ensure the cluster is removed when no longer required; this should use the trigger_rule='all_done' argument to ensure it is executed regardless of the success or failure of the other steps.

In this example, you associate the tasks with a pool that has a single slot, allowing Airflow to manage when steps are added to the cluster so that only one of the EmrRunSteps operators runs at any given time. It also means the Gantt chart run times accurately reflect how long each job actually runs.

create_cluster = EmrSensor(
    config='./resources/emr/cluster/medium_no_terminate.yaml',
    task_id='create_cluster',
    noop=NOOP,
    dag=dag
)


job_steps1 = EmrRunSteps(
    cluster_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='cluster_id') }}",
    steps='./resources/emr/steps/job_steps1.yaml',
    task_id='job_steps1',
    pool='emr_run_steps',
    params={
        'year': "{{ (execution_date - macros.timedelta(days=18)).year }}",
        'month': "{{ (execution_date - macros.timedelta(days=18)).month }}"
    },
    dag=dag
)

job_steps2 = EmrRunSteps(
    cluster_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='cluster_id') }}",
    steps='./resources/emr/steps/job_steps2.yaml',
    task_id='job_steps2',
    pool='emr_run_steps',
    params={
        'year': "{{ (execution_date - macros.timedelta(days=18)).year }}",
        'month': "{{ (execution_date - macros.timedelta(days=18)).month }}"
    },
    dag=dag
)

delete_cluster = EmrDeleteCluster(
    cluster_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='cluster_id') }}",
    task_id='delete-cluster',
    trigger_rule='all_done',
    dag=dag
)

create_cluster.set_downstream([job_steps1, job_steps2])
delete_cluster.set_upstream([job_steps1, job_steps2])

The EmrRunSteps operator has a remove_duplicates argument. If this is set to True the operator queries the cluster and only adds steps that have not already been applied; the comparison is based solely on the name of the step.
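
A sketch of that duplicate check, assuming unique step names and using boto3's list_steps (illustrative only, not the operator's actual code):

import boto3

def filter_new_steps(cluster_id, candidate_steps):
    """Drop any candidate step whose Name already exists on the cluster."""
    emr = boto3.client('emr')
    existing = set()
    for page in emr.get_paginator('list_steps').paginate(ClusterId=cluster_id):
        existing.update(step['Name'] for step in page['Steps'])
    return [step for step in candidate_steps if step['Name'] not in existing]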

3. CreateCluster / EmrRunSteps / DeleteCluster without mutex queue

This pattern is equivalent to the previous one, except that when specifying the EmrRunSteps operator you do not specify an associated pool.

In these circumstances, the EmrRunSteps tasks are not constrained: they all run at once, each monitoring the same cluster and waiting for its own steps to complete. Whether this approach has value is yet to be determined!

Developer Notes

1. AwsHook and EmrHook

I’ve created an EmrHook that abstracts the interaction with EMR via boto3. It is subclassed from a new AwsHook, which duplicates much of the connection handling found in the core S3Hook. Since over time there are likely to be many AWS-related hooks, it seemed sensible to abstract the core mechanism for making connections into a separate class (see the sketch after the list below). Ideally, I would submit this as a PR on Airflow, but there are a number of blocking factors:

  • the S3Hook uses boto rather than boto3; boto3 is better maintained and much better at handling instance profiles and the profiles defined in the .aws/credentials and .aws/config configuration files
  • the S3Hook is hardcoded to expect AWS credentials, but when using AWS instance profiles you would not specify any credentials at all
  • there are a lot of different use cases associated with getting a connection from S3Hook, and few of these are well documented or covered by unit tests, so it is difficult to migrate the existing hook to boto3
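
As a rough sketch of that connection abstraction (the function and argument names here are illustrative, not the actual AwsHook code), boto3 makes the credential fall-through straightforward:

import boto3

def get_client(service, aws_access_key_id=None, aws_secret_access_key=None,
               profile_name=None, region_name=None):
    """Build a boto3 client, falling back to instance profiles / ~/.aws config."""
    if aws_access_key_id and aws_secret_access_key:
        session = boto3.session.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name)
    else:
        # With no explicit keys, boto3 resolves ~/.aws/credentials,
        # ~/.aws/config and EC2 instance profiles automatically.
        session = boto3.session.Session(profile_name=profile_name,
                                        region_name=region_name)
    return session.client(service)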

2. Templating of params

The approach used in these operators expects you to work with relatively large external YAML files that define your EMR clusters and steps. These YAML files are expected to be templated (such as the year and month parameters in the examples above), so it should be possible to use params specified at the operator level. However, core Airflow does not currently support rendering the params argument itself.

To ensure these params are rendered, a render() method is called during execution that re-renders the templated files using the already-rendered params. This is far from ideal, since we would like to inspect these values before the task runs (i.e. via the GUI). There is an outstanding issue/PR [AIRFLOW-103] which should address this once it is merged.
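
The effect of that workaround is roughly the following simplified sketch; the real render pass also has the full Airflow template context available:

import yaml
from jinja2 import Template

def render_steps(path, params):
    """Re-render an external steps file once the params themselves are rendered."""
    with open(path) as f:
        rendered = Template(f.read()).render(params=params)
    return yaml.safe_load(rendered)

# e.g. render_steps('./resources/emr/steps/job_steps1.yaml',
#                   {'year': '2016', 'month': '4'})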

3. Managing of pools

When using EmrRunSteps it would be useful to be able to define the pool within the DAG itself, but this is not currently possible. At present you need to create the pool manually via the GUI.
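
One possible workaround is to create the pool programmatically against the metadata database; this is a sketch assuming the Pool model in airflow.models, and creating it via the GUI remains the straightforward route:

from airflow import settings
from airflow.models import Pool

session = settings.Session()
if not session.query(Pool).filter(Pool.pool == 'emr_run_steps').first():
    session.add(Pool(pool='emr_run_steps', slots=1,
                     description='Serialise EmrRunSteps tasks on a shared cluster'))
    session.commit()
session.close()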

4. Naming Conventions

The names of the operators are subject to change. The EmrSensor is named as such because it is derived from BaseSensor and the implementation utilises the poke() method, but at a high level it is not very similar to other sensors. The EmrRunSteps operator is similarly based on BaseSensor. I'd like to converge on a naming scheme that makes sense here.
