Available

Stream & Windows

Stream, KeyedStream, and WindowedStream are the streaming pipeline builders.

Stream

A Stream is obtained from session.stream(name), session.memory_stream(schema), or the top-level read_kafka() / read_kinesis() helpers.

Method                                                  Returns           Description
key_by(key_col: str) -> KeyedStream                     KeyedStream       Partition the stream by a key column.
broadcast() -> BroadcastStream                          BroadcastStream   Broadcast stream to all operator instances.
connect(other: Stream) -> ConnectedStreams              ConnectedStreams  Pair two streams for co-process.
watermark(col, lag_ms) -> Stream                        Stream            Assign watermark using event-time column and lag.
with_watermark(spec) -> Stream                          Stream            Assign watermark with a typed WatermarkSpec.
with_multi_source_watermark(spec) -> Stream             Stream            Multi-source watermark alignment.
with_state_ttl(config) -> Stream                        Stream            Attach TTL policy to downstream keyed state.
tumbling_window(size_ms) -> WindowedStream              WindowedStream    Fixed-size non-overlapping window.
tumbling_window_ms(size_ms) -> WindowedStream           WindowedStream    Same as tumbling_window.
sliding_window_ms(size_ms, slide_ms) -> WindowedStream  WindowedStream    Sliding (hop) window.
session_window_ms(gap_ms) -> WindowedStream             WindowedStream    Session window with inactivity gap.
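
To make the difference between tumbling and sliding windows concrete, the sketch below computes which windows a single event timestamp falls into. It is plain Python and does not call krishiv. The function names are hypothetical, and the epoch-aligned window boundaries are an assumption (a common convention in stream processors), not something this page states about krishiv.

```python
def tumbling_window(ts_ms: int, size_ms: int) -> tuple[int, int]:
    """Return the [start, end) bounds of the tumbling window containing ts_ms."""
    # Assumption: windows are aligned to multiples of size_ms since epoch 0.
    start = ts_ms - (ts_ms % size_ms)
    return start, start + size_ms


def sliding_windows(ts_ms: int, size_ms: int, slide_ms: int) -> list[tuple[int, int]]:
    """Return every [start, end) sliding window that contains ts_ms."""
    windows = []
    # Latest window start at or before ts_ms, aligned to the slide interval.
    start = ts_ms - (ts_ms % slide_ms)
    # Walk backwards while the window still covers ts_ms.
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return sorted(windows)


# An event at t = 125 s with 1-minute windows:
print(tumbling_window(125_000, 60_000))
# With a 30-second slide, the same event belongs to two overlapping windows:
print(sliding_windows(125_000, 60_000, 30_000))
```

A tumbling window assigns each event to exactly one window, while a sliding window assigns it to roughly size_ms / slide_ms windows.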

KeyedStream

Method                                                  Returns           Description
tumbling_window(size_ms) -> WindowedStream              WindowedStream    Per-key tumbling window.
tumbling_window_ms(size_ms) -> WindowedStream           WindowedStream    Alias.
sliding_window_ms(size_ms, slide_ms) -> WindowedStream  WindowedStream    Per-key sliding window.
session_window_ms(gap_ms) -> WindowedStream             WindowedStream    Per-key session window.
window(spec) -> WindowedStream                          WindowedStream    Apply a typed window specification.
connect(other: KeyedStream) -> ConnectedStreams         ConnectedStreams  Pair two keyed streams for co-process.
with_multi_source_watermark(spec) -> KeyedStream        KeyedStream       Multi-source watermark for aligned processing.
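
Per-key session windows are the least obvious of the three kinds, so here is a rough illustration of the idea in plain Python. It does not use krishiv. The function name and the exact boundary rule (a new session starts when the gap is strictly greater than gap_ms) are assumptions made for the sketch, and krishiv may treat the boundary case differently.

```python
from collections import defaultdict


def session_windows(events, gap_ms):
    """Group (key, ts_ms) events into per-key sessions.

    Returns {key: [(first_ts, last_ts, count), ...]}.
    """
    by_key = defaultdict(list)
    for key, ts in events:
        by_key[key].append(ts)

    sessions = {}
    for key, timestamps in by_key.items():
        timestamps.sort()
        result = []
        first = last = timestamps[0]
        count = 1
        for ts in timestamps[1:]:
            # Assumption: inactivity longer than gap_ms closes the session.
            if ts - last > gap_ms:
                result.append((first, last, count))
                first, count = ts, 0
            last = ts
            count += 1
        result.append((first, last, count))
        sessions[key] = result
    return sessions


events = [("a", 0), ("a", 1_000), ("b", 500), ("a", 10_000), ("b", 700)]
print(session_windows(events, gap_ms=5_000))
```

Each key keeps its own sessions, so activity on key "b" never extends a session on key "a".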

WindowedStream

Method                                      Returns             Description
agg(exprs) -> WindowedStream                WindowedStream      Apply aggregate expressions to each window.
collect() -> list[RecordBatch]              list                Materialise all window results (bounded streams only).
try_next() -> RecordBatch | None            RecordBatch | None  Poll for the next window result batch.
window_kind() -> str                        str                 "Tumbling", "Sliding", or "Session".
window_size_ms() -> int                     int                 Window size in milliseconds.
slide_ms() -> int | None                    int | None          Slide interval (sliding windows only).
session_gap_ms() -> int | None              int | None          Inactivity gap (session windows only).
tumbling_window(size_ms) -> WindowedStream  WindowedStream      Re-window an existing windowed stream.
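
Since collect() is limited to bounded streams, unbounded pipelines would presumably be consumed by polling try_next(). The helper below is a minimal sketch of such a polling loop. The drain function, its parameters, and the FakeWindowedStream stand-in are all hypothetical and not part of krishiv. The only thing taken from the table above is that try_next() returns either a batch or None. Treating None as "nothing ready right now" is an assumption.

```python
import time


def drain(windowed, max_idle_polls=3, idle_sleep_s=0.0):
    """Collect batches from try_next() until it returns None
    max_idle_polls times in a row."""
    batches = []
    idle = 0
    while idle < max_idle_polls:
        batch = windowed.try_next()
        if batch is None:
            # Nothing ready: count the idle poll and back off briefly.
            idle += 1
            time.sleep(idle_sleep_s)
        else:
            idle = 0
            batches.append(batch)
    return batches


class FakeWindowedStream:
    """Stand-in used only to exercise drain(); not a krishiv class."""

    def __init__(self, items):
        self._items = list(items)

    def try_next(self):
        return self._items.pop(0) if self._items else None


# A None between two batches is treated as a temporary lull, not the end.
print(drain(FakeWindowedStream(["batch-1", None, "batch-2"])))
```

Against a real WindowedStream, a non-zero idle_sleep_s would avoid spinning while no window has closed yet.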

Example

import pyarrow as pa

import krishiv as ks
# Note: this import shadows Python's built-in sum() in this module.
from krishiv.functions import count, sum, col

session = ks.Session.embedded()
schema = ...  # PyArrow Schema
# memory_stream returns the stream plus a sender used to push batches into it
stream, sender = session.memory_stream(schema)

windowed = (stream
    .watermark("event_time", 5000)   # 5-second lag
    .key_by("user_id")
    .tumbling_window(60_000))        # 1-minute windows

# Per user and per window: count the events and sum the "amount" column
result = windowed.agg([count(col("*")).alias("events"), sum(col("amount")).alias("total")])

# Push data
sender.send(pa.record_batch(...))

# Collect results
batches = result.collect()
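
For readers who want to check what the pipeline above is expected to compute, the following plain-Python sketch reproduces the same logic on a list of tuples: a 5-second watermark lag, grouping by user, 1-minute tumbling windows, and an event count plus amount total per window. It does not use krishiv. The function name is hypothetical, and both the watermark definition (largest event time seen minus the lag) and the rule that late events are dropped are assumptions, since this page does not describe how krishiv handles late data.

```python
from collections import defaultdict


def windowed_totals(events, size_ms=60_000, lag_ms=5_000):
    """events: iterable of (user_id, event_time_ms, amount) in arrival order.

    Returns ({(user_id, window_start): {"events": n, "total": x}}, dropped).
    """
    results = defaultdict(lambda: {"events": 0, "total": 0.0})
    max_seen = None
    dropped = []
    for user_id, ts, amount in events:
        max_seen = ts if max_seen is None else max(max_seen, ts)
        # Assumption: watermark trails the largest event time by lag_ms.
        watermark = max_seen - lag_ms
        start = ts - (ts % size_ms)
        # Assumption: an event is late once the watermark has passed
        # the end of its window, and late events are dropped.
        if start + size_ms <= watermark:
            dropped.append((user_id, ts, amount))
            continue
        row = results[(user_id, start)]
        row["events"] += 1
        row["total"] += amount
    return dict(results), dropped


events = [
    ("u1", 1_000, 10.0),
    ("u2", 2_000, 5.0),
    ("u1", 59_000, 2.5),
    ("u1", 70_000, 1.0),   # advances the watermark to 65_000
    ("u1", 58_000, 4.0),   # arrives after its window [0, 60_000) has closed
]
totals, dropped = windowed_totals(events)
print(totals)
print(dropped)
```

The last event illustrates why the lag matters: with a larger lag_ms (for example 15_000) the watermark would still be inside the first window and the event would be counted.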