Iceberg
Connect to an Apache Iceberg REST catalog, read Iceberg tables (including time travel), and write them with INSERT INTO, INSERT OVERWRITE, and MERGE INTO.
Requirements
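Enable the `iceberg` feature: the Cargo feature for Rust builds, or the corresponding feature of the Python package. Krishiv targets Apache Iceberg table format v2 and later; row lineage requires format v3. REST is the primary certified catalog type.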
Catalog Registration
-- Register an Iceberg REST catalog named my_catalog; its tables become
-- addressable in SQL as my_catalog.<namespace>.<table>.
CREATE CATALOG my_catalog
TYPE ICEBERG_REST
OPTIONS (
-- Endpoint of the Iceberg REST catalog service
'uri' = 'http://catalog.internal:8181',
-- Warehouse location passed to the catalog
'warehouse' = 's3://my-bucket/warehouse',
'token' = '<oauth-token>' -- optional OAuth token for authenticating to the catalog
);
After registration, tables are addressable in SQL as `my_catalog.<namespace>.<table>` (for example, `my_catalog.ns.orders`).
Reading Iceberg Tables
-- Snapshot read (current): reads the table's current snapshot
SELECT * FROM my_catalog.ns.orders WHERE order_date >= '2024-01-01';
-- Time travel by snapshot ID: reads the table as of the given snapshot
SELECT * FROM my_catalog.ns.orders FOR VERSION AS OF 1234567890;
-- Time travel by timestamp: reads the table as of the given point in time
-- NOTE(review): the time zone used to interpret this literal is not stated here — confirm
SELECT * FROM my_catalog.ns.orders FOR TIMESTAMP AS OF '2024-06-01 00:00:00';
Writing Iceberg Tables
-- Append: add the query's rows to the table
INSERT INTO my_catalog.ns.orders SELECT * FROM new_orders;
-- Overwrite by predicate
-- NOTE(review): OVERWRITE appears twice and a PARTITION clause (not a general
-- predicate) selects what is replaced; Spark-style syntax is
-- `INSERT OVERWRITE tbl PARTITION (col = 'v') SELECT ...` — confirm the syntax Krishiv accepts.
INSERT OVERWRITE my_catalog.ns.orders
OVERWRITE PARTITION (order_date = '2024-06-01')
SELECT * FROM daily_orders WHERE order_date = '2024-06-01';
-- Merge (upsert): a source row that matches a target row on order_id updates it;
-- source rows with no match are inserted.
-- NOTE(review): SET * / INSERT * presumably map source columns to target columns by name — confirm.
MERGE INTO my_catalog.ns.orders AS target
USING new_data AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Rust API
use krishiv_api::{Session, SessionBuilder, KrishivCatalog, Result};
use std::sync::Arc;
/// Registers an Iceberg REST catalog with a new session and prints a row count
/// from one of its tables.
#[tokio::main]
async fn main() -> Result<()> {
    // REST catalog endpoint plus warehouse location; shared with the session via `Arc`.
    let rest_catalog = KrishivCatalog::rest("http://catalog.internal:8181", "s3://bucket/wh")?;
    let catalog = Arc::new(rest_catalog);

    // The name given here ("my_catalog") is the prefix used in SQL table references.
    let builder = Session::builder().with_iceberg_catalog(catalog, "my_catalog")?;
    let session = builder.build().await?;

    let query = "SELECT count(*) FROM my_catalog.ns.orders";
    let df = session.sql(query).await?;
    df.show().await?;

    Ok(())
}
Python API
import krishiv as ks
session = ks.Session.embedded()  # NOTE(review): not used below — confirm whether it is required

# Metadata inspection goes through the REST catalog client.
catalog = ks.IcebergRestCatalog(
    uri="http://catalog:8181",
    warehouse="s3://bucket/wh",
)
print(catalog.list_tables("my_ns"))

# Direct read by table location (requires the iceberg feature).
table_path = "s3://bucket/wh/my_ns/orders/"
df = ks.read_iceberg(table_path, catalog_uri="http://catalog:8181")
df.show()