Overview
Coalesce's code-first, GUI-driven approach makes it easier than ever to build, test, and deploy data pipelines. This considerably improves the data pipeline development workflow compared to creating directed acyclic graphs (DAGs) purely with code.
Coalesce relies on third-party services and orchestrators to schedule and execute the Coalesce-built data pipelines, which are organized into Jobs for execution.
Apache Airflow is a common orchestrator used with Coalesce.
Airflow is a widely adopted open-source platform for authoring, scheduling, and monitoring workflows. Written in Python and backed by a thriving community, it has become an essential tool for data engineers, handling the construction, orchestration, and scheduling of pipelines.
With respect to orchestrating Coalesce Jobs, Airflow can be used to schedule and execute these Jobs directly, or within the constructs of a larger organizational data pipeline or stack via Airflow DAGs.
This article details the steps for configuring nodes in an Airflow DAG to utilize the Coalesce API to trigger Coalesce Jobs on a schedule.
Assumptions
The steps detailed in this article assume the following is already in place:
- You have a basic Airflow webserver and scheduler already running.
- You have created the Coalesce Job(s) for your data pipelines.
- You have deployed your Coalesce Job(s) to the Coalesce Environment(s) you wish to refresh.
- You have generated your Coalesce Access Token, which is required to use the Coalesce Command-Line Interface (CLI) or API.
- You have the credentials readily available for a supported Snowflake authentication method for the Coalesce API.
Configure Airflow to Execute Coalesce Jobs on a Schedule
The steps below detail how to configure Airflow to execute Coalesce Jobs on a schedule.
Configure Airflow Variables to Support Authentication to Snowflake
In Airflow, you will need to create variables to store the Snowflake credentials for your selected Snowflake authentication method.
In this article, we use the KeyPair authentication method to avoid exposing actual username/password credentials in the DAG, and the instructions and screen captures reflect that method. If you are using a different authentication method, modify the steps accordingly.
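For reference, a typical way to generate an unencrypted key pair for Snowflake KeyPair authentication uses standard openssl commands (see the Snowflake documentation for encrypted-key and key-rotation options; the file names here are illustrative):

```shell
# Generate a 2048-bit RSA private key in PKCS#8 format (unencrypted).
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# Derive the matching public key, which you register with your Snowflake
# user (ALTER USER ... SET RSA_PUBLIC_KEY='...').
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```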
- Within Airflow, go to the Admin tab and click Variables.
- Create two variables:
- AIRFLOW_VAR_COALESCE_TOKEN: to hold your Coalesce Access Token.
- AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY: to hold the private key you generated for Snowflake. (For more information on generating keys for KeyPair authentication to Snowflake, see the Snowflake documentation.) Note that the private key you provide to Airflow must contain a newline (\n) character at the end of every line; do not simply copy/paste the private key into the AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY box, as you will have to add the \n characters manually.
For example, if your private key looks like:

-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCv2xVhFpaM6hhf
ce4U5GRfdArGkoqkL2EBRs0zGMn1YYfQ8+zDuN9YkMTNC1pNxQptGn921teGk0wv
cMP+I83P390jqXh56TlQtwn2reRXH7OlLdELttof4VGYb4I6KpdBhDaid8bys2FE
f0r948EXM81Euh9FgmMbc4KzeF1tBDyU0sqAcAJCQXOl95jUR6Wqdp04LXJVoGmI
-----END PRIVATE KEY-----

it must be input as:

-----BEGIN PRIVATE KEY-----\n
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCv2xVhFpaM6hhf\n
ce4U5GRfdArGkoqkL2EBRs0zGMn1YYfQ8+zDuN9YkMTNC1pNxQptGn921teGk0wv\n
cMP+I83P390jqXh56TlQtwn2reRXH7OlLdELttof4VGYb4I6KpdBhDaid8bys2FE\n
f0r948EXM81Euh9FgmMbc4KzeF1tBDyU0sqAcAJCQXOl95jUR6Wqdp04LXJVoGmI\n
-----END PRIVATE KEY-----

within the variable AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY.
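If you prefer, you can produce the escaped single-string form programmatically. A minimal sketch; the escape_pem helper and the truncated key below are illustrative, not part of Coalesce or Airflow:

```python
def escape_pem(pem: str) -> str:
    """Join the lines of a PEM key with literal backslash-n sequences,
    producing a single string suitable for the Airflow variable value."""
    return "\\n".join(pem.strip().splitlines())

# Truncated key for illustration; read your real private key file instead.
pem = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCv2xVhFpaM6hhf
-----END PRIVATE KEY-----"""

print(escape_pem(pem))
```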
Create and Configure Your Airflow DAG to Run a Coalesce Job
In the place where your DAGs are stored within Airflow:
- Create a new DAG
- Paste in the following code:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

dag = DAG(
    'example_coalesce_trigger_w_status_check_parametrized_token',
    default_args=default_args,
    description='An example Airflow DAG',
    schedule_interval=timedelta(days=1),
)

print_hello_command = 'echo "Hello, World"'

execute_api_call_command = """
response=$(curl --location 'https://app.coalescesoftware.io/scheduler/startRun' \
--header 'accept: application/json' \
--header 'content-type: application/json' \
--header 'Authorization: Bearer {{ params.coalesce_token }}' \
--data-raw '{
    "runDetails": {
        "parallelism": 16,
        "environmentID": "3",
        "jobID": "3"
    },
    "userCredentials": {
        "snowflakeAuthType": "KeyPair",
        "snowflakeKeyPairKey": "{{ params.sf_private_key }}"
    }
}')
# Extract runCounter from the JSON response using Python and remove the prefix.
runCounter=$(echo "$response" | python -c "import sys, json; print(json.load(sys.stdin)['runCounter'])" | sed 's/Run Counter: //')
echo "$runCounter"
"""

check_status_command = """
curl --request GET \
--url 'https://app.coalescesoftware.io/scheduler/runStatus?runCounter={{ task_instance.xcom_pull(task_ids="execute_api_call") }}' \
--header 'Authorization: Bearer {{ params.coalesce_token }}' \
--header 'accept: application/json'
"""

# Define tasks
task_print_hello = BashOperator(
    task_id='print_hello',
    bash_command=print_hello_command,
    dag=dag,
)

task_execute_api_call = BashOperator(
    task_id='execute_api_call',
    params={'coalesce_token': Variable.get('AIRFLOW_VAR_COALESCE_TOKEN'),
            'sf_private_key': str(Variable.get('AIRFLOW_VAR_SNOWFLAKE_PRIVATE_KEY'))},
    bash_command=execute_api_call_command,
    dag=dag,
)

task_check_status = BashOperator(
    task_id='check_status',
    params={'coalesce_token': Variable.get('AIRFLOW_VAR_COALESCE_TOKEN')},
    bash_command=check_status_command,
    dag=dag,
)

# Set task dependencies
task_print_hello >> task_execute_api_call >> task_check_status
Ensure that you replace the values for environmentID and jobID in the code with the relevant IDs of the Environment and Job in your Coalesce organization. If you don't want to specify a Job and instead want to run the entire Coalesce pipeline, simply remove the jobID argument from the payload of the node in the DAG.
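For example, a startRun payload that refreshes an entire Environment rather than a single Job simply omits jobID (the environmentID shown is a placeholder):

```json
{
    "runDetails": {
        "parallelism": 16,
        "environmentID": "3"
    },
    "userCredentials": {
        "snowflakeAuthType": "KeyPair",
        "snowflakeKeyPairKey": "{{ params.sf_private_key }}"
    }
}
```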
The environmentID can be found in the Deploy Interface of the Coalesce UI.
The jobID can be found in a Workspace that contains your Job.
- Edit the values for attributes like start_date, retries, retry_delay, and schedule_interval to reflect your requirements.
- Change the name of the DAG to reflect its purpose.
Once the DAG is configured and triggered, whether manually or on its schedule, you will see the corresponding Job executions and their details in the Activity Feed in the Coalesce UI. You can also retrieve run results and details via the API.
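Run status can also be checked outside the DAG. A minimal sketch using only the Python standard library; the endpoint and headers mirror the check_status_command above, while run_status_request is an illustrative helper that builds the request without sending it:

```python
import urllib.request

COALESCE_API = "https://app.coalescesoftware.io"

def run_status_request(run_counter: int, token: str) -> urllib.request.Request:
    """Build (but do not send) the GET request for a Coalesce run's status."""
    return urllib.request.Request(
        f"{COALESCE_API}/scheduler/runStatus?runCounter={run_counter}",
        headers={
            "Authorization": f"Bearer {token}",
            "accept": "application/json",
        },
    )

# To actually send it (requires a valid Coalesce Access Token):
# import json
# with urllib.request.urlopen(run_status_request(42, "<your-token>")) as resp:
#     print(json.load(resp))
```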
Running Multiple Coalesce Jobs Using Airflow
You may want to run or schedule multiple Coalesce Jobs using the Airflow scheduler. There are several ways to do so.
1. You can create additional nodes within the same Airflow DAG, each of which calls a different job executing sequentially. You can indicate the order by modifying the task dependencies at the bottom of the DAG code:
refresh_job_1 >> refresh_job_2 >> refresh_job_3 >> ...
where each "refresh_job_" is the name of a task that executes the API call to trigger a refresh of an environment/job.
2. You can create multiple Airflow DAGs that you can schedule independently.
Copy the example code into a new DAG within your /dags folder in Airflow, edit execute_api_call_command with the desired environmentID and jobID values, modify the schedule, and give the DAG a new name.
dag = DAG(
    'example_coalesce_trigger_w_status_check_parametrized_token',
    default_args=default_args,
    description='An example Airflow DAG',
    schedule_interval=timedelta(days=1),
)
Note that it is best practice to save the DAG .py file with the same name as that given to the DAG within the file.