
ISR: Kafka’s Approach to Not Quite Consensus

A Different Kind of Agreement

Apache Kafka is one of the most successful distributed systems ever built. It handles trillions of messages per day at companies like LinkedIn, Netflix, and Uber. It provides durability, ordering, and replication guarantees that, in practice, are good enough for an enormous range of use cases.

And it does not use consensus.

Or rather, it did not, for the first decade of its existence. Kafka’s replication mechanism — the In-Sync Replica (ISR) protocol — is a carefully designed system that provides guarantees similar to consensus without actually implementing a consensus protocol. It is a masterclass in engineering pragmatism: rather than solving the general problem, Kafka solves the specific problem of replicated log appending, and it does so with a mechanism that is simpler, faster, and more operationally transparent than Paxos or Raft.

Understanding the ISR protocol — what it guarantees, what it does not, and why the distinction matters — is essential for anyone who operates Kafka or who wants to understand the design space between “full consensus” and “no coordination at all.”

The ISR Model

Kafka’s replication model is built around a few key concepts:

Partition Leader. Each Kafka partition has a single leader replica. All reads and writes go through the leader. This is similar to Multi-Paxos or Raft — there is a designated leader for each unit of replication.

Followers. Other replicas for the partition are followers. Followers pull data from the leader by issuing fetch requests (similar to Raft’s AppendEntries, but reversed in direction — followers pull rather than the leader pushing).

In-Sync Replicas (ISR). The ISR is the set of replicas that are “caught up” with the leader. A replica is in-sync if it has fetched all messages up to the leader’s log end offset within a configurable time window (replica.lag.time.max.ms, default 30 seconds).

High Water Mark (HW). The high water mark is the offset up to which all ISR members have replicated. Messages below the HW are considered committed. Messages above the HW but below the log end offset (LEO) are written but not yet committed.

Structure PartitionState:
    leader: ReplicaId
    replicas: Set<ReplicaId>           // all assigned replicas
    isr: Set<ReplicaId>                // in-sync replicas (subset of replicas)
    leo: Map<ReplicaId, Offset>        // log end offset per replica
    hw: Offset                          // high water mark

    // INVARIANT: leader IN isr
    // INVARIANT: isr SUBSET_OF replicas
    // INVARIANT: hw = min(leo[r] for r in isr)
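These invariants can be captured in a small runnable sketch (Python here, purely for illustration; Kafka itself is written in Java and Scala):

```python
from dataclasses import dataclass

@dataclass
class PartitionState:
    leader: str
    replicas: set
    isr: set
    leo: dict      # replica id -> log end offset
    hw: int        # high water mark

    def check_invariants(self):
        assert self.leader in self.isr
        assert self.isr <= self.replicas
        assert self.hw == min(self.leo[r] for r in self.isr)

state = PartitionState(
    leader="r1",
    replicas={"r1", "r2", "r3"},
    isr={"r1", "r2"},
    leo={"r1": 10, "r2": 8, "r3": 3},
    hw=8,          # min over the ISR only; r3's lag does not hold HW back
)
state.check_invariants()
```

Note that r3's log end offset of 3 plays no role in the high water mark: only ISR members gate commitment.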

The Produce Path

When a producer sends a message to Kafka, the following happens:

Procedure HandleProduce(partition, message, required_acks):
    assert self.id == partition.leader

    // Step 1: Append to leader's local log
    offset = AppendToLog(partition, message)
    partition.leo[self.id] = offset

    // Step 2: Behavior depends on acks setting
    if required_acks == 0:
        // "Fire and forget": the producer does not wait for a response at all
        Reply(Success{offset: offset})

    else if required_acks == 1:
        // Wait for local write only
        Reply(Success{offset: offset})

    else if required_acks == -1:  // acks=all
        // Wait until ALL replicas in ISR have fetched this offset
        WaitUntil(
            for all r in partition.isr:
                partition.leo[r] >= offset
        )
        // Update high water mark
        partition.hw = min(partition.leo[r] for r in partition.isr)
        Reply(Success{offset: offset})
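The acknowledgment decision reduces to a single predicate over the ISR. The following sketch (names are my own, not Kafka's) shows the rule:

```python
def can_ack(required_acks, offset, isr, leo):
    """Can the leader acknowledge a message appended at `offset`?
    acks=0 or acks=1: the local append suffices.
    acks=all (-1): every current ISR member must have replicated past it."""
    if required_acks in (0, 1):
        return True
    return all(leo[r] >= offset for r in isr)

isr = {"r1", "r2"}                      # r3 has dropped out of the ISR
leo = {"r1": 100, "r2": 97, "r3": 80}   # r1 is the leader
print(can_ack(-1, 99, isr, leo))   # False: r2 has only reached 97
print(can_ack(-1, 97, isr, leo))   # True: both ISR members are at >= 97
print(can_ack(1, 99, isr, leo))    # True: local append suffices
```

Note that r3's lag is irrelevant: only the current ISR gates acknowledgment, which is exactly why a shrinking ISR weakens the guarantee.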

The Fetch Path

Followers continuously fetch from the leader:

Procedure FollowerFetchLoop(partition):
    while true:
        // Fetch messages from leader starting at our current LEO
        response = FetchFromLeader(partition, start_offset = partition.leo[self.id])

        if response.messages is not empty:
            // Append fetched messages to local log
            for each message in response.messages:
                AppendToLog(partition, message)
            partition.leo[self.id] = last_appended_offset + 1

        // Update our local high water mark from the leader's
        // (never beyond our own log end offset)
        partition.hw = min(partition.leo[self.id], response.leader_hw)

        // Brief pause if no new data (real Kafka long-polls instead: the
        // leader parks the fetch request for up to replica.fetch.wait.max.ms)
        if response.messages is empty:
            Sleep(replica.fetch.wait.max.ms)

On the leader side, the corresponding fetch handler looks like this:

Procedure HandleFetchFromFollower(partition, follower_id, start_offset):
    // Update our knowledge of the follower's LEO
    partition.leo[follower_id] = start_offset

    // Check if follower should be in ISR
    UpdateISRMembership(partition, follower_id)

    // Advance high water mark if possible
    new_hw = min(partition.leo[r] for r in partition.isr)
    if new_hw > partition.hw:
        partition.hw = new_hw

    // Return messages from start_offset to our LEO
    messages = ReadLog(partition, start_offset, partition.leo[self.id])

    Reply(FetchResponse{
        messages: messages,
        leader_hw: partition.hw
    })
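The key side effect of the fetch path is that the high water mark advances as followers catch up. A minimal sketch of the leader-side update (helper name is mine):

```python
def advance_hw(hw, isr, leo):
    """Leader-side HW update: the minimum LEO across the ISR,
    never moving backwards."""
    return max(hw, min(leo[r] for r in isr))

isr = {"r1", "r2", "r3"}            # r1 is the leader
leo = {"r1": 5, "r2": 3, "r3": 0}
hw = 0

leo["r3"] = 4                       # r3's fetch request advances its LEO
hw = advance_hw(hw, isr, leo)
print(hw)                           # 3: still gated by r2

leo["r2"] = 5                       # r2 catches up fully
hw = advance_hw(hw, isr, leo)
print(hw)                           # 4: now gated by r3
```

Because a follower only learns the new HW in its *next* fetch response, its local HW always trails the leader's, which matters later for truncation.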

ISR Maintenance

The ISR is dynamic. Replicas can be added and removed based on their replication lag:

Procedure UpdateISRMembership(partition, follower_id):
    current_time = Now()

    if follower_id in partition.isr:
        // Check if follower has fallen behind
        if partition.leo[follower_id] < partition.leo[self.id]:
            if current_time - last_caught_up_time[follower_id] > replica.lag.time.max.ms:
                // Follower is too far behind — remove from ISR
                partition.isr.remove(follower_id)
                // Persist ISR change to controller/ZooKeeper
                PersistISRChange(partition)
                Log("Removed replica {} from ISR for partition {}",
                    follower_id, partition.id)
        else:
            // Follower is caught up
            last_caught_up_time[follower_id] = current_time

    else:
        // Follower is not in ISR — check if it has caught up
        if partition.leo[follower_id] >= partition.leo[self.id]:
            // Follower has caught up — add to ISR
            partition.isr.add(follower_id)
            PersistISRChange(partition)
            Log("Added replica {} to ISR for partition {}",
                follower_id, partition.id)

The min.insync.replicas Setting

The ISR can shrink. In the worst case, it can shrink to just the leader. If the leader is the only ISR member, then acks=all degenerates to acks=1 — the leader acknowledges writes that have not been replicated anywhere.

The min.insync.replicas setting addresses this. When set to M, the leader will reject produce requests with acks=all if the ISR has fewer than M members.

Procedure HandleProduceWithMinISR(partition, message, required_acks):
    if required_acks == -1:  // acks=all
        if |partition.isr| < min.insync.replicas:
            Reply(NotEnoughReplicasException)
            return

    // ... proceed with normal produce path

The typical production configuration is:

  • Replication factor: 3
  • min.insync.replicas: 2
  • acks: all

This means writes are acknowledged only when at least 2 of the 3 replicas (the leader plus at least one follower) have the data. If two replicas fail, the partition rejects acks=all writes, though reads can still be served as long as the surviving replica is (or can cleanly become) the leader.
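Expressed as configuration, the setup looks roughly like this (the keys are standard Kafka configs, but where each is set, at the broker, topic, or client level, varies by deployment):

```properties
# Broker defaults (can also be set per topic)
default.replication.factor=3
min.insync.replicas=2

# Producer configuration
acks=all
```

With this combination, a produce request fails fast with NotEnoughReplicasException rather than silently degrading to single-replica durability.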

Why This Is Not Consensus

The ISR mechanism provides strong guarantees in practice, but it is not consensus in the formal sense. Here is why:

No Voting on Values

In Paxos or Raft, replicas vote on proposed values. A value is chosen only if a quorum agrees. The voting mechanism ensures that no two conflicting values can both be chosen.

In Kafka, the leader unilaterally decides the log contents. Followers do not vote — they replicate. If the leader appends message X at offset 100, followers will eventually have message X at offset 100 or they will be removed from the ISR. There is no mechanism for followers to reject or propose alternatives.

The Unclean Leader Election Problem

The most important difference between ISR and consensus is what happens when the leader fails.

In Raft, a new leader must have the most up-to-date log. The election protocol guarantees this: a candidate must receive votes from a majority, and replicas will not vote for a candidate with a shorter log. This means the new leader is guaranteed to have all committed entries.

In Kafka, when the leader fails, the controller selects a new leader from the ISR. If all ISR members are available, this is safe — any ISR member has all committed data (all data up to the high water mark).

But what if all ISR members fail? Kafka has a configuration called unclean.leader.election.enable. If set to true (the default in early Kafka versions, now false by default), Kafka will elect a non-ISR replica as leader. This replica may be missing committed messages. This results in data loss.

Timeline of the unclean leader election problem:

Time 1: Leader=R1, ISR={R1, R2, R3}
    R1 log: [A, B, C, D, E]    (LEO=5)
    R2 log: [A, B, C, D, E]    (LEO=5, in ISR)
    R3 log: [A, B, C, D]       (LEO=4, in ISR — slightly behind but within lag threshold)

Time 2: R1 fails, R2 becomes leader, ISR={R2, R3}
    R2 log: [A, B, C, D, E]
    R3 log: [A, B, C, D, E]    (catches up)
    OK so far — no data loss.

Time 3: R2 AND R3 fail. R1 recovers (but was already removed from ISR).

    If unclean.leader.election.enable = true:
        R1 becomes leader with log [A, B, C, D, E]
        No committed data is lost in this case, because R1 happens to have the full log.

    The more problematic scenario:

Time 1: Leader=R1, ISR={R1, R2}
    R1 log: [A, B, C, D, E]    (LEO=5, HW=5)
    R2 log: [A, B, C, D, E]    (LEO=5)
    R3 log: [A, B, C]          (LEO=3, NOT in ISR — fell behind)

Time 2: R1 and R2 fail simultaneously.
    If unclean.leader.election.enable = true:
        R3 becomes leader with log [A, B, C]
        Messages D and E are LOST — even though they were committed (HW=5)

With unclean.leader.election.enable = false (the recommended setting), Kafka will wait for an ISR member to recover rather than electing a stale replica. This trades availability for consistency — the partition is unavailable until an ISR member returns.
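The second scenario can be replayed as a small simulation (an illustrative sketch, not Kafka's actual controller code; function names are mine):

```python
def elect_leader(isr, alive, replicas, unclean=False):
    """Controller-side election sketch: prefer a live ISR member; with
    unclean election enabled, fall back to any live replica."""
    for r in replicas:
        if r in isr and r in alive:
            return r
    if unclean:
        for r in replicas:
            if r in alive:
                return r
    return None                     # partition stays offline

logs = {"r1": ["A", "B", "C", "D", "E"],
        "r2": ["A", "B", "C", "D", "E"],
        "r3": ["A", "B", "C"]}                  # r3 fell out of the ISR
replicas, isr, alive, hw = ["r1", "r2", "r3"], {"r1", "r2"}, {"r3"}, 5

leader = elect_leader(isr, alive, replicas, unclean=True)
print(leader, hw - len(logs[leader]))   # r3 2: committed D and E are lost

print(elect_leader(isr, alive, replicas, unclean=False))   # None: offline
```

The simulation makes the trade-off concrete: unclean election buys availability (a leader exists) at the cost of two committed messages; clean election preserves the data but leaves the partition offline.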

No Log Reconciliation Protocol

In Raft, when a new leader is elected, there is an explicit log reconciliation protocol. The leader overwrites inconsistent entries on followers. This ensures all replicas converge to the same log.

Kafka handles this more informally. When a new leader is elected, it truncates its log to the high water mark (discarding any uncommitted entries). Followers then truncate their logs to match the new leader’s log. This works correctly in the common case but depends on the HW being propagated correctly — which introduces subtle edge cases.

The HW propagation delay is one of the trickier aspects of Kafka’s replication. The HW is updated on the leader when all ISR members fetch up to a certain offset. But followers learn the HW from the next fetch response. This means there is always a window where a follower’s local HW is behind the leader’s HW. If a failure occurs during this window, the follower may truncate committed messages during leader transition.

Kafka addressed this with KIP-101 (leader epoch), which adds a mechanism for followers to verify their log consistency with the new leader using epoch numbers rather than relying solely on the HW.
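The core of the leader-epoch mechanism can be sketched as follows (a simplified model of the KIP-101 idea, not Kafka's actual implementation; names are mine):

```python
def epoch_end_offset(leader_epochs, leader_leo, epoch):
    """The leader's answer to an OffsetsForLeaderEpoch-style query:
    the start offset of the first epoch newer than `epoch`, or the
    LEO if `epoch` is still current.
    leader_epochs: ascending list of (epoch, start_offset)."""
    for e, start in leader_epochs:
        if e > epoch:
            return start
    return leader_leo

def truncation_point(follower_epoch, follower_leo, leader_epochs, leader_leo):
    # Truncate to wherever the follower's last epoch ended on the leader,
    # but never past the follower's own log end.
    return min(epoch_end_offset(leader_epochs, leader_leo, follower_epoch),
               follower_leo)

# The new leader began epoch 1 at offset 5; the follower's offsets 5-6
# were written under the old epoch 0 and must be discarded.
print(truncation_point(follower_epoch=0, follower_leo=7,
                       leader_epochs=[(0, 0), (1, 5)], leader_leo=8))   # 5
```

Truncating by epoch boundary rather than by a possibly-stale local HW is what closes the window described above.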

ISR vs. Raft: A Detailed Comparison

Property                       | Raft                               | Kafka ISR
-------------------------------|------------------------------------|------------------------------------------
Leader election                | Voting with log completeness check | Controller selects from ISR
Replication direction          | Leader pushes (AppendEntries)      | Followers pull (Fetch)
Commit condition               | Majority of ALL replicas           | All replicas in ISR
Quorum membership              | Fixed (all replicas)               | Dynamic (ISR changes)
Handling slow replicas         | Slow replica = slow commit         | Slow replica removed from ISR
Data loss on leader failure    | Impossible (if majority survives)  | Impossible (if an ISR member survives and unclean election is disabled)
Availability during partitions | Requires majority                  | Requires min.insync.replicas ISR members
Log divergence resolution      | Leader overwrites followers        | Truncate to HW, sync from new leader

The most important difference is how they handle slow replicas. In Raft, a slow replica is still part of the quorum. If you have 5 replicas and one is on a slow disk, every commit must still wait for 3 replicas — and the slow one might be the third. You cannot kick it out of the quorum.

In Kafka, a slow replica drops out of the ISR. The system continues committing with the remaining ISR members. This is operationally superior for Kafka’s use case (high-throughput log appending) because it prevents a single slow replica from degrading the entire system.

The trade-off is that the ISR can shrink to the point where a single failure causes data loss (if only one ISR member remains and it fails). Raft’s fixed majority quorum provides a stronger guarantee: as long as a majority of all replicas survive, no data is lost.

KRaft: Kafka’s Move Toward Real Consensus

For the first decade of its existence, Kafka relied on Apache ZooKeeper for cluster metadata management: topic configurations, partition assignments, ISR membership, controller election. ZooKeeper itself implements a consensus protocol (ZAB), so Kafka was already using consensus — just not for data replication.

KRaft (Kafka Raft), introduced in KIP-500 and generally available since Kafka 3.3, replaces ZooKeeper with a built-in Raft implementation for metadata management. This is significant for several reasons:

  1. Operational simplicity. No more managing a separate ZooKeeper cluster. One system instead of two.
  2. Scalability. ZooKeeper was a bottleneck for clusters with many partitions (hundreds of thousands). KRaft’s metadata log scales better.
  3. Faster controller failover. The Raft-based controller election is faster than ZooKeeper-based election.

Importantly, KRaft is used for metadata only. Data replication still uses the ISR protocol. This is a pragmatic choice: the ISR protocol’s performance characteristics (pull-based, dynamic quorum, optimized for throughput) are well-suited for data, while Raft’s stronger guarantees are appropriate for metadata (which is lower volume but requires stricter consistency).

// KRaft metadata log — uses Raft
Structure MetadataLog:
    // Topics, partitions, ISR membership, broker registrations
    // All metadata changes go through Raft consensus
    entries: List<MetadataEntry>
    committed_offset: Offset

// Data log — uses ISR
Structure DataPartitionLog:
    // Actual user messages
    // Replicated via pull-based ISR protocol
    entries: List<ProducerMessage>
    hw: Offset
    isr: Set<ReplicaId>  // ISR itself is managed via MetadataLog

The architecture is a clean separation of concerns: Raft handles the coordination plane (what should be where), and ISR handles the data plane (actually moving bytes around).

When ISR Works Brilliantly

Kafka’s ISR protocol is not a general-purpose consensus protocol, and it does not try to be. It is specifically designed for one thing: high-throughput, ordered, durable log appending. For this use case, it has several advantages:

Throughput. Pull-based replication lets the leader batch writes without waiting for per-message acknowledgment from followers. The leader appends messages to its log at full speed; followers fetch in large batches at their own pace. This is dramatically more efficient than Raft’s per-entry acknowledgment for high-throughput workloads.

Elastic fault tolerance. The dynamic ISR means the system degrades gracefully. Lose one replica? The ISR shrinks but the system continues. Lose two? Still running (if min.insync.replicas allows). In Raft, you cannot dynamically adjust the quorum size — you must reconfigure the cluster.

Operational simplicity. The ISR is observable. Operators can see exactly which replicas are in sync, how far behind each replica is, and when replicas fall out of the ISR. This operational transparency is invaluable. In contrast, Raft’s internals (commit index, match index, next index) are harder to monitor and interpret.

Natural back-pressure. When a follower cannot keep up, it naturally falls out of the ISR. The system does not slow down to wait for it. This is the right behavior for a messaging system where throughput is paramount.

When ISR Falls Short

Exactly-once semantics. Kafka supports “exactly-once” delivery with idempotent producers and transactions. But the transactional protocol is built on top of ISR replication, which creates interesting challenges. The transaction coordinator must ensure that transaction markers are committed (in the ISR sense) before reporting success. If the coordinator fails mid-transaction, recovery depends on the ISR protocol having replicated the transaction state correctly.

Strong consistency for reads. Kafka consumers, by default, read only up to the high water mark. Because of the HW propagation delay, a consumer may not see the latest committed messages immediately. Use cases requiring strict read-your-writes behavior should read from the leader (the default, though KIP-392 allows fetching from followers) and tolerate this short propagation window.

State machine replication. ISR is designed for log appending, not for general state machine replication. You cannot use Kafka’s ISR protocol to replicate a key-value store or a database. For that, you need actual consensus. This is why systems like CockroachDB and etcd use Raft, not ISR.

Split-brain prevention. Kafka relies on the controller (and previously ZooKeeper) to prevent split-brain scenarios. The controller is the single source of truth for who the leader is. If the controller itself has a split-brain problem, Kafka’s ISR protocol provides no protection. This is why moving to KRaft (which uses Raft for the controller) was important.

The Broader Lesson

Kafka’s ISR protocol demonstrates that you do not always need to solve the general problem. Consensus — in its full generality — solves state machine replication for arbitrary deterministic state machines. But if your state machine is specifically a log (append-only, sequential writes, batch reads), you can design a simpler, faster protocol that provides “good enough” guarantees.

The engineering insight is this: the ISR protocol is not a weaker version of consensus. It is a different tool designed for a different problem. Comparing ISR to Raft is like comparing a socket wrench to a Swiss army knife — the socket wrench does fewer things but does its one thing very well.

The lesson for practitioners is to understand what guarantees you actually need before reaching for a consensus protocol. If you need to replicate a log with high throughput and you can tolerate the ISR’s edge cases (which most messaging use cases can), Kafka’s approach may be a better fit than bolting Raft onto everything.

If you need strict linearizability, arbitrary state machine replication, or protection against all possible failure modes without operational intervention, use a real consensus protocol. Just do not assume you always need one. Kafka’s trillion-message-per-day success is proof that sometimes “not quite consensus” is exactly right.