Available

Session & SessionBuilder

Create and configure a Krishiv session for batch, streaming, or incremental work.

SessionBuilder

All sessions are constructed via SessionBuilder. The builder is obtained from Session::builder() and follows the builder pattern.

Session::builder() -> SessionBuilder

Builder Methods

| Method | Description |
|---|---|
| embedded() | Shorthand for SessionBuilder::default().build().await in embedded mode. |
| mode(ExecutionMode) | Set the execution mode: Embedded, SingleNode, or Distributed. |
| coordinator_url(impl Into<String>) | Flight/gRPC endpoint of the coordinator, used in distributed mode. |
| with_auth(impl AuthProvider) | Attach a bearer-token or custom auth provider. |
| with_policy(impl PolicyHook) | Attach a governance/access-control hook. |
| target_parallelism(NonZeroUsize) | Target partition count that DataFusion uses for parallel execution. |
| with_iceberg_catalog(Arc<KrishivCatalog>, name) | Register an Iceberg catalog under the given name (requires the iceberg-catalog feature). |
| with_shuffle_partitions(Option<u32>) | Override the shuffle bucket count; None lets Krishiv choose automatically. |
| config(key, value) | Set a session configuration key to the given value. |
| build() -> Result<Session> | Construct and return the session. This method is async and must be awaited. |
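A builder like this one accumulates settings across chained calls and only checks them when build() runs. The std-only sketch below models that flow under stated assumptions: ExecutionMode, SessionConfig, and this SessionBuilder are hypothetical simplifications rather than Krishiv's real types, build() is synchronous here, and the rule that distributed mode needs a coordinator_url is an illustrative assumption.

```rust
use std::collections::HashMap;
use std::num::NonZeroUsize;

/// Hypothetical stand-in for Krishiv's ExecutionMode (variant names taken from the table).
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionMode {
    Embedded,
    SingleNode,
    Distributed,
}

/// Hypothetical, simplified settings produced by the sketch builder.
#[derive(Debug)]
pub struct SessionConfig {
    pub mode: ExecutionMode,
    pub coordinator_url: Option<String>,
    pub target_parallelism: Option<NonZeroUsize>,
    pub shuffle_partitions: Option<u32>, // None = let the engine pick
    pub settings: HashMap<String, String>,
}

/// Sketch builder: each setter takes `self` by value and returns it, so calls chain.
#[derive(Debug)]
pub struct SessionBuilder {
    config: SessionConfig,
}

impl Default for SessionBuilder {
    fn default() -> Self {
        SessionBuilder {
            config: SessionConfig {
                mode: ExecutionMode::Embedded,
                coordinator_url: None,
                target_parallelism: None,
                shuffle_partitions: None,
                settings: HashMap::new(),
            },
        }
    }
}

impl SessionBuilder {
    pub fn mode(mut self, mode: ExecutionMode) -> Self {
        self.config.mode = mode;
        self
    }
    pub fn coordinator_url(mut self, url: impl Into<String>) -> Self {
        self.config.coordinator_url = Some(url.into());
        self
    }
    pub fn target_parallelism(mut self, n: NonZeroUsize) -> Self {
        self.config.target_parallelism = Some(n);
        self
    }
    pub fn with_shuffle_partitions(mut self, n: Option<u32>) -> Self {
        self.config.shuffle_partitions = n;
        self
    }
    pub fn config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.config.settings.insert(key.into(), value.into());
        self
    }
    /// Validates once at the end. ASSUMPTION: Distributed requires a coordinator URL.
    /// The real build() is async; this sketch is synchronous.
    pub fn build(self) -> Result<SessionConfig, String> {
        if self.config.mode == ExecutionMode::Distributed && self.config.coordinator_url.is_none() {
            return Err("distributed mode requires a coordinator_url".to_string());
        }
        Ok(self.config)
    }
}
```

In this sketch the setters never fail, so errors surface only at build(), which keeps the chained call sites free of intermediate error handling.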

Session Constructors

Session::embedded() -> impl Future<Output = Result<Session>>
Session::from_env() -> impl Future<Output = Result<Session>>
Session::connect(url: &str) -> impl Future<Output = Result<Session>>

SQL Methods

| Method | Returns | Description |
|---|---|---|
| sql(query: &str) | Future<Result<DataFrame>> | Parse and plan a SQL query, returning a lazy DataFrame. |
| sql_with_timeout(query, ms) | Future<Result<DataFrame>> | Like sql, but with an execution timeout in milliseconds. |
| prepare(query: &str) | Future<Result<PreparedStatement>> | Create a parameterised prepared statement. |
| explain(query: &str) | Future<Result<String>> | Return the DataFusion logical plan as a string. |
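"Lazy" means that sql() only builds a plan; rows are not read until the DataFrame is executed (for example by show()). The toy model below illustrates that split with a hypothetical LazyFrame that records filters and runs them only in collect(). It is a std-only sketch of the idea, not Krishiv's or DataFusion's DataFrame.

```rust
/// Hypothetical lazy frame: a source plus pending predicates, evaluated only in collect().
pub struct LazyFrame {
    source: Vec<(i64, f64)>,
    filters: Vec<Box<dyn Fn(&(i64, f64)) -> bool>>,
}

impl LazyFrame {
    pub fn new(source: Vec<(i64, f64)>) -> Self {
        LazyFrame { source, filters: Vec::new() }
    }

    /// Records a predicate in the plan; no rows are examined here.
    pub fn filter(mut self, pred: impl Fn(&(i64, f64)) -> bool + 'static) -> Self {
        self.filters.push(Box::new(pred));
        self
    }

    /// Number of predicates recorded but not yet executed.
    pub fn pending_filters(&self) -> usize {
        self.filters.len()
    }

    /// Executes the recorded plan: keeps rows that satisfy every predicate.
    pub fn collect(&self) -> Vec<(i64, f64)> {
        self.source
            .iter()
            .filter(|&row| self.filters.iter().all(|f| f(row)))
            .cloned()
            .collect()
    }
}
```

A filter such as |r| r.1 > 10.0 plays the role of WHERE val > 10: it is stored when the plan is built and applied only when collect() runs.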

Data Registration

| Method | Returns | Description |
|---|---|---|
| read_parquet(path) | Future<Result<DataFrame>> | Read a local Parquet file into a DataFrame. |
| read_csv(path) | Future<Result<DataFrame>> | Read a local CSV file into a DataFrame. |
| read_json(path) | Future<Result<DataFrame>> | Read a local NDJSON (newline-delimited JSON) file into a DataFrame. |
| register_parquet(name, path) | Future<Result<()>> | Register a Parquet file as a named SQL table. |
| register_record_batches(name, batches) | Future<Result<()>> | Register in-memory Arrow batches as a SQL table. |
| register_udf(udf: ScalarUdf) | Result<()> | Register a scalar UDF. |
| register_aggregate_udf(udf) | Result<()> | Register an aggregate UDF. |
| register_table_udf_fn(name, schema, f) | Result<()> | Register a closure-based table-valued function. |
| register_kafka_source(name, schema, brokers, topic, group) | Result<()> | Register a Kafka topic as an unbounded streaming table. |
| deregister_table(name) | Result<()> | Remove a registered table. |
| table_exists(name) | bool | Check whether a table is registered. |
| list_tables() | Vec<String> | List the names of registered tables. |
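The register, deregister_table, table_exists, and list_tables methods together behave like a name-to-table catalog. The sketch below models that catalog with a hypothetical TableRegistry and TableSource. Two behaviours are assumptions for illustration only: re-registering a name replaces the old entry, and deregistering an unknown name is an error. The names come back sorted only because the sketch uses a BTreeMap; Krishiv's list_tables() may not guarantee any order.

```rust
use std::collections::BTreeMap;

/// Hypothetical description of where a table's data comes from.
#[derive(Debug, Clone, PartialEq)]
pub enum TableSource {
    Parquet(String),
    InMemory { rows: usize },
}

/// Toy catalog mirroring the registration methods in the table above.
#[derive(Debug, Default)]
pub struct TableRegistry {
    tables: BTreeMap<String, TableSource>,
}

impl TableRegistry {
    /// ASSUMPTION: registering an existing name replaces the previous table.
    pub fn register(&mut self, name: &str, source: TableSource) {
        self.tables.insert(name.to_string(), source);
    }

    /// ASSUMPTION: removing a name that was never registered is an error.
    pub fn deregister_table(&mut self, name: &str) -> Result<(), String> {
        self.tables
            .remove(name)
            .map(|_| ())
            .ok_or_else(|| format!("table '{name}' is not registered"))
    }

    pub fn table_exists(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }

    /// Sorted here because BTreeMap iterates keys in order.
    pub fn list_tables(&self) -> Vec<String> {
        self.tables.keys().cloned().collect()
    }
}
```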

Streaming Methods

| Method | Returns | Description |
|---|---|---|
| memory_stream(schema) | Result<(Stream, Sender)> | Create an in-memory stream and its push handle. |
| from_bounded_stream(schema, batches) | Result<Stream> | Create a bounded stream from a static list of batches. |
| submit_stream_job(plan, name) | Future<Result<JobStatus>> | Submit a streaming job to the scheduler. |
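memory_stream returns a consumer side and a separate push handle, so one part of a program can feed batches while another reads them. The sketch below shows that producer/consumer shape with std::sync::mpsc. The Batch alias (a Vec<i64> standing in for an Arrow RecordBatch) and the rule that dropping the sender ends the stream are assumptions of this sketch, not documented Krishiv behaviour.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical batch type; in Krishiv this would be an Arrow RecordBatch.
pub type Batch = Vec<i64>;

/// Returns (consumer, push handle), echoing the (Stream, Sender) pair from memory_stream.
pub fn memory_stream() -> (Receiver<Batch>, Sender<Batch>) {
    let (tx, rx) = channel();
    (rx, tx)
}

/// Pushes batches from a producer thread and sums every value on the consumer side.
pub fn run_pipeline(batches: Vec<Batch>) -> i64 {
    let (stream, sender) = memory_stream();
    let producer = thread::spawn(move || {
        for b in batches {
            // send() fails only if the receiving side has been dropped
            sender.send(b).expect("consumer is alive");
        }
        // `sender` is dropped when this closure returns, which closes the channel
    });
    // iter() blocks for each batch and finishes once every sender is dropped
    let total: i64 = stream.iter().map(|b| b.iter().sum::<i64>()).sum();
    producer.join().expect("producer thread panicked");
    total
}
```

A bounded source such as from_bounded_stream corresponds to the case where the producer has a fixed list and stops; an unbounded source like a Kafka topic would keep its sender alive indefinitely.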

Example

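use krishiv_api::{Session, Result};
use arrow::array::{Float64Array, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;

    // Schema: a non-nullable Int64 id and a nullable Float64 value
    let schema = Arc::new(Schema::new(vec![
        Field::new("id",  DataType::Int64,   false),
        Field::new("val", DataType::Float64, true),
    ]));

    // Build one three-row RecordBatch whose columns match the schema
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
            Arc::new(Float64Array::from(vec![Some(5.0), Some(12.5), None])),
        ],
    )
    .expect("columns match the schema");

    // Register in-memory data as a SQL table
    // (assumes the batches argument accepts a Vec<RecordBatch>)
    session.register_record_batches("my_table", vec![batch]).await?;

    // SQL query: planned lazily, executed when show() runs
    let df = session.sql("SELECT id, val FROM my_table WHERE val > 10").await?;
    df.show().await?;
    Ok(())
}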