Build your own Redis (3/5) - Redis Replication
Following our exploration of Redis's persistence mechanisms, we now dive into one of Redis's most critical architectural features: replication. Replication enables data redundancy, read scaling, and high availability by maintaining copies of data across multiple Redis instances. Let's explore how Redis achieves this through sophisticated master-replica synchronization.
Imagine you're building a distributed system that needs to serve thousands of read requests while maintaining data consistency across multiple servers. You face a fundamental challenge: how do you ensure that multiple copies of your data stay synchronized without sacrificing performance? This is where Redis's elegant replication architecture comes into play.
Redis uses a master-replica replication model in which one master instance can have multiple replica instances. Replicas connect to the master and receive a continuous stream of updates, keeping copies of the master's data that may trail it briefly because replication is asynchronous. This approach provides both read scaling (clients can read from replicas) and data redundancy (a replica can be promoted if the master fails).
Replication Handshake
The replication process begins with an intricate handshake sequence between replica and master. Think of it as a formal introduction where both parties establish their capabilities and agree on communication protocols.
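The handshake flow involves these key steps:
- The replica sends PING and expects +PONG.
- The replica sends REPLCONF listening-port <port> and expects +OK.
- The replica sends REPLCONF capa psync2 and expects +OK.
- The replica sends PSYNC ? -1 to request a full synchronization.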
Here's the handshake implementation:
async def __ping_master_node(self, master_address: str) -> bool:
    # master_address is expected in the form "host port"
    master_host, master_port = master_address.split(" ")
    try:
        reader, writer = await asyncio.open_connection(
            master_host, int(master_port)
        )

        async def send_command(command: RESPArray) -> bytes:
            # Write one command and wait for the master's reply
            writer.write(command.serialize())
            await writer.drain()
            return await reader.read(1024)

        # Send PING
        response = await send_command(
            RESPArray(value=[RESPBulkString(value="PING")])
        )
        if not response or response != b"+PONG\r\n":
            return False
        # Send REPLCONF listening-port
        replconf_listening_port = RESPArray(value=[
            RESPBulkString(value="REPLCONF"),
            RESPBulkString(value="listening-port"),
            RESPBulkString(value=str(self.port)),
        ])
        response = await send_command(replconf_listening_port)
        if not response or response != b"+OK\r\n":
            return False
        # Send REPLCONF capa psync2
        replconf_capa_psync2 = RESPArray(value=[
            RESPBulkString(value="REPLCONF"),
            RESPBulkString(value="capa"),
            RESPBulkString(value="psync2"),
        ])
        response = await send_command(replconf_capa_psync2)
        if not response or response != b"+OK\r\n":
            return False
        # Send PSYNC command; the reply (FULLRESYNC + RDB) is read elsewhere
        psync_command = RESPArray(value=[
            RESPBulkString(value="PSYNC"),
            RESPBulkString(value="?"),
            RESPBulkString(value="-1"),
        ])
        writer.write(psync_command.serialize())
        await writer.drain()
        return True
    except Exception as e:
        print(f"Connection error: {e}")
        return False
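To make the wire format concrete, here is a minimal standalone sketch of the bytes a replica sends during the handshake. The helper names `encode_command` and `handshake_commands` are made up for this illustration and are not part of the implementation above; the port 6380 is an arbitrary example value.

```python
def encode_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    parts = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        raw = arg.encode()
        parts.append(b"$" + str(len(raw)).encode() + b"\r\n" + raw + b"\r\n")
    return b"".join(parts)


def handshake_commands(listening_port: int) -> list[bytes]:
    """Return the four handshake commands in the order the replica sends them."""
    return [
        encode_command("PING"),
        encode_command("REPLCONF", "listening-port", str(listening_port)),
        encode_command("REPLCONF", "capa", "psync2"),
        encode_command("PSYNC", "?", "-1"),
    ]


# Print each command exactly as it would appear on the socket
for wire in handshake_commands(6380):
    print(wire)
```

Each command is an array header (`*<count>`) followed by one length-prefixed bulk string per argument, which is why the replica can send them back to back on the same connection.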
PSYNC: Partial Synchronization
Redis supports two synchronization modes: full synchronization and partial synchronization. The PSYNC command is the key to this flexibility.
- Full Synchronization (PSYNC ? -1): The replica requests a complete snapshot of the master's data. The master responds with a FULLRESYNC message followed by the RDB file containing all data.
- Partial Synchronization (PSYNC <replid> <offset>): The replica requests only the changes since a specific replication ID and offset. This is much more efficient for replicas that have been disconnected temporarily.
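The choice between the two modes can be sketched as a small decision function. This is an illustration only: `MasterState`, `choose_sync`, and the convention that the requested offset is the next byte the replica wants are assumptions for this example, and the reply strings are simplified.

```python
from dataclasses import dataclass


@dataclass
class MasterState:
    replid: str                # current replication ID
    repl_offset: int           # master's current replication offset
    backlog_first_offset: int  # offset of the oldest byte kept in the backlog
    backlog_len: int           # number of bytes currently in the backlog


def choose_sync(master: MasterState, req_replid: str, req_offset: int) -> str:
    """Return the (simplified) reply for PSYNC <req_replid> <req_offset>."""
    backlog_end = master.backlog_first_offset + master.backlog_len
    can_continue = (
        req_replid == master.replid
        and master.backlog_first_offset <= req_offset <= backlog_end
    )
    if can_continue:
        # The missing bytes are still in the backlog: partial resync
        return "CONTINUE"
    # Unknown history or offset outside the backlog: full resync
    return f"FULLRESYNC {master.replid} {master.repl_offset}"


master = MasterState(replid="abc", repl_offset=1000,
                     backlog_first_offset=501, backlog_len=500)
print(choose_sync(master, "?", -1))     # first connection
print(choose_sync(master, "abc", 800))  # short disconnect
```

A replica that connects for the first time sends `?` and `-1`, which can never match the master's history, so it always falls through to the full synchronization path.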
Here's how we handle PSYNC responses:
case RedisCommand.PSYNC:
    repl_id = data.value[1].value
    repl_offset = int(data.value[2].value)
    if repl_id != "?" or repl_offset != -1:
        raise NotImplementedError(
            "Only PSYNC with ? and -1 is supported"
        )
    # Send current dataset to replica
    rdb_content = self.__data_store.dump_to_rdb()
    await self.__send_data(
        writer,
        RESPSimpleString(
            value=f"FULLRESYNC {self.__repl_info.master_replid} {self.__repl_info.master_repl_offset}"
        ),
    )
    await self.__send_data(
        writer, RESPBytesLength(value=len(rdb_content))
    )
    await self.__send_data(writer, rdb_content)
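On the replica side, the reply has to be split back into its parts. The following is a rough sketch under two assumptions: the whole reply is already in one buffer, and the RDB payload is framed as `$<length>\r\n<bytes>` with no trailing CRLF. `parse_fullresync` is a hypothetical helper, not part of the implementation above.

```python
def parse_fullresync(buf: bytes) -> tuple[str, int, bytes, bytes]:
    """Split a full-sync reply into (replid, offset, rdb, leftover bytes)."""
    line, sep, rest = buf.partition(b"\r\n")
    if not sep or not line.startswith(b"+FULLRESYNC "):
        raise ValueError("expected a +FULLRESYNC line")
    _, replid, offset = line.decode().split(" ")

    header, sep, rest = rest.partition(b"\r\n")
    if not sep or not header.startswith(b"$"):
        raise ValueError("expected a $<length> header")
    length = int(header[1:])
    if len(rest) < length:
        raise ValueError("RDB payload is incomplete")

    # Anything after the payload is already part of the command stream
    return replid, int(offset), rest[:length], rest[length:]


reply = b"+FULLRESYNC abc123 0\r\n$5\r\nREDIS" + b"*1\r\n$4\r\nPING\r\n"
print(parse_fullresync(reply))
```

Returning the leftover bytes matters because the master may start propagating commands immediately after the snapshot, in the same TCP segment.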
Command Propagation
Once the initial synchronization is complete, the master continuously propagates write commands to all connected replicas. This ensures that replicas stay up-to-date with the master's data changes.
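The propagation mechanism follows this flow:
- The master executes a write command from a client.
- It sends the same command, serialized as RESP, to every registered replica connection.
- Replicas whose connection is closing or whose send fails are removed from the registry and their connections are closed.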
The propagation implementation:
async def __propagate_to_slaves(self, data: RESPObject):
    failed_replicas = []
    for replica_writer in list(self.__replica_acks.keys()):
        if not replica_writer.is_closing():
            success = await self.__send_data(replica_writer, data)
            if not success:
                failed_replicas.append(replica_writer)
        else:
            failed_replicas.append(replica_writer)
    # Clean up failed replicas
    for failed_writer in failed_replicas:
        self.__replica_acks.pop(failed_writer, None)
        try:
            failed_writer.close()
            await failed_writer.wait_closed()
        except Exception as e:
            print(f"Error closing failed replica connection: {e}")
Replication Offsets and Acknowledgments
Redis maintains replication offsets to track the progress of each replica. The offset is a byte counter over the replication stream: every propagated write command advances the master's offset by its serialized length, and replicas acknowledge the offset they have processed so far. This mechanism enables:
- Replication Lag Detection: The master can identify lagging replicas
- Partial Resynchronization: Replicas can request only the missing commands
- WAIT Command: Clients can block until a given number of replicas acknowledge their writes
The offset tracking system works like this:
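A minimal sketch of this bookkeeping, assuming a hypothetical `OffsetTracker` class that identifies replicas by name (the implementation in this article keys them by stream writer instead):

```python
class OffsetTracker:
    """Track the master's stream offset and each replica's acknowledged offset."""

    def __init__(self) -> None:
        self.master_offset = 0
        self.acked: dict[str, int] = {}

    def propagate(self, payload: bytes) -> int:
        # The offset advances by the serialized size of each propagated command
        self.master_offset += len(payload)
        return self.master_offset

    def ack(self, replica: str, offset: int) -> None:
        # Called when a replica reports REPLCONF ACK <offset>
        self.acked[replica] = offset

    def lag(self, replica: str) -> int:
        # Bytes the replica still has to process
        return self.master_offset - self.acked.get(replica, 0)

    def count_acked(self, target_offset: int) -> int:
        # Number of replicas that have reached target_offset
        return sum(1 for off in self.acked.values() if off >= target_offset)


tracker = OffsetTracker()
target = tracker.propagate(b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n")
tracker.ack("replica-1", target)
tracker.ack("replica-2", 0)
print(target, tracker.lag("replica-2"), tracker.count_acked(target))
```

The `count_acked` check is the same comparison a WAIT handler needs: a replica counts as caught up once its acknowledged offset is at least the offset of the client's last write.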

The WAIT command provides configurable durability guarantees:
case RedisCommand.WAIT:
    num_replicas = int(data.value[1].value)
    timeout_ms = int(data.value[2].value)
    client_offset = self.__repl_info.master_repl_offset
    start_time = time.monotonic()
    while True:
        # Check how many replicas have acknowledged our offset
        active_replicas = [
            (w, r, o) for w, (r, o) in self.__replica_acks.items()
            if not w.is_closing()
        ]
        acked_replicas = sum(
            1 for _, _, offset in active_replicas
            if offset >= client_offset
        )
        if acked_replicas >= num_replicas:
            return RESPInteger(value=acked_replicas)
        # Handle timeout (a timeout of 0 means wait indefinitely)
        if timeout_ms > 0:
            elapsed_ms = (time.monotonic() - start_time) * 1000
            if elapsed_ms >= timeout_ms:
                return RESPInteger(value=acked_replicas)
        # Request acknowledgments from lagging replicas
        for replica_writer, replica_reader, current_offset in active_replicas:
            if current_offset < client_offset:
                ack_cmd = RESPArray(value=[
                    RESPBulkString(value="REPLCONF"),
                    RESPBulkString(value="GETACK"),
                    RESPBulkString(value="*"),
                ])
                await self.__send_data(replica_writer, ack_cmd)
        await asyncio.sleep(0.1)
REPLCONF: Replication Configuration
The REPLCONF command manages various replication configuration options:
- listening-port: Replica announces its listening port
- capa: Replica announces capabilities (like psync2)
- ack: Replica acknowledges processed commands
- getack: Master requests acknowledgment status
Here's the REPLCONF implementation:
case RedisCommand.REPLCONF:
    attr = data.value[1].value.lower()
    if self.__repl_info.role == RedisReplicationRole.MASTER:
        if attr == "listening-port":
            # Register new replica
            self.__replica_acks[writer] = (reader, 0)
            print(f"New connected replica: {writer.get_extra_info('peername')}")
        elif attr == "ack":
            # Replica acknowledging commands
            self.__replica_acks[writer] = (
                reader,
                int(data.value[2].value),
            )
        elif attr == "capa":
            # Replica capability announcement
            if data.value[2].value.lower() == "psync2":
                pass  # Handle PSYNC2 capability
        # The handshake expects +OK after listening-port and capa
        await self.__send_data(writer, RESPSimpleString(value="OK"))
    else:
        # Replica handling
        if attr == "getack":
            await self.__send_data(
                writer,
                RESPArray(value=[
                    RESPBulkString(value="REPLCONF"),
                    RESPBulkString(value="ACK"),
                    RESPBulkString(value=str(self.__repl_ack_offset)),
                ]),
            )
Master-Replica Roles
Redis instances can operate in three roles:
- Master: Accepts writes, propagates to replicas
- Replica/Slave: Receives updates from master, serves reads
- Standalone: No replication configured; such an instance still reports its role as master, with no connected replicas
The role determination happens at startup based on configuration:
from enum import StrEnum  # available since Python 3.11

class RedisReplicationRole(StrEnum):
    MASTER = "master"
    SLAVE = "slave"
    REPLICA = "replica"

# In RedisServer.__init__
self.__repl_info = RedisReplicationInformation(
    role=RedisReplicationRole.MASTER
    if replicaof is None
    else RedisReplicationRole.SLAVE,
    connected_slaves=0,
    master_replid=self.__generate_master_replid(),
    master_repl_offset=0,
    # ... other fields
)
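How the `replicaof` value reaches the constructor is not shown here. One plausible sketch, assuming a hypothetical `parse_role` helper and a `--replicaof "host port"` command-line flag passed as a single quoted string:

```python
import argparse
from typing import Optional


def parse_role(argv: list[str]) -> tuple[str, Optional[tuple[str, int]]]:
    """Return (role, master_address) derived from command-line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=6379)
    parser.add_argument(
        "--replicaof", default=None, help='master address as "host port"'
    )
    args = parser.parse_args(argv)
    if args.replicaof is None:
        # No master configured: this instance acts as a master
        return "master", None
    host, port = args.replicaof.split(" ")
    return "slave", (host, int(port))


print(parse_role([]))
print(parse_role(["--port", "6380", "--replicaof", "localhost 6379"]))
```

The role is decided once at startup in this sketch; switching roles at runtime would need additional handling that is out of scope here.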
Continuous Synchronization
After initial synchronization, the master continuously streams write commands to replicas. Replicas process these commands in the same order as the master, ensuring consistency.
The replica's message handler processes incoming commands:
async def handle_master_message(
    self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break  # master closed the connection
            commands = self.resp_parser.parse(data)
            for cmd in commands:
                if isinstance(cmd, RESPSimpleString) and cmd.value == "PING":
                    # Counts the whole chunk, so this assumes the chunk
                    # contains nothing but this PING
                    self.__repl_ack_offset += len(data)
                elif cmd.type == RESPObjectType.ARRAY:
                    await self.handle_command(reader, writer, cmd, True)
                elif cmd.type == RESPObjectType.BULK_BYTES:
                    # Handle RDB data during initial sync
                    pass
    except Exception as e:
        print(f"Error handling master message: {e}")
    finally:
        print("Closing connection to master")
        writer.close()
        await writer.wait_closed()
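Because `reader.read(1024)` can return several commands at once, or cut one in half, a replica that wants an exact offset needs the byte size of each individual command. The sketch below shows one way to get it. `split_commands` and `_parse_one` are hypothetical helpers that only handle RESP arrays of bulk strings; they are not the `resp_parser` used above.

```python
def _parse_one(buf: bytes, pos: int):
    """Parse one RESP array at pos. Return (args, end) or None if incomplete."""
    if buf[pos:pos + 1] != b"*":
        raise ValueError("expected a RESP array")
    eol = buf.find(b"\r\n", pos)
    if eol == -1:
        return None
    count = int(buf[pos + 1:eol])
    pos = eol + 2
    args = []
    for _ in range(count):
        eol = buf.find(b"\r\n", pos)
        if eol == -1:
            return None
        if buf[pos:pos + 1] != b"$":
            raise ValueError("expected a bulk string")
        length = int(buf[pos + 1:eol])
        start = eol + 2
        end = start + length + 2  # payload plus trailing CRLF
        if end > len(buf):
            return None
        args.append(buf[start:start + length])
        pos = end
    return args, pos


def split_commands(buf: bytes):
    """Return ([(args, size_in_bytes), ...], unparsed_remainder)."""
    commands = []
    pos = 0
    while pos < len(buf):
        parsed = _parse_one(buf, pos)
        if parsed is None:  # wait for more data
            break
        args, end = parsed
        commands.append((args, end - pos))
        pos = end
    return commands, buf[pos:]


chunk = (b"*1\r\n$4\r\nPING\r\n"
         b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"
         b"*1\r\n$4\r\nPI")
commands, rest = split_commands(chunk)
offset = sum(size for _, size in commands)
print(commands, rest, offset)
```

With per-command sizes, the acknowledged offset can advance by exactly the bytes of each processed command, and the remainder is kept and prepended to the next read.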
Replication Information
Redis provides detailed replication information through the INFO command, showing the current replication state, connected replicas, and synchronization offsets:
from dataclasses import dataclass

@dataclass
class RedisReplicationInformation:
    role: RedisReplicationRole
    connected_slaves: int
    master_replid: str
    master_repl_offset: int
    second_repl_offset: int
    repl_backlog_active: bool
    repl_backlog_size: int
    repl_backlog_first_byte_offset: int
    repl_backlog_histlen: int

    def serialize(self) -> RESPBulkString:
        # One "key:value" pair per line, as returned by INFO replication
        return RESPBulkString(
            value=(
                f"role:{self.role}\n"
                f"connected_slaves:{self.connected_slaves}\n"
                f"master_replid:{self.master_replid}\n"
                f"master_repl_offset:{self.master_repl_offset}\n"
                f"second_repl_offset:{self.second_repl_offset}\n"
                f"repl_backlog_active:{1 if self.repl_backlog_active else 0}\n"
                f"repl_backlog_size:{self.repl_backlog_size}\n"
                f"repl_backlog_first_byte_offset:{self.repl_backlog_first_byte_offset}\n"
                f"repl_backlog_histlen:{self.repl_backlog_histlen}\n"
            )
        )
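On the client side, this `key:value` text is easy to turn back into a dictionary. A small sketch, where `parse_info` is a hypothetical helper and the sample text is made up for illustration:

```python
def parse_info(text: str) -> dict[str, str]:
    """Parse INFO-style 'key:value' lines into a dictionary."""
    fields = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and section headers
        key, _, value = line.partition(":")
        fields[key] = value
    return fields


sample = "# Replication\nrole:master\nconnected_slaves:2\nmaster_repl_offset:27\n"
info = parse_info(sample)
print(info["role"], info["connected_slaves"], info["master_repl_offset"])
```

All values stay strings here; a caller that needs numbers would convert the specific fields it cares about.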
Fault Tolerance and Recovery
Redis replication includes robust fault handling mechanisms:
- Automatic Reconnection: Replicas automatically reconnect to masters
- Partial Resynchronization: Efficiently sync only missing data
- Replica Promotion: Replicas can be promoted to master during failover
- Backlog Management: Masters maintain replication backlogs for partial resync
The fault tolerance flow:
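Partial resynchronization in this flow depends on the master keeping recent stream bytes around. Below is a minimal sketch of such a backlog, assuming a hypothetical `ReplicationBacklog` class that is not part of the implementation in this article and that numbers the first stream byte as offset 1.

```python
from typing import Optional


class ReplicationBacklog:
    """Keep the most recent `capacity` bytes of the replication stream."""

    def __init__(self, capacity: int, start_offset: int = 1) -> None:
        self.capacity = capacity
        self.buffer = bytearray()
        self.first_offset = start_offset  # stream offset of buffer[0]

    def append(self, payload: bytes) -> None:
        self.buffer += payload
        overflow = len(self.buffer) - self.capacity
        if overflow > 0:
            # Drop the oldest bytes and move the window forward
            del self.buffer[:overflow]
            self.first_offset += overflow

    def read_from(self, offset: int) -> Optional[bytes]:
        """Return all bytes from `offset` on, or None if they were evicted."""
        end = self.first_offset + len(self.buffer)
        if not (self.first_offset <= offset <= end):
            return None  # caller must fall back to a full resync
        return bytes(self.buffer[offset - self.first_offset:])


backlog = ReplicationBacklog(capacity=10)
backlog.append(b"abcdefgh")
backlog.append(b"ijkl")        # evicts the two oldest bytes
print(backlog.read_from(11))   # still available
print(backlog.read_from(1))    # evicted
```

A reconnecting replica whose requested offset is still inside the window receives only the missing bytes; once the window has moved past it, the master has no choice but to send a full snapshot.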

Performance Considerations
Redis replication is designed for high performance:
- Asynchronous Replication: Write commands propagate asynchronously
- Non-blocking Operations: Replication doesn't block master operations
- Efficient Serialization: Uses RESP protocol for command transmission
- Selective Replication: Only write commands are replicated
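The last point can be sketched as a simple filter in front of the propagation step. The command set below is an illustrative subset chosen for this example, and `should_propagate` is a hypothetical helper rather than part of the implementation above.

```python
# Illustrative subset of commands that modify data
WRITE_COMMANDS = frozenset({"SET", "DEL", "INCR", "EXPIRE", "XADD"})


def should_propagate(args: list[str]) -> bool:
    """Return True if the command changes data and must reach replicas."""
    return bool(args) and args[0].upper() in WRITE_COMMANDS


print(should_propagate(["set", "key", "value"]))
print(should_propagate(["GET", "key"]))
```

Read-only commands such as GET never enter the replication stream, which keeps both the network traffic and the replication offset tied to actual data changes.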
Lessons
Redis's replication architecture offers valuable insights into distributed systems design:
- Handshake Protocol: The multi-stage handshake ensures compatibility and capability negotiation between master and replicas.
- Offset-Based Tracking: Using offsets instead of timestamps provides precise tracking of replication progress and enables efficient partial resynchronization.
- Asynchronous Design: Non-blocking replication ensures that master performance remains high even with many replicas.
- Flexible Durability: The WAIT command allows applications to choose their preferred balance between performance and durability guarantees.
In the next part, we'll explore Redis's data structures, examining how Redis implements efficient in-memory data structures like strings, lists, sets, and streams, and how these structures enable the rich command set that makes Redis so versatile.