Skip to main content

Integration with Dagster

Dagster is a popular open-source data pipeline orchestrator. Dagster Cloud is a fully managed service for Dagster. This guide demonstrates how to setup Cube and Dagster to work together so that Dagster can push changes from upstream data sources to Cube via the Orchestration API.

Resources

In Dagster, each workflow is represented by jobs, Python functions decorated with a @job decorator. Jobs include calls to ops, Python functions decorated with an @op decorator. Ops represent distinct pieces of work executed within a job. They can perform various jobs: poll for some precondition, perform extract-load-transform (ETL), or trigger external systems like Cube. Integration between Cube and Dagster is enabled by the dagster_cube package.
Cube and Dagster integration package was originally contributed by Olivier Dupuis, founder of discursus.io, for which we’re very grateful.
The package provides the CubeResource class: Please refer to the package documentation for details and options reference.

Installation

Install Dagster. Create a new directory:
mkdir cube-dagster
cd cube-dagster
Install the integration package:
pip install dagster_cube

Configuration

Create a new file named cube.py with the following contents:
from dagster import asset
from dagster_cube.cube_resource import CubeResource

@asset
def cube_query_workflow():
  my_cube_resource = CubeResource(
    instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
    api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
  )

  response = my_cube_resource.make_request(
    method="POST",
    endpoint="load",
    data={
      'query': {
        'measures': ['Orders.count'],
        'dimensions': ['Orders.status']
      }
    }
  )

  return response

@asset
def cube_build_workflow():
  my_cube_resource = CubeResource(
    instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
    api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
  )

  response = my_cube_resource.make_request(
    method="POST",
    endpoint="pre-aggregations/jobs",
    data={
      'action': 'post',
      'selector': {
        'timezones': ['UTC'],
        'contexts': [{'securityContext': {}}]
      }
    }
  )

  return response
As you can see, the make_request method for the load endpoint accepts a Cube query via the query option and the make_request method for the pre-aggregations/jobs endpoint accepts a pre-aggregation selector via the selector option.

Running jobs

Now, you can load these jobs to Dagster:
dagster dev -f cube.py
Navigate to Dagit UI at localhost:3000 and click Materialize all to run both jobs: