# ksqlDB

[ksqlDB](https://ksqldb.io) is a purpose-built database for stream processing
applications, ingesting data from [Apache Kafka](https://kafka.apache.org).

<Note>
  Available on the [Enterprise plan](https://cube.dev/pricing).
  [Contact us](https://cube.dev/contact) for details.
</Note>

See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:

<iframe width="100%" height="400" src="https://www.youtube.com/embed/RD_HZ7xE8G0" title="YouTube video" frameBorder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen />

<Info>
  In this video, the SQL API is used to connect to [Power BI][ref-powerbi].
  Currently, it's recommended to use the [DAX API][ref-dax-api].
</Info>

## Prerequisites

* URL of the ksqlDB server, including the port
* Username and password (or an API key) to connect to the ksqlDB server

### Confluent Cloud

If you are using [Confluent Cloud](https://www.confluent.io/confluent-cloud/),
you need to generate an API key and use the API key name as your username and
the API key secret as your password.

You can generate an API key by installing `confluent-cli` and running the
following commands in the command line:

```sh theme={"dark"}
brew install --cask confluent-cli
confluent login
confluent environment use <YOUR-ENVIRONMENT-ID>
confluent ksql cluster list
confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>
```

## Setup

### Manual

Add the following to a `.env` file in your Cube project:

```dotenv theme={"dark"}
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=password
```

## Environment Variables

| Environment Variable                                                                                | Description                                                                                                  | Possible Values           | Required |
| --------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------ | ------------------------- | :------: |
| [`CUBEJS_DB_URL`](/reference/configuration/environment-variables#cubejs_db_url)                     | The URL of the ksqlDB server, including the port                                                             | A valid database host URL |     ✅    |
| [`CUBEJS_DB_USER`](/reference/configuration/environment-variables#cubejs_db_user)                   | The username used to connect to ksqlDB. For Confluent Cloud, use the API key.                                | A valid database username |     ✅    |
| [`CUBEJS_DB_PASS`](/reference/configuration/environment-variables#cubejs_db_pass)                   | The password used to connect to ksqlDB. For Confluent Cloud, use the API secret.                             | A valid database password |     ✅    |
| [`CUBEJS_DB_KAFKA_HOST`](/reference/configuration/environment-variables#cubejs_db_kafka_host)       | Kafka broker host(s) for [Kafka streams mode](#kafka-streams-mode). Multiple brokers can be comma-separated. | A valid Kafka broker URL  |     ❌    |
| [`CUBEJS_DB_KAFKA_USER`](/reference/configuration/environment-variables#cubejs_db_kafka_user)       | Username for Kafka broker authentication (SASL PLAIN)                                                        | A valid Kafka username    |     ❌    |
| [`CUBEJS_DB_KAFKA_PASS`](/reference/configuration/environment-variables#cubejs_db_kafka_pass)       | Password for Kafka broker authentication (SASL PLAIN)                                                        | A valid Kafka password    |     ❌    |
| [`CUBEJS_DB_KAFKA_USE_SSL`](/reference/configuration/environment-variables#cubejs_db_kafka_use_ssl) | If `true`, enables SASL\_SSL for the Kafka connection                                                        | `true`, `false`           |     ❌    |
| [`CUBEJS_CONCURRENCY`](/reference/configuration/environment-variables#cubejs_concurrency)           | The number of [concurrent queries][ref-data-source-concurrency] to the data source                           | A valid number            |     ❌    |

## Pre-Aggregations Support

ksqlDB supports only
[streaming pre-aggregations][ref-streaming-pre-aggs].

## Kafka streams mode

By default, Cube connects to ksqlDB through its REST API and uses that
API both for metadata (discovering tables and streams) and for streaming
data into Cube Store during pre-aggregation builds.

In this default mode, Cube may create tables and streams in ksqlDB as part
of the pre-aggregation build process (e.g., `CREATE TABLE ... AS SELECT`
statements for non-read-only pre-aggregations).

When **Kafka streams mode** is enabled, Cube reads data directly from the
underlying Kafka topics instead of going through the ksqlDB REST API for
data streaming. ksqlDB is still used for metadata operations such as
discovering tables, streams, and their schemas, but Cube Store subscribes
to the backing Kafka topic directly.

In this mode, Cube does not create any tables or streams in ksqlDB. All
pre-aggregations use the read-only refresh path: Cube discovers the
existing ksqlDB objects and their backing Kafka topics, then streams data
directly from Kafka into Cube Store.

### When to use Kafka streams mode

Kafka streams mode is useful when:

* You want to prevent Cube from creating any objects in ksqlDB
* You need higher throughput for data ingestion by reading Kafka directly
* Your ksqlDB environment has restricted permissions that don't allow
  creating tables or streams
* You prefer Cube Store to consume from Kafka topics without an
  intermediary

### Enabling Kafka streams mode

Set the `CUBEJS_DB_KAFKA_HOST` environment variable to the address of your
Kafka broker(s). This activates Kafka streams mode automatically:

```dotenv theme={"dark"}
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=ksql_username
CUBEJS_DB_PASS=ksql_password
CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DB_KAFKA_USER=kafka_api_key
CUBEJS_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DB_KAFKA_USE_SSL=true
```

Multiple Kafka brokers can be specified as a comma-separated list:

```dotenv theme={"dark"}
CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092
```

<Info>
  When using [Confluent Cloud](https://www.confluent.io/confluent-cloud/),
  the Kafka credentials are separate from the ksqlDB credentials. Generate
  an API key for the Kafka cluster (not the ksqlDB cluster) and use it as
  `CUBEJS_DB_KAFKA_USER` and `CUBEJS_DB_KAFKA_PASS`.
</Info>

### How it works

With Kafka streams mode enabled:

1. Cube uses the ksqlDB REST API to discover available tables and streams
   and to retrieve their schemas via `DESCRIBE`.
2. For each table or stream, Cube resolves the backing Kafka topic name
   from the ksqlDB metadata.
3. Instead of streaming data through ksqlDB, Cube Store connects directly
   to the Kafka broker(s) and consumes from the resolved topic.
4. Pre-aggregation builds use the read-only refresh strategy. Cube does
   not issue any `CREATE TABLE` or `CREATE STREAM` statements to ksqlDB.
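The first two steps can be pictured with the following standalone JavaScript sketch. It is not Cube's implementation: `describeRequestBody` and `resolveSource` are hypothetical helpers, and the response shape (an array containing a `sourceDescription` object with `name`, `topic`, and `fields`) is an assumption based on ksqlDB's REST API that you should verify against your own server. The sample response is hand-written, not captured output.

```javascript
// Build the JSON body for a DESCRIBE statement sent to ksqlDB's /ksql endpoint.
// Assumed request shape: { ksql, streamsProperties }.
function describeRequestBody(objectName) {
  return JSON.stringify({
    ksql: `DESCRIBE ${objectName};`,
    streamsProperties: {},
  });
}

// Extract the backing topic and column schema from a parsed DESCRIBE response.
// Hypothetical helper: assumes an array with one entry holding `sourceDescription`.
function resolveSource(describeResponse) {
  const entry = describeResponse.find((item) => item.sourceDescription);
  if (!entry) {
    throw new Error("DESCRIBE response has no sourceDescription");
  }
  const { name, topic, fields } = entry.sourceDescription;
  return {
    name,
    topic,
    columns: fields.map((f) => ({ name: f.name, type: f.schema.type })),
  };
}

// Hand-written example response (not captured from a real server).
const sample = [
  {
    "@type": "sourceDescription",
    sourceDescription: {
      name: "ORDER_EVENTS_STREAM",
      topic: "ORDER_EVENTS_STREAM",
      fields: [
        { name: "ORDER_ID", schema: { type: "STRING" } },
        { name: "AMOUNT", schema: { type: "DOUBLE" } },
      ],
    },
  },
];

const source = resolveSource(sample);
console.log(source.topic); // ORDER_EVENTS_STREAM
```

In practice the request body would be POSTed to the ksqlDB server's `/ksql` endpoint with your ksqlDB credentials; the resolved `topic` is the one Cube Store consumes from.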

### Data modeling

ksqlDB is typically used as an additional data source alongside a primary
data warehouse. To use Kafka streams mode, configure ksqlDB as a named
data source using [decorated environment variables][ref-decorated-env-vars]
and point your cubes to it with the
[`data_source`][ref-cube-data-source] property.

First, declare the data sources and configure the ksqlDB connection with
Kafka credentials:

```dotenv theme={"dark"}
CUBEJS_DATASOURCES=default,ksql
CUBEJS_DB_TYPE=postgres
CUBEJS_DB_HOST=my.postgres.host
CUBEJS_DB_NAME=my_database
CUBEJS_DB_USER=postgres_user
CUBEJS_DB_PASS=postgres_password
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql_api_key
CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=true
```

Then, create cubes that reference your data. A common pattern is to
combine a **batch cube** (reading historical data from your warehouse)
with a **streaming cube** (reading real-time data from ksqlDB via Kafka)
using a [lambda pre-aggregation][ref-lambda-pre-aggs].

The batch cube queries the warehouse and builds daily partitions
incrementally. The streaming cube points at an existing ksqlDB stream
with `data_source: ksql` and uses a read-only streaming pre-aggregation
that consumes from the backing Kafka topic directly. The lambda
pre-aggregation in the batch cube merges both, serving historical data
from the warehouse rollup and real-time data from the streaming rollup:

<CodeGroup>
  ```yaml title="YAML" theme={"dark"}
  cubes:
    - name: order_events
      data_source: default
      sql: >
        SELECT
          order_id,
          user_id,
          status,
          amount,
          created_at
        FROM ecommerce.order_events
        WHERE {FILTER_PARAMS.order_events.created_at.filter('created_at')}

      measures:
        - name: count
          type: count

        - name: total_amount
          sql: amount
          type: sum

        - name: failed_count
          sql: "CASE WHEN status = 'failed' THEN 1 ELSE 0 END"
          type: sum

      dimensions:
        - name: order_id
          sql: order_id
          type: string
          primary_key: true

        - name: user_id
          sql: user_id
          type: string

        - name: status
          sql: status
          type: string

        - name: created_at
          sql: created_at
          type: time

      pre_aggregations:
        - name: lambda
          type: rollup_lambda
          rollups:
            - order_events.batch
            - order_events_stream.stream

        - name: batch
          type: rollup
          measures:
            - CUBE.count
            - CUBE.total_amount
            - CUBE.failed_count
          dimensions:
            - CUBE.order_id
            - CUBE.user_id
            - CUBE.status
          time_dimension: CUBE.created_at
          granularity: second
          partition_granularity: day
          build_range_start:
            sql: SELECT NOW() - INTERVAL '90 days'
          build_range_end:
            sql: SELECT NOW()
          refresh_key:
            every: 8 hour
            update_window: 1 day
            incremental: true
          indexes:
            - name: user_status
              columns:
                - CUBE.user_id
                - CUBE.status

    - name: order_events_stream
      data_source: ksql
      sql: "SELECT * FROM ORDER_EVENTS_STREAM"

      measures:
        - name: count
          type: count

        - name: total_amount
          sql: AMOUNT
          type: sum

        - name: failed_count
          sql: "CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END"
          type: sum

      dimensions:
        - name: order_id
          sql: ORDER_ID
          type: string
          primary_key: true

        - name: user_id
          sql: USER_ID
          type: string

        - name: status
          sql: STATUS
          type: string

        - name: created_at
          sql: CREATED_AT
          type: time

      pre_aggregations:
        - name: stream
          type: rollup
          read_only: true
          measures:
            - CUBE.count
            - CUBE.total_amount
            - CUBE.failed_count
          dimensions:
            - CUBE.order_id
            - CUBE.user_id
            - CUBE.status
          unique_key_columns:
            - order_id
            - user_id
            - status
            - created_at_second
          time_dimension: CUBE.created_at
          granularity: second
          partition_granularity: day
          # ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead.
          build_range_start:
            sql: "SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')"
          build_range_end:
            sql: "SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'"
          refresh_key:
            every: 1 minute
            update_window: 1 hour
            incremental: true
          indexes:
            - name: user_status
              columns:
                - CUBE.user_id
                - CUBE.status
                - order_events_stream__created_at_second
          stream_offset: latest
          output_column_types:
            - member: CUBE.order_id
              type: text
            - member: CUBE.user_id
              type: text
            - member: CUBE.status
              type: text
            - member: CUBE.created_at.second
              type: timestamp
            - member: CUBE.count
              type: int
            - member: CUBE.total_amount
              type: decimal
            - member: CUBE.failed_count
              type: int
  ```

  ```javascript title="JavaScript" theme={"dark"}
  cube("order_events", {
    data_source: "default",

    sql: `
      SELECT
        order_id,
        user_id,
        status,
        amount,
        created_at
      FROM ecommerce.order_events
      WHERE ${FILTER_PARAMS.order_events.created_at.filter(
        (from, to) => `created_at >= ${from} AND created_at < ${to}`
      )}
    `,

    measures: {
      count: {
        type: `count`,
      },

      total_amount: {
        sql: `amount`,
        type: `sum`,
      },

      failed_count: {
        sql: `CASE WHEN status = 'failed' THEN 1 ELSE 0 END`,
        type: `sum`,
      },
    },

    dimensions: {
      order_id: {
        sql: `order_id`,
        type: `string`,
        primary_key: true,
      },

      user_id: {
        sql: `user_id`,
        type: `string`,
      },

      status: {
        sql: `status`,
        type: `string`,
      },

      created_at: {
        sql: `created_at`,
        type: `time`,
      },
    },

    pre_aggregations: {
      lambda: {
        type: `rollup_lambda`,
        rollups: [
          order_events.batch,
          order_events_stream.stream,
        ],
      },

      batch: {
        type: `rollup`,
        measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
        dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
        time_dimension: CUBE.created_at,
        granularity: `second`,
        partition_granularity: `day`,
        build_range_start: {
          sql: `SELECT NOW() - INTERVAL '90 days'`,
        },
        build_range_end: {
          sql: `SELECT NOW()`,
        },
        refresh_key: {
          every: `8 hour`,
          update_window: `1 day`,
          incremental: true,
        },
        indexes: {
          user_status: {
            columns: [CUBE.user_id, CUBE.status],
          },
        },
      },
    },
  });

  cube("order_events_stream", {
    data_source: "ksql",

    sql: `SELECT * FROM ORDER_EVENTS_STREAM`,

    measures: {
      count: {
        type: `count`,
      },

      total_amount: {
        sql: `AMOUNT`,
        type: `sum`,
      },

      failed_count: {
        sql: `CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END`,
        type: `sum`,
      },
    },

    dimensions: {
      order_id: {
        sql: `ORDER_ID`,
        type: `string`,
        primary_key: true,
      },

      user_id: {
        sql: `USER_ID`,
        type: `string`,
      },

      status: {
        sql: `STATUS`,
        type: `string`,
      },

      created_at: {
        sql: `CREATED_AT`,
        type: `time`,
      },
    },

    pre_aggregations: {
      stream: {
        type: `rollup`,
        read_only: true,
        measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
        dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
        unique_key_columns: [
          `order_id`,
          `user_id`,
          `status`,
          `created_at_second`,
        ],
        time_dimension: CUBE.created_at,
        granularity: `second`,
        partition_granularity: `day`,
        // ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead.
        build_range_start: {
          sql: `SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')`,
        },
        build_range_end: {
          sql: `SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'`,
        },
        refresh_key: {
          every: `1 minute`,
          update_window: `1 hour`,
          incremental: true,
        },
        indexes: {
          user_status: {
            columns: [CUBE.user_id, CUBE.status, `order_events_stream__created_at_second`],
          },
        },
        stream_offset: `latest`,
        output_column_types: [
          { member: CUBE.order_id, type: `text` },
          { member: CUBE.user_id, type: `text` },
          { member: CUBE.status, type: `text` },
          { member: CUBE.created_at.second, type: `timestamp` },
          { member: CUBE.count, type: `int` },
          { member: CUBE.total_amount, type: `decimal` },
          { member: CUBE.failed_count, type: `int` },
        ],
      },
    },
  });
  ```
</CodeGroup>

Key properties for the streaming pre-aggregation:

* `read_only: true`: Cube does not create any objects in ksqlDB; the
  data is consumed directly from the backing Kafka topic.
* `stream_offset`: sets the position in the Kafka topic from which Cube
  Store starts consuming when the pre-aggregation is first built. Use
  `"latest"` to consume only messages that arrive after the
  pre-aggregation is created, or `"earliest"` to replay the topic from
  the beginning. Defaults to `"latest"` if not specified. On subsequent
  refreshes, Cube Store automatically resumes from the last processed
  offset regardless of this setting.
* `unique_key_columns`: columns that uniquely identify a record, used
  for deduplication (see [below](#unique-key-columns-and-deduplication)).
* `output_column_types`: declares the output column types for the Cube
  Store table; required for Kafka streams mode (see
  [below](#output-column-types)).
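<br>
The `stream_offset` rules above can be summarized as a small decision function. This is a hypothetical JavaScript model of the described behavior, not Cube Store code; `startPosition` and its `lastProcessedOffset` argument (`null` before the first build) are illustrative names.

```javascript
// Hypothetical model of how the consumer start position is chosen.
// `lastProcessedOffset` is null before the pre-aggregation's first build.
function startPosition(streamOffset, lastProcessedOffset) {
  // Later refreshes resume after the last processed offset,
  // whatever `stream_offset` says.
  if (lastProcessedOffset !== null) {
    return { resumeAfter: lastProcessedOffset };
  }
  // First build: `stream_offset` decides, defaulting to "latest".
  const mode = streamOffset === undefined ? "latest" : streamOffset;
  if (mode !== "latest" && mode !== "earliest") {
    throw new Error(`Unsupported stream_offset: ${mode}`);
  }
  return { from: mode };
}

console.log(startPosition(undefined, null)); // { from: 'latest' }
console.log(startPosition("earliest", 42)); // { resumeAfter: 42 }
```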

#### Primary key and ungrouped queries

For the streaming pre-aggregation to work in read-only mode, the
generated SQL must not contain a `GROUP BY` clause — Cube Store's stream
post-processing engine does not support aggregation.

Cube automatically omits the `GROUP BY` clause when the dimensions
included in the pre-aggregation contain a primary key. In that case, the
generated query becomes a simple `SELECT ... FROM ...` without grouping,
and measures are passed through as raw expressions rather than
aggregated. This is what makes the pre-aggregation eligible for the
read-only streaming path.

You must include **all** primary key columns of the cube in the
streaming pre-aggregation's `dimensions` list. If any primary key
dimension is missing, the query may not be recognized as ungrouped, in
which case the pre-aggregation cannot use the streaming path.
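A minimal sketch of this rule as a pre-flight check, assuming you list the cube's primary key dimensions and the pre-aggregation's `dimensions` by name. `missingPrimaryKeys` and `canUseStreamingPath` are hypothetical helpers; Cube applies its own logic when it generates the SQL.

```javascript
// Hypothetical check mirroring the rule above.
// Returns the primary key dimensions absent from the pre-aggregation.
function missingPrimaryKeys(primaryKeyDimensions, preAggDimensions) {
  const included = new Set(preAggDimensions);
  return primaryKeyDimensions.filter((pk) => !included.has(pk));
}

// Without any primary key, GROUP BY is not omitted, so the
// streaming path is not available either.
function canUseStreamingPath(primaryKeyDimensions, preAggDimensions) {
  return (
    primaryKeyDimensions.length > 0 &&
    missingPrimaryKeys(primaryKeyDimensions, preAggDimensions).length === 0
  );
}

console.log(canUseStreamingPath(["order_id"], ["order_id", "user_id", "status"])); // true
console.log(missingPrimaryKeys(["order_id", "line_no"], ["order_id"])); // [ 'line_no' ]
```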

The `sql_table` or `sql` value should reference an existing ksqlDB stream
or table. Cube discovers its schema automatically. With Kafka streams
mode enabled, the streaming pre-aggregation reads the backing Kafka topic
directly — no objects are created in ksqlDB.

<h4 id="topic-name-matching">
  Topic name matching
</h4>

In Kafka streams mode, Cube Store parses the `select_statement`
(generated from the cube's `sql` property) and matches the `FROM` table
name against the actual Kafka topic name. On managed platforms like
Confluent Cloud, the Kafka topic name often differs from the ksqlDB
stream or table name — for example, a ksqlDB stream called
`ORDER_EVENTS_STREAM` might be backed by a Kafka topic named
`pksqlc-abc123ORDER_EVENTS_STREAM`.

The cube's `sql` property must reference the **ksqlDB stream or table
name** (not the Kafka topic), because Cube uses ksqlDB `DESCRIBE` to
discover the schema and resolve the backing topic. However, Cube does
not currently rewrite the `FROM` clause in the generated
`select_statement` to use the resolved Kafka topic name. If the ksqlDB
object name differs from the Kafka topic name, Cube Store will fail
with:

> Topic table ORDER\_EVENTS\_STREAM is not found

<Warning>
  **The ksqlDB stream (or table) name and the backing Kafka topic name
  MUST be identical, including case.** Kafka streams mode will fail if
  they differ in any way.

  Take this into account when creating the stream — explicitly set the
  topic name to match the stream name (and vice versa). For example:

  ```sql theme={"dark"}
  CREATE STREAM ORDER_EVENTS_STREAM (...)
    WITH (KAFKA_TOPIC='ORDER_EVENTS_STREAM', VALUE_FORMAT='JSON', ...);
  ```

  The match is **case-sensitive**: `OrderEvents` and `ORDEREVENTS` are
  treated as different names and will cause the build to fail with
  `Topic table ... is not found`.

  This is a known limitation of Kafka streams mode. It does not occur
  when the ksqlDB object name and the Kafka topic name are the same,
  which is the default behavior when ksqlDB creates a stream or table
  with the default topic naming strategy.
</Warning>
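If you create streams programmatically, a case-sensitive comparison like the one below can catch a mismatch before a build fails. `checkTopicName` is a hypothetical helper; the topic name would come from ksqlDB metadata (for example, the Kafka topic shown by `DESCRIBE ... EXTENDED`).

```javascript
// Hypothetical pre-flight check for the Kafka streams mode limitation:
// the ksqlDB object name and its backing topic must be identical.
function checkTopicName(ksqlObjectName, kafkaTopic) {
  // Deliberately case-sensitive: no normalization before comparing.
  if (ksqlObjectName === kafkaTopic) {
    return { ok: true };
  }
  return {
    ok: false,
    reason: `ksqlDB object "${ksqlObjectName}" is backed by topic "${kafkaTopic}"; the names must match exactly`,
  };
}

console.log(checkTopicName("ORDER_EVENTS_STREAM", "ORDER_EVENTS_STREAM").ok); // true
console.log(checkTopicName("OrderEvents", "ORDEREVENTS").ok); // false
```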

### Unique key columns and deduplication

When `unique_key_columns` is set, Cube Store appends an internal
sequence column (`__seq`) to the table, populated from the Kafka
partition offset. The unique key columns together with `__seq` form the
sort key for all indexes on this table.

Entries in `unique_key_columns` are strings, not member references. Use
the dimension's own name (for example, `user_id`). To include the time
dimension as part of the unique key, use the form
`<time_dimension_name>_<granularity>` — for example, `created_at_second`
when `granularity` is `second`.

<Warning>
  The naming convention for the time dimension inside `unique_key_columns`
  is **not** the same as the column name used in
  [`indexes`](/reference/data-modeling/pre-aggregations#indexes). Indexes
  expect the fully-qualified alias from the generated `select_statement`
  (`<cube_name>__<time_dimension_name>_<granularity>`), while
  `unique_key_columns` expects only `<time_dimension_name>_<granularity>`.
</Warning>
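The two conventions differ only by the `<cube_name>__` prefix, as this small JavaScript illustration shows. The helper names are hypothetical; the values match the `order_events_stream` example on this page.

```javascript
// Name used inside `unique_key_columns`: <time_dimension>_<granularity>
function uniqueKeyTimeColumn(timeDimension, granularity) {
  return `${timeDimension}_${granularity}`;
}

// Name used inside `indexes`: <cube_name>__<time_dimension>_<granularity>
function indexTimeColumn(cubeName, timeDimension, granularity) {
  return `${cubeName}__${uniqueKeyTimeColumn(timeDimension, granularity)}`;
}

console.log(uniqueKeyTimeColumn("created_at", "second"));
// created_at_second
console.log(indexTimeColumn("order_events_stream", "created_at", "second"));
// order_events_stream__created_at_second
```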

If the pre-aggregation defines `indexes`, every dimension referenced by
any index must also be listed in `unique_key_columns`. Cube Store uses
`unique_key_columns` (plus `__seq`) as the sort key for all indexes, so
an index column that is not part of the unique key cannot be sorted on
during compaction and the pre-aggregation build will fail.
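A rough way to check this rule before deploying is sketched below. It is a hypothetical helper, not part of Cube: index columns are written as plain names (in a data model they are member references such as `CUBE.user_id`) or as `<cube_name>__` aliases, and the prefix is stripped before comparing with `unique_key_columns`.

```javascript
// Hypothetical check: every column referenced by an index must also be
// listed in `unique_key_columns`. Index columns may be plain names or
// `<cube_name>__<column>` aliases; the alias prefix is removed before comparing.
function uncoveredIndexColumns(cubeName, indexes, uniqueKeyColumns) {
  const prefix = `${cubeName}__`;
  const keys = new Set(uniqueKeyColumns);
  const uncovered = [];
  for (const [indexName, columns] of Object.entries(indexes)) {
    for (const column of columns) {
      const bare = column.startsWith(prefix) ? column.slice(prefix.length) : column;
      if (!keys.has(bare)) {
        uncovered.push(`${indexName}: ${column}`);
      }
    }
  }
  return uncovered;
}

// Mirrors the `order_events_stream` example: every index column is part
// of the unique key, so nothing is reported.
console.log(
  uncoveredIndexColumns(
    "order_events_stream",
    { user_status: ["user_id", "status", "order_events_stream__created_at_second"] },
    ["order_id", "user_id", "status", "created_at_second"],
  ),
); // []
```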

Deduplication is not applied at ingestion time — all incoming records are
appended as they arrive. Instead, Cube Store deduplicates during
**reads** and **compaction**: rows are sorted by the unique key columns
and then by `__seq`, and only the **last row per unique key** (the one
with the highest sequence number) is kept. This means that if the same
key appears multiple times in the stream, the most recent version is
always the one returned by queries.
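The read-time deduplication described above amounts to "sort by key, then by sequence, keep the last row per key". The JavaScript below is a simplified in-memory model of that idea, assuming each row carries its `__seq` value; it is not how Cube Store physically stores or compacts data.

```javascript
// Simplified model: sort by the unique key columns, then by __seq,
// and keep only the last row (highest __seq) for each key.
function deduplicate(rows, uniqueKeyColumns) {
  const sorted = [...rows].sort((a, b) => {
    for (const col of uniqueKeyColumns) {
      if (a[col] < b[col]) return -1;
      if (a[col] > b[col]) return 1;
    }
    return a.__seq - b.__seq;
  });
  const result = [];
  for (const row of sorted) {
    const last = result[result.length - 1];
    const sameKey =
      last !== undefined && uniqueKeyColumns.every((col) => last[col] === row[col]);
    if (sameKey) {
      result[result.length - 1] = row; // a later __seq replaces the earlier row
    } else {
      result.push(row);
    }
  }
  return result;
}

const events = [
  { order_id: "o1", status: "pending", __seq: 1 },
  { order_id: "o2", status: "completed", __seq: 2 },
  { order_id: "o1", status: "completed", __seq: 3 },
];
const latest = deduplicate(events, ["order_id"]);
// One row per order_id; o1 keeps status "completed" from __seq 3.
```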

For Kafka messages, unique key column values can come from either the
message **payload** (the JSON value) or the message **key**. If a column
listed in `unique_key_columns` is missing from the payload, Cube Store
falls back to the Kafka message key: for a single unique key column, the
raw key value is used; for composite keys, the key is expected to be a
JSON object with matching field names.
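The fallback order can be sketched as follows. This is a hypothetical model: `resolveUniqueKey` receives the already-parsed payload and the raw key string, and returning `null` when neither source has a value is an assumption made for the sketch.

```javascript
// Hypothetical model of the payload-then-key fallback for unique key columns.
// `value`: parsed JSON payload of the message.
// `rawKey`: the message key as a string, or null when the message has no key.
function resolveUniqueKey(uniqueKeyColumns, value, rawKey) {
  const result = {};
  let parsedKey; // composite keys are parsed lazily, at most once
  for (const column of uniqueKeyColumns) {
    if (value[column] !== undefined) {
      result[column] = value[column]; // the payload wins when it has the field
    } else if (rawKey === null) {
      result[column] = null; // assumption: no source for the value
    } else if (uniqueKeyColumns.length === 1) {
      result[column] = rawKey; // single unique key column: raw key value
    } else {
      // Composite unique key: the key must be a JSON object with matching fields.
      if (parsedKey === undefined) parsedKey = JSON.parse(rawKey);
      result[column] = parsedKey[column] !== undefined ? parsedKey[column] : null;
    }
  }
  return result;
}

console.log(resolveUniqueKey(["order_id"], { STATUS: "completed" }, "ord_1"));
// { order_id: 'ord_1' }
```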

### Output column types

In Kafka streams mode, Cube Store creates its internal pre-aggregation
table based on column type information. By default, column types are
inferred from the source ksqlDB stream using `DESCRIBE`. However, the
pre-aggregation's `select_statement` (generated from the rollup
definition) renames and transforms columns — for example, a source
column `CREATED_AT` becomes `order_events_stream__created_at_second` in
the output.

When this renaming happens, the raw source column types no longer match
the output column names, causing errors like:

> Key column `order_events_stream__id` not found among column definitions

To fix this, define `output_column_types` on the streaming
pre-aggregation. This tells Cube the exact output column types to use
for the Cube Store table, and separately passes the source schema so
Cube Store can deserialize the raw Kafka messages correctly.

<CodeGroup>
  ```yaml title="YAML" theme={"dark"}
  pre_aggregations:
    - name: stream
      type: rollup
      read_only: true
      measures:
        - CUBE.count
        - CUBE.total_amount
        - CUBE.failed_count
      dimensions:
        - CUBE.order_id
        - CUBE.user_id
        - CUBE.status
      unique_key_columns:
        - order_id
      time_dimension: CUBE.created_at
      granularity: second
      partition_granularity: day
      # ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead.
      build_range_start:
        sql: "SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')"
      build_range_end:
        sql: "SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'"
      refresh_key:
        every: 1 minute
        update_window: 1 hour
        incremental: true
      stream_offset: latest
      output_column_types:
        - member: CUBE.order_id
          type: text
        - member: CUBE.user_id
          type: text
        - member: CUBE.status
          type: text
        - member: CUBE.created_at.second
          type: timestamp
        - member: CUBE.count
          type: int
        - member: CUBE.total_amount
          type: decimal
        - member: CUBE.failed_count
          type: int
  ```

  ```javascript title="JavaScript" theme={"dark"}
  pre_aggregations: {
    stream: {
      type: `rollup`,
      read_only: true,
      measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
      dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
      unique_key_columns: [`order_id`],
      time_dimension: CUBE.created_at,
      granularity: `second`,
      partition_granularity: `day`,
      // ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead.
      build_range_start: {
        sql: `SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')`,
      },
      build_range_end: {
        sql: `SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'`,
      },
      refresh_key: {
        every: `1 minute`,
        update_window: `1 hour`,
        incremental: true,
      },
      stream_offset: `latest`,
      output_column_types: [
        { member: CUBE.order_id, type: `text` },
        { member: CUBE.user_id, type: `text` },
        { member: CUBE.status, type: `text` },
        { member: CUBE.created_at.second, type: `timestamp` },
        { member: CUBE.count, type: `int` },
        { member: CUBE.total_amount, type: `decimal` },
        { member: CUBE.failed_count, type: `int` },
      ],
    },
  },
  ```
</CodeGroup>

Each entry in `output_column_types` has two properties:

* `member` — a reference to a dimension or measure included in the
  pre-aggregation. This must be a `CUBE` member reference (for example,
  `CUBE.user_id`), not a string. For the time dimension, reference it
  with the rollup granularity attached — for example,
  `CUBE.created_at.second` when `granularity` is `second`.
* `type` — the Cube Store column type. Common values: `text`, `int`,
  `bigint`, `decimal`, `float`, `boolean`, `timestamp`.

Include an entry for every dimension and measure in the pre-aggregation.
The time dimension must also be listed, with `type: timestamp` and the
granularity suffix on the member reference as shown above.
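Because a missing entry is easy to overlook, a check like the following can compare the pre-aggregation's members with its `output_column_types`. It is a hypothetical helper that works on plain data: members are written as strings (for example, `"CUBE.created_at.second"`) only because the sketch runs outside Cube, where they would be member references.

```javascript
// Hypothetical lint: list members of the pre-aggregation that have no
// entry in output_column_types. The time dimension is expected with its
// granularity suffix, e.g. "CUBE.created_at.second".
function missingOutputTypes(preAgg) {
  const expected = [
    ...preAgg.dimensions,
    ...preAgg.measures,
    `${preAgg.timeDimension}.${preAgg.granularity}`,
  ];
  const declared = new Set(preAgg.outputColumnTypes.map((entry) => entry.member));
  return expected.filter((member) => !declared.has(member));
}

// Same members as the streaming example, but the time dimension entry is missing.
const streamPreAgg = {
  dimensions: ["CUBE.order_id", "CUBE.user_id", "CUBE.status"],
  measures: ["CUBE.count", "CUBE.total_amount", "CUBE.failed_count"],
  timeDimension: "CUBE.created_at",
  granularity: "second",
  outputColumnTypes: [
    { member: "CUBE.order_id", type: "text" },
    { member: "CUBE.user_id", type: "text" },
    { member: "CUBE.status", type: "text" },
    { member: "CUBE.count", type: "int" },
    { member: "CUBE.total_amount", type: "decimal" },
    { member: "CUBE.failed_count", type: "int" },
  ],
};

console.log(missingOutputTypes(streamPreAgg)); // [ 'CUBE.created_at.second' ]
```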

When `output_column_types` is defined, Cube uses the aliased column
names (matching the `select_statement`) for the Cube Store table
definition and passes the raw source schema separately via
`source_table`, so Cube Store knows how to deserialize incoming Kafka
messages. Without it, column names come from the raw ksqlDB `DESCRIBE`
output and will not match the aliased names in the `select_statement`
or `unique_key_columns`.

<Warning>
  `output_column_types` is required for Kafka streams mode when the
  pre-aggregation uses `unique_key_columns`. Without it, the unique key
  column names will not match the table column definitions, causing the
  pre-aggregation build to fail.
</Warning>

### Stream format

Cube Store expects Kafka messages to have a **JSON object** as their
value payload, with field names matching the column names defined in the
cube. For example, given the streaming cube above, each Kafka message
value should look like:

```json theme={"dark"}
{
  "ORDER_ID": "ord_12345",
  "USER_ID": "usr_789",
  "STATUS": "completed",
  "AMOUNT": 49.99,
  "CREATED_AT": "2025-01-15T10:30:00.000"
}
```

Field names are case-sensitive and must match the column names used in
the `sql` property of each dimension and measure definition. Missing
fields default to `null`.
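The value-handling rules above (JSON object required, case-sensitive field names, missing fields as `null`) are modeled in this hypothetical JavaScript helper. It only illustrates the contract a producer should satisfy; it is not Cube Store's parser.

```javascript
// Hypothetical model: map one Kafka message value to a row.
// `columns` lists the column names expected by the cube, exactly as cased there.
function messageToRow(rawValue, columns) {
  const value = JSON.parse(rawValue);
  if (value === null || typeof value !== "object" || Array.isArray(value)) {
    throw new Error("Kafka message value must be a JSON object");
  }
  const row = {};
  for (const column of columns) {
    // Case-sensitive lookup; absent fields become null.
    row[column] = Object.prototype.hasOwnProperty.call(value, column)
      ? value[column]
      : null;
  }
  return row;
}

// "user_id" does not match "USER_ID", so USER_ID comes back as null.
console.log(messageToRow('{"ORDER_ID":"ord_12345","user_id":"usr_789"}', ["ORDER_ID", "USER_ID"]));
// { ORDER_ID: 'ord_12345', USER_ID: null }
```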

The message key is optional. When a key is present and starts with `{`,
it is parsed as a JSON object and used as a fallback source for unique
key column values (see [above](#unique-key-columns-and-deduplication)).

#### Timestamp handling

For dimensions with `type: time`, Cube Store accepts timestamp values in
two formats:

* **String** — parsed using ISO 8601 / RFC 3339 formats. Supported
  patterns include:
  * `2025-01-15T10:30:00.000Z`
  * `2025-01-15T10:30:00Z`
  * `2025-01-15 10:30:00.000 UTC`
  * `2025-01-15T10:30:00`
  * `2025-01-15 10:30:00`
  * `2025-01-15`
* **Number** — interpreted as **epoch milliseconds** (not seconds, not
  microseconds). For example, `1736937000000` represents
  `2025-01-15T10:30:00.000Z`.
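The following sketch converts both accepted forms to epoch milliseconds, which can help when checking what a producer emits. It is an illustration, not Cube Store's parser: the regular expression covers only the patterns listed above (with at most millisecond precision), and treating strings without an offset as UTC is an assumption.

```javascript
// Matches the listed patterns: date, optional time with optional .SSS,
// and an optional "Z" or " UTC" suffix.
const TIMESTAMP_RE =
  /^(\d{4})-(\d{2})-(\d{2})(?:[T ](\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,3}))?)?\s*(?:Z|UTC)?$/;

// Returns epoch milliseconds. Assumption: strings without an offset are UTC.
function toEpochMillis(input) {
  if (typeof input === "number") {
    return input; // numbers are already epoch milliseconds
  }
  const m = TIMESTAMP_RE.exec(input);
  if (!m) {
    throw new Error(`Unsupported timestamp: ${input}`);
  }
  const [, y, mo, d, h = "0", mi = "0", s = "0", frac = "0"] = m;
  return Date.UTC(
    Number(y), Number(mo) - 1, Number(d),
    Number(h), Number(mi), Number(s),
    Number(frac.padEnd(3, "0")), // ".5" means 500 ms
  );
}

console.log(toEpochMillis("2025-01-15 10:30:00.000 UTC")); // 1736937000000
```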

If your Kafka topic produces timestamps as strings in a non-standard
format, you can use `PARSE_TIMESTAMP` in the cube's `sql` property to
convert them. In that case, define the source column as `type: string`
in a `source_table` and use the `select_statement` to transform it:

```javascript theme={"dark"}
sql: `SELECT PARSE_TIMESTAMP(TIMESTAMP_STR,
  'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') AS created_at,
  ORDER_ID, USER_ID, STATUS, AMOUNT
  FROM ORDER_EVENTS_STREAM`,
```

Time dimension truncation (controlled by the `granularity` property of
the pre-aggregation) is handled automatically. Cube generates the
appropriate `PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))`
expression chain to truncate timestamps to the configured granularity
(e.g., `day`, `hour`, `minute`). Cube Store evaluates these expressions
natively on each micro-batch during ingestion. Standard SQL functions
like `date_trunc` are also available in the `select_statement`.
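To see what truncation to a granularity produces, the sketch below floors UTC epoch-millisecond values to `second`, `minute`, `hour`, or `day`. It is illustrative only: it does not reproduce the `PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))` chain, ignores time zones other than UTC, and does not handle `week`, `month`, or larger granularities.

```javascript
// Fixed-length granularities only; calendar units are out of scope here.
const GRANULARITY_MS = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
};

// Floors a UTC epoch-millisecond value to the start of its bucket.
function truncateUtc(epochMillis, granularity) {
  const step = GRANULARITY_MS[granularity];
  if (step === undefined) {
    throw new Error(`Unsupported granularity in this sketch: ${granularity}`);
  }
  return Math.floor(epochMillis / step) * step;
}

const t = Date.UTC(2025, 0, 15, 10, 30, 45, 123);
console.log(new Date(truncateUtc(t, "hour")).toISOString());
// 2025-01-15T10:00:00.000Z
```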

##### Converting `bigint` timestamps

When a source column stores a timestamp as a `bigint` (a common pattern
in Kafka topics — for example, a `created_at` field with values like
`1778800167128`), the value must be converted to a timestamp before it
can be used as a time dimension.

The conversion depends on the unit of the `bigint` value:

* **Milliseconds** (most common, e.g. `Date.now()` in JavaScript, Java's
  `System.currentTimeMillis()`) — multiply by `1000` before casting,
  because `CAST(... AS TIMESTAMP(6))` interprets numeric input as
  **microseconds**. Without the multiplication, a millisecond value like
  `1778800167128` is read as microseconds and produces a timestamp in
  1970 instead of the intended date.
* **Microseconds** — cast directly to `TIMESTAMP(6)`.
* **Seconds** — multiply by `1000000` before casting.

Apply the conversion in the time dimension's `sql` property:

```javascript theme={"dark"}
dimensions: {
  created_at: {
    sql: `CAST(${CUBE}.created_at * 1000 AS TIMESTAMP(6))`,
    type: `time`
  }
}
```

The cast runs first; any granularity truncation generated by Cube
(`PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))`) then operates on
the resulting `TIMESTAMP(6)` value.
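The unit arithmetic can be checked outside ksqlDB with the sketch below. `toMicros`, `microsToDate`, and `castSql` are hypothetical helpers; BigInt keeps microsecond values exact. It reproduces the 1970 symptom described above when a millisecond value is not multiplied by `1000`.

```javascript
// Multipliers that turn each unit into microseconds, which is what
// CAST(... AS TIMESTAMP(6)) expects for numeric input.
const MICROS_PER_UNIT = { seconds: 1000000n, milliseconds: 1000n, microseconds: 1n };

function toMicros(value, unit) {
  const factor = MICROS_PER_UNIT[unit];
  if (factor === undefined) throw new Error(`Unknown unit: ${unit}`);
  return BigInt(value) * factor;
}

// Back to a JS Date (millisecond precision) for a quick sanity check.
function microsToDate(micros) {
  return new Date(Number(micros / 1000n));
}

// Builds the dimension SQL for a given source unit.
function castSql(columnSql, unit) {
  const factor = MICROS_PER_UNIT[unit];
  if (factor === undefined) throw new Error(`Unknown unit: ${unit}`);
  return factor === 1n
    ? `CAST(${columnSql} AS TIMESTAMP(6))`
    : `CAST(${columnSql} * ${factor} AS TIMESTAMP(6))`;
}

const raw = 1778800167128; // a millisecond value
console.log(microsToDate(toMicros(raw, "milliseconds")).getUTCFullYear()); // 2026
// Skipping the multiplication reads milliseconds as microseconds:
console.log(microsToDate(BigInt(raw)).getUTCFullYear()); // 1970
console.log(castSql("created_at", "milliseconds")); // CAST(created_at * 1000 AS TIMESTAMP(6))
```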

<Tip>
  When debugging a `bigint` timestamp issue, temporarily remove
  `partition_granularity` from the pre-aggregation. This skips
  partitioned builds and makes it easier to verify the conversion is
  correct before re-enabling partitioning.
</Tip>

### Filtering on the stream

When the streaming cube defines a `sql` property with a `SELECT`
statement (rather than `sql_table`), Cube Store applies the projection
and any `WHERE` filters from that statement directly on each micro-batch
of incoming Kafka messages. This filtering happens inside Cube Store
using its query engine — it does not require ksqlDB to process the
filter. Only rows that pass the filter are ingested into the
pre-aggregation table.

This allows you to define a streaming cube that only ingests a subset of
the data from the underlying Kafka topic without creating any
server-side filter objects in ksqlDB.
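A toy model of this per-batch behavior: the filter runs first, then the projection, and only surviving rows are kept. The JavaScript functions stand in for the `WHERE` clause and `SELECT` list; Cube Store evaluates the SQL itself, and SQL `NULL` semantics in comparisons are not modeled here.

```javascript
// Toy "Projection > Filter > TableScan" over one micro-batch of messages.
function applyMicroBatch(batch, where, select) {
  return batch.filter(where).map(select);
}

// Roughly corresponds to:
//   SELECT ORDER_ID, AMOUNT FROM ORDER_EVENTS_STREAM WHERE STATUS != 'test'
const batch = [
  { ORDER_ID: "o1", STATUS: "completed", AMOUNT: 10 },
  { ORDER_ID: "o2", STATUS: "test", AMOUNT: 0 },
];
const ingested = applyMicroBatch(
  batch,
  (row) => row.STATUS !== "test",
  (row) => ({ ORDER_ID: row.ORDER_ID, AMOUNT: row.AMOUNT }),
);
console.log(ingested); // [ { ORDER_ID: 'o1', AMOUNT: 10 } ]
```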

#### Supported SQL syntax

The `SELECT` statement must follow a strict shape. Cube Store only
accepts plans that resolve to **Projection > Filter > TableScan** (where
the filter is optional). Any other query plan shape is rejected.

**Supported:**

* `SELECT` with column references (e.g., `SELECT col1, col2 FROM topic`)
* `SELECT *` wildcard
* Column aliases (`SELECT col1 AS my_alias`)
* `WHERE` clause with comparison operators (`=`, `!=`, `<`, `>`, `<=`,
  `>=`)
* Boolean logic in `WHERE` (`AND`, `OR`, `NOT`)
* `IS NULL` and `IS NOT NULL`
* `IN` lists (`col IN (1, 2, 3)`)
* `BETWEEN` expressions
* `CASE ... WHEN ... THEN ... ELSE ... END` expressions
* `CAST(expr AS type)` type conversions
* `EXTRACT(field FROM expr)` for date/time parts
* `SUBSTRING(expr FROM start FOR length)`
* Scalar functions (e.g., `COALESCE`, `CONCAT`, arithmetic)
* `CONVERT_TZ` for timezone conversion (internally rewritten for
  compatibility)
* `PARSE_TIMESTAMP` and `FORMAT_TIMESTAMP` for timestamp parsing and
  formatting using ksql-style format strings (e.g.,
  `yyyy-MM-dd'T'HH:mm:ss.SSS`)
* Nested expressions with parentheses
* `date_trunc` for timestamp truncation

**Not supported:**

* `JOIN` clauses — only a single `FROM` table is allowed
* Subqueries in `SELECT` or `WHERE`
* `GROUP BY`, `HAVING`, or aggregate functions (`SUM`, `COUNT`, `AVG`,
  etc.)
* `ORDER BY` (rows are consumed in stream order)
* `LIMIT` and `OFFSET`
* `UNION`, `INTERSECT`, `EXCEPT`
* Window functions (`OVER`, `PARTITION BY`)
* Multiple `FROM` or multiple `WHERE` clauses
* Common Table Expressions (`WITH ... AS`)

All column expressions in the `SELECT` list that are not simple column
references must have explicit aliases. Unique key columns may reference
the source column through a scalar function (e.g.,
`CAST(id AS VARCHAR) AS id`), but not through arbitrary expressions.
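Before deploying, a quick keyword scan can flag statements that clearly fall into the "Not supported" list. The sketch below is heuristic and hypothetical: it strips string literals and `--` comments, then searches for keywords, so it does not parse SQL, does not check aliases, and a clean result does not guarantee that Cube Store accepts the plan.

```javascript
// Keyword patterns for constructs listed as unsupported above.
const UNSUPPORTED = [
  [/\bJOIN\b/i, "JOIN"],
  [/\bGROUP\s+BY\b/i, "GROUP BY"],
  [/\bHAVING\b/i, "HAVING"],
  [/\b(SUM|COUNT|AVG|MIN|MAX)\s*\(/i, "aggregate function"],
  [/\bORDER\s+BY\b/i, "ORDER BY"],
  [/\b(LIMIT|OFFSET)\b/i, "LIMIT/OFFSET"],
  [/\b(UNION|INTERSECT|EXCEPT)\b/i, "set operation"],
  [/\bOVER\s*\(/i, "window function"],
  [/^\s*WITH\b/i, "CTE"],
  [/\(\s*SELECT\b/i, "subquery"],
];

// Returns labels of unsupported constructs found in the statement.
function unsupportedFeatures(sql) {
  const stripped = sql
    .replace(/'(?:[^']|'')*'/g, "''") // blank out string literals ('' is an escaped quote)
    .replace(/--[^\n]*/g, ""); // drop line comments
  return UNSUPPORTED.filter(([re]) => re.test(stripped)).map(([, label]) => label);
}

console.log(unsupportedFeatures("SELECT STATUS, COUNT(*) FROM ORDER_EVENTS_STREAM GROUP BY STATUS"));
// [ 'GROUP BY', 'aggregate function' ]
```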

[ref-data-source-concurrency]: /admin/connect-to-data/concurrency#data-source-concurrency

[ref-powerbi]: /docs/integrations/power-bi

[ref-dax-api]: /reference/core-data-apis/dax-api

[ref-streaming-pre-aggs]: /docs/pre-aggregations/using-pre-aggregations#streaming-pre-aggregations

[ref-decorated-env-vars]: /admin/connect-to-data/multiple-data-sources#under-the-hood

[ref-cube-data-source]: /reference/data-modeling/cube#data_source

[ref-lambda-pre-aggs]: /docs/pre-aggregations/lambda-pre-aggregations
