KafScale Ops API v1
This document defines the Kafka admin APIs KafScale exposes to support operator workflows. The scope stays within KafScale’s architecture: stateless brokers, S3 durability, and etcd-backed metadata. Transactions, compaction, and KRaft APIs remain out of scope.
Guiding Principles
- Prioritize operator parity (visibility, recovery, and cleanup).
- Keep config mutation narrow and explicit.
- Preserve a single source of truth in etcd metadata snapshots.
- Avoid UI write paths until authentication and authorization exist.
API Coverage (v1)
| Category | API Key | Name | Version | Notes |
|---|---|---|---|---|
| Group visibility | 15 | DescribeGroups | v5 | Required for kafka-consumer-groups.sh --describe. |
| Group visibility | 16 | ListGroups | v5 | Required to enumerate consumer groups. |
| Group cleanup | 42 | DeleteGroups | v2 | Removes group metadata without touching topic data. |
| Recovery | 23 | OffsetForLeaderEpoch | v3 | Safe recovery after broker failover/truncation. |
| Config read | 32 | DescribeConfigs | v4 | Read-only visibility into topic/broker configuration. |
| Config write | 33 | AlterConfigs | v1 | Topic whitelist only; broker mutation rejected. |
| Scaling | 37 | CreatePartitions | v3 | Additive only; no partition reduction. |
Selection Rationale
- These APIs unlock core ops workflows without requiring KRaft, transactions, or compaction.
- Write APIs are intentionally narrow to reduce blast radius until auth/ACLs exist.
- Anything outside this list is deferred and advertised as unsupported in ApiVersions.
Client Compatibility
KafScale’s Ops API v1 has been tested with these client versions:
| Client | Minimum Version | Recommended | Notes |
|---|---|---|---|
| librdkafka | 1.9.0 | 2.3.0+ | Required for flexible versions |
| confluent-kafka-python | 1.9.0 | 2.3.0+ | Wraps librdkafka |
| kafka-python | 2.0.2 | 2.0.2 | Legacy; limited AdminClient support |
| franz-go | 1.15.0 | 1.16.0+ | Full kmsg support |
| Kafka CLI tools | 3.5.0 | 3.7.0+ | Match implemented API versions |
Older clients may work but could fall back to older API versions or miss features.
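If version skew is a concern, franz-go lets you cap protocol negotiation explicitly so the client never requests versions beyond what KafScale implements. A minimal sketch, assuming a local broker at 127.0.0.1:9092 and franz-go 1.16+ (which ships `kversion.V3_7_0`):

```go
package main

import (
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kversion"
)

func main() {
	// Pin negotiation at Kafka 3.7 protocol versions, matching the
	// API versions KafScale's Ops API v1 implements.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("127.0.0.1:9092"), // assumed address
		kgo.MaxVersions(kversion.V3_7_0()),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
```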
Versioning Notes
For v1 we implement the Kafka 3.7 CLI versions in active use; flexible versions are supported where client tooling requires them. IncrementalAlterConfigs remains deferred.
Implemented versions:
- DescribeGroups v5
- ListGroups v5
- OffsetForLeaderEpoch v3
- DescribeConfigs v4
- AlterConfigs v1
- DeleteGroups v2
- CreatePartitions v3
ApiVersions Advertising
KafScale advertises supported ops APIs via ApiVersions so clients can select the
correct versions. Unsupported ops APIs are listed with MinVersion = -1 and
MaxVersion = -1 for clarity.
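A client can confirm what the broker advertises before relying on an API. A minimal franz-go sketch (the broker address is an assumption):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	client, err := kgo.NewClient(kgo.SeedBrokers("127.0.0.1:9092")) // assumed address
	if err != nil {
		panic(err)
	}
	defer client.Close()

	req := kmsg.NewPtrApiVersionsRequest()
	resp, err := req.RequestWith(ctx, client)
	if err != nil {
		panic(err)
	}
	for _, k := range resp.ApiKeys {
		// Per the note above, unsupported ops APIs advertise -1..-1.
		fmt.Printf("api key %d: v%d..v%d\n", k.ApiKey, k.MinVersion, k.MaxVersion)
	}
}
```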
Config Surface (Describe/AlterConfigs)
KafScale will only expose topic/broker configs that map cleanly to existing metadata
and runtime behavior. Everything else returns INVALID_CONFIG (or the closest Kafka
error code supported by the request version).
Topic Config Whitelist
| Config Key | Maps To | Notes |
|---|---|---|
| `retention.ms` | `TopicConfig.retention_ms` | `-1` for unlimited, otherwise `>= 0`. |
| `retention.bytes` | `TopicConfig.retention_bytes` | `-1` for unlimited, otherwise `>= 0`. |
| `segment.bytes` | `TopicConfig.segment_bytes` | Must be `> 0`. |
AlterConfigs will accept only the topic keys above. Broker-level mutation is out of scope for v1.
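For illustration, the gate brokers apply to AlterConfigs could look like the sketch below. The function and its name are hypothetical, not KafScale's actual code, but the rules mirror the whitelist table above:

```go
package main

import (
	"fmt"
	"strconv"
)

// validateTopicConfig is a hypothetical sketch of the v1 whitelist gate:
// only three topic keys are writable, with the documented value ranges.
// Everything else maps to INVALID_CONFIG.
func validateTopicConfig(key, value string) error {
	switch key {
	case "retention.ms", "retention.bytes":
		n, err := strconv.ParseInt(value, 10, 64)
		if err != nil || n < -1 {
			return fmt.Errorf("INVALID_CONFIG: %s must be -1 (unlimited) or >= 0", key)
		}
	case "segment.bytes":
		n, err := strconv.ParseInt(value, 10, 64)
		if err != nil || n <= 0 {
			return fmt.Errorf("INVALID_CONFIG: segment.bytes must be > 0")
		}
	default:
		return fmt.Errorf("INVALID_CONFIG: %q is not writable in v1", key)
	}
	return nil
}

func main() {
	fmt.Println(validateTopicConfig("retention.ms", "604800000")) // <nil>
	fmt.Println(validateTopicConfig("cleanup.policy", "compact")) // INVALID_CONFIG
}
```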
Broker Config Whitelist (Read-Only)
Broker-level configs are read-only in v1 and are exposed under a KafScale-specific namespace so they do not conflict with upstream Kafka defaults.
| Config Key | Source |
|---|---|
| `broker.id` | Broker node ID |
| `advertised.listeners` | Kafka listener address |
| `kafscale.s3.bucket` | `KAFSCALE_S3_BUCKET` |
| `kafscale.s3.region` | `KAFSCALE_S3_REGION` |
| `kafscale.s3.endpoint` | `KAFSCALE_S3_ENDPOINT` |
| `kafscale.cache.bytes` | `KAFSCALE_CACHE_BYTES` |
| `kafscale.readahead.segments` | `KAFSCALE_READAHEAD_SEGMENTS` |
| `kafscale.segment.bytes` | `KAFSCALE_SEGMENT_BYTES` |
| `kafscale.flush.interval.ms` | `KAFSCALE_FLUSH_INTERVAL_MS` |
Metadata + Etcd Integration
All topic configuration changes must be persisted to etcd using the existing snapshot
schema (kafscale.metadata.TopicConfig). The operator remains the source of truth:
- Kafka admin API updates are validated, written into etcd, and reconciled back into the operator-managed snapshot.
- Brokers consume the snapshot and adjust runtime behavior accordingly.
- The operator should reject config keys outside the whitelist.
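As a sketch of the write path under these rules, the snippet below persists a validated topic config to etcd. The key layout and struct shape are illustrative assumptions, not the actual `kafscale.metadata.TopicConfig` schema:

```go
package main

import (
	"context"
	"encoding/json"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// TopicConfig is an illustrative stand-in for the snapshot entry;
// the real kafscale.metadata.TopicConfig schema may differ.
type TopicConfig struct {
	RetentionMs    int64 `json:"retention_ms"`
	RetentionBytes int64 `json:"retention_bytes"`
	SegmentBytes   int64 `json:"segment_bytes"`
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed etcd endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	cfg := TopicConfig{RetentionMs: 604800000, RetentionBytes: -1, SegmentBytes: 1 << 30}
	raw, _ := json.Marshal(cfg)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// The key path is hypothetical; the operator owns the real layout
	// and reconciles this write back into the managed snapshot.
	if _, err := cli.Put(ctx, "/kafscale/topics/orders/config", string(raw)); err != nil {
		panic(err)
	}
}
```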
Topic/Partition Management in Etcd
KafScale persists topic configuration and partition counts in etcd via the operator’s metadata snapshot. The Kafka admin APIs will map to the same storage rules:
- CreateTopics writes a new `TopicConfig` entry and seeds partition metadata.
- AlterConfigs updates only the whitelisted config keys.
- CreatePartitions is additive only; reducing partition count is rejected.
- Partition creation should also pre-create empty partition metadata so brokers can begin serving immediately.
Until auth lands, write access should remain API-only (no UI mutation).
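The additive-only rule reduces to a simple comparison; a hypothetical sketch:

```go
package main

import "fmt"

// validatePartitionIncrease is a hypothetical sketch of the check
// behind CreatePartitions: the requested count must strictly exceed
// the current count, so reductions are always rejected.
func validatePartitionIncrease(current, requested int32) error {
	if requested <= current {
		return fmt.Errorf("INVALID_PARTITIONS: have %d, requested %d (additive only)", current, requested)
	}
	return nil
}

func main() {
	fmt.Println(validatePartitionIncrease(3, 6)) // <nil>
	fmt.Println(validatePartitionIncrease(6, 3)) // error
}
```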
Error Semantics
To align with Kafka client expectations:
- Unknown group: `GROUP_ID_NOT_FOUND`
- Coordinator unavailable/loading: `COORDINATOR_NOT_AVAILABLE` or `COORDINATOR_LOAD_IN_PROGRESS`
- Unknown topic/partition: `UNKNOWN_TOPIC_OR_PARTITION`
- Unsupported API/versions: `UNSUPPORTED_VERSION`
- Invalid config key/value: `INVALID_CONFIG`
- Unauthorized (when auth lands): `TOPIC_AUTHORIZATION_FAILED` or `GROUP_AUTHORIZATION_FAILED`
Request-specific notes:
- DescribeGroups/ListGroups return an empty result set on no matches rather than an error.
- DeleteGroups returns per-group errors; a missing group reports `GROUP_ID_NOT_FOUND`.
- DescribeConfigs returns only keys from the whitelist and ignores unknown keys.
- AlterConfigs rejects any broker resource mutation with `INVALID_CONFIG`.
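These per-group semantics matter when deleting in bulk. A franz-go sketch that surfaces each group's error via `kerr` (broker address assumed):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	client, err := kgo.NewClient(kgo.SeedBrokers("127.0.0.1:9092")) // assumed address
	if err != nil {
		panic(err)
	}
	defer client.Close()

	req := kmsg.NewPtrDeleteGroupsRequest()
	req.Groups = []string{"my-group", "does-not-exist"}
	resp, err := req.RequestWith(ctx, client)
	if err != nil {
		panic(err)
	}
	for _, g := range resp.Groups {
		// kerr maps Kafka error codes to named errors; a missing group
		// surfaces as GROUP_ID_NOT_FOUND per the semantics above.
		if e := kerr.ErrorForCode(g.ErrorCode); e != nil {
			fmt.Printf("%s: %v\n", g.Group, e)
			continue
		}
		fmt.Printf("%s: deleted\n", g.Group)
	}
}
```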
Ops Examples
Kafka CLI
These examples use the ops/admin APIs implemented in v1:
```bash
# List consumer groups
kafka-consumer-groups.sh --bootstrap-server <broker> --list

# Describe a consumer group
kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <group-id>

# Read topic configs
kafka-configs.sh --bootstrap-server <broker> --describe --entity-type topics --entity-name <topic>

# Update topic retention
kafka-configs.sh --bootstrap-server <broker> --alter --entity-type topics --entity-name <topic> \
  --add-config retention.ms=604800000

# Fetch offsets by leader epoch
kafka-run-class.sh kafka.admin.OffsetForLeaderEpochClient \
  --bootstrap-server <broker> \
  --offset-for-leader-epoch-request "topic=<topic>,partition=0,leaderEpoch=0"

# Increase partition count for a topic (additive only)
kafka-topics.sh --bootstrap-server <broker> --alter --topic <topic> --partitions <count>

# Alter topic configs (whitelist only)
kafka-configs.sh --bootstrap-server <broker> --alter --entity-type topics --entity-name <topic> \
  --add-config retention.ms=120000

# Delete a consumer group
kafka-consumer-groups.sh --bootstrap-server <broker> --delete --group <group-id>
```
Python (confluent-kafka)
```python
from confluent_kafka.admin import AdminClient, ConfigResource, NewPartitions

admin = AdminClient({"bootstrap.servers": "127.0.0.1:9092"})

# List consumer groups
groups = admin.list_consumer_groups()
print("Groups:", [g.group_id for g in groups.result().valid])

# Describe a consumer group
described = admin.describe_consumer_groups(["my-group"])
for group_id, future in described.items():
    group = future.result()
    print(f"Group: {group.group_id}, State: {group.state}")
    for member in group.members:
        print(f"  Member: {member.member_id}, Client: {member.client_id}")

# Describe topic configs
resource = ConfigResource(ConfigResource.Type.TOPIC, "orders")
configs = admin.describe_configs([resource])
for res, future in configs.items():
    config = future.result()
    for key, entry in config.items():
        print(f"  {key} = {entry.value}")

# Alter topic configs (whitelist only: retention.ms, retention.bytes, segment.bytes)
resource = ConfigResource(
    ConfigResource.Type.TOPIC,
    "orders",
    set_config={"retention.ms": "120000"},
)
result = admin.alter_configs([resource])
for res, future in result.items():
    future.result()  # Raises on error
    print(f"Updated {res.name}")

# Create partitions (additive only); NewPartitions takes the topic and new total count
result = admin.create_partitions([NewPartitions("orders", 6)])
for topic, future in result.items():
    future.result()  # Raises on error
    print(f"Expanded {topic} to 6 partitions")

# Delete consumer group
result = admin.delete_consumer_groups(["my-group"])
for group_id, future in result.items():
    future.result()  # Raises on error
    print(f"Deleted group: {group_id}")
```
Franz-go (Go)
```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Error handling is elided (_) for brevity throughout.
	client, _ := kgo.NewClient(kgo.SeedBrokers("127.0.0.1:9092"))
	defer client.Close()

	// ListGroups v5 with state/type filters
	listReq := kmsg.NewPtrListGroupsRequest()
	listReq.Version = 5
	listReq.StatesFilter = []string{"Stable"}
	listReq.TypesFilter = []string{"classic"}
	listResp, _ := listReq.RequestWith(ctx, client)
	fmt.Println("groups:", listResp.Groups)

	// DescribeGroups v5
	describeReq := kmsg.NewPtrDescribeGroupsRequest()
	describeReq.Version = 5
	describeReq.Groups = []string{"my-group"}
	describeResp, _ := describeReq.RequestWith(ctx, client)
	fmt.Println("describe:", describeResp.Groups)

	// OffsetForLeaderEpoch v3 (ReplicaID -1 marks a consumer request)
	offsetReq := kmsg.NewPtrOffsetForLeaderEpochRequest()
	offsetReq.Version = 3
	offsetReq.ReplicaID = -1
	offsetReq.Topics = []kmsg.OffsetForLeaderEpochRequestTopic{
		{
			Topic: "orders",
			Partitions: []kmsg.OffsetForLeaderEpochRequestTopicPartition{
				{Partition: 0, CurrentLeaderEpoch: -1, LeaderEpoch: 0},
			},
		},
	}
	offsetResp, _ := offsetReq.RequestWith(ctx, client)
	fmt.Println("offsets:", offsetResp.Topics)

	// DescribeConfigs v4, scoped to one key
	describeCfg := kmsg.NewPtrDescribeConfigsRequest()
	describeCfg.Version = 4
	describeCfg.Resources = []kmsg.DescribeConfigsRequestResource{
		{
			ResourceType: kmsg.ConfigResourceTypeTopic,
			ResourceName: "orders",
			ConfigNames:  []string{"retention.ms"},
		},
	}
	describeCfgResp, _ := describeCfg.RequestWith(ctx, client)
	fmt.Println("configs:", describeCfgResp.Resources)

	// AlterConfigs v1 (whitelisted topic key only)
	alterCfg := kmsg.NewPtrAlterConfigsRequest()
	alterCfg.Version = 1
	value := "120000"
	alterCfg.Resources = []kmsg.AlterConfigsRequestResource{
		{
			ResourceType: kmsg.ConfigResourceTypeTopic,
			ResourceName: "orders",
			Configs: []kmsg.AlterConfigsRequestResourceConfig{
				{Name: "retention.ms", Value: &value},
			},
		},
	}
	alterCfgResp, _ := alterCfg.RequestWith(ctx, client)
	fmt.Println("alter:", alterCfgResp.Resources)

	// CreatePartitions v3 (additive only)
	createReq := kmsg.NewPtrCreatePartitionsRequest()
	createReq.Version = 3
	createReq.Topics = []kmsg.CreatePartitionsRequestTopic{
		{Topic: "orders", Count: 6},
	}
	createResp, _ := createReq.RequestWith(ctx, client)
	fmt.Println("create partitions:", createResp.Topics)

	// DeleteGroups v2
	deleteReq := kmsg.NewPtrDeleteGroupsRequest()
	deleteReq.Version = 2
	deleteReq.Groups = []string{"my-group"}
	deleteResp, _ := deleteReq.RequestWith(ctx, client)
	fmt.Println("delete groups:", deleteResp.Groups)
}
```
PromQL Queries
Common queries for monitoring ops API usage and performance:
```promql
# P95 produce latency
histogram_quantile(0.95, rate(kafscale_produce_latency_ms_bucket[5m]))

# P99 consumer lag
histogram_quantile(0.99, rate(kafscale_consumer_lag_bucket[5m]))

# S3 health check (returns 1 when healthy)
kafscale_s3_health_state{state="healthy"} == 1

# Total produce throughput across all brokers
sum(kafscale_produce_rps)

# Admin API request rate by operation
sum by (api) (rate(kafscale_admin_requests_total[5m]))

# Admin API error rate
sum(rate(kafscale_admin_request_errors_total[5m])) / sum(rate(kafscale_admin_requests_total[5m]))

# Broker memory usage percentage (approximate)
kafscale_broker_mem_alloc_bytes / kafscale_broker_mem_sys_bytes * 100

# etcd snapshot staleness by cluster
kafscale_operator_etcd_snapshot_age_seconds > 3600

# Consumer lag exceeding threshold
kafscale_consumer_lag_max > 100000
```
Local Testing Tip
If you want to test without external S3, run the broker with in-memory S3:
```bash
KAFSCALE_USE_MEMORY_S3=1 go run ./cmd/broker
```
Troubleshooting
Common Errors
| Error | Cause | Fix |
|---|---|---|
| `COORDINATOR_NOT_AVAILABLE` | Broker still starting or etcd unreachable | Wait for broker readiness; check etcd connectivity |
| `COORDINATOR_LOAD_IN_PROGRESS` | Group metadata loading from etcd | Retry after a few seconds |
| `GROUP_ID_NOT_FOUND` | Group doesn’t exist or was already deleted | Verify the group name; check whether it was already cleaned up |
| `UNKNOWN_TOPIC_OR_PARTITION` | Topic doesn’t exist in etcd metadata | Create the topic first; verify topic name spelling |
| `UNSUPPORTED_VERSION` | Client requests an API version KafScale doesn’t implement | Upgrade/downgrade the client; check the API Coverage table |
| `INVALID_CONFIG` | Config key not in whitelist, or invalid value | Use only `retention.ms`, `retention.bytes`, `segment.bytes` for topics |
| `REQUEST_TIMED_OUT` | S3 latency spike or broker overload | Check `kafscale_s3_latency_ms_avg`; scale brokers if needed |
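For the two retryable coordinator errors, a simple backoff loop suffices; a franz-go sketch (broker address, retry count, and sleep interval are illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	client, err := kgo.NewClient(kgo.SeedBrokers("127.0.0.1:9092")) // assumed address
	if err != nil {
		panic(err)
	}
	defer client.Close()

	req := kmsg.NewPtrDescribeGroupsRequest()
	req.Groups = []string{"my-group"}

	for attempt := 0; attempt < 5; attempt++ {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		resp, err := req.RequestWith(ctx, client)
		cancel()
		if err != nil {
			panic(err)
		}
		e := kerr.ErrorForCode(resp.Groups[0].ErrorCode)
		// Retry only the two loading/availability errors from the table above.
		if errors.Is(e, kerr.CoordinatorLoadInProgress) || errors.Is(e, kerr.CoordinatorNotAvailable) {
			time.Sleep(2 * time.Second)
			continue
		}
		fmt.Println("state:", resp.Groups[0].State, "err:", e)
		return
	}
}
```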
Debugging Checklist
“Consumer groups not showing up”
- Verify the broker is healthy: `curl http://<broker>:9093/metrics | grep kafscale_s3_health_state`
- Check etcd connectivity: broker logs should show successful metadata sync
- Confirm group actually exists and has committed offsets
- Try ListGroups first, then DescribeGroups with exact group ID
“AlterConfigs returns INVALID_CONFIG”
- Only topic-level configs are writable in v1
- Only these keys work: `retention.ms`, `retention.bytes`, `segment.bytes`
- Broker-level configs are read-only
- Values must be valid: `retention.ms >= -1`, `segment.bytes > 0`
“CreatePartitions fails”
- New count must be greater than current count (additive only)
- Topic must already exist
- Check operator logs for etcd write errors
“High admin API latency”
- Check S3 health: `kafscale_s3_health_state{state="healthy"}`
- Check S3 latency: `kafscale_s3_latency_ms_avg` (expect 50-200ms typical)
- Check etcd latency in operator metrics
- Consider broker scaling if the `kafscale_admin_requests_total` rate is high
Getting Help
If you’re stuck:
- Check broker logs: `kubectl logs -l app.kubernetes.io/component=broker`
- Check operator logs: `kubectl logs -l app.kubernetes.io/component=operator`
- Scrape the metrics endpoint directly: `curl http://<broker>:9093/metrics`
- Open an issue: github.com/KafScale/platform/issues