Available

State

ValueState, MapState, ListState — per-key operator state for process functions.

Overview

State objects are created inside process functions and scoped to the current key. State is backed by krishiv-state (in-memory or RocksDB depending on the durability profile). State is serialised to/from JSON values.
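Because values are stored as JSON, it helps to know what a JSON round trip does to Python values. The sketch below uses only Python's standard json module and assumes krishiv's encoding follows ordinary JSON rules; the helper json_round_trip is hypothetical and not part of krishiv.

```python
import json


def json_round_trip(value):
    """Encode a value to JSON text and decode it again, as a JSON-backed store would."""
    return json.loads(json.dumps(value))


# Dicts with string keys, lists, strings, numbers, booleans and None survive unchanged.
profile = {"clicks": 3, "tags": ["a", "b"], "active": True, "last": None}
assert json_round_trip(profile) == profile

# Tuples come back as lists, and non-string dict keys come back as strings.
tuple_result = json_round_trip((1, 2))
int_key_result = json_round_trip({1: "x"})

# Values with no JSON form (such as sets) are rejected by the encoder.
try:
    json.dumps({1, 2})
except TypeError:
    set_rejected = True
else:
    set_rejected = False
```

In practice this suggests keeping state values to plain dicts, lists and scalars so that what is read back matches what was written.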

ValueState

ValueState(name: str)
Method                Returns  Description
key() -> str          str      Return the current partition key.
set_json(value: Any)  None     Store a JSON-serialisable value for the current key.
clear()               None     Delete the stored value for the current key.
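Logic that writes to a ValueState can be unit-tested without a running session by passing in a stand-in object. The class below is a hypothetical test double, not krishiv code: it mirrors only the three methods listed above, and use_key and peek are test-only helpers invented for this sketch.

```python
import json
from typing import Any, Dict


class FakeValueState:
    """Hypothetical test double mirroring ValueState's documented methods."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._current_key = ""
        self._values: Dict[str, str] = {}  # partition key -> JSON text

    def use_key(self, key: str) -> None:
        # Test-only helper (not a krishiv method): select the active key.
        self._current_key = key

    def peek(self, key: str) -> Any:
        # Test-only helper (not a krishiv method): inspect a stored value.
        raw = self._values.get(key)
        return None if raw is None else json.loads(raw)

    def key(self) -> str:
        return self._current_key

    def set_json(self, value: Any) -> None:
        self._values[self._current_key] = json.dumps(value)

    def clear(self) -> None:
        self._values.pop(self._current_key, None)


def record_last_seen(state, timestamp: int) -> None:
    """Logic under test: store the latest timestamp for the current key."""
    state.set_json({"user": state.key(), "last_seen": timestamp})


state = FakeValueState("last_seen")
state.use_key("alice")
record_last_seen(state, 100)
state.use_key("bob")
record_last_seen(state, 200)

# Clearing affects only the key that is currently active.
state.use_key("alice")
state.clear()
```

After this runs, the stand-in holds a value for "bob" and nothing for "alice", which is the per-key scoping described in the overview.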

MapState

MapState(name: str)
Method                              Returns  Description
key() -> str                        str      Return the current partition key.
put_json(map_key: str, value: Any)  None     Store a value at map_key within the keyed map.
clear()                             None     Clear all entries for the current key.
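A typical use of a keyed map is to hold several named counters per partition key. The sketch below shows that pattern against a hypothetical stub, RecordingMapState, which imitates only the methods listed above; the overwrite-on-repeat behaviour of put_json is an assumption of the stub, not a documented guarantee.

```python
from collections import Counter
from typing import Any, Dict, Iterable


class RecordingMapState:
    """Hypothetical stub exposing MapState's documented methods."""

    def __init__(self, current_key: str) -> None:
        self._current_key = current_key
        self.entries: Dict[str, Dict[str, Any]] = {}  # partition key -> map

    def key(self) -> str:
        return self._current_key

    def put_json(self, map_key: str, value: Any) -> None:
        # Assumption: a second put on the same map_key replaces the first.
        self.entries.setdefault(self._current_key, {})[map_key] = value

    def clear(self) -> None:
        self.entries.pop(self._current_key, None)


def store_page_counts(state, pages: Iterable[str]) -> None:
    """Write one map entry per distinct page, holding its view count."""
    for page, count in Counter(pages).items():
        state.put_json(page, count)


state = RecordingMapState("alice")
store_page_counts(state, ["/home", "/docs", "/home"])
```

Using the page path as map_key keeps each counter individually addressable instead of rewriting one large value per key.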

ListState

ListState(name: str)
Method                Returns  Description
key() -> str          str      Return the current partition key.
add_json(value: Any)  None     Append a JSON-serialisable value to the list for the current key.
clear()               None     Clear the list for the current key.
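The append-and-clear behaviour described above can be pictured with a small model. RecordingListState is a hypothetical stub written for this sketch, and its public current_key attribute exists only so the example can switch keys; neither is part of krishiv.

```python
from typing import Any, Dict, List


class RecordingListState:
    """Hypothetical stub exposing ListState's documented methods."""

    def __init__(self, current_key: str) -> None:
        self.current_key = current_key
        self.lists: Dict[str, List[Any]] = {}  # partition key -> list

    def key(self) -> str:
        return self.current_key

    def add_json(self, value: Any) -> None:
        # Appends preserve insertion order within a key.
        self.lists.setdefault(self.current_key, []).append(value)

    def clear(self) -> None:
        self.lists.pop(self.current_key, None)


state = RecordingListState("alice")
state.add_json({"event": "login"})
state.add_json({"event": "click"})

# Switch to another key, append once, then clear: only that key's list is removed.
state.current_key = "bob"
state.add_json({"event": "login"})
state.clear()
```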

Example — Stateful Process Function

import pyarrow as pa  # assumption: batches and schemas are Arrow objects, as batch.num_rows suggests

import krishiv as ks
from krishiv import apply_process_function, ProcessContext, ValueState

def count_events(ctx: ProcessContext, batch, state: ValueState):
    # Store the number of rows in this batch for the current key.
    state.set_json(batch.num_rows)
    # Forward the batch downstream unchanged.
    ctx.emit(batch)

# Assumed schema for illustration only; replace it with the stream's real schema.
schema = pa.schema([("user_id", pa.string())])

session = ks.Session.embedded()
stream, sender = session.memory_stream(schema)
keyed = stream.key_by("user_id")
result = apply_process_function(keyed, count_events, ValueState("event_count"))