Apache Kafka — Architecture, Message Flow, and CLI Reference
A compact reference for the parts of Kafka that come up most often when designing, debugging, or operating a cluster: the broker / topic / partition / consumer-group architecture, the end-to-end produce → replicate → consume flow with acks=all and ISR semantics, and the CLI commands used to inspect and manage all of the above.
1. Architecture
A Kafka cluster is a set of brokers that store messages in append-only logs. Each log is a partition, and partitions are grouped into named topics. Every partition has one leader replica and zero or more follower replicas, each on a different broker. Producers write to the leader, and consumers fetch from it by default (fetching from a nearby follower is possible when follower fetching is configured via replica.selector.class on brokers and client.rack on consumers). The set of replicas that are caught up with the leader, including the leader itself, is the in-sync replica set (ISR). Cluster metadata (which broker leads which partition, which replicas are in sync, topic configs, ACLs) is managed by the controller. In modern Kafka this is a KRaft (Raft-based) quorum of controller nodes, either dedicated or co-located with brokers; in older deployments, one broker is elected controller and the metadata is stored in an external ZooKeeper ensemble.
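Figure: cluster architecture. Solid arrows: producer → partition leader, and leader → consumer fetch. Dashed arrows: leader → follower replication (followers pull from the leader). Only ISR members are eligible for leader election on failure, unless unclean.leader.election.enable=true.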
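A minimal bash sketch of the election rule, assuming unclean.leader.election.enable=false: the new leader is the first replica, in assigned order, that is both alive and in the ISR. The function names and the space-separated broker-id arguments are hypothetical; this models the rule, not the controller's code.

```shell
#!/usr/bin/env bash
# Sketch of clean leader election for one partition (unclean election disabled).
# Each argument is a space-separated list of broker ids (hypothetical format).

# True if the space-separated list $1 contains the id $2.
contains() { [[ " $1 " == *" $2 "* ]]; }

# Prints the first replica, in assignment order, that is both in the ISR and
# alive. Prints "none" and returns 1 if there is no such replica: the partition
# then stays offline instead of electing an out-of-sync replica.
elect_leader() {   # usage: elect_leader <assigned> <isr> <live>
  local assigned=$1 isr=$2 live=$3 id
  for id in $assigned; do   # unquoted on purpose: split into broker ids
    if contains "$isr" "$id" && contains "$live" "$id"; then
      echo "$id"
      return 0
    fi
  done
  echo none
  return 1
}

# Leader 1 fails while the ISR is {1,3}: broker 3 is elected, even though
# broker 2 comes first in the assignment, because 2 was not in sync.
elect_leader "1 2 3" "1 3" "2 3"   # prints 3
```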
Core concepts in one line each
- Broker — a Kafka server process; holds partition replicas on local disk and serves produce / fetch requests.
- Topic — a named, partitioned, append-only log. Retention is time-based or size-based; compacted topics keep the latest value per key.
- Partition — the unit of parallelism, ordering, and replication. Order is guaranteed within a partition, never across partitions.
- Leader / Follower — one replica per partition is the leader; followers replicate from it. Reads and writes go to the leader by default.
- ISR (in-sync replicas) — the leader plus the followers that have fully caught up to its log end within the last replica.lag.time.max.ms. Together, acks=all and min.insync.replicas define the durability contract.
- Producer — picks a partition (by key hash for keyed records, the sticky partitioner for unkeyed ones, or explicitly), batches records, and waits for the configured acks before considering a send successful.
- Consumer group — a set of consumers that share partition assignments via the group coordinator. Each partition is consumed by exactly one member of a group at a time.
- Offset — the monotonically increasing position within a partition. Consumers commit their progress to the internal __consumer_offsets topic.
- Controller — the node(s) that own cluster metadata. In KRaft mode this is a Raft quorum of dedicated or co-located controllers; in legacy mode, a single elected broker acts as controller and the metadata itself lives in ZooKeeper.
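The keyed-partitioning rule from the Producer bullet reduces to partition = hash(key) mod partition count. A sketch under one loud assumption: cksum's CRC stands in for the murmur2 hash the Java producer actually uses, so the numbers will not match a real producer; only the shape of the rule carries over. The function name is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch of key-based partitioning: partition = hash(key) mod numPartitions.
# ASSUMPTION: cksum's CRC replaces the murmur2 hash of the Java producer,
# so these partition numbers will NOT match a real Kafka producer's choice.
partition_for_key() {   # usage: partition_for_key <key> <num_partitions>
  local key=$1 num_partitions=$2 hash
  # printf avoids hashing a trailing newline; cksum prints "<crc> <byte-count>".
  hash=$(printf '%s' "$key" | cksum | awk '{print $1}')
  echo $(( hash % num_partitions ))
}

# The same key always maps to the same partition (this is what gives per-key
# ordering), but the mapping depends on the partition count.
partition_for_key order-42 6
partition_for_key order-42 6
partition_for_key order-42 12
```

Because the result depends on the partition count, adding partitions to a topic can move a key to a different partition, which breaks per-key ordering across the change.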
2. Message Flow
The end-to-end path of a single record with acks=all, replication factor 2 (a leader and one follower), and a consumer group that commits offsets after processing.
Figure: message flow. Blue arrows: requests. Green arrows: successful responses / acks. HW (high-water mark) is the minimum log-end offset across the ISR, i.e. the end of the log prefix that every in-sync replica has replicated; consumers can read only offsets below it.
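A sketch of how the HW follows from the replicas' log-end offsets (LEOs), assuming LEO means the next offset a replica will write. The function names are hypothetical; the real leader recomputes the HW as follower fetches report their positions.

```shell
#!/usr/bin/env bash
# Sketch: the leader's high-water mark (HW) is the minimum log-end offset (LEO)
# across the ISR. A LEO is the next offset a replica will write, so every
# offset below the HW is present on all in-sync replicas.
high_water_mark() {   # usage: high_water_mark <leader_leo> <follower_leo>...
  local min="" leo
  for leo in "$@"; do
    if [[ -z $min ]] || (( leo < min )); then min=$leo; fi
  done
  echo "$min"
}

# With acks=all, a record at <offset> can be acknowledged (and read) once it
# is below the HW, i.e. once every ISR member has fetched past it.
is_committed() {   # usage: is_committed <offset> <leader_leo> <follower_leo>...
  local offset=$1
  shift
  (( offset < $(high_water_mark "$@") ))
}

# Leader LEO 10, followers at 8 and 9: HW = 8, so offsets 0..7 are committed.
high_water_mark 10 8 9                                      # prints 8
is_committed 7 10 8 9 && echo "offset 7 committed"
is_committed 8 10 8 9 || echo "offset 8 not yet committed"
```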
What the steps guarantee
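- Steps 1–6 (produce path with acks=all) — the producer's send returns success only after every current ISR member has appended the record. With min.insync.replicas=2, an acknowledged record is on at least two replicas, so a single broker loss does not lose acknowledged data (assuming unclean leader election stays disabled). If the ISR shrinks below min.insync.replicas, the leader rejects acks=all writes (NotEnoughReplicasException) rather than accepting them with weaker durability.
- Step 5 (high-water mark) — readers only see offsets below the HW. Records at or above the HW are already in the leader's log but not yet visible to consumers, which prevents a consumer from reading data that might be lost on leader failover.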
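- Steps 7–8 (consumer poll) — order is preserved within a partition. Two consumers in the same group are never assigned the same partition at the same time, so within a group each record is delivered to one consumer; after a crash or rebalance, records not yet committed can be redelivered to another member (see steps 9–10).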
- Steps 9–10 (offset commit) — committing after processing gives at-least-once delivery (a crash between processing and commit re-processes records). Committing before processing gives at-most-once (a crash between commit and processing skips those records). Exactly-once across consume → process → produce requires the transactional API (enable.idempotence=true and transactional.id on the producer, isolation.level=read_committed on downstream consumers).
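A toy simulation (no Kafka involved, all names hypothetical) of the two commit orders in steps 9–10: the consumer commits after every record, crashes once at offset 2 between the two steps, then resumes from the last committed offset.

```shell
#!/usr/bin/env bash
# Toy model: one partition with offsets 0..4, one commit per record, and a
# single crash at offset 2 between the "process" and "commit" steps.
# Prints the offsets that were processed, in order.
run_consumer() {   # usage: run_consumer commit-after | commit-before
  local mode=$1 committed=0 crashed=0 offset
  local -a processed=()
  while (( committed < 5 )); do
    offset=$committed                       # restart point = last commit
    if [[ $mode == commit-after ]]; then
      processed+=("$offset")                # process first...
      if (( offset == 2 && !crashed )); then crashed=1; continue; fi
      committed=$(( offset + 1 ))           # ...then commit
    else
      committed=$(( offset + 1 ))           # commit first...
      if (( offset == 2 && !crashed )); then crashed=1; continue; fi
      processed+=("$offset")                # ...then process
    fi
  done
  echo "${processed[*]}"
}

run_consumer commit-after    # at-least-once: prints 0 1 2 2 3 4
run_consumer commit-before   # at-most-once:  prints 0 1 3 4
```

A real consumer usually commits in batches, so a crash can replay or skip more than one record.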
3. Exactly-Once Semantics
Kafka's exactly-once story is built from two mechanisms, the second layered on top of the first: an idempotent producer that eliminates duplicates caused by producer retries within a single partition, and a transactional API that ties writes to multiple partitions and a consumer-offset commit into a single atomic unit. The transactional API enables the canonical consume → process → produce pattern (the basis of Kafka Streams' EOS guarantee), in which the input offset commit and the output records become visible to downstream consumers together or not at all.
Idempotent producer
When you set enable.idempotence=true (the default since Kafka 3.0), the producer obtains a 64-bit Producer ID (PID) from the broker when it initializes, and attaches a per-partition, monotonically increasing sequence number to every record batch. The partition leader tracks the sequence numbers it has accepted from each (PID, partition) pair, keeping metadata for the most recent five batches. A batch whose sequence number was already written (because the producer retried after a lost ack) is acknowledged again but not appended a second time, and a batch that skips ahead in the sequence is rejected with OutOfOrderSequenceException. The guarantee is scoped to a single producer session and a single partition.
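A simplified model of the leader's per-(PID, partition) duplicate check, assuming one record per batch, sequences starting at 0, and that only the last accepted sequence is remembered (the broker actually keeps metadata for several recent batches). The result strings and function name are hypothetical; associative arrays require bash 4+.

```shell
#!/usr/bin/env bash
# Simplified leader-side sequence check for an idempotent producer.
# ASSUMPTIONS: one record per batch, sequences start at 0, and only the last
# accepted sequence per (pid, partition) is remembered.
declare -A last_seq   # "pid:partition" -> last accepted sequence number

# Sets $result (instead of printing) so the state survives: a $(...) call
# would run in a subshell and lose the update to last_seq.
append_batch() {   # usage: append_batch <pid> <partition> <seq>
  local key="$1:$2" seq=$3
  local last=${last_seq[$key]:--1}   # -1 means nothing accepted yet
  if (( seq == last + 1 )); then
    last_seq[$key]=$seq
    result=APPENDED
  elif (( seq <= last )); then
    result=DUPLICATE                 # already in the log: ack, do not append
  else
    result=OUT_OF_ORDER_SEQUENCE     # gap: an earlier batch is missing
  fi
}

# A retry of seq 1 is deduplicated; jumping from 1 to 3 is rejected.
for seq in 0 1 1 3; do
  append_batch 1000 0 "$seq"
  echo "seq=$seq -> $result"
done
```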
Transactional API
Setting transactional.id (in addition to enable.idempotence=true) promotes the producer to a transactional one and pins it to a transaction coordinator — the broker that leads the __transaction_state partition corresponding to the transactional.id hash. The coordinator hands out a producer epoch that fences any older instance of the same transactional.id (so a zombie producer that comes back after a long pause cannot publish into a transaction owned by its successor). The transaction lifecycle is a two-phase commit:
- BeginTxn — beginTransaction() only changes the producer's local state; nothing is sent to the broker yet.
- AddPartitionsToTxn — the first time the producer writes to a partition within the transaction, it registers that partition with the coordinator, which records it in __transaction_state so it knows which partitions need a commit/abort marker at the end.
- Send records — the producer writes data records as usual, but each batch carries the (PID, epoch, transactional flag). The records are appended to the data partitions before the commit; consumers using read_committed will not see them yet.
- SendOffsetsToTxn (consume-process-produce) — the producer first sends AddOffsetsToTxn so the coordinator adds the consumer group's __consumer_offsets partition to the transaction, then sends a TxnOffsetCommit directly to the group coordinator. The committed offsets take effect only if the transaction commits.
- CommitTxn / AbortTxn — the producer sends EndTxn; the coordinator writes a PREPARE_COMMIT (or PREPARE_ABORT) record to __transaction_state, then writes control records (commit or abort markers) to every partition the transaction touched. Only after all markers are appended does it write COMPLETE_COMMIT (or COMPLETE_ABORT). Consumers using isolation.level=read_committed use the markers to filter out aborted batches and return only committed ones, and they read only up to (not including) the last stable offset (LSO): the first offset of the earliest still-open transaction on that partition, or the HW if no transaction is open.
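A sketch of the producer-epoch fencing described at the start of this subsection, assuming the coordinator's state is just an integer epoch per transactional.id that each initTransactions() call bumps. The function names, the FENCED result string, and the in-memory store are hypothetical; associative arrays require bash 4+.

```shell
#!/usr/bin/env bash
# Sketch of producer-epoch fencing on the transaction coordinator.
declare -A epoch   # transactional.id -> current producer epoch

# Each initTransactions() for a transactional.id bumps its epoch; sets $result.
init_transactions() {   # usage: init_transactions <transactional.id>
  local id=$1
  epoch[$id]=$(( ${epoch[$id]:--1} + 1 ))
  result=${epoch[$id]}
}

# Requests carrying an epoch older than the current one come from a zombie.
check_epoch() {   # usage: check_epoch <transactional.id> <epoch-from-request>
  local id=$1 req=$2
  if (( req < ${epoch[$id]:-0} )); then
    result=FENCED
  else
    result=OK
  fi
}

init_transactions orders-tx; first=$result     # first instance: epoch 0
init_transactions orders-tx; second=$result    # restarted instance: epoch 1
check_epoch orders-tx "$second"; echo "new instance: $result"   # prints new instance: OK
check_epoch orders-tx "$first";  echo "zombie: $result"         # prints zombie: FENCED
```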
Figure: transaction flow. Blue: client request flow. Orange: coordinator-internal control writes that drive the commit decision. Green: commit markers + the LSO advance that makes data visible to read_committed consumers. Until the step 10 marker lands on a partition, the data records from steps 5–6 on that partition sit in the log but are invisible to read_committed consumers.
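A toy model of one partition as seen by a read_committed consumer: compute the LSO, then drop aborted records below it. The input format (offset:producer pairs, with - for non-transactional writes, plus a txn_state map) is hypothetical, and each transaction's outcome is given directly instead of being derived from markers; associative arrays require bash 4+.

```shell
#!/usr/bin/env bash
# Toy read_committed view of one partition.
# ASSUMED input: records "offset:producer" ("-" = non-transactional) and
# txn_state mapping producer -> open | committed | aborted.
declare -A txn_state

last_stable_offset() {   # usage: last_stable_offset <hw> <record>...
  local lso=$1 rec off pid
  shift
  for rec in "$@"; do
    off=${rec%%:*} pid=${rec#*:}
    # The first record of a still-open transaction caps the LSO.
    if [[ ${txn_state[$pid]:-} == open ]] && (( off < lso )); then lso=$off; fi
  done
  echo "$lso"
}

read_committed() {   # usage: read_committed <hw> <record>...
  local hw=$1 lso rec off pid
  local -a out=()
  shift
  lso=$(last_stable_offset "$hw" "$@")
  for rec in "$@"; do
    off=${rec%%:*} pid=${rec#*:}
    (( off < lso )) || continue
    # Keep non-transactional records and records of committed transactions.
    if [[ $pid == - || ${txn_state[$pid]:-} == committed ]]; then out+=("$off"); fi
  done
  echo "${out[*]}"
}

txn_state=([A]=committed [B]=aborted [C]=open)
log=(0:- 1:A 2:B 3:C 4:A 5:-)
last_stable_offset 6 "${log[@]}"   # prints 3 (first record of open txn C)
read_committed 6 "${log[@]}"       # prints 0 1
```

If C later commits, the LSO moves up to the HW (6) and offsets 3, 4, and 5 become readable; the aborted record at offset 2 stays hidden.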
Key knobs and properties
- Producer — enable.idempotence=true, transactional.id=<stable-id>, acks=all (required by idempotence), max.in.flight.requests.per.connection ≤ 5 (required by idempotence).
- Consumer — isolation.level=read_committed. With read_uncommitted (the default) you also see aborted batches and records from still-open transactions.
- Broker — transaction.state.log.replication.factor, transaction.state.log.min.isr, transaction.max.timeout.ms. The __transaction_state topic is created automatically when the first transactional producer initializes.
- Failure mode — if the producer crashes mid-transaction, the coordinator aborts the transaction once the producer's transaction.timeout.ms expires (or as soon as a new instance with the same transactional.id calls initTransactions()) and writes abort markers, so read_committed consumers never see the half-written batches. A zombie producer with a stale epoch is rejected on its next request.
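An offline sanity check for the producer knobs above, reading a Java-style .properties file. It is a sketch: the function name is hypothetical, the parser ignores line continuations and escapes, and a producer with enable.idempotence=true set explicitly already refuses to start with these conflicts. retries is included because idempotence also requires retries > 0.

```shell
#!/usr/bin/env bash
# Flags producer settings that conflict with enable.idempotence=true.
# Simple parser: "key=value" lines; blank and "#" lines match no key below.
check_idempotent_producer() {   # usage: check_idempotent_producer <file>
  local file=$1 key value acks="" inflight="" retries="" problems=0
  while IFS='=' read -r key value || [[ -n $key ]]; do
    key=${key//[[:space:]]/}       # strip whitespace (and stray CRs)
    value=${value//[[:space:]]/}
    case $key in
      acks) acks=$value ;;
      max.in.flight.requests.per.connection) inflight=$value ;;
      retries) retries=$value ;;
    esac
  done < "$file"
  if [[ -n $acks && $acks != all && $acks != -1 ]]; then
    echo "acks=$acks: idempotence requires acks=all"; problems=1
  fi
  if [[ -n $inflight ]] && (( inflight > 5 )); then
    echo "max.in.flight.requests.per.connection=$inflight: must be at most 5"; problems=1
  fi
  if [[ $retries == 0 ]]; then
    echo "retries=0: idempotence requires retries > 0"; problems=1
  fi
  return "$problems"
}

# Usage: check_idempotent_producer producer.properties || echo "fix the producer config"
```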
4. Memory / CPU / I/O Pipeline
A Kafka broker is essentially a network-attached append-only log that aggressively uses the kernel page cache instead of a JVM-resident cache. The throughput numbers Kafka is known for come from three architectural choices: sequential disk I/O on the produce path, page-cache reads served via sendfile() zero-copy on the consume path, and a thread pool design that decouples slow network I/O from CPU-bound request processing.
Memory layout of a broker process
- JVM heap (often 6–8 GB) — controller / metadata cache, request queues, in-flight produce buffers, request-handler local state. Offset and time indexes are memory-mapped files, so they live in the page cache rather than on the heap. The heap is intentionally kept small relative to RAM so the kernel can use the rest as page cache.
- Off-heap (direct buffers) — Java NIO buffers used by the network layer and by FileChannel reads/writes, capped by -XX:MaxDirectMemorySize.
- Page cache (all remaining RAM) — OS-owned; holds recently written log segments. Both producers and consumers hit this rather than going to disk. A reader at the tail of the log (lag ≈ 0) almost never touches the disk; it reads the same page the producer just wrote.
- Disk — log segments stored as flat append-only files under log.dirs. The OS flushes dirty pages on its own schedule (controlled by the vm.dirty_* sysctls). By default Kafka deliberately does not call fsync per record (see flush.messages / flush.ms); durability comes from ISR replication, not from waiting for disk.
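A back-of-the-envelope calculation of how far behind a consumer can fall before its fetches leave the page cache. All inputs are hypothetical, and it assumes a fixed OS overhead and that the cache holds only the newest writes (replication traffic the broker receives counts toward the write rate).

```shell
#!/usr/bin/env bash
# Seconds of the newest writes that fit in the page cache (integer math).
# ASSUMPTIONS: page cache = RAM - heap - fixed OS/other overhead, and the
# cache holds only the most recent writes, shared by all partitions.
cache_window_seconds() {   # usage: <ram_gb> <heap_gb> <os_overhead_gb> <write_mb_per_s>
  local ram=$1 heap=$2 os=$3 rate=$4
  local cache_mb=$(( (ram - heap - os) * 1024 ))
  echo $(( cache_mb / rate ))
}

# 64 GB box, 6 GB heap, 2 GB for the OS, 100 MB/s written to this broker:
cache_window_seconds 64 6 2 100   # prints 573 (about 9.5 minutes)
```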
CPU / threading model
- Network threads (num.network.threads, default 3) — non-blocking, selector-based (epoll-style) loops. A separate acceptor thread per listener accepts connections and hands them to these processor threads, which read framed requests off the sockets, enqueue them onto a shared request channel, and write responses back. There is no per-connection thread.
- Request handler / IO threads (num.io.threads, default 8) — pull from the request channel and execute the actual work against ReplicaManager / LogManager / GroupCoordinator / TransactionCoordinator. This is where the CPU spends most of its time on the produce path (CRC, compression, log append) and on the consume path (offset lookup, fetch session bookkeeping).
- Replica fetcher threads — follower brokers run dedicated fetcher threads (num.replica.fetchers per source broker) that issue FetchRequest against partition leaders to replicate. A follower stays in the ISR only while its fetches keep reaching the leader's log-end offset within replica.lag.time.max.ms.
- Background threads — log cleaner (compaction), log retention deleter, group coordinator heartbeat expiration, transaction coordinator expiration, controller event thread (in KRaft mode the controllers run a separate Raft event loop).
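A sketch of the ISR-shrink rule from the replica fetcher bullet: a follower whose last caught-up time (the last fetch that reached the leader's log-end offset) is older than replica.lag.time.max.ms is dropped from the ISR. Names and the broker:timestamp argument format are hypothetical; times are in milliseconds.

```shell
#!/usr/bin/env bash
# Prints the followers the leader would remove from the ISR.
out_of_sync_replicas() {   # usage: <now_ms> <lag_max_ms> <broker>:<last_caught_up_ms>...
  local now=$1 lag_max=$2 entry broker caught_up
  local -a out=()
  shift 2
  for entry in "$@"; do
    broker=${entry%%:*} caught_up=${entry#*:}
    if (( now - caught_up > lag_max )); then out+=("$broker"); fi
  done
  echo "${out[*]}"
}

# replica.lag.time.max.ms=30000: broker 2 caught up 5 s ago, broker 3 45 s ago.
out_of_sync_replicas 100000 30000 2:95000 3:55000   # prints 3
```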
The hot path for a tail-following consumer does not read from disk: with a plaintext listener, bytes flow page cache → kernel sendfile → NIC with no copy into user space (with TLS the broker must encrypt in user space, so this zero-copy path is not used). The hot path for produce is a sequential append into the same page cache, and durability is provided by ISR replication (followers pull through their own fetcher threads), not by fsync per write.
Why Kafka is fast (in one paragraph each)
- Sequential append — every produce is a write to the end of a flat file. SSDs and spinning disks both like this; there are no random seeks, no in-place updates.
- Page cache instead of an internal cache — keeping JVM heap small lets the OS use the rest of RAM as page cache. A tail-following consumer hits the same page the producer just wrote, so there is no disk I/O on the hot path.
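- Zero-copy on consume — FileChannel.transferTo() calls sendfile(); bytes go page cache → NIC without passing through user-space buffers, so the broker spends no CPU cycles copying them (plaintext listeners only; TLS requires encryption in user space).
- Batching + compression — producers batch records per partition before sending; brokers store, replicate, and serve those batches as-is. Compression is end-to-end when the topic's compression.type is producer (the default): the broker writes the producer's compressed batches to the log without recompressing them, and consumers decompress them.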
- Pull-based replication — followers fetch from the leader on their own schedule with the same fetch API as a consumer; the leader has no per-follower send loop. ISR membership is driven by how fresh each follower's pull is.
5. CLI Commands
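The scripts ship in the Kafka distribution's bin/ directory. Examples assume BOOTSTRAP=localhost:9092 is set in the shell. For a secured cluster, add --command-config client.properties to the admin tools; the console producer and consumer take --producer.config and --consumer.config instead.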
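A small wrapper so the admin commands do not repeat the connection flags. It is a hypothetical helper, not part of the Kafka distribution; it adds --command-config only when CLIENT_CONFIG is set.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper for the admin tools (kafka-topics.sh, kafka-configs.sh,
# kafka-consumer-groups.sh, kafka-reassign-partitions.sh, kafka-acls.sh), which
# all accept --bootstrap-server and --command-config. Not for the console
# producer or consumer, whose config flags differ.
kafka_admin() {   # usage: kafka_admin <tool> [tool args...]
  local tool=$1
  shift
  # ${CLIENT_CONFIG:+...} expands to nothing when CLIENT_CONFIG is unset or empty.
  "$tool" --bootstrap-server "$BOOTSTRAP" \
    ${CLIENT_CONFIG:+--command-config "$CLIENT_CONFIG"} "$@"
}

# Plaintext cluster:  kafka_admin kafka-topics.sh --list --exclude-internal
# Secured cluster:    CLIENT_CONFIG=client.properties kafka_admin kafka-consumer-groups.sh --list
```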
Topic lifecycle — kafka-topics.sh
# Create a topic with 6 partitions and replication factor 3
kafka-topics.sh --bootstrap-server $BOOTSTRAP \
--create --topic orders \
--partitions 6 --replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=604800000
# List all topics (excluding internal __ topics)
kafka-topics.sh --bootstrap-server $BOOTSTRAP --list \
--exclude-internal
# Describe a topic: leader, replicas, ISR per partition
kafka-topics.sh --bootstrap-server $BOOTSTRAP --describe --topic orders
# Increase partition count (cannot decrease; keyed records may map to different partitions afterwards)
kafka-topics.sh --bootstrap-server $BOOTSTRAP \
--alter --topic orders --partitions 12
# Delete (requires delete.topic.enable=true on brokers, the default since Kafka 1.0)
kafka-topics.sh --bootstrap-server $BOOTSTRAP --delete --topic orders
Inspect or change topic config — kafka-configs.sh
# Show the dynamically overridden configs for a topic (add --all to include defaults)
kafka-configs.sh --bootstrap-server $BOOTSTRAP \
--describe --entity-type topics --entity-name orders
# Add or update a topic-level config
kafka-configs.sh --bootstrap-server $BOOTSTRAP \
--alter --entity-type topics --entity-name orders \
--add-config min.insync.replicas=2,retention.ms=604800000
# Remove a previously-overridden config (revert to broker default)
kafka-configs.sh --bootstrap-server $BOOTSTRAP \
--alter --entity-type topics --entity-name orders \
--delete-config retention.ms
# Same pattern works for entity-type brokers, clients, users, ips
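retention.ms is in milliseconds, which makes values like 604800000 hard to eyeball. A hypothetical helper that derives them from days (7 d × 24 h × 3600 s × 1000 ms = 604800000 ms):

```shell
#!/usr/bin/env bash
# Converts days to milliseconds for retention.ms (hypothetical helper).
days_to_ms() { echo $(( $1 * 24 * 60 * 60 * 1000 )); }

days_to_ms 7   # prints 604800000
# e.g. ... --add-config retention.ms=$(days_to_ms 30)
```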
Quick produce / consume — kafka-console-producer.sh / kafka-console-consumer.sh
# Produce: one record per line, key:value separated by ':'
kafka-console-producer.sh --bootstrap-server $BOOTSTRAP \
--topic orders \
--property "parse.key=true" --property "key.separator=:"
# > order-42:{"id":42,"amount":1990}
# Consume from the beginning of the topic, printing timestamp, offset, key, and value
kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP \
--topic orders \
--from-beginning \
--property print.key=true --property print.offset=true \
--property print.timestamp=true
# Consume as part of a consumer group, with auto-commit disabled
kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP \
--topic orders --group billing \
--consumer-property enable.auto.commit=false
Consumer groups and offsets — kafka-consumer-groups.sh
# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP --list
# Describe a group: current offset, log-end-offset, lag per partition
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
--describe --group billing
# Reset offsets to the earliest available for the entire topic (the group must have no active members)
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
--group billing --topic orders \
--reset-offsets --to-earliest --execute
# Reset to the first offsets at or after a wall-clock time (format YYYY-MM-DDTHH:mm:SS.sss); useful for replay windows
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
--group billing --topic orders \
--reset-offsets --to-datetime 2026-06-20T00:00:00.000 --execute
# Delete a consumer group (only when no active members remain)
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
--delete --group billing
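A sketch for totalling consumer lag from the --describe output, for scripts and alerts. It assumes whitespace-separated columns with a header row naming a LAG column (located by name, not position) and skips "-" values for partitions without a committed offset; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Sums the LAG column of `kafka-consumer-groups.sh --describe` output on stdin.
total_lag() {
  awk '
    # Header row: remember which column is LAG, then skip the row.
    { for (i = 1; i <= NF; i++) if ($i == "LAG") { col = i; next } }
    # Data rows: add numeric LAG values; "-" means no committed offset yet.
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Usage: kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP --describe --group billing | total_lag
```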
Partition reassignment — kafka-reassign-partitions.sh
# 1. Generate a candidate plan to move two topics across brokers 1,2,3
cat > topics.json <<'JSON'
{ "version": 1, "topics": [ { "topic": "orders" }, { "topic": "payments" } ] }
JSON
kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3" --generate
# 2. Save the proposed assignment to reassign.json (keep the current one for rollback), then execute
kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP \
--reassignment-json-file reassign.json --execute \
--throttle 50000000 # bytes/sec replication throttle
# 3. Check progress; once every partition has finished moving, --verify also removes the throttle
kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP \
--reassignment-json-file reassign.json --verify
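The --generate step prints both the current and the proposed assignment, and keeping the current one gives a rollback plan. A sketch that splits them into files, assuming each JSON document sits on the single line after its heading (check this against your Kafka version's output); the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Writes <dir>/rollback.json and <dir>/reassign.json from --generate output.
# ASSUMPTION: each JSON document is on the line right after its heading.
save_reassignment_plans() {   # usage: <generate output> | save_reassignment_plans <dir>
  awk -v dir="$1" '
    /^Current partition replica assignment/          { getline; print > (dir "/rollback.json") }
    /^Proposed partition reassignment configuration/ { getline; print > (dir "/reassign.json") }
  '
}

# Usage:
#   kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP \
#     --topics-to-move-json-file topics.json --broker-list "1,2,3" --generate \
#     | save_reassignment_plans .
```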
ACLs — kafka-acls.sh
# Allow user 'billing-app' to consume topic 'orders' as group 'billing'
# (the listed operation is granted on every resource named in the command)
kafka-acls.sh --bootstrap-server $BOOTSTRAP \
--add --allow-principal User:billing-app \
--operation Read --topic orders --group billing
# Allow a producer to write to 'orders' (Write + Describe on the topic)
kafka-acls.sh --bootstrap-server $BOOTSTRAP \
--add --allow-principal User:order-svc \
--operation Write --operation Describe --topic orders
# List existing ACLs scoped to a topic
kafka-acls.sh --bootstrap-server $BOOTSTRAP \
--list --topic orders