Available

Stream & KeyedStream

Streaming pipeline builder for unbounded, event-time workloads.

Stream

Stream wraps an unbounded data source and provides chaining methods to build streaming pipelines.

| Method | Returns | Description |
| --- | --- | --- |
| `key_by(key_col)` | `Result<KeyedStream>` | Partition the stream by a key column for stateful ops. |
| `broadcast()` | `BroadcastStream` | Broadcast stream to all operator instances. |
| `connect(other: Stream)` | `ConnectedStreams` | Pair two streams for a co-process function. |
| `watermark(col, lag_ms)` | `Result<Stream>` | Assign a watermark using an event-time column and lag. |
| `with_watermark(spec: WatermarkSpec)` | `Result<Stream>` | Assign a watermark using a typed spec. |
| `with_multi_source_watermark(spec)` | `Result<Stream>` | Multi-source watermark alignment. |
| `with_state_ttl(config: StateTtlConfig)` | `Result<Stream>` | Attach a TTL policy to downstream keyed state. |
| `tumbling_window(size_ms)` | `WindowedStream` | Apply a tumbling window of the given duration. |
| `sliding_window_ms(size_ms, slide_ms)` | `WindowedStream` | Apply a sliding (hop) window. |
| `session_window_ms(gap_ms)` | `WindowedStream` | Apply a session window with inactivity gap. |
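
The watermark and window methods listed above rest on a few small pieces of event-time arithmetic. The following is a minimal sketch in plain Rust of how a lag-based watermark and tumbling or sliding window assignment are commonly computed. It is not Krishiv's implementation: `WatermarkTracker`, `tumbling_window_start`, and `sliding_window_starts` are hypothetical names introduced only for illustration, and windows are assumed to be half-open intervals `[start, start + size_ms)` aligned to the epoch.

```rust
/// Hypothetical helper: tracks the highest event time seen so far and
/// derives a watermark that trails it by a fixed lag.
#[derive(Debug)]
pub struct WatermarkTracker {
    max_event_time_ms: Option<i64>,
    lag_ms: i64,
}

impl WatermarkTracker {
    pub fn new(lag_ms: i64) -> Self {
        Self { max_event_time_ms: None, lag_ms }
    }

    /// Records one event timestamp and returns the current watermark.
    /// Out-of-order (older) events never move the watermark backwards.
    pub fn observe(&mut self, event_time_ms: i64) -> i64 {
        let max = match self.max_event_time_ms {
            Some(seen) => seen.max(event_time_ms),
            None => event_time_ms,
        };
        self.max_event_time_ms = Some(max);
        max - self.lag_ms
    }
}

/// Start of the tumbling window that contains `ts_ms`
/// (assumption: windows are aligned to timestamp 0).
pub fn tumbling_window_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// Starts of every sliding window `[start, start + size_ms)` that
/// contains `ts_ms`, newest window first.
pub fn sliding_window_starts(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<i64> {
    let mut starts = Vec::new();
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    while start > ts_ms - size_ms {
        starts.push(start);
        start -= slide_ms;
    }
    starts
}
```

Under these assumptions, a window can be treated as complete once the watermark reaches its end, which is why a larger lag tolerates more out-of-order data at the cost of later results.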

KeyedStream

KeyedStream is a stream partitioned by a key. Stateful operators (process functions, window aggregations) operate per-key.

| Method | Returns | Description |
| --- | --- | --- |
| `tumbling_window(size_ms)` | `WindowedStream` | Fixed-size non-overlapping window per key. |
| `sliding_window_ms(size_ms, slide_ms)` | `WindowedStream` | Sliding/hop window per key. |
| `session_window_ms(gap_ms)` | `SessionWindowedStream` | Session window per key. |
| `window(spec: LocalWindowKind)` | `WindowedStream` | Apply a window using a typed spec. |
| `connect(other: KeyedStream)` | `ConnectedStreams` | Join two keyed streams for co-process. |
| `with_multi_source_watermark(spec)` | `Result<KeyedStream>` | Multi-source watermark for aligned processing. |
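
To make the per-key behaviour concrete, here is a small sketch of session windowing over keyed events in plain Rust. It illustrates the general technique only and does not use or reproduce the Krishiv API: `SessionSpan` and `sessions_per_key` are hypothetical names, and the rule that a new session starts when the gap between consecutive events of the same key is strictly greater than `gap_ms` is an assumption of this sketch.

```rust
use std::collections::BTreeMap;

/// Hypothetical summary of one session: first and last event time, and
/// the number of events that fell into it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionSpan {
    pub first_ms: i64,
    pub last_ms: i64,
    pub count: usize,
}

/// Groups `(key, event_time_ms)` pairs by key, then splits each key's
/// events into sessions separated by more than `gap_ms` of inactivity.
pub fn sessions_per_key(
    events: &[(&str, i64)],
    gap_ms: i64,
) -> BTreeMap<String, Vec<SessionSpan>> {
    // Step 1: partition by key, the batch analogue of keying a stream.
    let mut by_key: BTreeMap<String, Vec<i64>> = BTreeMap::new();
    for (key, ts) in events {
        by_key.entry(key.to_string()).or_default().push(*ts);
    }

    // Step 2: per key, walk the timestamps in order and extend or open sessions.
    let mut out = BTreeMap::new();
    for (key, mut timestamps) in by_key {
        timestamps.sort_unstable();
        let mut sessions: Vec<SessionSpan> = Vec::new();
        for ts in timestamps {
            let extends = sessions
                .last()
                .map_or(false, |s| ts - s.last_ms <= gap_ms);
            if extends {
                if let Some(s) = sessions.last_mut() {
                    s.last_ms = ts;
                    s.count += 1;
                }
            } else {
                sessions.push(SessionSpan { first_ms: ts, last_ms: ts, count: 1 });
            }
        }
        out.insert(key, sessions);
    }
    out
}
```

Because state is held per key, activity on one key never extends or closes a session on another. A streaming engine would do the same work incrementally and use the watermark to decide when a session is final, which this batch-style sketch leaves out.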

Example

use krishiv_api::{Session, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;
    // `schema` must be defined beforehand; its construction is not shown here.
    let (stream, sender) = session.memory_stream(schema)?;

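    // Build a keyed tumbling window
    let windowed = stream
        .watermark("event_time", 5_000)? // watermark on event_time with a 5-second lag
        .key_by("user_id")?              // partition by user for per-key state
        .tumbling_window(60_000);        // 1-minute window

    // Aggregate and collect.
    // `count`, `sum`, and `col` must be in scope; their import path is not shown here.
    let agg = windowed.agg(vec![count(col("*")), sum(col("amount"))]);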
    // ... submit or collect
    Ok(())
}
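
For readers who want to check what the pipeline above is expected to produce, the following plain-Rust sketch computes the same kind of result (a row count and a sum of `amount` per user and per 1-minute tumbling window) over an in-memory slice. It is an illustration under stated assumptions, not Krishiv code: `Event`, `WindowAgg`, and `keyed_tumbling_agg` are hypothetical names, windows are assumed to be aligned to timestamp 0, and watermarks and late data are ignored.

```rust
use std::collections::BTreeMap;

/// Hypothetical input record: (user_id, event_time_ms, amount).
pub type Event<'a> = (&'a str, i64, f64);

/// Aggregate for one (key, window) pair.
#[derive(Debug, Clone, PartialEq)]
pub struct WindowAgg {
    pub count: u64,
    pub sum: f64,
}

/// Counts rows and sums `amount` per (user_id, window_start), where each
/// window covers `[window_start, window_start + size_ms)`.
pub fn keyed_tumbling_agg(
    events: &[Event<'_>],
    size_ms: i64,
) -> BTreeMap<(String, i64), WindowAgg> {
    let mut out: BTreeMap<(String, i64), WindowAgg> = BTreeMap::new();
    for &(user, ts, amount) in events {
        // Assign the event to its tumbling window.
        let window_start = ts - ts.rem_euclid(size_ms);
        // Update the running aggregate for this key and window.
        let agg = out
            .entry((user.to_string(), window_start))
            .or_insert(WindowAgg { count: 0, sum: 0.0 });
        agg.count += 1;
        agg.sum += amount;
    }
    out
}
```

Keying the map by `(user_id, window_start)` mirrors the order of operations in the example: partition by key first, then bucket by window within each key.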