diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 7761d46..42ee077 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -50,6 +50,9 @@ jobs:
          - name: zipkin-dependencies-elasticsearch-v8
            module: zipkin-dependencies-elasticsearch
            groups: docker,elasticsearch8
+          - name: zipkin-dependencies-opensearch-v2
+            module: zipkin-dependencies-opensearch
+            groups: docker,opensearch2
          - name: zipkin-dependencies-mysql
            module: zipkin-dependencies-mysql
            groups: docker
diff --git a/README.md b/README.md
index e296e55..e7b3b33 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ are supported, including Cassandra, MySQL and Elasticsearch.
 * `STORAGE_TYPE=cassandra3` : requires Cassandra 3.11.3+; tested against the latest patch of 4.0
 * `STORAGE_TYPE=mysql` : requires MySQL 5.6+; tested against MySQL 10.11
-* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+; tested against last minor release of 7.x and 8.x
+* `STORAGE_TYPE=elasticsearch` : requires Elasticsearch 7+ or OpenSearch 2.x; tested against the last minor release of Elasticsearch 7.x, 8.x and OpenSearch 2.x

 ## Quick-start

@@ -92,20 +92,20 @@ $ STORAGE_TYPE=mysql MYSQL_USER=root java -jar zipkin-dependencies.jar
 ```

 ### Elasticsearch Storage
-Elasticsearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).
+Elasticsearch/OpenSearch is used when `STORAGE_TYPE=elasticsearch`. The schema is compatible with Zipkin's [Elasticsearch storage component](https://github.com/openzipkin/zipkin/tree/master/zipkin-storage/elasticsearch).

 * `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.
 * `ES_DATE_SEPARATOR`: The separator used when generating dates in index.
   Defaults to '-' so the queried index look like zipkin-yyyy-DD-mm
   Could for example be changed to '.' to give zipkin-yyyy.MM.dd
-* `ES_HOSTS`: A comma separated list of elasticsearch hosts advertising http. Defaults to
+* `ES_HOSTS`: A comma separated list of Elasticsearch/OpenSearch hosts advertising http. Defaults to
   localhost. Add port section if not listening on port 9200. Only one of these hosts needs to be
   available to fetch the remaining nodes in the cluster. It is recommended to set this to all the
   master nodes of the cluster. Use url format for SSL. For example, "https://yourhost:8888"
 * `ES_NODES_WAN_ONLY`: Set to true to only use the values set in ES_HOSTS, for example if your
-  elasticsearch cluster is in Docker. Defaults to false
-* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security
+  Elasticsearch/OpenSearch cluster is in Docker. Defaults to false
+* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch/OpenSearch basic authentication. Use when X-Pack security
   (formerly Shield) is in place. By default no username or password is provided to elasticsearch.
diff --git a/docker/examples/README.md b/docker/examples/README.md
index 80e5d17..a63ac9d 100644
--- a/docker/examples/README.md
+++ b/docker/examples/README.md
@@ -15,6 +15,13 @@
 $ STORAGE_TYPE=elasticsearch
 $ docker-compose -f docker-compose.yml -f docker-compose-${STORAGE_TYPE}.yml up
 ```

+The `elasticsearch` storage type is also compatible with OpenSearch,
+so you can start the example setup like this:
+
+```
+$ docker-compose -f docker-compose.yml -f docker-compose-opensearch.yml up
+```
+
 This starts zipkin, the corresponding storage and makes an example request.
 After that, it runs the dependencies job on-demand.
diff --git a/docker/examples/docker-compose-opensearch.yml b/docker/examples/docker-compose-opensearch.yml
new file mode 100644
index 0000000..67563f1
--- /dev/null
+++ b/docker/examples/docker-compose-opensearch.yml
@@ -0,0 +1,40 @@
+#
+# Copyright The OpenZipkin Authors
+# SPDX-License-Identifier: Apache-2.0
+#
+
+# This file uses the version 2 docker-compose file format, described here:
+# https://docs.docker.com/compose/compose-file/#version-2
+#
+# It extends the default configuration from docker-compose.yml to run the
+# zipkin-opensearch2 container instead of the zipkin-mysql container.
+
+version: '2.4'
+
+services:
+  storage:
+    image: ghcr.io/openzipkin/zipkin-opensearch2:${TAG:-latest}
+    container_name: opensearch
+    # Uncomment to expose the storage port for testing
+    # ports:
+    #   - 9200:9200
+
+  # Use OpenSearch instead of in-memory storage
+  zipkin:
+    extends:
+      file: docker-compose.yml
+      service: zipkin
+    environment:
+      - STORAGE_TYPE=elasticsearch
+      # Point zipkin at the storage backend
+      - ES_HOSTS=opensearch:9200
+      # Uncomment to see requests to and from OpenSearch
+      # - ES_HTTP_LOGGING=BODY
+
+  dependencies:
+    extends:
+      file: docker-compose.yml
+      service: dependencies
+    environment:
+      - STORAGE_TYPE=elasticsearch
+      - ES_HOSTS=opensearch
diff --git a/main/pom.xml b/main/pom.xml
index c61b8a6..7605326 100644
--- a/main/pom.xml
+++ b/main/pom.xml
@@ -39,6 +39,19 @@
       <artifactId>zipkin-dependencies-elasticsearch</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zipkin-dependencies-opensearch</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.linecorp.armeria</groupId>
+      <artifactId>armeria-junit5</artifactId>
+      <version>${armeria.version}</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java b/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java
index 7265d1c..b1775eb 100644
--- a/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java
+++ b/main/src/main/java/zipkin2/dependencies/ZipkinDependenciesJob.java
@@ -13,6 +13,7 @@
 import java.util.LinkedHashMap;
 import java.util.TimeZone;
 import zipkin2.dependencies.elasticsearch.ElasticsearchDependenciesJob;
+import zipkin2.dependencies.opensearch.OpensearchDependenciesJob;
 import zipkin2.dependencies.mysql.MySQLDependenciesJob;

 public final class ZipkinDependenciesJob {
@@ -61,13 +62,23 @@ public static void main(String[] args) throws UnsupportedEncodingException {
         .run();
       break;
     case "elasticsearch":
-      ElasticsearchDependenciesJob.builder()
-        .logInitializer(logInitializer)
-        .jars(jarPath)
-        .day(day)
-        .conf(sparkConf)
-        .build()
-        .run();
+      if (ZipkinElasticsearchStorage.flavor().equalsIgnoreCase("elasticsearch")) {
+        ElasticsearchDependenciesJob.builder()
+          .logInitializer(logInitializer)
+          .jars(jarPath)
+          .day(day)
+          .conf(sparkConf)
+          .build()
+          .run();
+      } else { // "opensearch"
+        OpensearchDependenciesJob.builder()
+          .logInitializer(logInitializer)
+          .jars(jarPath)
+          .day(day)
+          .conf(sparkConf)
+          .build()
+          .run();
+      }
       break;
     default:
       throw new UnsupportedOperationException("Unsupported STORAGE_TYPE: " + storageType + "\n"
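The dispatch above keeps `STORAGE_TYPE=elasticsearch` as the single switch value and picks the job by probing the cluster: OpenSearch reports a `distribution` field in its root-endpoint JSON, while Elasticsearch does not, so Elasticsearch is the fallback. A minimal self-contained sketch of that check, using hand-written sample payloads (the `FlavorSketch` class and its inputs are hypothetical, not part of this change):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FlavorSketch {
  // Same pattern idea as ZipkinElasticsearchStorage.DISTRIBUTION in the next file.
  static final Pattern DISTRIBUTION =
    Pattern.compile("\"distribution\"\\s*[:]\\s*\"([^\"]+)\"");

  static String flavorOf(String rootJson) {
    Matcher matcher = DISTRIBUTION.matcher(rootJson);
    // Elasticsearch has no "distribution" field, so it is the default answer.
    return matcher.find() ? matcher.group(1).toLowerCase() : "elasticsearch";
  }

  public static void main(String[] args) {
    // Hypothetical fragments of root responses:
    System.out.println(flavorOf(
      "{\"version\":{\"distribution\":\"opensearch\",\"number\":\"2.11.1\"}}")); // opensearch
    System.out.println(flavorOf(
      "{\"version\":{\"number\":\"7.0.1\"}}")); // elasticsearch
  }
}
```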
diff --git a/main/src/main/java/zipkin2/dependencies/ZipkinElasticsearchStorage.java b/main/src/main/java/zipkin2/dependencies/ZipkinElasticsearchStorage.java
new file mode 100644
index 0000000..b257663
--- /dev/null
+++ b/main/src/main/java/zipkin2/dependencies/ZipkinElasticsearchStorage.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package zipkin2.dependencies;
+
+import java.io.IOException;
+import java.net.Authenticator;
+import java.net.PasswordAuthentication;
+import java.net.Socket;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509ExtendedTrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ZipkinElasticsearchStorage {
+  private static final Logger LOG = LoggerFactory.getLogger(ZipkinElasticsearchStorage.class);
+  private static final Pattern DISTRIBUTION = Pattern.compile("\"distribution\"\\s*[:]\\s*\"([^\"]+)\"");
+
+  static final String HOSTS = getEnv("ES_HOSTS", "127.0.0.1");
+  static final String USERNAME = getEnv("ES_USERNAME", null);
+  static final String PASSWORD = getEnv("ES_PASSWORD", null);
+
+  static TrustManager[] TRUST_ALL = new TrustManager [] {
+    new X509ExtendedTrustManager() {
+      @Override
+      public X509Certificate[] getAcceptedIssuers() {
+        return null;
+      }
+
+      @Override
+      public void checkClientTrusted(X509Certificate[] certs, String authType) {
+      }
+
+      @Override
+      public void checkServerTrusted(X509Certificate[] certs, String authType) {
+      }
+
+      @Override
+      public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException {
+      }
+
+      @Override
+      public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException {
+      }
+
+      @Override
+      public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException {
+      }
+
+      @Override
+      public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException {
+      }
+    }
+  };
+
+  static String flavor() {
+    return flavor(HOSTS, USERNAME, PASSWORD);
+  }
+
+  static String flavor(String hosts, String username, String password) {
+    final HttpClient.Builder builder = HttpClient
+      .newBuilder()
+      .connectTimeout(Duration.ofSeconds(5));
+
+    if (username != null && password != null) {
+      builder.authenticator(new Authenticator() {
+        @Override
+        protected PasswordAuthentication getPasswordAuthentication() {
+          return new PasswordAuthentication(username, password.toCharArray());
+        }
+      });
+    }
+
+    try {
+      final SSLContext sslContext = SSLContext.getInstance("TLS");
+      sslContext.init(null, TRUST_ALL, new SecureRandom());
+
+      final HttpClient client = builder.sslContext(sslContext).build();
+      try {
+        for (String host : parseHosts(hosts)) {
+          final HttpRequest request = HttpRequest.newBuilder().GET().uri(URI.create(host)).build();
+          try {
+            final HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
+            final Matcher matcher = DISTRIBUTION.matcher(response.body());
+            if (matcher.find()) {
+              return matcher.group(1).toLowerCase();
+            }
+          } catch (InterruptedException | IOException ex) {
+            LOG.warn("Unable to issue HTTP GET request to '" + host + "'", ex);
+          }
+        }
+      } finally {
+        if (client instanceof AutoCloseable) {
+          try {
+            // Since JDK-21, the HttpClient is AutoCloseable
+            ((AutoCloseable) client).close();
+          } catch (Exception ex) {
+            /* Ignore */
+          }
+        }
+      }
+    } catch (final NoSuchAlgorithmException | KeyManagementException ex) {
+      LOG.warn("Unable to configure HttpClient", ex);
+    }
+
+    return "elasticsearch";
+  }
+
+  private static String getEnv(String key, String defaultValue) {
+    String result = System.getenv(key);
+    return result != null && !result.isEmpty() ? result : defaultValue;
+  }
+
+  static String[] parseHosts(String hosts) {
+    final String[] hostParts = hosts.split(",", -1);
+
+    // Detect default scheme to use if not specified
+    String defaultScheme = "http";
+    for (int i = 0; i < hostParts.length; i++) {
+      String host = hostParts[i];
+      if (host.startsWith("https")) {
+        defaultScheme = "https";
+        break;
+      }
+    }
+
+    Collection<String> list = new ArrayList<>();
+    for (int i = 0; i < hostParts.length; i++) {
+      String host = hostParts[i];
+      URI httpUri = host.startsWith("http") ? URI.create(host) : URI.create(defaultScheme + "://" + host);
+
+      int port = httpUri.getPort();
+      if (port == -1) {
+        port = 9200; /* default Elasticsearch / OpenSearch port */
+      }
+
+      list.add(httpUri.getScheme() + "://" + httpUri.getHost() + ":" + port);
+    }
+
+    return list.toArray(new String[0]);
+  }
+}
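`parseHosts` normalizes each `ES_HOSTS` entry to `scheme://host:port`, inferring the default scheme from any `https` sibling and defaulting the port to 9200. A throwaway demo of those rules, assuming the class above (the `ParseHostsDemo` class and host names are hypothetical):

```java
package zipkin2.dependencies;

class ParseHostsDemo {
  public static void main(String[] args) {
    // Bare host: scheme defaults to http, port to 9200.
    print(ZipkinElasticsearchStorage.parseHosts("wolverine"));
    // -> http://wolverine:9200

    // One https entry switches the default scheme for bare siblings too.
    print(ZipkinElasticsearchStorage.parseHosts("https://es1,es2"));
    // -> https://es1:9200, https://es2:9200

    // Explicit ports are preserved.
    print(ZipkinElasticsearchStorage.parseHosts("http://es1:9201"));
    // -> http://es1:9201
  }

  static void print(String[] hosts) {
    System.out.println(String.join(", ", hosts));
  }
}
```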
"UjZaM0fQRC6tkHINCg9y8w", + "version" : { + "distribution" : "opensearch", + "number" : "2.11.1", + "build_type" : "tar", + "build_hash" : "6b1986e964d440be9137eba1413015c31c5a7752", + "build_date" : "2023-11-29T21:43:10.135035992Z", + "build_snapshot" : false, + "lucene_version" : "9.7.0", + "minimum_wire_compatibility_version" : "7.10.0", + "minimum_index_compatibility_version" : "7.0.0" + }, + "tagline" : "The OpenSearch Project: https://opensearch.org/" + } + """); + + @RegisterExtension static MockWebServerExtension server = new MockWebServerExtension(); + + @Test void opensearch_http() throws Exception { + server.enqueue(OPENSEARCH_RESPONSE); + + assertThat(ZipkinElasticsearchStorage.flavor(server.httpUri().toString(), null, null)) + .isEqualTo("opensearch"); + } + + @Test void opensearch_https() throws Exception { + server.enqueue(OPENSEARCH_RESPONSE); + + assertThat(ZipkinElasticsearchStorage.flavor(server.httpsUri().toString(), null, null)) + .isEqualTo("opensearch"); + } + + @Test void elasticsearch_http() throws Exception { + server.enqueue(ELASTICSEARCH_RESPONSE); + + assertThat(ZipkinElasticsearchStorage.flavor(server.httpUri().toString(), null, null)) + .isEqualTo("elasticsearch"); + } + + @Test void elasticsearch_https() throws Exception { + server.enqueue(ELASTICSEARCH_RESPONSE); + + assertThat(ZipkinElasticsearchStorage.flavor(server.httpsUri().toString(), null, null)) + .isEqualTo("elasticsearch"); + } +} diff --git a/opensearch/pom.xml b/opensearch/pom.xml new file mode 100644 index 0000000..01c335a --- /dev/null +++ b/opensearch/pom.xml @@ -0,0 +1,71 @@ + + + + 4.0.0 + + + io.zipkin.dependencies + zipkin-dependencies-parent + 3.2.0-SNAPSHOT + + + zipkin-dependencies-opensearch + Zipkin Dependencies: OpenSearch + + + ${project.basedir}/.. 
diff --git a/opensearch/pom.xml b/opensearch/pom.xml
new file mode 100644
index 0000000..01c335a
--- /dev/null
+++ b/opensearch/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright The OpenZipkin Authors
+SPDX-License-Identifier: Apache-2.0
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>io.zipkin.dependencies</groupId>
+    <artifactId>zipkin-dependencies-parent</artifactId>
+    <version>3.2.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>zipkin-dependencies-opensearch</artifactId>
+  <name>Zipkin Dependencies: OpenSearch</name>
+
+  <properties>
+    <main.basedir>${project.basedir}/..</main.basedir>
+    <okhttp.version>4.12.0</okhttp.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.opensearch.client</groupId>
+      <artifactId>opensearch-spark-30_${scala.binary.version}</artifactId>
+      <version>${opensearch-spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.zipkin.zipkin2</groupId>
+      <artifactId>zipkin-storage-elasticsearch</artifactId>
+      <version>${zipkin.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>mockwebserver</artifactId>
+      <version>${okhttp.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp-tls</artifactId>
+      <version>${okhttp.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.linecorp.armeria</groupId>
+      <artifactId>armeria-junit5</artifactId>
+      <version>${armeria.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/opensearch/src/main/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJob.java b/opensearch/src/main/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJob.java
new file mode 100644
index 0000000..e3854fa
--- /dev/null
+++ b/opensearch/src/main/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJob.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.dependencies.opensearch;
+
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.MalformedJsonException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import javax.annotation.Nullable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import zipkin2.DependencyLink;
+import zipkin2.codec.SpanBytesDecoder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_INDEX_READ_MISSING_AS_EMPTY;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_HTTP_AUTH_PASS;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_HTTP_AUTH_USER;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_KEYSTORE_LOCATION;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_KEYSTORE_PASS;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_TRUST_STORE_LOCATION;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_SSL_TRUST_STORE_PASS;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NET_USE_SSL;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NODES;
+import static org.opensearch.hadoop.cfg.ConfigurationOptions.OPENSEARCH_NODES_WAN_ONLY;
+import static zipkin2.internal.DateUtil.midnightUTC;
+
+public final class OpensearchDependenciesJob {
+  static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  private static final Logger log = LoggerFactory.getLogger(OpensearchDependenciesJob.class);
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+
+    String index = getEnv("ES_INDEX", "zipkin");
+    String hosts = getEnv("ES_HOSTS", "127.0.0.1");
+    String username = getEnv("ES_USERNAME", null);
+    String password = getEnv("ES_PASSWORD", null);
+
+    final Map<String, String> sparkProperties = new LinkedHashMap<>();
+
+    Builder() {
+      sparkProperties.put("spark.ui.enabled", "false");
+      // don't die if there are no spans
+      sparkProperties.put(OPENSEARCH_INDEX_READ_MISSING_AS_EMPTY, "true");
+      sparkProperties.put(OPENSEARCH_NODES_WAN_ONLY, getEnv("ES_NODES_WAN_ONLY", "false"));
+      sparkProperties.put(OPENSEARCH_NET_SSL_KEYSTORE_LOCATION,
+        getSystemPropertyAsFileResource("javax.net.ssl.keyStore"));
+      sparkProperties.put(OPENSEARCH_NET_SSL_KEYSTORE_PASS,
+        System.getProperty("javax.net.ssl.keyStorePassword", ""));
+      sparkProperties.put(OPENSEARCH_NET_SSL_TRUST_STORE_LOCATION,
+        getSystemPropertyAsFileResource("javax.net.ssl.trustStore"));
+      sparkProperties.put(OPENSEARCH_NET_SSL_TRUST_STORE_PASS,
+        System.getProperty("javax.net.ssl.trustStorePassword", ""));
+    }
+
+    // local[*] master lets us run & test the job locally without setting a Spark cluster
+    String sparkMaster = getEnv("SPARK_MASTER", "local[*]");
+    // needed when not in local mode
+    String[] jars;
+    Runnable logInitializer;
+
+    // By default, the job only works on traces whose first timestamp is today
+    long day = midnightUTC(System.currentTimeMillis());
+
+    /** When set, this indicates which jars to distribute to the cluster. */
+    public Builder jars(String... jars) {
+      this.jars = jars;
+      return this;
+    }
+
+    /** The index prefix to use when generating daily index names. Defaults to "zipkin" */
+    public Builder index(String index) {
+      this.index = checkNotNull(index, "index");
+      return this;
+    }
+
+    public Builder hosts(String hosts) {
+      this.hosts = checkNotNull(hosts, "hosts");
+      sparkProperties.put("opensearch.nodes.wan.only", "true");
+      return this;
+    }
+
+    /** username used for basic auth. Needed when Shield or X-Pack security is enabled */
+    public Builder username(String username) {
+      this.username = username;
+      return this;
+    }
+
+    /** password used for basic auth. Needed when Shield or X-Pack security is enabled */
+    public Builder password(String password) {
+      this.password = password;
+      return this;
+    }
+
+    /** Day (in epoch milliseconds) to process dependencies for. Defaults to today. */
+    public Builder day(long day) {
+      this.day = midnightUTC(day);
+      return this;
+    }
+
+    /** Extending more configuration of spark. */
+    public Builder conf(Map<String, String> conf) {
+      sparkProperties.putAll(conf);
+      return this;
+    }
+
+    /** Ensures that logging is set up. Particularly important when in cluster mode. */
+    public Builder logInitializer(Runnable logInitializer) {
+      this.logInitializer = checkNotNull(logInitializer, "logInitializer");
+      return this;
+    }
+
+    public OpensearchDependenciesJob build() {
+      return new OpensearchDependenciesJob(this);
+    }
+  }
"file:" + prop : prop; + } + + final String index; + final String dateStamp; + final SparkConf conf; + @Nullable final Runnable logInitializer; + + OpensearchDependenciesJob(Builder builder) { + this.index = builder.index; + String dateSeparator = getEnv("ES_DATE_SEPARATOR", "-"); + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd".replace("-", dateSeparator)); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + this.dateStamp = df.format(new Date(builder.day)); + this.conf = new SparkConf(true).setMaster(builder.sparkMaster).setAppName(getClass().getName()); + if (builder.sparkMaster.startsWith("local[")) { + conf.set("spark.driver.bindAddress", "127.0.0.1"); + } + if (builder.jars != null) conf.setJars(builder.jars); + if (builder.username != null) conf.set(OPENSEARCH_NET_HTTP_AUTH_USER, builder.username); + if (builder.password != null) conf.set(OPENSEARCH_NET_HTTP_AUTH_PASS, builder.password); + conf.set(OPENSEARCH_NODES, parseHosts(builder.hosts)); + if (builder.hosts.contains("https")) conf.set(OPENSEARCH_NET_USE_SSL, "true"); + for (Map.Entry entry : builder.sparkProperties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + log.debug("Spark conf properties: {}={}", entry.getKey(), entry.getValue()); + } + this.logInitializer = builder.logInitializer; + } + + public void run() { + String spanResource = index + "-span-" + dateStamp; + String dependencyLinkResource = index + "-dependency-" + dateStamp; + SpanBytesDecoder decoder = SpanBytesDecoder.JSON_V2; + + log.info("Processing spans from {}", spanResource); + JavaRDD> links; + try (JavaSparkContext sc = new JavaSparkContext(conf)) { + links = JavaOpenSearchSpark.openSearchJsonRDD(sc, spanResource) + .groupBy(JSON_TRACE_ID) + .flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder)) + .values() + .mapToPair((PairFunction, DependencyLink>) l -> + new Tuple2<>(new Tuple2<>(l.parent(), l.child()), l)) + .reduceByKey((l, r) -> DependencyLink.newBuilder() + .parent(l.parent()) + .child(l.child()) + .callCount(l.callCount() + r.callCount()) + .errorCount(l.errorCount() + r.errorCount()) + .build()) + .values() + .map(DEPENDENCY_LINK_JSON); + + if (links.isEmpty()) { + log.info("No dependency links could be processed from spans in index {}", spanResource); + } else { + log.info("Saving dependency links to {}", dependencyLinkResource); + JavaOpenSearchSpark.saveToOpenSearch( + links, + dependencyLinkResource, + Collections.singletonMap("opensearch.mapping.id", "id")); // allows overwriting the link + } + } + + log.info("Done"); + } + + /** + * Same as {@linkplain DependencyLink}, except it adds an ID field so the job can be re-run, + * overwriting a prior run's value for the link. + */ + static final Function> DEPENDENCY_LINK_JSON = l -> { + Map result = new LinkedHashMap<>(); + result.put("id", l.parent() + "|" + l.child()); + result.put("parent", l.parent()); + result.put("child", l.child()); + result.put("callCount", l.callCount()); + result.put("errorCount", l.errorCount()); + return result; + }; + + private static String getEnv(String key, String defaultValue) { + String result = System.getenv(key); + return result != null && !result.isEmpty() ? 
+
+  static String parseHosts(String hosts) {
+    StringBuilder to = new StringBuilder();
+    String[] hostParts = hosts.split(",", -1);
+    for (int i = 0; i < hostParts.length; i++) {
+      String host = hostParts[i];
+      if (host.startsWith("http")) {
+        URI httpUri = URI.create(host);
+        int port = httpUri.getPort();
+        if (port == -1) {
+          port = host.startsWith("https") ? 443 : 80;
+        }
+        to.append(httpUri.getHost()).append(":").append(port);
+      } else {
+        to.append(host);
+      }
+      if (i + 1 < hostParts.length) {
+        to.append(',');
+      }
+    }
+    return to.toString();
+  }
+
+  // defining what could be lambdas here until we update to minimum JRE 8 or retrolambda works.
+  static final Function<Tuple2<String, String>, String> JSON_TRACE_ID =
+    new Function<Tuple2<String, String>, String>() {
+      /** returns the lower 64 bits of the trace ID */
+      @Override public String call(Tuple2<String, String> pair) throws IOException {
+        JsonReader reader = new JsonReader(new StringReader(pair._2));
+        reader.beginObject();
+        while (reader.hasNext()) {
+          String nextName = reader.nextName();
+          if (nextName.equals("traceId")) {
+            String traceId = reader.nextString();
+            return traceId.length() > 16 ? traceId.substring(traceId.length() - 16) : traceId;
+          } else {
+            reader.skipValue();
+          }
+        }
+        throw new MalformedJsonException("no traceId in " + pair);
+      }
+
+      @Override public String toString() {
+        return "pair._2.traceId";
+      }
+    };
+}
diff --git a/opensearch/src/main/java/zipkin2/dependencies/opensearch/TraceIdAndJsonToDependencyLinks.java b/opensearch/src/main/java/zipkin2/dependencies/opensearch/TraceIdAndJsonToDependencyLinks.java
new file mode 100644
index 0000000..4123fe0
--- /dev/null
+++ b/opensearch/src/main/java/zipkin2/dependencies/opensearch/TraceIdAndJsonToDependencyLinks.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.dependencies.opensearch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import zipkin2.DependencyLink;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
+import zipkin2.internal.DependencyLinker;
+
+final class TraceIdAndJsonToDependencyLinks
+  implements Serializable, FlatMapFunction<Iterable<Tuple2<String, String>>, DependencyLink> {
+  private static final long serialVersionUID = 0L;
+  private static final Logger log = LoggerFactory.getLogger(TraceIdAndJsonToDependencyLinks.class);
+
+  @Nullable final Runnable logInitializer;
+  final SpanBytesDecoder decoder;
+
+  TraceIdAndJsonToDependencyLinks(Runnable logInitializer, SpanBytesDecoder decoder) {
+    this.logInitializer = logInitializer;
+    this.decoder = decoder;
+  }
+
+  @Override
+  public Iterator<DependencyLink> call(Iterable<Tuple2<String, String>> traceIdJson) {
+    if (logInitializer != null) logInitializer.run();
+    List<Span> sameTraceId = new ArrayList<>();
+    for (Tuple2<String, String> row : traceIdJson) {
+      try {
+        decoder.decode(row._2.getBytes(OpensearchDependenciesJob.UTF_8), sameTraceId);
+      } catch (Exception e) {
+        log.warn("Unable to decode span from traces where trace_id=" + row._1, e);
+      }
+    }
+    DependencyLinker linker = new DependencyLinker();
+    linker.putTrace(sameTraceId);
+    return linker.link().iterator();
+  }
+}
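The `reduceByKey` step in `run()` merges links that share the same `(parent, child)` key by summing their counts. A standalone illustration of that merge function (the `MergeLinksDemo` class and service names are hypothetical):

```java
import zipkin2.DependencyLink;

class MergeLinksDemo {
  // Same merge the job passes to reduceByKey: keep the endpoints, sum the counts.
  static DependencyLink merge(DependencyLink l, DependencyLink r) {
    return DependencyLink.newBuilder()
      .parent(l.parent())
      .child(l.child())
      .callCount(l.callCount() + r.callCount())
      .errorCount(l.errorCount() + r.errorCount())
      .build();
  }

  public static void main(String[] args) {
    DependencyLink a = DependencyLink.newBuilder()
      .parent("frontend").child("backend").callCount(2).errorCount(1).build();
    DependencyLink b = DependencyLink.newBuilder()
      .parent("frontend").child("backend").callCount(3).errorCount(0).build();
    System.out.println(merge(a, b)); // frontend -> backend with callCount 5, errorCount 1
  }
}
```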
diff --git a/opensearch/src/main/java/zipkin2/dependencies/opensearch/package-info.java b/opensearch/src/main/java/zipkin2/dependencies/opensearch/package-info.java
new file mode 100644
index 0000000..5a2acb8
--- /dev/null
+++ b/opensearch/src/main/java/zipkin2/dependencies/opensearch/package-info.java
@@ -0,0 +1,6 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+@javax.annotation.ParametersAreNonnullByDefault
+package zipkin2.dependencies.opensearch;
diff --git a/opensearch/src/test/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJobTest.java b/opensearch/src/test/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJobTest.java
new file mode 100644
index 0000000..1fd4544
--- /dev/null
+++ b/opensearch/src/test/java/zipkin2/dependencies/opensearch/OpensearchDependenciesJobTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.dependencies.opensearch;
+
+import java.io.IOException;
+import java.util.Base64;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.hadoop.OpenSearchHadoopException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AT_START;
+import static okhttp3.tls.internal.TlsUtil.localhost;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class OpensearchDependenciesJobTest {
+  MockWebServer es = new MockWebServer();
+
+  @BeforeEach void start() throws IOException {
+    es.start();
+  }
+
+  @AfterEach void stop() throws IOException {
+    es.close();
+  }
+
+  @Test void buildHttps() {
+    OpensearchDependenciesJob job =
+      OpensearchDependenciesJob.builder().hosts("https://foobar").build();
+    assertThat(job.conf.get("opensearch.nodes")).isEqualTo("foobar:443");
+    assertThat(job.conf.get("opensearch.net.ssl")).isEqualTo("true");
+  }
+
+  @Test void buildAuth() {
+    OpensearchDependenciesJob job =
+      OpensearchDependenciesJob.builder().username("foo").password("bar").build();
+    assertThat(job.conf.get("opensearch.net.http.auth.user")).isEqualTo("foo");
+    assertThat(job.conf.get("opensearch.net.http.auth.pass")).isEqualTo("bar");
+  }
+
+  @Test void authWorks() throws Exception {
+    es.enqueue(new MockResponse()); // let the HEAD request pass, so we can trap the header value
+    es.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AT_START)); // kill the job
+    OpensearchDependenciesJob job = OpensearchDependenciesJob.builder()
+      .username("foo")
+      .password("bar")
+      .hosts(es.url("").toString())
+      .build();
+
+    assertThatThrownBy(job::run)
+      .isInstanceOf(OpenSearchHadoopException.class);
+
+    String encoded = Base64.getEncoder().encodeToString("foo:bar".getBytes(UTF_8));
+    assertThat(es.takeRequest().getHeader("Authorization"))
+      .isEqualTo("Basic " + encoded.trim());
+  }
+
+  @Test void authWorksWithSsl() throws Exception {
+    es.useHttps(localhost().sslSocketFactory(), false);
+
+    es.enqueue(new MockResponse()); // let the HEAD request pass, so we can trap the header value
+    es.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AT_START)); // kill the job
+
+    OpensearchDependenciesJob.Builder builder = OpensearchDependenciesJob.builder()
+      .username("foo")
+      .password("bar")
+      .hosts(es.url("").toString());
+
+    // temporarily hack-in self-signed until https://github.com/openzipkin/zipkin/issues/1683
+    builder.sparkProperties.put("opensearch.net.ssl.cert.allow.self.signed", "true");
+
+    OpensearchDependenciesJob job = builder.build();
+
+    assertThatThrownBy(job::run)
+      .isInstanceOf(OpenSearchHadoopException.class);
+
+    String encoded = Base64.getEncoder().encodeToString("foo:bar".getBytes(UTF_8));
+    assertThat(es.takeRequest().getHeader("Authorization"))
+      .isEqualTo("Basic " + encoded.trim());
+  }
+
+  @Test void parseHosts_default() {
+    assertThat(OpensearchDependenciesJob.parseHosts("1.1.1.1")).isEqualTo("1.1.1.1");
+  }
+
+  @Test void parseHosts_commaDelimits() {
+    assertThat(OpensearchDependenciesJob.parseHosts("1.1.1.1:9200,2.2.2.2:9200")).isEqualTo(
+      "1.1.1.1:9200,2.2.2.2:9200");
+  }
+
+  @Test void parseHosts_http_defaultPort() {
+    assertThat(OpensearchDependenciesJob.parseHosts("http://1.1.1.1")).isEqualTo("1.1.1.1:80");
+  }
+
+  @Test void parseHosts_https_defaultPort() {
+    assertThat(OpensearchDependenciesJob.parseHosts("https://1.1.1.1")).isEqualTo("1.1.1.1:443");
+  }
+
+  @Test void javaSslOptsRedirected() {
+    System.setProperty("javax.net.ssl.keyStore", "keystore.jks");
+    System.setProperty("javax.net.ssl.keyStorePassword", "superSecret");
+    System.setProperty("javax.net.ssl.trustStore", "truststore.jks");
+    System.setProperty("javax.net.ssl.trustStorePassword", "secretSuper");
+
+    OpensearchDependenciesJob job = OpensearchDependenciesJob.builder().build();
+
+    assertThat(job.conf.get("opensearch.net.ssl.keystore.location")).isEqualTo("file:keystore.jks");
+    assertThat(job.conf.get("opensearch.net.ssl.keystore.pass")).isEqualTo("superSecret");
+    assertThat(job.conf.get("opensearch.net.ssl.truststore.location")).isEqualTo("file:truststore.jks");
+    assertThat(job.conf.get("opensearch.net.ssl.truststore.pass")).isEqualTo("secretSuper");
+
+    System.clearProperty("javax.net.ssl.keyStore");
+    System.clearProperty("javax.net.ssl.keyStorePassword");
+    System.clearProperty("javax.net.ssl.trustStore");
+    System.clearProperty("javax.net.ssl.trustStorePassword");
+  }
+}
diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesHeavyV2.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesHeavyV2.java
new file mode 100644
index 0000000..6e288cf
--- /dev/null
+++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesHeavyV2.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.storage.elasticsearch;
+
+import java.io.IOException;
+import java.util.List;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import zipkin2.Span;
+import zipkin2.elasticsearch.ElasticsearchStorage;
+import zipkin2.storage.ITDependenciesHeavy;
+
+@Tag("docker")
+@Tag("opensearch2")
+@Testcontainers(disabledWithoutDocker = true)
+class ITOpensearchDependenciesHeavyV2 extends ITDependenciesHeavy<ElasticsearchStorage> {
+  @Container static OpensearchContainer opensearch = new OpensearchContainer(2);
+
+  @Override protected ElasticsearchStorage.Builder newStorageBuilder(TestInfo testInfo) {
+    return opensearch.newStorageBuilder();
+  }
+
+  @Override public void clear() throws IOException {
+    storage.clear();
+  }
+
+  @Override protected void processDependencies(List<Span> spans) throws Exception {
+    opensearch.processDependencies(storage, spans);
+  }
+}
diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesV2.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesV2.java
new file mode 100644
index 0000000..2494df0
--- /dev/null
+++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/ITOpensearchDependenciesV2.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.storage.elasticsearch;
+
+import java.io.IOException;
+import java.util.List;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import zipkin2.Span;
+import zipkin2.elasticsearch.ElasticsearchStorage;
+import zipkin2.storage.ITDependencies;
+
+@Tag("docker")
+@Tag("opensearch2")
+@Testcontainers(disabledWithoutDocker = true)
+class ITOpensearchDependenciesV2 extends ITDependencies<ElasticsearchStorage> {
+  @Container static OpensearchContainer opensearch = new OpensearchContainer(2);
+
+  @Override protected ElasticsearchStorage.Builder newStorageBuilder(TestInfo testInfo) {
+    return opensearch.newStorageBuilder();
+  }
+
+  @Override public void clear() throws IOException {
+    storage.clear();
+  }
+
+  @Override protected void processDependencies(List<Span> spans) throws Exception {
+    opensearch.processDependencies(storage, spans);
+  }
+}
diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/IgnoredDeprecationWarnings.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/IgnoredDeprecationWarnings.java
new file mode 100644
index 0000000..5d00114
--- /dev/null
+++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/IgnoredDeprecationWarnings.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.storage.elasticsearch;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.asList;
+
+/**
+ * When OS emits a deprecation warning header in response to a method being called, the integration
+ * test will fail. However, we cannot always fix our code to take every deprecation warning into
+ * account, as we have to support multiple versions of ES. For these cases, add the warning message
+ * to the {@link #IGNORE_THESE_WARNINGS} list so it no longer raises an exception.
+ */
+abstract class IgnoredDeprecationWarnings {
+
+  // These are matched using find(), so a pattern need only cover a unique substring of the
+  // warning header for it to be ignored
+  static List<Pattern> IGNORE_THESE_WARNINGS = asList(
+  );
+}
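If an OpenSearch upgrade starts tripping the deprecation-warning decorator in `OpensearchContainer` (next file), the intended fix is a one-line addition to the list above; each pattern is matched against the `warning` header with `find()`. A sketch of what the file would look like with one entry (the warning text is hypothetical):

```java
static List<Pattern> IGNORE_THESE_WARNINGS = asList(
  // Pattern.quote keeps regex metacharacters in the warning text literal
  Pattern.compile(Pattern.quote("[some.setting] is deprecated and will be removed"))
);
```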
diff --git a/opensearch/src/test/java/zipkin2/storage/elasticsearch/OpensearchContainer.java b/opensearch/src/test/java/zipkin2/storage/elasticsearch/OpensearchContainer.java
new file mode 100644
index 0000000..cb7ae44
--- /dev/null
+++ b/opensearch/src/test/java/zipkin2/storage/elasticsearch/OpensearchContainer.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.storage.elasticsearch;
+
+import com.linecorp.armeria.client.ClientFactory;
+import com.linecorp.armeria.client.ClientOptions;
+import com.linecorp.armeria.client.ClientOptionsBuilder;
+import com.linecorp.armeria.client.WebClient;
+import com.linecorp.armeria.client.WebClientBuilder;
+import com.linecorp.armeria.client.logging.ContentPreviewingClient;
+import com.linecorp.armeria.client.logging.LoggingClient;
+import com.linecorp.armeria.client.logging.LoggingClientBuilder;
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.logging.LogLevel;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import zipkin2.Span;
+import zipkin2.dependencies.opensearch.OpensearchDependenciesJob;
+import zipkin2.elasticsearch.ElasticsearchStorage;
+
+import static org.testcontainers.utility.DockerImageName.parse;
+import static zipkin2.storage.ITDependencies.aggregateLinks;
+
+class OpensearchContainer extends GenericContainer<OpensearchContainer> {
+  static final Logger LOGGER = LoggerFactory.getLogger(OpensearchContainer.class);
+
+  OpensearchContainer(int majorVersion) {
+    super(parse("ghcr.io/openzipkin/zipkin-opensearch" + majorVersion + ":3.3.0"));
+    addExposedPort(9200);
+    waitStrategy = Wait.forHealthcheck();
+    withLogConsumer(new Slf4jLogConsumer(LOGGER));
+  }
+
+  @Override public void start() {
+    super.start();
+    LOGGER.info("Using baseUrl http://" + hostPort());
+  }
+
+  ElasticsearchStorage.Builder newStorageBuilder() {
+    WebClientBuilder builder = WebClient.builder("http://" + hostPort());
+    builder.decorator((delegate, ctx, req) -> {
+      final HttpResponse response = delegate.execute(ctx, req);
+      return HttpResponse.of(response.aggregate().thenApply(r -> {
+        // OS will return a 'warning' response header when using deprecated api, detect this and
+        // fail early, so we can do something about it.
+        // Example usage: https://github.com/elastic/elasticsearch/blob/3049e55f093487bb582a7e49ad624961415ba31c/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/IndexPrivilegeIntegTests.java#L559
+        final String warningHeader = r.headers().get("warning");
+        if (warningHeader != null) {
+          if (IgnoredDeprecationWarnings.IGNORE_THESE_WARNINGS.stream()
+              .noneMatch(p -> p.matcher(warningHeader).find())) {
+            throw new IllegalArgumentException(
+              "Detected usage of deprecated API for request " + req + ":\n" + warningHeader);
+          }
+        }
+        // Convert AggregatedHttpResponse back to HttpResponse.
+        return r.toHttpResponse();
+      }));
+    });
+
+    // When ES_DEBUG=true log full headers, request and response body to the category
+    // com.linecorp.armeria.client.logging
+    if (Boolean.parseBoolean(System.getenv("ES_DEBUG"))) {
+      ClientOptionsBuilder options = ClientOptions.builder();
+      LoggingClientBuilder loggingBuilder = LoggingClient.builder()
+        .requestLogLevel(LogLevel.INFO)
+        .successfulResponseLogLevel(LogLevel.INFO);
+      options.decorator(loggingBuilder.newDecorator());
+      options.decorator(ContentPreviewingClient.newDecorator(Integer.MAX_VALUE));
+      builder.options(options.build());
+    }
+
+    WebClient client = builder.build();
+    return ElasticsearchStorage.newBuilder(new ElasticsearchStorage.LazyHttpClient() {
+      @Override public WebClient get() {
+        return client;
+      }
+
+      @Override public void close() {
+        client.endpointGroup().close();
+      }
+
+      @Override public String toString() {
+        return client.uri().toString();
+      }
+    }).flushOnWrites(true);
+  }
+
+  String hostPort() {
+    return getHost() + ":" + getMappedPort(9200);
+  }
+
+  /**
+   * This processes the job as if it were a batch. For each day we had traces, run the job again.
+   */
+  void processDependencies(ElasticsearchStorage storage, List<Span> spans) throws IOException {
+    storage.spanConsumer().accept(spans).execute();
+
+    // aggregate links in memory to determine which days they are in
+    Set<Long> days = aggregateLinks(spans).keySet();
+
+    // process the job for each day of links.
+    for (long day : days) {
+      OpensearchDependenciesJob.builder().hosts(hostPort()).day(day).build().run();
+    }
+  }
+}
diff --git a/opensearch/src/test/resources/log4j2.properties b/opensearch/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..751fe39
--- /dev/null
+++ b/opensearch/src/test/resources/log4j2.properties
@@ -0,0 +1,31 @@
+# Maven configuration conflicts on simplelogger vs Log4J2, but IntelliJ unit tests use Log4J2
+appenders=console
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n
+rootLogger.level=warn
+rootLogger.appenderRefs=stdout
+rootLogger.appenderRef.stdout.ref=STDOUT
+
+# hush warning about loading native code
+logger.util.name=org.apache.hadoop.util
+logger.util.level=error
+
+# set to debug to see storage details
+logger.zipkin.name=zipkin2
+logger.zipkin.level=warn
+
+# set to debug to see configuration and when the job starts and completes
+logger.dependencies-opensearch.name=zipkin2.dependencies.opensearch
+logger.dependencies-opensearch.level=warn
+
+# set to info to see feedback about starting the container
+logger.testcontainers.name=org.testcontainers
+logger.testcontainers.level=warn
+logger.container.name=zipkin2.storage.elasticsearch.OpensearchContainer
+logger.container.level=warn
+
+# uncomment to see outbound client connections (useful in OpenSearch troubleshooting)
+#logger.client.name=com.linecorp.armeria.client
+#logger.client.level=info
diff --git a/pom.xml b/pom.xml
index 96c80ea..fe1012d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,7 @@
     <module>cassandra3</module>
    <module>mysql</module>
    <module>elasticsearch</module>
+    <module>opensearch</module>
    <module>main</module>
@@ -74,6 +75,9 @@
     <elasticsearch-spark.version>8.14.0-SNAPSHOT</elasticsearch-spark.version>
+
+    <opensearch-spark.version>1.2.0</opensearch-spark.version>
+
    <spark.version>3.4.1</spark.version>