Integration with Apache Airflow

Apache Airflow is a popular open-source workflow scheduler commonly used for data orchestration. Astro is a fully managed service for Airflow by Astronomer.

This guide demonstrates how to set up Cube and Airflow to work together so that Airflow can push changes from upstream data sources to Cube via the Orchestration API.

Tasks

In Airflow, pipelines are represented by directed acyclic graphs (DAGs), which are Python functions decorated with the @dag decorator. DAGs include calls to tasks, implemented as instances of the Operator class. Operators can perform various tasks: poll for some precondition, perform extract-load-transform (ETL), or trigger external systems like Cube.

Integration between Cube and Airflow is enabled by the airflow-provider-cube package, which provides the following operators.

CubeQueryOperator

CubeQueryOperator is used to query Cube via the /v1/load endpoint of the REST API.

It supports the following options:

Option | Type | Default | Description
cube_conn_id | string | cube_default | Airflow connection name.
headers | dict | | HTTP headers to be added to the request.
query | dict | | Cube query object.
timeout | int | 30 | Response wait timeout in seconds.
wait | int | 10 | Interval between API calls in seconds.
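
To make the query and headers options more concrete, here is a minimal sketch of the kind of HTTP request that a /v1/load call amounts to. It only builds the request and does not send it. The helper name build_load_request is hypothetical, the /cubejs-api base path is Cube's default and is assumed here, and this is not necessarily how the operator assembles its requests internally.

```python
import json
from urllib.parse import urlencode


def build_load_request(host, query, headers=None, base_path="/cubejs-api"):
    """Return (url, headers) for a GET request to the /v1/load endpoint.

    Hypothetical helper for illustration only. base_path is an assumption
    (Cube's default REST API base path) and may differ in your deployment.
    """
    # The Cube query object travels as a JSON-encoded `query` parameter.
    params = urlencode({"query": json.dumps(query)})
    url = f"{host.rstrip('/')}{base_path}/v1/load?{params}"
    # Copy the caller's headers so the input dict is never mutated.
    return url, dict(headers or {})


url, hdrs = build_load_request(
    "https://awesome-ecom.gcp-us-central1.cubecloudapp.dev",
    {"measures": ["Orders.count"], "dimensions": ["Orders.status"]},
    headers={"x-request-id": "example"},
)
print(url)
```

The query dictionary here is the same object you would pass to the operator's query option.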

CubeBuildOperator

CubeBuildOperator is used to trigger pre-aggregation builds and check their status via the /v1/pre-aggregations/jobs endpoint of the Orchestration API.

It supports the following options:

Option | Type | Default | Description
cube_conn_id | string | cube_default | Airflow connection name.
headers | dict | | HTTP headers to be added to the request.
selector | dict | | /v1/pre-aggregations/jobs selector.
complete | bool | False | Whether a task should wait for builds to complete or not.
wait | int | 10 | Interval between API calls in seconds.
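
As a rough illustration of how a selector might travel to the endpoint, the sketch below builds a JSON body for /v1/pre-aggregations/jobs. The helper name build_jobs_payload is hypothetical. The body shape (an action field set to "post" alongside the selector) and the treatment of contexts and timezones as mandatory are assumptions that should be checked against the Orchestration API reference.

```python
import json


def build_jobs_payload(selector):
    """Build a JSON body that asks Cube to start pre-aggregation builds.

    Hypothetical helper. The {"action": "post", "selector": ...} shape is an
    assumption to verify against the Orchestration API reference.
    """
    # This sketch treats both keys as required and fails early otherwise.
    for key in ("contexts", "timezones"):
        if not selector.get(key):
            raise ValueError(f"selector needs at least one entry in {key!r}")
    return json.dumps({"action": "post", "selector": selector})


body = build_jobs_payload(
    {"contexts": [{"securityContext": {}}], "timezones": ["UTC"]}
)
print(body)
```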

Installation

Install the Astro CLI.

Create a new directory and initialize a new Astro project:

mkdir cube-astro
cd cube-astro
astro dev init

Add the integration package to requirements.txt:

echo "airflow-provider-cube" >> ./requirements.txt

Configuration

Connection

Create an Airflow connection via the web console or by adding the following contents to the airflow_settings.yaml file:

airflow:
  connections:
    - conn_id: cube_default
      conn_type: generic
      conn_host: https://awesome-ecom.gcp-us-central1.cubecloudapp.dev
      conn_schema:
      conn_login:
      conn_password: SECRET
      conn_port:
      conn_extra:
        security_context: {}

Let's break the options down:

  • By default, Cube operators use cube_default as an Airflow connection name.
  • The connection should be of the generic type.
  • conn_host should be set to the URL of your Cube deployment.
  • conn_password should be set to the value of the CUBEJS_API_SECRET environment variable.
  • conn_extra should contain a security context (as security_context) that will be sent with API requests.

DAGs

Create a new file named cube_query.py in the dags subdirectory with the following contents. The CubeQueryOperator accepts a Cube query via the query option.

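from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeQueryOperator

@dag(
  start_date=datetime(2023, 6, 1),
  # Run every minute, one DAG run at a time.
  schedule='*/1 * * * *',
  max_active_runs=1,
  # Limit concurrently running task instances
  # (called max_active_tasks in newer Airflow releases).
  concurrency=1,
  # cube_conn_id is passed to every operator that accepts it.
  default_args={"retries": 1, "cube_conn_id": "cube_default"},
  tags=["cube"],
)
def cube_query_workflow():
  # Send the Cube query to the /v1/load endpoint.
  query_op = CubeQueryOperator(
    task_id="query_op",
    query={
      "measures": ["Orders.count"],
      "dimensions": ["Orders.status"]
    }
  )

  @task()
  def print_op(data: Any):
    print(f"Result: {data}")

  # Pass the operator's result (via XCom) to the downstream task.
  print_op(query_op.output)

cube_query_workflow()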

Create a new file named cube_build.py in the dags subdirectory with the following contents. The CubeBuildOperator accepts a pre-aggregation selector via the selector option.

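from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeBuildOperator

@dag(
  start_date=datetime(2023, 6, 1),
  # Run every minute, one DAG run at a time.
  schedule='*/1 * * * *',
  max_active_runs=1,
  # Limit concurrently running task instances
  # (called max_active_tasks in newer Airflow releases).
  concurrency=1,
  # cube_conn_id is passed to every operator that accepts it.
  default_args={"retries": 1, "cube_conn_id": "cube_default"},
  tags=["cube"],
)
def cube_build_workflow():
  # Trigger pre-aggregation builds for the selected contexts and timezones.
  build_op = CubeBuildOperator(
    task_id="build_op",
    selector={
      "contexts": [
        {"securityContext": {}}
      ],
      "timezones": ["UTC"]
    },
    # Wait for builds to finish, checking every 10 seconds.
    complete=True,
    wait=10,
  )

  @task()
  def print_op(data: Any):
    print(f"Result: {data}")

  # Pass the operator's result (via XCom) to the downstream task.
  print_op(build_op.output)

cube_build_workflow()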

Pay attention to the complete option. When it's set to True, the operator will wait for pre-aggregation builds to complete before allowing downstream tasks to run.
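
The waiting behavior can be pictured as a simple polling loop. The sketch below is not the operator's implementation. The function wait_for_builds and its fetch_statuses callback are hypothetical, and the status strings it checks ("done", and anything containing "failure") are assumptions about what the jobs endpoint reports.

```python
import time


def wait_for_builds(fetch_statuses, wait=10, max_polls=None, sleep=time.sleep):
    """Poll until every build job reports "done"; return the number of polls.

    fetch_statuses is a hypothetical callable that returns a list of status
    strings, one per build job. Status names are assumptions.
    """
    polls = 0
    while True:
        statuses = fetch_statuses()
        polls += 1
        failed = [s for s in statuses if "failure" in s]
        if failed:
            raise RuntimeError(f"pre-aggregation build failed: {failed}")
        if all(s == "done" for s in statuses):
            return polls
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"builds not finished after {polls} polls")
        # Mirrors the `wait` option: pause between consecutive API calls.
        sleep(wait)


# Simulated responses stand in for real calls to the jobs endpoint.
responses = iter([
    ["scheduled", "processing"],
    ["processing", "done"],
    ["done", "done"],
])
polls = wait_for_builds(lambda: next(responses), wait=0)
print(f"builds finished after {polls} polls")
```

With complete left at False, a task would correspond to skipping this loop entirely: the builds are triggered and downstream tasks proceed without waiting.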

Running workflows

Now, you can run these DAGs:

astro run cube_query_workflow
astro run cube_build_workflow

Alternatively, you can run Airflow and navigate to the web console at localhost:8080 (use admin/admin to authenticate):

astro dev start