ProductDocsArchitectureBlogGitHubGitHubGet Started
Experimental

IncrementalFlow

Incremental view maintenance with DeltaBatch and tick-based processing.

Overview

IncrementalFlow maintains query results incrementally by processing DeltaBatch changes (weighted Arrow rows with an Int64 _weight column: +1 = insert, -1 = retract).

Construction

IncrementalFlow::new(session: &Session) -> Result<IncrementalFlow>

Methods

MethodReturnsDescription
register_source(name, schema)Result<()>Register a named source that accepts DeltaBatch input.
register_view(name, query)Result<()>Register a SQL query as an incrementally maintained view.
tick(source_name, delta_batch)Future<Result<StepSummary>>Deliver a delta batch and advance the view.
snapshot(view_name)Future<Result<Vec<RecordBatch>>>Read the current materialised snapshot of a view.
watch_output(view_name)Result<Receiver<DeltaBatch>>Subscribe to incremental output deltas from a view.
checkpoint(path)Future<Result<()>>Persist the current flow state to a checkpoint path.
restore(path)Future<Result<()>>Restore flow state from a checkpoint.

StepSummary

FieldTypeDescription
rows_inusizeRows received in this tick.
rows_outusizeDelta rows emitted to output views.
duration_msu64Wall-clock time to process this tick.

Example

use krishiv_api::{Session, IncrementalFlow, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;
    let mut flow = IncrementalFlow::new(&session)?;

    flow.register_source("orders", orders_schema)?;
    flow.register_view("totals",
        "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")?;

    // Deliver a delta (weighted batch)
    let summary = flow.tick("orders", delta_batch).await?;
    println!("Processed {} rows, emitted {} delta rows", summary.rows_in, summary.rows_out);

    // Read current view
    let snapshot = flow.snapshot("totals").await?;
    Ok(())
}