# Kafka Connector
Source and sink connector for Apache Kafka and Confluent.
## Requirements
Enable the `kafka` Cargo feature. For the Python bindings, build with `maturin develop --features kafka`.
## SQL DDL
-- Kafka source (streaming)
-- Exposes Kafka topic 'orders' as the table orders_raw. See the Kafka Options
-- table below for every supported option and its default.
CREATE SOURCE orders_raw
TYPE KAFKA
OPTIONS (
'brokers' = 'broker1:9092,broker2:9092', -- required; comma-separated broker list
'topic' = 'orders', -- required
'group.id' = 'krishiv-consumer-1', -- required for sources
'auto.offset.reset' = 'latest', -- 'earliest' | 'latest' (default 'latest')
'format' = 'json' -- 'json' | 'avro' | 'protobuf'; avro/protobuf also need 'schema.registry.url'
)
-- Columns of orders_raw; order_id is the only NOT NULL column.
WITH SCHEMA (
order_id BIGINT NOT NULL,
customer VARCHAR,
amount DOUBLE,
event_time TIMESTAMP
);
-- Kafka sink
-- Declares results_sink, targeting Kafka topic 'results'. 'group.id' is only
-- required for sources, so it is omitted here.
CREATE SINK results_sink
TYPE KAFKA
OPTIONS (
'brokers' = 'broker1:9092', -- required
'topic' = 'results', -- required
'format' = 'json' -- same format choices as the source
);
## Rust API
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use krishiv_api::{Result, Session};
/// Registers the Kafka topic `orders` as the source `orders_raw` and queries it with SQL.
#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;

    // Arrow schema mirroring the `orders_raw` DDL: `order_id` is NOT NULL and
    // every other column (including `event_time`) is nullable.
    let schema = Arc::new(Schema::new(vec![
        Field::new("order_id", DataType::Int64, false),
        Field::new("customer", DataType::Utf8, true),
        Field::new("amount", DataType::Float64, true),
        Field::new(
            "event_time",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
    ]));

    // Arguments: source name, schema, comma-separated brokers, topic, consumer group id.
    session.register_kafka_source(
        "orders_raw",
        schema,
        "broker1:9092,broker2:9092",
        "orders",
        "krishiv-consumer-1",
    )?;

    let df = session
        .sql("SELECT * FROM orders_raw WHERE amount > 100")
        .await?;
    df.show().await?;
    Ok(())
}
## Python API
import krishiv as ks
import pyarrow as pa
session = ks.Session.embedded()

# Arrow schema mirroring the `orders_raw` DDL.
schema = pa.schema([
    # order_id is NOT NULL in the DDL; pa.field defaults to nullable=True,
    # so the non-null constraint must be stated explicitly.
    pa.field("order_id", pa.int64(), nullable=False),
    pa.field("customer", pa.utf8()),
    pa.field("amount", pa.float64()),
    pa.field("event_time", pa.timestamp("ms")),
])

# `group` carries the consumer group id (the 'group.id' option in SQL).
session.register_kafka_source(
    "orders_raw", schema,
    brokers="broker1:9092",
    topic="orders",
    group="krishiv-consumer-1",
)

# Total order amount per customer.
session.sql(
    "SELECT customer, SUM(amount) AS total FROM orders_raw GROUP BY customer"
).show()
## Kafka Options

| Option | Default | Description |
|---|---|---|
| `brokers` | — | Comma-separated Kafka broker list. Required. |
| `topic` | — | Topic name. Required. |
| `group.id` | — | Consumer group ID. Required for sources. |
| `auto.offset.reset` | `latest` | Starting position when the consumer group has no committed offset: `earliest` or `latest`. |
| `format` | `json` | Message encoding: `json`, `avro`, or `protobuf`. |
| `schema.registry.url` | — | Confluent Schema Registry URL. Required when `format` is `avro` or `protobuf`. |
| `fetch.message.max.bytes` | `1048576` | Maximum number of bytes fetched per partition (1 MiB by default). |