Implement Elasticsearch / OpenSearch storage auto detection to pick the right job instance

Signed-off-by: Andriy Redko <[email protected]>
reta committed May 15, 2024
1 parent b3523e3 commit 2da5eba
Showing 5 changed files with 174 additions and 51 deletions.
36 changes: 5 additions & 31 deletions README.md
@@ -15,8 +15,7 @@ are supported, including Cassandra, MySQL and Elasticsearch.

* `STORAGE_TYPE=cassandra3` : requires Cassandra 3.11.3+; tested against the latest patch of 4.0
* `STORAGE_TYPE=mysql` : requires MySQL 5.6+; tested against MySQL 10.11
* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+; tested against last minor release of 7.x and 8.x
* `STORAGE_TYPE=opensearch` : requires OpenSearch 2+; tested against last minor release of 2.x
* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+ or OpenSearch 2.x; tested against last minor release of Elasticsearch 7.x and 8.x, OpenSearch 2.x

## Quick-start

@@ -93,20 +92,20 @@ $ STORAGE_TYPE=mysql MYSQL_USER=root java -jar zipkin-dependencies.jar
```

### Elasticsearch Storage
Elasticsearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).
Elasticsearch/OpenSearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).

* `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.
* `ES_DATE_SEPARATOR`: The separator used when generating dates in index.
Defaults to '-' so the queried index looks like zipkin-yyyy-MM-dd
Could for example be changed to '.' to give zipkin-yyyy.MM.dd
* `ES_HOSTS`: A comma separated list of elasticsearch hosts advertising http. Defaults to
* `ES_HOSTS`: A comma separated list of Elasticsearch / OpenSearch hosts advertising http. Defaults to
localhost. Add port section if not listening on port 9200. Only one of these hosts
needs to be available to fetch the remaining nodes in the cluster. It is
recommended to set this to all the master nodes of the cluster. Use url format for
SSL. For example, "https://yourhost:8888"
* `ES_NODES_WAN_ONLY`: Set to true to only use the values set in ES_HOSTS, for example if your
elasticsearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security
Elasticsearch / OpenSearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch / OpenSearch basic authentication. Use when X-Pack security
(formerly Shield) is in place. By default no username or
password is provided to elasticsearch.

@@ -118,31 +117,6 @@ $ STORAGE_TYPE=elasticsearch ES_HOSTS=host1,host2 java -jar zipkin-dependencies.
$ STORAGE_TYPE=elasticsearch ES_HOSTS=host1:9201 java -jar zipkin-dependencies.jar
```

### OpenSearch Storage
OpenSearch is used when `STORAGE_TYPE=opensearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).

* `OS_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.
* `OS_DATE_SEPARATOR`: The separator used when generating dates in index.
Defaults to '-' so the queried index looks like zipkin-yyyy-MM-dd
Could for example be changed to '.' to give zipkin-yyyy.MM.dd
* `OS_HOSTS`: A comma separated list of OpenSearch hosts advertising http. Defaults to
localhost. Add port section if not listening on port 9200. Only one of these hosts
needs to be available to fetch the remaining nodes in the cluster. It is
recommended to set this to all the master nodes of the cluster. Use url format for
SSL. For example, "https://yourhost:8888"
* `OS_NODES_WAN_ONLY`: Set to true to only use the values set in OS_HOSTS, for example if your
OpenSearch cluster is in Docker. Defaults to false
* `OS_USERNAME` and `OS_PASSWORD`: OpenSearch basic authentication. Use when security plugin
is in place. By default no username or password is provided to OpenSearch.

Example usage:

```bash
$ STORAGE_TYPE=opensearch OS_HOSTS=host1,host2 java -jar zipkin-dependencies.jar
# To override the http port, add it to the host string
$ STORAGE_TYPE=opensearch OS_HOSTS=host1:9201 java -jar zipkin-dependencies.jar
```

#### Custom certificates

When using an https endpoint in `ES_HOSTS`, you can use the following standard properties to
4 changes: 2 additions & 2 deletions docker/examples/docker-compose-opensearch.yml
@@ -36,5 +36,5 @@ services:
file: docker-compose.yml
service: dependencies
environment:
- STORAGE_TYPE=opensearch
- OS_HOSTS=opensearch
- STORAGE_TYPE=elasticsearch
- ES_HOSTS=opensearch
29 changes: 17 additions & 12 deletions main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java
@@ -6,12 +6,16 @@

import java.io.File;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URL;
import java.net.URLDecoder;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.TimeZone;

import zipkin2.dependencies.elasticsearch.ElasticsearchDependenciesJob;
import zipkin2.dependencies.opensearch.OpensearchDependenciesJob;
import zipkin2.dependencies.mysql.MySQLDependenciesJob;
@@ -62,29 +66,30 @@ public static void main(String[] args) throws UnsupportedEncodingException {
.run();
break;
case "elasticsearch":
ElasticsearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
break;
case "opensearch":
if (ZipkinElasticsearchStorage.flavor().equalsIgnoreCase("elasticsearch")) {
ElasticsearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
} else { // "opensearch"
OpensearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.conf(sparkConf)
.build()
.run();
break;
}
break;
default:
throw new UnsupportedOperationException("Unsupported STORAGE_TYPE: " + storageType + "\n"
+ "Options are: cassandra3, mysql, elasticsearch, opensearch");
+ "Options are: cassandra3, mysql, elasticsearch");
}
}

static String[] pathToUberJar() throws UnsupportedEncodingException {
URL jarFile = ZipkinDependenciesJob.class.getProtectionDomain().getCodeSource().getLocation();
return new File(jarFile.getPath()).isDirectory() ? null
@@ -0,0 +1,144 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/

package zipkin2.dependencies;

import java.io.IOException;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ZipkinElasticsearchStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZipkinElasticsearchStorage.class);
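// Double backslashes so the regex engine sees \s (any whitespace) rather than the Java 15+ string escape for a single space
private static final Pattern DISTRIBUTION = Pattern.compile("\"distribution\"\\s*[:]\\s*\"([^\"]+)\"");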

static final String HOSTS = getEnv("ES_HOSTS", "127.0.0.1");
static final String USERNAME = getEnv("ES_USERNAME", null);
static final String PASSWORD = getEnv("ES_PASSWORD", null);

static TrustManager[] TRUST_ALL = new TrustManager [] {
new X509TrustManager() {
@Override
public X509Certificate[] getAcceptedIssuers() {
// The X509TrustManager contract asks for a non-null (possibly empty) array
return new X509Certificate[0];
}

@Override
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}

@Override
public void checkServerTrusted(X509Certificate[] certs, String authType) {
}
}
};

static String flavor() {
final HttpClient.Builder builder = HttpClient
.newBuilder()
.connectTimeout(Duration.ofSeconds(5));

if (USERNAME != null && PASSWORD != null) {
builder.authenticator(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(USERNAME, PASSWORD.toCharArray());
}
});
}

try {
final SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, TRUST_ALL, new SecureRandom());

final HttpClient client = builder.sslContext(sslContext).build();
try {
for (String host: parseHosts(HOSTS)) {
final HttpRequest request = HttpRequest.newBuilder().GET().uri(URI.create(host)).build();
try {
final HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
final Matcher matcher = DISTRIBUTION.matcher(response.body());
if (matcher.find()) {
return matcher.group(1).toLowerCase();
}
} catch (InterruptedException | IOException ex) {
LOG.warn("Unable to issue HTTP GET request to '" + host + "'", ex);
}
}
} finally {
if (client instanceof AutoCloseable) {
try {
// Since JDK-21, the HttpClient is AutoCloseable
((AutoCloseable) client).close();
} catch (Exception ex) {
/* Ignore */
}
}
}
} catch (final NoSuchAlgorithmException | KeyManagementException ex) {
LOG.warn("Unable to configure HttpClient", ex);
}

return "elasticsearch";
}

private static String getEnv(String key, String defaultValue) {
String result = System.getenv(key);
return result != null && !result.isEmpty() ? result : defaultValue;
}

static String[] parseHosts(String hosts) {
final String[] hostParts = hosts.split(",", -1);

// Detect default scheme to use if not specified
String defaultScheme = "http";
for (int i = 0; i < hostParts.length; i++) {
String host = hostParts[i];
if (host.startsWith("https")) {
defaultScheme = "https";
break;
}
}

Collection<String> list = new ArrayList<>();
for (int i = 0; i < hostParts.length; i++) {
String host = hostParts[i];
URI httpUri = host.startsWith("http") ? URI.create(host) : URI.create(defaultScheme + "://" + host);

int port = httpUri.getPort();
if (port == -1) {
port = 9200; /* default Elasticsearch / OpenSearch port */
}

list.add(httpUri.getScheme() + "://" + httpUri.getHost() + ":" + port);
}

return list.toArray(new String[0]);
}

public static void main(String[] s) {
System.out.println(flavor());
}
}
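
The detection and host handling in `ZipkinElasticsearchStorage` can be tried without a running cluster. The following is a minimal sketch, not the commit's code: the class name `FlavorSketch`, the method names `detect` and `normalize`, and the sample response bodies are assumptions made for illustration. Only the regex idea, the `"elasticsearch"` fallback and the default port 9200 are taken from the file above.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical class; mirrors the idea of flavor() and parseHosts() without any network I/O.
public class FlavorSketch {
  // Matches "distribution" : "<value>" with optional whitespace around the colon.
  static final Pattern DISTRIBUTION =
      Pattern.compile("\"distribution\"\\s*:\\s*\"([^\"]+)\"");

  /** Returns the lower-cased distribution name, or "elasticsearch" when the field is absent. */
  static String detect(String body) {
    Matcher matcher = DISTRIBUTION.matcher(body);
    return matcher.find() ? matcher.group(1).toLowerCase(Locale.ROOT) : "elasticsearch";
  }

  /** Adds a scheme and the default port 9200 to each comma-separated host that lacks them. */
  static List<String> normalize(String hosts) {
    String[] parts = hosts.split(",", -1);

    // If any host is given as https, hosts without a scheme use https as well.
    String defaultScheme = "http";
    for (String part : parts) {
      if (part.startsWith("https")) {
        defaultScheme = "https";
        break;
      }
    }

    List<String> result = new ArrayList<>();
    for (String part : parts) {
      URI uri = part.startsWith("http")
          ? URI.create(part)
          : URI.create(defaultScheme + "://" + part);
      int port = uri.getPort() == -1 ? 9200 : uri.getPort();
      result.add(uri.getScheme() + "://" + uri.getHost() + ":" + port);
    }
    return result;
  }

  public static void main(String[] args) {
    // Illustrative bodies only, not captured from real clusters.
    String withDistribution = "{\"version\": {\"distribution\" : \"opensearch\"}}";
    String withoutDistribution = "{\"version\": {\"number\": \"8.0.0\"}}";

    System.out.println(detect(withDistribution));
    System.out.println(detect(withoutDistribution));
    System.out.println(normalize("host1,https://host2:9201"));
  }
}
```

In this sketch a body without a `distribution` field falls through to `"elasticsearch"`, which is why `STORAGE_TYPE=elasticsearch` keeps working when detection finds nothing.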
@@ -53,18 +53,18 @@ public static Builder builder() {

public static final class Builder {

String index = getEnv("OS_INDEX", "zipkin");
String hosts = getEnv("OS_HOSTS", "127.0.0.1");
String username = getEnv("OS_USERNAME", null);
String password = getEnv("OS_PASSWORD", null);
String index = getEnv("ES_INDEX", "zipkin");
String hosts = getEnv("ES_HOSTS", "127.0.0.1");
String username = getEnv("ES_USERNAME", null);
String password = getEnv("ES_PASSWORD", null);

final Map<String, String> sparkProperties = new LinkedHashMap<>();

Builder() {
sparkProperties.put("spark.ui.enabled", "false");
// don't die if there are no spans
sparkProperties.put(OPENSEARCH_INDEX_READ_MISSING_AS_EMPTY, "true");
sparkProperties.put(OPENSEARCH_NODES_WAN_ONLY, getEnv("OS_NODES_WAN_ONLY", "false"));
sparkProperties.put(OPENSEARCH_NODES_WAN_ONLY, getEnv("ES_NODES_WAN_ONLY", "false"));
sparkProperties.put(OPENSEARCH_NET_SSL_KEYSTORE_LOCATION,
getSystemPropertyAsFileResource("javax.net.ssl.keyStore"));
sparkProperties.put(OPENSEARCH_NET_SSL_KEYSTORE_PASS,
@@ -149,7 +149,7 @@ private static String getSystemPropertyAsFileResource(String key) {

OpensearchDependenciesJob(Builder builder) {
this.index = builder.index;
String dateSeparator = getEnv("OS_DATE_SEPARATOR", "-");
String dateSeparator = getEnv("ES_DATE_SEPARATOR", "-");
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd".replace("-", dateSeparator));
df.setTimeZone(TimeZone.getTimeZone("UTC"));
this.dateStamp = df.format(new Date(builder.day));
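
The effect of `ES_DATE_SEPARATOR` on the date stamp can be checked in isolation. This is a small sketch assuming a hypothetical class `DateStampSketch` and helper `dateStamp`; it reuses the same `SimpleDateFormat` construction as the hunk above and does not show how the job combines the stamp with the index prefix.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper class; the formatting logic follows the constructor in the hunk above.
public class DateStampSketch {
  /** Formats the given epoch milliseconds as yyyy<sep>MM<sep>dd in UTC. */
  static String dateStamp(long epochMillis, String dateSeparator) {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd".replace("-", dateSeparator));
    df.setTimeZone(TimeZone.getTimeZone("UTC"));
    return df.format(new Date(epochMillis));
  }

  public static void main(String[] args) {
    long day = 1715731200000L; // midnight UTC on 2024-05-15
    System.out.println(dateStamp(day, "-")); // default separator
    System.out.println(dateStamp(day, ".")); // as if ES_DATE_SEPARATOR were set to '.'
  }
}
```

Pinning the formatter to UTC keeps the stamp independent of the machine's local time zone.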