IncrementalFlow
Incremental view maintenance with DeltaBatch and tick-based processing.
Overview
IncrementalFlow maintains query results incrementally by processing DeltaBatch changes (weighted Arrow rows with an Int64 _weight column: +1 = insert, -1 = retract).
Construction
IncrementalFlow::new(session: &Session) -> Result<IncrementalFlow>
Methods
| Method | Returns | Description |
|---|---|---|
register_source(name, schema) | Result<()> | Register a named source that accepts DeltaBatch input. |
register_view(name, query) | Result<()> | Register a SQL query as an incrementally maintained view. |
tick(source_name, delta_batch) | Future<Result<StepSummary>> | Deliver a delta batch and advance the view. |
snapshot(view_name) | Future<Result<Vec<RecordBatch>>> | Read the current materialised snapshot of a view. |
watch_output(view_name) | Result<Receiver<DeltaBatch>> | Subscribe to incremental output deltas from a view. |
checkpoint(path) | Future<Result<()>> | Persist the current flow state to a checkpoint path. |
restore(path) | Future<Result<()>> | Restore flow state from a checkpoint. |
StepSummary
| Field | Type | Description |
|---|---|---|
rows_in | usize | Rows received in this tick. |
rows_out | usize | Delta rows emitted to output views. |
duration_ms | u64 | Wall-clock time to process this tick. |
Example
use krishiv_api::{Session, IncrementalFlow, Result};
#[tokio::main]
async fn main() -> Result<()> {
let session = Session::embedded().await?;
let mut flow = IncrementalFlow::new(&session)?;
flow.register_source("orders", orders_schema)?;
flow.register_view("totals",
"SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")?;
// Deliver a delta (weighted batch)
let summary = flow.tick("orders", delta_batch).await?;
println!("Processed {} rows, emitted {} delta rows", summary.rows_in, summary.rows_out);
// Read current view
let snapshot = flow.snapshot("totals").await?;
Ok(())
}