ZooKeeper Example

The following code excerpt shows how to use ZooKeeper to implement a "barrier." A barrier separates a process into two logical halves. Multiple machines running in coordination with one another all perform the first half of the process, and no machine can begin the second half until every machine has completed the first. The barrier sits between these two halves: as nodes reach the barrier, they wait until everyone else has arrived, and then all nodes are released to begin the second half. A distributed barrier implementation written for ZooKeeper follows:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// A default watcher is required to open a ZooKeeper handle; this one simply
// ignores the session events it receives.
Watcher watcher = new Watcher() {
    public void process(WatchedEvent event) {}
};

// "hosts" is the connect string, e.g. "host1:2181,host2:2181,host3:2181".
ZooKeeper zk = new ZooKeeper(hosts, 3000, watcher);

// Fields of the Barrier class:
ZooKeeper zk;
Object notifyObject = new Object();
String root;   // path of the barrier node
String name;   // this participant's child node under the barrier
int size;      // number of participants expected at the barrier

Barrier(ZooKeeper zk, String root, int size)
        throws KeeperException, InterruptedException, UnknownHostException {
    this.zk = zk;
    this.root = root;
    this.size = size;
    // Make sure the barrier node exists
    try {
        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException e) {}
    // Each participant joins under a child node named after its own host.
    this.name = InetAddress.getLocalHost().getCanonicalHostName();
}


// Typical use: construct the barrier, then bracket the shared phase with it.
Barrier b = new Barrier(zk, "/barrier", size);
b.enter();
/** work with everyone **/
b.leave();



/**
 * Join the barrier and wait until all participants have arrived.
 * @return true once all "size" participants have entered
 * @throws KeeperException
 * @throws InterruptedException */
boolean enter() throws KeeperException, InterruptedException {
    // Ephemeral: if this participant crashes, its barrier entry goes away.
    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    while (true) {
        synchronized (notifyObject) {
            // Re-register the watch on every pass; the watcher wakes this thread
            // whenever the set of children changes.
            List<String> list = zk.getChildren(root, new Watcher() {
                public void process(WatchedEvent e) {
                    synchronized (notifyObject) { notifyObject.notifyAll(); }
                }
            });

            if (list.size() < size) {
                notifyObject.wait();
            } else {
                return true;
            }
        }
    }
}

/**
 * Wait until all participants have left the barrier.
 * @return true once every participant's node has been removed
 * @throws KeeperException
 * @throws InterruptedException */
boolean leave() throws KeeperException, InterruptedException {
    zk.delete(root + "/" + name, -1);   // -1 matches any node version
    while (true) {
        synchronized (notifyObject) {
            List<String> list = zk.getChildren(root, new Watcher() {
                public void process(WatchedEvent e) {
                    synchronized (notifyObject) { notifyObject.notifyAll(); }
                }
            });

            if (list.size() > 0) {
                notifyObject.wait();
            } else {
                return true;
            }
        }
    }
}

Distributed Consensus

A reasonable question is how the ZooKeeper service can function across multiple nodes and remain synchronized. If distributed synchronization is why your services must use ZooKeeper, how does ZooKeeper itself bootstrap this capability?

ZooKeeper implements a distributed consensus protocol. Internally, it uses a Paxos-like leader election and atomic broadcast protocol (ZAB) to determine which node in the ZooKeeper service acts as the leader. Clients may connect to any node in the service, and any node can return agreed-upon facts to clients, but every update to the shared state must go through the leader. Updates are stamped with an ordering timestamp and then disseminated to the nodes in the ZooKeeper service. When a majority of nodes acknowledge an update, it is said to be held by a quorum of the nodes. Any fact that a quorum has agreed upon may be returned to clients; updates that have not yet reached a quorum are not returned. The timestamps are also used to order updates to elements of the data store: if multiple updates are made to the state of an individual node, the newest update wins.

The use of a quorum ensures that the service always returns consistent answers. Because a vote is effectively held before a response is returned, any nodes holding stale data are outvoted by nodes with more current information. This also makes ZooKeeper resilient to failure: a minority of the ZooKeeper service nodes (up to 49%) can shut down or become desynchronized before ZooKeeper loses its ability to answer queries authoritatively. So if 11 nodes run ZooKeeper, up to 5 of them may disconnect without incident. Once so many nodes have failed that no majority remains, ZooKeeper will refuse service until the machines are restored.
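As a sanity check on that arithmetic, the quorum size and fault tolerance of an ensemble can be computed directly. The snippet below is purely illustrative and not part of any ZooKeeper API:

// Illustration only: majority-quorum arithmetic for an ensemble of n servers.
public class QuorumMath {
    public static void main(String[] args) {
        for (int n : new int[] {3, 5, 11}) {
            int quorum = n / 2 + 1;       // smallest strict majority
            int tolerated = n - quorum;   // nodes that may fail, i.e. (n - 1) / 2
            System.out.println("ensemble=" + n + " quorum=" + quorum
                    + " tolerated failures=" + tolerated);
        }
    }
}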

If the ZooKeeper node that was elected leader fails, a new leader election is held and the cluster continues to function.

The reason for electing a leader in such a system is to ensure that timestamps assigned to updates are only issued by a single authority. ZooKeeper is designed to reduce or eliminate possible race conditions in distributed applications.

One consequence of ZooKeeper's design is that it is intended to serve many more read requests than writes. A ZooKeeper cluster can handle hundreds or thousands of clients issuing many tens of thousands of requests per second, provided that the large majority of those requests (90-99%) are reads. Only a small fraction should be updates.

ZooKeeper Applications

ZooKeeper can be used for a variety of distributed coordination tasks. In addition to leader election, system bootstrapping, and various types of locks (mutual exclusion, reader/writer, etc.), other synchronization primitives such as barriers, producer/consumer queues, priority queues, and multi-phase commit operations can be encoded in ZooKeeper. The ZooKeeper tutorial and recipes pages describe how to implement these algorithms. ZooKeeper itself is implemented in Java, but it provides APIs for both Java- and C-based programs.

ZooKeeper can also be used as a central message board for an application. Individual nodes of a distributed system can store their current operational status in ZooKeeper for easy central reporting. The ZooKeeper service can also be used to form sub-groups of nodes or other hierarchical arrangements within a distributed system.
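A minimal sketch of this "status board" idea follows. It assumes an already-open ZooKeeper handle and an existing persistent group node at a hypothetical path /workers; the class and path names are illustrative, not part of ZooKeeper itself. Each worker publishes its status in an ephemeral child node, and a central reporter lists and reads them:

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Sketch: each worker reports its status under a hypothetical /workers path,
// which is assumed to exist as a persistent node.
class StatusBoard {
    private final ZooKeeper zk;
    private final String path;   // e.g. "/workers/worker-42" (illustrative name)

    StatusBoard(ZooKeeper zk, String workerId) throws KeeperException, InterruptedException {
        this.zk = zk;
        this.path = "/workers/" + workerId;
        // Ephemeral: the status entry disappears if this worker disconnects.
        zk.create(path, "starting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    void report(String status) throws KeeperException, InterruptedException {
        zk.setData(path, status.getBytes(), -1);   // -1 matches any version
    }

    // A central reporter can list every worker and read its last status.
    static void dump(ZooKeeper zk) throws KeeperException, InterruptedException {
        List<String> workers = zk.getChildren("/workers", false);
        for (String w : workers) {
            byte[] data = zk.getData("/workers/" + w, false, null);
            System.out.println(w + ": " + new String(data));
        }
    }
}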

As mentioned, data stored in ZooKeeper is accessed by manipulating the nodes in the data hierarchy, in a manner similar to file system access. ZooKeeper does not, however, implement the POSIX file system API; instead it adds a set of primitives not ordinarily found in a file system. Nodes can be created with a number of special flags. One such flag is "ephemeral," meaning that the node disappears when the client that created it disconnects. Another is "sequence," which means that ZooKeeper appends a sequential id number to the node name being created. These id numbers are handed out in order, and the same id number is never reused. ZooKeeper does not provide exclusive locks on nodes directly, but a lock can be built by careful use of the ephemeral and sequence flags, as sketched below. The ZooKeeper recipes wiki page describes how to implement global locks using these flags; it also describes protocols for implementing shared (reader-writer) and revocable locks.
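The sketch below shows how the ephemeral and sequence flags combine into a simple exclusive lock. It is a simplified illustration rather than the full wiki recipe: the full recipe watches only the immediately preceding node to avoid waking every waiter, while this version re-checks the whole child list. It assumes an open ZooKeeper handle and an existing persistent node (here called /lock):

import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Simplified exclusive lock built from ephemeral + sequential nodes.
class SimpleLock {
    private final ZooKeeper zk;
    private final String dir;   // e.g. "/lock"; must already exist
    private String myNode;      // full path of our EPHEMERAL_SEQUENTIAL node

    SimpleLock(ZooKeeper zk, String dir) {
        this.zk = zk;
        this.dir = dir;
    }

    void lock() throws KeeperException, InterruptedException {
        // Ephemeral: released automatically if we crash.  Sequential: gives a
        // globally ordered queue position, e.g. /lock/node-0000000007.
        myNode = zk.create(dir + "/node-", new byte[0],
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        final Object monitor = new Object();
        while (true) {
            synchronized (monitor) {
                List<String> children = zk.getChildren(dir, new Watcher() {
                    public void process(WatchedEvent e) {
                        synchronized (monitor) { monitor.notifyAll(); }
                    }
                });
                Collections.sort(children);
                // The lowest sequence number holds the lock.
                if (myNode.equals(dir + "/" + children.get(0))) {
                    return;
                }
                monitor.wait();
            }
        }
    }

    void unlock() throws KeeperException, InterruptedException {
        zk.delete(myNode, -1);
    }
}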

Data Storage

Several ZooKeeper daemons can be started on different machines. Clients can connect to any daemon in the cluster and will always see the same view of the ZooKeeper world regardless of which daemon they connect to. User data is stored in objects with a hierarchical addressing system similar to that used by a conventional file system. The hierarchy has a root object named /, and additional nodes extend off of it in a tree-like fashion. Each node of the tree can both hold data (i.e., act like a file) and have child nodes (i.e., act like a directory). The amount of data that can be stored in an object is small: there is a hard cap of 1 MB. This cap exists so that the entire data store can be held in the RAM of the ZooKeeper machines.

This allows requests to be served with high throughput. Changes are written to disk for permanence, but read requests are handled entirely from the data cached in memory. The size cap is usually not a major limitation, because the data stored at a node is intended to be used as a pointer. For example, ZooKeeper may store the name of a file, held in another conventional file system, that contains the authoritative configuration for a distributed system. The components of the system first contact ZooKeeper to get the definitive file name, and then fetch that file for their configuration.
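A small sketch of this pointer pattern follows, assuming a hypothetical znode /config/pointer whose value is the path of the real configuration file:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.zookeeper.ZooKeeper;

// Sketch of the "pointer" pattern: ZooKeeper stores only the small,
// authoritative location of the configuration; the bulk data lives elsewhere.
class ConfigLocator {
    static String loadConfig(ZooKeeper zk) throws Exception {
        // The znode value is just a file name, well under the 1 MB cap.
        byte[] pointer = zk.getData("/config/pointer", false, null);
        String configPath = new String(pointer, StandardCharsets.UTF_8);
        // Fetch the actual configuration from the conventional file system.
        return new String(Files.readAllBytes(Paths.get(configPath)), StandardCharsets.UTF_8);
    }
}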

Introduction

Suppose you are building a large-scale distributed system. Several different services need to be brought online and must discover one another. It is not guaranteed that each service will have a fixed master IP address. For example, you might start the same service on 100 nodes, and they elect a master from among whichever of them boots first. These disparate services must also communicate with each other. How do all the nodes of each service find the master IP address of every other service? How do all the nodes in a single service agree on which one of them becomes the master?

ZooKeeper is a service designed to handle all of these problems. ZooKeeper lets you store small amounts of information in a central location and provides coordinated access to that information. Most importantly, it provides high availability: the ZooKeeper service is intended to run on a set of several machines, so the loss of individual nodes does not bring down the service. These nodes communicate in a careful way that ensures every node in the ZooKeeper cluster returns the same consistent answer to a query, regardless of which ZooKeeper server you contact.