
Session

The `Session` class is the entry point for all Krishiv Python workloads.

Constructors

| Method | Description |
|---|---|
| `Session.embedded()` | Create an in-process embedded session. No daemon or cluster is needed. |
| `Session.local()` | Create a single-node local session. |
| `Session.from_env()` | Create a session from the `KRISHIV_COORDINATOR` environment variable. |
| `Session.connect(url: str)` | Connect to a remote Krishiv coordinator at `url`. |
| `Session()` | Create a default embedded session (same as `Session.embedded()`). |
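
The table does not say what `Session.from_env()` does when `KRISHIV_COORDINATOR` is unset. If you want that fallback to be explicit in your own code, here is a minimal sketch. `choose_constructor` is a hypothetical helper, not part of Krishiv. It only chooses between the documented `Session.connect(url)` and `Session.embedded()` constructors, and it treats an empty or whitespace-only value as unset (an assumption).

```python
import os
from typing import Mapping


def choose_constructor(env: Mapping[str, str] = os.environ) -> tuple[str, tuple[str, ...]]:
    """Return (constructor_name, args) for building a Krishiv Session.

    Hypothetical helper: use the coordinator URL in KRISHIV_COORDINATOR when it
    is set and non-empty, otherwise fall back to an in-process embedded session.
    """
    url = env.get("KRISHIV_COORDINATOR", "").strip()
    if url:
        # Remote mode: Session.connect(url)
        return "connect", (url,)
    # No coordinator configured: Session.embedded()
    return "embedded", ()
```

You would then build the session with `name, args = choose_constructor()` followed by `session = getattr(Session, name)(*args)`. Returning the choice rather than a session keeps the helper testable without a running coordinator.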

SQL Methods

| Method | Returns | Description |
|---|---|---|
| `sql(query: str)` | `DataFrame` | Plan the given SQL and return a lazy `DataFrame`. |
| `sql_as(query: str, name: str)` | `DataFrame` | Plan SQL and alias the result as a temporary view named `name`. |
| `sql_with_timeout(query: str, timeout_ms: int)` | `DataFrame` | Plan SQL with an execution timeout of `timeout_ms` milliseconds. |
| `prepare(query: str)` | `PreparedStatement` | Create a parameterised prepared statement. |
| `execute_local(query: str)` | `QueryResult` | Execute SQL immediately and return all results. |
| `execute_remote(query: str)` | `QueryHandle` | Submit SQL to a remote coordinator asynchronously. |
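
`sql_with_timeout` takes its timeout as an integer number of milliseconds. A small conversion helper avoids unit mistakes. This is a sketch, and `to_timeout_ms` is a hypothetical name: it rounds up so that a sub-millisecond timeout never becomes `0`, and it rejects non-positive values because the meaning of `0` is not documented here.

```python
from datetime import timedelta


def to_timeout_ms(timeout: timedelta) -> int:
    """Convert a timedelta into the integer milliseconds sql_with_timeout expects.

    Rounds up, so e.g. 400 microseconds becomes 1 ms rather than 0.
    """
    if timeout <= timedelta(0):
        raise ValueError("timeout must be positive")
    one_ms = timedelta(milliseconds=1)
    # timedelta // timedelta is exact integer arithmetic on microseconds;
    # negating before and after floor division gives ceiling division.
    return -(-timeout // one_ms)
```

For example: `df = session.sql_with_timeout(query, to_timeout_ms(timedelta(seconds=30)))`. Working in `timedelta` keeps the arithmetic in exact integer microseconds, so there is no float rounding to reason about.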

Data Registration

| Method | Returns | Description |
|---|---|---|
| `read_parquet(path: str)` | `DataFrame` | Read a local Parquet file. |
| `read_parquet_with_options(path, opts)` | `DataFrame` | Read Parquet with typed options (`batch_size`, etc.). |
| `read_csv(path: str)` | `DataFrame` | Read a local CSV file (auto-detects header). |
| `read_csv_with_options(path, opts)` | `DataFrame` | Read CSV with options (`delimiter`, `has_header`). |
| `read_json(path: str)` | `DataFrame` | Read a local NDJSON file. |
| `read_file(path: str)` | `DataFrame` | Read a file (format inferred from extension). |
| `register_parquet(name, path)` | `None` | Register a Parquet file as a named SQL table. |
| `register_record_batches(name, batches)` | `None` | Register PyArrow batches as a SQL table. |
| `register_unbounded(name, schema)` | `None` | Register an unbounded streaming table. Returns a push handle. |
| `register_kafka_source(name, schema, brokers, topic, group)` | `None` | Register a Kafka topic as a streaming table. |
| `table(name: str)` | `DataFrame` | Reference a registered table as a `DataFrame`. |
| `dataframe(record_batches)` | `DataFrame` | Create a `DataFrame` from a list of PyArrow `RecordBatch`es. |
| `deregister_table(name)` | `None` | Remove a registered table. |
| `table_exists(name: str)` | `bool` | Check if a table is registered. |
| `list_tables()` | `list[str]` | Return registered table names. |
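
`read_file` infers the format from the file extension, but the exact inference rules are not listed here. If you prefer to dispatch to an explicit reader yourself, for instance to fail fast on an unknown extension, a sketch could look like this. The suffix-to-reader mapping and the `reader_for` name are assumptions, not Krishiv's documented inference table.

```python
from pathlib import PurePath

# ASSUMED mapping from file suffix to the explicit Session reader it implies.
# Krishiv's own read_file inference may differ; verify before relying on it.
_READERS = {
    ".parquet": "read_parquet",
    ".csv": "read_csv",
    ".json": "read_json",
    ".ndjson": "read_json",
    ".jsonl": "read_json",
}


def reader_for(path: str) -> str:
    """Return the name of the explicit reader method for path's extension."""
    suffix = PurePath(path).suffix.lower()
    try:
        return _READERS[suffix]
    except KeyError:
        raise ValueError(f"cannot infer a format from extension {suffix!r}") from None
```

`df = getattr(session, reader_for(path))(path)` then calls the matching documented reader, and an unexpected extension raises before any I/O happens.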

UDF Registration

| Method | Description |
|---|---|
| `register_udf(name, fn, input_types, return_type)` | Register a Python callable as a scalar UDF. |
| `register_aggregate_udf(name, accumulator_class)` | Register a Python class as an aggregate UDF. |
| `register_table_udf(name, fn, schema)` | Register a Python callable as a table-valued UDF. |
| `register_function(name, fn)` | Register a generic Python function (type-inferred). |
| `list_udfs() -> list[str]` | List registered scalar UDF names. |
| `list_aggregate_udfs() -> list[str]` | List registered aggregate UDF names. |
| `list_table_udfs() -> list[str]` | List registered table UDF names. |
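
`register_aggregate_udf` takes an accumulator class, but this table does not spell out which methods that class must implement. The sketch below assumes an interface modeled loosely on DataFusion's Python `Accumulator` (`update`, `state`, `merge`, `evaluate`). Treat those method names and their argument shapes as assumptions to check against Krishiv's UDF documentation; `MeanAccumulator` itself is a hypothetical example.

```python
class MeanAccumulator:
    """Running mean written as an accumulator class for register_aggregate_udf.

    ASSUMPTION: the update/state/merge/evaluate method names follow a
    DataFusion-style accumulator convention; Krishiv's exact interface may differ.
    """

    def __init__(self) -> None:
        self.total = 0.0
        self.count = 0

    def update(self, values) -> None:
        # Fold a batch of input values into the partial state, skipping nulls.
        for v in values:
            if v is not None:
                self.total += v
                self.count += 1

    def state(self) -> tuple[float, int]:
        # Partial state that an accumulator on another partition can merge.
        return (self.total, self.count)

    def merge(self, other_state: tuple[float, int]) -> None:
        # Combine a partial state produced by another accumulator.
        total, count = other_state
        self.total += total
        self.count += count

    def evaluate(self):
        # Mean of all non-null values seen, or None for an empty group.
        return self.total / self.count if self.count else None
```

Registration would then be `session.register_aggregate_udf("my_mean", MeanAccumulator)`, after which `my_mean(x)` should be usable from `session.sql(...)`. The `state`/`merge` split is what lets partial aggregates computed on different partitions be combined into one result.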

Streaming Methods

| Method | Returns | Description |
|---|---|---|
| `stream(name: str)` | `Stream` | Reference a registered unbounded table as a `Stream`. |
| `memory_stream(schema)` | `tuple[Stream, Sender]` | Create an in-memory stream and its push handle. |
| `memory_stream_collect(schema, batches)` | `Stream` | Create a bounded stream pre-loaded with batches. |
| `from_bounded_stream(schema, batches)` | `Stream` | Create a bounded (finite) stream. |
| `from_source(source)` | `Stream` | Create a stream from a source connector object. |
| `submit_stream_job(plan, name)` | `JobStatus` | Submit a streaming plan to the scheduler. |
| `push_stream_job_input(job_id, batch)` | `None` | Push a batch to a running stream job's unbounded input. |
| `poll_stream_job(job_id)` | `JobStatus` | Poll the status of a submitted stream job. |
| `close_unbounded_input(table_name)` | `None` | Signal end-of-stream for an unbounded input table. |
| `read_stream()` | `DataStreamReader` | Get a Spark-style structured streaming reader. |
| `submit_async(plan)` | `QueryHandle` | Submit a streaming plan and get an async handle. |
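
One plausible lifecycle for a batch-fed stream job, inferred from the method names rather than documented here, is `submit_stream_job`, then `push_stream_job_input` for each batch, then `close_unbounded_input`, then `poll_stream_job` until the job finishes. The polling step is generic enough to sketch without Krishiv installed. `poll_until` is a hypothetical helper that takes the poll call and a completion predicate as plain callables.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until(
    poll: Callable[[], T],
    is_done: Callable[[T], bool],
    interval_s: float = 1.0,
    timeout_s: float = 60.0,
) -> T:
    """Call poll() until is_done(result) is true, then return that result.

    Raises TimeoutError if timeout_s elapses first. Uses a monotonic clock so
    wall-clock adjustments cannot shorten or extend the deadline.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = poll()
        if is_done(status):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"not done after {timeout_s} s; last status: {status!r}")
        time.sleep(interval_s)
```

With a session you might call `poll_until(lambda: session.poll_stream_job(job_id), is_done)`, where `is_done` checks whatever terminal-state field `JobStatus` exposes; that field is not documented in this table.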

Config and Auth

| Method | Description |
|---|---|
| `get_config(key: str) -> str \| None` | Read a session config value. |
| `set_config(key: str, value: str)` | Set a session config value. |
| `unset_config(key: str)` | Remove a session config override. |
| `configs() -> dict[str, str]` | Return all current session configs. |
| `mode() -> str` | Return the current execution mode string. |
| `with_auth_token(token: str) -> Session` | Return a session copy with a bearer token attached. |
| `with_oidc_provider(provider) -> Session` | Return a session copy with an OIDC auth provider. |
| `with_policy(hook) -> Session` | Return a session copy with a governance policy hook. |
| `operation_registry() -> OperationRegistry` | Access the operation cancellation/progress registry. |
| `jobs() -> list[JobStatus]` | List running and recent jobs. |
| `live_table(name: str) -> LiveTable` | Access a registered live table. |
| `is_streaming_query(query: str) -> bool` | Check if a SQL string references a streaming source. |
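
Because session configs are string key/value pairs with explicit set and unset operations, a scoped override can be built on top of them. This is a sketch that uses only `get_config`, `set_config`, and `unset_config`. `temporary_config` is a hypothetical helper, and it assumes `get_config` returns `None` for a key with no value, as its `str | None` return type suggests.

```python
from contextlib import contextmanager
from typing import Iterator, Mapping


@contextmanager
def temporary_config(session, overrides: Mapping[str, str]) -> Iterator[None]:
    """Apply config overrides for the duration of a with-block, then restore them.

    Relies only on get_config / set_config / unset_config. Keys that had no
    value beforehand (get_config returned None) are unset on exit.
    """
    # Snapshot the current values before changing anything.
    previous = {key: session.get_config(key) for key in overrides}
    try:
        for key, value in overrides.items():
            session.set_config(key, value)
        yield
    finally:
        # Restore even if the with-block raised.
        for key, old in previous.items():
            if old is None:
                session.unset_config(key)
            else:
                session.set_config(key, old)
```

Use it as `with temporary_config(session, {key: value}): ...`. One caveat: if `get_config` also reports built-in defaults, a restored key ends up as an explicit override holding the default value rather than being unset.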