Skip to content

Commit

Permalink
Update Accumulo to 3.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Feb 20, 2024
1 parent 266e9d0 commit 9ffcf42
Show file tree
Hide file tree
Showing 19 changed files with 289 additions and 193 deletions.
21 changes: 11 additions & 10 deletions plugin/trino-accumulo-iterators/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<project.build.targetJdk>8</project.build.targetJdk>
<project.build.targetJdk>11</project.build.targetJdk>
</properties>

<dependencies>
Expand All @@ -24,22 +24,23 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>io.prestosql.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<version>${dep.accumulo-hadoop.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${dep.accumulo.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
82 changes: 53 additions & 29 deletions plugin/trino-accumulo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,9 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.curator.version>2.13.0</dep.curator.version>
<dep.curator.version>5.6.0</dep.curator.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.3-1</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -74,12 +64,6 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.prestosql.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<version>${dep.accumulo-hadoop.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-accumulo-iterators</artifactId>
Expand All @@ -96,6 +80,11 @@
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
Expand Down Expand Up @@ -131,6 +120,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
Expand All @@ -144,24 +137,20 @@
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-api</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-provider-svnexe</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -231,6 +220,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
Expand Down Expand Up @@ -291,6 +286,35 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${dep.accumulo.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.hk2.external</groupId>
<artifactId>aopalliance-repackaged</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.Domain;
import io.trino.spi.type.TimestampType;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -91,26 +91,26 @@ public class AccumuloMetadataManager
private final ZooKeeperMetadataManager metaManager;
private final Authorizations auths;
private final AccumuloTableManager tableManager;
private final Connector connector;
private final AccumuloClient client;
private final IndexLookup indexLookup;
private final String username;

@Inject
public AccumuloMetadataManager(
Connector connector,
AccumuloClient client,
AccumuloConfig config,
ZooKeeperMetadataManager metaManager,
AccumuloTableManager tableManager,
IndexLookup indexLookup)
throws AccumuloException, AccumuloSecurityException
{
this.connector = requireNonNull(connector, "connector is null");
this.client = requireNonNull(client, "client is null");
this.username = config.getUsername();
this.metaManager = requireNonNull(metaManager, "metaManager is null");
this.tableManager = requireNonNull(tableManager, "tableManager is null");
this.indexLookup = requireNonNull(indexLookup, "indexLookup is null");

this.auths = connector.securityOperations().getUserAuthorizations(username);
this.auths = client.securityOperations().getUserAuthorizations(username);

// The default namespace is created in ZooKeeperMetadataManager's constructor
if (!tableManager.namespaceExists(DEFAULT_SCHEMA)) {
Expand Down Expand Up @@ -773,7 +773,7 @@ private Authorizations getScanAuthorizations(ConnectorSession session, String sc
{
String sessionScanUser = AccumuloSessionProperties.getScanUsername(session);
if (sessionScanUser != null) {
Authorizations scanAuths = connector.securityOperations().getUserAuthorizations(sessionScanUser);
Authorizations scanAuths = client.securityOperations().getUserAuthorizations(sessionScanUser);
LOG.debug("Using session scan auths for user %s: %s", sessionScanUser, scanAuths);
return scanAuths;
}
Expand Down Expand Up @@ -805,7 +805,7 @@ private Collection<Range> splitByTabletBoundaries(String tableName, Collection<R
}
else {
// Call out to Accumulo to split the range on tablets
rangeBuilder.addAll(connector.tableOperations().splitRangeByTablets(tableName, range, Integer.MAX_VALUE));
rangeBuilder.addAll(client.tableOperations().splitRangeByTablets(tableName, range, Integer.MAX_VALUE));
}
}
return rangeBuilder.build();
Expand All @@ -822,10 +822,10 @@ private Optional<String> getTabletLocation(String table, Key key)
{
try {
// Get the Accumulo table ID so we can scan some fun stuff
String tableId = connector.tableOperations().tableIdMap().get(table);
String tableId = client.tableOperations().tableIdMap().get(table);

// Create our scanner against the metadata table, fetching 'loc' family
Scanner scanner = connector.createScanner("accumulo.metadata", auths);
Scanner scanner = client.createScanner("accumulo.metadata", auths);
scanner.fetchColumnFamily(new Text("loc"));

// Set the scan range to just this table, from the table ID to the default tablet
Expand Down Expand Up @@ -895,10 +895,10 @@ private Optional<String> getTabletLocation(String table, Key key)
private Optional<String> getDefaultTabletLocation(String fulltable)
{
try {
String tableId = connector.tableOperations().tableIdMap().get(fulltable);
String tableId = client.tableOperations().tableIdMap().get(fulltable);

// Create a scanner over the metadata table, fetching the 'loc' column of the default tablet row
Scanner scan = connector.createScanner("accumulo.metadata", connector.securityOperations().getUserAuthorizations(username));
Scanner scan = client.createScanner("accumulo.metadata", client.securityOperations().getUserAuthorizations(username));
scan.fetchColumnFamily(new Text("loc"));
scan.setRange(new Range(tableId + '<'));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,11 @@
import io.trino.plugin.accumulo.io.AccumuloRecordSetProvider;
import io.trino.plugin.accumulo.metadata.AccumuloTable;
import io.trino.plugin.accumulo.metadata.ZooKeeperMetadataManager;
import io.trino.spi.TrinoException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.trino.plugin.accumulo.AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Trino module to do all kinds of run Guice injection stuff!
Expand All @@ -66,25 +59,25 @@ public void configure(Binder binder)
binder.bind(AccumuloTableManager.class).in(Scopes.SINGLETON);
binder.bind(IndexLookup.class).in(Scopes.SINGLETON);
binder.bind(ColumnCardinalityCache.class).in(Scopes.SINGLETON);
binder.bind(Connector.class).toProvider(ConnectorProvider.class);
binder.bind(AccumuloClient.class).toProvider(ClientProvider.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(AccumuloConfig.class);

jsonCodecBinder(binder).bindMapJsonCodec(String.class, JsonCodec.listJsonCodec(AccumuloTable.class));
}

private static class ConnectorProvider
implements Provider<Connector>
private static class ClientProvider
implements Provider<AccumuloClient>
{
private static final Logger LOG = Logger.get(ConnectorProvider.class);
private static final Logger LOG = Logger.get(ClientProvider.class);

private final String instance;
private final String zooKeepers;
private final String username;
private final String password;

@Inject
public ConnectorProvider(AccumuloConfig config)
public ClientProvider(AccumuloConfig config)
{
this.instance = config.getInstance();
this.zooKeepers = config.getZooKeepers();
Expand All @@ -93,17 +86,14 @@ public ConnectorProvider(AccumuloConfig config)
}

@Override
public Connector get()
public AccumuloClient get()
{
try {
Instance inst = new ZooKeeperInstance(instance, zooKeepers);
Connector connector = inst.getConnector(username, new PasswordToken(password.getBytes(UTF_8)));
LOG.info("Connection to instance %s at %s established, user %s", instance, zooKeepers, username);
return connector;
}
catch (AccumuloException | AccumuloSecurityException e) {
throw new TrinoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to get connector to Accumulo", e);
}
AccumuloClient client = Accumulo.newClient()
.to(instance, zooKeepers)
.as(username, password)
.build();
LOG.info("Connection to instance %s at %s established, user %s", instance, zooKeepers, username);
return client;
}
}
}
Loading

0 comments on commit 9ffcf42

Please sign in to comment.