Using Pre-Aggregations

Pre-aggregations are a powerful way to speed up your Cube queries. There are many configuration options to consider, so please make sure to also check the Pre-Aggregations reference in the data schema section.

The refresh strategy can be customized by setting the refreshKey property on the pre-aggregation.

The default value of refreshKey is every: '1 hour'. It can be redefined either by overriding the default value of the every property:

cube(`Orders`, {

  ...,

  preAggregations: {
    amount_by_created: {
      type: `rollup`,
      measures: [amount],
      timeDimension: created_at,
      granularity: `month`,
      refreshKey: {
        every: `12 hour`,
      },
    },
  },
});

Or by providing a sql property instead, and leaving every unchanged from its default value:

cube(`Orders`, {

  ...,

  preAggregations: {
    amount_by_created: {
      measures: [amount],
      timeDimension: created_at,
      granularity: `month`,
      refreshKey: {
        // every will default to `10 seconds` here
        sql: `SELECT MAX(created_at) FROM orders`,
      },
    },
  },
});

Or both every and sql can be defined together:

cube(`Orders`, {

  ...,

  preAggregations: {
    amount_by_created: {
      measures: [amount],
      timeDimension: created_at,
      granularity: `month`,
      refreshKey: {
        every: `12 hour`,
        sql: `SELECT MAX(created_at) FROM orders`,
      },
    },
  },
});

When every and sql are used together, Cube will run the query from the sql property on an interval defined by the every property. If the query returns different results than the previous run, the pre-aggregation will be refreshed.

To make Cube only serve requests from pre-aggregations, the CUBEJS_ROLLUP_ONLY environment variable can be set to true on an API instance. This prevents API requests from being served by the source database.

When using this configuration in a single node deployment (where the API instance and Refresh Worker are configured on the same host), requests made to the API that cannot be satisfied by a rollup will throw an error. Scheduled refreshes will continue to work in the background.
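
For example, in a .env file on the API instance (a sketch; set the variable however your deployment manages environment configuration):

# Set on the API instance only, not on the Refresh Worker,
# so scheduled refreshes can still reach the source database
CUBEJS_ROLLUP_ONLY=true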

Partitioning is an extremely effective optimization for accelerating pre-aggregation build and refresh times. It effectively "shards" the data across multiple tables, splitting it by a defined attribute. Cube can be configured to incrementally refresh only the last set of partitions through the updateWindow property. This leads to faster refresh times, since data outside the update window is not reloaded, and can even reduce costs for some databases like BigQuery or AWS Athena.

Any rollup pre-aggregation can be partitioned by time using the partitionGranularity property in a pre-aggregation definition. In the example below, the partitionGranularity is set to month, which means Cube will generate separate tables for each month's worth of data. Once built, Cube will continue to refresh the last 3 months of data on a daily basis.

cube(`Orders`, {
  sql: `select * from orders`,

  ...,

  preAggregations: {
    category_and_date: {
      measures: [Orders.count, revenue],
      dimensions: [category],
      timeDimension: created_at,
      granularity: `day`,
      partitionGranularity: `month`,
      refreshKey: {
        every: `1 day`,
        incremental: true,
        updateWindow: `3 months`
      }
    },
  },
});

When to use indexes?

Indexes are great when you filter large amounts of data across one or several dimension columns. You can read more about them here.
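As a quick illustration, indexes are declared inside a pre-aggregation definition, listing the columns they should cover. A minimal sketch that extends the partitioning example above (member names are illustrative):

cube(`Orders`, {
  sql: `select * from orders`,

  ...,

  preAggregations: {
    category_and_date: {
      measures: [Orders.count, revenue],
      dimensions: [category],
      timeDimension: created_at,
      granularity: `day`,
      indexes: {
        // Speeds up queries that filter on `category`
        category_index: {
          columns: [category],
        },
      },
    },
  },
});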

Best Practices

To maximize performance, you can introduce an index per type of query so that the set of dimensions used in the query overlaps as much as possible with the ones defined in the index. Measures are traditionally only used in indexes if you plan to filter a measured value and the cardinality of the possible values of the measure is low.

The order in which columns are specified in the index is very important; suboptimal ordering can lead to diminished performance. The column order should be defined in accordance with index selectivity. In a nutshell, index selectivity measures how many unique values a column contains: an index has higher selectivity as the number of unique values goes up, and lower selectivity as it goes down. Once we know the selectivity of the columns we'll be adding to the index, we should place them from lowest to highest selectivity (this is probably contrary to what you've read about the topic for traditional row-based databases).
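
To estimate selectivity, you can count the distinct values of each candidate column directly in the source table. A sketch using the columns from the example below (table and column names are illustrative):

-- Fewer distinct values means lower selectivity;
-- lower-selectivity columns should come first in the index
SELECT
  COUNT(DISTINCT product_category) AS category_cardinality,
  COUNT(DISTINCT product_name) AS name_cardinality,
  COUNT(DISTINCT zip_code) AS zip_code_cardinality
FROM orders;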

Example:

Suppose you have a pre-aggregation that has millions of rows with the following structure:

timestamp           | product_name  | product_category | zip_code | order_total
--------------------|---------------|------------------|----------|------------
2023-01-01 10:00:00 | Plastic Chair | Furniture        | 88523    | 2000
2023-01-01 10:00:00 | Keyboard      | Electronics      | 88523    | 1000
2023-01-01 10:00:00 | Mouse         | Electronics      | 88523    | 800
2023-01-01 11:00:00 | Plastic Chair | Furniture        | 88524    | 3000
2023-01-01 11:00:00 | Keyboard      | Electronics      | 88524    | 2000

The pre-aggregation code would look as follows:

cube('Orders', {
  sql: `SELECT * FROM Orders`,
  preAggregations: {
    main: {
      measures: [CUBE.order_total],
      dimensions: [CUBE.product_name, CUBE.product_category, CUBE.zip_code],
      timeDimension: CUBE.timestamp,
      granularity: `hour`,
      partitionGranularity: `day`,
      allowNonStrictDateRangeMatch: true,
      refreshKey: {
        every: `1 hour`,
        updateWindow: `1 day`,
        incremental: true,
      },
      buildRangeStart: {
        sql: `SELECT date_sub(NOW(), 365)`,
      },
      buildRangeEnd: {
        sql: `SELECT NOW()`,
      },
    },
  },
});

You run the following query on a regular basis, with the only difference between queries being the filter values:

{
  "measures": [
    "Orders.order_total"
  ],
  "timeDimensions": [
    {
      "dimension": "Orders.timestamp",
      "granularity": "hour",
      "dateRange": [
        "2022-12-14T06:00:00.000",
        "2023-01-13T06:00:00.000"
      ]
    }
  ],
  "order": [
    [
      "Orders.timestamp",
      "asc"
    ]
  ],
  "filters": [
    {
      "member": "Orders.product_category",
      "operator": "equals",
      "values": [
        "Electronics"
      ]
    },
    {
      "member": "Orders.product_name",
      "operator": "equals",
      "values": [
        "Keyboard",
        "Mouse"
      ]
    }
  ],
  "dimensions": [
    "Orders.zip_code"
  ],
  "limit": 10000
}

After running this query on a dataset with millions of records, you find that it's taking a long time to run, so you decide to add an index to target this specific query. Taking into account the best practices mentioned previously, you should define an index as follows:

cube('Orders', {
  sql: `SELECT * FROM Orders;`,
  preAggregations: {
    main: {
      measures: [CUBE.order_total],
      dimensions: [CUBE.product_name, CUBE.product_category, CUBE.zip_code],
      timeDimension: CUBE.timestamp,
      granularity: `hour`,
      partitionGranularity: `day`,
      allowNonStrictDateRangeMatch: true,
      refreshKey: {
        every: `1 hour`,
        updateWindow: `1 day`,
        incremental: true,
      },
      buildRangeStart: {
        sql: `SELECT date_sub(NOW(), 365)`,
      },
      buildRangeEnd: {
        sql: `SELECT NOW()`,
      },
      // New Index
      indexes: {
        category_productname_zipcode_index: {
          columns: [CUBE.product_category, CUBE.product_name, CUBE.zip_code],
        },
      },
    },
  },
});

Then the data within category_productname_zipcode_index would look like:

product_category | product_name  | zip_code | timestamp           | order_total
-----------------|---------------|----------|---------------------|------------
Furniture        | Plastic Chair | 88523    | 2023-01-01 10:00:00 | 2000
Electronics      | Keyboard      | 88523    | 2023-01-01 10:00:00 | 1000
Electronics      | Mouse         | 88523    | 2023-01-01 10:00:00 | 800
Furniture        | Plastic Chair | 88524    | 2023-01-01 11:00:00 | 3000
Electronics      | Keyboard      | 88524    | 2023-01-01 11:00:00 | 2000

The columns are ordered from lowest to highest selectivity: we can expect far fewer unique product categories than product names, so product_category has the lowest selectivity. Although zip_code may have lower or higher selectivity than the other columns, the dimensions used for filtering should come before the dimensions that only appear in the GROUP BY part of the query.

Aggregated indexes

Aggregated indexes can be defined as well. You can read more about them here.

Example:

cube('Orders', {
  sql: `SELECT * FROM Orders;`,
  preAggregations: {
    main: {
      measures: [CUBE.order_total],
      dimensions: [CUBE.product_name, CUBE.product_category, CUBE.zip_code],
      timeDimension: CUBE.timestamp,
      granularity: `hour`,
      partitionGranularity: `day`,
      allowNonStrictDateRangeMatch: true,
      refreshKey: {
        every: `1 hour`,
        updateWindow: `1 day`,
        incremental: true,
      },
      buildRangeStart: {
        sql: `SELECT date_sub(NOW(), 365)`,
      },
      buildRangeEnd: {
        sql: `SELECT NOW()`,
      },
      indexes: {
        category_productname_zipcode_index: {
          columns: [CUBE.product_category, CUBE.product_name, CUBE.zip_code],
        },
        // New Index
        zip_code_index: {
          columns: [CUBE.zip_code],
          type: `aggregate`,
        },
      },
    },
  },
});

And the data for zip_code_index would look like the following:

zip_code | order_total
---------|------------
88523    | 3800
88524    | 5000

Cube Store partially supports the MySQL protocol. This allows you to execute simple queries using a familiar SQL syntax. You can connect using the MySQL CLI client, for example:

mysql -h <CUBESTORE_IP> --user=cubestore -pcubestore

Only Linux and macOS versions of the MySQL client are supported right now; Windows versions aren't. You can install one on Ubuntu with apt-get install default-mysql-client, or on macOS with brew install mysql-client.

To check which pre-aggregations are managed by Cube Store, you could run the following query:

SELECT * FROM information_schema.tables;
+----------------------+-----------------------------------------------+
| table_schema         | table_name                                    |
+----------------------+-----------------------------------------------+
| dev_pre_aggregations | orders_main20190101_23jnqarg_uiyfxd0f_1gifflf |
| dev_pre_aggregations | orders_main20190301_24ph0a1c_utzntnv_1gifflf  |
| dev_pre_aggregations | orders_main20190201_zhrh5kj1_rkmsrffi_1gifflf |
| dev_pre_aggregations | orders_main20191001_mdw2hxku_waxajvwc_1gifflf |
| dev_pre_aggregations | orders_main20190701_izc2tl0h_bxsf1zlb_1gifflf |
+----------------------+-----------------------------------------------+
5 rows in set (0.01 sec)

These pre-aggregations are stored as Parquet files under the .cubestore/ folder in the project root during development.

Cube Store's MySQL protocol also supports EXPLAIN and EXPLAIN ANALYZE queries, both of which are useful for determining how much processing a query will require.

EXPLAIN queries show the logical plan for a query:

 EXPLAIN SELECT orders__platform, orders__gender, sum(orders__count) FROM dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r
 GROUP BY orders__gender, orders__platform;
+-------------------------------------------------------------------------------------------------------------------------------------+
| logical plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+--------------------------------------------------------------------------------------------------------------------------------------+
| Projection, [dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__platform, dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__gender, SUM(dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__count)]
  Aggregate
    ClusterSend, indices: [[96]]
      Scan dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r, source: CubeTable(index: orders_general_plat_gender_o32v4dvq_vbyemtl2_1h5hs8r:96:[123, 126]), fields: [orders__gender, orders__platform, orders__count] |
+-------------------------------------------------------------------------------------------------------------------------------------+

EXPLAIN ANALYZE queries show the physical plan for the router and all workers used for query processing:

 EXPLAIN ANALYZE SELECT orders__platform, orders__gender, sum(orders__count) FROM dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r
 GROUP BY orders__gender, orders__platform

+-----------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
| node type | node name       | physical plan                                                                                                                                                                                                                                                                                                                                                   |
+-----------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
| router    |                 | Projection, [orders__platform, orders__gender, SUM(dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__count)@2:SUM(orders__count)]
  FinalInplaceAggregate
    ClusterSend, partitions: [[123, 126]]                                                                                                                                         |
| worker    | 127.0.0.1:10001 | PartialInplaceAggregate
  Merge
    Scan, index: orders_general_plat_gender_o32v4dvq_vbyemtl2_1h5hs8r:96:[123, 126], fields: [orders__gender, orders__platform, orders__count]
      Projection, [orders__gender, orders__platform, orders__count]
        ParquetScan, files: /.cubestore/data/126-0qtyakym.parquet |
+-----------+-----------------+--------------------------------------------------------------------------------------------------------------------------+

The default pre-aggregations storage in Cube is its own purpose-built storage layer: Cube Store.

Alternatively, you can store pre-aggregations internally in the source database. To store a pre-aggregation internally, set external: false in the pre-aggregation definition.

Please note that originalSql pre-aggregations are stored internally by default; storing them in Cube Store is not recommended.
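
For example, a minimal sketch of a rollup pre-aggregation stored in the source database (member names are illustrative):

cube(`Orders`, {
  sql: `SELECT * FROM orders`,

  ...,

  preAggregations: {
    amount_by_created: {
      measures: [amount],
      timeDimension: created_at,
      granularity: `month`,
      // Store this pre-aggregation in the source database
      // instead of Cube Store
      external: false,
    },
  },
});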

This feature is in Preview and the API may change in a future release. Joining pre-aggregations only works with databases of the same type; support for joining pre-aggregations from different databases is coming soon.

When making a query that joins data from two different cubes, Cube can use pre-aggregations instead of running the base SQL queries. To get started, first ensure both cubes have valid pre-aggregations:

// Orders
cube(`Orders`, {
  sql: `SELECT * FROM public.orders`,

  ...,

  preAggregations: {
    orders_rollup: {
      measures: [CUBE.count],
      dimensions: [CUBE.user_id, CUBE.status],
      timeDimension: CUBE.created_at,
      granularity: `day`,
    },
  },

  joins: {
    Users: {
      sql: `${CUBE.user_id} = ${Users.id}`,
      relationship: `belongsTo`
    },
  },
});

// Users
cube(`Users`, {
  sql: `SELECT * FROM public.users`,

  ...,

  preAggregations: {
    users_rollup: {
      dimensions: [CUBE.id, CUBE.name],
    },
  },
});

Before we continue, let's add an index to the orders_rollup pre-aggregation so that the rollupJoin pre-aggregation can work correctly:

cube(`Orders`, {
  ...,

  preAggregations: {
    orders_rollup: {
      ...,
      indexes: {
        userIndex: {
          columns: [CUBE.user_id],
        },
      },
    },
  },
});

Now we can add a new pre-aggregation of type rollupJoin to the Orders cube:

cube(`Orders`, {
  ...,

  preAggregations: {
    orders_with_users_rollup: {
      type: `rollupJoin`,
      measures: [CUBE.count],
      dimensions: [Users.name],
      timeDimension: CUBE.created_at,
      granularity: `day`,
      rollups: [Users.users_rollup, CUBE.orders_rollup],
    },
  },
});

With all of the above set up, making a query such as the following will now use Orders.orders_rollup and Users.users_rollup, avoiding a database request:

{
  "dimensions": ["Users.name"],
  "timeDimensions": [
    {
      "dimension": "Orders.created_at",
      "dateRange": "This month"
    }
  ],
  "order": {
    "Orders.count": "desc"
  },
  "measures": ["Orders.count"]
}

For ideal performance, pre-aggregations should be built using a dedicated Refresh Worker. See here for more details.
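
A dedicated Refresh Worker is typically enabled via an environment variable on its own instance; for example (a sketch):

# On the dedicated Refresh Worker instance
CUBEJS_REFRESH_WORKER=true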

Cube supports three different strategies for building pre-aggregations. To see which strategies your database supports, please refer to its individual page from Connecting to the Database.

When using the simple strategy, Cube will use the source database as a temporary staging area for writing pre-aggregations and determining column types. The data is then loaded back into memory before being written to Cube Store (or an external database).

For larger datasets, we strongly recommend using the Batching or Export Bucket strategies instead.


Batching is a more performant strategy where Cube sends compressed CSVs for Cube Store to ingest.


Performance scales with the amount of memory available on the Cube instance. Batching is automatically enabled for any databases that can support it.

The export bucket strategy requires permission to execute CREATE TABLE statements in the data source as part of the pre-aggregation build process.

When dealing with larger pre-aggregations (more than 100k rows), performance can be significantly improved by using an export bucket. This allows the source database to temporarily materialize the data locally, which is then loaded into Cube Store in parallel.


Enabling the export bucket functionality requires extra configuration; please refer to the database-specific documentation for more details.
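
As an illustration, an S3 export bucket is typically configured with environment variables along these lines (the exact set of variables varies by database; treat this as a sketch):

CUBEJS_DB_EXPORT_BUCKET=my-export-bucket
CUBEJS_DB_EXPORT_BUCKET_TYPE=s3
CUBEJS_DB_EXPORT_BUCKET_AWS_KEY=<AWS_KEY>
CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET=<AWS_SECRET>
CUBEJS_DB_EXPORT_BUCKET_AWS_REGION=<AWS_REGION>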

When using cloud storage, it is important to correctly configure any data retention policies to clean up the data in the export bucket, as Cube does not currently manage this. For most use cases, 1 day is sufficient.

Streaming pre-aggregations are different from traditional pre-aggregations in the way they are being updated. Traditional pre-aggregations follow the “pull” model — Cube pulls updates from the data source based on some cadence and/or condition. Streaming pre-aggregations follow the “push” model — Cube subscribes to the updates from the data source and always keeps pre-aggregation up to date.

You don’t need to define refreshKey for streaming pre-aggregations. Whether a pre-aggregation is streaming or not is determined by the data source.

Currently, Cube supports only one streaming data source: ksqlDB. All pre-aggregations whose data source is ksqlDB are streaming.
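
A sketch of a streaming pre-aggregation, assuming a ksqlDB data source named `ksql` has been configured and exposes an `ORDERS_STREAM` stream (both names are assumptions):

cube(`StreamingOrders`, {
  // `ksql` is an assumed data source name configured elsewhere
  dataSource: `ksql`,
  sql: `SELECT * FROM ORDERS_STREAM`,

  ...,

  preAggregations: {
    orders_by_minute: {
      measures: [count],
      timeDimension: created_at,
      granularity: `minute`,
      // No refreshKey: Cube subscribes to updates from ksqlDB and
      // keeps this pre-aggregation up to date as events arrive
    },
  },
});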

We’re working on supporting streaming pre-aggregations for the following data sources:

  • Materialize
  • Flink SQL
  • Spark Streaming

Please let us know if you are interested in early access to any of these drivers or would like Cube to support any other SQL streaming engine.
