Integration with Dagster

Dagster is a popular open-source data pipeline orchestrator. Dagster Cloud is a fully managed service for Dagster.

This guide demonstrates how to set up Cube and Dagster to work together so that Dagster can push changes from upstream data sources to Cube via the Orchestration API.

Resources

In Dagster, each workflow is represented by a job, a Python function decorated with the @job decorator. Jobs include calls to ops, Python functions decorated with the @op decorator. Ops represent distinct pieces of work executed within a job. They can perform various tasks: poll for some precondition, perform extract-load-transform (ETL), or trigger external systems like Cube.

Integration between Cube and Dagster is enabled by the dagster_cube package.

The Cube and Dagster integration package was originally contributed by Olivier Dupuis, founder of discursus.io, for which we're very grateful.

The package provides the CubeResource class.

Please refer to the package documentation for details and options reference.

Installation

Install Dagster.

Create a new directory:

mkdir cube-dagster
cd cube-dagster

Install the integration package:

pip install dagster_cube

Configuration

Create a new file named cube.py with the following contents:

from dagster import asset
from dagster_cube.cube_resource import CubeResource
 
# Asset that queries Cube through the load endpoint
@asset
def cube_query_workflow():
  my_cube_resource = CubeResource(
    instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
    api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
  )
 
  response = my_cube_resource.make_request(
    method="POST",
    endpoint="load",
    data={
      'query': {
        'measures': ['Orders.count'],
        'dimensions': ['Orders.status']
      }
    }
  )
 
  return response
 
# Asset that triggers a pre-aggregation build through the pre-aggregations/jobs endpoint
@asset
def cube_build_workflow():
  my_cube_resource = CubeResource(
    instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
    api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
  )
 
  response = my_cube_resource.make_request(
    method="POST",
    endpoint="pre-aggregations/jobs",
    data={
      'action': 'post',
      'selector': {
        'timezones': ['UTC'],
        'contexts': [{'securityContext': {}}]
      }
    }
  )
 
  return response

As you can see, the make_request method accepts a Cube query via the query option when it calls the load endpoint, and a pre-aggregation selector via the selector option when it calls the pre-aggregations/jobs endpoint.
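To make the two payloads more concrete, here is a rough sketch of the equivalent HTTP requests, built with the Python standard library only. It is not how dagster_cube is implemented internally (that is an assumption this guide does not cover); the helper build_cube_request, the example URL, and the token placeholder are hypothetical. The requests are constructed but not sent.

```python
import json
from urllib.request import Request


def build_cube_request(instance_url: str, api_key: str, endpoint: str, data: dict) -> Request:
    """Build (but do not send) a POST request to a Cube API endpoint."""
    # Join the base URL and the endpoint without doubling the slash.
    url = instance_url.rstrip("/") + "/" + endpoint.lstrip("/")
    return Request(
        url,
        data=json.dumps(data).encode("utf-8"),
        headers={
            # The API token is passed in the Authorization header.
            "Authorization": api_key,
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Hypothetical values; replace them with your deployment URL and token.
INSTANCE_URL = "https://example.cubecloudapp.dev/cubejs-api/v1/"
API_KEY = "<your-api-token>"

# The load endpoint takes a Cube query under the "query" key.
query_request = build_cube_request(
    INSTANCE_URL, API_KEY, "load",
    {"query": {"measures": ["Orders.count"], "dimensions": ["Orders.status"]}},
)

# The pre-aggregations/jobs endpoint takes a selector under the "selector" key.
build_request = build_cube_request(
    INSTANCE_URL, API_KEY, "pre-aggregations/jobs",
    {"action": "post", "selector": {"timezones": ["UTC"], "contexts": [{"securityContext": {}}]}},
)

print(query_request.full_url)
print(build_request.full_url)
```

Passing either object to urllib.request.urlopen would send it; inside Dagster assets, CubeResource.make_request remains the simpler option.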

Running jobs

Now, you can load these assets into Dagster:

dagster dev -f cube.py

Navigate to the Dagit UI at localhost:3000 and click Materialize all to run both workflows.