
KafScale User Guide

KafScale is a Kafka-compatible, S3-backed message transport system. It keeps brokers stateless, stores data in S3, and relies on Kubernetes for scheduling and scaling. This guide covers how to interact with the platform once it is deployed.

Before you start

Latency expectations: KafScale has ~500ms end-to-end latency due to S3 flush semantics. It’s ideal for ETL, logs, CDC, and async events—not for sub-100ms use cases like real-time bidding or gaming.

Authentication: KafScale v1.x does not yet support SASL authentication or mTLS. Network-level security (Kubernetes NetworkPolicies, private VPCs) is the current isolation mechanism. See the Security page for the roadmap.

TLS: If TLS is enabled, it’s configured at the Kubernetes Ingress or LoadBalancer level by your operator—not in KafScale itself. Check with your platform team for connection details.

Idempotent producers: KafScale does not support idempotent producers (Kafka API key 22, InitProducerId). In Kafka clients 3.0 and later, idempotence is enabled by default, so you must set enable.idempotence=false in every producer configuration.

Concepts

Concept Description
Topics / Partitions Standard Kafka semantics. All Kafka client libraries work.
Brokers Stateless pods accepting Kafka protocol on port 9092, metrics on 9093.
Metadata Stored in etcd, encoded via protobufs (kafscale.metadata.*).
Storage Message segments live in S3; brokers keep only in-memory caches.
Operator Kubernetes controller that provisions brokers, topics, and wiring via CRDs.

Local Demo

For a kind-based demo environment, run make demo-platform. The Makefile applies the demo resources via scripts/demo-platform-apply.sh; if your environment blocks inline heredocs, make sure the script is executable and run the target again.

Client Examples

Use this section to copy/paste minimal examples for your client. If you don’t control client config (managed apps, hosted integrations), ask the operator team to confirm idempotence and transactions are disabled.
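As a sanity check before wiring up a new client, you can lint its config for the settings above. A minimal sketch; check_kafscale_config is a hypothetical helper for illustration, not part of KafScale:

```python
def check_kafscale_config(config: dict) -> list[str]:
    """Return a list of problems that would break this config against KafScale."""
    problems = []
    # Kafka clients 3.0+ default enable.idempotence to true, so a
    # missing key counts as enabled.
    if config.get("enable.idempotence", True) not in (False, "false"):
        problems.append("set enable.idempotence=false explicitly")
    if "transactional.id" in config:
        problems.append("transactions are unsupported; remove transactional.id")
    return problems

# A config that relies on client defaults fails the check:
assert check_kafscale_config({"bootstrap.servers": "kafscale-broker:9092"}) \
    == ["set enable.idempotence=false explicitly"]
# Explicitly disabling idempotence passes:
assert check_kafscale_config({"enable.idempotence": False}) == []
```

The same two checks apply regardless of client language; only the spelling of the keys differs (e.g. franz-go uses an option rather than a string key).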

For install and bootstrap steps, see Quickstart.

Java (plain)

Disable idempotence—KafScale does not support transactional semantics or idempotent producers.

# Java producer properties
bootstrap.servers=kafscale-broker:9092
enable.idempotence=false
acks=1

Producer:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "kafscale-broker:9092");
props.put("enable.idempotence", "false");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("orders", "key-1", "value-1")).get();
}

Consumer:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafscale-broker:9092");
props.put("group.id", "orders-consumer");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("orders"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}

Spring Boot

# application.yml
spring:
  kafka:
    bootstrap-servers: kafscale-broker:9092
    producer:
      properties:
        enable.idempotence: false
        acks: 1
    consumer:
      group-id: orders-consumer
      auto-offset-reset: earliest

Python (confluent-kafka)

from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({
    'bootstrap.servers': 'kafscale-broker:9092',
    'enable.idempotence': False
})
producer.produce('orders', key='key-1', value='value-1')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'kafscale-broker:9092',
    'group.id': 'orders-consumer',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Error: {msg.error()}")
        continue
    print(f"{msg.key()}: {msg.value()}")

Go (franz-go)

franz-go is widely regarded as the most feature-complete Kafka client for Go.

Producer:

client, _ := kgo.NewClient(
    kgo.SeedBrokers("kafscale-broker:9092"),
    kgo.AllowAutoTopicCreation(),
    kgo.DisableIdempotentWrite(),
)
defer client.Close()

client.ProduceSync(ctx, &kgo.Record{Topic: "orders", Value: []byte("hello")})

Consumer:

client, _ := kgo.NewClient(
    kgo.SeedBrokers("kafscale-broker:9092"),
    kgo.ConsumerGroup("orders-consumer"),
    kgo.ConsumeTopics("orders"),
)
defer client.Close()

fetches := client.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
    fmt.Println(string(r.Value))
})

Go (kafka-go)

// Producer
w := &kafka.Writer{
    Addr:  kafka.TCP("kafscale-broker:9092"),
    Topic: "orders",
}
defer w.Close()
w.WriteMessages(ctx, kafka.Message{Value: []byte("hello")})

// Consumer
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{"kafscale-broker:9092"},
    GroupID: "orders-consumer",
    Topic:   "orders",
})
defer r.Close()
m, _ := r.ReadMessage(ctx)
fmt.Println(string(m.Value))

Kafka CLI

# Produce
kafka-console-producer \
  --bootstrap-server kafscale-broker:9092 \
  --topic orders \
  --producer-property enable.idempotence=false

# Consume
kafka-console-consumer \
  --bootstrap-server kafscale-broker:9092 \
  --topic orders \
  --from-beginning

Stream Processing Integration

KafScale is a transport layer—it doesn’t include embedded stream processing. This is intentional; data processing doesn’t belong in the message broker. Pair KafScale with external engines like Apache Flink or Apache Wayang for stateful transformations, windowing, and analytics.

Flink’s Kafka connector works with KafScale. Disable idempotence and exactly-once semantics since KafScale does not support transactions or idempotent producers.

Maven dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>3.1.0-1.18</version>
</dependency>

Source (read from KafScale):

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafscale-broker:9092")
    .setTopics("orders")
    .setGroupId("flink-orders")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(
    source, 
    WatermarkStrategy.noWatermarks(), 
    "KafScale Source"
);

Sink (write to KafScale):

Properties producerConfig = new Properties();
producerConfig.setProperty("enable.idempotence", "false");

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafscale-broker:9092")
    .setKafkaProducerConfig(producerConfig)
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("orders-processed")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

stream.sinkTo(sink);
env.execute("Order Processing");

Note: You must explicitly set enable.idempotence=false via setKafkaProducerConfig(). Since Kafka 3.0, idempotence defaults to true, which requires the InitProducerId API (key 22) that KafScale does not support.

Apache Wayang

Wayang provides a platform-agnostic API that can run on Java or Spark backends. Kafka source/sink support was added in 2024.

Maven dependencies:

<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-api-scala-java</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-java</artifactId>
  <version>1.0.0</version>
</dependency>

Read from KafScale, process, write back:

Configuration configuration = new Configuration();
WayangContext wayangContext = new WayangContext(configuration)
    .withPlugin(Java.basicPlugin());

JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
    .withJobName("OrderProcessing")
    .withUdfJarOf(MyJob.class);

planBuilder
    .readKafkaTopic("orders").withName("Load from KafScale")
    .flatMap(line -> Arrays.asList(line.split("\\W+")))
    .filter(token -> !token.isEmpty())
    .map(word -> new Tuple2<>(word.toLowerCase(), 1))
    .reduceByKey(Tuple2::getField0, 
        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
    .writeKafkaTopic("orders-counts", 
        d -> String.format("%s: %d", d.getField0(), d.getField1()),
        "wayang-job",
        LoadProfileEstimators.createFromSpecification(
            "wayang.java.kafkatopicsink.load", configuration));

To switch from Java to Spark backend, change one line:

.withPlugin(Spark.basicPlugin());  // instead of Java.basicPlugin()

Important: Wayang uses the Kafka Java client internally but does not expose producer configuration through its API. Because Kafka clients 3.0+ default to enable.idempotence=true, you must disable it via a JVM system property:

java -Denable.idempotence=false -jar your-wayang-app.jar

Requires Java 17. See wayang.apache.org for full implementation details.

Other compatible engines

Engine Notes
Spark Structured Streaming Use kafka format, set kafka.enable.idempotence=false
Kafka Streams Disable EOS: processing.guarantee=at_least_once
Redpanda Console / Kowl Compatible for topic browsing
Conduktor Compatible for admin UI

Monitoring

KafScale exposes Prometheus metrics on port 9093.

# Scrape metrics
curl http://kafscale-broker:9093/metrics

# Key metrics to watch
curl -s http://kafscale-broker:9093/metrics | grep -E "kafscale_(produce|fetch|s3)"

Key metrics:

Metric Description
kafscale_produce_requests_total Total produce requests
kafscale_fetch_requests_total Total fetch requests
kafscale_s3_upload_duration_ms S3 segment upload latency
kafscale_s3_download_duration_ms S3 segment download latency
kafscale_cache_hit_ratio LRU cache effectiveness
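If you want to act on these values without a full Prometheus stack, the exposition format is plain text and easy to parse. A minimal sketch, assuming unlabeled "name value" lines; real scrape output may also carry labels, which this ignores:

```python
import re

def parse_metrics(text: str) -> dict[str, float]:
    """Parse simple 'name value' Prometheus exposition lines (no labels)."""
    metrics = {}
    for line in text.splitlines():
        # Skip comments (# HELP / # TYPE) and blank lines.
        if line.startswith("#") or not line.strip():
            continue
        m = re.match(r"^([A-Za-z_:][A-Za-z0-9_:]*)\s+(\S+)$", line)
        if m:
            metrics[m.group(1)] = float(m.group(2))
    return metrics

# Sample values for illustration only.
sample = """\
# HELP kafscale_cache_hit_ratio LRU cache effectiveness
kafscale_cache_hit_ratio 0.92
kafscale_produce_requests_total 1500
"""
metrics = parse_metrics(sample)
assert metrics["kafscale_cache_hit_ratio"] == 0.92
```

A low kafscale_cache_hit_ratio is usually the first sign that consumers are falling back to S3 reads.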

Brokers also emit structured JSON logs. The operator exposes its own metrics for CRD reconciliation.

Troubleshooting

Test broker connectivity:

kafka-broker-api-versions --bootstrap-server kafscale-broker:9092

List topics:

kafka-topics --bootstrap-server kafscale-broker:9092 --list

Describe a topic:

kafka-topics --bootstrap-server kafscale-broker:9092 --describe --topic orders

Check consumer group lag:

kafka-consumer-groups \
  --bootstrap-server kafscale-broker:9092 \
  --group orders-consumer \
  --describe
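Lag for a partition is simply the log-end offset minus the committed offset; the command above reports both. A minimal sketch with illustrative numbers, useful for turning that output into an alert threshold:

```python
def partition_lag(log_end_offset: int, committed_offset: int) -> int:
    """Messages written but not yet consumed for one partition."""
    return max(0, log_end_offset - committed_offset)

def group_lag(offsets: dict[int, tuple[int, int]]) -> int:
    """Sum lag across partitions; values are (log-end, committed) pairs."""
    return sum(partition_lag(end, committed) for end, committed in offsets.values())

# partition -> (log-end offset, committed offset); sample values only
offsets = {0: (1200, 1150), 1: (980, 980), 2: (500, 410)}
assert group_lag(offsets) == 140  # 50 + 0 + 90
```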

Common issues:

Symptom Likely cause Fix
NOT_LEADER_FOR_PARTITION Normal during rebalance Retry; client handles this
CLUSTER_AUTHORIZATION_FAILED Idempotence enabled, API 22 not supported Set enable.idempotence=false
High produce latency S3 flush taking long Check S3 health, increase buffer
Consumer lag growing Slow processing or S3 reads Scale consumers, check cache hit ratio
UNKNOWN_TOPIC_OR_PARTITION Topic not created Create via CRD or enable auto-create
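Mature Kafka clients retry transient errors like NOT_LEADER_FOR_PARTITION internally, but if you wrap produce or admin calls in your own tooling, a simple exponential backoff covers the transient rows above. A sketch; the error strings and RuntimeError wrapping are illustrative, not KafScale APIs:

```python
import time

# Errors that are safe to retry; permanent errors should surface immediately.
TRANSIENT = {"NOT_LEADER_FOR_PARTITION", "REBALANCE_IN_PROGRESS"}

def with_retries(fn, attempts=5, base_delay=0.1):
    """Call fn, retrying transient broker errors with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except RuntimeError as err:
            if str(err) not in TRANSIENT or attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)

# Simulate a call that fails twice during a rebalance, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("NOT_LEADER_FOR_PARTITION")
    return "ok"

assert with_retries(flaky, base_delay=0.01) == "ok"
assert calls["n"] == 3
```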

Scaling and Maintenance

The operator uses Kubernetes HPA and the BrokerControl gRPC API to safely drain partitions before restarts.

  • Scaling up: Add replicas via kubectl scale or adjust the CRD; partitions rebalance automatically.
  • Scaling down: The operator drains partitions before terminating pods.
  • Rolling restarts: Use kubectl rollout restart; the operator coordinates graceful handoff.

Limits and Non-Goals

KafScale intentionally does not support:

  • Transactions / exactly-once semantics — use at-least-once with idempotent consumers
  • Idempotent producers (API 22) — disable enable.idempotence in all clients
  • Log compaction — out of scope for MVP
  • Embedded stream processing — pair with Flink, Wayang, Spark, etc.
  • Sub-100ms latency — S3 flush semantics add ~500ms

Next steps

  • Configuration — tune cache sizes, buffer thresholds, S3 settings
  • Operations — S3 health states, failure modes, multi-region CRR
  • Security — current posture, TLS setup, auth roadmap
  • Architecture — how data flows through the system