Skip to content

Commit

Permalink
[fix][test] Fix multiple thread leaks in tests, part 3 (apache#21543)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored and Nikolai Borisov committed Nov 13, 2023
1 parent faddf4d commit 5ac660a
Show file tree
Hide file tree
Showing 21 changed files with 345 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -130,6 +131,7 @@ private void setup() {

LedgerHandle ledgerHandle = mock(LedgerHandle.class);
LedgerHandle newLedgerHandle = mock(LedgerHandle.class);
@Cleanup("shutdownNow")
OrderedExecutor executor = OrderedExecutor.newBuilder().name("Test").build();
given(bookKeeper.getMainWorkerPool()).willReturn(executor);
doAnswer(inv -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public String getAuthMethodName() {

@Override
public void close() throws IOException {
if (jaasCredentialsContainer != null) {
jaasCredentialsContainer.close();
jaasCredentialsContainer = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.file.Files;
Expand All @@ -43,6 +44,7 @@
import javax.security.auth.login.Configuration;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -77,7 +79,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
private static Properties properties;

private static String localHostname = "localhost";
private static Authentication authSasl;
private Authentication authSasl;

@BeforeClass
public static void startMiniKdc() throws Exception {
Expand Down Expand Up @@ -146,17 +148,11 @@ public static void startMiniKdc() throws Exception {
System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
Configuration.getConfiguration().refresh();

// Client config
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);
log.info("created AuthenticationSasl");

}

@AfterClass(alwaysRun = true)
public static void stopMiniKdc() {
public static void stopMiniKdc() throws IOException {
System.clearProperty("java.security.auth.login.config");
System.clearProperty("java.security.krb5.conf");
if (kdc != null) {
Expand All @@ -175,6 +171,14 @@ protected void setup() throws Exception {
// use http lookup to verify HttpClient works well.
isTcpLookup = false;

// Client config
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);
log.info("created AuthenticationSasl");

conf.setAdvertisedAddress(localHostname);
conf.setAuthenticationEnabled(true);
conf.setSaslJaasClientAllowedIds(".*" + "client" + ".*");
Expand All @@ -187,9 +191,6 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm()));
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
Expand Down Expand Up @@ -307,8 +308,11 @@ public void testSaslServerAndClientAuth() throws Exception {

@Test
public void testSaslOnlyAuthFirstStage() throws Exception {
AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl) pulsar.getBrokerService()
.getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME);
@Cleanup
AuthenticationProviderSasl saslServer = new AuthenticationProviderSasl();
// The cache expiration time is set to 50ms. Residual auth info should be cleaned up
conf.setInflightSaslContextExpiryMs(50);
saslServer.initialize(conf);

HttpServletRequest servletRequest = mock(HttpServletRequest.class);
doReturn("Init").when(servletRequest).getHeader("State");
Expand All @@ -325,9 +329,6 @@ public void testSaslOnlyAuthFirstStage() throws Exception {
field.setAccessible(true);
Cache<Long, AuthenticationState> cache = (Cache<Long, AuthenticationState>) field.get(saslServer);
assertEquals(cache.asMap().size(), 10);
// The cache expiration time is set to 1ms. Residual auth info should be cleaned up
conf.setInflightSaslContextExpiryMs(1);
saslServer.initialize(conf);
// Add more auth info into memory
for (int i = 0; i < 10; i++) {
AuthenticationDataProvider dataProvider = authSasl.getAuthData("localhost");
Expand All @@ -339,22 +340,22 @@ public void testSaslOnlyAuthFirstStage() throws Exception {
}
long start = System.currentTimeMillis();
while (true) {
if (System.currentTimeMillis() - start > 10_00) {
if (System.currentTimeMillis() - start > 1000) {
fail();
}
cache = (Cache<Long, AuthenticationState>) field.get(saslServer);
// Residual auth info should be cleaned up
if (CollectionUtils.hasElements(cache.asMap())) {
break;
}
Thread.yield();
Thread.sleep(5);
}
}

@Test
public void testMaxInflightContext() throws Exception {
AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl) pulsar.getBrokerService()
.getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME);
@Cleanup
AuthenticationProviderSasl saslServer = new AuthenticationProviderSasl();
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
doReturn("Init").when(servletRequest).getHeader("State");
conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
Expand All @@ -375,5 +376,4 @@ public void testMaxInflightContext() throws Exception {
//only 1 context was left in the memory
assertEquals(cache.asMap().size(), 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.RegistryBuilder;
Expand All @@ -47,6 +48,7 @@ public class JettySslContextFactoryTest {

@Test
public void testJettyTlsServerTls() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory = JettySslContextFactory.createServerSslContext(
Expand All @@ -72,15 +74,15 @@ public void testJettyTlsServerTls() throws Exception {
new SSLConnectionSocketFactory(getClientSslContext(), new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
@Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
httpClient.execute(httpGet);
httpClient.close();
server.stop();
}

@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory = JettySslContextFactory.createServerSslContext(
Expand Down Expand Up @@ -110,15 +112,15 @@ public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
@Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
httpClient.execute(httpGet);
httpClient.close();
server.stop();
}

@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidCipher() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory = JettySslContextFactory.createServerSslContext(
Expand Down Expand Up @@ -154,11 +156,10 @@ public void testJettyTlsServerInvalidCipher() throws Exception {
new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
@Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
httpClient.execute(httpGet);
httpClient.close();
server.stop();
}

private static SSLContext getClientSslContext() throws GeneralSecurityException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.TrustManagerFactory;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.RegistryBuilder;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class JettySslContextFactoryWithKeyStoreTest {

@Test
public void testJettyTlsServerTls() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null,
Expand All @@ -81,16 +83,16 @@ public void testJettyTlsServerTls() throws Exception {
new SSLConnectionSocketFactory(getClientSslContext(), new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
@Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
httpClient.execute(httpGet);
httpClient.close();
server.stop();
}

@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
Configurator.setRootLevel(Level.INFO);
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null,
Expand All @@ -114,15 +116,15 @@ public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
@Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
httpClient.execute(httpGet);
httpClient.close();
server.stop();
}

@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidCipher() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null,
Expand Down Expand Up @@ -151,11 +153,10 @@ public void testJettyTlsServerInvalidCipher() throws Exception {
new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
@Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
httpClient.execute(httpGet);
httpClient.close();
server.stop();
}

private static SSLContext getClientSslContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Excepti
final Consumer<byte[]> myConsumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe();
// assertEquals(dispatcher.getTotalUnackedMessages(), 1);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);

final CountDownLatch latch = new CountDownLatch(numMsgs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ public void start() throws PulsarClientException {
public void close() throws IOException {
if (client != null) {
client.close();
client = null;
}
if (jaasCredentialsContainer != null) {
jaasCredentialsContainer.close();
jaasCredentialsContainer = null;
initializedJAAS = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private long getRefreshTime(KerberosTicket tgt) {
@Override
public void run() {
log.info("TGT refresh thread started.");
while (true) {
while (!Thread.currentThread().isInterrupted()) {
// renewal thread's main loop. if it exits from here, thread will exit.
KerberosTicket tgt = getTGT();
long now = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -37,6 +38,7 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -60,10 +62,14 @@
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
public class JavaInstanceRunnableTest {
private final List<AutoCloseable> closeables = new ArrayList<>();

static class IntegerSerDe implements SerDe<Integer> {
@Override
Expand Down Expand Up @@ -113,8 +119,10 @@ private JavaInstanceRunnable createRunnable(SourceSpec sourceSpec,
.build();
InstanceConfig config = createInstanceConfig(functionDetails);
config.setClusterName("test-cluster");
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build();
registerCloseable(pulsarClient);
return new JavaInstanceRunnable(config, clientBuilder,
PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(), null, null, null, null, null,
pulsarClient, null, null, null, null, null,
Thread.currentThread().getContextClassLoader(), null);
}

Expand Down Expand Up @@ -493,4 +501,29 @@ public void testFatalTheInstance(FailComponentType failComponentType) throws Exc
Assert.assertFalse((boolean) getPrivateField(javaInstanceRunnable, "isInitialized"));
});
}

@AfterClass
public void cleanupInstanceCache() {
InstanceCache.shutdown();
}

@AfterMethod(alwaysRun = true)
public void cleanupCloseables() {
callCloseables(closeables);
}

protected <T extends AutoCloseable> T registerCloseable(T closeable) {
closeables.add(closeable);
return closeable;
}

private static void callCloseables(List<AutoCloseable> closeables) {
for (int i = closeables.size() - 1; i >= 0; i--) {
try {
closeables.get(i).close();
} catch (Exception e) {
log.error("Failure in calling close method", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void add(Event<Integer> event) {

@AfterMethod(alwaysRun = true)
public void tearDown() {
// waterMarkEventGenerator.shutdown();
waterMarkEventGenerator.shutdown();
eventList.clear();
}

Expand Down
Loading

0 comments on commit 5ac660a

Please sign in to comment.