Integration with Prefect

Prefect is a popular open-source orchestrator for data-intensive workflows. Prefect Cloud is a fully managed service for Prefect.

This guide demonstrates how to set up Cube and Prefect to work together so that Prefect can push changes from upstream data sources to Cube via the Orchestration API.

Tasks

In Prefect, each workflow is represented by a flow: a Python function decorated with the @flow decorator. Flows call tasks, which are Python functions decorated with the @task decorator, and can also call child flows. Tasks represent distinct pieces of work executed within a flow. They can perform various jobs, such as polling for a precondition, performing extract-transform-load (ETL), or triggering external systems like Cube.

Integration between Cube and Prefect is enabled by the prefect-cubejs package.

The Cube and Prefect integration package was originally contributed by Alessandro Lollo, Data Engineering Manager at Cloud Academy (case study), for which we're very grateful.

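The package provides the following tasks, both of which are used in this guide: run_query, which runs a Cube query, and build_pre_aggregations, which triggers pre-aggregation builds.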

Please refer to the package documentation for details and an options reference.

Installation

Install Prefect.

Create a new directory:

mkdir cube-prefect
cd cube-prefect

Install the integration package:

pip install prefect-cubejs

Configuration

Create a new workflow file named cube_query.py with the following contents. The run_query task accepts a Cube query via the query option.

from prefect import flow
from prefect_cubejs.tasks import run_query


@flow
def cube_query_workflow():
    # Run a Cube query; "SECRET" is a placeholder for your Cube API secret.
    run_query(
        url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api",
        api_secret="SECRET",
        query="""{
            "measures": ["Orders.count"],
            "dimensions": ["Orders.status"]
        }""",
    )


if __name__ == "__main__":
    cube_query_workflow()
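
The workflow above passes the query as a hand-written JSON string, where a missing quote or comma is easy to overlook. As a minimal sketch, the same string can be produced from plain Python lists with the standard json module. The build_query helper is hypothetical and not part of prefect-cubejs; it only covers the measures and dimensions keys used in this guide.

```python
import json


def build_query(measures, dimensions=()):
    """Serialize a Cube query to a JSON string.

    Hypothetical helper for illustration; not part of prefect-cubejs.
    """
    if not measures and not dimensions:
        raise ValueError("a query needs at least one measure or dimension")
    query = {"measures": list(measures), "dimensions": list(dimensions)}
    # json.dumps guarantees well-formed JSON (quoting, commas, brackets).
    return json.dumps(query)


query = build_query(["Orders.count"], ["Orders.status"])
print(query)
```

The resulting string could then be passed as the query option in place of the triple-quoted literal.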

Create a new workflow file named cube_build.py with the following contents. The build_pre_aggregations task accepts a pre-aggregation selector via the selector option.

from prefect import flow
from prefect_cubejs.tasks import build_pre_aggregations


@flow
def cube_build_workflow():
    # Trigger pre-aggregation builds for the given selector;
    # "SECRET" is a placeholder for your Cube API secret.
    build_pre_aggregations(
        url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api",
        api_secret="SECRET",
        selector={
            "contexts": [
                {"securityContext": {}}
            ],
            "timezones": ["UTC"]
        },
        wait_for_job_run_completion=True,
    )


if __name__ == "__main__":
    cube_build_workflow()
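
The selector above holds a list of contexts and a list of time zones. A minimal sketch of assembling that structure programmatically is shown below, which may be convenient if the lists come from configuration. The build_selector helper is hypothetical and not part of prefect-cubejs; it reproduces only the keys that appear in this guide.

```python
def build_selector(timezones=("UTC",), security_contexts=None):
    """Return a selector dict shaped like the one used in cube_build.py.

    Hypothetical helper for illustration; not part of prefect-cubejs.
    """
    if security_contexts is None:
        # Default to a single empty security context, as in the example above.
        security_contexts = [{}]
    return {
        "contexts": [{"securityContext": ctx} for ctx in security_contexts],
        "timezones": list(timezones),
    }


selector = build_selector()
print(selector)
```

With the defaults, the helper yields the same selector as the literal in the workflow, so it could be passed as the selector option unchanged.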

Running workflows

Now, you can run these workflows:

python cube_query.py
python cube_build.py
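
Both workflows in this guide hard-code api_secret="SECRET". One common alternative is to read the secret from the environment at run time so that it stays out of the source files. The sketch below assumes the secret has been exported under the name CUBEJS_API_SECRET; both that choice and the get_api_secret helper are assumptions made for illustration, not something the workflows above require.

```python
import os


def get_api_secret(env_var="CUBEJS_API_SECRET"):
    """Return the API secret from the environment, failing early if unset.

    Hypothetical helper; the variable name is an assumption of this sketch.
    """
    secret = os.environ.get(env_var)
    if not secret:
        raise RuntimeError(f"environment variable {env_var} is not set")
    return secret
```

In either workflow file, api_secret=get_api_secret() would then replace the placeholder string, and a missing variable would stop the script with a clear error before any request is made.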