Skip to content

Commit

Permalink
Add HA test
Browse files Browse the repository at this point in the history
  • Loading branch information
poorna committed Sep 12, 2016
1 parent 32d96fd commit aeb1250
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.tephra.distributed;

import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
Expand All @@ -28,7 +27,6 @@
import com.google.inject.Scopes;
import com.google.inject.util.Modules;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ThriftTransactionSystemTest;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
Expand All @@ -46,9 +44,6 @@
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -179,7 +174,7 @@ public void run() {
TimeUnit.SECONDS.sleep(1);

// Expire zookeeper session, which causes Thrift server to stop.
expireZkSession(zkClientService);
Tests.expireZkSession(zkClientService);
waitForThriftStop();

// Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift server again.
Expand All @@ -203,7 +198,7 @@ public void testThriftServerRestart() throws Exception {
txClient.commit(tx);

// Expire zookeeper session, which causes Thrift server to stop running.
expireZkSession(zkClientService);
Tests.expireZkSession(zkClientService);
waitForThriftStop();

// wait for the thrift rpc server to be in running state again
Expand All @@ -221,27 +216,6 @@ public Boolean call() throws Exception {
txClient.commit(tx);
}

private void expireZkSession(ZKClientService zkClientService) throws Exception {
ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
final SettableFuture<?> connectFuture = SettableFuture.create();
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectFuture.set(null);
}
}
};

// Create another Zookeeper session with the same sessionId so that the original one expires.
ZooKeeper dupZookeeper =
new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
connectFuture.get(30, TimeUnit.SECONDS);
Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED);
dupZookeeper.close();
}

private void waitForThriftStop() throws Exception {
Tests.waitFor("Failed to wait for txService to stop", new Callable<Boolean>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tephra.distributed;

import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.runtime.ConfigModule;
import org.apache.tephra.runtime.DiscoveryModules;
import org.apache.tephra.runtime.TransactionClientModule;
import org.apache.tephra.runtime.TransactionModules;
import org.apache.tephra.util.Tests;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.concurrent.Callable;

/**
* Test HA behavior of Transaction Service
*/
public class ThriftTransactionSystemHATest extends ThriftTransactionSystemTest {

@BeforeClass
public static void start() throws Exception {
// Start tx service
ThriftTransactionSystemTest.start();

// Expire zk session to make tx service follower
Tests.expireZkSession(zkClientService);
Tests.waitFor("Failed to wait for txService to stop", new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return Service.State.RUNNING != txService.thriftRPCServerState();
}
});

// wait for the thrift rpc server to be in running state again
Tests.waitFor("Failed to wait for txService to be running.", new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return Service.State.RUNNING == txService.thriftRPCServerState();
}
});

// we need to get a new txClient, because the old one will no longer work after the thrift server restart
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new AbstractModule() {
@Override
protected void configure() {
// Instead of using ZKClientModule that will create new instance of ZKClient, we create instance
// binding to reuse the same ZKClient used for leader election
bind(ZKClient.class).toInstance(zkClientService);
bind(ZKClientService.class).toInstance(zkClientService);
}
},
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
txClient = injector.getInstance(TransactionSystemClient.class);
Tests.waitForTxReady(txClient);
}

@AfterClass
public static void stop() throws Exception {
ThriftTransactionSystemTest.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
* limitations under the License.
*/

package org.apache.tephra;
package org.apache.tephra.distributed;

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.distributed.TransactionService;
import org.apache.tephra.TestTransactionManagerProvider;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TransactionSystemTest;
import org.apache.tephra.TxConstants;
import org.apache.tephra.persist.TransactionStateStorage;
import org.apache.tephra.runtime.ConfigModule;
import org.apache.tephra.runtime.DiscoveryModules;
Expand All @@ -45,10 +48,10 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);

private static InMemoryZKServer zkServer;
private static ZKClientService zkClientService;
private static TransactionService txService;
private static TransactionStateStorage storage;
private static TransactionSystemClient txClient;
static ZKClientService zkClientService;
static TransactionService txService;
static TransactionSystemClient txClient;
static Configuration conf;

@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
Expand All @@ -58,7 +61,7 @@ public static void start() throws Exception {
zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
zkServer.startAndWait();

Configuration conf = new Configuration();
conf = new Configuration();
conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
Expand Down Expand Up @@ -90,8 +93,6 @@ public static void start() throws Exception {
}

Tests.waitForTxReady(txClient);
Assert.assertNotNull(txService.getTransactionManager());
storage = txService.getTransactionManager().getTransactionStateStorage();
}

@Before
Expand All @@ -113,6 +114,7 @@ protected TransactionSystemClient getClient() throws Exception {

@Override
protected TransactionStateStorage getStateStorage() throws Exception {
return storage;
Assert.assertNotNull(txService.getTransactionManager());
return txService.getTransactionManager().getTransactionStateStorage();
}
}
26 changes: 26 additions & 0 deletions tephra-core/src/test/java/org/apache/tephra/util/Tests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

package org.apache.tephra.util;

import com.google.common.util.concurrent.SettableFuture;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;

import java.util.concurrent.Callable;
Expand Down Expand Up @@ -56,4 +61,25 @@ public Boolean call() throws Exception {
}
});
}

public static void expireZkSession(ZKClientService zkClientService) throws Exception {
ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
final SettableFuture<?> connectFuture = SettableFuture.create();
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectFuture.set(null);
}
}
};

// Create another Zookeeper session with the same sessionId so that the original one expires.
ZooKeeper dupZookeeper =
new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
connectFuture.get(30, TimeUnit.SECONDS);
Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED);
dupZookeeper.close();
}
}

0 comments on commit aeb1250

Please sign in to comment.