Distributed Systems for the Rest of Us
A CloudStreet Book
This book is for developers who have hit a race condition in production and lost.
Not “lost” as in “my program crashed and I debugged it.” Lost as in: “The data was wrong, nobody knows how it got wrong, two customers called, and we spent a Thursday morning reading logs from three different services trying to reconstruct what happened. We found the bug. We fixed it. We added a comment that says // THIS ORDER MATTERS. We moved on.”
That experience — that specific Thursday — is what distributed systems is actually about.
The academic literature calls it “the problem of coordinating state across multiple nodes in the presence of partial failures.” You called it “the worst day this quarter.” Both descriptions are accurate.
What This Book Is
This is a practical book about distributed systems concepts: the ideas you need to understand why things go wrong and how to design systems that fail less badly.
It is not a comprehensive academic treatment. It will not teach you to implement Raft from scratch (though you’ll understand how Raft works). It will not make you ready to design Google Spanner (though you’ll understand what Spanner is solving). What it will do is make you a better architect, a sharper debugger, and someone who can look at a system design and say “that’s going to have consistency problems under partition” before it’s 3am and the on-call rotation has your name on it.
The concepts here scale from “I have two microservices and a database” all the way up. The math works at any scale. The trade-offs just get more expensive as you grow.
What This Book Assumes
You understand databases and networking at a working level. You know what a transaction is. You’ve dealt with HTTP. You’ve probably seen a queue. You don’t need reminding what a foreign key is.
What this book does not assume is that you’ve thought deeply about what happens when your database has two replicas and the replication lag is 300ms and a request hits the replica and not the primary. Or what “eventually consistent” actually means in terms of what your users see and when.
That’s the gap. This book lives there.
How to Read This Book
Sequentially, ideally. The chapters build on each other. Fallacies before CAP, CAP before consistency models, consistency before replication. If you skip around, the forward references will be annoying.
That said: if you’re here because you’re debugging something right now, jump to the chapter that matches your problem. The chapter on network partitions doesn’t require you to have read the chapter on clocks. Use what you need.
Let’s start with the uncomfortable truth: you’re already doing this.
You’re Already Doing Distributed Systems
Here is a thing that happens to developers:
You’re building a feature. You need to send a welcome email when a user signs up. Simple enough. You save the user to the database, then call your email service. It works fine in development. It works fine in staging. You ship it.
Three weeks later, you notice some users never got the welcome email. You dig into the logs. The database write succeeded. The email call… timed out. Or the email service was down for 90 seconds at 2am. Or the server process was killed halfway through. The user exists. The email never sent.
Congratulations. You have just experienced your first distributed systems problem in production.
The system was distributed the moment you had two things — a database and an email service — that needed to agree on what happened. They didn’t agree. Reality was inconsistent. Someone noticed.
What “Distributed” Actually Means
People hear “distributed systems” and picture Google’s infrastructure: hundreds of datacenters, thousands of nodes, petabytes of data, entire teams whose job is keeping the thing running. That’s distributed systems at scale. But the problems start the moment you have more than one process that needs to coordinate.
Two microservices talking over HTTP: distributed system. An application server plus a database: distributed system. A job queue with workers: distributed system. An application that calls a third-party API: distributed system.
The defining characteristic isn’t scale — it’s that you have multiple processes that don’t share memory, connected by a network, that need to do work together.
This matters because:
- Processes can fail independently. Your app server can die while the database stays up, or vice versa.
- The network between them can fail. Or go slow. Or deliver messages out of order.
- There’s no shared clock. Two processes saying “this happened at 14:32:07” may be referring to slightly different moments, and may not agree on which happened first.
- There’s no shared state. Each process sees its own view of the world, and those views can diverge.
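The "no shared clock" point is worth a tiny illustration. Here is a sketch (with made-up numbers and an assumed clock skew) of two processes stamping events with their own wall clocks and disagreeing about which event came first:

```python
# Illustration of "no shared clock": two processes record events with their
# own clocks, and the recorded timestamps disagree about the real ordering.
# The 120ms skew is an assumption for the example; real skew varies.

process_a_clock_skew = 0.120     # A's wall clock runs 120ms fast

true_time_of_a_event = 100.000   # A's event really happened first...
true_time_of_b_event = 100.050   # ...50ms before B's event

a_timestamp = true_time_of_a_event + process_a_clock_skew  # A records 100.120
b_timestamp = true_time_of_b_event                         # B records 100.050

# ...but comparing the recorded timestamps says B happened first.
print(b_timestamp < a_timestamp)  # True: wall clocks lie about ordering
```

Chapter 7 covers the real machinery (Lamport clocks, vector clocks) for reasoning about order without a shared clock.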
If that sounds scary, it should, a little. But these problems are understood. They have names. They have known solutions and known trade-offs. That’s what this book is about.
The Problems Have Been Around Forever
The canonical list of distributed systems fallacies was compiled in the early 1990s. The CAP theorem was conjectured in 2000 and proved in 2002. Lamport clocks date to 1978. The Byzantine Generals Problem is from 1982.

These aren’t new problems. They’re not unsolved problems. They’re just problems that become your problems the moment you wire two processes together.
The frustrating part isn’t that the problems exist. It’s that they’re often invisible until something goes wrong. Single-process applications fail loudly: an exception propagates up, you get a stack trace, you find the line. Distributed systems fail quietly: a write succeeded on one side and silently didn’t propagate to the other. A message was delivered twice. A timeout happened and the caller doesn’t know whether the operation succeeded.
A Taxonomy of What Can Go Wrong
Before we get into solutions, let’s be precise about the problem space. Distributed systems fail in a few distinct ways:
Crash failures
A process dies. It stops responding. It doesn’t send any more messages. This is actually the best kind of failure — it’s detectable and it’s clean. The dead process doesn’t lie to you.
Omission failures
A process is running but stops responding to some (or all) messages. This is harder — from the outside, it looks like a crash, but it isn’t. Network partitions produce this kind of behavior.
Timing failures
A process responds, but too slowly. Your distributed system makes assumptions about timing (implicit or explicit), and a slow response violates them. This is why timeouts are hard: too short and you get false failures; too long and you’re just sitting there waiting while your users notice.
Byzantine failures
A process responds with incorrect or malicious information. It actively lies. This is the hardest category to handle and also the rarest in systems you control. We’ll touch on it when we discuss consensus, but for most of this book, we assume nodes fail by crashing or going silent, not by fabricating responses.
Most production incidents involve omission failures and timing failures, not Byzantine ones. Your database isn’t lying to you. It’s just slow, or unreachable, or in the middle of a failover.
The Fundamental Tension
Every distributed systems decision comes down to a tension between two things:
- Correctness: making sure your system’s data and behavior are accurate and consistent
- Availability: making sure your system keeps working even when parts of it fail
These goals pull in opposite directions. To be maximally correct, you sometimes need to stop and refuse to answer (because you’re not sure you have the right answer). To be maximally available, you sometimes need to answer even though you might be wrong.
Most systems don’t live at either extreme. They make a choice somewhere in the middle, often without realizing they’ve made a choice at all. One of the goals of this book is to help you make that choice deliberately.
What “Thinking Like a Distributed Systems Engineer” Means
It mostly means getting comfortable with a specific mental shift: instead of thinking about what your code does, think about what can happen between the steps.
Single-process thinking: “I write to the database, then I send the email.”
Distributed-systems thinking: “Between writing to the database and sending the email, what can fail? What does the system look like if it fails? Is that state valid? Can we recover from it?”
This isn’t pessimism. It’s precision. The failures will happen. The question is whether you designed for them or not.
The email example at the top of this chapter has several correct solutions. You could use a transactional outbox pattern (write the email intent to the database in the same transaction as the user creation, then have a separate process pick it up and send it). You could use an event-driven architecture where “user created” is an event that the email service consumes reliably. You could accept the rare failure and add a retry mechanism.
All of these are reasonable. None of them are “just call the email API in the handler.” Because that’s not the shape of the problem.
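To make the first option concrete, here is a minimal sketch of the transactional outbox idea, using SQLite as a stand-in for your database. The table and function names are illustrative, not a prescribed schema:

```python
import sqlite3

# Minimal sketch of the transactional outbox pattern (schema is illustrative).
# The user row and the "send welcome email" intent are written in ONE
# transaction, so either both exist or neither does.

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, kind TEXT, "
             "payload TEXT, sent INTEGER DEFAULT 0)")

def sign_up(email: str) -> None:
    with conn:  # one transaction: commit both writes or roll back both
        conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        conn.execute("INSERT INTO outbox (kind, payload) VALUES (?, ?)",
                     ("welcome_email", email))

def relay_outbox(send) -> None:
    # A separate worker: pick up unsent intents, send, then mark as sent.
    # If this process dies partway, the row stays unsent and is retried later.
    rows = conn.execute("SELECT id, payload FROM outbox WHERE sent = 0").fetchall()
    for row_id, payload in rows:
        send(payload)
        with conn:
            conn.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,))

sign_up("alice@example.com")
sent = []
relay_outbox(sent.append)
print(sent)  # the email intent survived the commit and was delivered
```

Note that the relay can still crash between `send` and the `UPDATE`, which means the email may be sent twice. That's why the outbox pattern pairs naturally with idempotent or deduplicated sending, covered in the fault tolerance chapter.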
Let’s Map the Terrain
Here’s what the rest of this book covers, and why it’s in this order:
Chapter 2 — The Eight Fallacies lays out the wrong assumptions that every developer makes about distributed systems. Read them once, internalize them, and a lot of later bugs will become obvious.
Chapter 3 — CAP Theorem introduces the central trade-off of distributed systems: you can’t have consistency and availability both, all the time. This gives us a framework for making decisions.
Chapter 4 — Consistency Models goes deeper on what “consistent” even means. Turns out it’s a spectrum, and where you sit on that spectrum has concrete effects on what your users see.
Chapter 5 — Replication covers what happens when you have multiple copies of data and need them to stay in sync. Spoiler: they won’t, perfectly, and that’s okay if you understand the rules.
Chapter 6 — Consensus explains how distributed systems make collective decisions — electing leaders, agreeing on a log of operations. This is where Raft and Paxos live.
Chapter 7 — Clocks and Time tackles the uncomfortable truth that time doesn’t work the way you think it does across a network. Lamport clocks. Vector clocks. Why “last write wins” is more complicated than it sounds.
Chapter 8 — Fault Tolerance covers the patterns for building systems that degrade gracefully: retries, idempotency, bulkheads, graceful degradation.
Chapter 9 — Network Partitions goes deep on the specific failure mode that causes the most trouble: the network split where both sides are running and neither can tell the other is still there.
Chapter 10 — Patterns is the practical chapter: sagas, the outbox pattern, circuit breakers, and other concrete solutions to recurring problems.
Chapter 11 — Making Real Decisions brings it together. Given your actual system, your actual scale, and your actual constraints, how do you pick the right tools and make the right trade-offs?
Chapter 12 — Where to Go From Here is resources, further reading, and the honest acknowledgment that this book is a starting point, not an ending one.
Let’s get started with the assumptions that will kill you if you don’t unlearn them.
The Eight Fallacies of Distributed Computing (They Were Right)
In the early 1990s, Peter Deutsch and colleagues at Sun Microsystems compiled a list of assumptions that developers routinely make when building distributed systems. They called them “the fallacies of distributed computing.”
The list has survived 30 years because the fallacies are sticky. Every developer who has shipped distributed code has violated at least four of them. Most have violated all eight, several times, in production.
Here they are, with commentary on how they kill you.
Fallacy 1: The Network Is Reliable
This is the foundational mistake. It feels true locally — you open a connection, you send bytes, you receive bytes. The connection is either up or down, and it’s usually up. So you code as if that’s always the case.
But at any given moment, your network call might:
- Succeed normally
- Fail immediately with a connection error
- Start, then hang indefinitely
- Start, deliver the request, and fail before delivering the response
- Succeed, but the response arrives after your timeout has already fired
That last one is particularly treacherous. You timed out and retried, but the original request already succeeded. You’ve now done the operation twice.
Client               Network               Server
  |                     |                     |
  |--- request -------->|                     |
  |                     |--- request -------->|
  |                     |                     |--- processing...
  [timeout fires]       |                     |
  |--- retry ---------->|                     |
  |                     |--- retry ---------->|
  |                     |                     |--- processing again...
  |                     |<-- response --------|  (first response, delivered late)
  |<-- response --------|                     |
  |                     |<-- response --------|  (second response)
  |<-- response --------|                     |
The client got two responses. The server did the work twice. If that work was “charge this card” or “place this order,” you have a problem.
The fix isn’t to use a more reliable network. The fix is to design operations to be idempotent — safe to execute multiple times with the same effect as executing once. More on that in the fault tolerance chapter.
What this means in practice: Every network call needs a timeout. Every network call that can be retried should be designed to handle retries safely. Never assume a call succeeded because you didn’t get an error.
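One common way to make retries safe is an idempotency key: the client generates a unique key once, attaches it to the original request and every retry, and the server deduplicates on it. A sketch, with illustrative class and method names:

```python
import uuid

# Sketch of server-side request deduplication via an idempotency key
# (names are illustrative). The server does the side effect at most once
# per key, no matter how many retries arrive.

class PaymentServer:
    def __init__(self):
        self._seen: dict[str, str] = {}  # idempotency key -> stored result

    def charge(self, idempotency_key: str, amount: int) -> str:
        if idempotency_key in self._seen:        # a retry of work already done
            return self._seen[idempotency_key]   # replay the stored result
        result = f"charged {amount}"             # the side effect happens once
        self._seen[idempotency_key] = result
        return result

server = PaymentServer()
key = str(uuid.uuid4())           # generated once, reused across retries
first = server.charge(key, 500)   # original request
second = server.charge(key, 500)  # retry after a timeout: no double charge
```

Real payment APIs (Stripe's, for example) expose exactly this mechanism; the fault tolerance chapter covers it in depth.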
Fallacy 2: Latency Is Zero
Local function calls take nanoseconds. Database calls on the same machine take microseconds. A network call to another service takes milliseconds. Depending on what’s between you and the other service, it can take tens of milliseconds.
That doesn’t sound like much until you have a request handler that makes 20 sequential database calls. Each one is 5ms. That’s 100ms of just database time, minimum, and that’s assuming everything is on the same local network. Add a downstream service call and you’re at 150ms. Add another and you’re at 200ms.
Request comes in
└─ Auth check (5ms)
└─ Load user (5ms)
└─ Load user preferences (5ms)
└─ Load feature flags (5ms)
└─ Fetch recommendations (30ms via HTTP)
└─ Load each recommendation's details (10ms × 5 = 50ms)
└─ Log the request (5ms)
Total: ~105ms minimum, not counting processing time
And that’s the happy path, with no retries and no queueing.
What this means in practice: Latency accumulates. Sequential calls multiply. Parallel calls help but add complexity. Think about the critical path of every request — the longest sequential chain of operations — and optimize that specifically. Caching and batching exist because latency isn’t zero; use them.
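The sequential-versus-parallel difference is easy to see with simulated delays. This sketch stands in network calls with 50ms sleeps; the lookup names are made up:

```python
import asyncio
import time

# Sketch: the same four independent ~50ms lookups, issued one after
# another versus concurrently. Sleeps simulate network round-trips.

async def lookup(name: str) -> str:
    await asyncio.sleep(0.05)  # stand-in for a ~50ms network call
    return name

NAMES = ("auth", "user", "prefs", "flags")

async def sequential() -> list[str]:
    # Each call waits for the previous one: latencies add up.
    return [await lookup(n) for n in NAMES]

async def parallel() -> list[str]:
    # Independent calls overlap: total time is roughly the slowest call.
    return list(await asyncio.gather(*(lookup(n) for n in NAMES)))

start = time.perf_counter()
seq_result = asyncio.run(sequential())
seq_elapsed = time.perf_counter() - start   # roughly 4 x 50ms

start = time.perf_counter()
par_result = asyncio.run(parallel())
par_elapsed = time.perf_counter() - start   # roughly 1 x 50ms

print(f"sequential ~{seq_elapsed:.2f}s, parallel ~{par_elapsed:.2f}s")
```

The catch, of course, is that calls can only be parallelized when they don't depend on each other's results, which is exactly what the critical-path analysis tells you.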
Fallacy 3: Bandwidth Is Infinite
This one was more relevant in the 1990s when networks were genuinely slow. But it still bites in predictable ways.
Serializing a large object graph and sending it over the wire costs more than you think. Sending a 50-field database record when you need 3 fields wastes bandwidth and CPU on serialization and deserialization. Sending the same data to 100 subscribers instead of once puts load on the network that you didn’t account for.
The more interesting modern version of this fallacy is chatty interfaces. A REST API that requires 10 round-trips to get the data needed to render one screen isn’t just slow because of latency — each round-trip also consumes bandwidth proportional to the payload size. At scale, this becomes a significant cost.
What this means in practice: Design APIs around the data consumers need, not around the entities in your data model. GraphQL exists partly because of this fallacy. Paginate large result sets. Don’t send the whole object when you only need the ID.
Fallacy 4: The Network Is Secure
The network is not secure. Traffic can be intercepted. Services can be spoofed. A caller claiming to be your authentication service might not be.
The good news is that the industry has mostly internalized this one for public-facing traffic. HTTPS is the default. TLS is standard.
The bad news is that it’s still widely violated for internal traffic. “It’s fine, it’s all behind the firewall.” Until it isn’t — because someone gets in, or an employee misbehaves, or a misconfigured cloud security group exposes an internal service.
Zero-trust networking exists because “behind the firewall” is not a security model, it’s an assumption. Every service-to-service call should authenticate. Every piece of data should be encrypted in transit. Treat internal traffic with the same skepticism as external traffic.
What this means in practice: mTLS for service-to-service communication. Don’t trust network location as a substitute for authentication. Assume breach — design systems that limit what an attacker can do even if they get access to your internal network.
Fallacy 5: Topology Doesn’t Change
The network layout you have today will not be the network layout you have in six months. Services get added. Services get removed. Load balancers get reconfigured. Cloud instances get terminated and replaced. IP addresses change.
Code that hardcodes IP addresses is the obvious violation, but the more subtle violation is code that assumes stable routing. If Service A always talks to Service B via a fixed path, and that path changes due to a load balancer reconfiguration, A might not notice. Or it might start getting errors it doesn’t understand.
Service discovery exists to handle this. DNS exists to handle this. Health checks and circuit breakers exist to handle the case where a formerly reachable endpoint is no longer there.
What this means in practice: Use service discovery or DNS, not hardcoded addresses. Assume your topology will change and build graceful handling for the case where a previously reliable endpoint is no longer available.
Fallacy 6: There Is One Administrator
This one is about operational reality. In a system with a single administrator, you can coordinate changes. You can push a config change and know it’s applied everywhere. You can reason about the state of the system because there’s one authoritative view.
In a distributed system — especially one with multiple teams, multiple services, and multiple deployment pipelines — there are many administrators, and they don’t always coordinate.
Team A deploys a new version of Service X at 2pm. Team B is planning a database migration at 3pm. Nobody told Team A that the migration changes a column that Service X reads. Now there’s an incident at 3:05pm and two teams are in a Slack channel trying to figure out who broke what.
This is Conway’s Law meeting distributed systems. The system’s architecture will mirror the communication structure of the organization building it. That’s not a platitude — it’s a real constraint that affects how you design.
What this means in practice: API versioning. Schema migrations that maintain backwards compatibility. Contract testing between services. Runbooks that describe dependencies. Change management that crosses team boundaries.
Fallacy 7: Transport Cost Is Zero
Related to the bandwidth fallacy, but distinct. Sending a message across a network isn’t just about the bits. There’s CPU time to serialize and deserialize. There’s the overhead of establishing a connection. There’s the cost of encryption/decryption. There’s memory allocation for buffers. There’s the latency of the network stack itself.
All of this is cheap per message. None of it is zero. At scale, it adds up.
A system that makes a network call for every event processed is going to have very different performance characteristics than one that batches events and processes them together. This isn’t just about latency — it’s about total compute cost.
The classic example is n+1 queries: loading a list of objects and then making one database call per object to fetch a related entity. It works fine with 10 objects. With 10,000 objects, you’ve just made 10,000 individual database calls when one batched query would have sufficed.
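Here is the n+1 shape and its batched fix in miniature, using SQLite with an illustrative books/authors schema:

```python
import sqlite3

# Sketch of the n+1 problem and its batched fix (schema is illustrative).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE authors (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE books (id INTEGER PRIMARY KEY, author_id INTEGER, title TEXT)")
conn.executemany("INSERT INTO authors VALUES (?, ?)", [(1, "A"), (2, "B"), (3, "C")])
conn.executemany("INSERT INTO books VALUES (?, ?, ?)",
                 [(1, 1, "t1"), (2, 2, "t2"), (3, 3, "t3")])

book_rows = conn.execute("SELECT id, author_id FROM books").fetchall()

# n+1: one query for the list, then one query PER book for its author.
authors_slow = {}
for _, author_id in book_rows:
    row = conn.execute("SELECT name FROM authors WHERE id = ?",
                       (author_id,)).fetchone()
    authors_slow[author_id] = row[0]          # n separate round-trips

# Batched: one query with an IN clause fetches every needed author at once.
ids = [a for _, a in book_rows]
placeholders = ",".join("?" * len(ids))
authors_fast = dict(conn.execute(
    f"SELECT id, name FROM authors WHERE id IN ({placeholders})", ids))

print(authors_slow == authors_fast)  # True: same data, 1 round-trip, not n
```

With an in-memory database both versions are instant; over a real network, the per-call transport cost is exactly what turns the first version into a production incident.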
What this means in practice: Batch where you can. Connection pooling is important. Profile the actual cost of your serialization layer — protobuf and JSON have meaningfully different costs. Consider the transport cost when deciding whether to call a service or bundle its functionality.
Fallacy 8: The Network Is Homogeneous
The network is not one thing. It’s composed of multiple segments with different characteristics: different physical media, different speeds, different reliability profiles, different MTU sizes, different routing rules.
The link between your application server and its database is not the same as the link between your datacenter and your CDN. The connection across an ocean is not the same as the connection within a datacenter. Your VPN connection home has different characteristics than the office network.
This matters for several reasons:
- MTU fragmentation: Packets too large for a segment get fragmented. Fragmented packets can get lost differently than whole packets. This causes bizarre, intermittent failures that are very hard to debug.
- Latency variance: Different paths have wildly different latency profiles. Traffic routing changes can suddenly change your p99 latency.
- Reliability variance: Some network segments are more reliable than others. Designing as if all segments are equally reliable leads to surprised faces when the undersea cable has issues.
What this means in practice: Test your system with realistic network conditions, not local loopback. Use tools that simulate latency, packet loss, and jitter. Don’t assume the network between services in the same datacenter is as reliable as local memory access — it isn’t.
The Meta-Fallacy
There’s an unspoken ninth fallacy underneath all of these: that you can reason about a distributed system from a single point of view.
When you write code in a single process, you’re god. You can read all the state, control all the execution, and see the whole picture. In a distributed system, every process has a limited, potentially stale view of the world. There is no omniscient observer. The “current state” of the system is not a single thing — it’s the union of the current states of all nodes, which may disagree.
This cognitive shift — from a coherent single view to a collection of partial views — is the hardest thing about distributed systems. The fallacies are symptoms of thinking with the wrong mental model. The right mental model is: assume partial information, assume partial failure, and design for correctness under both.
The fallacies tell you what to worry about. The next chapters tell you how to think about it. We start with the framework that captures the deepest trade-off: CAP.
CAP Theorem: Consistency, Availability, and the Lie You Have to Pick
In 2000, Eric Brewer gave a keynote at PODC (Principles of Distributed Computing) in which he conjectured that a distributed system can provide at most two of three properties: Consistency, Availability, and Partition Tolerance. Two years later, Gilbert and Lynch formalized it as a theorem.
For about a decade afterward, “CAP theorem” became a kind of magic incantation in systems design discussions. “We chose AP over CP” was said with the confidence of someone who knew exactly what they were talking about. Often, they didn’t.
CAP theorem is real, important, and widely misunderstood. Let’s fix that.
The Three Properties
Consistency (C)
In CAP, consistency means linearizability: every read sees the most recent write, or an error. If I write a value to the system and you immediately read that key, you get that value. No exceptions, no “maybe.”
This is a strong guarantee. It means the system behaves as if there’s a single up-to-date copy of the data, even if there are many replicas behind the scenes.
Important: this is not the same as the “C” in ACID (which is about constraints within a transaction). CAP consistency is specifically about the freshness and ordering guarantees for reads and writes across a distributed system. The naming collision is genuinely unfortunate and has confused more engineers than you’d think.
Availability (A)
Availability means every request to a non-failing node receives a response — not a timeout, not an error, but an actual answer. The response might not be the most current value, but the system will respond.
Importantly, CAP availability doesn’t mean “99.9% uptime.” It’s a binary property in the formal definition: a system is either always available (every node always responds) or it isn’t.
Partition Tolerance (P)
A network partition is when the network between nodes drops messages. Nodes can’t tell whether the other nodes are down or whether the network between them has failed. The system is split into two (or more) groups that can’t communicate.
Partition tolerance means the system continues operating even when partitions occur.
Why You Can’t Have All Three
Here’s the problem. Suppose you have two nodes, Node A and Node B, that replicate data to each other. A network partition occurs — they can no longer communicate.
[PARTITION]
Node A -----X----- Node B
| |
Client 1 Client 2
Now a client writes to Node A. Node A can’t tell Node B about the write because of the partition.
Client 2 then reads from Node B.
You now have to make a choice:
Option 1: Respond with the (possibly stale) value on Node B
- Node B responds, so the system is available
- But the response might be wrong (pre-partition value), so the system is not consistent
- This is AP behavior
Option 2: Refuse to respond until the partition heals
- Node B won’t answer read requests it can’t verify
- The system is consistent (it never serves stale data)
- But the system is not available (it’s refusing requests)
- This is CP behavior
There is no Option 3 that provides both. If you’re partitioned, you have to pick.
The Part That’s Usually Glossed Over
Here’s what the simplified explanation misses: you can’t just “drop” partition tolerance.
A network partition — messages being lost between nodes — is not a hypothetical. It happens. In any real distributed system running over a real network, partitions occur. Hardware fails. Cables get pulled. Routers misbehave. Cloud providers have network incidents. You cannot build a distributed system that is immune to partitions.
This means in practice, the CAP theorem is not a choice between three options — it’s a choice between two options when partitions occur:
- CP: Refuse requests that you can’t serve consistently. Sacrifice availability.
- AP: Serve requests even if the answer might be stale. Sacrifice consistency.
When there’s no partition — the normal, happy-path case — you can have both consistency and availability. The trade-off only bites during partitions.
The practical framing, then, is: what does your system do when the network between your nodes goes down?
CP Systems: Consistent Under Partition
A CP system responds to a partition by refusing to answer queries it can’t guarantee are correct.
The classic examples are:
- Zookeeper: A distributed coordination service. During a partition, the minority partition (nodes that can’t reach the quorum) will refuse writes and may refuse reads. Better to be unavailable than to be wrong.
- HBase (and most strongly-consistent databases): Writes are refused if replication can’t complete. Reads may be refused if they can’t be guaranteed fresh.
- etcd: Used in Kubernetes for cluster state. Correctness of cluster configuration trumps availability.
The pattern here is: these are systems where being wrong is worse than being unavailable. If your cluster coordination service tells half of your workers to take on a job that the other half already took, you have a bigger problem than “service was down for a minute.”
In CP systems, during a partition:
Client ─── request ───> Node (minority partition)
Node: "I can't guarantee this is correct. Returning error."
Client: gets an error
The client has to handle errors. The system maintains consistency guarantees for clients that do reach a healthy partition.
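The quorum check at the heart of that refusal is simple. A sketch, with an illustrative class (real systems like ZooKeeper and etcd implement this inside a consensus protocol):

```python
# Sketch of CP behavior: a node refuses to answer when it can't reach a
# majority (quorum) of the cluster. Names and structure are illustrative.

class CPNode:
    def __init__(self, cluster_size: int):
        self.cluster_size = cluster_size
        self.reachable = cluster_size   # nodes it can contact, incl. itself

    def has_quorum(self) -> bool:
        # Strict majority: at most one partition can ever hold a quorum.
        return self.reachable > self.cluster_size // 2

    def read(self, value):
        if not self.has_quorum():
            # Minority side of a partition: an error beats a stale answer.
            raise RuntimeError("no quorum: refusing a possibly stale read")
        return value

node = CPNode(cluster_size=5)
node.reachable = 2                 # partition: this node sees only 2 of 5
try:
    node.read("balance=100")
except RuntimeError as e:
    print(e)                       # the client must handle the error
```

The majority rule is the key design choice: because two disjoint groups can't both contain more than half the cluster, at most one side of any partition keeps serving.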
AP Systems: Available Under Partition
An AP system responds to a partition by serving requests from whatever data it has, even if that data might be stale.
The classic examples:
- DynamoDB (in its default configuration): Prioritizes availability. You’ll get a response, but it might not reflect the most recent write.
- Cassandra: AP by default. Writes go to multiple nodes, but depending on consistency level, you might read from a node that hasn’t received the latest write yet.
- CouchDB: Allows multiple writers to diverge during a partition and resolves conflicts later.
The pattern: these are systems where being unavailable is worse than being potentially stale. A shopping cart that occasionally shows one fewer item than you added is annoying. A shopping cart that returns a 503 for 10 minutes during a network blip is a business problem.
In AP systems, during a partition:
Client ─── request ───> Node (minority partition)
Node: "Here's what I know, which might be stale."
Client: gets a (possibly stale) response
The client gets an answer. Whether that answer reflects the latest reality is not guaranteed.
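The AP side of the same situation looks like this in miniature — an illustrative node that always answers from its local replica, even when replication traffic has stopped arriving:

```python
# Sketch of AP behavior: during a partition the node answers from its
# last-known local copy, which may be stale. Names are illustrative.

class APNode:
    def __init__(self):
        self.local = {}             # this node's replica of the data
        self.partitioned = False

    def apply_replication(self, key, value):
        if not self.partitioned:    # replication stops during a partition
            self.local[key] = value

    def read(self, key):
        # Always answer, even if the replica hasn't caught up.
        return self.local.get(key)

node = APNode()
node.apply_replication("cart", ["book"])
node.partitioned = True
node.apply_replication("cart", ["book", "pen"])  # update never arrives
print(node.read("cart"))  # ['book'] -- stale, but the client got an answer
```

Notice what this sketch leaves out: when the partition heals and the missed update finally arrives, the node has to reconcile it with anything written locally in the meantime. That conflict-resolution problem is the real price of AP.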
The Part That Actually Matters
The formal CAP theorem — with its binary properties — is useful as a mental model but not as a practical design tool. Real systems operate on a spectrum. Here’s why:
Consistency is a spectrum
CAP’s “C” (linearizability) is the strongest consistency model. There are weaker ones: sequential consistency, causal consistency, eventual consistency. A system might not be linearizable but still be strong enough for your use case. (Chapter 4 covers this in detail.)
Availability is a spectrum
“The system responds” can mean different things. Is 99% uptime “available”? 99.9%? The formal definition is unhelpful here. What matters is: how often does your system return errors or timeouts to clients under normal operation, under degraded operation, and under partition?
Partitions are rare but not exceptional
You’re going to spend 99.9% of your time in the happy path where there’s no partition. The CP vs AP choice mostly affects rare failure scenarios. What are the actual consequences of your choice in those scenarios? Design for that.
A More Useful Framework: PACELC
In 2012, Daniel Abadi proposed PACELC as an extension of CAP that captures the tradeoff more completely.
PACELC says:
- During a Partition, choose between Availability and Consistency (CAP’s core insight)
- Else (during normal operation), choose between Latency and Consistency
The second part is the addition. Even when there’s no partition, there’s a trade-off: to guarantee consistency across multiple nodes, you need to coordinate those nodes (which takes time and adds latency). Or you can skip the coordination (lower latency) and accept that reads might not reflect the latest write.
This is the trade-off that actually dominates day-to-day system behavior:
| System | During a partition | Else (normal operation) |
|---|---|---|
| DynamoDB | PA | EL (with eventual consistency) |
| Zookeeper | PC | EC |
| Cassandra | PA | EL |
| PostgreSQL (single node) | N/A | EC |
| CockroachDB | PC | EC |
The last two rows illustrate something: single-node PostgreSQL sidesteps CAP entirely (there’s no distributed state to partition), but once you add replicas, you’re back in the trade-off.
How to Actually Use CAP in System Design
Here’s the practical question you should ask:
What happens in my system when two of my nodes can’t talk to each other?
If your answer is “one of them stops accepting writes” — that’s CP behavior. You’ve decided consistency matters more than availability during partitions.
If your answer is “they both keep accepting writes and sync up later” — that’s AP behavior. You’ve decided availability matters more. You now need to think about conflict resolution when the partition heals.
What happens to your users in each case?
CP failure: Some users get errors or time out. Clear, if unpleasant.
AP failure: Some users might see stale data or have their writes conflict with others. Subtle, and potentially confusing.
What does your business logic require?
If you’re storing inventory counts and you can’t afford to oversell: lean CP. Errors are preferable to selling something you don’t have.
If you’re storing user sessions or view counts or shopping carts: lean AP. A stale session is recoverable. An unavailable login is a lost user.
A Note on “Eventual Consistency”
You’ll often hear “we use eventual consistency” as a response to CAP, as if it neatly settles the question. It doesn’t.
“Eventual consistency” means: if no new writes happen, all replicas will eventually converge to the same value. This is a liveness property — it tells you things will get consistent eventually but says nothing about when, or about what “consistent” means in the interim.
The questions it leaves unanswered in practice:
- How eventual is “eventual”? Milliseconds? Seconds? Minutes? Under what conditions?
- What do reads return during the eventually-consistent window? The old value? A random one of several competing values?
- How are write conflicts resolved? Last-write-wins (by timestamp)? Application-level merge? Manual intervention?
“We use eventual consistency” is a starting point for a design conversation, not an answer to one.
The Real Lesson
The CAP theorem tells you that perfection is not available in a distributed system under partition. You cannot be simultaneously consistent and available when the network breaks.
But the more important lesson is: you need to understand your failure modes and design for them. Most of the time your network isn’t partitioned. But when it is — or when a single node fails, or when a database is slow, or when a third-party service is down — what does your system do? Who sees what? What is the blast radius?
Answering those questions requires understanding consistency models deeply. Which is why that’s the next chapter.
Consistency Models: From Strong to Eventual and Everything Uncomfortable In Between
The CAP theorem gave us a binary: consistent or available under partition. But “consistent” is doing a lot of work in that sentence. Consistency isn’t a single property — it’s a spectrum of guarantees, ranging from “reads always see the latest write” down to “reads will eventually agree with other reads, probably, at some point.”
Where you sit on that spectrum determines what kind of bugs your users encounter and what kind of performance trade-offs you’re making.
This chapter is a map of that spectrum. We’ll go from the strongest guarantees to the weakest, with concrete examples of what breaks at each level.
Why Consistency Models Exist
The need for consistency models comes from replication. If you have a single copy of your data and one process accessing it, there’s no consistency question — you read what’s there. The problem starts the moment you have two copies (replicas) and want them to stay in sync.
Replication exists for two reasons:
- Availability: If one replica goes down, others can still serve requests.
- Performance: Reads can be distributed across replicas, reducing load on any single node.
Both of these are good reasons. The cost is that replicas can diverge — one might have seen a write that the other hasn’t yet. When that happens, what do you guarantee to clients?
That’s what a consistency model specifies.
The Consistency Hierarchy
Here’s the landscape, from strongest to weakest:
Stronger guarantees (more correct, more coordination required)
│
├─ Linearizability (strong consistency)
│
├─ Sequential consistency
│
├─ Causal consistency
│
├─ Read-your-writes (session consistency)
│
├─ Monotonic reads
│
└─ Eventual consistency
Weaker guarantees (more performant, less coordination required)
These aren’t arbitrary categories — each one adds a specific guarantee on top of the ones below it, and removing that guarantee is what enables a specific performance optimization.
Linearizability: The Gold Standard
Linearizability is the strongest consistency model for a distributed system. It provides the following guarantee:
Every operation appears to take effect atomically at some point between its invocation and its response, and operations are ordered consistently with real time.
In plain English: the system behaves as if there’s exactly one copy of the data, and every operation takes effect at a single moment in time. If operation A completes before operation B begins, then any read that sees B’s write must also see A’s write.
Time ─────────────────────────────────────────────────>
Client 1: ──[write X=1]──────────────────────────────
Client 2: ──[read X]── gets 1 ✓
Client 3: ──[read X]── gets ???
Client 3's read overlaps with Client 1's write.
In a linearizable system, it gets either 0 or 1,
and the choice must be consistent with other reads.
Linearizability is what you get with a single-leader database operating synchronously. It’s also what systems like ZooKeeper and etcd provide, and what Google’s Spanner provides globally (using atomic clocks).
The cost: to provide linearizability, the system must coordinate before responding. In practice, this means: a write must be acknowledged by a quorum of nodes before it’s confirmed; a read must contact the node with the authoritative latest value. This coordination takes time and fails when nodes can’t communicate.
When you need it: distributed locking, leader election, any case where “multiple clients race for something and only one should win” must work correctly.
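The “multiple clients race and only one should win” case depends on a linearizable compare-and-set. Here’s a toy sketch of why: the class below fakes a single-copy register with a mutex (names are invented for illustration; a real system gets this property from consensus, not a local lock), and the lock-acquisition race falls out of it.

```python
import threading

class LinearizableRegister:
    """Toy single-copy register: one mutex serializes all operations,
    so every operation takes effect atomically at one point in time."""
    def __init__(self, value=None):
        self._value = value
        self._lock = threading.Lock()

    def compare_and_set(self, expected, new):
        # Atomic: no other operation can interleave between reading
        # _value and writing it. This is the linearizability guarantee
        # in miniature.
        with self._lock:
            if self._value == expected:
                self._value = new
                return True
            return False

# Distributed locking as a CAS race: both clients try to claim the
# register; exactly one succeeds, because the operations are totally
# ordered at a single point in time.
reg = LinearizableRegister(value=None)
winner_a = reg.compare_and_set(None, "client-a")  # acquires the lock
winner_b = reg.compare_and_set(None, "client-b")  # sees a's write, fails
```

If reads could return stale values (as under weaker models), both clients could observe `None` and both would “win” — which is exactly why locks and leader election sit at the top of the consistency requirements table.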
Sequential Consistency
Sequential consistency relaxes one part of linearizability: operations don’t need to take effect in real-time order, but they must appear in some order that is consistent across all nodes.
Specifically: every node sees operations in the same order, and for each client, their operations appear in the order the client issued them.
What’s lost compared to linearizability:
Time ─────────────────────────────────────────────────>
Client 1: ──[write X=1]──────────────────────────────
Client 2: ──[read X]── gets 0 (!) ✓ for SC
This is allowed in sequential consistency.
The system might see Client 2's read as if it happened
before Client 1's write, even though Client 2 started
reading after Client 1 finished writing.
This seems wrong, but it’s allowed because sequential consistency doesn’t require operations to respect real-world time — only that the order is consistent for all clients. Client 2 seeing 0 is fine as long as every client sees that read as having happened “before” Client 1’s write.
Sequential consistency is rarely used in practice because: (a) the relaxation from linearizability is subtle enough that most engineers don’t notice it, and (b) the real benefit comes from weaker models that allow much more concurrency.
Causal Consistency
Causal consistency tracks causality between operations and preserves it. If event A causally precedes event B (A happened-before B, or A produced a value that B reads), then every process sees A before B.
Operations that are not causally related can appear in any order on different nodes.
Client 1: ──[write post="Hello"]──────────────────────────────
Client 2: ──[write reply="World"(to Hello)]──
Client 3: ──[read]──
Client 3 reads post="Hello" and reply="World".
In causal consistency, if Client 3 sees the reply,
it is guaranteed to also see the post it's replying to.
But the reply might not be visible yet on Node B
while it's visible on Node A. That's fine —
causal consistency doesn't require everyone to see
things immediately, just in the right order.
Causal consistency is powerful and increasingly popular. MongoDB’s causally consistent sessions use it. It’s the model that allows geo-distributed systems to avoid coordination for most operations while still preserving the ordering that applications care about.
The insight: most applications don’t actually need full linearizability. They need causal ordering — if you see the effect of an action, you should also see its cause. A comments system needs to show comments in the right thread. It doesn’t need every comment to be globally ordered with every other event in the system.
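The post-and-reply example can be sketched as dependency tracking: each write names the writes it causally depends on, and a replica only makes a write visible once its dependencies are visible. This is a minimal illustration, not any real database’s protocol; all names are invented.

```python
class CausalReplica:
    """Sketch: a write lists the write-ids it causally depends on.
    The replica hides a write until all its dependencies are visible,
    so you never see a reply without the post it's replying to."""
    def __init__(self):
        self.visible = {}   # write_id -> value, safe to show clients
        self.waiting = []   # writes whose dependencies haven't arrived

    def deliver(self, write_id, value, deps):
        self.waiting.append((write_id, value, deps))
        self._apply_ready()

    def _apply_ready(self):
        # Repeatedly apply any waiting write whose deps are all visible.
        progress = True
        while progress:
            progress = False
            for entry in list(self.waiting):
                write_id, value, deps = entry
                if all(d in self.visible for d in deps):
                    self.visible[write_id] = value
                    self.waiting.remove(entry)
                    progress = True

# The reply arrives over the network first, but stays hidden until
# the post it depends on shows up:
node = CausalReplica()
node.deliver("reply-1", "World", deps=["post-1"])
assert "reply-1" not in node.visible      # reply buffered, not shown
node.deliver("post-1", "Hello", deps=[])
assert node.visible == {"post-1": "Hello", "reply-1": "World"}
```

Note what this does *not* require: no coordination with other replicas before accepting a write, which is the property that makes causal consistency attractive for geo-distributed systems.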
Session Guarantees
Below causal consistency, we have a set of weaker guarantees often called “session guarantees.” These are per-client guarantees rather than global ones.
Read-Your-Writes (RYW)
If a client writes a value, it will see that value (or a later one) on subsequent reads.
This sounds obvious, but it breaks in common architectures. If you write to a primary database and then read from a replica that hasn’t caught up yet, you might not see your own write. This is a concrete, common production bug.
Client writes to Primary: user.email = "new@email.com"
Client reads from Replica (lagging 200ms): gets "old@email.com"
Client: "why didn't my change save??"
RYW prevents this. The system might route reads to the primary after a write, or use sticky sessions to route to the same replica, or use tokens to ensure the replica has caught up before serving the read.
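The “route to the primary for a window after a write” approach can be sketched in a few lines. Everything here is invented for illustration (the class, the 1-second window); a real implementation would key the window off measured replication lag rather than a guess.

```python
import time

class RYWRouter:
    """Read-your-writes via primary routing: after a client writes,
    send that client's reads to the primary for a window longer than
    the worst expected replication lag."""
    def __init__(self, primary, replica, window_seconds=1.0):
        self.primary = primary
        self.replica = replica
        self.window = window_seconds
        self.last_write = {}   # client_id -> time of that client's last write

    def write(self, client_id, key, value):
        self.primary[key] = value
        self.last_write[client_id] = time.monotonic()

    def read(self, client_id, key):
        last = self.last_write.get(client_id, float("-inf"))
        recently_wrote = time.monotonic() - last < self.window
        store = self.primary if recently_wrote else self.replica
        return store.get(key)

# The replica "lags": nothing copies writes into it in this sketch.
primary, replica = {}, {}
router = RYWRouter(primary, replica)
router.write("u1", "email", "new@example.com")
print(router.read("u1", "email"))   # u1 hits the primary: sees own write
print(router.read("u2", "email"))   # u2 hits the stale replica: None
```

Note the guarantee is per-client: u2 still reads stale data, and that’s fine under RYW. That narrow scope is what makes it cheap.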
Monotonic Reads
Once you’ve seen the effects of a write, you’ll never see the state from before it. If you read a value at version 42, a subsequent read returns version 42 or later, never version 38.
Client reads from Replica A: gets version 42
Client reads from Replica B: should get ≥ version 42
Without monotonic reads: might get version 38
Without monotonic reads, a client can see what looks like time going backwards — data that was there and then isn’t. This is extremely confusing.
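One way to get monotonic reads is client-side: remember the highest version you’ve seen and reject (or retry elsewhere) any reply that’s older. A minimal sketch, with invented names:

```python
class StaleRead(Exception):
    pass

class MonotonicClient:
    """Client-side monotonic reads: track the highest version observed
    and refuse any reply from a replica that is behind it."""
    def __init__(self):
        self.max_seen = 0

    def read(self, replica_fn):
        # replica_fn stands in for "ask some replica"; it returns
        # a (version, value) pair.
        version, value = replica_fn()
        if version < self.max_seen:
            # Time would appear to go backwards: reject (a real client
            # would retry against a different, caught-up replica).
            raise StaleRead(f"saw v{self.max_seen}, replica has v{version}")
        self.max_seen = version
        return value

client = MonotonicClient()
client.read(lambda: (42, "fresh"))       # Replica A, version 42: accepted
try:
    client.read(lambda: (38, "stale"))   # Replica B, lagging: rejected
except StaleRead:
    pass
```

Sticky sessions (always routing a client to the same replica) achieve the same guarantee without version tracking, at the cost of uneven load and trouble when that replica dies.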
Monotonic Writes
Writes from a single client are executed in order. If Client 1 does Write A and then Write B, Write A must be applied before Write B everywhere.
Writes Follow Reads
If a client reads X=5 and then writes Y (based on seeing that X=5), then the write of Y should be sequenced after the write that set X=5. This is the write side of causality.
Eventual Consistency
Eventual consistency is the weakest useful guarantee: if no new updates are made to an object, eventually all reads will return the same last-written value.
That’s it. No ordering guarantees. No freshness guarantees. Just: we’ll get there eventually.
What this means concretely:
- Two clients can read the same key at the same time and get different values
- A client can read a key, write to it, and then read a stale value
- Different nodes can have different current values for the same key
- None of this is a bug — it’s the design
The system will resolve these discrepancies. The question is how, and when.
Conflict resolution under eventual consistency
When two nodes accept conflicting writes (during a partition, for example), they need a rule for which write wins when they sync up.
Last-Write-Wins (LWW): Whichever write has the later timestamp wins. The problem: timestamps in distributed systems are not reliable (see the clocks chapter). A write might arrive with an earlier timestamp even if it happened later. You can silently lose writes.
Multi-Value / “Siblings”: Some systems (like Riak and CouchDB) store all conflicting versions and surface them to the client for resolution. This is correct but puts the burden on the application.
Conflict-free Replicated Data Types (CRDTs): Specially-designed data structures where all possible orderings of operations produce the same result. A CRDT counter that supports increment/decrement can be merged from any two states. This sounds magical and has real limits, but for the right use cases (counters, sets, maps with specific semantics) it works well.
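The simplest CRDT is the grow-only counter (G-counter): each node tracks its own increment count, and merging two states takes the element-wise maximum. A sketch (plain dicts stand in for replica state):

```python
def merge(a, b):
    """G-counter merge: per-node increment counts, element-wise max.
    Merge is commutative, associative, and idempotent, so replicas can
    exchange state in any order, any number of times, and converge."""
    return {node: max(a.get(node, 0), b.get(node, 0))
            for node in a.keys() | b.keys()}

def value(counter):
    """The counter's value is the sum of every node's contribution."""
    return sum(counter.values())

# Node A incremented 3 times, node B incremented 5 times, then they sync:
a = {"A": 3}
b = {"B": 5}
assert value(merge(a, b)) == 8
assert merge(a, b) == merge(b, a)            # order doesn't matter
assert merge(merge(a, b), b) == merge(a, b)  # re-delivery is harmless
```

The limits show up immediately: this counter can’t decrement (that needs a second map, the PN-counter), and “element-wise max” only works because increments never conflict. The design effort in CRDTs goes into finding a merge with these algebraic properties for your data type.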
The Production Problem: Stale Reads
Here’s the scenario that will make you care about all of this.
You’re building a system where users can disable their account. The admin panel writes account.active = false to the primary database. The API that checks whether a user can log in reads from a read replica. The replica is 500ms behind.
For the next 500ms, users of the disabled account can still log in.
This might be acceptable (it’s a narrow window). It might be catastrophic (abuse case, court order, account compromise). The right answer depends on the application. But you need to know you’ve made this trade-off.
Primary DB: active=false (written at T=0)
Replica: active=true (not yet replicated, T=0 to T+500ms)
Auth service reads replica: user is active ✓ (but shouldn't be)
The fix depends on what guarantee you need:
- If you need linearizability: Read from the primary always (or use a system that provides synchronous replication).
- If RYW is enough: Route the disable request’s originator to the primary, but other clients can read from replicas.
- If you can tolerate a bounded window: Accept the 500ms lag, document it, and make sure it’s not a security boundary.
Choosing a Consistency Model
Here’s a practical decision matrix:
| Use Case | Minimum Required Guarantee |
|---|---|
| Distributed lock / leader election | Linearizability |
| Financial transactions, inventory counts | Linearizability or strong serializable |
| “Can this user log in?” with security implications | Linearizability or synchronous replication |
| Social media feed ordering | Causal consistency |
| User sees their own profile edits | Read-your-writes |
| Read-heavy analytics | Eventual consistency |
| View counters, likes, non-critical metrics | Eventual consistency |
| User session / shopping cart | Read-your-writes + monotonic reads |
The right-hand column is the minimum. You can always use a stronger guarantee than required — the cost is coordination overhead and reduced availability under partition.
The Uncomfortable Truth About Consistency
Most applications use relational databases with ACID transactions and assume they get linearizability. For writes within a single database, that’s often true. But:
- Reads from replicas are not linearizable by default in most databases
- Reads from caches are not linearizable
- Cross-service operations are not linearizable even if each service individually is
- Any operation involving a clock (“last updated at”) is not linearizable without synchronized clocks
You’re already trading away consistency in ways you might not have explicitly decided. The question is: are those trade-offs appropriate for your use case?
After this chapter, you should be able to answer that. Or at least ask the right questions.
Next: how do you actually keep multiple copies of data in sync? That’s replication — and it turns out there are more ways to do it badly than there are to do it well.
Replication Strategies: Leaders, Followers, and Who Gets to Be Right
Replication is how distributed systems achieve two things simultaneously: fault tolerance (if one copy of the data disappears, others remain) and scalability (many nodes can serve reads). The catch is that keeping multiple copies of data consistent is, in practice, one of the hardest problems in distributed systems.
This chapter covers how replication actually works, what the trade-offs are, and what breaks when it goes wrong.
Why Replication Is Hard
Imagine you have a single database. You can read and write to it. It’s either up or down. Its data is whatever you last wrote. This is simple.
Now add a second database that mirrors the first. You need:
- Writes to the first to propagate to the second
- Reads to return consistent data regardless of which copy you hit
- A decision about what to do when the two copies disagree
- A plan for when the network between them fails
- A way to handle the primary going down
Each of these is a non-trivial problem. The interactions between them are the hard part.
Single-Leader Replication
The most common approach. One node is designated the leader (also called primary or master). All writes go to the leader. The leader propagates changes to followers (replicas, standbys, secondaries).
            Clients
           /       \
       Writes      Reads
         |         /    \
      Leader  Follower1  Follower2
         |        |           |
         └─── replication ────┘
Writes always go to the leader. Reads can go to the leader (for strong consistency) or to followers (for scalability, with potential staleness).
Synchronous vs Asynchronous Replication
This is where the first major trade-off appears.
Synchronous replication: The leader waits for at least one follower to confirm it received the write before acknowledging success to the client.
Client ──[write]──> Leader
Leader ──[replicate]──> Follower
Follower ──[ack]──> Leader
Leader ──[success]──> Client
Advantages: If the leader crashes immediately after acknowledging a write, a follower has the data. You don’t lose confirmed writes.
Disadvantages: A write is only as fast as your slowest synchronous follower. If that follower is slow or unreachable, writes stall.
Asynchronous replication: The leader acknowledges the write immediately, replicates to followers in the background.
Client ──[write]──> Leader
Leader ──[success]──> Client
Leader ──[replicate, eventually]──> Follower
Advantages: Writes are fast. Network issues between leader and followers don’t block writes.
Disadvantages: If the leader crashes before replication completes, those writes are gone. Followers may lag, serving stale data.
Most databases use asynchronous replication by default because synchronous replication is too expensive for most use cases. PostgreSQL’s synchronous replication, MySQL with rpl_semi_sync, and similar features exist specifically for use cases where you can’t afford to lose any confirmed write.
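The durability difference is worth seeing concretely. This is a toy in-memory simulation (no real database API; all names invented) of what survives when the leader dies after acknowledging a write but before replicating it:

```python
class Leader:
    """Toy leader illustrating the sync/async trade-off. sync=True waits
    for the follower before acking; sync=False acks first, replicates later."""
    def __init__(self, sync):
        self.sync = sync
        self.log = []            # leader's local log
        self.follower_log = []   # the one follower's log
        self.pending = []        # async mode: acked but not yet replicated

    def write(self, value):
        self.log.append(value)
        if self.sync:
            self.follower_log.append(value)  # replicate BEFORE acking
        else:
            self.pending.append(value)       # ack now, replicate "later"
        return "ack"                         # client sees success either way

    def surviving_data_after_crash(self):
        # Leader dies before `pending` ever ships: only the follower's
        # copy survives to the new leader.
        return list(self.follower_log)

sync_leader = Leader(sync=True)
sync_leader.write("order-1")
async_leader = Leader(sync=False)
async_leader.write("order-1")

print(sync_leader.surviving_data_after_crash())   # confirmed write survives
print(async_leader.surviving_data_after_crash())  # confirmed write is gone
```

Both clients got `"ack"`. Only one of them still has their data after failover. That gap between “acknowledged” and “durable” is the entire content of the sync/async decision.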
Replication Lag
With asynchronous replication, followers will always be behind the leader by some amount — the replication lag. Under normal conditions, this is milliseconds. Under high write load or a slow network, it can be seconds or minutes.
During that lag window, followers are serving stale data. This is often acceptable. It becomes a problem when:
- You write to the leader and immediately read from a follower (read-your-writes violation)
- A follower is used for “current” data in a security check
- The lag is measured in minutes and your users notice
Monitoring replication lag is not optional. It’s a key operational metric. Unexpected lag spikes are early warning signs of follower trouble.
Failover: When the Leader Dies
Single-leader replication’s critical operational question is: what happens when the leader fails?
The sequence for a typical automated failover:
1. Leader stops responding (crash, network failure, overload)
2. Follower(s) detect the leader is gone (via heartbeats, timeouts)
3. A new leader is elected (usually the most up-to-date follower)
4. Remaining followers are reconfigured to replicate from the new leader
5. Clients are redirected to the new leader
This sounds clean. In practice:
Problem 1: What if the old leader wasn’t really dead?
Maybe it was just slow. Now you have two nodes that both think they’re the leader. This is called split-brain, and it can cause writes to diverge on two separate leaders simultaneously. Systems handle this with “fencing” — ensuring the old leader can’t accept writes even if it comes back (by using lease-based locks, STONITH, or other mechanisms).
Problem 2: The new leader might not have all the data.
If replication was asynchronous, the new leader (the most up-to-date follower) might still be missing writes that the old leader confirmed to clients. Those writes are gone. Clients that received “success” were told their data was durable; it wasn’t. What do you do about the clients that received a success and cached the response?
Problem 3: The old leader comes back.
It comes back with writes that the new leader doesn’t have. These need to be discarded or reconciled. Most systems discard them. This is another way confirmed writes can disappear.
Problem 4: What’s the right timeout?
Too short: you do unnecessary failovers when the leader is just temporarily slow (false positive). Too long: you’re down for longer than necessary when the leader really is dead. There’s no right answer; it depends on your workload.
Multi-Leader Replication
What if every node can accept writes? Now you have multi-leader (or multi-master) replication.
The use case: you want to accept writes in multiple datacenters, each datacenter has its own leader, and they replicate to each other. Writes from users in Europe go to the European datacenter immediately; they replicate to the US datacenter asynchronously.
US Datacenter                EU Datacenter
      |                            |
  US Leader  <─── async ───>  EU Leader
      |                            |
 US Followers                 EU Followers
Advantage: writes are fast locally. You’re not waiting for cross-Atlantic replication before acknowledging to the user.
The problem: write conflicts.
If a user edits a document from two browser tabs at the same time — one tab going to the US datacenter, one to the EU — both writes succeed locally. Now you have two different versions of the document and no canonical truth.
Conflict Resolution Strategies
Last-write-wins (LWW): Whichever write has the later timestamp wins. The earlier write is silently discarded. This is simple and lossy. Real data disappears without any indication.
Merge: Keep all values and present them to the application for resolution. Git does this — a merge conflict. It’s correct but requires application logic.
CRDT: Use data types specifically designed so that merges are always well-defined. Increment-only counters are the simplest example. A counter that’s been incremented by 3 on node A and 5 on node B gives 8 when merged, regardless of order.
Custom resolution: The application receives the conflicting values and decides. Works for specific cases (a collaborative text editor can do operational transform). Hard to get right in general.
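The silent loss under LWW is easy to demonstrate. The merge rule is two lines; the data loss comes from trusting timestamps produced by skewed clocks (the scenario and values below are invented for illustration):

```python
def lww_merge(a, b):
    """Last-write-wins: keep whichever (timestamp, value) pair has the
    later timestamp. Simple, deterministic, and lossy: the losing value
    vanishes with no error and no record that it existed."""
    return a if a[0] >= b[0] else b

# The two-browser-tabs scenario: the EU edit happened later in real
# time, but the EU node's clock runs slightly behind, so its write
# carries the SMALLER timestamp and loses.
us_write = (1000.002, "title from US tab")
eu_write = (1000.001, "title from EU tab")   # actually the later edit
print(lww_merge(us_write, eu_write))         # the EU edit is discarded
```

Nothing in this code path can tell you a write was lost, which is why LWW is acceptable for data you can afford to drop (metrics, caches) and dangerous for data you can’t.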
Multi-leader replication is generally avoided unless there’s a strong geographic latency requirement driving it. Offline-first applications (mobile apps that sync when they reconnect) are effectively multi-leader systems. CouchDB and similar databases are designed specifically for this model.
Leaderless Replication
What if there’s no designated leader at all? Any node can accept any write. Reads and writes go to multiple nodes simultaneously, and the client (or a coordinator) decides what to believe.
This is the model used by Dynamo-style databases: Cassandra, Riak, and others.
Quorums
With N replicas, a write is sent to W nodes, and a read is sent to R nodes. For the system to be consistent, you need:
W + R > N
If this holds, the read set and the write set must overlap — at least one node that received the write must be included in the read. That node has the latest value.
Common configuration: N=3, W=2, R=2.
Write to 2 of 3: Read from 2 of 3:
┌───────────────┐ ┌───────────────┐
W -> │ Node A ✓ │ R -> │ Node A ✓ │
W -> │ Node B ✓ │ R -> │ Node B ✓ │
│ Node C (skip) │ │ Node C (skip) │
└───────────────┘ └───────────────┘
At least one node in the read set was in the write set.
The reader sees the latest value.
If W=N (synchronous write to all nodes), you can read from any single node and always get the latest value — but writes are slow and unavailable if any node is down.
If R=1 (read from any single node), reads are fast but might be stale unless W=N.
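A quorum read in miniature: contact R replicas, take the response with the highest version. This sketch assumes each replica holds a simple (version, value) pair and that versions are totally ordered; real Dynamo-style systems need version vectors or LWW precisely because that assumption fails under concurrent writes.

```python
def quorum_read(replicas, R):
    """Read R replicas and return the freshest response.
    `replicas` is a list of (version, value) pairs; in a real system
    you'd take the first R to respond, not the first R in a list."""
    responses = replicas[:R]
    # max() compares tuples element-wise, so this picks the highest
    # version (ties broken by value, which is fine for this sketch).
    return max(responses)

N, W, R = 3, 2, 2
assert W + R > N   # guarantees the read set overlaps the write set

# A write at version 2 reached nodes A and B; node C still has version 1.
replicas = [(2, "new"), (2, "new"), (1, "old")]
print(quorum_read(replicas, R))   # any 2-of-3 read includes a v2 node
```

The overlap argument is the whole trick: with W=2 of 3 and R=2 of 3, any read set of two nodes must include at least one of the two nodes that took the write, and `max` surfaces its value.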
Sloppy Quorums and Hinted Handoff
During a partition, the quorum nodes might not be reachable. What do you do?
Strict quorum: Refuse the write. Maintain consistency, lose availability.
Sloppy quorum: Accept the write at a different node (not in the normal N replicas), record that it’s holding data meant for the real replicas, and hand it off when the partition heals. This maintains availability at the cost of consistency guarantees.
Cassandra uses sloppy quorums by default. This is why Cassandra is AP — it will accept writes even when the normal replicas can’t be reached.
Concurrent Writes and Version Vectors
With leaderless replication, two clients can write to the same key simultaneously on different nodes. Unlike multi-leader where there’s at least a per-datacenter ordering, here there’s no ordering at all.
Databases handle this with version vectors (sometimes called vector clocks in this context): each key carries metadata tracking which nodes have seen which writes. When two versions are concurrent (neither happened-before the other), the database keeps both and requires the application to merge.
This is correct but places burden on the application. Dynamo’s original design required the application to merge conflicts. DynamoDB simplified this by using Last-Write-Wins (at the cost of occasional data loss). CRDTs solve it for specific data types.
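Comparing two version vectors is how a database decides whether it’s looking at an ordered pair of writes or at siblings. The comparison rule is small enough to show in full (dicts mapping node name to counter; function name invented):

```python
def compare(a, b):
    """Compare version vectors a and b (dicts: node -> counter).
    Returns 'before', 'after', 'equal', or 'concurrent'."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"       # a happened-before b: b supersedes a
    if b_le_a:
        return "after"        # b happened-before a: a supersedes b
    return "concurrent"       # siblings: the application must merge

print(compare({"A": 1}, {"A": 2}))   # the same node advanced: ordered
print(compare({"A": 1}, {"B": 1}))   # different nodes wrote: concurrent
```

When the result is `"before"` or `"after"`, the database can safely discard the superseded version. Only `"concurrent"` forces the sibling-keeping (or LWW) behavior described above.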
Replication in Practice: What Actually Goes Wrong
The “read your own writes” bug
You write to the leader, then immediately redirect to a page that reads from a follower. The follower hasn’t replicated yet. The data you just wrote isn’t there. The user reports a bug. You can’t reproduce it in testing because testing has no replication lag.
Fix: after a write, route subsequent reads to the leader for a short window, or use sticky sessions, or check timestamps to ensure the replica has caught up.
The “disappearing write” after failover
User submits a form. Write goes to the leader. Leader confirms. Leader dies. New leader is elected but didn’t have that write (it was async). User reloads page. Their submission is gone.
Fix: synchronous replication for critical writes, or wait for acknowledgment from N nodes before confirming to the user.
The replication lag spike causing incorrect data
Your reporting dashboard reads from a replica. Replication lag spikes to 10 minutes during a high-write period. Your dashboard is showing 10-minute-old data. You make a business decision based on it.
Fix: monitor replication lag. Have alerts. Know your lag when it matters.
The split-brain write conflict
Network partition causes two nodes to both believe they’re the leader. Both accept writes for the same key. Partition heals. Both values exist. One gets silently discarded. Data is lost with no error, no log entry, no indication that anything went wrong.
Fix: proper fencing, or use a consensus-based system for leader election. Which brings us to the next chapter.
Choosing a Replication Strategy
| Requirement | Strategy |
|---|---|
| Simple, strong consistency | Single-leader, synchronous to at least one follower |
| High write throughput, tolerate some lag | Single-leader, asynchronous |
| Low-latency writes across geographic regions | Multi-leader |
| High availability, tolerate eventual consistency | Leaderless (Dynamo-style) |
| Offline-first / sync-on-connect | Multi-leader / CRDTs |
The right choice depends on what you can afford to lose. Single-leader async is the default for most applications because it’s simple and fast and the failure modes are well-understood. Move to synchronous or multi-datacenter replication when the cost of data loss exceeds the cost of coordination.
Whatever you choose: monitor lag, test failover, and know what happens to in-flight writes when a leader dies. Because it will.
Consensus: How Nodes Agree on Anything at All
At the heart of many distributed systems problems is a deceptively simple question: how do multiple nodes agree on a single value?
This is the consensus problem. It appears whenever you need to:
- Elect a single leader from multiple candidates
- Decide which write wins when two nodes have conflicting values
- Commit a transaction that spans multiple nodes
- Append to a distributed log in a consistent order
Consensus is what makes distributed systems correct rather than merely fast. It’s also what makes them slow, complicated, and expensive. Understanding consensus is understanding the boundary between what’s possible in a distributed system and what isn’t.
Why Consensus Is Hard
Consider two nodes that need to agree on a value. The naive approach:
1. Node A proposes a value
2. Node B accepts the value
3. Both nodes now agree
What if the message from A to B gets lost? B never accepts. Now what? A doesn’t know whether B received the proposal or not. A can resend, but what if the original message arrived after all, and B accepted it, and then B accepted the resent message too?
What if A crashes after sending the proposal but before receiving B’s acceptance?
What if the network delivers A’s message, B sends an acceptance, the acceptance gets lost, and then B crashes? A never hears from B. Does A proceed alone? If it does, and B comes back, they might disagree.
These scenarios escalate quickly. The impossibility result at the heart of this problem — called FLP (Fischer, Lynch, and Paterson, 1985) — says that in an asynchronous network where even one process can crash, you cannot guarantee that a consensus algorithm terminates.
This isn’t a proof that consensus is impossible in practice. It’s a proof that no algorithm can guarantee progress under all conditions. Real systems get around this by making reasonable assumptions about timing and failure rates.
The Consensus Requirements
A correct consensus algorithm must satisfy:
Agreement: All correct nodes decide on the same value.
Validity: The decided value must have been proposed by some node (you can’t just make up a value).
Termination: Every correct node eventually decides on a value (the algorithm doesn’t run forever).
The FLP result says you can’t guarantee all three in an asynchronous system with failures. In practice, this means algorithms relax the termination guarantee slightly: they guarantee termination unless the system is in a particularly bad state (lots of simultaneous failures, poor timing).
Paxos: The Algorithm That Launched a Thousand Papers
Paxos was described by Leslie Lamport in 1989 (published 1998 after significant delay). It was the first practical consensus algorithm. It’s also famously difficult to understand, implement correctly, and reason about.
Single-decree Paxos (consensus on one value) has two phases:
Phase 1: Prepare
A proposer selects a proposal number N (larger than any it has used before) and sends a Prepare(N) to a majority of acceptors.
Each acceptor:
- If N is larger than any prepare it has seen, responds with Promise(N) and the highest-numbered proposal it has already accepted (if any)
- Otherwise, ignores the prepare
Proposer ──Prepare(N=5)──> Acceptor 1
Proposer ──Prepare(N=5)──> Acceptor 2 ← quorum (3 of 5 acceptors)
Proposer ──Prepare(N=5)──> Acceptor 3
Acceptor 1 ──Promise(N=5, accepted=none)──> Proposer
Acceptor 2 ──Promise(N=5, accepted=none)──> Proposer
Acceptor 3 ──Promise(N=5, accepted=none)──> Proposer
Phase 2: Accept
If the proposer gets promises from a majority of acceptors, it sends Accept(N, value) to those acceptors. The value is:
- The value from the highest-numbered accepted proposal any acceptor reported, or
- The proposer’s own value if no acceptor has accepted anything
Each acceptor:
- Accepts the proposal unless it has already promised a higher number (that is, responded to a Prepare with a number greater than N)
- Notifies learners of the accepted value
When a majority of acceptors have accepted the same proposal number with the same value, consensus is reached.
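The acceptor’s rules from both phases fit in one small state machine. This is a single-decree sketch (class and return values invented for illustration; the proposer’s value-choice rule from Phase 2 lives outside this class):

```python
class Acceptor:
    """Single-decree Paxos acceptor. Tracks the highest proposal number
    promised and the highest-numbered proposal actually accepted."""
    def __init__(self):
        self.promised = 0      # highest N seen in a Prepare
        self.accepted = None   # (N, value) of the accepted proposal, if any

    def prepare(self, n):
        if n > self.promised:
            self.promised = n
            # Report any previously accepted proposal so a new proposer
            # carries its value forward instead of overwriting it.
            return ("promise", n, self.accepted)
        return None            # stale prepare: ignored

    def accept(self, n, value):
        if n >= self.promised:             # no higher promise outstanding
            self.promised = n
            self.accepted = (n, value)
            return ("accepted", n, value)  # would also notify learners
        return None                        # promised a higher number: reject

acc = Acceptor()
print(acc.prepare(5))        # ("promise", 5, None)
print(acc.prepare(3))        # None: lower-numbered prepare ignored
print(acc.accept(5, "x"))    # ("accepted", 5, "x")
print(acc.accept(4, "y"))    # None: below the outstanding promise
```

The safety-critical line is the report of `self.accepted` inside `prepare`: it’s how a value accepted by an earlier majority survives a proposer crash.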
Why this is subtle
The proposal number juggling is not bureaucracy — it’s safety. If a proposer fails midway through and a new proposer takes over, the promise mechanism ensures the new proposer can figure out whether any value was already accepted (by a previous majority) and carries it forward rather than overwriting it.
Multi-Paxos extends this for sequences of values (a log), which is what you actually need in practice. And this is where Paxos gets complicated quickly.
The honest assessment: Paxos is correct but implementing it in a production system is extraordinarily hard. Lamport himself said that every published variant he’d seen contained subtle bugs. Most production systems use Raft instead.
Raft: Paxos for the Rest of Us
Raft was designed explicitly to be understandable. The paper is titled “In Search of an Understandable Consensus Algorithm.” It achieves roughly the same guarantees as Multi-Paxos but through a cleaner decomposition.
Raft decomposes consensus into three sub-problems:
1. Leader election: Which node is the leader?
2. Log replication: The leader accepts entries and replicates them to followers
3. Safety: Guarantees that the right entries are in the right place
Leader Election
Time in Raft is divided into terms — monotonically increasing integers. Each term begins with an election.
Nodes start as followers. If a follower doesn’t hear from a leader within an election timeout (randomized to prevent ties), it becomes a candidate and starts an election.
Term 1                        Term 2
─────────────────────────────────────────
[ Leader ]     [leader fails]
[ Follower ]                  [ Leader ]
[ Follower ]                  [ Follower ]
               ^ election happens here
A candidate sends RequestVote to all nodes. A node grants its vote if:
- It hasn’t voted in this term yet
- The candidate’s log is at least as up-to-date as the voter’s log
If a candidate gets votes from a majority, it becomes the new leader and starts sending heartbeats. If the election times out (split vote), the candidate increments the term and starts a new election.
The randomized timeout is key: if all nodes started elections simultaneously, you’d always get a split vote. Randomization means one node usually starts first and wins before others wake up.
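The vote-granting rules above can be sketched as follows (an illustrative fragment; the voter's persistent state is modeled as a plain dict, and "up-to-date" is Raft's rule of comparing last log term first, then last log index):

```python
def should_grant_vote(voter, candidate_term, candidate_id,
                      candidate_last_log_term, candidate_last_log_index):
    """Illustrative sketch of Raft's RequestVote handling.
    `voter` is a dict holding the node's persistent state."""
    # A higher term resets our vote for the new term.
    if candidate_term > voter["current_term"]:
        voter["current_term"] = candidate_term
        voter["voted_for"] = None
    # Stale-term candidates are refused outright.
    if candidate_term < voter["current_term"]:
        return False
    # At most one vote per term.
    if voter["voted_for"] not in (None, candidate_id):
        return False
    # Candidate's log must be at least as up-to-date as ours:
    # compare last log term first, then last log index.
    mine = (voter["last_log_term"], voter["last_log_index"])
    theirs = (candidate_last_log_term, candidate_last_log_index)
    if theirs >= mine:
        voter["voted_for"] = candidate_id
        return True
    return False
```

The "at most one vote per term" rule is what makes a split vote possible, and the log comparison is what makes the safety argument in the next sections work.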
Log Replication
The leader accepts writes, appends them to its log, and sends AppendEntries RPCs to followers to replicate the log.
Leader Log:    [1: set x=1] [2: set y=2] [3: set x=3]
                    |            |            |
                replicate    replicate    replicate
                    |            |            |
Follower Log:  [1: set x=1] [2: set y=2] [3: set x=3]
A log entry is committed once the leader has replicated it to a majority of nodes. Once committed, it will persist regardless of future leader changes.
The leader tracks the commitIndex — the highest log entry known to be committed — and piggybacks this in AppendEntries. Followers apply committed entries to their state machines.
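The leader's commit rule can be sketched as follows (illustrative; a real implementation tracks matchIndex per follower, but here it's a flat list that includes the leader's own last index). The current-term check is a genuine Raft safety rule: a leader only commits entries from its own term directly, and earlier-term entries become committed implicitly along with them:

```python
def advance_commit_index(match_indexes, log_terms, current_term, commit_index):
    """Illustrative sketch of the Raft leader's commit rule.

    match_indexes: highest replicated log index per node (leader included)
    log_terms:     term of each log entry, indexed by log position (1-based)
    """
    # Scan from the highest candidate index down to the current commitIndex.
    for n in range(max(match_indexes), commit_index, -1):
        replicated_on = sum(1 for m in match_indexes if m >= n)
        # Committed = on a majority AND from the leader's own term.
        if replicated_on * 2 > len(match_indexes) and log_terms[n - 1] == current_term:
            return n  # new commitIndex
    return commit_index
```

The leader then piggybacks the new commitIndex on subsequent AppendEntries, and followers apply everything up to it.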
Safety: Why Log Entries Don’t Disappear
Raft’s critical safety property: if a log entry is committed at one term, no future leader will overwrite it.
This is guaranteed by the voting rule: a node only votes for a candidate whose log is at least as up-to-date as its own, so a winner's log matches or exceeds every voter in the majority that elected it. Committed entries exist on a majority, and any two majorities intersect — so any winner must have every committed entry.
This means you never need to “fix” a follower’s log by rolling back committed entries — you only need to find the point where the follower diverges from the leader and replicate forward from there.
Why Raft Became the Default
Raft is used in:
- etcd: The key-value store at the heart of Kubernetes
- CockroachDB: Distributed SQL
- TiDB: Distributed database
- Consul: Service mesh and configuration
- Many others
It’s not perfect (there are edge cases in Raft’s log compaction and joint consensus phases), but it’s understandable enough that implementations can be reasoned about and audited. The Raft paper includes a user study showing that students understood Raft significantly better than Paxos.
Viewstamped Replication
An older algorithm (1988, predating Paxos) that’s equivalent in power but less well-known. VR is arguably cleaner than Paxos, and it influenced much of the later work on consensus. Worth knowing as context, though Raft is the practical choice today.
Byzantine Fault Tolerant Consensus
Everything above assumes crash-stop failures: nodes either work correctly or stop responding. They don’t lie, fabricate messages, or collude.
Byzantine faults are the harder case: nodes can behave arbitrarily — sending conflicting messages to different nodes, delaying messages strategically, pretending to accept and then reneging.
Byzantine fault tolerance (BFT) requires at least 3f+1 nodes to tolerate f Byzantine failures. With 4 nodes, you can tolerate 1 Byzantine node. With 7 nodes, you can tolerate 2.
The most well-known BFT algorithm is PBFT (Practical Byzantine Fault Tolerance, Castro and Liskov, 1999). It works but is expensive — O(n²) message complexity — and doesn’t scale beyond tens of nodes.
BFT consensus is used in:
- Blockchain systems: Nodes are adversarial by assumption (different economic interests)
- Safety-critical systems: Aviation, nuclear, where a single faulty sensor can’t cause a disaster
- Systems crossing trust boundaries: Multiple organizations running nodes with different incentives
For most application databases, Byzantine faults are not in scope. Your database replicas are operated by you — you trust them to be correct even if they crash. BFT matters when nodes are operated by parties that might act against your interests.
Consensus in Practice: What This Means for You
“Don’t implement consensus yourself”
This is the advice. Consensus algorithms have subtle failure modes that appear only in specific timing conditions. Even the best implementers have gotten them wrong. Use a battle-tested implementation: etcd, Zookeeper, Consul.
Leader election via external coordination
The most common pattern: use an external consensus service (Zookeeper, etcd, Consul) to perform leader election. Your application nodes try to acquire a lease. The lease holder is the leader. When the lease expires or the holder crashes, another node acquires it.
# Pseudo-code for leader election via etcd
lease = etcd.grant_lease(ttl=10)  # 10-second lease
result = etcd.put_if_absent("/leader", my_node_id, lease=lease)

if result.succeeded:
    # We are the leader. Keep the lease alive.
    while is_leader:
        do_leader_work()
        etcd.keepalive(lease)  # Reset TTL before expiry
else:
    # Someone else is leader. Watch for changes.
    etcd.watch("/leader", callback=on_leader_change)
Distributed transactions via Two-Phase Commit (2PC)
2PC is not a consensus algorithm but it uses similar principles to make a distributed transaction atomic.
Phase 1 (Prepare): Coordinator asks all participants: “Are you ready to commit?”
Phase 2 (Commit or Abort): If all respond “yes,” coordinator sends “commit.” If any respond “no” or time out, coordinator sends “abort.”
Coordinator ──PREPARE──> Participant A
Coordinator ──PREPARE──> Participant B
Participant A ──READY──> Coordinator
Participant B ──READY──> Coordinator
Coordinator ──COMMIT──> Participant A
Coordinator ──COMMIT──> Participant B
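The coordinator's decision logic can be sketched as follows (illustrative; prepare/commit/abort are assumed methods on each participant object, and a timeout or crash during Phase 1 is treated as a "no" vote):

```python
def two_phase_commit(participants):
    """Illustrative 2PC coordinator sketch."""
    # Phase 1: ask every participant to prepare.
    votes = []
    for p in participants:
        try:
            votes.append(p.prepare())   # True = READY, False = NO
        except Exception:
            votes.append(False)         # timeout / crash counts as NO
    # Phase 2: commit only if every participant voted yes.
    if all(votes):
        for p in participants:
            p.commit()
        return "committed"
    for p in participants:
        p.abort()
    return "aborted"
```

A real coordinator must also write its decision to a durable log before Phase 2 — which is exactly the step whose loss causes the blocking problem described next.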
The critical failure mode: if the coordinator crashes after sending PREPARE but before sending COMMIT, participants are stuck. They’ve locked resources and voted “yes” but don’t know the final decision. This is the 2PC blocking problem.
Three-phase commit (3PC) attempts to solve this, but introduces other failure modes and isn’t widely used in practice. Modern distributed databases take a different route: they replicate the commit decision through Paxos/Raft (Spanner, for example, runs 2PC across Paxos-replicated groups), so a crashed coordinator can be replaced rather than blocking participants.
The performance cost of consensus
Consensus requires at least one round-trip to a quorum. In a 3-node cluster with nodes in the same datacenter, this adds maybe 1-5ms per committed operation. In a geo-distributed cluster with nodes on different continents, a quorum round-trip is bounded below by the speed of light, plus processing time — 100-300ms.
This is the fundamental tension between consistency and performance in distributed systems. Linearizable operations require consensus. Consensus requires network round-trips. Network round-trips take time. If you want consistency in a geo-distributed system, you’re paying in milliseconds. Spanner’s atomic clock approach reduces the coordination cost but doesn’t eliminate it.
The Aha Moment
Here’s the thing that clicks when you really understand consensus: a single-leader database with synchronous replication to at least one follower is already doing consensus, informally. The leader is the proposer; the synchronous follower is the required quorum member; the write isn’t committed until both have it.
The difference between “informal single-leader” consensus and Raft/Paxos consensus is what happens when the leader fails. Informal: you run a script or Pacemaker or some ops procedure to elect a new leader. Formal consensus algorithm: the nodes elect a new leader automatically, safely, without external intervention.
As your system grows and manual operations become impractical, the formal approach becomes necessary. That’s why etcd is at the center of Kubernetes — Kubernetes can’t afford to wait for an operator to manually promote a new leader when the current one dies.
The next chapter tackles something consensus relies on but we’ve been hand-waving: time. Specifically, the fact that distributed nodes don’t share a clock, and the many interesting ways this causes problems.
Time Is a Flat Circle: Clocks, Ordering, and Logical Time
Here is a thing that should disturb you: no two computers in a distributed system share the same clock.
They have their own clocks, which are continuously drifting relative to each other and relative to real time. They synchronize against NTP servers periodically, but synchronization is imperfect, and between synchronizations they drift. By milliseconds, usually. Sometimes by more.
This means “the timestamp on this event” is not a fact about when the event happened — it’s a fact about what the clock on that particular machine said at that particular moment. And those clocks disagree.
This is a problem because ordering matters. “Last write wins” only works if you can reliably say which write was last. “This event happened before that event” is only meaningful if you can actually determine the order.
In a single-process system, ordering is free — the program counter gives you a total order of all operations. In a distributed system, ordering is something you have to build.
The Clock Problem
Clock drift
Quartz oscillators — the hardware basis of computer clocks — are accurate to roughly 1 part per million. This sounds precise. It means a clock drifts about 86 milliseconds per day. Over a week without resynchronization, two clocks on separate machines could diverge by over half a second.
NTP (Network Time Protocol) corrects this by synchronizing clocks against a hierarchy of reference servers. NTP can get clocks to within a few milliseconds on a local network, and within tens of milliseconds over the internet.
A few milliseconds is not zero. And NTP can fail, be slow, or be skewed.
The leap second problem
Every year or two, historically, a leap second is added to UTC to account for the Earth’s variable rotation. Every computer that knows about this gets a correction. Some computers don’t. Some handle it by “smearing” the second across hours (Google’s solution). Some insert it literally (causing 23:59:60 to appear in logs). Some crash because they assumed time is monotonically increasing.
The 2012 leap second caused massive failures across the internet, including Reddit, LinkedIn, and various Java applications. The root cause: code that assumed the wall clock never goes backwards.
NTP clock jumps
When NTP corrects a clock that’s drifted forward, it can set the clock backwards. Code that takes a timestamp at the start of an operation and a timestamp at the end can observe a negative elapsed time.
Robust code uses a monotonic clock for measuring elapsed time (which never goes backwards) and the wall clock only for human-readable timestamps. Many languages expose both:
import time

# Wall clock — suitable for timestamps, not duration measurement
wall = time.time()       # Can go backwards after NTP correction

# Monotonic clock — suitable for measuring elapsed time
mono = time.monotonic()  # Never goes backwards
The Ordering Problem
The fundamental issue: without shared clocks, you can’t reliably order events across machines using timestamps.
Consider this scenario:
Machine A (clock: 10:00:00.000): Write X=1
Machine B (clock: 09:59:59.998): Write X=2
Machine B’s clock is 2ms behind, so B’s write can happen after A’s in real time yet carry an earlier timestamp. If you order by timestamp, Machine A’s write appears to come last. With “last write wins,” Machine A’s value persists — even though B wrote later.
Now consider the even worse case: you need to audit a system and determine whether Event A caused Event B. “Event A happened at 10:00:00.001 on Server 1 and Event B happened at 10:00:00.003 on Server 2” doesn’t tell you if A caused B — not with clocks that can be off by 50ms.
Lamport Clocks: Logical Time
In 1978, Leslie Lamport published “Time, Clocks, and the Ordering of Events in a Distributed System” — one of the most cited papers in computer science. In it, he proposed logical clocks: a way to order events based on causality rather than physical time.
The key insight is Lamport’s happens-before relation, written →.
We say A → B (“A happened before B”) if:
- A and B are events in the same process, and A comes before B in program order
- A is the sending of a message and B is the receipt of that same message
- There exists some C where A → C and C → B (transitivity)
If neither A → B nor B → A holds, A and B are concurrent — neither caused the other.
How Lamport timestamps work
Each process maintains a counter, initially 0.
Rule 1: Before executing an event, increment the counter.
Rule 2: When sending a message, include the current counter value.
Rule 3: When receiving a message with timestamp T, set the counter to max(own_counter, T) + 1.
Process A            Message             Process B
L=0                                      L=0
event a1
L=1
                   ──(ts=1)──>
                                         L=max(0,1)+1 = 2
                                         event b1
                                         L=3
                                         event b2
                                         L=4
                   <──(ts=4)──
L=max(1,4)+1 = 5
event a2
L=6
If A → B, then timestamp(A) < timestamp(B).
But the converse doesn’t hold: timestamp(A) < timestamp(B) does NOT mean A → B. Concurrent events can have any timestamp relationship.
Lamport clocks give you a partial order, not a total order. You can use them to identify when one event definitely happened before another. You cannot use them to identify when two events are concurrent — two events with different Lamport timestamps might still be concurrent.
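The three rules translate directly into code. A minimal sketch (following the rules above, where sending includes the current counter rather than incrementing again):

```python
class LamportClock:
    """The three Lamport clock rules, as a tiny class (illustrative sketch)."""
    def __init__(self):
        self.counter = 0

    def tick(self):
        # Rule 1: increment before each local event.
        self.counter += 1
        return self.counter

    def send(self):
        # Rule 2: a message carries the current counter value.
        return self.counter

    def receive(self, msg_timestamp):
        # Rule 3: jump past the sender's timestamp.
        self.counter = max(self.counter, msg_timestamp) + 1
        return self.counter
```

Replaying the diagram above: after event a1, A's counter is 1; B receives ts=1 and jumps to 2; events b1 and b2 take B to 4; A receives ts=4 and jumps to 5.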
Vector Clocks: Tracking Causality
To detect concurrency, you need vector clocks.
Instead of a single counter, each process maintains a vector of counters — one per process.
System with 3 processes: A, B, C
Each vector clock: [A_count, B_count, C_count]
Rule 1: On an internal event, increment your own counter.
Rule 2: When sending a message, increment your own counter and send the full vector.
Rule 3: When receiving a message with vector V, take the component-wise maximum of your vector and V, then increment your own counter.
Process A: [0,0,0]
Process B: [0,0,0]
Process C: [0,0,0]
A performs event: A=[1,0,0]
A sends to B (ts=[1,0,0]):
B receives: B=[max(0,1), max(0,0)+1, max(0,0)] = [1,1,0]
B performs event: B=[1,2,0]
B sends to C (ts=[1,2,0]):
C receives: C=[max(0,1), max(0,2), max(0,0)+1] = [1,2,1]
A performs another event: A=[2,0,0]
Now we can compare events:
Vector clock V1 happens-before V2 if every component of V1 is ≤ the corresponding component of V2, and at least one component is strictly less.
V1 and V2 are concurrent if neither dominates the other.
[1,0,0] vs [1,1,0]: [1,0,0] happens-before [1,1,0] ✓
[2,0,0] vs [1,2,1]: concurrent (neither dominates)
[1,2,1] vs [2,2,1]: [1,2,1] happens-before [2,2,1] ✓
This is exactly what you need for conflict detection in distributed databases. When two writes have concurrent vector clocks, they’re conflicting writes — neither caused the other, and you need a merge strategy.
Amazon’s original Dynamo design used vector clocks for exactly this purpose (the paper calls them “version vectors”; DynamoDB later moved to a simpler model). Riak still uses them.
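The receive and comparison rules above can be sketched as two helper functions (illustrative; vectors are plain lists indexed by process position):

```python
def vc_receive(own, incoming, my_index):
    """Rule 3: component-wise max of the two vectors, then increment
    the receiver's own component."""
    merged = [max(a, b) for a, b in zip(own, incoming)]
    merged[my_index] += 1
    return merged

def vc_compare(v1, v2):
    """Classify two vector clocks: 'before', 'after', 'equal', or 'concurrent'."""
    le = all(a <= b for a, b in zip(v1, v2))  # v1 dominated by v2?
    ge = all(a >= b for a, b in zip(v1, v2))  # v1 dominates v2?
    if le and ge:
        return "equal"
    if le:
        return "before"      # v1 happens-before v2
    if ge:
        return "after"       # v2 happens-before v1
    return "concurrent"      # neither dominates: conflicting writes
```

A database using this for conflict detection would call vc_compare on the clocks of two versions of the same key; "concurrent" means you need a merge strategy.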
The cost: vector clocks grow
Vector clocks have one entry per process. In a system with thousands of processes, this is impractical. Various optimizations exist (dotted version vectors, pruning), but vector clocks scale poorly to very large systems. This is why many systems use simpler approaches with weaker guarantees.
Hybrid Logical Clocks (HLC)
HLC is a practical combination of physical and logical time. The idea: track a logical timestamp that’s always at least as large as the current wall clock. When you need to create a timestamp:
- If your wall clock is ahead of your current HLC, advance the HLC to match the wall clock
- If your HLC is already ahead of the wall clock, increment the logical component
The result: events get timestamps that are close to real time (within NTP error bounds) and still maintain the happens-before property.
CockroachDB uses HLC. It enables a clever optimization: HLC timestamps plus a brief uncertainty window let the database determine when a read at a given timestamp can be served safely from a local replica without cross-node coordination.
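A minimal HLC sketch, assuming whole-second physical timestamps for readability (real implementations typically pack the physical and logical parts into a single 64-bit value; the class and method names here are illustrative):

```python
import time

class HybridLogicalClock:
    """Illustrative HLC sketch: a (physical, logical) pair where the
    physical part never runs behind the local wall clock."""
    def __init__(self, wall_clock=time.time):
        self.wall_clock = wall_clock
        self.physical = 0
        self.logical = 0

    def now(self):
        wall = int(self.wall_clock())
        if wall > self.physical:
            # Wall clock is ahead: adopt it, reset the logical component.
            self.physical, self.logical = wall, 0
        else:
            # Wall clock stalled or went backwards: tick logically.
            self.logical += 1
        return (self.physical, self.logical)

    def observe(self, remote):
        # On receiving a remote timestamp, merge so happens-before holds:
        # take the max of wall clock, local HLC, and remote HLC, then tick.
        wall = int(self.wall_clock())
        p, l = max((wall, -1), (self.physical, self.logical), remote)
        self.physical, self.logical = p, l + 1
        return (self.physical, self.logical)
```

The key property to notice: timestamps never go backwards even when the wall clock does, and a receive always lands strictly after the sender's timestamp.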
TrueTime: The Google Solution
In 2012, Google published the Spanner paper describing their globally-distributed relational database. At the heart of Spanner is TrueTime: a system that provides bounded time uncertainty using GPS receivers and atomic clocks in every datacenter.
Instead of returning a single timestamp, TrueTime returns an interval: [earliest, latest], with a guarantee that the true current time lies within that interval. The interval width is typically 1-7 milliseconds.
Spanner uses this to provide external consistency (essentially linearizability) for distributed transactions:
- Acquire locks for the transaction
- Get TrueTime interval [T_earliest, T_latest]
- Choose commit timestamp T_s = T_latest
- Wait until the current time is definitely past T_s (i.e., wait for the interval to close)
- Release locks and commit
By waiting for the uncertainty interval to pass, Spanner guarantees that any transaction started after this one will see it — the commit timestamp is in the real past by the time the commit is visible.
This is elegant and expensive. It requires dedicated hardware (GPS clocks, atomic clocks) in every datacenter. It’s not available to the rest of us, but understanding it helps clarify what “global consistency” actually requires.
What This Means in Practice
Don’t use timestamps for ordering
“Last write wins by timestamp” loses data silently. Timestamps in distributed systems are wrong by up to tens of milliseconds and can go backwards. Using them to determine causality is asking for trouble.
If you must use LWW, use a monotonically increasing logical counter (like a database sequence or a Lamport timestamp) rather than a wall clock timestamp.
NTP failure is a real failure mode
Monitor clock drift. Alert when clocks diverge from expected values. If a node’s clock drifts significantly, its logical clock-based timestamps will be wrong, and any order-dependent logic will misbehave.
Duration measurement: use monotonic clocks
For measuring “how long did this take,” always use a monotonic clock. Never subtract two wall-clock readings (e.g., time.time() calls) if you care about the result being non-negative.
For conflict detection: vector clocks or CRDTs
If you need to detect concurrent writes and merge them correctly, you need either vector clocks or data types designed to merge without conflicts. “Last write wins” is not conflict resolution — it’s conflict hiding.
The production moment
Here’s when this bites you: you have two services, both reading from a database and writing results somewhere. You add a timestamp to the result to enable “show the latest version.” One service’s server has a clock that’s 200ms ahead. Its results consistently beat the other service’s results in “last write wins” comparisons, even when the other service’s results are newer.
You debug this for hours. Finally you check the NTP status on each server. One of them hasn’t resynchronized in three days. Clocks diverged. Logic based on clock ordering was wrong.
This is not a hypothetical. It happens.
Summary
Physical clocks drift, can go backwards, and are imprecise. You cannot use wall clock timestamps to reliably order events across machines.
Lamport clocks provide a partial order that respects happens-before relationships. They’re simple, cheap, and sufficient for many use cases.
Vector clocks provide true causality tracking — they can detect when two events are concurrent, not just ordered. They’re more expensive and don’t scale to large process counts.
Hybrid logical clocks combine physical and logical time, giving you timestamps that are close to real time and still logically ordered.
TrueTime provides bounded physical time uncertainty using specialized hardware, enabling external consistency at Google scale.
For most applications: use monotonic clocks for duration, log-sequence-based ordering for event ordering, and be very skeptical of any logic that depends on wall clock timestamps for correctness.
Next: when nodes fail (and they will), how do you keep the system running? Fault tolerance.
Fault Tolerance: Designing for the Failures You Know and the Ones You Don’t
Everything fails. Hardware fails. Software fails. Operators make mistakes. Networks have incidents. Cloud providers have outages that knock out three availability zones simultaneously (this has happened). The goal of fault tolerance isn’t to prevent failure — it’s to ensure that when failure happens, the blast radius is limited, the system degrades gracefully, and recovery is possible.
This chapter is about the concrete patterns and techniques for building systems that fail less badly.
A Taxonomy of Failure
Before designing for failure, be precise about what can fail and how.
Hardware failures
Disks fail. Estimated annualized failure rate for enterprise SSDs: 0.5-2%. For spinning disks: 2-5%. In a cluster of 1,000 disks, expect 5-50 disk failures per year. This is normal. RAID and replication exist because hardware failure is expected, not exceptional.
Servers fail. Memory fails with bit errors (ECC memory catches most of these). Network interface cards fail. Power supplies fail. Datacenter power fails (and sometimes the backup generator doesn’t start).
Software failures
Programs crash. Processes run out of memory and get killed by the OS. Bugs appear under edge conditions that testing didn’t cover. Libraries have concurrency bugs that only trigger under specific scheduling conditions. Garbage collectors pause for 10 seconds and everything downstream times out.
Operator failures
Configurations get changed. The wrong service gets restarted. A deployment pushes a bad version. A database migration locks a table for longer than expected. An autoscaling rule triggers during a traffic spike and scales down instead of up.
Cascading failures
The most insidious: a failure in one component causes failures in others. A slow database causes queries to pile up, which increases memory usage, which causes the application server to slow down, which causes the load balancer to mark it unhealthy, which concentrates load on fewer servers, which causes them to slow down too. A cascade can take a system from “one slow node” to “total outage” in minutes.
Idempotency: The Foundation of Safe Retries
Almost every fault tolerance technique involves retrying operations. Retries only work safely if operations are idempotent — producing the same result whether executed once or many times.
Consider two operations:
# NOT idempotent:
account.balance += 10

# Idempotent:
account.balance = 100
If you execute the first operation twice, you’ve added 20 instead of 10. If you execute the second operation twice, you get the same result as executing it once.
Making operations idempotent
Idempotency keys: Assign each operation a unique ID. The server stores which IDs it has processed. On receipt, check: have I seen this ID? If yes, return the stored result without re-executing. If no, execute and store the result.
def process_payment(payment_id: str, amount: int, account_id: str):
    # Check if we've already processed this payment
    existing = db.query(
        "SELECT result FROM processed_payments WHERE id = ?",
        payment_id,
    )
    if existing:
        return existing.result  # Idempotent: return stored result

    # Execute the actual operation
    result = charge_account(account_id, amount)

    # Store the result; a primary key on id guards against a concurrent
    # duplicate slipping in between the check above and this insert
    db.execute(
        "INSERT INTO processed_payments (id, result) VALUES (?, ?)",
        payment_id, result,
    )
    return result
This pattern requires generating a unique ID per logical operation (not per attempt). The ID should be generated on the client side before the first attempt, and reused on retries.
Natural idempotency: Some operations are naturally idempotent. Setting a key to a value is idempotent. Deleting a record that might not exist (and returning success either way) is idempotent. Incrementing a counter is not.
Conditional operations: “Set X to 5 if it’s currently 3” is idempotent — it only succeeds if the precondition holds. Using optimistic locking / compare-and-swap (CAS) operations makes many things idempotent.
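A compare-and-swap update is easy to demonstrate in SQL: the WHERE clause carries the precondition, so a replayed attempt matches zero rows and changes nothing. An illustrative sketch using an in-memory SQLite table (the table and account id are made up for the example):

```python
import sqlite3

# Set up a toy account with balance 3.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES ('a1', 3)")

def cas_set_balance(conn, account_id, expected, new):
    """Set balance to `new` only if it is currently `expected`."""
    cur = conn.execute(
        "UPDATE accounts SET balance = ? WHERE id = ? AND balance = ?",
        (new, account_id, expected),
    )
    conn.commit()
    return cur.rowcount == 1  # True if the precondition held
```

The first attempt applies the change; a retry of the same attempt finds the precondition no longer holds and is a harmless no-op — which is exactly the property safe retries need.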
Retries: When to Retry and When Not To
The naive retry: if a request fails, wait a moment and try again.
The problems:
- Retrying a non-idempotent operation causes the operation to execute multiple times
- Retrying immediately puts more load on a system that’s already struggling
- Retrying non-retryable errors wastes time (a 400 Bad Request won’t succeed on retry)
- Infinite retries without a circuit breaker can mask outages and fill up queues
What to retry
Retry transient errors: network timeouts, connection resets, 503 Service Unavailable, 429 Too Many Requests (after the indicated backoff).
Don’t retry permanent errors: 400 Bad Request, 401 Unauthorized, 404 Not Found, 422 Validation Error. These won’t succeed on retry.
Maybe retry: 500 Internal Server Error (could be transient, could be a bug). 408 Request Timeout (might succeed on retry with different timing).
Exponential backoff with jitter
Don’t retry immediately. Wait, then retry. Wait longer, then retry again. Each successive wait should be longer than the last.
Why? If 1,000 clients are all timing out and all retry at the same time, you’ve just generated a second wave of requests that’s just as large as the first. The system was already overwhelmed. Now it’s being hit by 1,000 retries simultaneously. This is called a thundering herd.
Exponential backoff spreads retries out over time:
import random
import time

def retry_with_backoff(operation, max_attempts=5, base_delay=1.0, max_delay=60.0):
    for attempt in range(max_attempts):
        try:
            return operation()
        except TransientError:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff: 1s, 2s, 4s, 8s, 16s (capped at max_delay)
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Add jitter: randomize within [delay/2, delay]
            # This spreads retries from different clients
            jittered_delay = delay * (0.5 + random.random() * 0.5)
            time.sleep(jittered_delay)
The jitter is important. Without it, all clients that started failing at the same time will retry at the same times. With jitter, their retry windows are spread randomly.
Timeouts: How Long Is Long Enough?
Every network call needs a timeout. Every one. A call without a timeout can hang indefinitely, holding resources (threads, connections, memory) while the caller waits for something that will never come.
Choosing a timeout:
- Profile the normal case. If your p99 latency is 200ms, a timeout of 5 seconds is probably too generous — you’re waiting 25x longer than usual for failure. A timeout of 500ms might be too tight (you’d time out some healthy tail requests). Something like 2-3x your p99 is a reasonable starting point.
- Consider what’s downstream. If Service A calls Service B which calls Service C, set A→B timeout > B→C timeout + B’s processing time. If B times out calling C, it needs time to return a response to A before A times out.
- Set timeouts at multiple levels. Connection timeout (how long to wait for the connection to establish) vs. read timeout (how long to wait for data after the connection is established) vs. total timeout (maximum total time).
- Tune over time. Timeouts should be informed by real latency data. Set up percentile dashboards, observe p99/p99.9, and set timeouts accordingly.
The timeout cascade problem
User request (30s timeout)
  └─ Service A calls B (25s timeout)
       └─ Service B calls C (20s timeout)
            └─ Service C is down

[C hangs for 20s]
Service B times out after 20s, returns error to A
Service A has only 5s left, tries one retry, times out
User experiences ~25s wait before getting an error
This is a cascade timeout: each layer’s slow failure eats into the timeout budget of its callers. The solution: set tighter timeouts at lower levels, fail fast, and propagate deadlines (a hint about when the original caller will give up anyway).
Deadline propagation: Pass the original request’s deadline through the call chain. If a downstream service receives a request with 100ms remaining before deadline, it should fail immediately rather than starting work it won’t complete in time. gRPC supports this natively via context propagation.
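Deadline propagation can be as simple as passing an absolute deadline down the call chain and computing the remaining budget at each hop. An illustrative sketch (the min_budget threshold and the operation signature are assumptions for the example):

```python
import time

def remaining_budget(deadline):
    """Seconds left before the original caller gives up."""
    return deadline - time.monotonic()

def call_downstream(operation, deadline, min_budget=0.1):
    """Illustrative deadline-propagation sketch: refuse work that can't
    finish before the caller's deadline, and pass the deadline along."""
    budget = remaining_budget(deadline)
    if budget < min_budget:
        # Fail fast: starting work we can't finish just wastes resources.
        raise TimeoutError("deadline exceeded before starting work")
    # The per-call timeout is whatever budget remains, never more.
    return operation(timeout=budget, deadline=deadline)
```

Note the monotonic clock: deadlines measured against the wall clock would inherit all the clock problems from the previous chapter.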
Circuit Breakers: Failing Fast
The circuit breaker pattern is named after the electrical device that disconnects a circuit when it detects excessive current — protecting the rest of the circuit from a fault.
In distributed systems: when a service is failing repeatedly, stop calling it for a while. Fail fast and let it recover.
Circuit breaker states
          ┌──── too many failures ──────┐
          │                             ▼
      [CLOSED]                       [OPEN]
      (normal)             (fail immediately)
          ▲                             │
          │                    timeout expires
          │                             ▼
          └── success ──── [HALF-OPEN]
                           (probe with one request)
CLOSED: Normal operation. Requests flow through. Failures are counted in a sliding window.
OPEN: Failures exceeded threshold. Requests fail immediately without even attempting the call. The downstream service gets a chance to recover without being hammered by retries.
HALF-OPEN: After a timeout, allow one request through. If it succeeds, close the circuit. If it fails, return to OPEN and reset the timer.
import time

class CircuitOpenError(Exception):
    pass

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=30.0, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = "CLOSED"
        self.opened_at = None

    def call(self, operation):
        if self.state == "OPEN":
            if time.time() - self.opened_at > self.timeout:
                self.state = "HALF_OPEN"
                self.success_count = 0
            else:
                raise CircuitOpenError("Circuit breaker is open")
        try:
            result = operation()
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        if self.state == "HALF_OPEN":
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = "CLOSED"
                self.failure_count = 0
        elif self.state == "CLOSED":
            self.failure_count = 0

    def _on_failure(self):
        if self.state == "HALF_OPEN":
            # A failed probe reopens the circuit and restarts the timer
            self.state = "OPEN"
            self.opened_at = time.time()
            return
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = time.time()
Circuit breakers are implemented in libraries like Resilience4j (Java), Polly (.NET), and available as service mesh features in Envoy/Istio (where they’re called “outlier detection”).
Bulkheads: Containing Blast Radius
On a ship, bulkheads are watertight compartments. If one compartment floods, the bulkhead prevents it from flooding the rest of the ship.
In distributed systems, bulkheads isolate failures in one part of the system from cascading to others.
Thread pool bulkheads
Instead of one shared thread pool for all downstream calls, use separate pools:
Incoming request thread pool: 100 threads
├─ Service A thread pool: 20 threads
├─ Service B thread pool: 20 threads
├─ Database thread pool: 30 threads
└─ Other: 30 threads
If Service A is slow and backs up all 20 threads, Service B calls still have their own 20 threads available. Without bulkheads, Service A’s slowness would fill up all 100 threads and take down Service B too.
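A thread pool bulkhead in Python can be sketched with concurrent.futures (illustrative; the pool names and sizes mirror the diagram above, and the 5-second result timeout is an assumption for the example):

```python
from concurrent.futures import ThreadPoolExecutor

# One bounded pool per downstream dependency, so a slow dependency
# can only exhaust its own threads, not everyone else's.
pools = {
    "service_a": ThreadPoolExecutor(max_workers=20),
    "service_b": ThreadPoolExecutor(max_workers=20),
    "database":  ThreadPoolExecutor(max_workers=30),
}

def call_with_bulkhead(dependency, fn, *args):
    """Run fn in the pool reserved for this dependency."""
    future = pools[dependency].submit(fn, *args)
    # Always pair bulkheads with timeouts: a full pool plus an unbounded
    # wait just moves the pile-up somewhere else.
    return future.result(timeout=5.0)
```

If service_a's 20 workers are all blocked on a slow downstream, calls routed to service_b or the database still have their own threads available.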
Connection pool bulkheads
Same principle for database/HTTP connection pools. Separate pools for separate services. A connection leak to Service A doesn’t exhaust the connections to Service B.
Rate limiting as a bulkhead
Limit the number of requests from any single source. Prevents one bad client from consuming all your capacity at the expense of others.
Graceful Degradation
Sometimes, rather than failing completely, a system can return a degraded but useful response.
Examples:
- A recommendation service is down → show popular items instead of personalized recommendations
- The user preference service is slow → show the default UI without personalization
- A non-critical analytics write fails → log the error, return success to the user, investigate later
- Full search is unavailable → fall back to basic string matching
Graceful degradation requires explicitly deciding what’s critical and what’s optional in each request path, then designing fallbacks for optional components.
def get_user_homepage(user_id):
    # Critical: must succeed or return error
    user = user_service.get_user(user_id)

    # Optional: show default if fails
    try:
        recommendations = recommendation_service.get(user_id, timeout=0.5)
    except Exception:
        recommendations = cache.get("popular_items", default=[])

    # Optional: skip if fails
    try:
        notifications = notification_service.get(user_id, timeout=0.3)
    except Exception:
        notifications = []

    return render_homepage(user, recommendations, notifications)
The key discipline: be explicit about which failures are acceptable and which aren’t. “Swallow all exceptions” is not graceful degradation — it’s hiding failures.
Health Checks and Readiness Probes
A service that’s running but not healthy is often worse than a service that’s down — it accepts requests and fails them, rather than being routed around.
Liveness probe: Is this process alive? If not, restart it.
Readiness probe: Is this process ready to serve traffic? If not, stop sending it traffic.
These are different. A service starting up might be alive but not ready (still loading caches). A service that’s overwhelmed might be alive but temporarily not ready. A service in an unrecoverable error state should fail its liveness probe to trigger a restart.
GET /healthz
→ 200 if process is alive and functional
→ 503 if process should be restarted
GET /readyz
→ 200 if process can serve traffic
→ 503 if process is alive but not ready (starting, draining, overloaded)
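The decision logic behind the two endpoints can be separated cleanly. A minimal sketch — the warmup window and `draining` flag are illustrative assumptions, and in practice these functions would be wired to real HTTP handlers:

```python
import time

def liveness(unrecoverable_error=False):
    """/healthz: is the process alive and functional?
    Failing liveness tells the orchestrator to restart the process."""
    return 503 if unrecoverable_error else 200

def readiness(started_at, warmup_seconds=5.0, draining=False, now=None):
    """/readyz: can this process serve traffic right now?
    Alive-but-not-ready cases (starting, draining) return 503."""
    now = time.time() if now is None else now
    if draining:
        return 503  # shutting down: stop sending traffic, don't restart
    if now - started_at < warmup_seconds:
        return 503  # still loading caches
    return 200
```

The point of keeping them as two functions: a draining or warming-up process must fail readiness while still passing liveness, or the orchestrator will restart a process that was never broken.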
Kubernetes, load balancers, and service meshes all use these to route traffic intelligently.
Observability: You Can’t Fix What You Can’t See
Fault tolerance includes the ability to diagnose failures when they happen.
Structured logging: Log events as structured data (JSON) not free-text strings. This makes logs queryable. Log correlation IDs so you can trace a request through multiple services.
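A minimal sketch of structured JSON logging with a correlation ID, using Python's standard logging module. The logger name and field set are illustrative; real deployments typically add timestamps, service names, and more:

```python
import json
import logging
import sys
import uuid

class JsonFormatter(logging.Formatter):
    """Emit each log record as one JSON object per line."""
    def format(self, record):
        entry = {
            "level": record.levelname,
            "message": record.getMessage(),
            # correlation_id is attached via `extra=` at each call site
            "correlation_id": getattr(record, "correlation_id", None),
        }
        return json.dumps(entry)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("orders")
log.addHandler(handler)
log.setLevel(logging.INFO)

def handle_request(correlation_id=None):
    # Reuse the inbound ID if a caller supplied one, else mint a new one,
    # and propagate it on every outbound call (e.g. in an HTTP header).
    cid = correlation_id or str(uuid.uuid4())
    log.info("order received", extra={"correlation_id": cid})
    return cid
```

Because every service logs the same `correlation_id`, one grep (or one log-store query) reconstructs a request's path across all of them.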
Metrics: Track the four golden signals:
- Latency: How long do requests take? (p50, p95, p99)
- Traffic: How many requests per second?
- Errors: What fraction of requests fail?
- Saturation: How full is your most constrained resource (CPU, memory, connection pool)?
Distributed tracing: Track a request’s journey through multiple services. Each service adds a span to the trace. The trace shows the full call tree, with timing at each step. Jaeger, Zipkin, and OpenTelemetry are common implementations.
Alerting on symptoms, not causes: Alert on “error rate > 5%” (symptom that users are experiencing problems). Don’t alert on “CPU > 80%” unless that CPU usage is actually causing user-facing problems. Alert fatigue is a real operational hazard.
Chaos Engineering: Breaking Things on Purpose
The most reliable way to know your fault tolerance works is to test it.
Chaos engineering is the practice of deliberately introducing failures into production (or production-like environments) to verify that systems handle them gracefully and to discover failure modes before they discover you.
Netflix’s Chaos Monkey randomly terminates production instances. Their reasoning: instances will fail eventually; it’s better to fail on a Tuesday afternoon when engineers are watching than at 2am on a Friday.
Starting small:
- Kill a single instance and verify failover works
- Simulate network latency between services and verify timeouts/circuit breakers trigger
- Fill up a disk and verify error handling
- Cut a database connection pool in half and observe the effect
The goal isn’t to cause outages — it’s to verify that your fault tolerance actually works before you need it in anger.
The Production Moment
Here’s the cascade failure that this chapter exists to prevent:
Your recommendation service starts responding slowly (a bad deploy, a database lock, whatever). Your application calls it synchronously in every page load. The slow calls pile up, consuming threads. Your thread pool fills up. New requests can’t be served — they wait for a thread that’s busy waiting for the recommendation service. Your application starts returning 503s. Users retry. More requests. More 503s. Your load balancer marks your app servers as unhealthy. Traffic routes to fewer servers. Those servers are now overloaded. They start failing too.
In five minutes, you went from “recommendations are slow” to “entire application is down.”
With a circuit breaker on the recommendation service, that slow call would have tripped the breaker after a few failures. Subsequent calls would fail immediately (not after waiting 5 seconds). The thread pool stays healthy. The rest of the application keeps running. Users see “we couldn’t load recommendations” but everything else works.
With a bulkhead, the recommendation thread pool fills up, but that doesn’t affect the rest of the application’s threads.
With graceful degradation, users see popular items instead of personalized recommendations and never notice an outage at all.
That’s the difference between a fault-tolerant system and one that isn’t.
Next: the specific failure mode that causes the most philosophical distress in distributed systems — network partitions, where both sides are running, neither can see the other, and neither can tell which one has the correct view of reality.
Network Partitions: When the Network Lies to You
The network partition is the defining failure mode of distributed systems. Not because it’s the most common failure — it isn’t — but because it’s the most philosophically troubling, the most difficult to reason about, and the one that forces you to confront the hardest trade-offs.
A network partition is not “the network is down.” It’s more subtle: two parts of your system are running and healthy, but they can’t talk to each other. Both sides are operational. Both sides are processing requests. Both sides believe they’re correct. Neither side can tell whether the other is down or whether the link between them has failed.
From inside either partition, there’s no way to know which scenario is true.
What a Partition Actually Looks Like
Consider a database cluster: three nodes, replicating state to each other.
Normal operation:
Node A ──── Node B ──── Node C
(all three talking, all agree)
A partition occurs — the network link between Node A and Nodes B+C fails:
After partition:
Node A ╳╳╳ Node B ──── Node C
(A can't reach B or C)
(B and C can't reach A)
From Node A’s perspective: Nodes B and C have stopped responding. Are they crashed? Is the network between them failed? Node A cannot know. Node A has been receiving client requests during this time. Should it serve them?
From Nodes B and C’s perspective: Node A has stopped responding. Same questions. Same uncertainty.
This is the fundamental problem. Decisions must be made. The system can’t just pause and wait — clients are waiting for responses right now.
The Split-Brain Problem
If your system doesn’t handle this correctly, you get split-brain: both partitions believe they’re the authoritative half and accept writes independently.
After partition:
Node A (believes it's primary)   ╳╳╳   Node B ──── Node C (elect new primary)
Client 1 ─> write X=1 to A             Client 2 ─> write X=2 to B
Both writes succeed.
Both clients get a success response.
X=1 on Node A.
X=2 on Node B and C.
When partition heals:
??? (which value is correct?)
If you’re a bank and X is an account balance, one of those writes disappears. Which one? Last write wins? But you confirmed both to your clients.
Split-brain is why distributed systems design is hard. It’s not a hypothetical — it happens every time there’s a network partition without proper partition handling.
The Three Responses to a Partition
When a partition occurs, your system can do one of three things:
1. Stop accepting writes on the minority side (CP behavior)
The minority partition (the side that can’t reach a quorum) refuses to accept writes. It may also refuse reads if it can’t guarantee freshness.
This is the correct approach if you can’t tolerate inconsistency. You pay with availability: clients on the minority partition get errors instead of responses.
How quorums help: With 5 nodes, a write requires 3 nodes to acknowledge. If 2 nodes are partitioned off, they can’t form a quorum and refuse writes. The other 3 can still form a quorum and continue operating.
Partition: [Node A, Node B] ╳╳╳ [Node C, Node D, Node E]
Node A: "I can only reach Node B. That's 2 of 5 — no quorum."
→ refuse writes
Node C: "I can reach D and E. That's 3 of 5 — quorum achieved."
→ accept writes
The minority partition is effectively read-only (or offline) until the partition heals. The majority continues as normal. When the partition heals, the minority nodes resync from the majority.
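The quorum arithmetic above is one line of code. A sketch:

```python
def has_quorum(reachable_nodes, cluster_size):
    """True if this node can reach a strict majority of the cluster
    (counting itself among reachable_nodes)."""
    return reachable_nodes > cluster_size // 2
```

For the five-node partition above: `has_quorum(2, 5)` is False on Node A's side (refuse writes), and `has_quorum(3, 5)` is True on the majority side (keep operating). The strict majority is what makes split-brain impossible — two disjoint partitions can never both hold one.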
2. Accept writes on both sides (AP behavior)
Both partitions accept writes, accepting that they’ll diverge. When the partition heals, conflicts must be resolved.
This maximizes availability — no client gets an error due to a partition. The cost is that you might have conflicting writes to resolve.
This is viable if:
- Your data types support automatic merging (CRDTs: counters, sets, maps with appropriate semantics)
- Your business logic can tolerate last-write-wins (and the occasional lost write)
- You have an application-level conflict resolution strategy
E-commerce shopping carts are a classic example. If two browser sessions add different items to a cart during a partition, both additions should survive. The merged cart is the union of both. This is a CRDT-friendly operation.
Account balances are a counterexample. If two ATMs dispense cash during a partition, the total balance might go negative. Automatic merging doesn’t work here — you need coordination.
3. Detect and queue
Don’t respond immediately. Queue the operation, detect the partition, and either fail it or hold it until the partition heals.
This is what message queues and write-ahead logs do. Instead of writing to the database directly, you write to a durable queue. The queue acknowledges receipt. Later, when the database is available, the operation is applied.
This trades latency for durability: the client gets a “queued” response, not a “committed” response. The operation will be applied eventually. This works for asynchronous workflows (sending emails, processing jobs) but not for synchronous interactions (“is this username taken?”).
Detecting Partitions
From inside a distributed system, there’s no reliable way to distinguish a partition from a crashed node. Both look the same: the remote node has stopped responding.
Heartbeats: Send periodic messages to check aliveness. If X consecutive heartbeats fail, assume the remote node is unreachable (either crashed or partitioned). Kubernetes does this; etcd’s leader heartbeats do this; most distributed databases do this.
The timeout problem: Choose the heartbeat timeout. Too short: false positives trigger unnecessary failovers. Too long: real failures take a long time to detect. There’s no universally right answer.
Phi Accrual Failure Detector: Instead of a binary “up or down” threshold, compute a probability score based on historical heartbeat timing. As heartbeats come in late or not at all, the score rises. Different systems trigger at different threshold scores. This adapts better to variable network conditions. Cassandra uses it.
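A much-simplified sketch of the idea — not the full Hayashibara algorithm. This version models heartbeat inter-arrival times as exponential with the observed mean; the real detector fits a distribution to recent samples, but the shape of the output is the same: a score that rises continuously as the silence grows:

```python
import math

class PhiAccrualDetector:
    """Simplified phi accrual failure detector (a sketch).

    phi = -log10(P(next heartbeat arrives later than now)).
    phi ~ 1 means roughly a 10% chance the node is fine;
    phi ~ 8 means roughly a 1-in-100-million chance.
    """

    def __init__(self):
        self.intervals = []       # recent inter-arrival times
        self.last_heartbeat = None

    def heartbeat(self, now):
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)
            self.intervals = self.intervals[-100:]  # sliding window
        self.last_heartbeat = now

    def phi(self, now):
        if not self.intervals:
            return 0.0
        mean = sum(self.intervals) / len(self.intervals)
        elapsed = now - self.last_heartbeat
        # P(interval > elapsed) for an exponential distribution:
        p_later = math.exp(-elapsed / mean)
        return -math.log10(p_later)
```

A system with heartbeats every second would see phi pass 8 after roughly 18 seconds of silence; a system with noisier heartbeat timing accumulates a larger mean and tolerates longer gaps before suspecting failure. That adaptivity is the advantage over a fixed timeout.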
Partition Tolerance in Practice
Here’s the uncomfortable truth about partition tolerance in production systems: partitions are rare but not exceptional.
In a large enough system running long enough, every possible failure mode occurs. Partitions happen due to:
- A misconfigured firewall rule that drops traffic between two services
- A NIC that starts dropping some percentage of packets (not all — some)
- A cloud provider’s network maintenance window that wasn’t communicated
- A BGP routing change that briefly splits your datacenter
- A firewall that times out long-lived connections mid-request
- A load balancer misconfiguration that routes some traffic to unreachable backends
Many of these produce partial partitions — some traffic gets through, some doesn’t. This is worse than a complete partition, because it’s harder to detect and triggers edge cases in partition detection logic.
Gray failures are a specific horror: the network doesn’t drop packets, it just delays them. Delays that are long enough to trigger timeouts but not long enough to produce connection errors. The system thinks the remote node is unreachable (timeout), but it’s actually just slow. If you trigger failover based on this, you might cause a split-brain on a system that was actually fine.
Designing for Partitions
Use quorums correctly
Ensure your write quorum overlaps with your read quorum. With N=3 replicas, W=2 writes, R=2 reads: W+R=4>3=N. Any read will include at least one node that received the last write.
During a partition that cuts off one node, writes and reads still proceed on the majority side: two nodes remain reachable, which meets the quorum of 2. If a partition leaves only one node reachable, operations are refused — no quorum of 2 is achievable.
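The W + R > N claim can be checked by brute force: every possible write quorum must share at least one node with every possible read quorum. A sketch:

```python
from itertools import combinations

def quorums_overlap(n, w, r):
    """True if every w-subset of n nodes intersects every r-subset.

    This is the property that guarantees a read sees the latest
    write; it holds exactly when w + r > n.
    """
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)   # non-empty intersection
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )
```

With N=3, W=2, R=2 every pair of quorums overlaps; with W=1, R=1 a write to node 0 and a read from node 1 can miss each other entirely.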
Make operations idempotent everywhere
During a partition, queued operations may be retried when the partition heals. If they’re not idempotent, retries cause duplicates. Design every operation to be safe to execute multiple times with the same effect as once.
Design for eventual reconciliation
If your system uses AP behavior, plan the reconciliation strategy before you need it. What happens when a node that was partitioned comes back? Does it accept the state it missed? Does it replay its writes to the rejoining nodes? Does it have conflicts to resolve?
Most databases handle this automatically (replication catchup). At the application level, you may have jobs that need to be reconciled, state that needs to be merged, or events that need to be replayed.
Test with real partition simulation
Tools like tc netem (Linux traffic control) and iptables let you simulate partitions in test environments:
# Drop all traffic to/from a specific host
iptables -A INPUT -s 192.168.1.100 -j DROP
iptables -A OUTPUT -d 192.168.1.100 -j DROP
# Simulate 50% packet loss
tc qdisc add dev eth0 root netem loss 50%
# Simulate 200ms latency
tc qdisc add dev eth0 root netem delay 200ms
# Restore
iptables -D INPUT -s 192.168.1.100 -j DROP
iptables -D OUTPUT -d 192.168.1.100 -j DROP
tc qdisc del dev eth0 root
Tools like Toxiproxy (Shopify), Chaos Mesh (Kubernetes), and Pumba (Docker) provide higher-level partition simulation. Netflix’s Chaos Monkey and its successors do this in production.
The Fencing Problem in Detail
When a partition heals and a previously-partitioned leader tries to rejoin, you have a specific problem: the old leader may still believe it’s the leader, while a new leader has been elected.
This is the fencing problem, and it requires an active mechanism to prevent the old leader from acting.
STONITH (Shoot The Other Node In The Head): Literally power off the old leader before electing a new one. Ensures the old leader cannot take any actions. Requires hardware-level access (IPMI, iLO, cloud API to terminate the instance).
Fencing tokens: When a leader acquires its lease, it gets a monotonically increasing token. Every write must include this token. If a node attempts a write with a lower token than what the storage system has seen, the write is rejected.
Leader 1 acquires lease, gets token=42
[network partition]
Leader 2 elected, gets token=43
Leader 2 starts writing with token=43
[partition heals]
Leader 1 attempts write with token=42
Storage: "I've seen token=43. Token=42 is stale. Rejected."
This prevents the “zombie leader” problem where an old leader comes back and stomps over writes made by the new leader.
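A sketch of the storage-side check. Real systems persist the highest token durably and verify it atomically with the write; this in-memory version only shows the rule:

```python
class StaleTokenError(Exception):
    pass

class FencedStorage:
    """Storage that rejects writes carrying a stale fencing token."""

    def __init__(self):
        self.highest_token = -1
        self.data = {}

    def write(self, token, key, value):
        if token < self.highest_token:
            # A newer leader has written; this writer is a zombie.
            raise StaleTokenError(
                f"seen token={self.highest_token}, got token={token}"
            )
        self.highest_token = token
        self.data[key] = value
```

The check lives in the storage layer, not the leader, precisely because the old leader cannot be trusted to know it's been superseded.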
The Humbling Lesson
Network partitions are a reminder that your system has no global view. Every node knows only what it has seen. It cannot know what other nodes have seen, or what the true current state of shared data is, without coordination — and coordination requires a network that isn’t always reliable.
This is the core reason distributed systems are hard: not the complexity of any single algorithm, but the impossibility of perfect shared knowledge. Every decision in a distributed system is made under partial information.
The right response isn’t despair. It’s: understand your system’s partition behavior, make the consistency/availability trade-off deliberately for each data type in your system, design for reconciliation, test partition handling, and monitor for the early signs of network degradation before it becomes a full partition.
Most importantly: know whether your system will be CP or AP under partition, before the partition happens. “We’ll figure it out when it occurs” is not a partition strategy.
Next: the specific patterns — sagas, outbox, circuit breakers, and others — that turn these principles into working code.
Patterns That Work: Sagas, Outbox, Circuit Breakers, and Friends
Previous chapters have been about problems. This one is about solutions — or more precisely, about the patterns that smart engineers have repeatedly converged on when solving distributed systems problems. These aren’t novel ideas; they’re battle-tested approaches with known trade-offs.
If chapters 1-9 are the theory, this is the practice.
The Transactional Outbox Pattern
The problem it solves: You need to perform a database write and publish an event/send a message, and you need both to happen atomically (either both succeed or neither does).
This comes up constantly. You save a new order to the database and need to publish an OrderCreated event to a queue. If the database write succeeds but the message publish fails, your system is inconsistent. If the message publish succeeds but the database write fails, you’ve published a lie.
The naive solution — write to DB, then publish — breaks because these are two separate operations with no transaction spanning them:
// This is broken
db.save(order) // succeeds
queue.publish(OrderCreated) // network error!
// order exists in DB, event never published
The outbox pattern:
Write both to the database, in the same transaction:
BEGIN TRANSACTION;
INSERT INTO orders (id, ...) VALUES (...);
INSERT INTO outbox (id, event_type, payload, published)
VALUES (uuid(), 'OrderCreated', '{"orderId": ...}', false);
COMMIT;
A separate process (the “outbox poller” or “relay”) reads unpublished outbox entries and publishes them to the queue:
def outbox_relay():
    while True:
        # FOR UPDATE SKIP LOCKED requires a transaction; it also lets
        # multiple relay instances run without grabbing the same rows.
        with db.transaction():
            events = db.query("""
                SELECT * FROM outbox
                WHERE published = false
                ORDER BY created_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            """)
            for event in events:
                try:
                    queue.publish(event.event_type, event.payload)
                    db.execute(
                        "UPDATE outbox SET published = true WHERE id = ?",
                        event.id,
                    )
                except Exception:
                    log.error(f"Failed to publish {event.id}, will retry")
        time.sleep(0.1)  # poll interval
The guarantee: if the transaction commits, the event is in the outbox. The relay will publish it eventually, retrying if necessary. Because the relay can retry (and the queue consumer must be idempotent), this gives you at-least-once delivery with the ordering guarantees of your database.
For high-throughput systems, replace polling with Change Data Capture (CDC): tools like Debezium tail the database’s replication log (binlog for MySQL, WAL for Postgres) and publish events for each committed change. Zero polling overhead; events published in near-real-time.
Saga Pattern: Distributed Transactions Without 2PC
The problem it solves: A business operation spans multiple services, each with its own database. You need the operation to be consistent — either all services complete their step, or the operation is rolled back across all of them.
The classic example: placing an order requires:
- Reserving inventory (Inventory Service)
- Charging the payment method (Payment Service)
- Creating the shipment (Shipping Service)
- Sending the confirmation email (Notification Service)
If payment succeeds but inventory reservation fails, you’ve charged a customer for something you can’t ship. If shipment succeeds but payment fails, you’ve shipped for free. You need transactional guarantees across services that don’t share a database.
Two-phase commit across services is technically possible but terrible in practice: it requires all services to implement 2PC, creates tight coupling, and leaves the system blocked if the coordinator fails.
Sagas take a different approach: decompose the distributed transaction into a sequence of local transactions, each with a corresponding compensating transaction that undoes it.
Place Order Saga:
Step 1: Reserve inventory → compensate: release reservation
Step 2: Charge payment → compensate: refund charge
Step 3: Create shipment → compensate: cancel shipment
Step 4: Send confirmation email → compensate: (none / send cancellation)
If any step fails, execute the compensating transactions for all completed steps in reverse order:
Step 1: Reserve inventory ✓
Step 2: Charge payment ✓
Step 3: Create shipment ✗ (failure!)
→ Compensate step 2: Refund charge
→ Compensate step 1: Release reservation
Saga complete (rolled back)
Choreography vs Orchestration
There are two approaches to implementing sagas:
Choreography: Each service publishes events. Other services listen for events and take the next action.
Order Service publishes OrderCreated
→ Inventory Service hears it, reserves inventory, publishes InventoryReserved
→ Payment Service hears it, charges card, publishes PaymentProcessed
→ Shipping Service hears it, creates shipment
Advantages: decoupled, no single point of failure. Disadvantages: the business logic is distributed across services, hard to see the full flow, complex to debug.
Orchestration: A central “saga orchestrator” tells each service what to do next.
Order Orchestrator:
1. Call Inventory Service: reserve(orderId)
2. Call Payment Service: charge(orderId)
3. Call Shipping Service: create_shipment(orderId)
if any step fails:
4. Call Payment Service: refund(orderId) (if charged)
5. Call Inventory Service: release(orderId) (if reserved)
Advantages: business logic is in one place, easy to visualize and debug, easier to implement complex compensation logic. Disadvantages: the orchestrator is a central point of failure (mitigated by making it stateless and persisting state to a database).
The orchestration implementation: Store the saga state in a database. On each step completion (success or failure), update the state. The orchestrator can be restarted from any point — it just picks up from the last saved state.
class OrderSaga:
    def __init__(self, db, inventory_svc, payment_svc, shipping_svc):
        self.db = db
        self.inventory = inventory_svc
        self.payment = payment_svc
        self.shipping = shipping_svc

    def execute(self, order_id):
        state = self.db.get_or_create_saga_state(order_id)

        if state.step < 1:
            result = self.inventory.reserve(order_id)
            if not result.success:
                return self.compensate(order_id, step=0)
            self.db.update_saga_state(order_id, step=1,
                                      inventory_reservation_id=result.id)

        if state.step < 2:
            result = self.payment.charge(order_id)
            if not result.success:
                return self.compensate(order_id, step=1)
            self.db.update_saga_state(order_id, step=2, payment_id=result.id)

        if state.step < 3:
            result = self.shipping.create_shipment(order_id)
            if not result.success:
                return self.compensate(order_id, step=2)
            self.db.update_saga_state(order_id, step=3, shipment_id=result.id)

        self.db.update_saga_state(order_id, status="completed")

    def compensate(self, order_id, step):
        state = self.db.get_saga_state(order_id)
        if step >= 2 and state.payment_id:
            self.payment.refund(state.payment_id)
        if step >= 1 and state.inventory_reservation_id:
            self.inventory.release(state.inventory_reservation_id)
        self.db.update_saga_state(order_id, status="compensated")
What sagas don’t give you
Sagas provide atomicity (either all steps complete or they’re compensated) but not isolation. While a saga is executing, other sagas can see intermediate states. If two sagas both try to reserve the same last unit of inventory, one might succeed and then later fail at payment, while the other was told “no inventory” incorrectly.
This is the dirty reads problem in saga terms. Mitigations: semantic locks (mark resources as “reserved” before they’re fully committed), process managers that coordinate conflicting sagas, careful ordering of steps (reserve last-item-in-stock at the end, not the beginning).
Circuit Breaker (Detailed Implementation Notes)
Covered in the fault tolerance chapter, but worth expanding on implementation details.
What counts as a failure?
- HTTP 5xx responses (server errors)
- Network timeouts
- Connection refused errors
What doesn’t count as a failure?
- HTTP 4xx responses (client errors — these aren’t the downstream service’s fault)
- Business logic errors (an order can’t be fulfilled because it’s invalid — that’s not a service failure)
Misclassifying 4xx as failures causes circuit breakers to trip on bad client input, which is not what you want.
Sliding window vs count-based thresholds
Count-based: “trip after 5 failures.” Simple but doesn’t account for recovery time. If the service had 5 failures in the last hour, the circuit would be tripped even though it’s been healthy for 55 minutes.
Time-based: “trip if error rate exceeds 50% in the last 10 seconds.” More accurate. Requires tracking recent history. Libraries like Resilience4j implement sliding windows.
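A sketch of the time-based approach. Parameter names are illustrative, and production libraries use ring buffers rather than scanning a list, but the decision rule is the same:

```python
import time

class SlidingWindowBreaker:
    """Trip when the error rate over the last `window` seconds
    exceeds `threshold` (a sketch of the time-based approach)."""

    def __init__(self, window=10.0, threshold=0.5, min_calls=10):
        self.window = window
        self.threshold = threshold
        self.min_calls = min_calls   # don't trip on a tiny sample
        self.calls = []              # (timestamp, succeeded) pairs

    def record(self, succeeded, now=None):
        now = time.time() if now is None else now
        self.calls.append((now, succeeded))

    def is_open(self, now=None):
        now = time.time() if now is None else now
        recent = [ok for t, ok in self.calls if now - t <= self.window]
        if len(recent) < self.min_calls:
            return False
        error_rate = recent.count(False) / len(recent)
        return error_rate > self.threshold
```

Note the difference from a count-based breaker: failures age out of the window, so a service that has been healthy for 55 minutes is no longer held hostage by 5 old failures.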
Half-open probing strategy
Don’t just send one probe when moving to HALF_OPEN. The single probe might fail by chance, sending you back to OPEN when the service has actually recovered. Consider:
- Sending a configurable number of probes before fully closing
- Using a lower failure threshold to re-open from HALF_OPEN
- Incrementally increasing allowed traffic (10% → 25% → 50% → 100%)
Read-Your-Writes via Sticky Sessions or Version Tokens
The problem: You write to a primary database, immediately redirect to a page that reads from a replica. The replica hasn’t caught up. Your write is invisible. User reports a bug.
Solution 1: Sticky reads to primary after writes
Track a “recently wrote” flag (in a session cookie, JWT claim, or request header). For a short window (e.g., 30 seconds) after a write, route reads to the primary.
def handle_write(user_id, data):
    db.primary.write(data)
    session["wrote_at"] = time.time()
    return redirect(...)

def handle_read(user_id):
    wrote_recently = (
        "wrote_at" in session and
        time.time() - session["wrote_at"] < 30
    )
    db_to_use = db.primary if wrote_recently else db.replica
    return db_to_use.read(user_id)
Disadvantage: routes potentially expensive reads to the primary, partially defeating the purpose of replicas.
Solution 2: Replication position tokens
After a write, capture the WAL/replication position (a log sequence number). Pass this as a token to subsequent requests. On the replica, wait until the replica has caught up to at least that position before serving the read.
Client writes → Primary returns LSN: 12345
Client next request includes token: min_lsn=12345
Replica checks: is my current LSN ≥ 12345?
If yes: serve read
If no: wait briefly, recheck (or route to primary)
This is more precise than time-based routing. PostgreSQL exposes its WAL LSN; MySQL exposes binlog coordinates. CockroachDB has built-in follower reads with this mechanism.
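PostgreSQL formats an LSN as two hex numbers, `fileid/offset` (e.g. `16/B374D848`). A sketch of the comparison the routing decision needs — the helper names here are illustrative, not a real library API:

```python
def parse_lsn(lsn):
    """Convert a PostgreSQL 'fileid/offset' LSN string to an integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def can_serve_read(replica_lsn, token_lsn):
    """True if the replica has replayed at least up to the client's token."""
    return parse_lsn(replica_lsn) >= parse_lsn(token_lsn)

def choose_datasource(replica_lsn, token_lsn):
    """Route to the replica only when it has caught up; otherwise
    fall back to the primary rather than serve a stale read."""
    return "replica" if can_serve_read(replica_lsn, token_lsn) else "primary"
```

In a real deployment the replica's position would come from `pg_last_wal_replay_lsn()` and the token from `pg_current_wal_lsn()` captured right after the write.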
Event Sourcing
The pattern: Instead of storing the current state of an entity, store the sequence of events that produced that state. Current state is computed by replaying events.
Traditional approach:
accounts table: { id: 1, balance: 150 }
Event sourcing approach:
events table:
{ account_id: 1, type: "Deposit", amount: 100, at: T1 }
{ account_id: 1, type: "Deposit", amount: 100, at: T2 }
{ account_id: 1, type: "Withdrawal", amount: 50, at: T3 }
Current balance: sum events to get 150
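Replaying events is a left fold over the history. A sketch for the account example above — the event shapes are illustrative:

```python
def apply(balance, event):
    """One step of the fold: apply a single event to the running state."""
    if event["type"] == "Deposit":
        return balance + event["amount"]
    if event["type"] == "Withdrawal":
        return balance - event["amount"]
    raise ValueError(f"unknown event type: {event['type']}")

def current_balance(events):
    """Current state is computed, never stored: fold over the history."""
    balance = 0
    for event in events:
        balance = apply(balance, event)
    return balance
```

Time travel falls out for free: replaying only the events up to some point in history yields the state at that point.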
Advantages:
- Full audit trail: you know exactly how you got to current state
- Time travel: compute state at any historical point
- Event replay: rebuild projections from scratch (useful for adding new views)
- Natural fit with CQRS and event-driven architecture
Disadvantages:
- Querying current state requires aggregation (mitigated with snapshots and read models)
- Schema evolution of events is hard (old events must still be interpretable)
- Mental model shift — requires discipline to work with
- Storage grows without bound unless snapshots and compaction are implemented
Event sourcing is not universally applicable, but for domains where audit, history, and the ability to re-derive state are important (financial systems, legal records, collaborative editing), it’s a natural fit.
CQRS: Command Query Responsibility Segregation
The pattern: Separate the read path from the write path. Write operations (commands) update an authoritative data store. Read operations (queries) read from one or more separate read models optimized for specific queries.
Write path:
Command → validate → execute → update write store → publish events
Read path:
Event listener → update read model(s)
Query → read from read model
Why it helps: Write stores don’t need to be optimized for queries. Read models can be denormalized, pre-aggregated, and optimized for specific access patterns. Multiple read models can be maintained for different query patterns.
Write store: normalized relational tables
Read model A: denormalized table for customer order history (optimized for "show me my orders")
Read model B: Elasticsearch index for full-text order search
Read model C: materialized view for monthly revenue reporting
The trade-off: Read models are derived from events and may lag behind the write store. Queries may see slightly stale data. This is the eventual consistency in CQRS.
CQRS pairs naturally with event sourcing (events update the read models) but can also be used with traditional state-based persistence.
Sidecar/Proxy Pattern for Cross-Cutting Concerns
The pattern: Run a proxy process alongside each service instance. The proxy handles cross-cutting concerns (retries, circuit breaking, load balancing, tracing, mutual TLS) so the service doesn’t have to.
Service code:
"Call http://payment-service/charge"
(no retry logic, no circuit breaker, no tracing)
Sidecar proxy intercepts:
- Adds distributed trace headers
- Enforces mTLS
- Applies retry policy
- Applies circuit breaker
- Load balances across payment-service instances
- Reports metrics
Service sees a simple HTTP call.
The sidecar handles everything else.
This is the service mesh pattern: Envoy, Linkerd, Consul Connect. The infrastructure concerns are handled at the infrastructure level, not in application code.
The trade-off: Operational complexity. You’re now running two processes per service instance. The proxy is another thing that can fail, another thing to configure, another thing to understand. The benefit (uniform policy enforcement, language-agnostic implementation) is real, but the cost is real too.
Inbox Pattern (Idempotent Message Consumption)
Companion to the outbox pattern. When consuming messages from a queue, you need to handle duplicates — the queue might deliver the same message more than once (at-least-once delivery).
The pattern: Before processing a message, check if you’ve already processed it. If yes, skip processing (return success). If no, process it and record that you’ve done so.
def handle_message(message):
    message_id = message.headers["message-id"]

    # The check and the processing must be atomic: one DB transaction
    with db.transaction():
        if db.query("SELECT 1 FROM processed_messages WHERE id = ?", message_id):
            return  # Already processed, skip

        # Process the message
        process_business_logic(message.body)

        # Record as processed
        db.execute(
            "INSERT INTO processed_messages (id, processed_at) VALUES (?, NOW())",
            message_id,
        )
This requires messages to have a stable, unique ID. Most message brokers (Kafka, SQS, RabbitMQ) provide message IDs. For Kafka, the combination of topic + partition + offset is a unique identifier.
Patterns Summary
| Pattern | Problem Solved | Key Trade-off |
|---|---|---|
| Transactional Outbox | Atomic DB write + event publish | Added latency; requires outbox poller |
| Saga (choreography) | Distributed transactions | Hard to visualize; complex compensation |
| Saga (orchestration) | Distributed transactions | Orchestrator is a central point of failure |
| Circuit Breaker | Cascade failure prevention | Configuration complexity; false trips |
| Idempotency Key | Safe retries | Requires unique ID generation; storage |
| Read-Your-Writes | Replication lag consistency | Routes load to primary |
| Event Sourcing | Audit trail, history, replay | Storage growth; schema evolution |
| CQRS | Read/write optimization | Eventual consistency on reads |
| Sidecar/Service Mesh | Cross-cutting concerns | Operational complexity |
| Inbox / Deduplication | Idempotent message consumption | Storage for processed message IDs |
None of these patterns are magic. Each adds complexity in exchange for solving a specific problem. The discipline is matching the pattern to the problem — not reaching for a saga when a simple transaction would do, and not “just using a transaction” when you actually need a saga.
Next: the chapter that ties everything together — how to make actual architecture decisions for your actual system, with your actual constraints.
Making Real Decisions: How to Think About Your Actual System
Everything in this book up to this point has been building toward this chapter. You now have a mental model of the failure modes, the theoretical constraints, and the patterns. The question is: how do you apply all of that to your actual system, with your actual team, your actual scale, and your actual constraints?
This chapter is opinionated. That’s the point.
Start With the Actual Problem
The most common mistake in distributed systems design is solving the wrong problem at the wrong scale.
Before you reach for a distributed database, a message queue, or a saga pattern, ask:
What is the simplest system that solves the actual problem?
A monolith with a single PostgreSQL database is not an embarrassing architecture. It’s a legitimate choice that solves real problems with significantly less complexity than a microservices deployment. Shopify ran on a single Rails monolith for years while processing billions in transactions. Stack Overflow has famously served its enormous traffic from a small handful of web servers and a couple of SQL Server clusters.
The question isn’t “can this scale?” The question is “does it need to scale, and in what way, right now?”
Distributed systems problems are overhead. You pay the overhead in exchange for something: more capacity, better fault isolation, geographic distribution, independent deployability. If you’re not getting the benefit, you’re paying the overhead for nothing.
The Decision Framework
When you need to make an architectural decision, work through this sequence:
1. What is the consistency requirement?
For each piece of state your system manages, determine the minimum consistency guarantee that correctness requires.
Examples:
| Data | Required Consistency |
|---|---|
| User account balance | Strong (linearizable) — incorrect balance is a real error |
| Shopping cart contents | Read-your-writes + monotonic reads — user must see their own edits |
| Social media feed | Causal or eventual — stale feed is acceptable |
| View/like counts | Eventual — approximate counts are fine |
| Distributed lock / leader | Linearizable — split-brain is catastrophic |
| User profile data | Read-your-writes — user must see their own profile changes |
| Session data | Read-your-writes — user must stay logged in |
| Analytics/metrics | Eventual — approximate is sufficient |
Do this exercise explicitly, per-data-type. Not as a team assumption, not as “we’ll use eventual consistency” applied uniformly. Per. Data. Type.
2. What is the availability requirement?
How much downtime is acceptable? Translate that into concrete numbers:
- 99% uptime = 87.6 hours/year of downtime acceptable
- 99.9% = 8.76 hours/year
- 99.99% = 52.6 minutes/year
- 99.999% = 5.26 minutes/year
The cost to achieve each additional 9 is roughly an order of magnitude more expensive. 99% is achievable with a single well-managed server. 99.9% requires redundancy. 99.99% requires active-active multi-zone setups. 99.999% (“five nines”) requires geo-distribution, extremely careful engineering, and probably a dedicated platform team.
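The arithmetic behind those downtime budgets, if you want to check a target of your own:

```python
# Converting an availability target into an annual downtime budget.
HOURS_PER_YEAR = 365 * 24  # 8760

def downtime_per_year(availability_pct: float) -> float:
    """Allowed downtime in hours/year for a given availability percentage."""
    return HOURS_PER_YEAR * (1 - availability_pct / 100)

assert round(downtime_per_year(99.0), 1) == 87.6          # hours
assert round(downtime_per_year(99.9), 2) == 8.76          # hours
assert round(downtime_per_year(99.99) * 60, 1) == 52.6    # minutes
assert round(downtime_per_year(99.999) * 60, 2) == 5.26   # minutes
```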
What do your users actually experience? A B2B SaaS with enterprise customers has different availability requirements than a consumer app. A midnight maintenance window on an enterprise product is far less painful than 10 minutes of downtime during peak hours on a consumer product.
Be honest about your SLA. If you’re a startup with no SLA, you have more flexibility than you might think. If you have contractual 99.9% SLA, you need to engineer for it.
3. What are the partition scenarios?
Your system will be partitioned. When it is, what should happen?
Option A (CP): Some users get errors. Acceptable if the alternative is corrupted data.
Option B (AP): Some users get stale data. Acceptable if errors are worse than staleness.
Option C (AP with compensation): Some users get stale data, and you have a process to reconcile when the partition heals.
Make this decision now, not during an incident. Write it down. Make sure your team agrees.
4. What’s the actual scale?
Not the imagined scale. The actual, measured, or rigorously estimated scale.
Reads per second: How many reads does your system serve? A typical small SaaS might see 100-1,000 RPS. A medium-scale product might see 10,000-100,000. Large-scale is 100,000+. These are very different engineering problems.
Write throughput: Writes are more expensive than reads in most systems. A single PostgreSQL primary can handle ~10,000 writes/second if they’re simple; more complex writes (heavy indexing, constraint checks) push that number down.
Data volume: How much data are you storing? PostgreSQL handles terabytes without breaking a sweat on appropriate hardware. Beyond that, you start needing specialized tooling.
Latency requirements: What’s the acceptable latency at p99? Consumer-facing products typically need p99 < 500ms. Latency-sensitive systems (trading, gaming) need p99 < 50ms or less.
Measure first. Estimate if you can’t measure. Don’t guess.
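If you have raw latency samples but no metrics pipeline yet, a nearest-rank percentile over the samples is enough for a first estimate. A stdlib-only sketch (the sample numbers are illustrative):

```python
import math

# Nearest-rank percentile over raw samples -- good enough for a first
# sanity check before you have a real metrics pipeline.
def percentile(samples, p):
    ordered = sorted(samples)
    k = max(1, math.ceil(p / 100 * len(ordered)))  # 1-indexed rank
    return ordered[k - 1]

# Ten request latencies in milliseconds (illustrative numbers).
latencies_ms = [12, 15, 11, 300, 14, 13, 16, 12, 900, 14]
assert percentile(latencies_ms, 50) == 14   # the typical request looks fine
assert percentile(latencies_ms, 99) == 900  # the tail tells the real story
```

Note how different p50 and p99 are here: averages and medians hide exactly the outliers your users complain about.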
The Database Decision
Most distributed systems problems are really “which database should I use?” in disguise.
Start with a relational database. PostgreSQL specifically. It handles ACID transactions, foreign keys, complex queries, JSON, full-text search, time-series-like queries, geospatial queries, and replication — and it scales further than most teams need. If you outgrow it, you’ll know, and you’ll have good reasons to reach for something else.
The “when to add a read replica” question: when your primary’s CPU is consistently above 70% due to read load. Or when you need geographic proximity for reads. Not before.
The “when to shard” question: when a single node genuinely can’t hold your data or handle your write volume. This is much rarer than people think. Before sharding, profile what’s actually causing your problems. It’s usually schema design, query performance, or missing indexes — not the fundamental limits of a single-node database.
When to reach for a distributed database
CockroachDB or Spanner (distributed SQL): When you need:
- Geo-distributed writes with acceptable latency
- Horizontal write scaling across nodes
- Strong consistency across a distributed cluster
- SQL interface
The cost: higher write latency (consensus required per transaction), operational complexity, cost.
Cassandra or DynamoDB (distributed KV/wide-column): When you need:
- Very high write throughput (millions of writes/second)
- Linear horizontal scalability
- Acceptable eventual consistency
- Simple access patterns (key-based lookups, range scans)
The cost: no joins, limited query flexibility, eventual consistency trade-offs, conflict resolution complexity.
Redis (in-memory): When you need:
- Sub-millisecond reads and writes
- Caching, session storage, rate limiting, pub/sub
- Data that fits in memory or where cache eviction is acceptable
Not a primary database. A cache and operational data store.
Kafka (distributed log): When you need:
- Durable, ordered event streaming
- Multiple consumers reading the same events
- Temporal decoupling between producers and consumers
- Long event retention for replay
Not a database. A durable message log.
The Microservices Decision
Microservices are a deployment and organizational pattern, not a performance optimization. They solve specific problems:
- Independent deployability: Service A can be deployed without affecting Service B
- Independent scalability: Scale the services that need scaling without scaling everything
- Organizational boundaries: Different teams own different services with clear interfaces
- Technology diversity: Different services can use different languages/databases where appropriate
They create specific problems:
- Network calls replace function calls: Added latency, failure modes, serialization overhead
- Distributed transactions: The saga chapter exists because of this
- Operational complexity: N services = N deployment pipelines, N monitoring dashboards, N on-call escalation paths
- Observability: Debugging a problem that spans 5 services is significantly harder than debugging a monolith
The honest trade-off: Microservices are appropriate at organizational scale. When you have multiple teams, they need boundaries. When services have truly different scaling requirements, they need independent scaling. When you’re deploying dozens of times per day, independent deployability matters.
They’re frequently inappropriate for small teams. A team of 5 engineers managing 20 microservices is paying enormous overhead for problems they don’t have.
The “modular monolith” is often the right intermediate step: a single deployable that’s internally organized into well-separated modules with clean interfaces. You get development velocity, simple deployment, easy local development — and when a module genuinely needs to be extracted, it’s easier to do from a clean module boundary than from spaghetti code.
Designing for Operations
A system that works in development and fails mysteriously in production is not a working system.
Observability as a first-class concern
Build logging, metrics, and tracing before you need them. The format: structured JSON logs with correlation IDs. The minimum metrics: request rate, error rate, latency (p50/p95/p99), saturation (connection pool usage, queue depth, memory usage).
Instrument your application so that when something goes wrong, you can answer: “What was happening when this broke?” Without observability, you’re debugging blindfolded.
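A minimal version of that structured-JSON-with-correlation-ID format, as a sketch — the event names and fields here are hypothetical, and a real service would usually plug a JSON formatter into its logging library rather than print directly:

```python
import json
import time
import uuid

# One JSON object per log line, always carrying the correlation ID.
def log_event(correlation_id: str, event: str, **fields):
    record = {
        "ts": time.time(),
        "correlation_id": correlation_id,
        "event": event,
        **fields,
    }
    print(json.dumps(record))  # stdout, shipped to your log aggregator

# The same correlation_id flows through every service that touches the
# request, which is what lets you reconstruct a cross-service timeline.
cid = str(uuid.uuid4())
log_event(cid, "order.received", order_id="ord_123", amount_cents=4999)
log_event(cid, "payment.charged", order_id="ord_123", latency_ms=87)
```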
Runbooks
For every operational procedure (deploying a new version, adding a read replica, failing over to a standby database, rolling back a bad deployment), write the steps down. A runbook doesn’t need to be beautiful — it needs to be accurate. Update it when the procedure changes.
The runbook’s audience is: you, at 3am, after being woken up, slightly panicked. Write for that person.
Graceful degradation in the design phase
Decide, for every external dependency your service has, what to do if that dependency is unavailable. Document it. Implement it. Test it.
“This service will be unavailable if the payment service is down” is a valid choice if the entire function of the service is payment processing. “This service will show cached recommendations if the recommendation service is down” is the right choice if recommendations are non-critical.
Both are reasonable. Undocumented implicit behavior is not.
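The cached-recommendations fallback takes only a few lines. A sketch, where `fetch_recommendations` and `cache` are hypothetical stand-ins for your actual recommendation client and cache layer:

```python
# Last-resort static fallback if we have nothing cached for this user.
DEFAULT_RECS = ["bestsellers"]

def get_recommendations(user_id, fetch_recommendations, cache):
    try:
        recs = fetch_recommendations(user_id)  # remote call; may fail
        cache[user_id] = recs                  # refresh cache on success
        return recs
    except Exception:
        # Degrade: serve stale recommendations rather than an error page.
        return cache.get(user_id, DEFAULT_RECS)
```

The important part is not the code but the decision it encodes: for this dependency, staleness beats an error, and that choice is written down where the next engineer will see it.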
Debugging Distributed Systems
When something goes wrong in a distributed system, the debugging process is different from debugging a single-process application.
Step 1: Establish a timeline. What happened, in what order, on which services? This requires correlated logs with timestamps and a common correlation ID. If you don’t have this, you’re guessing.
Step 2: Find the divergence point. At what point did the behavior stop being what you expected? Work backwards from the visible symptom to find where the bad state originated.
Step 3: Check the boring stuff first. Disk full? Memory exhausted? Connection pool exhausted? CPU pinned? Replication lag spiked? Before assuming a subtle consensus bug, check that the system has the resources it needs.
Step 4: Check external dependencies. Is the downstream service healthy? Is there network degradation between your services? Is the database having lock contention? Distributed system bugs are often not bugs in your code — they’re emergent behavior from the interaction of multiple systems under stress.
Step 5: Reproduce in isolation. Can you reproduce the problem with a single service in a test environment? If so, you’ve narrowed the problem significantly. If not, it’s likely a timing or interaction issue that requires the full system.
The hardest class of bugs: race conditions that appear under specific timing conditions. These are reproducible only under load and require either distributed tracing (to see the actual sequence of events) or careful log analysis (to reconstruct it after the fact). This is why observability is not optional.
The Checklist
When designing a new distributed system component, answer these questions before writing code:
Consistency
- What consistency guarantee does each data type require?
- Are reads from replicas acceptable for this data? What’s the maximum acceptable lag?
- Do you need read-your-writes consistency? How will you implement it?
Failures
- What happens when downstream Service X is unavailable?
- What happens when a database write succeeds but the subsequent publish fails?
- What happens when a network partition occurs between your nodes?
- Is every operation idempotent, or have you designed safe retry behavior?
Operations
- How do you deploy a new version with zero downtime?
- How do you roll back a bad deployment?
- How do you fail over to a standby database?
- What are your health check endpoints?
- What metrics are you tracking?
- What alerts fire, and at what thresholds?
Testing
- Do you have integration tests that simulate failure scenarios?
- Have you tested failover? (Test it in staging before production needs it.)
- Have you tested with realistic data volumes?
Documentation
- Is the consistency model documented for each data type?
- Is the failure behavior documented for each external dependency?
- Is there a runbook for the key operational procedures?
You don’t need to answer all of these perfectly before you start. You do need to know which ones you’re deferring and why.
The “Good Enough” Question
Not every system needs to be engineered for five nines and global scale. Not every problem needs the most correct solution — it needs a solution that’s correct enough for the use case, with acceptable failure modes, and maintainable by the team that will operate it.
The goal is deliberate decisions with known trade-offs, not perfect decisions.
A shopping cart that occasionally shows one fewer item than expected is a minor bug. A payment system that occasionally double-charges is a serious incident. They deserve different levels of engineering rigor.
Know which kind of system you’re building. Apply the rigor that situation requires. Don’t apply the same rigor uniformly across everything — you’ll either over-engineer the low-stakes parts or under-engineer the high-stakes parts. Usually both.
The Meta-Skill
The meta-skill in distributed systems design is the habit of asking, at every step, what can go wrong.
When you write db.write() followed by queue.publish(), think: what is the state of the system if the first succeeds and the second fails? Is that state valid? Is it recoverable?
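That gap is easier to see in code. A sketch with hypothetical `db` and `queue` stand-ins — the naive dual write, and the outbox variant that closes the window:

```python
# The dual-write gap, made concrete. `db` and `queue` are hypothetical
# stand-ins for your database and message broker clients.
def place_order_naive(db, queue, order):
    db.write("orders", order)             # step 1: committed
    # <-- a crash here leaves an order that no event ever announced
    queue.publish("order.placed", order)  # step 2: may never run

# The transactional outbox makes step 2's *intent* part of step 1's
# transaction; a separate poller publishes rows from the outbox table.
def place_order_outbox(db, order):
    with db.transaction():
        db.write("orders", order)
        db.write("outbox", {"event": "order.placed", "payload": order})
```

In the naive version, the crash window between the two calls is the invalid state you have to reason about; in the outbox version, either both rows commit or neither does.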
When you design a service that reads from a replica, think: what do users experience when the replica is lagging? Is that acceptable?
When you design a multi-step workflow, think: if the process crashes at step 3, what state is the system in? What needs to happen to complete or roll back from that state?
This habit — examining the gaps between operations — is the difference between code that works in testing and infrastructure that holds up in production.
That’s the practical core. The last chapter points you toward where to go from here.
Where to Go From Here
If you’ve read this far, you now have a coherent mental model for distributed systems. You understand why things go wrong, you have vocabulary for the trade-offs, and you have patterns for the common problems. That’s enough to make meaningfully better decisions about the systems you build.
But this book is an introduction, not a ceiling. Here’s what’s beyond it.
The Papers
The field of distributed systems is unusually well-documented in academic literature. The foundational papers are accessible and worth reading.
Start here:
“Time, Clocks, and the Ordering of Events in a Distributed System” — Leslie Lamport (1978) The paper that defined happens-before, logical clocks, and much of the framework for thinking about ordering in distributed systems. Lamport writes with unusual clarity for an academic. Read the original.
“In Search of an Understandable Consensus Algorithm” — Ongaro and Ousterhout (2014) The Raft paper. Designed to be readable. Succeeds. If you want to understand how consensus actually works, this is the best starting point.
“Dynamo: Amazon’s Highly Available Key-Value Store” — DeCandia et al. (2007) The paper that brought together consistent hashing, vector clocks, and the sloppy-quorum approach, influencing an entire generation of distributed databases. Shows how real engineering trade-offs are made at scale.
“Spanner: Google’s Globally-Distributed Database” — Corbett et al. (2012) TrueTime, external consistency, global transactions. The engineering behind the most ambitious consistency guarantees in a geo-distributed system.
“MapReduce: Simplified Data Processing on Large Clusters” — Dean and Ghemawat (2004) The paper that started the big data era. Important not because MapReduce is the right answer anymore, but because it shows how to design for failure at scale.
“The Byzantine Generals Problem” — Lamport, Shostak, Pease (1982) The paper that defined the hardest fault model. Essential context for understanding Byzantine fault tolerant systems.
“Harvest, Yield, and Scalable Tolerant Systems” — Fox and Brewer (1999) A more practical take on availability trade-offs than the CAP theorem formalization. “Harvest vs. yield” is a useful framing.
CRDT papers — Shapiro et al., “Conflict-free Replicated Data Types” (2011) The formal treatment of CRDTs. More mathematically dense than the above, but worth it if you’re working with leaderless eventually-consistent systems.
The Books
“Designing Data-Intensive Applications” — Martin Kleppmann If this book is the introduction, Kleppmann’s book is the comprehensive treatment. It covers everything here in more depth, plus storage engines, batch processing, stream processing, and more. It’s dense and long and worth every page.
“Database Internals” — Alex Petrov Deep dive into how databases actually work: B-trees, LSM-trees, WAL, replication internals. If you want to understand why distributed databases make the choices they do, this gives you the foundation.
“Release It! Design and Deploy Production-Ready Software” — Michael Nygard The fault tolerance and operational patterns book. Circuit breakers, bulkheads, timeouts — Nygard wrote about these before they had universally recognized names. Practical and battle-tested.
“Building Microservices” — Sam Newman If you’re going down the microservices path, this is the most grounded treatment. Newman doesn’t oversell microservices; he’s honest about the costs.
“Software Engineering at Google” — Winters, Manshreck, Wright Not strictly distributed systems, but the chapter on reliability and the discussion of Google’s engineering practices around distributed systems are excellent. How do you actually run this stuff at scale?
The Courses and Talks
MIT 6.824: Distributed Systems (available online) The gold standard distributed systems course. The labs — implementing Raft, building a distributed key-value store — are genuinely educational. The lecture recordings are on YouTube.
Martin Kleppmann’s lectures (available on YouTube) Kleppmann teaches at Cambridge and has posted excellent lectures on distributed systems that expand on his book.
“Turning the database inside out” — Martin Kleppmann (Strange Loop 2015) A talk on event sourcing, CDC, and the stream processing model for data systems. Changes how you think about databases.
“How Kafka is tested” — Colin McCabe (2015) Understanding how Kafka tests for correctness under failure is instructive for how to think about your own systems.
The LADIS workshop proceedings Less accessible, but the workshop papers (“Large-Scale Distributed Systems and Middleware”) contain useful practical discussions of real-world distributed systems problems.
Tools Worth Understanding Deeply
etcd: The Raft implementation that runs Kubernetes. Understanding its operational model — leader election, watch semantics, lease management — is useful even if you’re not running Kubernetes.
Apache Kafka: The durable distributed log. Understanding its partition model, consumer groups, offsets, and exactly-once semantics is increasingly important as event-driven architecture becomes more common.
PostgreSQL: The most capable single-node database. Understanding its WAL, MVCC, replication, and logical decoding internals pays off repeatedly.
Redis: The Swiss army knife of operational data structures. Understanding its persistence models, cluster mode, and Sentinel vs Cluster architectures.
Jepsen: Kyle Kingsbury’s project for testing distributed systems for safety violations. The Jepsen analyses of popular databases are eye-opening — many systems that claimed strong guarantees violated them under partition. Reading through the analyses at jepsen.io is a masterclass in what actually goes wrong.
What to Do Next
Instrument your current system. Add structured logging, metrics for the four golden signals, and distributed traces if you have more than two services. You can’t improve what you can’t see.
Test a failover. Take your production database primary, deliberately fail it (in a staging environment first), and observe what happens. Does the replica promote cleanly? How long does it take? What do clients experience? Do this before you have to do it under pressure.
Read one paper. Pick the Raft paper or the Dynamo paper. Read it. The gap between “understanding a concept” and “reading the original source” is surprisingly large.
Audit your consistency assumptions. For each database read in your application, ask: could this be from a replica? If yes, what’s the maximum acceptable lag? Have you designed for the case where that lag is exceeded?
Find the race conditions. Pick one multi-step operation in your codebase. Draw out all the possible failure scenarios between steps. Which ones leave the system in a bad state? Which of those are handled? This exercise is uncomfortable but valuable.
A Final Thought
Distributed systems are hard because they deal with partial information, partial failure, and the impossibility of perfect coordination. These aren’t problems that engineering skill fully overcomes — they’re fundamental constraints of what computing over a network can and cannot guarantee.
The engineers who are good at distributed systems aren’t the ones who’ve found a way around these constraints. They’re the ones who’ve internalized them deeply enough to design systems that fail gracefully, make explicit trade-offs, and are honest about what can go wrong.
You’re already doing distributed systems. Now you’re doing it with better maps.
Good luck out there. Monitor your replication lag.