Apache BookKeeper and Apache Pulsar

If you are familiar with messaging systems like Kafka and RocketMQ, you may know that their architectures typically couple message serving tightly with storage. Apache Pulsar is different: it uses a two-layer architecture that separates storage from compute, with compute handled by stateless brokers. For persistent storage, Pulsar relies on Apache BookKeeper servers, also known as bookies. This blog covers the basics of BookKeeper and illustrates how it achieves high availability for the data it handles.

What is Apache BookKeeper

Originally developed at Yahoo, BookKeeper is a reliable, high-performance storage system. It provides distributed, scalable storage with low latency and strong fault tolerance, which is why it is well suited to serve as Pulsar’s storage layer. BookKeeper stores data in ledgers, which are append-only and become immutable once closed. Using its replication protocol, BookKeeper writes each log entry to multiple nodes concurrently, so the entries remain highly available.

The names themselves hint at what BookKeeper and ledgers do in a cloud-native environment. If the analogy is not obvious, imagine a bookkeeper who uses ledgers to record all relevant account information and track the finances of a business.

Key Concepts in Apache BookKeeper

To gain a better understanding of how BookKeeper stores data, let’s first look at some basic concepts.

Bookie

Bookies are the storage servers (nodes) in BookKeeper. All bookies are equivalent peers, and none of them acts as a leader. BookKeeper owes its strong dynamic scaling capability (especially in a containerized environment) to this leaderless design.

Ensemble

An ensemble is the collection of bookies selected to store the entries of a ledger, namely the nodes that entries are written to. When a bookie fails, the client writing to it replaces it with a new bookie, which is known as an “ensemble change”. In Pulsar’s compute-storage separation architecture, this is handled below the messaging layer, so your client application does not need to care about what is actually happening in the storage layer.

Ledger

Ledgers are the basic storage unit in BookKeeper, also referred to as segments in Pulsar. BookKeeper clients are responsible for creating and deleting ledgers, and for writing entries to them and reading entries from them. When certain conditions are met (for example, the number of entries or the lifespan of the ledger reaches a preset threshold), the ledger is closed, after which data can no longer be written to it and only reads are permitted. Any client can create a ledger. Ideally, the owner client that creates a ledger should be the one that closes it. Once closed, a ledger is immutable. Ledgers are also the smallest unit of deletion, which means you can only delete a ledger as a whole, not individual entries within it.
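To make the ledger lifecycle concrete, here is a minimal in-memory sketch. It is not BookKeeper's API: the `Ledger` class, the `max_entries` rollover threshold, and the choice to number entries from 0 are all assumptions made for illustration.

```python
class LedgerClosedError(Exception):
    """Raised when a write targets a ledger that has already been closed."""


class Ledger:
    """Toy append-only ledger; max_entries is a hypothetical close threshold."""

    def __init__(self, ledger_id, max_entries):
        self.ledger_id = ledger_id
        self.max_entries = max_entries
        self.entries = []
        self.closed = False

    def append(self, payload):
        """Append an entry and return its entry ID; fails once the ledger is closed."""
        if self.closed:
            raise LedgerClosedError(f"ledger {self.ledger_id} is closed")
        entry_id = len(self.entries)
        self.entries.append(payload)
        # Close the ledger when the entry-count threshold is reached.
        if len(self.entries) >= self.max_entries:
            self.closed = True
        return entry_id

    def read(self, entry_id):
        """Reads are allowed whether the ledger is open or closed."""
        return self.entries[entry_id]


ledger = Ledger(ledger_id=1, max_entries=2)
first = ledger.append(b"a")
second = ledger.append(b"b")  # reaches the threshold, so the ledger closes
```

After the second append, any further `ledger.append(...)` raises `LedgerClosedError`, while `ledger.read(...)` keeps working. In a real deployment the writer would open a new ledger at this point.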

Note: In exceptional cases, the original client writing to a ledger may be disconnected suddenly while the ledger is still open. If a new client takes over and starts writing to the ledger, and the original client then reconnects and continues writing, you end up with a split-brain situation. BookKeeper avoids this with a fencing mechanism. Fencing is complicated enough to deserve a detailed discussion of its own, so it is not covered in this introductory blog.

In addition to the ledgers that store ordinary messages, Pulsar keeps a special kind of ledger in BookKeeper: the cursor ledger. Cursors provide a tracking mechanism for message consumption and acknowledgment in Pulsar. Each subscription has an associated cursor, which stores the position information of message consumption and acknowledgment. Consumers may share the same cursor depending on the subscription type. We won’t get into the details here, as explaining how cursors work requires a separate blog. For now, we only need to know that Pulsar maintains a ledger in BookKeeper for each subscription. After a consumer processes a message and the broker receives its acknowledgment, the broker updates the cursor ledger accordingly. More specifically, Pulsar periodically aggregates the acknowledgment information of all consumers bound to the same subscription into an entry and writes it to bookies. This process is basically the same as writing ordinary messages.

Fragment

A fragment is a contiguous sequence of entries within a ledger, stored on a bookie. A ledger can contain one or multiple fragments. As the smallest distribution unit in BookKeeper, the fragments of an individual ledger can spread across different bookies. This means that, technically, the data stored on a single bookie are fragments of ledgers. For a single fragment, if writes to a bookie fail, a new bookie is selected for the writes (that is, the above-mentioned “ensemble change”). As a result, a new fragment is created that includes the new bookie. Note that these two fragments belong to the same ledger but to different ensembles.
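The relationship between fragments and ensemble changes can be sketched as a small data model. This is an illustration only: representing a ledger as a list of `(first_entry_id, ensemble)` pairs, and the helper names `replace_bookie` and `ensemble_for`, are assumptions and not BookKeeper's actual metadata format or API.

```python
def replace_bookie(fragments, failed, spare, next_entry_id):
    """Start a new fragment whose ensemble swaps `failed` for `spare`.

    `fragments` is a list of (first_entry_id, ensemble) pairs ordered by entry ID.
    Earlier fragments are left untouched.
    """
    _, current = fragments[-1]
    new_ensemble = [spare if bookie == failed else bookie for bookie in current]
    return fragments + [(next_entry_id, new_ensemble)]


def ensemble_for(fragments, entry_id):
    """Return the ensemble of the fragment that contains `entry_id`."""
    owner = fragments[0][1]
    for first_entry_id, ensemble in fragments:
        if first_entry_id <= entry_id:
            owner = ensemble
    return owner


# One ledger, initially a single fragment on three bookies.
fragments = [(0, ["bookie-1", "bookie-2", "bookie-3"])]
# bookie-2 fails before entry 100 is written, so a new fragment starts there.
fragments = replace_bookie(fragments, "bookie-2", "bookie-4", next_entry_id=100)
```

Entries 0 to 99 still map to the original ensemble, while entries from 100 onward map to the ensemble that contains `bookie-4`. Both fragments belong to the same ledger.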

Entry

Entries contain the actual data written to a ledger. Different ledgers may contain different numbers of entries. Each entry has an entry ID that uniquely identifies it within its ledger.

Message

Messages are the specific information stored in an entry. Messages can be divided into two types: single messages and batch messages.

A batch message is essentially a sequence of single messages. After you enable message batching on the client side, messages are grouped together before being sent from producers to brokers. Brokers then call BookKeeper clients to write the batch message to bookies. When consumers read messages from brokers, brokers deliver the batch message to them as a whole. Note that batch messages are both combined and split on the client side. For a producer, you can tune batching with parameters like batchingMaxPublishDelay (the time period within which messages to be sent are batched) and batchingMaxMessages (the maximum number of messages in a batch message).

Note: Each entry may contain one or multiple messages. If message batching is disabled, one entry only stores one message.
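The mapping from messages to entries can be illustrated with a short simulation. This is a sketch, not Pulsar client code: `batch_messages` is a hypothetical helper, its `max_messages` argument only mirrors the idea behind batchingMaxMessages, and the publish-delay trigger is ignored entirely.

```python
def batch_messages(messages, max_messages):
    """Group messages into batches of at most `max_messages`.

    Each returned batch stands for one entry written to a ledger.
    """
    if max_messages < 1:
        raise ValueError("max_messages must be at least 1")
    return [
        messages[i:i + max_messages]
        for i in range(0, len(messages), max_messages)
    ]


messages = [f"msg-{n}" for n in range(5)]
batched = batch_messages(messages, max_messages=2)    # batching enabled
unbatched = batch_messages(messages, max_messages=1)  # one message per entry
```

With a limit of 2, the five messages occupy three entries. With a limit of 1, which behaves like disabled batching, they occupy five.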

Depending on your use case, you can choose to enable or disable message batching. Different from most distributed messaging systems, Pulsar supports both message queuing and streaming. In message queuing scenarios, high throughput is typically not a must, and message batching is thus seldom enabled on the client side. By contrast, messages are generated much faster in streaming cases, which necessitates batching so that producers can send messages as larger packages.

Data High Availability

To make sure data are highly available, BookKeeper adopts a quorum mechanism to write data to bookies concurrently. Specifically, we can define the following three key integers for a new ledger to be created:

  • Ensemble Size (E): The number of bookies available to store the entries of a ledger.
  • Write Quorum (WQ): The number of copies of each entry, that is, the number of bookies an entry (or the fragment it belongs to) is written to concurrently. WQ must be less than or equal to E and greater than or equal to AQ, and it cannot be less than 1.
  • Ack Quorum (AQ): The number of bookie acknowledgments the broker needs to receive before a write is considered successful. Once this number is reached, the broker sends an acknowledgment back to the client. Otherwise, the write fails.

These three parameters can be configured at the broker, namespace, and topic levels. At the broker level, for example, edit the broker.conf file in the conf directory of the Pulsar package and set the desired values:

# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2
# Number of copies to store for each message
managedLedgerDefaultWriteQuorum=2
# Number of guaranteed copies (acks to wait before a write is complete)
managedLedgerDefaultAckQuorum=2

Note: We can configure the above parameters according to our consistency requirements. For example, WQ = AQ gives the highest level of consistency: a message is considered persisted only after acknowledgments for all of its copies are received. When AQ is relatively small (for example, AQ = 1), latency is lower, but the risk of data loss is greater. Generally, it is a good idea to set AQ ≥ (WQ + 1)/2, that is, to require acknowledgments from a majority of the write quorum.
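The constraints on E, WQ, and AQ, together with the majority recommendation, can be checked with a few lines of code. This is a minimal sketch: `check_quorums` and the keys of the dictionary it returns are names invented for this example, not part of Pulsar or BookKeeper.

```python
def check_quorums(e, wq, aq):
    """Validate 1 <= AQ <= WQ <= E and summarize the trade-off.

    Returns a dict with:
      - tolerated_ack_failures: replicas that may fail to acknowledge
        while a write still succeeds (WQ - AQ)
      - majority_ack: whether AQ >= (WQ + 1) / 2
    """
    if not (1 <= aq <= wq <= e):
        raise ValueError(f"need 1 <= AQ <= WQ <= E, got E={e}, WQ={wq}, AQ={aq}")
    return {
        "tolerated_ack_failures": wq - aq,
        # Integer form of AQ >= (WQ + 1) / 2, avoiding float division.
        "majority_ack": 2 * aq >= wq + 1,
    }


strict = check_quorums(e=2, wq=2, aq=2)   # the broker.conf values shown above
relaxed = check_quorums(e=3, wq=3, aq=1)  # lower latency, weaker guarantee
```

Here `strict` tolerates no missing acknowledgments and satisfies the majority rule, while `relaxed` tolerates two missing acknowledgments but does not satisfy it.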

In the image below, for example, we set E = 5, WQ = 3, and AQ = 2. It means:

  • 5 bookies are available to store the data in a ledger.
  • 3 copies of an entry are stored on 3 different bookies.
  • An entry is considered successfully written once acknowledgments from 2 bookies are received.

Data are written to bookies using round-robin distribution. It is designed this way as it can take full advantage of the bandwidth of all bookies in the pool. As shown below, Entry 1 is written to Bookie 1, Bookie 2, and Bookie 3, Entry 2 is written to Bookie 2, Bookie 3, and Bookie 4, and so on.

The round-robin approach makes it easy to find the bookies that store copies of a specific entry from its entry ID alone. For example, we know that Entry 2 is replicated on Bookie 2, Bookie 3, and Bookie 4. Since 2 mod 5 equals 2, any Entry X for which X mod 5 equals 2 is also stored on Bookie 2, Bookie 3, and Bookie 4. Entry 7 is one such entry (7 mod 5 = 2).
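The lookup described above can be written as a small function. This sketch follows the 1-based entry and bookie numbering used in this example (Entry 1 starts at Bookie 1); the function name `bookies_for_entry` is hypothetical, and the wrap-around from Bookie 5 back to Bookie 1 is an assumption inferred from the "and so on" pattern.

```python
def bookies_for_entry(entry_id, ensemble_size, write_quorum):
    """Return the 1-based bookie numbers that hold copies of `entry_id`.

    Copies go to `write_quorum` consecutive bookies, starting at the bookie
    selected by the entry ID and wrapping around the ensemble.
    """
    if not (1 <= write_quorum <= ensemble_size):
        raise ValueError("need 1 <= write_quorum <= ensemble_size")
    return [
        (entry_id - 1 + offset) % ensemble_size + 1
        for offset in range(write_quorum)
    ]


entry_2 = bookies_for_entry(2, ensemble_size=5, write_quorum=3)
entry_7 = bookies_for_entry(7, ensemble_size=5, write_quorum=3)
entry_4 = bookies_for_entry(4, ensemble_size=5, write_quorum=3)  # wraps around
```

Entries 2 and 7 land on the same three bookies, matching the X mod 5 reasoning, and Entry 4 shows the wrap-around case.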

Using Apache BookKeeper

When you install a Pulsar cluster, BookKeeper is included. If you want to run BookKeeper, you do not need to download its code separately. Instead, download Pulsar and run the ./pulsar-daemon start bookie command in the bin directory. If you use the Helm chart provided by the Pulsar community to install Pulsar on a Kubernetes cluster, bookie Pods are deployed in a StatefulSet.

Summary

This blog introduces BookKeeper basics and examines how data are written to bookies to achieve high availability. We can think of BookKeeper simply as a database, as it does not contain any business logic. With great scalability, strong fault tolerance, and low latency, this open-source tool has what it takes to be a cloud-native storage system, providing solid support for Pulsar’s unique architecture.
