Skip to content

Commit

Permalink
REFACTOR: methods in MemcachedConnectio invoked with updateReplConnec…
Browse files Browse the repository at this point in the history
…tion.
  • Loading branch information
brido4125 committed Oct 14, 2024
1 parent 8b9d693 commit e699a66
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 64 deletions.
68 changes: 7 additions & 61 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -367,38 +366,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
}

/* ENABLE_REPLICATION if */
private Set<String> findChangedGroups(List<InetSocketAddress> addrs,
Collection<MemcachedNode> nodes) {
Map<String, InetSocketAddress> addrMap = new HashMap<>();
for (InetSocketAddress each : addrs) {
addrMap.put(each.toString(), each);
}

Set<String> changedGroupSet = new HashSet<>();
for (MemcachedNode node : nodes) {
String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString();
if (addrMap.remove(nodeAddr) == null) { // removed node
changedGroupSet.add(node.getReplicaGroup().getGroupName());
}
}
for (String addr : addrMap.keySet()) { // newly added node
ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr);
changedGroupSet.add(a.getGroupName());
}
return changedGroupSet;
}

private List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
Set<String> changedGroups) {
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
for (InetSocketAddress addr : addrs) {
if (changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName())) {
changedGroupAddrs.add(addr);
}
}
return changedGroupAddrs;
}

private void updateReplConnections(List<InetSocketAddress> addrs) throws IOException {
List<MemcachedNode> attachNodes = new ArrayList<>();
List<MemcachedNode> removeNodes = new ArrayList<>();
Expand All @@ -416,10 +383,11 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
* we find out the changed groups with the comparison of previous and current znode list,
* and update the state of groups based on them.
*/
Set<String> changedGroups = findChangedGroups(addrs, locator.getAll());
Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(addrs, locator.getAll());

Map<String, List<ArcusReplNodeAddress>> newAllGroups =
ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups));
ArcusReplNodeAddress.makeGroupAddrsList(
MemcachedReplicaGroup.findAddrsOfChangedGroups(addrs, changedGroups));

// remove invalidated groups in changedGroups
for (Map.Entry<String, List<ArcusReplNodeAddress>> entry : newAllGroups.entrySet()) {
Expand Down Expand Up @@ -467,8 +435,10 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
assert oldMasterAddr != null : "invalid old rgroup";
assert newMasterAddr != null : "invalid new rgroup";

Set<ArcusReplNodeAddress> oldSlaveAddrs = getAddrsFromNodes(oldSlaveNodes);
Set<ArcusReplNodeAddress> newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs);
Set<ArcusReplNodeAddress> oldSlaveAddrs
= MemcachedReplicaGroup.getAddrsFromNodes(oldSlaveNodes);
Set<ArcusReplNodeAddress> newSlaveAddrs
= MemcachedReplicaGroup.getSlaveAddrsFromGroupAddrs(newGroupAddrs);

if (oldMasterAddr.isSameAddress(newMasterAddr)) {
// add newly added slave node
Expand Down Expand Up @@ -560,30 +530,6 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
// Remove the unavailable nodes.
handleNodesToRemove(removeNodes);
}

private Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
Set<ArcusReplNodeAddress> addrs = Collections.emptySet();
if (!nodes.isEmpty()) {
addrs = new HashSet<>((int) (nodes.size() / .75f) + 1);
for (MemcachedNode node : nodes) {
addrs.add((ArcusReplNodeAddress) node.getSocketAddress());
}
}
return addrs;
}

private Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
List<ArcusReplNodeAddress> groupAddrs) {
Set<ArcusReplNodeAddress> slaveAddrs = Collections.emptySet();
int groupSize = groupAddrs.size();
if (groupSize > 1) {
slaveAddrs = new HashSet<>((int) ((groupSize - 1) / .75f) + 1);
for (int i = 1; i < groupSize; i++) {
slaveAddrs.add(groupAddrs.get(i));
}
}
return slaveAddrs;
}
/* ENABLE_REPLICATION end */

/* ENABLE_REPLICATION if */
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedReplicaGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@
/* ENABLE_REPLICATION if */
package net.spy.memcached;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import net.spy.memcached.compat.SpyObject;

Expand Down Expand Up @@ -190,5 +196,47 @@ private MemcachedNode getNextActiveSlaveNodeNoRotate() {
public static String getGroupNameFromNode(final MemcachedNode node) {
return ((ArcusReplNodeAddress) node.getSocketAddress()).getGroupName();
}

static Set<String> findChangedGroups(List<InetSocketAddress> update,
Collection<MemcachedNode> olds) {
Set<String> changedGroupSet = new HashSet<>();
Map<String, InetSocketAddress> addrMap = update.stream()
.collect(Collectors.toMap(InetSocketAddress::toString, addr -> addr));

for (MemcachedNode node : olds) {
if (addrMap.remove(node.getSocketAddress().toString()) == null) {
changedGroupSet.add(node.getReplicaGroup().getGroupName());
}
}

addrMap.values().stream()
.map(addr -> ((ArcusReplNodeAddress) addr).getGroupName())
.forEach(changedGroupSet::add);

return changedGroupSet;
}

static List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
Set<String> changedGroups) {
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
addrs.stream()
.filter(addr -> changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName()))
.forEach(changedGroupAddrs::add);
return changedGroupAddrs;
}

static Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
return nodes.stream()
.map(node -> (ArcusReplNodeAddress) node.getSocketAddress())
.collect(Collectors.toSet());
}

static Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
List<ArcusReplNodeAddress> groupAddrs) {
if (groupAddrs.size() <= 1) {
return Collections.emptySet();
}
return new HashSet<>(groupAddrs.subList(1, groupAddrs.size()));
}
}
/* ENABLE_REPLICATION end */
76 changes: 76 additions & 0 deletions src/test/java/net/spy/memcached/MemcachedReplicaGroupTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package net.spy.memcached;


import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


class MemcachedReplicaGroupTest {

@Test
void findChangedGroupsTest() {
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
List<MemcachedNode> old = new ArrayList<>();
setReplGroup(g0, old);
setReplGroup(g1, old);

List<InetSocketAddress> update = new ArrayList<>(g0);

Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(update, old);
Assertions.assertEquals(1, changedGroups.size());
Assertions.assertTrue(changedGroups.contains("g1"));
}

@Test
void findAddrsOfChangedGroupsTest() {
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
List<MemcachedNode> old = new ArrayList<>();
setReplGroup(g0, old);
setReplGroup(g1, old);

List<InetSocketAddress> update = new ArrayList<>();
update.addAll(g0.subList(0, 2));
update.addAll(g1.subList(0, 2));

Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(update, old);
List<InetSocketAddress> result
= MemcachedReplicaGroup.findAddrsOfChangedGroups(update, changedGroups);

Assertions.assertEquals(4, result.size());
Assertions.assertTrue(result.contains(g0.get(0)));
Assertions.assertTrue(result.contains(g0.get(1)));
Assertions.assertTrue(result.contains(g1.get(0)));
Assertions.assertTrue(result.contains(g1.get(1)));
}

private void setReplGroup(List<ArcusReplNodeAddress> group, List<MemcachedNode> old) {
List<MockMemcachedNode> collect = group.stream()
.map(MockMemcachedNode::new)
.collect(Collectors.toList());
MemcachedReplicaGroupImpl impl = null;
for (MockMemcachedNode node : collect) {
if (impl == null) {
impl = new MemcachedReplicaGroupImpl(node);
} else {
node.setReplicaGroup(impl);
}
}
old.addAll(collect);
}

private List<ArcusReplNodeAddress> createReplList(String group, String ip) {
List<ArcusReplNodeAddress> replList = new ArrayList<>();
replList.add(ArcusReplNodeAddress.create(group, true, ip + ":" + 11211));
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 1)));
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 2)));
return replList;
}
}
6 changes: 3 additions & 3 deletions src/test/java/net/spy/memcached/MockMemcachedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

public class MockMemcachedNode implements MemcachedNode {
private final InetSocketAddress socketAddress;
private MemcachedReplicaGroup memcachedReplicaGroup;

public SocketAddress getSocketAddress() {
return socketAddress;
Expand Down Expand Up @@ -260,13 +261,12 @@ public String getOpQueueStatus() {

@Override
public void setReplicaGroup(MemcachedReplicaGroup g) {
// noop
this.memcachedReplicaGroup = g;
}

@Override
public MemcachedReplicaGroup getReplicaGroup() {
// noop
return null;
return memcachedReplicaGroup;
}

@Override
Expand Down

0 comments on commit e699a66

Please sign in to comment.