Regardless of the replication mechanism you must fsync() your data to prevent global data loss in non-Byzantine protocols.

Why `fsync()`: Losing unsynced data on a single node leads to global data loss

Since the beginning of Redpanda’s development, our aim has always been to build a faster, safer streaming data platform that can support mission-critical applications without fail. This goal has influenced many of our design choices:

  • Choosing Raft replication protocol over Apache Kafka®’s ISR replication protocol since Raft is widely studied in academia, has multiple reference industrial scale implementations, and a strong consistency model with well-defined failure modes.

  • Rejecting a tx protocol with higher throughput but affected by KAFKA-14402 and opting for writing explicit begin transaction markers (released in v22.3.1).

  • Engaging with Jepsen, a company that provides auditing services for distributed systems, to independently test Redpanda for data loss and other consistency violations.

  • Running 11 hours long chaos-consistency testing every single day.

  • Always using fsync before acknowledging a client’s request when safety is requested with acks=all.

In this post, we focus on the last point and demonstrate—both in theory and in practice—that losing unsynced data on even a single node (!) is enough to cause global data loss in a replicated system.

What is fsync?

By default, the disk write API is asynchronous. When an application uses the operating system’s API to write data to disk, the OS copies the data and may confirm the write request without waiting for the data to reach the disk. This behavior improves latency and throughput, but it reduces safety.

Although this behavior is safer than buffering the data on the application level, it’s not completely consistent. The OS copies the data, so even if the application crashes, the data remains intact and the OS completes the write. However, if the machine loses power or the OS crashes, recently written data can still be lost.

With fsync, the application asks the OS to return control flow only when all recently written data has reached the disk. This ensures that the application works synchronously with the disk, guaranteeing that all data is written to disk before continuing with the program execution.

What is replication for?

Replication is a technique used to improve the availability and durability of an application’s data. When an application or its data is hosted on a single node, if that node goes down, the application becomes unavailable and the data becomes inaccessible. Replication solves this problem by storing the data on different nodes, keeping it in sync, and making it accessible for reading and writing even in the presence of network partitioning and crashes (i.e., fail-stop faults).

Replication ensures that data is consistent across all nodes and that it is as consistent as it would be on a single node (linearizability). This means that even if one node fails, the application and its data remain available, and users can continue to access and use the application without interruption.

Can replication reduce the safety risks of running a system without fsync?

Node crashes are the main vulnerability of running a system without fsync. However, consistent replication can withstand node crashes without compromising data consistency. So it appears that replication resolves this vulnerability and makes fsyncs unnecessary for replicated systems.

Of course, data loss can still occur if all nodes lose power or experience simultaneous OS crashes. However, it is possible to minimize the likelihood of this event e.g. by using availability zones, making things feel safe.

The argument presented above is a common misunderstanding. Even the loss of power on a single node, resulting in local data loss of unsynchronized data, can lead to silent global data loss in a replicated system that does not use fsync, regardless of the replication protocol in use.

Note: The caveat is that most replication protocols only tolerate fail-stop faults, which means that while nodes may crash, they must have the same state (data) upon restart as they did at the moment of the crash.

Running a system without fsync removes node crashes from the category of fail-stop faults, and we cannot use replication to justify the absence of fsync.

Can a replication protocol support faults beyond fail-stop and tolerate the lack of fsync?

Yes, Byzantine faults are a broader class of faults that assume nodes can exhibit any type of adverse behavior, including the loss of unsynchronized writes. The “Reaching Agreement in the Presence of Faults” paper shows that to tolerate n Byzantine faults, a system must be replicated to 3n+1 nodes.

However, setting a replication factor of four in Redpanda or Kafka is not enough to protect against a single Byzantine fault. A system must use cutting-edge Byzantine fault-tolerant (BFT) replication protocols, which neither of these systems currently employ.

We did not choose a BFT protocol due to its complexity, maturity and performance characteristics. It takes time to mature a research protocol for industry use. For example, Paxos, the first consistent replication protocol, required a decade of distributed system research to support essential industry features, such as the ability to replace crashed nodes.

The loss of unsynchronized data is only a small subset of Byzantine faults. Is there a protocol that covers this subset and provides the same level of guarantees as Raft, with 2n+1 nodes instead of 3n+1?

It’s a good computer science question, and I don’t know the answer in general, but with n=1, it’s impossible.

Proof of impossibility

The Raft/Paxos replication protocols are designed with fault tolerance in mind. As long as a majority of the nodes are available to elect a leader (2 RTTs), the system can guarantee availability and consistency. However, if a node is allowed to lose unsynced data, this invariant cannot be maintained in any replication protocol. Let’s assume the opposite and prove this by contradiction.

Consider a replicated system consisting of three nodes ({A,B,C}) with RF=3. Suppose node {A} gets isolated from a client and the peers, and the client writes sequential records {1,2,3,4,5} to the replicated system. As soon as the client receives an ack, the following happens:

  • Node {B} gets isolated from the peers

  • Node {C} crashes, loses unsynced data, and restarts only with {1,2,3} data

  • Node {A} gets reconnected with node {C} and the client

We assumed that after a short period of time (2 RTTs), the cluster should become available to serve reads and writes. However, nodes {A,C} don’t have {4,5} suffixes, so if they do become available, there will be data loss. This contradicts the system’s guarantee of consistency and proves the impossibility.

In conclusion, it is impossible for a replicated system to tolerate the loss of unsynced data and maintain the same level of consistency and availability as the Raft/Paxos protocols with RF=3.

Counter-example

A counterexample can sometimes be more convincing than proof, so we decided to reproduce the data loss. Apache Kafka doesn’t use fsync by default, so we chose to use it to demonstrate the possibility of global data loss caused by local data loss on a single node.

Clone the example repo

git clone https://github.com/redpanda-data/kafka-fsync
cd kafka-fsync

Build a container with a locally-deployed Kafka cluster (three Kafka processes and one Apache ZooKeeper™ process)

docker build -t kafka-fsync .

Start the container and log into it

docker run -d --name kafka_fsync -v $(pwd):/fsync kafka-fsync
docker exec -it kafka_fsync /bin/bash

Create data directories

cd /fsync
./create.dirs.sh

Start the ZooKeeper process

/root/apache-zookeeper-3.8.1-bin/bin/zkServer.sh --config . start

Start three Kafka processes

nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka1/server.properties >> /fsync/kafka1/kafka.log 2>&1 & echo $! > /fsync/kafka1/pid &
nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka2/server.properties >> /fsync/kafka2/kafka.log 2>&1 & echo $! > /fsync/kafka2/pid &
nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka3/server.properties >> /fsync/kafka3/kafka.log 2>&1 & echo $! > /fsync/kafka3/pid &

Create topic1 with RF=3

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --create --topic topic1 --partitions 1 --replication-factor 3 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

First, let’s focus on isolating the kafka1 process. While there are various methods such as using docker compose and manipulating iptables, for the sake of simplicity, we opt to terminate the kafka1 process (network partitions and process crashes are indistinguishable to an observer). Furthermore, as the operating system remains unaffected, kafka1 has not encountered any local data loss.

cat kafka1/pid | xargs kill -9

Write ten records with acks=all

python3 write10.py

Output:

wrote key0=value0 at offset=0
wrote key1=value1 at offset=1
...
wrote key8=value8 at offset=8
wrote key9=value9 at offset=9

Let's figure out which node is the leader

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

On my machine it was kafka3. Now, let’s proceed with isolating ZooKeeper to temporarily “freeze” leadership on the third node. Similar to our approach with kafka1, we will simulate this by terminating the ZooKeeper process.

cat zookeeper/zookeeper_server.pid | xargs kill -9

Now let's simulate the effects of the following events:

  • kafka2 got isolated

  • kafka3 crashes & loses log suffix

  • ZooKeeper, kafka3 & kafka1 restore connectivity and form a cluster

We terminate kafka2 to simulate node isolation and terminate kafka3 to simulate the crash:

cat kafka2/pid kafka3/pid | xargs kill -9

Simply terminating a process is not sufficient to induce local data loss of the unsynchronized data. Therefore, to simulate this, we will manually remove the last ten bytes using the truncate command.

truncate -s -10 kafka3/data/topic1-0/00000000000000000000.log

Restore Zookeeper connectivity

/root/apache-zookeeper-3.8.1-bin/bin/zkServer.sh --config . start

Let's give it a minute to remove ephemeral info. Then start the former leader (kafka3, in my case)

nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka3/server.properties >> /fsync/kafka3/kafka.log 2>&1 & echo $! > /fsync/kafka3/pid &

Wait until it becomes a leader

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

Then restore connectivity of kafka1 process

nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh kafka1/server.properties >> /fsync/kafka1/kafka.log 2>&1 & echo $! > /fsync/kafka1/pid &

Again, let's wait until two nodes ISR form

/root/kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

We got an operational Kafka cluster, let's write another ten records

python3 write10.py

Ideally, we should see

wrote key0=value0 at offset=10
wrote key1=value1 at offset=11
...
wrote key8=value8 at offset=18
wrote key9=value9 at offset=19

But what we actually see is

wrote key0=value0 at offset=9
wrote key1=value1 at offset=10
...
wrote key8=value8 at offset=17
wrote key9=value9 at offset=18

So, by causing local data loss on a single node (it may happen without the fsync) we caused global data loss and Kafka lost record key9=value9 at offset=9.

Conclusion

The use of fsync is essential for ensuring data consistency and durability in a replicated system. The post highlights the common misconception that replication alone can eliminate the need for fsync and demonstrates that the loss of unsynchronized data on a single node still can cause global data loss in a replicated non-Byzantine system.

To learn more about Redpanda, check the documentation and browse the Redpanda blog for tutorials. If you have questions or want to chat with the team and fellow Redpanda users, join the Redpanda Community on Slack.

Let's keep in touch

Subscribe and never miss another blog post, announcement, or community event. We hate spam and will never sell your contact information.