State
ValueState, MapState, ListState — per-key operator state for process functions.
Overview
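State objects are declared by name (for example ValueState("event_count")), passed to a process function, and scoped to the current key: each method acts only on the entry for the key being processed. Storage is provided by krishiv-state, which uses an in-memory or RocksDB backend depending on the durability profile. Values are serialised to and from JSON, so anything you store must be JSON-serialisable.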
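Because state values pass through JSON, a value you get back may not have the Python type you stored. The sketch below uses only the standard json module to show that round trip. It assumes krishiv-state applies standard JSON encoding and does not call krishiv itself; json_round_trip is a hypothetical helper.

```python
import json
from typing import Any


def json_round_trip(value: Any) -> Any:
    """Encode a value to JSON text and decode it again, mimicking what a
    JSON-backed state store does on write and read (assumption: standard
    JSON encoding, not krishiv's own code)."""
    return json.loads(json.dumps(value))


# A tuple is stored as a JSON array and comes back as a list.
print(json_round_trip((1, 2)))    # [1, 2]
# Integer dict keys become JSON object keys, which are always strings.
print(json_round_trip({1: "a"}))  # {'1': 'a'}
# A set has no JSON representation, so encoding fails.
try:
    json_round_trip({1, 2})
except TypeError as exc:
    print("rejected:", exc)
```

In practice, convert tuples, sets, and non-string dict keys to lists and strings before passing them to set_json, put_json, or add_json.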
ValueState
ValueState(name: str)
| Method | Returns | Description |
|---|---|---|
| key() | str | Return the current partition key. |
| set_json(value: Any) | None | Store a JSON-serialisable value for the current key. |
| clear() | None | Delete the stored value for the current key. |
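To make the per-key scoping concrete, here is a hypothetical pure-Python stand-in, ToyValueState, that mimics the three documented methods. It is not krishiv's implementation: the _enter_key helper (playing the role of the runtime switching to a new key) and the dictionary storage are assumptions made for illustration.

```python
import json
from typing import Any, Dict, Optional


class ToyValueState:
    """Hypothetical stand-in for ValueState: at most one JSON value per key."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._store: Dict[str, str] = {}  # partition key -> JSON text
        self._current_key: Optional[str] = None

    def _enter_key(self, key: str) -> None:
        # Hypothetical: stands in for the runtime scoping state to a key.
        self._current_key = key

    def key(self) -> str:
        if self._current_key is None:
            raise RuntimeError("no current key")
        return self._current_key

    def set_json(self, value: Any) -> None:
        # Overwrites any previous value for the current key only.
        self._store[self.key()] = json.dumps(value)

    def clear(self) -> None:
        # Removes the value for the current key; other keys are untouched.
        self._store.pop(self.key(), None)


state = ToyValueState("event_count")
state._enter_key("alice")
state.set_json(3)
state._enter_key("bob")
state.set_json(5)
state.clear()         # deletes bob's value only
print(state._store)   # {'alice': '3'}
```

Storing JSON text rather than the Python object mirrors the overview's statement that state is serialised to JSON.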
MapState
MapState(name: str)
| Method | Returns | Description |
|---|---|---|
| key() | str | Return the current partition key. |
| put_json(map_key: str, value: Any) | None | Store a JSON-serialisable value under map_key in the current key's map. |
| clear() | None | Clear all entries for the current key. |
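A MapState holds a separate map for each partition key. The hypothetical ToyMapState below sketches that two-level layout in plain Python; the _enter_key helper and the nested-dictionary storage are assumptions for illustration, not krishiv's internals.

```python
import json
from typing import Any, Dict, Optional


class ToyMapState:
    """Hypothetical stand-in for MapState: one map of JSON values per key."""

    def __init__(self, name: str) -> None:
        self.name = name
        # partition key -> {map_key -> JSON text}
        self._store: Dict[str, Dict[str, str]] = {}
        self._current_key: Optional[str] = None

    def _enter_key(self, key: str) -> None:
        # Hypothetical: stands in for the runtime scoping state to a key.
        self._current_key = key

    def key(self) -> str:
        if self._current_key is None:
            raise RuntimeError("no current key")
        return self._current_key

    def put_json(self, map_key: str, value: Any) -> None:
        # Inserts or overwrites one entry in the current key's map.
        self._store.setdefault(self.key(), {})[map_key] = json.dumps(value)

    def clear(self) -> None:
        # Drops every entry for the current key.
        self._store.pop(self.key(), None)


state = ToyMapState("profile")
state._enter_key("alice")
state.put_json("page", "/home")
state.put_json("clicks", 2)
state._enter_key("bob")
state.put_json("page", "/cart")
state.clear()         # bob's map is removed; alice's is kept
print(state._store)   # {'alice': {'page': '"/home"', 'clicks': '2'}}
```

Note that clear() empties the whole map for the current key, not a single map_key entry.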
ListState
ListState(name: str)
| Method | Returns | Description |
|---|---|---|
| key() | str | Return the current partition key. |
| add_json(value: Any) | None | Append a JSON-serialisable value to the list for the current key. |
| clear() | None | Clear the list for the current key. |
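ListState appends values in order for each key. The hypothetical ToyListState below sketches that behaviour in plain Python; the _enter_key helper and list-of-JSON-strings storage are illustrative assumptions, not krishiv's implementation.

```python
import json
from typing import Any, Dict, List, Optional


class ToyListState:
    """Hypothetical stand-in for ListState: an append-only list per key."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._store: Dict[str, List[str]] = {}  # partition key -> JSON texts
        self._current_key: Optional[str] = None

    def _enter_key(self, key: str) -> None:
        # Hypothetical: stands in for the runtime scoping state to a key.
        self._current_key = key

    def key(self) -> str:
        if self._current_key is None:
            raise RuntimeError("no current key")
        return self._current_key

    def add_json(self, value: Any) -> None:
        # Appends to the end of the current key's list, preserving order.
        self._store.setdefault(self.key(), []).append(json.dumps(value))

    def clear(self) -> None:
        # Empties the list for the current key.
        self._store.pop(self.key(), None)


state = ToyListState("events")
state._enter_key("alice")
state.add_json({"type": "click"})
state.add_json({"type": "view"})
print([json.loads(v) for v in state._store["alice"]])
# [{'type': 'click'}, {'type': 'view'}]
state.clear()
print(state._store)   # {}
```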
Example — Stateful Process Function
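import pyarrow as pa  # assumption: memory_stream accepts an Arrow schema
import krishiv as ks
from krishiv import apply_process_function, ProcessContext, ValueState

def count_events(ctx: ProcessContext, batch, state: ValueState):
    # ValueState as documented above exposes no read method, so this stores
    # the row count of the latest batch for the current key, not a running total.
    state.set_json(batch.num_rows)
    # Forward the batch downstream unchanged.
    ctx.emit(batch)

# Hypothetical schema for illustration; user_id is the partition key.
schema = pa.schema([("user_id", pa.string()), ("amount", pa.int64())])

session = ks.Session.embedded()
stream, sender = session.memory_stream(schema)
keyed = stream.key_by("user_id")
result = apply_process_function(keyed, count_events, ValueState("event_count"))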