Stream & Windows
Stream, KeyedStream, and WindowedStream are the streaming pipeline builders.
Stream
Obtained from session.stream(name), session.memory_stream(schema), or the top-level read_kafka() / read_kinesis() helpers.
| Method | Returns | Description |
| --- | --- | --- |
| `key_by(key_col: str) -> KeyedStream` | `KeyedStream` | Partition the stream by a key column. |
| `broadcast() -> BroadcastStream` | `BroadcastStream` | Broadcast stream to all operator instances. |
| `connect(other: Stream) -> ConnectedStreams` | `ConnectedStreams` | Pair two streams for co-process. |
| `watermark(col, lag_ms) -> Stream` | `Stream` | Assign watermark using event-time column and lag. |
| `with_watermark(spec) -> Stream` | `Stream` | Assign watermark with a typed WatermarkSpec. |
| `with_multi_source_watermark(spec) -> Stream` | `Stream` | Multi-source watermark alignment. |
| `with_state_ttl(config) -> Stream` | `Stream` | Attach TTL policy to downstream keyed state. |
| `tumbling_window(size_ms) -> WindowedStream` | `WindowedStream` | Fixed-size non-overlapping window. |
| `tumbling_window_ms(size_ms) -> WindowedStream` | `WindowedStream` | Same as tumbling_window. |
| `sliding_window_ms(size_ms, slide_ms) -> WindowedStream` | `WindowedStream` | Sliding (hop) window. |
| `session_window_ms(gap_ms) -> WindowedStream` | `WindowedStream` | Session window with inactivity gap. |
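To make the difference between tumbling and sliding windows concrete, the sketch below computes which windows a single event timestamp falls into. It is plain Python and does not call krishiv; the function names and the epoch-aligned, half-open `[start, start + size_ms)` convention are assumptions for illustration, and the library's actual alignment may differ.

```python
def tumbling_window_start(ts_ms: int, size_ms: int) -> int:
    """Start of the single tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def sliding_window_starts(ts_ms: int, size_ms: int, slide_ms: int) -> list[int]:
    """Starts of every sliding window [start, start + size_ms) containing ts_ms."""
    # Latest window start that is <= ts_ms, aligned to the slide interval.
    start = ts_ms - (ts_ms % slide_ms)
    starts = []
    # Walk backwards while the window still covers ts_ms.
    while start > ts_ms - size_ms:
        starts.append(start)
        start -= slide_ms
    return sorted(starts)


# An event at t = 125 s with 1-minute windows:
print(tumbling_window_start(125_000, 60_000))          # one window
print(sliding_window_starts(125_000, 60_000, 30_000))  # two overlapping windows
```

A tumbling window assigns each event to exactly one window, while a sliding window with `slide_ms < size_ms` assigns it to roughly `size_ms / slide_ms` overlapping windows.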
KeyedStream
| Method | Returns | Description |
| --- | --- | --- |
| `tumbling_window(size_ms) -> WindowedStream` | `WindowedStream` | Per-key tumbling window. |
| `tumbling_window_ms(size_ms) -> WindowedStream` | `WindowedStream` | Alias. |
| `sliding_window_ms(size_ms, slide_ms) -> WindowedStream` | `WindowedStream` | Per-key sliding window. |
| `session_window_ms(gap_ms) -> WindowedStream` | `WindowedStream` | Per-key session window. |
| `window(spec) -> WindowedStream` | `WindowedStream` | Apply a typed window specification. |
| `connect(other: KeyedStream) -> ConnectedStreams` | `ConnectedStreams` | Pair two keyed streams for co-process. |
| `with_multi_source_watermark(spec) -> KeyedStream` | `KeyedStream` | Multi-source watermark for aligned processing. |
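Per-key session windows can be harder to picture than fixed-size ones, so here is a minimal plain-Python sketch of the idea: events for each key are grouped into sessions that end once the inactivity gap is exceeded. The `session_windows` helper and its `(key, ts_ms)` input shape are hypothetical and not part of krishiv; treating a gap exactly equal to `gap_ms` as the same session is also an assumption.

```python
from collections import defaultdict


def session_windows(events, gap_ms):
    """Group (key, ts_ms) pairs into per-key sessions.

    Returns {key: [(first_ts, last_ts, event_count), ...]}.
    """
    by_key = defaultdict(list)
    for key, ts in events:
        by_key[key].append(ts)

    sessions = {}
    for key, stamps in by_key.items():
        stamps.sort()
        out = []
        first = last = stamps[0]
        count = 1
        for ts in stamps[1:]:
            if ts - last > gap_ms:
                # Inactivity gap exceeded: close the session, start a new one.
                out.append((first, last, count))
                first, count = ts, 0
            last = ts
            count += 1
        out.append((first, last, count))
        sessions[key] = out
    return sessions


events = [("a", 0), ("a", 1_000), ("a", 10_000), ("b", 500)]
print(session_windows(events, gap_ms=5_000))
```

Because sessions are tracked independently per key, a burst of activity for one key never extends a session belonging to another.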
WindowedStream
| Method | Returns | Description |
| --- | --- | --- |
| `agg(exprs) -> WindowedStream` | `WindowedStream` | Apply aggregate expressions to each window. |
| `collect() -> list[RecordBatch]` | `list` | Materialise all window results (bounded streams only). |
| `try_next() -> RecordBatch \| None` | `RecordBatch \| None` | Poll for the next window result batch. |
| `window_kind() -> str` | `str` | "Tumbling", "Sliding", or "Session". |
| `window_size_ms() -> int` | `int` | Window size in milliseconds. |
| `slide_ms() -> int \| None` | `int \| None` | Slide interval (sliding windows only). |
| `session_gap_ms() -> int \| None` | `int \| None` | Inactivity gap (session windows only). |
| `tumbling_window(size_ms) -> WindowedStream` | `WindowedStream` | Re-window an existing windowed stream. |
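Since `collect()` is limited to bounded streams, unbounded pipelines would presumably be consumed through `try_next()`. The sketch below shows one possible polling loop. It assumes that `try_next()` returns `None` when no batch is ready yet, which the table's return type suggests but does not state. The `drain` helper, its parameters, and `FakeWindowedStream` are hypothetical stand-ins so the example runs without krishiv.

```python
import time


def drain(windowed, max_idle_polls=3, idle_sleep_s=0.0):
    """Poll try_next() until it returns None max_idle_polls times in a row."""
    batches = []
    idle = 0
    while idle < max_idle_polls:
        batch = windowed.try_next()
        if batch is None:
            # Nothing ready (assumed meaning of None): back off and retry.
            idle += 1
            time.sleep(idle_sleep_s)
        else:
            idle = 0
            batches.append(batch)
    return batches


class FakeWindowedStream:
    """Test stand-in exposing only try_next(); not part of krishiv."""

    def __init__(self, results):
        self._results = list(results)

    def try_next(self):
        return self._results.pop(0) if self._results else None


fake = FakeWindowedStream(["batch-1", None, "batch-2"])
print(drain(fake))
```

Resetting the idle counter after every successful poll keeps the loop alive through short pauses between window results while still terminating once the stream goes quiet.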
Example
import pyarrow as pa

import krishiv as ks
from krishiv.functions import col, count, sum  # note: shadows the built-in sum

session = ks.Session.embedded()
schema = ...  # PyArrow Schema
stream, sender = session.memory_stream(schema)

windowed = (
    stream
    .watermark("event_time", 5000)  # 5-second lag
    .key_by("user_id")
    .tumbling_window(60_000)  # 1-minute windows
)
result = windowed.agg([
    count(col("*")).alias("events"),
    sum(col("amount")).alias("total"),
])

# Push data
sender.send(pa.record_batch(...))

# Collect results
batches = result.collect()
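The 5-second lag passed to `watermark` in the example controls how long a window waits for late events. The following plain-Python sketch illustrates the common "max event time minus lag" convention. It is an assumption that krishiv follows this convention, and the `LagWatermark` class and `window_is_closed` helper are hypothetical names used only for illustration.

```python
class LagWatermark:
    """Watermark = largest event time seen so far minus a fixed lag."""

    def __init__(self, lag_ms: int):
        self.lag_ms = lag_ms
        self.max_event_ms = None

    def observe(self, event_ms: int) -> int:
        """Record an event time and return the current watermark."""
        if self.max_event_ms is None or event_ms > self.max_event_ms:
            self.max_event_ms = event_ms
        return self.max_event_ms - self.lag_ms


def window_is_closed(window_start_ms: int, size_ms: int, watermark_ms: int) -> bool:
    """A window [start, start + size) closes once the watermark reaches its end."""
    return watermark_ms >= window_start_ms + size_ms


wm = LagWatermark(lag_ms=5_000)
for event_ms in (58_000, 64_999, 61_000, 65_000):
    mark = wm.observe(event_ms)
    print(event_ms, mark, window_is_closed(0, 60_000, mark))
```

Under this convention the first 1-minute window only emits once an event with a timestamp of at least 65 seconds has arrived, so out-of-order events up to 5 seconds late are still counted.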