Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][test] Fix multiple thread leaks in tests #21460

Merged
merged 13 commits into from
Oct 30, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import okhttp3.OkHttpClient;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
Expand Down Expand Up @@ -145,6 +146,7 @@ public class AuthenticationProviderOpenID implements AuthenticationProvider {

// The list of audiences that are allowed to connect to this broker. A valid JWT must contain one of the audiences.
private String[] allowedAudiences;
private ApiClient k8sApiClient;

@Override
public void initialize(ServiceConfiguration config) throws IOException {
Expand Down Expand Up @@ -178,8 +180,7 @@ public void initialize(ServiceConfiguration config) throws IOException {
.setSslContext(sslContext)
.build();
httpClient = new DefaultAsyncHttpClient(clientConfig);
ApiClient k8sApiClient =
fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED ? Config.defaultClient() : null;
k8sApiClient = fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED ? Config.defaultClient() : null;
this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(config, httpClient, k8sApiClient);
this.jwksCache = new JwksCache(config, httpClient, k8sApiClient);
}
Expand Down Expand Up @@ -361,7 +362,17 @@ public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteA

@Override
public void close() throws IOException {
httpClient.close();
if (httpClient != null) {
httpClient.close();
}
if (k8sApiClient != null) {
OkHttpClient okHttpClient = k8sApiClient.getHttpClient();
okHttpClient.dispatcher().executorService().shutdown();
okHttpClient.connectionPool().evictAll();
if (okHttpClient.cache() != null) {
okHttpClient.cache().close();
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ void beforeClass() throws IOException {
}

@AfterClass
void afterClass() {
void afterClass() throws IOException {
provider.close();
server.stop();
}

Expand Down Expand Up @@ -340,6 +341,7 @@ public void testKidCacheMissWhenRefreshConfigZero() throws Exception {
props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuerWithMissingKid);

@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
provider.initialize(conf);

Expand All @@ -360,6 +362,7 @@ public void testKidCacheMissWhenRefreshConfigLongerThanDelta() throws Exception
props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuerWithMissingKid);

@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
provider.initialize(conf);

Expand Down Expand Up @@ -387,6 +390,7 @@ public void testKubernetesApiServerAsDiscoverTrustedIssuerSuccess() throws Excep
// Test requires that k8sIssuer is not in the allowed token issuers
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");

@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
provider.initialize(conf);

Expand Down Expand Up @@ -420,6 +424,7 @@ public void testKubernetesApiServerAsDiscoverTrustedIssuerFailsDueToMismatchedIs
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_TRUSTED_ISSUER");
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");

@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
provider.initialize(conf);

Expand All @@ -446,6 +451,7 @@ public void testKubernetesApiServerAsDiscoverPublicKeySuccess() throws Exception
// Test requires that k8sIssuer is not in the allowed token issuers
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");

@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
provider.initialize(conf);

Expand Down Expand Up @@ -476,6 +482,7 @@ public void testKubernetesApiServerAsDiscoverPublicKeyFailsDueToMismatchedIssuer
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_PUBLIC_KEYS");
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");

@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
provider.initialize(conf);

Expand Down Expand Up @@ -538,6 +545,7 @@ public void testAuthenticationStateOpenIDForTokenExpiration() throws Exception {
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
// Use the leeway to allow the token to pass validation and then fail expiration
props.setProperty(AuthenticationProviderOpenID.ACCEPTED_TIME_LEEWAY_SECONDS, "10");
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
provider.initialize(conf);

Expand Down Expand Up @@ -603,6 +611,7 @@ public void testAuthenticationProviderListStateSuccess() throws Exception {

@Test
void ensureRoleClaimForNonSubClaimReturnsRole() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
Expand All @@ -623,6 +632,7 @@ void ensureRoleClaimForNonSubClaimReturnsRole() throws Exception {

@Test
void ensureRoleClaimForNonSubClaimFailsWhenClaimIsMissing() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.impl.DefaultJwtBuilder;
import io.jsonwebtoken.security.Keys;
import java.io.IOException;
import java.security.KeyPair;
import java.sql.Date;
import java.time.Instant;
Expand All @@ -35,9 +36,11 @@
import java.util.Properties;
import java.util.Set;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -83,15 +86,22 @@ public void setup() throws Exception {
basicProvider.initialize(conf);
}

@AfterClass
public void cleanup() throws IOException {
basicProvider.close();
}

@Test
public void testNullToken() {
public void testNullToken() throws IOException {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Assert.assertThrows(AuthenticationException.class,
() -> provider.authenticate(new AuthenticationDataCommand(null)));
}

@Test
public void testThatNullAlgFails() {
public void testThatNullAlgFails() throws IOException {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Assert.assertThrows(AuthenticationException.class,
() -> provider.verifyJWT(null, null, null));
Expand All @@ -103,10 +113,15 @@ public void testThatUnsupportedAlgsThrowExceptions() {
Arrays.stream(supportedAlgorithms()).map(o -> (SignatureAlgorithm) o[0]).toList()
.forEach(unsupportedAlgs::remove);
unsupportedAlgs.forEach(unsupportedAlg -> {
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
// We don't create a public key because it's irrelevant
Assert.assertThrows(AuthenticationException.class,
() -> provider.verifyJWT(null, unsupportedAlg.getValue(), null));
try {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
// We don't create a public key because it's irrelevant
Assert.assertThrows(AuthenticationException.class,
() -> provider.verifyJWT(null, unsupportedAlg.getValue(), null));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

Expand All @@ -124,8 +139,9 @@ public void testThatSupportedAlgsWork(SignatureAlgorithm alg) throws Authenticat
}

@Test
public void testThatSupportedAlgWithMismatchedPublicKeyFromDifferentAlgFamilyFails() {
public void testThatSupportedAlgWithMismatchedPublicKeyFromDifferentAlgFamilyFails() throws IOException {
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
Expand All @@ -137,8 +153,9 @@ public void testThatSupportedAlgWithMismatchedPublicKeyFromDifferentAlgFamilyFai
}

@Test
public void testThatSupportedAlgWithMismatchedPublicKeyFromSameAlgFamilyFails() {
public void testThatSupportedAlgWithMismatchedPublicKeyFromSameAlgFamilyFails() throws IOException {
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
Expand Down Expand Up @@ -192,6 +209,7 @@ public void ensureRecentlyExpiredTokenWithinConfiguredLeewaySucceeds() throws Ex
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);

// Set up the provider
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ACCEPTED_TIME_LEEWAY_SECONDS, "10");
Expand Down Expand Up @@ -219,7 +237,8 @@ public void ensureRecentlyExpiredTokenWithinConfiguredLeewaySucceeds() throws Ex
}

@Test
public void ensureEmptyIssuersFailsInitialization() {
public void ensureEmptyIssuersFailsInitialization() throws IOException {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
Expand All @@ -229,7 +248,8 @@ public void ensureEmptyIssuersFailsInitialization() {
}

@Test
public void ensureEmptyIssuersFailsInitializationWithDisabledDiscoveryMode() {
public void ensureEmptyIssuersFailsInitializationWithDisabledDiscoveryMode() throws IOException {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
Expand All @@ -241,6 +261,7 @@ public void ensureEmptyIssuersFailsInitializationWithDisabledDiscoveryMode() {

@Test
public void ensureEmptyIssuersWithK8sTrustedIssuerEnabledPassesInitialization() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "my-audience");
Expand All @@ -253,6 +274,7 @@ public void ensureEmptyIssuersWithK8sTrustedIssuerEnabledPassesInitialization()

@Test
public void ensureEmptyIssuersWithK8sPublicKeyEnabledPassesInitialization() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "my-audience");
Expand All @@ -264,7 +286,8 @@ public void ensureEmptyIssuersWithK8sPublicKeyEnabledPassesInitialization() thro
}

@Test
public void ensureNullIssuersFailsInitialization() {
public void ensureNullIssuersFailsInitialization() throws IOException {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
ServiceConfiguration config = new ServiceConfiguration();
// Make sure this still defaults to null.
Expand All @@ -273,7 +296,8 @@ public void ensureNullIssuersFailsInitialization() {
}

@Test
public void ensureInsecureIssuerFailsInitialization() {
public void ensureInsecureIssuerFailsInitialization() throws IOException {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com,http://myissuer.com");
Expand All @@ -293,6 +317,7 @@ public void ensureInsecureIssuerFailsInitialization() {
}

@Test void ensureRoleClaimForStringReturnsRole() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
Expand All @@ -312,6 +337,7 @@ public void ensureInsecureIssuerFailsInitialization() {
}

@Test void ensureRoleClaimForSingletonListReturnsRole() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
Expand All @@ -333,6 +359,7 @@ public void ensureInsecureIssuerFailsInitialization() {
}

@Test void ensureRoleClaimForMultiEntryListReturnsFirstRole() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
Expand All @@ -354,6 +381,7 @@ public void ensureInsecureIssuerFailsInitialization() {
}

@Test void ensureRoleClaimForEmptyListReturnsNull() throws Exception {
@Cleanup
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
Properties props = new Properties();
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
jcommander.parse(args);
if (this.isHelp()) {
jcommander.usage();
System.exit(0);
exit(0);
}
if (Strings.isNullOrEmpty(this.getConfigFile())) {
String configFile = System.getProperty(PULSAR_CONFIG_FILE);
Expand All @@ -62,7 +62,7 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
CmdGenerateDocs cmd = new CmdGenerateDocs("pulsar");
cmd.addCommand("standalone", this);
cmd.run(null);
System.exit(0);
exit(0);
}

if (this.isNoBroker() && this.isOnlyBroker()) {
Expand All @@ -73,7 +73,7 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
} catch (Exception e) {
jcommander.usage();
log.error(e.getMessage());
System.exit(1);
exit(1);
}

try (FileInputStream inputStream = new FileInputStream(this.getConfigFile())) {
Expand Down Expand Up @@ -109,6 +109,10 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}
}

registerShutdownHook();
}

protected void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
if (fnWorkerService != null) {
Expand All @@ -130,6 +134,10 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}));
}

protected void exit(int status) {
System.exit(status);
}

private static boolean argsContains(String[] args, String arg) {
return Arrays.asList(args).contains(arg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,9 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
@Override
public void stop() throws PulsarServerException {
try {
loadReports.close();
if (loadReports != null) {
loadReports.close();
}
scheduler.shutdownNow();
scheduler.awaitTermination(5, TimeUnit.SECONDS);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2603,14 +2603,25 @@ private void updateConfigurationAndRegisterListeners() {
new Semaphore((int) maxConcurrentTopicLoadRequest, false)));
registerConfigurationListener("loadManagerClassName", className -> {
pulsar.getExecutor().execute(() -> {
LoadManager newLoadManager = null;
try {
final LoadManager newLoadManager = LoadManager.create(pulsar);
newLoadManager = LoadManager.create(pulsar);
log.info("Created load manager: {}", className);
pulsar.getLoadManager().get().stop();
newLoadManager.start();
pulsar.getLoadManager().set(newLoadManager);
} catch (Exception ex) {
log.warn("Failed to change load manager", ex);
try {
if (newLoadManager != null) {
newLoadManager.stop();
newLoadManager = null;
}
} catch (PulsarServerException e) {
log.warn("Failed to close created load manager", e);
}
}
if (newLoadManager != null) {
pulsar.getLoadManager().set(newLoadManager);
}
});
});
Expand Down
Loading