# Manage Flows

> Describes how to manage flows in GreptimeDB, including creating, updating, and deleting flows. It explains the syntax for creating flows, the importance of sink tables, and how to use the EXPIRE AFTER clause. Examples of SQL queries for managing flows are provided.

Each `flow` is a continuous aggregation query in GreptimeDB.
It continuously updates the aggregated data based on the incoming data.
This document describes how to create, update, flush, and delete flows.

:::note
Flow uses batching mode for aggregation and TQL workloads. Simple non-aggregation Flow queries currently use the deprecated streaming mode and are not recommended for new workloads.
:::

## Create a Source Table

Before creating a flow, you need to create a source table to store the raw data, for example:

```sql
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
);
```
Avoid using `WITH ('ttl' = 'instant')` for new Flow source tables. Source tables with `ttl='instant'` fall back to the deprecated streaming mode. Keep the source data with an appropriate TTL instead, so aggregation and TQL Flow workloads can run in batching mode.
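
As a sketch of the recommended setup, the source table below keeps raw data for a bounded period instead of discarding it instantly. The `'7d'` retention is only an illustrative value; choose a TTL that fits how long your workload needs to keep the raw data.

```sql
-- Source table for a batching-mode flow.
-- The 7-day TTL is an illustrative value, not a recommendation.
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
) WITH ('ttl' = '7d');
```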

For existing legacy streaming-mode deployments, a source table may use `WITH ('ttl' = 'instant')`:

```sql
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
) WITH ('ttl' = 'instant');
```

Setting `'ttl'` to `'instant'` makes the table discard inserted data immediately; the rows are only forwarded to a legacy streaming-mode flow task. This pattern is deprecated for new workloads.

## Create a Sink Table

Before creating a flow, you need a sink table to store the aggregated data generated by the flow.
Although it is a regular time series table, there are a few important considerations:

- **Column order and type**: Ensure the order and type of the columns in the sink table match the query result of the flow.
- **Time index**: Specify the `TIME INDEX` for the sink table, typically using the time window column generated by the time window function.
- **Update time**: The Flow engine automatically appends the update time to the end of each computation result row. This update time is stored in the `update_at` column. Ensure that this column is included in the sink table schema.
- **Tags**: Use `PRIMARY KEY` to specify tags. Together with the time index, the tags uniquely identify each row and help optimize query performance.

For example:

```sql
/* Create sink table */
CREATE TABLE temp_alerts (
  sensor_id INT,
  loc STRING,
  max_temp DOUBLE,
  time_window TIMESTAMP TIME INDEX,
  update_at TIMESTAMP,
  PRIMARY KEY(sensor_id, loc)
);

CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
  sensor_id,
  loc,
  max(temperature) AS max_temp,
  date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
  sensor_id,
  loc,
  time_window
HAVING max_temp > 100;
```

The sink table has the columns `sensor_id`, `loc`, `max_temp`, `time_window`, and `update_at`.

- The first four columns correspond to the flow's query result columns: `sensor_id`, `loc`, `max(temperature)`, and `date_bin('10 seconds'::INTERVAL, ts)` respectively.
- The `time_window` column is specified as the `TIME INDEX` for the sink table.
- The `update_at` column is the last one in the schema to store the update time of the data.
- The `PRIMARY KEY` at the end of the schema definition specifies `sensor_id` and `loc` as the tag columns.
  This means the flow will insert or update data based on the tags `sensor_id` and `loc` along with the time index `time_window`.

## Create a flow

The grammar to create a flow is:

```sql
CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
[ WITH (<flow-option> = <value> [, ...]) ]
AS 
<SQL>;
```

When `OR REPLACE` is specified, any existing flow with the same name will be updated to the new version. It's important to note that this only affects the flow task itself; the source and sink tables will remain unchanged.

Conversely, when `IF NOT EXISTS` is specified, the command will have no effect if the flow already exists, rather than reporting an error. Additionally, please note that `OR REPLACE` cannot be used in conjunction with `IF NOT EXISTS`.
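
For example, assuming the `temp_monitoring` flow and the `temp_alerts` sink table from the previous section already exist, the following statement replaces the flow definition with a lower alert threshold. The threshold of `80` is only an illustration. The source and sink tables are reused as they are.

```sql
-- Replace the existing flow definition; only the flow task changes.
CREATE OR REPLACE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
  sensor_id,
  loc,
  max(temperature) AS max_temp,
  date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
  sensor_id,
  loc,
  time_window
HAVING max_temp > 80;
```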

- `flow-name` is a unique identifier at the catalog level.
- `sink-table-name` is the table name where the materialized aggregated data is stored.
  It can be an existing table or a new one. The flow creates the sink table if it does not exist.
  <!-- If the table already exists, its schema must match the schema of the query result. -->
- `EXPIRE AFTER` is an optional interval to expire the data from the Flow engine.
  For more details, please refer to the [`EXPIRE AFTER`](#expire-after) part.
- `COMMENT` is the description of the flow.
- `WITH` specifies flow options.
  For example, the experimental `experimental_enable_incremental_read` option enables incremental source reads for eligible batching flows.
- `SQL` part defines the continuous aggregation query.
  It also determines the source tables that provide data for the flow.
  Each flow can have multiple source tables.
  Refer to [Write a SQL query](#write-a-sql-query) for details.

A simple example to create a flow:

```sql
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER '1 hour'::INTERVAL
COMMENT 'My first flow in GreptimeDB'
AS
SELECT
    max(temperature) as max_temp,
    date_bin('10 seconds'::INTERVAL, ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;
```

The created flow computes `max(temperature)` for every 10-second window and stores the result in `my_sink_table`. Only data whose time index falls within the last hour is used by the flow.

### EXPIRE AFTER

The `EXPIRE AFTER` clause specifies the interval after which data will expire from the flow engine. 

Data in the source table that exceeds the specified expiration time will no longer be included in the flow's calculations.
Similarly, data in the sink table that is older than the expiration time will not be updated. 
This means the flow engine will ignore data older than the specified interval during aggregation.
This mechanism helps to manage the state size for stateful queries, such as those involving `GROUP BY`.

It is important to note that the `EXPIRE AFTER` clause does not delete data from either the source table or the sink table.
It only controls how the flow engine processes the data.
If you want to delete data from the source or sink table, please [set the `TTL` option](/user-guide/manage-data/overview.md#manage-data-retention-with-ttl-policies) when creating tables.

Setting a reasonable time interval for `EXPIRE AFTER` is helpful to limit how far back the batching engine needs to recompute results and to avoid excessive resource usage. It serves a similar purpose to bounding lateness in stream processing systems, but new Flow workloads should use batching mode.

For example, if the flow engine processes the aggregation at 10:00:00 and `EXPIRE AFTER '1 hour'::INTERVAL` is set,
any input data arriving now with a time index older than 1 hour (before 09:00:00) expires and is ignored.
Only data timestamped from 09:00:00 onwards is used in the aggregation and to update the sink table.
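
To make the cutoff concrete, the ad-hoc query below approximates which source rows a flow with `EXPIRE AFTER '1 hour'::INTERVAL` still considers when it runs. It only illustrates the time filter described above, assuming the cutoff is measured from the current time; it is not how the Flow engine is implemented internally.

```sql
-- Rows whose time index is older than one hour before now() fall outside
-- the flow's expiration window and are ignored by the aggregation.
SELECT
    max(temperature) AS max_temp,
    date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
WHERE ts >= now() - '1 hour'::INTERVAL
GROUP BY time_window;
```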

### Experimental incremental source reads

:::warning Experimental feature
The `experimental_enable_incremental_read` option is experimental.
Its behavior and limitations may change in future releases.
:::

For batching SQL flows whose source tables are append-only, you can enable incremental source reads:

```sql
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
) WITH ('append_mode' = 'true');

CREATE FLOW temp_monitoring
SINK TO temp_alerts
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
  sensor_id,
  loc,
  max(temperature) AS max_temp,
  date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
  sensor_id,
  loc,
  time_window;
```

When this option is enabled, Flow keeps per-region source sequence watermarks and attempts to read only newly appended source rows after the initial full snapshot.
This is an execution optimization and does not change the query result.

The current limitations are:

- All source tables must be append-only tables created with `append_mode = 'true'`.
  Flow creation fails if any source table is not append-only.
- The optimization only applies to batching SQL flows.
  TQL flows, unsupported aggregate shapes, and simple projection/filter flows do not use incremental source reads.
- Source tables created with `ttl = 'instant'` currently use streaming mode and do not use this batching-mode option.
- The first run still needs a full snapshot.
  Later runs may fall back to full snapshot or retry/repair when GreptimeDB cannot safely use incremental source reads.

### Write a SQL query

The `SQL` part of the flow is similar to a standard `SELECT` clause with a few differences. The syntax of the query is as follows:

```sql
SELECT AGGR_FUNCTION(column1, column2,..) [, TIME_WINDOW_FUNCTION() as time_window] FROM <source_table> GROUP BY {time_window | column1, column2,.. };
```

Only the following types of expressions are allowed after the `SELECT` keyword:
- Aggregate functions: Refer to the [Expressions](expressions.md) documentation for details.
- Time window functions: Refer to the [define time window](#define-time-window) section for details.
- Scalar functions: Such as `col`, `to_lowercase(col)`, `col + 1`, etc. This part is the same as in a standard `SELECT` clause in GreptimeDB.

The following points should be noted about the rest of the query syntax:
- The query must include a `FROM` clause to specify the source table.
  As join clauses are currently not supported,
  the query can only aggregate columns from a single table.
- `WHERE` and `HAVING` clauses are supported.
  The `WHERE` clause filters data before aggregation,
  while the `HAVING` clause filters data after aggregation.
- `DISTINCT` currently only works with the `SELECT DISTINCT column1 ..` syntax.
  It is used to remove duplicate rows from the result set.
  Support for `SELECT count(DISTINCT column1) ...` is not available yet but will be added in the future.
- The `GROUP BY` clause works the same as in standard queries,
  grouping data by the specified columns.
  The time window column in the `GROUP BY` clause is crucial for continuous aggregation scenarios.
  Other expressions in `GROUP BY` can include literals, columns, or scalar expressions.
- `ORDER BY`, `LIMIT`, and `OFFSET` are not supported.
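
The following sketch combines several of these rules: a scalar expression in the `SELECT` list, a `WHERE` filter applied before aggregation, and a `HAVING` filter applied after it. The flow name `loc_hot_readings`, the sink table `loc_hot_readings_sink`, and the thresholds are hypothetical; the sink table is created automatically if it does not exist.

```sql
-- Count readings above 30 degrees per location and 1-minute window,
-- keeping only windows that contain at least 5 such readings.
CREATE FLOW loc_hot_readings
SINK TO loc_hot_readings_sink
AS
SELECT
  lower(loc) AS loc_name,                        -- scalar expression
  count(temperature) AS hot_count,               -- aggregate function
  date_bin('1 minute'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
WHERE temperature > 30                           -- filters before aggregation
GROUP BY
  loc_name,
  time_window
HAVING hot_count >= 5;                           -- filters after aggregation
```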

Refer to [Continuous Aggregation](continuous-aggregation.md) for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboards.

### Define time window

A time window is a crucial attribute of your continuous aggregation query.
It determines how data is aggregated within the flow.
These time windows are left-closed and right-open intervals.

A time window represents a specific range of time.
Data from the source table is mapped to the corresponding window based on the time index column.
The time window also defines the scope for each calculation of an aggregation expression,
resulting in one row per time window in the result table.

You can use `date_bin()` after the `SELECT` keyword to define fixed time windows.
For example:

```sql
SELECT
    max(temperature) as max_temp,
    date_bin('10 seconds'::INTERVAL, ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;
```

In this example, the `date_bin('10 seconds'::INTERVAL, ts)` function creates 10-second time windows starting from UTC 00:00:00.
The `max(temperature)` function calculates the maximum temperature value within each time window.
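
Because windows are left-closed and right-open, a timestamp that lies exactly on a boundary starts a new window. You can check this mapping with ad-hoc `date_bin` calls; the timestamps below are arbitrary examples.

```sql
-- 00:00:09 falls into the window [00:00:00, 00:00:10).
SELECT date_bin('10 seconds'::INTERVAL, '2024-01-01 00:00:09'::TIMESTAMP);

-- 00:00:10 is the start of the next window, [00:00:10, 00:00:20).
SELECT date_bin('10 seconds'::INTERVAL, '2024-01-01 00:00:10'::TIMESTAMP);
```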

For more details on the behavior of the function,
please refer to [`date_bin`](/reference/sql/functions/df-functions.md#date_bin).

:::tip NOTE
Currently, Flow relies on the time window expression to determine how to incrementally update the result, so prefer a relatively small time window when possible.
:::

## Flush a flow

The flow engine automatically processes aggregation operations within a short period (a few seconds) after new data arrives in the source table.
However, you can manually trigger the flow engine to process the aggregation operation immediately using the `ADMIN FLUSH_FLOW` command.

```sql
ADMIN FLUSH_FLOW('<flow-name>')
```
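
For example, using the `temp_monitoring` flow and `temp_alerts` sink table from the [Create a Sink Table](#create-a-sink-table) section, you can write a few readings, flush the flow, and read the sink table without waiting for the next automatic run. This sketch assumes the source table has no TTL that would expire the sample rows and that the flow has no `EXPIRE AFTER` clause; the values and timestamps are made up for illustration.

```sql
-- Write sample readings; only sensor 1 exceeds the HAVING threshold.
INSERT INTO temp_sensor_data (sensor_id, loc, temperature, ts) VALUES
  (1, 'room_a', 101.5, '2024-01-01 00:00:01'),
  (1, 'room_a', 103.2, '2024-01-01 00:00:05'),
  (2, 'room_b', 25.0, '2024-01-01 00:00:03');

-- Process the pending data immediately.
ADMIN FLUSH_FLOW('temp_monitoring');

-- Inspect the aggregated results.
SELECT * FROM temp_alerts ORDER BY time_window;
```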

## Delete a flow

To delete a flow, use the following `DROP FLOW` statement:

```sql
DROP FLOW [IF EXISTS] <name>
```

For example:

```sql
DROP FLOW IF EXISTS my_flow;
```
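
As a sketch of a typical cleanup, you can list the existing flows before dropping one. This example assumes that dropping a flow stops the computation but leaves the sink table in place, in line with the sink table being a regular table; drop it separately if you no longer need the aggregated data.

```sql
-- List existing flows.
SHOW FLOWS;

-- Stop and remove the flow.
DROP FLOW IF EXISTS my_flow;

-- Optionally remove the aggregated results as well.
DROP TABLE IF EXISTS my_sink_table;
```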
