Using pre-aggregations

Pre-aggregations are a powerful way to speed up your Cube queries. There are many configuration options to consider; please make sure to also check the Pre-Aggregations reference in the data modeling section.

Refresh strategy

Refresh strategy can be customized by setting the refresh_key property for the pre-aggregation.

The default value of refresh_key is every: 1 hour, unless any of the cubes involved overrides its refresh_key parameter. It can be redefined either by overriding the default value of the every property:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: amount_by_created
        type: rollup
        measures:
          - amount
        time_dimension: created_at
        granularity: month
        refresh_key:
          every: 12 hour

Or by providing a sql property instead, and leaving every unchanged from its default value:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: amount_by_created
        measures:
          - amount
        time_dimension: created_at
        granularity: month
        refresh_key:
          # every will default to `10 seconds` here
          sql: SELECT MAX(created_at) FROM orders

Or both every and sql can be defined together:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: amount_by_created
        measures:
          - amount
        time_dimension: created_at
        granularity: month
        refresh_key:
          every: 12 hour
          sql: SELECT MAX(created_at) FROM orders

When every and sql are used together, Cube will run the query from the sql property on an interval defined by the every property. If the query returns new results, then the pre-aggregation will be refreshed.

Rollup-only mode

To make Cube serve requests from pre-aggregations only, the CUBEJS_ROLLUP_ONLY environment variable can be set to true on an API instance. This prevents API requests from being served with data queried from the source database.

When using this configuration in a single-node deployment (where the API instance and Refresh Worker are configured on the same host), requests made to the API that cannot be satisfied by a rollup will throw an error. Scheduled refreshes will continue to work in the background.
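
For example, a minimal sketch of enabling this in a Docker Compose deployment (the service name, image tag, and other environment variables here are illustrative, not a complete configuration):

services:
  cube_api:
    image: cubejs/cube
    environment:
      # Serve API queries from pre-aggregations only
      - CUBEJS_ROLLUP_ONLY=true
      # ... database credentials and other required variables go here
    ports:
      - "4000:4000"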

Partitioning

Partitioning is an extremely effective optimization for accelerating pre-aggregation build and refresh times. It effectively "shards" the data between multiple tables, splitting them by a defined attribute. Cube can be configured to incrementally refresh only the last set of partitions through the update_window property. This leads to faster refresh times, since unnecessary data is not reloaded, and can even reduce cost for some databases like BigQuery or AWS Athena.

Any rollup pre-aggregation can be partitioned by time using the partition_granularity property in a pre-aggregation definition. In the example below, partition_granularity is set to month, which means Cube will generate separate tables for each month's worth of data. Once built, it will continue to refresh the last 3 months of data on a daily basis.

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: category_and_date
        measures:
          - count
          - revenue
        dimensions:
          - category
        time_dimension: created_at
        granularity: day
        partition_granularity: month
        refresh_key:
          every: 1 day
          incremental: true
          update_window: 3 months

Partitioning by non-time dimension

Cube Store uses an auto-partitioning technique to split Cube's logical partitions into multiple physical ones. The partitioning key is the same as the sorting key of an index, and every physical partition is stored as a separate Parquet file. Splits are performed based on the size of the underlying Parquet files and the number of rows inside them, so the simplest way to ensure proper partitioning is to introduce an index: for bigger pre-aggregations, the first columns of an index will determine the partitioning scheme. An interesting consequence of enabling time-dimension partitioning together with an index is that data is partitioned by time first and only then by the sorting key of the index. As a result, even with an optimal index in place, query time is proportional to the number of time partitions involved. This issue can be addressed by lambda pre-aggregations.
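
For instance, here is a minimal sketch (the cube, dimension, and index names are illustrative) of an index whose leading column drives how physical partitions are sorted and split:

cubes:
  - name: orders
    # ...

    pre_aggregations:
      - name: by_tenant
        measures:
          - count
        dimensions:
          - tenant_id
          - status
        time_dimension: created_at
        granularity: day

        indexes:
          # tenant_id comes first in the index, so physical
          # partitions will be sorted and split by it first
          - name: tenant_index
            columns:
              - tenant_id
              - status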

Alternatively, if you want to explicitly introduce key partitioning, you can use multi-tenancy to introduce multiple orchestrator IDs. Each orchestrator ID can use a different pre-aggregation schema, so you may define those based on the partitioning key you want to introduce. This technique, together with a multi-router Cube Store approach, allows you to achieve linear scaling on the partitioning key of your choice.

Using indexes

Indexes are sorted copies of pre-aggregation data.

When you define a pre-aggregation without any explicit indexes, the default index is created. In this index, dimensions come first, time dimensions come second, and measures come last.
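
For example, in the following hypothetical rollup, the default index would sort by status first, then created_at, then count:

cubes:
  - name: orders
    # ...

    pre_aggregations:
      - name: by_status
        measures:
          - count
        dimensions:
          - status
        time_dimension: created_at
        granularity: day
        # Default index column order (per the rule above):
        # status (dimension), created_at (time dimension), count (measure)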

When you define additional indexes, you don't incur any additional costs on the data warehouse side. However, the pre-aggregation build time for a particular pre-aggregation increases with each index.

When to use indexes?

At query time, if the default index can't be selected for a merge sort scan, then a less performant hash aggregation will be used instead. This usually means that the full table needs to be scanned to get query results.

It usually doesn't make much difference if the pre-aggregation table is only several MBs in size. However, for larger pre-aggregations, indexes are usually required to achieve optimal performance, especially if not all dimensions from a pre-aggregation are used in a particular query.

Best practices

Most pre-aggregations represent additive rollups. For such rollups, the rule of thumb is that, for most queries, there should be at least one index that makes a particular query scan a very small amount of data, which makes it very fast. (There are exceptions to this rule, like top-k queries or queries with only low-selectivity range filters. Optimization for these use cases usually involves remodeling data and queries.)

To maximize performance, you can introduce an index for each query type so that the set of dimensions used in a query overlaps as much as possible with the set of dimensions in the index. Measures are usually only used in indexes if you plan to filter on a measure value and the cardinality of the possible values of the measure is low.

The order in which dimensions are specified in an index is very important; suboptimal ordering can lead to diminished performance. The rule of thumb for dimension order is as follows:

  • Dimensions used in high selectivity, single-value filters come first.
  • Dimensions used in GROUP BY come second.
  • Everything else used in the query comes at the end, including dimensions used in low selectivity, multiple-value filters.

It might sound counter-intuitive to have dimensions used in GROUP BY before dimensions used in multiple-value filters. However, Cube Store always performs scans on sorted data: if GROUP BY matches the index ordering, merge sort-based algorithms are used for querying, and they are usually much faster than the hash-based GROUP BY that is used when the index ordering doesn't match the query.

If in doubt, always use EXPLAIN and EXPLAIN ANALYZE to figure out the final query plan.

Example

Suppose you have a pre-aggregation that has millions of rows and the following structure:

| timestamp           | product_name  | product_category | zip_code | order_total |
|---------------------|---------------|------------------|----------|-------------|
| 2023-01-01 10:00:00 | Keyboard      | Electronics      | 88523    | 1000        |
| 2023-01-01 10:00:00 | Mouse         | Electronics      | 88523    | 800         |
| 2023-01-01 10:00:00 | Plastic Chair | Furniture        | 88523    | 2000        |
| 2023-01-01 11:00:00 | Keyboard      | Electronics      | 88524    | 2000        |
| 2023-01-01 11:00:00 | Plastic Chair | Furniture        | 88524    | 3000        |

The pre-aggregation definition looks as follows:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: main
        measures:
          - order_total
        dimensions:
          - product_name
          - product_category
          - zip_code
        time_dimension: timestamp
        granularity: hour
        partition_granularity: day
        allow_non_strict_date_range_match: true
        refresh_key:
          every: 1 hour
          incremental: true
          update_window: 1 day
        build_range_start:
          sql: SELECT DATE_SUB(NOW(), 365)
        build_range_end:
          sql: SELECT NOW()

You run the following query on a regular basis, with the only difference between queries being the filter values:

{
  "measures": [
    "orders.order_total"
  ],
  "timeDimensions": [
    {
      "dimension": "orders.timestamp",
      "granularity": "hour",
      "dateRange": [
        "2022-12-14T06:00:00.000",
        "2023-01-13T06:00:00.000"
      ]
    }
  ],
  "order": {
    "orders.timestamp": "asc"
  },
  "filters": [
    {
      "member": "orders.product_category",
      "operator": "equals",
      "values": [
        "Electronics"
      ]
    },
    {
      "member": "orders.product_name",
      "operator": "equals",
      "values": [
        "Keyboard",
        "Mouse"
      ]
    }
  ],
  "dimensions": [
    "orders.zip_code"
  ],
  "limit": 10000
}

After running this query on a dataset with millions of records you find that it's taking too long to run, so you decide to add an index to target this specific query. Taking into account the best practices, you should define an index as follows:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: main
        # ...
 
        indexes:
          - name: category_productname_zipcode_index
            columns:
              - product_category
              - zip_code
              - product_name

Here's why:

  • The product_category dimension comes first as it's used in a single-value filter.
  • Then, the zip_code dimension comes second as it's used in GROUP BY.
  • The product_name dimension comes last as it's used in a multiple-value filter.

The data within category_productname_zipcode_index would look as follows:

| product_category | zip_code | product_name  | timestamp           | order_total |
|------------------|----------|---------------|---------------------|-------------|
| Electronics      | 88523    | Keyboard      | 2023-01-01 10:00:00 | 1000        |
| Electronics      | 88523    | Mouse         | 2023-01-01 10:00:00 | 800         |
| Electronics      | 88524    | Keyboard      | 2023-01-01 11:00:00 | 2000        |
| Furniture        | 88523    | Plastic Chair | 2023-01-01 10:00:00 | 2000        |
| Furniture        | 88524    | Plastic Chair | 2023-01-01 11:00:00 | 3000        |

Aggregating indexes

Aggregating indexes can be defined as well. Such indexes contain only dimensions and pre-aggregated measures from the pre-aggregation definition.

Queries with the following characteristics can target aggregating indexes:

  • They cannot make use of any filters other than on dimensions that are included in the index.
  • All dimensions used in the query must be defined in the aggregating index.

Queries that do not have the characteristics above can still make use of regular indexes so that their performance can still be optimized.

In other words, an aggregating index is a rollup of the data in a rollup table. Data needs to be downloaded from the upstream data source once for each pre-aggregation you have. Compared to having multiple pre-aggregations, a single pre-aggregation with multiple aggregating indexes gives you pretty much the same performance on the Cube Store side at a fraction of the cost on the data warehouse side.

Aggregating indexes are defined by using the type option in the index definition:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: main
        # ...
 
        indexes:
          # ...
 
          - name: zip_code_index
            columns:
              - zip_code
            type: aggregate

The data for zip_code_index would look as follows:

| zip_code | order_total |
|----------|-------------|
| 88523    | 3800        |
| 88524    | 5000        |

Inspecting pre-aggregations

Cube Store partially supports the MySQL protocol. This allows you to execute simple queries using a familiar SQL syntax. You can connect using the MySQL CLI client, for example:

mysql -h <CUBESTORE_IP> --user=cubestore -pcubestore

Only the Linux and macOS versions of the MySQL client are currently supported. You can install it on Ubuntu with the apt-get install default-mysql-client command, or on macOS with brew install mysql-client. Windows versions of the MySQL client aren't supported.

To check which pre-aggregations are managed by Cube Store, you could run the following query:

SELECT * FROM information_schema.tables;
+----------------------+-----------------------------------------------+
| table_schema         | table_name                                    |
+----------------------+-----------------------------------------------+
| dev_pre_aggregations | orders_main20190101_23jnqarg_uiyfxd0f_1gifflf |
| dev_pre_aggregations | orders_main20190301_24ph0a1c_utzntnv_1gifflf  |
| dev_pre_aggregations | orders_main20190201_zhrh5kj1_rkmsrffi_1gifflf |
| dev_pre_aggregations | orders_main20191001_mdw2hxku_waxajvwc_1gifflf |
| dev_pre_aggregations | orders_main20190701_izc2tl0h_bxsf1zlb_1gifflf |
+----------------------+-----------------------------------------------+
5 rows in set (0.01 sec)

These pre-aggregations are stored as Parquet files under the .cubestore/ folder in the project root during development.

EXPLAIN queries

Cube Store's MySQL protocol also supports EXPLAIN and EXPLAIN ANALYZE queries both of which are useful for determining how much processing a query will require.

EXPLAIN queries show the logical plan for a query:

 EXPLAIN SELECT orders__platform, orders__gender, sum(orders__count) FROM dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r
 GROUP BY orders__gender, orders__platform;
+-------------------------------------------------------------------------------------------------------------------------------------+
| logical plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+--------------------------------------------------------------------------------------------------------------------------------------+
| Projection, [dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__platform, dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__gender, SUM(dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__count)]
  Aggregate
    ClusterSend, indices: [[96]]
      Scan dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r, source: CubeTable(index: orders_general_plat_gender_o32v4dvq_vbyemtl2_1h5hs8r:96:[123, 126]), fields: [orders__gender, orders__platform, orders__count] |
+-------------------------------------------------------------------------------------------------------------------------------------+

EXPLAIN ANALYZE queries show the physical plan for the router and all workers used for query processing:

 EXPLAIN ANALYZE SELECT orders__platform, orders__gender, sum(orders__count) FROM dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r
 GROUP BY orders__gender, orders__platform
 
+-----------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
| node type | node name       | physical plan                                                                                                                                                                                                                                                                                                                                                   |
+-----------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
| router    |                 | Projection, [orders__platform, orders__gender, SUM(dev_pre_aggregations.orders_general_o32v4dvq_vbyemtl2_1h5hs8r.orders__count)@2:SUM(orders__count)]
  FinalInplaceAggregate
    ClusterSend, partitions: [[123, 126]]                                                                                                                                         |
| worker    | 127.0.0.1:10001 | PartialInplaceAggregate
  Merge
    Scan, index: orders_general_plat_gender_o32v4dvq_vbyemtl2_1h5hs8r:96:[123, 126], fields: [orders__gender, orders__platform, orders__count]
      Projection, [orders__gender, orders__platform, orders__count]
        ParquetScan, files: /.cubestore/data/126-0qtyakym.parquet |
+-----------+-----------------+--------------------------------------------------------------------------------------------------------------------------+

When you're debugging performance, one thing to keep in mind is that Cube Store, due to its design, will always use some index to query data, and usage of an index by itself doesn't necessarily tell you whether a particular query performs optimally or not. What's important to look at is the aggregation and partition merge strategies. In most cases, Cube Store will use either a HashAggregate or an InplaceAggregate strategy for aggregation, together with Merge and MergeSort operators to merge different partitions.

Scan operations on sorted data are almost always much more efficient and faster than hash aggregation, and the Cube Store optimizer uses them only if there's an index with the appropriate sorting. So, as a rule of thumb, if you see PartialHashAggregate and FinalHashAggregate nodes together with Merge operators in your plan, those queries most likely perform sub-optimally. On the other hand, if you see PartialInplaceAggregate, FinalInplaceAggregate, and FullInplaceAggregate together with MergeSort operators, there's a high chance the query performs optimally. There are exceptions to this rule: for example, a total count query run on top of an index will use the HashAggregate strategy on top of MergeSort nodes even if all required indexes are in place, and it would be optimal as well.

Pre-aggregations storage

The default pre-aggregations storage in Cube is its own purpose-built storage layer: Cube Store.

Alternatively, you can store pre-aggregations internally in the source database. To store a pre-aggregation internally, set external: false in the pre-aggregation definition.
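
For example, a minimal sketch (the cube, pre-aggregation, and member names are illustrative):

cubes:
  - name: orders
    # ...

    pre_aggregations:
      - name: orders_by_status
        measures:
          - count
        dimensions:
          - status
        # Store this pre-aggregation in the source database
        # instead of Cube Store
        external: false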

Please note that original_sql pre-aggregations are stored internally by default. It is not recommended to store original_sql pre-aggregations in Cube Store.

Joins between pre-aggregations

When making a query that joins data from two different cubes, Cube can use pre-aggregations instead of running the base SQL queries. To get started, first ensure both cubes have valid pre-aggregations:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: orders_rollup
        measures:
          - CUBE.count
        dimensions:
          - CUBE.user_id
          - CUBE.status
        time_dimension: CUBE.created_at
        granularity: day
 
    joins:
      - name: users
        sql: "{CUBE.user_id} = ${users.id}"
        relationship: many_to_one
 
  - name: users
    # ...
 
    pre_aggregations:
      - name: users_rollup
        dimensions:
          - CUBE.id
          - CUBE.name

Before we continue, let's add an index to the orders_rollup pre-aggregation so that the rollup_join pre-aggregation can work correctly:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      - name: orders_rollup
        # ...
 
        indexes:
          - name: user_index
            columns:
              - CUBE.user_id

Now we can add a new pre-aggregation of type rollup_join to the orders cube:

cubes:
  - name: orders
    # ...
 
    pre_aggregations:
      # ...
 
      - name: orders_with_users_rollup
        type: rollup_join
        measures:
          - CUBE.count
        dimensions:
          - users.name
        time_dimension: CUBE.created_at
        granularity: day
        rollups:
          - users.users_rollup
          - CUBE.orders_rollup

With all of the above set up, making a query such as the following will now use orders.orders_rollup and users.users_rollup, avoiding a database request:

{
  "dimensions": ["users.name"],
  "timeDimensions": [
    {
      "dimension": "orders.created_at",
      "dateRange": "This month"
    }
  ],
  "order": {
    "orders.count": "desc"
  },
  "measures": ["orders.count"]
}

Pre-Aggregation build strategies

For ideal performance, pre-aggregations should be built using a dedicated Refresh Worker; see the deployment documentation for more details.

Cube supports three different strategies for building pre-aggregations. To see which strategies your database supports, please refer to its individual page in the Connecting to the Database section.

Simple

When using the simple strategy, Cube will use the source database as a temporary staging area for writing pre-aggregations in order to determine column types. The data is loaded back into memory before writing it to Cube Store (or an external database).

For larger datasets, we strongly recommend using the Batching or Export Bucket strategies instead.

[Diagram: Internal vs External vs External with Cube Store]

Batching

Batching is a more performant strategy where Cube sends compressed CSVs for Cube Store to ingest.

[Diagram: Internal vs External vs External with Cube Store]

The performance scales to the amount of memory available on the Cube instance. Batching is automatically enabled for any databases that can support it.

Export bucket

The export bucket strategy requires permission to execute CREATE TABLE statements in the data source as part of the pre-aggregation build process.

Do not confuse the export bucket with the Cube Store storage bucket. Those are two separate storages and should never be mixed.

When dealing with larger pre-aggregations (more than 100k rows), performance can be significantly improved by using an export bucket. This allows the source database to temporarily materialize the data locally, which is then loaded into Cube Store in parallel:

[Diagram: Internal vs External vs External with Cube Store]

Enabling the export bucket functionality requires extra configuration; please refer to the database-specific documentation for more details.

When using cloud storage, it is important to correctly configure any data retention policies to clean up the data in the export bucket as Cube does not currently manage this. For most use-cases, 1 day is sufficient.

Streaming pre-aggregations

Streaming pre-aggregations are different from traditional pre-aggregations in the way they are updated. Traditional pre-aggregations follow the "pull" model: Cube pulls updates from the data source based on some cadence and/or condition. Streaming pre-aggregations follow the "push" model: Cube subscribes to updates from the data source and always keeps pre-aggregations up to date.

You don't need to define refresh_key for streaming pre-aggregations. Whether a pre-aggregation is streaming or not is determined by the data source.

Currently, Cube supports only one streaming data source: ksqlDB. All pre-aggregations whose data source is ksqlDB are streaming.
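
For illustration, a pre-aggregation on a ksqlDB-backed cube is defined like any other rollup, just without a refresh_key. The cube, stream, and data source names below are hypothetical:

cubes:
  - name: page_views
    sql_table: PAGE_VIEWS_STREAM  # hypothetical ksqlDB stream
    data_source: ksql             # assumes a data source named `ksql`

    measures:
      - name: count
        type: count

    dimensions:
      - name: url
        sql: url
        type: string

      - name: created_at
        sql: created_at
        type: time

    pre_aggregations:
      - name: views_by_url
        measures:
          - count
        dimensions:
          - url
        time_dimension: created_at
        granularity: hour
        # No refresh_key: updates are pushed from ksqlDB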

We are working on supporting more data sources for streaming pre-aggregations. Please let us know if you are interested in early access to any of these drivers or would like Cube to support any other SQL streaming engine.