Product | Docs | Architecture | Blog | GitHub | Get Started
Available

Kafka Connector

Source and sink connector for Apache Kafka and Confluent.

Requirements

Enable the `kafka` Cargo feature when building. For the Python bindings, build with: maturin develop --features kafka

SQL DDL

-- Kafka source (streaming)
-- Declares a source named orders_raw that reads the 'orders' topic.
CREATE SOURCE orders_raw
TYPE KAFKA
OPTIONS (
  -- Comma-separated Kafka broker list (required).
  'brokers'           = 'broker1:9092,broker2:9092',
  -- Topic name (required).
  'topic'             = 'orders',
  -- Consumer group ID; required for sources.
  'group.id'          = 'krishiv-consumer-1',
  -- 'earliest' | 'latest' (default: latest)
  'auto.offset.reset' = 'latest',
  -- 'avro' and 'protobuf' additionally require 'schema.registry.url'.
  'format'            = 'json'        -- 'json' | 'avro' | 'protobuf'
)
-- Column definitions for the source; only order_id is declared NOT NULL.
WITH SCHEMA (
  order_id   BIGINT   NOT NULL,
  customer   VARCHAR,
  amount     DOUBLE,
  event_time TIMESTAMP
);

-- Kafka sink
-- Declares a sink named results_sink targeting the 'results' topic.
-- No 'group.id' is set here: the options table lists it as required for sources only.
CREATE SINK results_sink
TYPE KAFKA
OPTIONS (
  -- Comma-separated Kafka broker list (required); a single broker is given here.
  'brokers' = 'broker1:9092',
  -- Topic name (required).
  'topic'   = 'results',
  -- 'json' | 'avro' | 'protobuf' (default: json)
  'format'  = 'json'
);

Rust API

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use krishiv_api::{Result, Session};

/// Registers the `orders` Kafka topic as the `orders_raw` source on an
/// embedded session, then runs a filtering query over it and shows the result.
///
/// # Errors
///
/// Propagates any error returned while creating the session, registering the
/// source, planning/running the query, or showing the result.
#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;

    // Arrow schema mirroring the `WITH SCHEMA` clause of the SQL DDL:
    // only `order_id` is NOT NULL there, so every other column (including
    // `event_time`) is declared nullable here as well.
    let schema = Arc::new(Schema::new(vec![
        Field::new("order_id",   DataType::Int64,   false),
        Field::new("customer",   DataType::Utf8,    true),
        Field::new("amount",     DataType::Float64, true),
        Field::new("event_time", DataType::Timestamp(TimeUnit::Millisecond, None), true),
    ]));

    // Positional arguments after the schema: broker list, topic, consumer group ID.
    session.register_kafka_source(
        "orders_raw",
        schema,
        "broker1:9092,broker2:9092",
        "orders",
        "krishiv-consumer-1",
    )?;

    let df = session.sql("SELECT * FROM orders_raw WHERE amount > 100").await?;
    df.show().await?;
    Ok(())
}

Python API

import krishiv as ks
import pyarrow as pa

# Create an embedded session to register the source on.
session = ks.Session.embedded()

# Arrow schema mirroring the WITH SCHEMA clause of the SQL DDL.
# pa.field() is nullable by default, so order_id (NOT NULL in the DDL)
# must be marked nullable=False explicitly.
schema = pa.schema([
    pa.field("order_id",   pa.int64(), nullable=False),
    pa.field("customer",   pa.utf8()),
    pa.field("amount",     pa.float64()),
    pa.field("event_time", pa.timestamp("ms")),
])

# Same broker list, topic and consumer group as the SQL and Rust examples.
session.register_kafka_source(
    "orders_raw", schema,
    brokers="broker1:9092,broker2:9092",
    topic="orders",
    group="krishiv-consumer-1",
)

# Aggregate the order amounts per customer and show the result.
session.sql("SELECT customer, SUM(amount) AS total FROM orders_raw GROUP BY customer").show()

Kafka Options

| Option | Default | Description |
| --- | --- | --- |
| brokers | — | Comma-separated Kafka broker list. Required. |
| topic | — | Topic name. Required. |
| group.id | — | Consumer group ID. Required for sources. |
| auto.offset.reset | latest | earliest \| latest |
| format | json | json \| avro \| protobuf |
| schema.registry.url | — | Confluent Schema Registry URL (required for Avro/Protobuf). |
| fetch.message.max.bytes | 1048576 | Max fetch size per partition. |