Product · Docs · Architecture · Blog · GitHub · Get Started
Preview

Iceberg

Apache Iceberg support: REST catalog registration, table reads (including time travel), table writes, and MERGE INTO.

Requirements

Enable the `iceberg` feature in your Cargo (Rust) or Python build. Krishiv targets Apache Iceberg table format v2 and later; format v3 is required for row lineage. REST is the primary certified catalog type.

Catalog Registration

-- Register an Iceberg REST catalog under the SQL name my_catalog.
--   uri       : REST catalog endpoint
--   warehouse : warehouse location (an S3 prefix in this example)
--   token     : OAuth token; may be omitted
CREATE CATALOG my_catalog
TYPE ICEBERG_REST
OPTIONS (
  'uri'       = 'http://catalog.internal:8181',
  'warehouse' = 's3://my-bucket/warehouse',
  'token'     = '<oauth-token>'           -- optional
);

After registration, tables are addressable in SQL as my_catalog.<namespace>.<table> (for example, my_catalog.ns.orders).

Reading Iceberg Tables

-- Snapshot read (current): reads the table's current snapshot
SELECT * FROM my_catalog.ns.orders WHERE order_date >= '2024-01-01';

-- Time travel by snapshot ID: the numeric literal is an Iceberg snapshot ID
SELECT * FROM my_catalog.ns.orders FOR VERSION AS OF 1234567890;

-- Time travel by timestamp, given as a string literal
-- NOTE(review): the time zone used to interpret the literal is not stated here — confirm
SELECT * FROM my_catalog.ns.orders FOR TIMESTAMP AS OF '2024-06-01 00:00:00';

Writing Iceberg Tables

-- Append: add the rows of new_orders to the table
INSERT INTO my_catalog.ns.orders SELECT * FROM new_orders;

-- Overwrite one partition: replaces the contents of partition order_date = '2024-06-01'
-- with the query result (OVERWRITE appears once, after INSERT; the partition spec follows
-- the table name directly)
INSERT OVERWRITE my_catalog.ns.orders
PARTITION (order_date = '2024-06-01')
SELECT * FROM daily_orders WHERE order_date = '2024-06-01';

-- Merge (upsert): match rows of new_data to orders on order_id.
--   matched rows          -> all columns updated from the source row (UPDATE SET *)
--   unmatched source rows -> inserted as new rows (INSERT *)
-- NOTE(review): SET * / INSERT * presumably require source and target to share column names — confirm
MERGE INTO my_catalog.ns.orders AS target
USING new_data AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Rust API

use krishiv_api::{Session, SessionBuilder, KrishivCatalog, Result};
use std::sync::Arc;

/// Example: attach an Iceberg REST catalog to a new session under the SQL name
/// `my_catalog`, then print the row count of `my_catalog.ns.orders`.
#[tokio::main]
async fn main() -> Result<()> {
    // REST catalog endpoint first, warehouse location second.
    let rest_catalog = KrishivCatalog::rest("http://catalog.internal:8181", "s3://bucket/wh")?;
    let catalog = Arc::new(rest_catalog);

    // Register the catalog so its tables resolve as `my_catalog.<namespace>.<table>`.
    let builder = Session::builder().with_iceberg_catalog(catalog, "my_catalog")?;
    let session = builder.build().await?;

    let query = "SELECT count(*) FROM my_catalog.ns.orders";
    let counts = session.sql(query).await?;
    counts.show().await?;

    Ok(())
}

Python API

import krishiv as ks

# Create an embedded session.
session = ks.Session.embedded()

# Metadata inspection goes through IcebergRestCatalog: list the tables in namespace "my_ns".
catalog = ks.IcebergRestCatalog(
    uri="http://catalog:8181",
    warehouse="s3://bucket/wh",
)
tables = catalog.list_tables("my_ns")
print(tables)

# Direct read of a table by its storage location (requires the iceberg feature).
df = ks.read_iceberg(
    "s3://bucket/wh/my_ns/orders/",
    catalog_uri="http://catalog:8181",
)
df.show()