Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move org.apache.zookeeper classes from common to adapter sub-project. #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.pega.charlatan.broker.service;

import com.pega.charlatan.node.service.NodeService;
import com.pega.charlatan.server.session.bean.Session;
import com.pega.charlatan.server.session.dao.SessionDao;
import com.pega.charlatan.server.session.service.SessionServiceImpl;
import com.pega.charlatan.utils.NamedThreadFactory;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,15 +22,15 @@ public class BrokerMonitorService extends SessionServiceImpl{

private final ScheduledExecutorService brokerInfoUpdater;
private final ScheduledExecutorService brokersChecker;
private final NodeService nodeService;
private final ZooKeeper zooKeeper;
private int brokerId;
private Session session;
private long lastSeen;
private State state;

public BrokerMonitorService(Session session, SessionDao sessionDao, NodeService nodeService) {
public BrokerMonitorService(Session session, SessionDao sessionDao, ZooKeeper zooKeeper) {
super(sessionDao);
this.nodeService = nodeService;
this.zooKeeper = zooKeeper;
this.session = session;

this.brokerInfoUpdater = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("BrokerMonitor"));
Expand All @@ -50,7 +50,7 @@ private void invalidateStaleBrokers(boolean includeThisBroker) {
} else {
logger.info(String.format("Found stale session %d, invalidating the session", staleSession.getSessionId()));

nodeService.removeEphemeralSessionNodes(staleSession.getSessionId());
zooKeeper.removeEphemeralSessionNodes(staleSession.getSessionId());

// Delete broker session info only after session ephemeral nodes are removed.
deleteSession(staleSession.getUuid());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.pega.charlatan.utils;



import com.pega.charlatan.node.bean.Node;
import com.pega.charlatan.node.service.NodeService;
import com.pega.charlatan.node.bean.NodeState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException;

import java.util.List;

/**
* NodeService wrapper that translates Zookeeper API to the Charlatan API
*/
public class ZookeeperNodeService {

private final NodeService nodeService;

public ZookeeperNodeService(NodeService nodeService) {
this.nodeService = nodeService;
}

public void close(long session) {
nodeService.close(session);
}

public String create(long session, String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException {
try {
return nodeService.create(session, path, data, com.pega.charlatan.node.bean.CreateMode.fromFlag(createMode.toFlag()));
} catch (CharlatanException e) {
throw toKeeperException(e);
}
}

public void delete(String path, int version) throws KeeperException {
try {
nodeService.delete(path, version);
} catch (CharlatanException e) {
throw toKeeperException(e);
}
}

public List<String> getChildren(String path, Watcher watcher) throws KeeperException {
try {
Node node = nodeService.getNode(path, toCharlatanWatcher(watcher), com.pega.charlatan.watches.bean.Watcher.Type.Children);
return node.getChildren();
} catch (CharlatanException e) {
throw toKeeperException(e);
}
}

public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
try {
Node node = nodeService.getNode(path, toCharlatanWatcher(watcher), com.pega.charlatan.watches.bean.Watcher.Type.Data);
loadStat(stat, node.getState());
return node.getData();
} catch (CharlatanException e) {
throw toKeeperException(e);
}
}

public Stat setData(String path, byte[] data, int version) throws KeeperException {
try {
NodeState nodeState = nodeService.setData(path,data, version);
return toStat(nodeState);
} catch (CharlatanException e) {
throw toKeeperException(e);
}
}

public Stat exists(String path, Watcher watcher) {
NodeState nodeState = nodeService.exists(path, toCharlatanWatcher(watcher));
return toStat(nodeState);
}

public void removeEphemeralSessionNodes(long session) {
nodeService.removeEphemeralSessionNodes(session);
}

public void registerWatch(Watcher watcher, List<String> dataWatches, List<String> childWatches, List<String> existWatches) {
nodeService.registerWatch(toCharlatanWatcher(watcher), dataWatches, childWatches, existWatches);
}


private Stat toStat(NodeState nodeState) {
Stat stat = new Stat();
loadStat(stat, nodeState);

return stat;
}

private void loadStat(Stat stat, NodeState nodeState) {
stat.setVersion(nodeState.getVersion());
stat.setAversion(nodeState.getAversion());
stat.setCversion(nodeState.getCversion());
stat.setCtime(nodeState.getCtime());
stat.setMtime(nodeState.getMtime());
stat.setCzxid(nodeState.getCzxid());
stat.setMzxid(nodeState.getMzxid());
stat.setPzxid(nodeState.getPzxid());
stat.setDataLength(nodeState.getDataLength());
stat.setEphemeralOwner(nodeState.getEphemeralOwner());
stat.setNumChildren(nodeState.getNumChildren());
}

private com.pega.charlatan.watches.bean.Watcher toCharlatanWatcher(final Watcher watcher) {
return new com.pega.charlatan.watches.bean.Watcher() {
@Override
public void process(com.pega.charlatan.watches.bean.WatchedEvent event) {
watcher.process(toWatchedEvent(event));
}
};
}

private WatchedEvent toWatchedEvent(com.pega.charlatan.watches.bean.WatchedEvent charlatanWatchedEvent) {
return new WatchedEvent(Watcher.Event.EventType.fromInt(charlatanWatchedEvent.getType().getCode()),
Watcher.Event.KeeperState.fromInt(charlatanWatchedEvent.getState().getCode()),
charlatanWatchedEvent.getPath());
}

private KeeperException toKeeperException(CharlatanException e) {
return KeeperException.create(e.code().intValue(), e.getPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public int hashCode() {

/**
* A result from a setData operation. This kind of result provides access
* to the Stat structure from the update.
* to the NodeState structure from the update.
*/
public static class SetDataResult extends OpResult {
private Stat stat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.pega.charlatan.node.service.NodeServiceImpl;
import com.pega.charlatan.server.session.bean.Session;
import com.pega.charlatan.utils.ZookeeperClassLoader;
import com.pega.charlatan.utils.ZookeeperNodeService;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand All @@ -40,7 +41,7 @@ public class ZooKeeper implements ZookeeperInterface {

private static Logger logger = LoggerFactory.getLogger(ZooKeeper.class);
private final ExecutorService executor;
private NodeService nodeService;
private ZookeeperNodeService nodeService;
private BrokerMonitorService brokerMonitorService;
private long session;
private Watcher defaultWatcher;
Expand All @@ -63,10 +64,12 @@ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) thro

brokerMonitorService = new BrokerMonitorService(session, ZookeeperClassLoader.getSessionDaoImpl(), this);

this.nodeService = new NodeServiceImpl(
NodeService ns = new NodeServiceImpl(
ZookeeperClassLoader.getNodeDao(),
ZookeeperClassLoader.getWatchService());

this.nodeService = new ZookeeperNodeService(ns);

executor.submit(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package org.apache.zookeeper;

import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.util.List;

public interface ZookeeperInterface {

String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException;

void create(final String path, final byte[] data, final List<ACL> acl, final CreateMode createMode, final AsyncCallback.StringCallback cb, final Object ctx);

/**
* Create a node with the given path and data.
* <p>
* The createMode argument can also specify to create a sequential node. The
* actual path name of a sequential node will be the given path plus a
* suffix "i" where i is the current sequential number of the node. The sequence
* number is always fixed length of 10 digits, 0 padded. Once
* such a node is created, the sequential number will be incremented by one.
* <p>
* If a node with the same actual path already exists in the ZooKeeper, a
* KeeperException with error code KeeperException.NodeExists will be
* thrown.
* <p>
* An ephemeral node cannot have children. If the parent node of the given
* path is ephemeral, a KeeperException with error code
* KeeperException.NoChildrenForEphemerals will be thrown.
* <p>
* If the parent node does not exist in the ZooKeeper, a KeeperException
* with error code KeeperException.NoNode will be thrown.
* <p>
* This operation, if successful, will trigger all the watches left on the
* node of the given path by exists and getData API calls, and the watches
* left on the parent node by getChildren API calls.
* <p>
* If a node is created successfully, the ZooKeeper server will trigger the
* watches on the path left by exists calls, and the watches on the parent
* of the node by getChildren calls.
*
* @param path node path
* @param data node data
* @param acl node acl
* @param createMode node mode
* @return the actual path of the created node
*/
String create(long session, String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException;

/**
* The asynchronous version of getChildren. Return the list of the children of the node of the given path.
*
* @param path
* @param watch
* @param cb
* @param ctx
*/
void getChildren(String path, boolean watch, AsyncCallback.ChildrenCallback cb, Object ctx);

List<String> getChildren(String path, boolean watch) throws KeeperException;

/**
* The asynchronous version of getData.
*/
void getData(String path, boolean watch, AsyncCallback.DataCallback cb, Object ctx);

byte[] getData(String path, boolean watch, Stat stat) throws KeeperException;

Stat exists(String path, boolean watch);

void setACL(String path, List<ACL> acl, int version, AsyncCallback.StatCallback cb, Object ctx);

public long getSessionId();

void close(long session);

/**
* Delete the node with the given path. The call will succeed if such a node
* exists, and the given version matches the node's version (if the given
* version is -1, it matches any node's versions).
* <p>
* A KeeperException with error code KeeperException.NoNode will be thrown
* if the nodes does not exist.
* <p>
* A KeeperException with error code KeeperException.BadVersion will be
* thrown if the given version does not match the node's version.
* <p>
* A KeeperException with error code KeeperException.NotEmpty will be thrown
* if the node has children.
* <p>
* This operation, if successful, will trigger all the watches on the node
* of the given path left by exists API calls, and the watches on the parent
* node left by getChildren API calls.
*
* @param path the path of the node to be deleted.
* @param version the expected node version.
* @throws KeeperException If the server signals an error with a non-zero
* return code.
*/
void delete(String path, int version) throws KeeperException;

/**
* Return the list of the children of the node of the given path.
* <p>
* If the watcher isn't null and the call is successful (no exception is thrown),
* a watch will be left on the node with the given path. The watch will be
* triggered by a successful operation that deletes the node of the given
* path or creates/delete a child under the node.
* <p>
* The list of children returned is not sorted and no guarantee is provided
* as to its natural or lexical order.
* <p>
* A KeeperException with error code KeeperException.NoNode will be thrown
* if no node with the given path exists.
*
* @param path
* @param watcher
* @return an unordered array of children of the node with the given path
* @throws KeeperException If the server signals an error with a non-zero error code.
*/
List<String> getChildren(String path, Watcher watcher) throws KeeperException;

/**
* Return the data and the stat of the node of the given path.
* <p>
* If the watch is true and the call is successful (no exception is
* thrown), a watch will be left on the node with the given path. The watch
* will be triggered by a successful operation that sets data on the node, or
* deletes the node.
* <p>
* A KeeperException with error code KeeperException.NoNode will be thrown
* if no node with the given path exists.
*
* @param path the given path
* @param watch whether need to watch this node
* @param stat the stat of the node
* @return the data of the node
* @throws KeeperException If the server signals an error with a non-zero error code
*/
byte[] getData(String path, Watcher watch, Stat stat) throws KeeperException;

/**
* Set the data for the node of the given path if such a node exists and the
* given version matches the version of the node (if the given version is
* -1, it matches any node's versions). Return the stat of the node.
* <p>
* This operation, if successful, will trigger all the watches on the node
* of the given path left by getData calls.
* <p>
* A KeeperException with error code KeeperException.NoNode will be thrown
* if no node with the given path exists.
* <p>
* A KeeperException with error code KeeperException.BadVersion will be
* thrown if the given version does not match the node's version.
*
* @param path the path of the node
* @param data the data to set
* @param version the expected matching version
* @return the state of the node
* @throws KeeperException If the server signals an error with a non-zero error code.
*/
Stat setData(String path, byte[] data, int version) throws KeeperException;

/**
* Return the stat of the node of the given path. Return null if no such a
* node exists.
* <p>
* If the watcher is set and the call is successful (no exception is thrown),
* a watch will be left on the node with the given path. The watch will be
* triggered by a successful operation that creates/delete the node or sets
* the data on the node.
*
* @param path the node path
* @param watcher whether need to watch this node
* @return the stat of the node of the given path; return null if no such a
* node exists.
*/
Stat exists(String path, Watcher watcher);

void removeEphemeralSessionNodes(long session);

void registerWatch(Watcher watcher, List<String> dataWatches, List<String> childWatches, List<String> existWatches);
}
Loading