Session & SessionBuilder
Create and configure a Krishiv session for batch, streaming, or incremental work.
SessionBuilder
All sessions are constructed through `SessionBuilder`, which is obtained from `Session::builder()`. It follows the builder pattern: chain configuration methods on it, then finish with `build().await`.
`Session::builder() -> SessionBuilder`
Builder Methods
| Method | Description |
|---|---|
| `embedded()` | Shorthand for `SessionBuilder::default().build().await` in embedded mode. |
| `mode(ExecutionMode)` | Set the execution mode: `Embedded`, `SingleNode`, or `Distributed`. |
| `coordinator_url(impl Into<String>)` | Flight/gRPC endpoint of the coordinator, used in distributed mode. |
| `with_auth(impl AuthProvider)` | Attach a bearer-token or custom authentication provider. |
| `with_policy(impl PolicyHook)` | Attach a governance/access-control hook. |
| `target_parallelism(NonZeroUsize)` | Target DataFusion partition count for parallel execution. |
| `with_iceberg_catalog(Arc<KrishivCatalog>, name)` | Register an Iceberg catalog under the given name. Requires the `iceberg-catalog` Cargo feature. |
| `with_shuffle_partitions(Option<u32>)` | Override the shuffle bucket count; `None` selects it automatically. |
| `config(key, value)` | Set a session configuration key to the given value. |
| `build() -> Result<Session>` | Construct and return the session. The method is async, so call it as `build().await`. |
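The chaining style in the table can be illustrated with a small, self-contained sketch. Everything in it is hypothetical: `SessionBuilder`, `ExecutionMode` and `SessionConfig` here are local stand-ins, not the `krishiv_api` types; `build()` is synchronous instead of async; defaulting to `Embedded` and rejecting `Distributed` without a coordinator URL are choices of this sketch; and the URL and config key are placeholders.

```rust
use std::collections::HashMap;
use std::num::NonZeroUsize;

// Hypothetical stand-ins for illustration; these are NOT the krishiv_api types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionMode {
    Embedded,
    SingleNode,
    Distributed,
}

// What this sketch's build() produces instead of a real Session.
#[derive(Debug)]
struct SessionConfig {
    mode: ExecutionMode,
    coordinator_url: Option<String>,
    target_parallelism: Option<NonZeroUsize>,
    shuffle_partitions: Option<u32>,
    settings: HashMap<String, String>,
}

#[derive(Debug, Default)]
struct SessionBuilder {
    mode: Option<ExecutionMode>,
    coordinator_url: Option<String>,
    target_parallelism: Option<NonZeroUsize>,
    shuffle_partitions: Option<u32>,
    settings: HashMap<String, String>,
}

impl SessionBuilder {
    // Each setter consumes the builder and returns it, so calls chain.
    fn mode(mut self, mode: ExecutionMode) -> Self {
        self.mode = Some(mode);
        self
    }
    fn coordinator_url(mut self, url: impl Into<String>) -> Self {
        self.coordinator_url = Some(url.into());
        self
    }
    fn target_parallelism(mut self, n: NonZeroUsize) -> Self {
        self.target_parallelism = Some(n);
        self
    }
    // None means "let the engine pick the shuffle bucket count".
    fn with_shuffle_partitions(mut self, n: Option<u32>) -> Self {
        self.shuffle_partitions = n;
        self
    }
    fn config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.settings.insert(key.into(), value.into());
        self
    }
    // Assumptions of this sketch: mode defaults to Embedded, and Distributed
    // without a coordinator URL is rejected. The real build() is async.
    fn build(self) -> Result<SessionConfig, String> {
        let mode = self.mode.unwrap_or(ExecutionMode::Embedded);
        if mode == ExecutionMode::Distributed && self.coordinator_url.is_none() {
            return Err("Distributed mode requires coordinator_url".to_string());
        }
        Ok(SessionConfig {
            mode,
            coordinator_url: self.coordinator_url,
            target_parallelism: self.target_parallelism,
            shuffle_partitions: self.shuffle_partitions,
            settings: self.settings,
        })
    }
}

fn main() {
    let cfg = SessionBuilder::default()
        .mode(ExecutionMode::Distributed)
        .coordinator_url("http://coordinator.example:50051") // placeholder URL
        .target_parallelism(NonZeroUsize::new(8).expect("8 is non-zero"))
        .with_shuffle_partitions(None)
        .config("example.key", "value") // placeholder key
        .build()
        .expect("a coordinator URL was supplied");
    println!("{cfg:?}");
}
```

Deferring validation to `build()` keeps every setter infallible, so they can be called in any order.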
Session Constructors
`Session::embedded() -> impl Future<Output = Result<Session>>`
`Session::from_env() -> impl Future<Output = Result<Session>>`
`Session::connect(url: &str) -> impl Future<Output = Result<Session>>`
SQL Methods
| Method | Returns | Description |
|---|---|---|
| `sql(query: &str)` | `Future<Result<DataFrame>>` | Parse and plan a SQL query, returning a lazy `DataFrame`. |
| `sql_with_timeout(query, ms)` | `Future<Result<DataFrame>>` | Same as `sql`, with an execution timeout in milliseconds. |
| `prepare(query: &str)` | `Future<Result<PreparedStatement>>` | Create a parameterised prepared statement. |
| `explain(query: &str)` | `Future<Result<String>>` | Return the DataFusion logical plan as a string. |
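`sql_with_timeout` bounds how long execution may take. The real method is async and its cancellation behaviour is not described here, so the following is only a std-only sketch of the general deadline pattern, using a worker thread and `mpsc::Receiver::recv_timeout`. `run_with_timeout` is a hypothetical helper, not part of `krishiv_api`; note that on timeout the worker is abandoned rather than cancelled.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical helper (not part of krishiv_api): run `work` on a worker thread
// and stop waiting after `ms` milliseconds. On timeout the worker is abandoned,
// not cancelled; it keeps running in the background.
fn run_with_timeout<T, F>(ms: u64, work: F) -> Result<T, String>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the caller already gave up; ignore it.
        let _ = tx.send(work());
    });
    match rx.recv_timeout(Duration::from_millis(ms)) {
        Ok(value) => Ok(value),
        Err(mpsc::RecvTimeoutError::Timeout) => Err(format!("timed out after {ms} ms")),
        // The sender was dropped without sending, e.g. because `work` panicked.
        Err(mpsc::RecvTimeoutError::Disconnected) => Err("worker failed".to_string()),
    }
}

fn main() {
    // Finishes well inside the deadline.
    println!("{:?}", run_with_timeout(500, || 2 + 2));
    // Sleeps past the deadline, so the caller gets an error.
    let slow = run_with_timeout(50, || {
        thread::sleep(Duration::from_millis(300));
        42
    });
    println!("{slow:?}");
}
```

A production engine would normally also signal the running query to stop; this sketch deliberately leaves that out.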
Data Registration
| Method | Returns | Description |
|---|---|---|
| `read_parquet(path)` | `Future<Result<DataFrame>>` | Read a local Parquet file into a `DataFrame`. |
| `read_csv(path)` | `Future<Result<DataFrame>>` | Read a local CSV file into a `DataFrame`. |
| `read_json(path)` | `Future<Result<DataFrame>>` | Read a local newline-delimited JSON (NDJSON) file into a `DataFrame`. |
| `register_parquet(name, path)` | `Future<Result<()>>` | Register a Parquet file as a named SQL table. |
| `register_record_batches(name, batches)` | `Future<Result<()>>` | Register in-memory Arrow record batches as a named SQL table. |
| `register_udf(udf: ScalarUdf)` | `Result<()>` | Register a scalar UDF. |
| `register_aggregate_udf(udf)` | `Result<()>` | Register an aggregate UDF. |
| `register_table_udf_fn(name, schema, f)` | `Result<()>` | Register a closure-based table-valued function with the given output schema. |
| `register_kafka_source(name, schema, brokers, topic, group)` | `Result<()>` | Register a Kafka topic, read by the given consumer group, as an unbounded streaming table. |
| `deregister_table(name)` | `Result<()>` | Remove a registered table. |
| `table_exists(name)` | `bool` | Return `true` if a table with the given name is registered. |
| `list_tables()` | `Vec<String>` | List the names of all registered tables. |
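The registration calls all revolve around a name-to-table map. The sketch below is a hypothetical, std-only stand-in for that bookkeeping (`Catalog` is not a `krishiv_api` type, and `T` stands in for whatever backs a table). Rejecting duplicate names and erroring when deregistering an unknown table are this sketch's own choices, not documented Krishiv behaviour.

```rust
use std::collections::BTreeMap;

// Hypothetical name -> table registry; not a krishiv_api type. `T` stands in
// for whatever backs a table (here just in-memory rows).
struct Catalog<T> {
    tables: BTreeMap<String, T>,
}

impl<T> Catalog<T> {
    fn new() -> Self {
        Catalog { tables: BTreeMap::new() }
    }

    // Sketch's choice: registering an existing name is an error.
    fn register(&mut self, name: &str, table: T) -> Result<(), String> {
        if self.tables.contains_key(name) {
            return Err(format!("table '{name}' already registered"));
        }
        self.tables.insert(name.to_string(), table);
        Ok(())
    }

    // Sketch's choice: removing an unknown name is an error.
    fn deregister_table(&mut self, name: &str) -> Result<(), String> {
        self.tables
            .remove(name)
            .map(|_| ())
            .ok_or_else(|| format!("table '{name}' not found"))
    }

    fn table_exists(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }

    // BTreeMap iterates keys in sorted order, so the list is alphabetical.
    fn list_tables(&self) -> Vec<String> {
        self.tables.keys().cloned().collect()
    }
}

fn main() {
    let mut catalog: Catalog<Vec<i64>> = Catalog::new();
    catalog.register("orders", vec![1, 2, 3]).unwrap();
    catalog.register("customers", vec![10, 20]).unwrap();
    println!("{:?}", catalog.list_tables()); // prints ["customers", "orders"]
    catalog.deregister_table("orders").unwrap();
    println!("{}", catalog.table_exists("orders")); // prints false
}
```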
Streaming Methods
| Method | Returns | Description |
|---|---|---|
| `memory_stream(schema)` | `Result<(Stream, Sender)>` | Create an in-memory stream together with the `Sender` handle used to push batches into it. |
| `from_bounded_stream(schema, batches)` | `Result<Stream>` | Create a bounded stream from a fixed list of batches. |
| `submit_stream_job(plan, name)` | `Future<Result<JobStatus>>` | Submit a streaming job to the scheduler under the given name. |
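`memory_stream` returns two halves: a stream to read from and a `Sender` to push into. As a rough std-only analogy (an assumption, not the Krishiv implementation), an `mpsc` channel has the same shape: the receiver yields batches until every sender has been dropped, whereas a bounded stream simply ends when its fixed list runs out. `Batch` is a hypothetical stand-in for an Arrow `RecordBatch`, and both functions below are illustrative only.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical stand-in for an Arrow RecordBatch: just a vector of values.
type Batch = Vec<i64>;

// Sketch of the memory_stream idea: the consumer half plus a push handle.
// The stream ends only when every Sender has been dropped.
fn memory_stream() -> (Receiver<Batch>, Sender<Batch>) {
    let (tx, rx) = channel();
    (rx, tx)
}

// Sketch of the from_bounded_stream idea: a fixed list that simply runs out.
fn from_bounded_stream(batches: Vec<Batch>) -> impl Iterator<Item = Batch> {
    batches.into_iter()
}

fn main() {
    let (stream, sender) = memory_stream();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            sender.send(vec![i, i * 10]).expect("consumer is still alive");
        }
        // `sender` is dropped here, which closes the stream.
    });
    // Consumer: iterate until the channel closes, summing every value.
    let total: i64 = stream.iter().map(|b| b.iter().sum::<i64>()).sum();
    producer.join().unwrap();
    println!("unbounded total = {total}"); // prints "unbounded total = 33"

    let rows: usize = from_bounded_stream(vec![vec![1, 2], vec![3]])
        .map(|b| b.len())
        .sum();
    println!("bounded rows = {rows}"); // prints "bounded rows = 3"
}
```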
Example
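```rust
use krishiv_api::{Result, Session};
use arrow::array::{Float64Array, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;

    // Register in-memory data: a non-nullable id column and a nullable value column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("val", DataType::Float64, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
            Arc::new(Float64Array::from(vec![Some(5.0), Some(12.5), None])),
        ],
    )
    .expect("columns match the schema");
    // Assumes register_record_batches takes the table name and a Vec<RecordBatch>.
    session.register_record_batches("my_table", vec![batch]).await?;

    // SQL query: sql() returns a lazy DataFrame; show() executes it and prints the rows.
    let df = session.sql("SELECT id, val FROM my_table WHERE val > 10").await?;
    df.show().await?;
    Ok(())
}
```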