# Iceberg Processor User Guide
The Iceberg Processor is a KafScale addon that reads completed segments directly from S3 and writes them to Apache Iceberg tables. Like KafScale brokers, the processor is stateless: offsets live in etcd, data lives in S3, output goes to your Iceberg catalog. Pods can scale up, down, or restart without coordination overhead.
This guide is for operators and platform engineers deploying the processor in Kubernetes. It focuses on configuration, behavior, and operations. For implementation details and code changes, see the developer guide in the main branch.
## What It Does
- Reads completed KafScale segments from S3 (bypasses brokers entirely).
- Decodes records and writes them to Iceberg tables (append-only).
- Tracks offsets with a lease-per-topic model (at-least-once).
- Optional JSON schema validation and schema-driven columns.
## Feature Highlights
- Storage-native processing (no Kafka protocol or brokers required).
- Iceberg REST catalog support with auto-create tables (works with Unity Catalog, Polaris, etc.).
- Mapping-driven or registry-driven columns with schema evolution.
- Lease-based offsets with replay-safe at-least-once semantics.
- Metrics and health endpoints for ops visibility.
## Prerequisites
- Access to the S3 bucket where KafScale segments are stored.
- An Iceberg REST catalog endpoint reachable from the cluster.
- An offsets backend (etcd by default).
- Optional: JSON schema registry endpoint for validation or column discovery.
## Configuration Overview
The processor reads a YAML config file (mounted into the container).
Required fields:

- `s3.bucket`
- `iceberg.catalog.type`
- `iceberg.catalog.uri`
- `etcd.endpoints` (when `offsets.backend=etcd`)
- `mappings[]` with `topic` and `table`
Common fields:

```yaml
s3:
  bucket: kafscale-data
  namespace: production
  endpoint: ""
  region: us-east-1
  path_style: false
iceberg:
  catalog:
    type: rest
    uri: https://iceberg-catalog.example.com
    token: ""
    username: ""
    password: ""
  warehouse: s3://iceberg-warehouse/production
offsets:
  backend: etcd
  lease_ttl_seconds: 30
  key_prefix: processors
discovery:
  mode: auto
etcd:
  endpoints:
    - http://etcd.kafscale.svc.cluster.local:2379
  username: ""
  password: ""
```
For MinIO or any non-AWS S3 endpoint:

```yaml
s3:
  bucket: kafscale-snapshots
  namespace: kafscale-demo
  endpoint: http://minio.kafscale-demo.svc.cluster.local:9000
  region: us-east-1
  path_style: true
```
Warehouse semantics: `iceberg.warehouse` is the S3 prefix where the processor writes Iceberg metadata and Parquet files directly. Point your catalog (Unity Catalog, Polaris, etc.) at this path.
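For orientation, a table such as `prod.orders` typically lands under the warehouse prefix like this (a sketch assuming standard Iceberg path conventions; exact locations are assigned by the catalog and file names are generated):

```
s3://iceberg-warehouse/production/
  prod/
    orders/
      metadata/   # table metadata, snapshots, manifests
      data/       # Parquet data files
```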
## Security and Data Governance
The operator is responsible for storage security and governance:
- IAM roles/permissions for S3 access.
- Encryption at rest (SSE-S3 or SSE-KMS) and in transit.
- Bucket policies, logging, and data retention.
If you use Unity Catalog on AWS S3, point the catalog at the same warehouse path. The processor writes Iceberg tables there; Unity Catalog reads the metadata and Parquet files directly from S3.
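As a sketch, the processor side of such a setup needs only the standard REST catalog settings; the URI and token below are placeholders for your environment, not a documented Unity Catalog endpoint:

```yaml
iceberg:
  catalog:
    type: rest
    uri: https://your-catalog.example.com/iceberg   # placeholder REST endpoint
    token: "<bearer-token>"
  warehouse: s3://iceberg-warehouse/production   # same path the catalog reads from
```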
## Topic-to-Table Mapping
```yaml
mappings:
  - topic: orders
    table: prod.orders
    mode: append
    create_table_if_missing: true
```
Notes:

- Only `append` is supported.
- `create_table_if_missing` auto-creates tables when topics are new.
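Multiple topics map independently within the same config; a hypothetical second topic illustrates the shape:

```yaml
mappings:
  - topic: orders
    table: prod.orders
    mode: append
    create_table_if_missing: true
  - topic: payments              # hypothetical second topic
    table: prod.payments
    mode: append
    create_table_if_missing: true
```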
## Schema Configuration
Schema configuration has two parts: defining table columns (structure) and optionally validating incoming JSON (correctness).
### Columns and Evolution
You can define columns directly in the mapping or resolve them from a registry.
Mapping-defined columns:
```yaml
mappings:
  - topic: orders
    table: prod.orders
    mode: append
    create_table_if_missing: true
    schema:
      columns:
        - name: order_id
          type: long
          required: true
        - name: status
          type: string
      allow_type_widening: true
```
Registry-driven columns:
```yaml
schema:
  mode: "off"
  registry:
    base_url: https://schemas.example.com
    timeout_seconds: 5
    cache_seconds: 300
mappings:
  - topic: orders
    table: prod.orders
    schema:
      source: registry
```
Supported column types: `boolean`, `int`, `long`, `float`, `double`, `string`, `binary`, `timestamp`, `date`.
### Validation (Optional)
`schema.mode` controls JSON validation against the registry:

- `off`: no validation (default).
- `lenient`: drops invalid records and continues.
- `strict`: stops on validation errors.

Validation fetches schemas from `schema.registry.base_url/<topic>.json`.
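For example, a minimal sketch that enables lenient validation (the registry URL is a placeholder):

```yaml
schema:
  mode: lenient   # drop invalid records and continue
  registry:
    base_url: https://schemas.example.com   # serves <base_url>/<topic>.json
    timeout_seconds: 5
    cache_seconds: 300
```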
## Discovery Modes
Discovery determines how the processor finds topics, partitions, and segments.
- `auto` (default): uses etcd for topic/partition metadata when available, S3 for segment discovery. Best for most deployments.
- `etcd`: requires `etcd.endpoints`. Use when you want faster startup and already run etcd.
- `s3`: ignores etcd and relies on S3 listing only. Use when you don't run etcd or want simpler dependencies.
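To pin a mode explicitly rather than relying on `auto`:

```yaml
discovery:
  mode: s3   # or: auto, etcd
```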
## Offsets and Leases
Offsets are tracked per topic with a TTL lease:
- Only one worker processes a topic at a time (all its partitions).
- No cross-worker synchronization required.
- If a pod dies, the lease expires and another pod resumes.
- At-least-once semantics by design.
Tune `offsets.lease_ttl_seconds` based on segment size and processing time.
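The TTL trades failover speed against churn: a short lease lets another pod take over quickly after a crash, while a longer one is more forgiving of slow renewals. An illustrative override (the value is an example, not a recommendation):

```yaml
offsets:
  backend: etcd
  lease_ttl_seconds: 90   # example: extra headroom for large segments
  key_prefix: processors
```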
## Record IDs and Idempotency
Each record includes a deterministic `record_id` column of the form `<topic>:<partition>:<offset>`. Downstream consumers can use it to dedupe replays (at-least-once semantics) without relying on Kafka metadata.
## Write Serialization (Iceberg)
Table create/load/evolve/commit operations are serialized per topic to avoid Iceberg commit races. This keeps correctness but reduces per-topic concurrency.
## Deployment (Helm)
Use the values file in the main repo as the base: https://github.com/novatechflow/kafscale/blob/main/addons/processors/iceberg-processor/deploy/helm/iceberg-processor/values.yaml.
Key Helm values:

- `config.s3.*`
- `config.iceberg.*`
- `config.etcd.*`
- `config.mappings`
- `s3.credentialsSecretRef` with `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
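As a sketch, the referenced credentials Secret might look like this (the Secret name is a placeholder; see the chart's values.yaml for the authoritative keys):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: iceberg-processor-s3   # placeholder name
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: "<access-key>"
  AWS_SECRET_ACCESS_KEY: "<secret-key>"
```

Then set `s3.credentialsSecretRef` to that Secret's name in your values file.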
## Metrics and Health
Default metrics bind: `KAFSCALE_METRICS_ADDR` (default `:9093`).

- `/metrics` exposes Prometheus metrics.
- `/healthz` returns `ok`.
Key metrics:

- `kafscale_processor_records_total{topic,result}`
- `kafscale_processor_batches_total{topic}`
- `kafscale_processor_write_latency_ms{topic}`
- `kafscale_processor_errors_total{stage}`
- `kafscale_processor_last_offset{topic,partition}`
- `kafscale_processor_watermark_offset{topic,partition}`
- `kafscale_processor_watermark_timestamp_ms{topic,partition}`
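These feed standard Prometheus tooling; a minimal alerting-rule sketch built on the error counter (threshold and duration are illustrative):

```yaml
groups:
  - name: iceberg-processor
    rules:
      - alert: IcebergProcessorErrors
        expr: rate(kafscale_processor_errors_total[5m]) > 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Iceberg processor errors in stage {{ $labels.stage }}"
```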
## Scaling
Work is topic-scoped. One worker holds the lease for an entire topic at a time, which avoids synchronization between workers. Throughput scales with topic count: more topics means more parallelism across workers.
The Helm chart does not install an HPA by default. Set replicaCount manually or add your own HPA based on CPU, memory, or custom metrics.
Minimal HPA example:
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: iceberg-processor
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: iceberg-processor
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 80
```
## Troubleshooting
- Missing data: verify the S3 path `s3://{bucket}/{namespace}/{topic}/{partition}`.
- Schema errors: check the registry URL and payloads for valid JSON.
- Catalog errors: verify REST endpoint and credentials.
- Offsets not advancing: check etcd connectivity and lease TTL.
## Local Demo
From repo root:
```sh
make iceberg-demo
```
This boots a local kind cluster, deploys the processor, produces data, and validates the S3 and Iceberg outputs. See the root Makefile for image and warehouse overrides.