Stream & KeyedStream
Streaming pipeline builder for unbounded, event-time workloads.
Stream
Stream wraps an unbounded data source and provides chainable methods for building streaming pipelines.
| Method | Returns | Description |
|---|---|---|
| `key_by(key_col)` | `Result<KeyedStream>` | Partition the stream by a key column for stateful ops. |
| `broadcast()` | `BroadcastStream` | Broadcast stream to all operator instances. |
| `connect(other: Stream)` | `ConnectedStreams` | Pair two streams for a co-process function. |
| `watermark(col, lag_ms)` | `Result<Stream>` | Assign a watermark using an event-time column and lag. |
| `with_watermark(spec: WatermarkSpec)` | `Result<Stream>` | Assign a watermark using a typed spec. |
| `with_multi_source_watermark(spec)` | `Result<Stream>` | Multi-source watermark alignment. |
| `with_state_ttl(config: StateTtlConfig)` | `Result<Stream>` | Attach a TTL policy to downstream keyed state. |
| `tumbling_window(size_ms)` | `WindowedStream` | Apply a tumbling window of the given duration. |
| `sliding_window_ms(size_ms, slide_ms)` | `WindowedStream` | Apply a sliding (hop) window. |
| `session_window_ms(gap_ms)` | `WindowedStream` | Apply a session window with inactivity gap. |
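
To make the watermark and window parameters concrete, here is a minimal, library-independent sketch in plain Rust. It assumes the common "bounded lag" convention (watermark = largest event time seen minus `lag_ms`) and half-open windows `[start, start + size_ms)` aligned to the epoch. Whether krishiv_api uses exactly these conventions is not stated on this page, and the names `WatermarkTracker`, `tumbling_window_start`, and `sliding_window_starts` are hypothetical helpers, not part of the library.

```rust
/// Hypothetical helper: tracks a watermark as
/// (largest event time seen so far) - (allowed lag).
struct WatermarkTracker {
    lag_ms: i64,
    max_event_ms: Option<i64>,
}

impl WatermarkTracker {
    fn new(lag_ms: i64) -> Self {
        Self { lag_ms, max_event_ms: None }
    }

    /// Record one event timestamp and return the current watermark.
    /// Late (out-of-order) events never move the watermark backwards.
    fn observe(&mut self, event_ms: i64) -> i64 {
        let max = match self.max_event_ms {
            Some(m) => m.max(event_ms),
            None => event_ms,
        };
        self.max_event_ms = Some(max);
        max - self.lag_ms
    }
}

/// Start of the single tumbling window that contains `ts_ms`.
/// Assumes `size_ms > 0`.
fn tumbling_window_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// Starts (ascending) of every sliding window `[start, start + size_ms)`
/// that contains `ts_ms`. Assumes `size_ms > 0` and `slide_ms > 0`.
fn sliding_window_starts(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<i64> {
    // Latest window start that is <= ts_ms.
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    let mut starts = Vec::new();
    // Walk backwards while the window still covers ts_ms.
    while start > ts_ms - size_ms {
        starts.push(start);
        start -= slide_ms;
    }
    starts.reverse();
    starts
}
```

Under these assumptions, an event at 125 000 ms with a 60 000 ms tumbling window falls into the window starting at 120 000 ms, and with a 60 000 ms window sliding every 30 000 ms it falls into the two windows starting at 90 000 ms and 120 000 ms.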
KeyedStream
KeyedStream is a stream partitioned by a key. Stateful operators (process functions, window aggregations) operate per-key.
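
As a rough illustration of what "per-key" means, the sketch below keeps one independent piece of state (an event count and a running sum) for each key. It is plain Rust with no dependency on krishiv_api, and `per_key_totals` is a hypothetical name chosen for this example only; the library's actual state handling may differ.

```rust
use std::collections::HashMap;

/// Hypothetical helper: fold (key, amount) events into per-key state,
/// where each key's state is (event count, sum of amounts).
fn per_key_totals(events: &[(&str, i64)]) -> HashMap<String, (u64, i64)> {
    let mut state: HashMap<String, (u64, i64)> = HashMap::new();
    for (key, amount) in events {
        // Each key gets its own state slot; other keys are never touched.
        let entry = state.entry(key.to_string()).or_insert((0, 0));
        entry.0 += 1;
        entry.1 += amount;
    }
    state
}
```

Events for `"alice"` only ever update the `"alice"` slot, which is the property that lets a keyed operator run in parallel across keys.
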
| Method | Returns | Description |
|---|---|---|
| `tumbling_window(size_ms)` | `WindowedStream` | Fixed-size non-overlapping window per key. |
| `sliding_window_ms(size_ms, slide_ms)` | `WindowedStream` | Sliding/hop window per key. |
| `session_window_ms(gap_ms)` | `SessionWindowedStream` | Session window per key. |
| `window(spec: LocalWindowKind)` | `WindowedStream` | Apply a window using a typed spec. |
| `connect(other: KeyedStream)` | `ConnectedStreams` | Join two keyed streams for co-process. |
| `with_multi_source_watermark(spec)` | `Result<KeyedStream>` | Multi-source watermark for aligned processing. |
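
Session windows are the least obvious of the three window kinds, so here is a small, library-independent sketch of per-key session grouping. It assumes a session closes once the time between consecutive events for the same key reaches `gap_ms` (a gap of exactly `gap_ms` starts a new session). That boundary rule is an assumption, not something this page specifies, and `sessions_per_key` is a hypothetical helper rather than a krishiv_api function.

```rust
use std::collections::HashMap;

/// Hypothetical helper: group (key, timestamp_ms) events into sessions.
/// Returns, for each key, a list of (first_ts, last_ts) pairs.
fn sessions_per_key(
    events: &[(&str, i64)],
    gap_ms: i64,
) -> HashMap<String, Vec<(i64, i64)>> {
    // Sort by timestamp so each key sees its events in event-time order.
    let mut sorted = events.to_vec();
    sorted.sort_by_key(|&(_, ts)| ts);

    let mut out: HashMap<String, Vec<(i64, i64)>> = HashMap::new();
    for (key, ts) in sorted {
        let sessions = out.entry(key.to_string()).or_default();
        if let Some(open) = sessions.last_mut() {
            if ts - open.1 < gap_ms {
                // Still within the inactivity gap: extend the open session.
                open.1 = ts;
                continue;
            }
        }
        // First event for this key, or the gap was reached: new session.
        sessions.push((ts, ts));
    }
    out
}
```

With a 30 000 ms gap, events for one key at 0, 10 000, and 100 000 ms form two sessions, `(0, 10_000)` and `(100_000, 100_000)`, while another key's events are grouped independently.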
Example
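use krishiv_api::{Session, Result};
// Note: `schema`, `col`, `count`, and `sum` are assumed to be defined or
// imported elsewhere; their definitions are not shown in this example.

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;
    // `sender` is returned alongside the stream; it is not used in this snippet.
    let (stream, sender) = session.memory_stream(schema)?;

    // Build a keyed tumbling window
    let windowed = stream
        .watermark("event_time", 5_000)? // event-time column, 5 s lag
        .key_by("user_id")?
        .tumbling_window(60_000); // 1-minute window

    // Aggregate and collect
    let agg = windowed.agg(vec![count(col("*")), sum(col("amount"))]);
    // ... submit or collect

    Ok(())
}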