
Apache Kafka — Architecture, Message Flow, and CLI Reference

A compact reference for the parts of Kafka that come up most often when designing, debugging, or operating a cluster: the broker / topic / partition / consumer-group architecture, the end-to-end produce → replicate → consume flow with acks=all and ISR semantics, and the CLI commands used to inspect and manage all of the above.

1. Architecture

A Kafka cluster is a set of brokers that store messages in append-only logs. Each log is a partition, and partitions are grouped into named topics. Every partition has one leader replica and zero or more follower replicas, each on a different broker; by default, producers and consumers talk to the leader for a given partition. The set of replicas (leader included) that are caught up with the leader is the in-sync replica set (ISR). Cluster metadata (which broker leads which partition, which replicas are in sync, topic configs, ACLs) is managed by the controller. In modern Kafka this is a KRaft quorum of controller nodes, either co-located with brokers or running as dedicated processes; in older deployments, one broker is elected controller and the metadata is stored in an external ZooKeeper ensemble.

Figure: a three-broker cluster hosting topic orders. Broker 1 leads P0 and follows P1, Broker 2 leads P1 and follows P2, Broker 3 leads P2 and follows P0. A KRaft controller quorum holds the metadata (leaders, ISR, configs, ACLs). Producer A (acks=all) and Producer B (key=orderId) write to the partition leaders; consumer group billing has three members: Consumer 1 → P0, Consumer 2 → P1, Consumer 3 → P2.

Solid arrows: producer → partition leader and leader → consumer fetch. Dashed arrows: leader → follower replication (followers pull the data; only ISR members are eligible for leader election on failure unless unclean.leader.election.enable=true).

Core concepts in one line each

  • Broker — a Kafka server process; holds partition replicas on local disk and serves produce / fetch requests.
  • Topic — a named, partitioned, append-only log. Retention is time-based or size-based; compacted topics keep the latest value per key.
  • Partition — the unit of parallelism, ordering, and replication. Order is guaranteed within a partition, never across partitions.
  • Leader / Follower — one replica per partition is the leader; followers replicate from it. Reads and writes go to the leader by default.
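  • ISR (in-sync replicas) — the leader plus every follower that has caught up to the leader's log end within replica.lag.time.max.ms. acks=all plus min.insync.replicas defines the durability contract: a write is acknowledged only after every ISR member has it, and is rejected if the ISR is smaller than min.insync.replicas.
  • Producer — picks a partition (hash of the key for keyed records, sticky partitioning for records without a key, or an explicitly chosen partition), batches records, and waits for the configured acks before considering a send successful.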
  • Consumer group — a set of consumers that share partition assignments via the group coordinator. Each partition is consumed by exactly one member of a group at a time.
  • Offset — the monotonically increasing position within a partition. Consumers commit progress to the __consumer_offsets internal topic.
  • Controller — the node(s) that own cluster metadata. In KRaft mode this is a Raft quorum of dedicated or co-located controllers; in legacy ZooKeeper mode, a single broker is elected controller and the metadata itself lives in ZooKeeper.
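The "by key hash" rule in the Producer bullet can be sketched in Python as below. This is a minimal sketch, assuming the formula the Java client's default partitioner applies to keyed records: a 32-bit murmur2 hash of the serialized key bytes, masked to a non-negative int, modulo the partition count. The function names are illustrative, and the hash port should be checked against the client you actually run before relying on cross-language agreement.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 (seed 0x9747b28c), ported from the Java client; unsigned result."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask

    # Body: mix 4 bytes (little-endian) at a time.
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Tail: the remaining 1-3 bytes (mirrors the Java switch fall-through).
    tail, rem = length & ~3, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Keyed records: non-negative murmur2 hash modulo the partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


if __name__ == "__main__":
    for key in (b"order-42", b"order-43", b"order-42"):
        print(key.decode(), "->", partition_for_key(key, 6))
```

The same key always lands on the same partition for a fixed partition count, which is what gives per-key ordering.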

2. Message Flow

The end-to-end path of a single record with acks=all, two-replica replication, and a consumer group that commits offsets after processing.

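Figure: sequence diagram of one record's path between Producer, Leader broker, Follower broker (ISR), and Consumer (group: billing):
  1. Producer → Leader: ProduceRequest(record, acks=all)
  2. Leader: append to log, assign offset
  3. Follower → Leader: FetchRequest (followers pull)
  4. Leader → Follower: FetchResponse; the follower appends and its LEO advances
  5. Leader: advance high-water mark (HW)
  6. Leader → Producer: ProduceResponse(offset, timestamp)
  7. Consumer → Leader: FetchRequest (consumer poll, up to HW)
  8. Leader → Consumer: FetchResponse(records, lastOffset)
  9. Consumer → group coordinator: OffsetCommit(group=billing, offset=N+1)
  10. Group coordinator → Consumer: OffsetCommitResponse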

Blue arrows: requests. Green arrows: successful responses / acks. The HW (high-water mark) is one past the last offset that every in-sync replica has replicated, i.e. the minimum log-end offset (LEO) across the ISR; consumers can read only offsets strictly below it.
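The HW rule, together with the acks=all check against min.insync.replicas, fits in a few lines. This is a simplified Python model, not broker code: PartitionState and its methods are hypothetical names, each replica's LEO is the offset of the next record it would append, and a produce at a given offset counts as acknowledged once the ISR is large enough and the HW has moved past that offset.

```python
from dataclasses import dataclass


@dataclass
class PartitionState:
    log_end_offsets: dict[int, int]  # replica id -> LEO (next offset it would append)
    isr: set[int]                    # in-sync replica ids, leader included
    min_insync_replicas: int = 2

    def high_watermark(self) -> int:
        # Smallest LEO across the ISR: every offset below it is on all ISR members.
        return min(self.log_end_offsets[r] for r in self.isr)

    def accepts_acks_all(self) -> bool:
        # acks=all writes are rejected (NotEnoughReplicas) when the ISR is too small.
        return len(self.isr) >= self.min_insync_replicas

    def is_acked(self, offset: int) -> bool:
        # An acks=all produce at `offset` completes once the HW has moved past it.
        return self.accepts_acks_all() and offset < self.high_watermark()


if __name__ == "__main__":
    p = PartitionState(log_end_offsets={1: 10, 2: 10, 3: 8}, isr={1, 2, 3})
    print(p.high_watermark())              # 8: replica 3 only has offsets 0..7
    print(p.is_acked(7), p.is_acked(9))    # True False
    p.isr.discard(3)                       # replica 3 falls behind and leaves the ISR
    print(p.high_watermark())              # 10
    p.isr.discard(2)                       # ISR = {1}, below min.insync.replicas=2
    print(p.accepts_acks_all())            # False
```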

What the steps guarantee

  • Steps 1–6 (produce path with acks=all) — the producer's send returns success only after every in-sync replica has appended the record. With replication factor 3 and min.insync.replicas=2, an acks=all write is accepted only while at least two replicas are in sync (otherwise the producer gets a NotEnoughReplicas error), so losing any single broker does not lose acknowledged data.
  • Step 5 (high-water mark) — readers only see offsets below the HW. Records at or above the HW are already in the leader's log but not yet visible to consumers, which prevents a consumer from reading data that might be lost on leader failover.
  • Steps 7–8 (consumer poll) — order is preserved within a partition. Two consumers in the same group are never assigned the same partition at the same time, so between rebalances each record is delivered to exactly one consumer per group.
  • Steps 9–10 (offset commit) — committing after processing gives at-least-once delivery (a crash between processing and commit re-processes records). Committing before processing gives at-most-once (a crash between commit and processing skips records). Exactly-once across produce + consume requires the transactional API (enable.idempotence=true, transactional.id, isolation.level=read_committed).
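The difference between the two commit orders is easiest to see with a simulated crash. The sketch below is plain Python with no Kafka client; consume_with_crash is a hypothetical helper in which the committed offset is the next offset to read (as in step 9), the first consumer dies while handling offset crash_at, and a replacement consumer resumes from the last commit.

```python
def consume_with_crash(num_records: int, commit_after_processing: bool, crash_at: int) -> list[int]:
    """Return the offsets processed, in order, across a crash and a restart."""
    committed = 0  # next offset to read, as it would be stored in __consumer_offsets
    processed: list[int] = []

    # First consumer instance: crashes while handling offset `crash_at`.
    for offset in range(committed, num_records):
        if commit_after_processing:
            processed.append(offset)   # process ...
            if offset == crash_at:
                break                  # ... crash before the commit
            committed = offset + 1     # ... then commit
        else:
            committed = offset + 1     # commit first ...
            if offset == crash_at:
                break                  # ... crash before processing
            processed.append(offset)   # ... then process

    # Replacement instance resumes from the last committed offset.
    for offset in range(committed, num_records):
        processed.append(offset)
        committed = offset + 1
    return processed


if __name__ == "__main__":
    print(consume_with_crash(5, commit_after_processing=True, crash_at=2))
    print(consume_with_crash(5, commit_after_processing=False, crash_at=2))
```

With the crash at offset 2 of 5 records, the commit-after-processing run yields [0, 1, 2, 2, 3, 4] (offset 2 processed twice, at-least-once) and the commit-first run yields [0, 1, 3, 4] (offset 2 never processed, at-most-once).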

3. Exactly-Once Semantics

Kafka's exactly-once story is built out of two independent mechanisms layered on top of each other: an idempotent producer that eliminates duplicates from producer retries within a single partition, and a transactional API that ties multiple partition writes and a consumer-offset commit into a single atomic unit. The transactional API enables the canonical consume → process → produce pattern (the basis of Kafka Streams' EOS guarantee) where the input offset commit and the output records become visible to downstream consumers together or not at all.

Idempotent producer

When you set enable.idempotence=true, the producer obtains a 64-bit Producer ID (PID) from the broker when it initializes (InitProducerId request) and attaches a per-partition, monotonically increasing sequence number to every record batch. The leader broker tracks the sequence numbers it has accepted from each (PID, partition) pair: a batch whose sequence number has already been appended (because the producer retried after a lost or timed-out ack) is acknowledged without being written again, and a batch that skips ahead in the sequence is rejected with OutOfOrderSequenceException. The guarantee is scoped to a single producer session and a single partition.
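The broker-side de-duplication can be modeled per partition as follows. This is a simplified Python sketch, not the broker's implementation: it assumes one record per batch, tracks only the last accepted sequence number per PID (the real broker keeps a small window of recent batch metadata), and OutOfOrderSequenceError is a hypothetical stand-in for the Java OutOfOrderSequenceException.

```python
class OutOfOrderSequenceError(Exception):
    """Hypothetical stand-in for Kafka's OutOfOrderSequenceException."""


class PartitionLeader:
    """One partition's leader, de-duplicating by (PID, sequence number)."""

    def __init__(self) -> None:
        self.log: list[str] = []
        self.last_seq: dict[int, int] = {}  # PID -> last appended sequence number

    def append(self, pid: int, seq: int, record: str) -> str:
        last = self.last_seq.get(pid, -1)
        if seq <= last:
            return "DUPLICATE"  # retry of an already-appended batch: ack, don't rewrite
        if seq != last + 1:
            raise OutOfOrderSequenceError(f"PID {pid}: expected seq {last + 1}, got {seq}")
        self.log.append(record)
        self.last_seq[pid] = seq
        return "APPENDED"


if __name__ == "__main__":
    leader = PartitionLeader()
    print(leader.append(pid=1000, seq=0, record="a"))  # APPENDED
    print(leader.append(pid=1000, seq=0, record="a"))  # DUPLICATE (retried after a lost ack)
    print(leader.append(pid=1000, seq=1, record="b"))  # APPENDED
    print(leader.log)                                  # ['a', 'b']
```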

Transactional API

Setting transactional.id (in addition to enable.idempotence=true) promotes the producer to a transactional one and pins it to a transaction coordinator — the broker that leads the __transaction_state partition corresponding to the transactional.id hash. The coordinator hands out a producer epoch that fences any older instance of the same transactional.id (so a zombie producer that comes back after a long pause cannot publish into a transaction owned by its successor). The transaction lifecycle is a two-phase commit:

  • BeginTxn — the producer marks a transaction as started locally. No request is sent and nothing is written on the broker yet.
  • AddPartitionsToTxn — for every new partition the producer touches, it tells the coordinator. The coordinator records this in __transaction_state so it knows which partitions need a commit/abort marker at the end.
  • Send records — the producer writes data records as usual, but each batch carries the (PID, epoch, txn-flag). The records are appended to the data partitions before commit; consumers using read_committed will not see them yet.
  • SendOffsetsToTxn (consume-process-produce) — the producer asks the transaction coordinator to add the consumer group's offsets to the transaction (AddOffsetsToTxn), then sends a TxnOffsetCommit directly to the group coordinator. The committed offsets only take effect if the transaction commits.
  • CommitTxn / AbortTxn — the coordinator writes a PREPARE_COMMIT (or PREPARE_ABORT) record to __transaction_state, then dispatches control records (commit or abort markers) to every partition the transaction touched. Only after all markers are appended does the coordinator write COMPLETE_COMMIT (or COMPLETE_ABORT). Consumers using isolation.level=read_committed use the markers to filter out aborted batches and only return committed ones, up to the last stable offset (LSO): the first offset of the earliest still-open transaction, or the HW if no transaction is open.

Figure: transaction sequence between the transactional producer, the transaction coordinator, topic partitions A and B, and a read_committed consumer:
  1. initTransactions(): get PID + epoch (fences zombies)
  2. beginTransaction()
  3. AddPartitionsToTxn(A, B)
  4. Coordinator writes BEGIN to __transaction_state
  5. send(A, records): batch tagged (PID, epoch, txn=true)
  6. send(B, records)
  7. sendOffsetsToTransaction(consumer offsets)
  8. commitTransaction()
  9. Coordinator writes PREPARE_COMMIT to __transaction_state
  10. Coordinator appends a commit marker (control record) to A and B
  11. LSO advances; committed batches become visible to read_committed consumers
  12. Coordinator writes COMPLETE_COMMIT to __transaction_state

Blue: client request flow. Orange: coordinator-internal control writes that drive the commit decision. Green: commit markers + the LSO advance that makes data visible to read_committed consumers. Until step 10 lands on every touched partition, the data records from steps 5–6 sit in the log but are invisible to read_committed.
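How a read_committed consumer ends up seeing only committed data can be sketched with a toy partition log. This Python model is illustrative only: Entry, last_stable_offset, and read_committed are hypothetical names, control records are modeled as entries of kind COMMIT or ABORT, and the real consumer relies on the broker's aborted-transaction index instead of re-scanning the log.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Entry:
    offset: int
    pid: int
    kind: str                    # "data", or a control record: "COMMIT" / "ABORT"
    value: Optional[str] = None
    transactional: bool = True


def last_stable_offset(log: list[Entry], high_watermark: int) -> int:
    """First offset of the earliest still-open transaction, else the HW."""
    open_start: dict[int, int] = {}  # pid -> first offset of its open transaction
    for e in log:
        if e.offset >= high_watermark:
            break
        if e.kind == "data" and e.transactional:
            open_start.setdefault(e.pid, e.offset)
        elif e.kind in ("COMMIT", "ABORT"):
            open_start.pop(e.pid, None)
    return min(open_start.values(), default=high_watermark)


def read_committed(log: list[Entry], high_watermark: int) -> list[Optional[str]]:
    lso = last_stable_offset(log, high_watermark)
    pending: dict[int, list[int]] = {}
    committed: set[int] = set()
    for e in log:  # resolve each transaction's outcome from its marker
        if e.offset >= high_watermark:
            break
        if e.kind == "data" and e.transactional:
            pending.setdefault(e.pid, []).append(e.offset)
        elif e.kind == "COMMIT":
            committed.update(pending.pop(e.pid, []))
        elif e.kind == "ABORT":
            pending.pop(e.pid, None)
    # Data below the LSO that is non-transactional or committed; markers are never returned.
    return [e.value for e in log
            if e.offset < lso and e.kind == "data"
            and (not e.transactional or e.offset in committed)]


if __name__ == "__main__":
    log = [
        Entry(0, pid=1, kind="data", value="a1"),
        Entry(1, pid=2, kind="data", value="b1"),  # pid 2's transaction stays open
        Entry(2, pid=1, kind="data", value="a2"),
        Entry(3, pid=1, kind="COMMIT"),
        Entry(4, pid=3, kind="data", value="c1"),
        Entry(5, pid=3, kind="ABORT"),
    ]
    print(last_stable_offset(log, 6), read_committed(log, 6))  # 1 ['a1']
    log.append(Entry(6, pid=2, kind="COMMIT"))
    print(last_stable_offset(log, 7), read_committed(log, 7))  # 7 ['a1', 'b1', 'a2']
```

In the first call a2 is held back even though its transaction has committed: the LSO stops at pid 2's open transaction, which keeps delivery in offset order. The aborted c1 never appears.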

Key knobs and properties

  • Producer: enable.idempotence=true, transactional.id=<stable-id>, acks=all (required by idempotence), max.in.flight.requests.per.connection ≤ 5 (required).
  • Consumer: isolation.level=read_committed. With read_uncommitted (the default) you see aborted batches too.
  • Broker: transaction.state.log.replication.factor, transaction.state.log.min.isr, transaction.max.timeout.ms. The __transaction_state topic is created automatically.
  • Failure mode: if the producer crashes mid-transaction, the coordinator aborts the transaction once the producer's transaction.timeout.ms expires (or as soon as a new instance with the same transactional.id calls initTransactions()) and writes abort markers, so read_committed consumers never see the half-written batches. A zombie producer with a stale epoch is rejected on its next request.
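The epoch fencing and timeout abort in the failure-mode bullet can be modeled as a tiny state machine. This is a hedged Python sketch, not the coordinator's code: TxnCoordinator, ProducerFencedError, and the method names are hypothetical, the state names loosely follow the coordinator's Ongoing / CompleteCommit / CompleteAbort states, and writing markers to data partitions is left out.

```python
class ProducerFencedError(Exception):
    """Hypothetical stand-in for Kafka's ProducerFencedException."""


class TxnCoordinator:
    def __init__(self, timeout_ms: int) -> None:
        self.timeout_ms = timeout_ms          # plays the role of transaction.timeout.ms
        self.epoch: dict[str, int] = {}       # transactional.id -> current producer epoch
        self.state: dict[str, str] = {}       # transactional.id -> transaction state
        self.started_ms: dict[str, int] = {}

    def init_transactions(self, txn_id: str) -> int:
        # A new instance aborts any open transaction and bumps the epoch,
        # which fences every older instance using the same transactional.id.
        if self.state.get(txn_id) == "Ongoing":
            self.state[txn_id] = "CompleteAbort"
        self.epoch[txn_id] = self.epoch.get(txn_id, -1) + 1
        return self.epoch[txn_id]

    def _check_epoch(self, txn_id: str, epoch: int) -> None:
        if epoch != self.epoch.get(txn_id):
            raise ProducerFencedError(f"{txn_id}: stale epoch {epoch}")

    def add_partitions(self, txn_id: str, epoch: int, now_ms: int) -> None:
        self._check_epoch(txn_id, epoch)
        if self.state.get(txn_id) != "Ongoing":
            self.state[txn_id] = "Ongoing"
            self.started_ms[txn_id] = now_ms

    def commit(self, txn_id: str, epoch: int) -> None:
        self._check_epoch(txn_id, epoch)
        self.state[txn_id] = "CompleteCommit"

    def expire(self, now_ms: int) -> None:
        # Abort transactions that have stayed open longer than the timeout.
        for txn_id, state in self.state.items():
            if state == "Ongoing" and now_ms - self.started_ms[txn_id] > self.timeout_ms:
                self.state[txn_id] = "CompleteAbort"


if __name__ == "__main__":
    coord = TxnCoordinator(timeout_ms=60_000)
    old = coord.init_transactions("billing-tx")   # hypothetical transactional.id
    coord.add_partitions("billing-tx", old, now_ms=0)
    new = coord.init_transactions("billing-tx")   # restarted instance
    print(coord.state["billing-tx"])               # CompleteAbort
    try:
        coord.commit("billing-tx", old)            # the zombie wakes up
    except ProducerFencedError as err:
        print("fenced:", err)
```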

4. Memory / CPU / I/O Pipeline

A Kafka broker is essentially a network-attached append-only log that aggressively uses the kernel page cache instead of a JVM-resident cache. The throughput numbers Kafka is known for come from three architectural choices: sequential disk I/O on the produce path, page-cache reads served via sendfile() zero-copy on the consume path, and a thread pool design that decouples slow network I/O from CPU-bound request processing.

Memory layout of a broker process

  • JVM heap (often 6–8 GB) — controller / metadata cache, request queues, in-flight produce buffers, and request-handler local state. The offset and time indexes are memory-mapped files rather than heap objects. The heap is intentionally kept small relative to RAM so the kernel can use the rest as page cache.
  • Off-heap (direct buffers) — Java NIO buffers used by the network layer and by FileChannel reads/writes. Sized by -XX:MaxDirectMemorySize.
  • Page cache (all remaining RAM) — OS-owned, holds recently written log segments. Both producers and consumers hit this rather than going to disk. A reader at the tail of the log (lag ≈ 0) almost never touches a spinning platter; it reads the same page the producer just wrote.
  • Disk — log segments stored as flat append-only files under log.dirs. The OS flushes dirty pages on its own schedule (controlled by vm.dirty_* sysctls). Kafka deliberately does not call fsync per record; durability comes from ISR replication, not from waiting for disk.

CPU / threading model

  • Network threads (num.network.threads, default 3) — non-blocking, epoll-style event loops. A separate acceptor thread per listener accepts new connections and hands them to the network threads, which read framed requests off the sockets, enqueue them onto the request channel, and write responses back. There is no per-connection thread.
  • Request handler / IO threads (num.io.threads, default 8) — pull from the request channel and execute the actual work against ReplicaManager / LogManager / GroupCoordinator / TransactionCoordinator. This is where the CPU spends most of its time on the produce path (CRC, compression, log append) and on the consume path (offset lookup, fetch session bookkeeping).
  • Replica fetcher threads — follower brokers run dedicated fetcher threads that issue FetchRequest against partition leaders to replicate. ISR membership is driven by how fresh each follower's fetch position is relative to the leader's log-end offset.
  • Background threads — log cleaner (compaction), log retention deleter, group coordinator heartbeat, transaction coordinator expiration, controller event thread (in KRaft mode the controllers run a separate Raft event loop).
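The network-thread / request-channel / IO-thread split is, at its core, a bounded producer-consumer queue. The Python sketch below models only that shape: run_broker and handle are hypothetical names, the queue bound of 500 mirrors the broker's queued.max.requests default, and unlike the real broker the network threads here do not write responses back to sockets.

```python
import queue
import threading

_STOP = object()  # sentinel telling an IO thread to exit


def handle(request: int) -> tuple[str, int]:
    # Stand-in for ReplicaManager / LogManager work (CRC check, log append, ...).
    return ("ack", request)


def run_broker(requests: list[int], num_network_threads: int = 3,
               num_io_threads: int = 8, queued_max_requests: int = 500) -> list[tuple[str, int]]:
    request_channel: queue.Queue = queue.Queue(maxsize=queued_max_requests)
    responses: queue.Queue = queue.Queue()

    def network_thread(batch: list[int]) -> None:
        for req in batch:
            request_channel.put(req)  # blocks while the channel is full: backpressure

    def io_thread() -> None:
        while (req := request_channel.get()) is not _STOP:
            responses.put(handle(req))

    io = [threading.Thread(target=io_thread) for _ in range(num_io_threads)]
    net = [threading.Thread(target=network_thread, args=(requests[i::num_network_threads],))
           for i in range(num_network_threads)]
    for t in io + net:
        t.start()
    for t in net:
        t.join()  # every request is now in the channel
    for _ in io:
        request_channel.put(_STOP)  # one sentinel per IO thread, queued after all requests
    for t in io:
        t.join()
    return [responses.get() for _ in range(responses.qsize())]


if __name__ == "__main__":
    acks = run_broker(list(range(2_000)))
    print(len(acks), "requests handled")
```

Shrinking queued_max_requests makes the network threads block on put() sooner, which is the same backpressure the broker applies to clients when its handlers fall behind.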

Figure: broker I/O pipeline. NIC (producers / consumers) → Linux kernel (epoll, TCP stack, sendfile() syscall) → network threads (num.network.threads ≈ 3; epoll loop, no per-connection thread) → RequestChannel queue (bounded, backpressure) → IO request handlers (num.io.threads ≈ 8; ReplicaManager / LogManager, group / transaction coordinators) → page cache (OS-managed, most RAM; recent log segments, tail reads without disk seeks, dirty pages flushed asynchronously) → log segment files under log.dirs (.log, .index, .timeindex; sequential append, retention / compaction in the background, durability via ISR rather than fsync). Side paths: replica fetchers on follower brokers (FetchRequest loop that drives ISR membership) and consumer fetch via FileChannel.transferTo (zero-copy: page cache → NIC, no user-space copy). Background: log cleaner, retention, controller / KRaft event loop, fetcher schedulers. Blue = request path (CPU-bound stages). Orange = page cache (OS-managed, holds most RAM). Green = zero-copy consume (page cache → kernel sendfile → NIC, skips user space). Dashed = async OS flush of dirty pages to disk.

The hot path for a tail-following consumer never reads from disk — bytes flow page cache → kernel sendfile → NIC with no copy into user space. The hot path for produce is a sequential append into the same page cache, and durability is provided by ISR replication (followers pull through their own fetcher threads), not by fsync per write.

Why Kafka is fast

  • Sequential append — every produce is a write to the end of a flat file. SSDs and spinning disks both like this; there are no random seeks, no in-place updates.
  • Page cache instead of an internal cache — keeping JVM heap small lets the OS use the rest of RAM as page cache. A tail-following consumer hits the same page the producer just wrote, so there is no disk I/O on the hot path.
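  • Zero-copy on consume: FileChannel.transferTo() calls sendfile(), so bytes go page cache → NIC without traversing user-space buffers and the broker spends no CPU copying them. This path is lost when TLS is enabled, because the broker must encrypt the bytes in user space.
  • Batching + compression — producers batch records per partition before sending; brokers store, replicate, and serve those batches as-is. Compression is end-to-end: as long as the topic's compression.type is producer (the default) or matches the producer's codec, batches are stored and served compressed, and the consumer decompresses them.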
  • Pull-based replication — followers fetch from the leader on their own schedule with the same fetch API as a consumer; the leader has no per-follower send loop. ISR membership is driven by how fresh each follower's pull is.
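Kafka reaches sendfile() from Java through FileChannel.transferTo(); the same system call is exposed in Python as os.sendfile (Unix only), which makes the zero-copy consume path easy to try outside the JVM. In this sketch serve_segment is a hypothetical helper, the "segment" is an ordinary temporary file, and the consumer is a local TCP connection.

```python
import os
import socket
import tempfile


def serve_segment(path: str, sock: socket.socket, offset: int, count: int) -> int:
    """Send `count` bytes of a file, starting at `offset`, to `sock` via sendfile(2).

    The kernel moves the bytes from the page cache to the socket; they never
    pass through a user-space buffer in this process.
    """
    sent = 0
    with open(path, "rb") as segment:
        while sent < count:
            n = os.sendfile(sock.fileno(), segment.fileno(), offset + sent, count - sent)
            if n == 0:  # reached end of file
                break
            sent += n
    return sent


if __name__ == "__main__":
    data = b"".join(f"record-{i}\n".encode() for i in range(100))
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(data)
    server = socket.create_server(("127.0.0.1", 0))
    client = socket.create_connection(server.getsockname())
    conn, _ = server.accept()
    sent = serve_segment(tmp.name, conn, offset=10, count=50)
    conn.close()
    received = b""
    while chunk := client.recv(4096):
        received += chunk
    print(sent, received == data[10:60])
    client.close()
    server.close()
    os.unlink(tmp.name)
```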

5. CLI Commands

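The scripts ship inside the Kafka distribution's bin/ directory. Examples assume the shell variable BOOTSTRAP is set (e.g. export BOOTSTRAP=localhost:9092). For a secured cluster, add --command-config client.properties (the console producer and consumer take --producer.config and --consumer.config instead).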

Topic lifecycle — kafka-topics.sh

# Create a topic with 6 partitions and replication factor 3
kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --create --topic orders \
  --partitions 6 --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000

# List all topics (excluding internal __ topics)
kafka-topics.sh --bootstrap-server $BOOTSTRAP --list \
  --exclude-internal

# Describe a topic: leader, replicas, ISR per partition
kafka-topics.sh --bootstrap-server $BOOTSTRAP --describe --topic orders

# Increase partition count (cannot decrease; keyed records may map to a different partition afterwards)
kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --alter --topic orders --partitions 12

# Delete (requires delete.topic.enable=true on brokers, the default)
kafka-topics.sh --bootstrap-server $BOOTSTRAP --delete --topic orders
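Increasing the partition count with --alter has a side effect for keyed topics: the key-to-partition mapping is a hash modulo the partition count, so changing the count moves keys. The Python sketch below shows the effect; it deliberately uses CRC-32 (zlib.crc32) as a stand-in hash rather than the murmur2 hash the Java client uses, since any hash-modulo scheme behaves the same way here.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    # CRC-32 stand-in, NOT Kafka's murmur2; the remapping effect is the same.
    return zlib.crc32(key) % num_partitions


keys = [f"order-{i}".encode() for i in range(1_000)]
moved = [k for k in keys if partition_for(k, 6) != partition_for(k, 12)]
print(f"{len(moved)} of {len(keys)} keys change partition when going from 6 to 12")
```

Going from 6 to 12 partitions, a key stays put only when its hash modulo 12 is below 6, so roughly half the keys move. Records written before the change stay in their old partition while new records for the same key may land elsewhere, so per-key ordering is not guaranteed across the change.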

Inspect or change topic config — kafka-configs.sh

# Show a topic's dynamically overridden configs (add --all to include defaults)
kafka-configs.sh --bootstrap-server $BOOTSTRAP \
  --describe --entity-type topics --entity-name orders

# Add or update a topic-level config
kafka-configs.sh --bootstrap-server $BOOTSTRAP \
  --alter --entity-type topics --entity-name orders \
  --add-config min.insync.replicas=2,retention.ms=604800000

# Remove a previously-overridden config (revert to broker default)
kafka-configs.sh --bootstrap-server $BOOTSTRAP \
  --alter --entity-type topics --entity-name orders \
  --delete-config retention.ms

# Same pattern works for entity-type brokers, clients, users, ips

Quick produce / consume — kafka-console-producer.sh / kafka-console-consumer.sh

# Produce: one record per line, key:value separated by ':'
kafka-console-producer.sh --bootstrap-server $BOOTSTRAP \
  --topic orders \
  --property "parse.key=true" --property "key.separator=:"
# > order-42:{"id":42,"amount":1990}

# Consume from the beginning of the topic, printing key, value, offset
kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP \
  --topic orders \
  --from-beginning \
  --property print.key=true --property print.offset=true \
  --property print.timestamp=true

# Consume as part of a consumer group, with auto-commit disabled
kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP \
  --topic orders --group billing \
  --consumer-property enable.auto.commit=false

Consumer groups and offsets — kafka-consumer-groups.sh

# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP --list

# Describe a group: current offset, log-end-offset, lag per partition
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
  --describe --group billing

# Reset offsets to the earliest available for the entire topic
# (omit --execute for a dry run; the group must have no active members)
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
  --group billing --topic orders \
  --reset-offsets --to-earliest --execute

# Reset to a point in time (format YYYY-MM-DDTHH:mm:SS.sss); useful for replay windows
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
  --group billing --topic orders \
  --reset-offsets --to-datetime 2026-06-20T00:00:00.000 --execute

# Delete a consumer group (only when no active members remain)
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP \
  --delete --group billing

Partition reassignment — kafka-reassign-partitions.sh

# 1. Generate a candidate plan to move two topics across brokers 1,2,3
cat > topics.json <<'JSON'
{ "version": 1, "topics": [ { "topic": "orders" }, { "topic": "payments" } ] }
JSON

kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3" --generate

# 2. Save the proposed assignment to reassign.json, then execute
kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP \
  --reassignment-json-file reassign.json --execute \
  --throttle 50000000   # bytes/sec replication throttle

# 3. Verify progress; once the move is complete, --verify also removes the throttle
kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP \
  --reassignment-json-file reassign.json --verify
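The --generate step only proposes a plan; reassign.json can also be written by hand. The shape below ({"version": 1, "partitions": [{"topic", "partition", "replicas"}, ...]}) is the format the tool reads; round_robin_plan is a hypothetical helper that spreads replicas round-robin, and because the first broker in each replicas list is the preferred leader, it also rotates leadership across brokers.

```python
import json


def round_robin_plan(topic: str, num_partitions: int, brokers: list[int],
                     replication_factor: int) -> dict:
    """Place each partition's replicas on consecutive brokers, rotating the start."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor exceeds broker count")
    partitions = []
    for p in range(num_partitions):
        replicas = [brokers[(p + i) % len(brokers)] for i in range(replication_factor)]
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


if __name__ == "__main__":
    # e.g. python plan.py > reassign.json, then run the --execute step as above
    print(json.dumps(round_robin_plan("orders", 6, [1, 2, 3], 3), indent=2))
```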

ACLs — kafka-acls.sh

# Allow user 'billing-app' to consume topic 'orders' as group 'billing'
kafka-acls.sh --bootstrap-server $BOOTSTRAP \
  --add --allow-principal User:billing-app \
  --operation Read --topic orders --group billing

# Allow a producer to write to 'orders' (Write + Describe on the topic)
kafka-acls.sh --bootstrap-server $BOOTSTRAP \
  --add --allow-principal User:order-svc \
  --operation Write --operation Describe --topic orders

# List existing ACLs scoped to a topic
kafka-acls.sh --bootstrap-server $BOOTSTRAP \
  --list --topic orders
