Skip to content

Commit

Permalink
Rename AccumuloClient to AccumuloMetadataManager
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Feb 20, 2024
1 parent 4fcaada commit 266e9d0
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,28 @@

/**
* Trino metadata provider for Accumulo.
* Responsible for creating/dropping/listing tables, schemas, columns, all sorts of goodness. Heavily leverages {@link AccumuloClient}.
* Responsible for creating/dropping/listing tables, schemas, columns, all sorts of goodness. Heavily leverages {@link AccumuloMetadataManager}.
*/
public class AccumuloMetadata
implements ConnectorMetadata
{
private static final JsonCodec<ConnectorViewDefinition> VIEW_CODEC =
new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(ConnectorViewDefinition.class);

private final AccumuloClient client;
private final AccumuloMetadataManager metadataManager;
private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();

@Inject
public AccumuloMetadata(AccumuloClient client)
public AccumuloMetadata(AccumuloMetadataManager metadataManager)
{
this.client = requireNonNull(client, "client is null");
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
}

@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties, TrinoPrincipal owner)
{
checkArgument(properties.isEmpty(), "Can't have properties for schema creation");
client.createSchema(schemaName);
metadataManager.createSchema(schemaName);
}

@Override
Expand All @@ -94,7 +94,7 @@ public void dropSchema(ConnectorSession session, String schemaName, boolean casc
if (cascade) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support dropping schemas with CASCADE option");
}
client.dropSchema(schemaName);
metadataManager.dropSchema(schemaName);
}

@Override
Expand All @@ -110,7 +110,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
checkNoRollback();

SchemaTableName tableName = tableMetadata.getTable();
AccumuloTable table = client.createTable(tableMetadata);
AccumuloTable table = metadataManager.createTable(tableMetadata);

AccumuloTableHandle handle = new AccumuloTableHandle(
tableName.getSchemaName(),
Expand All @@ -134,7 +134,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess

private void rollbackCreateTable(AccumuloTable table)
{
client.dropTable(table);
metadataManager.dropTable(table);
}

@Override
Expand All @@ -143,53 +143,53 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
if (tableMetadata.getComment().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
}
client.createTable(tableMetadata);
metadataManager.createTable(tableMetadata);
}

@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;
AccumuloTable table = client.getTable(handle.toSchemaTableName());
AccumuloTable table = metadataManager.getTable(handle.toSchemaTableName());
if (table != null) {
client.dropTable(table);
metadataManager.dropTable(table);
}
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle,
SchemaTableName newTableName)
{
if (client.getTable(newTableName) != null) {
if (metadataManager.getTable(newTableName) != null) {
throw new TrinoException(ACCUMULO_TABLE_EXISTS, "Table " + newTableName + " already exists");
}

AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;
client.renameTable(handle.toSchemaTableName(), newTableName);
metadataManager.renameTable(handle.toSchemaTableName(), newTableName);
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
{
String viewData = VIEW_CODEC.toJson(definition);
if (replace) {
client.createOrReplaceView(viewName, viewData);
metadataManager.createOrReplaceView(viewName, viewData);
}
else {
client.createView(viewName, viewData);
metadataManager.createView(viewName, viewData);
}
}

@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
client.dropView(viewName);
metadataManager.dropView(viewName);
}

@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
return Optional.ofNullable(client.getView(viewName))
return Optional.ofNullable(metadataManager.getView(viewName))
.map(view -> VIEW_CODEC.fromJson(view.getData()));
}

Expand All @@ -209,13 +209,13 @@ private List<SchemaTableName> listViews(Optional<String> filterSchema)
{
ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
if (filterSchema.isPresent()) {
for (String view : client.getViewNames(filterSchema.get())) {
for (String view : metadataManager.getViewNames(filterSchema.get())) {
builder.add(new SchemaTableName(filterSchema.get(), view));
}
}
else {
for (String schemaName : client.getSchemaNames()) {
for (String view : client.getViewNames(schemaName)) {
for (String schemaName : metadataManager.getSchemaNames()) {
for (String view : metadataManager.getViewNames(schemaName)) {
builder.add(new SchemaTableName(schemaName, view));
}
}
Expand Down Expand Up @@ -264,7 +264,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable

// Need to validate that SchemaTableName is a table
if (!this.listViews(session, Optional.of(tableName.getSchemaName())).contains(tableName)) {
AccumuloTable table = client.getTable(tableName);
AccumuloTable table = metadataManager.getTable(tableName);
if (table == null) {
return null;
}
Expand Down Expand Up @@ -298,7 +298,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;

AccumuloTable table = client.getTable(handle.toSchemaTableName());
AccumuloTable table = metadataManager.getTable(handle.toSchemaTableName());
if (table == null) {
throw new TableNotFoundException(handle.toSchemaTableName());
}
Expand All @@ -321,29 +321,29 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
{
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;
AccumuloColumnHandle columnHandle = (AccumuloColumnHandle) source;
AccumuloTable table = client.getTable(handle.toSchemaTableName());
AccumuloTable table = metadataManager.getTable(handle.toSchemaTableName());
if (table == null) {
throw new TableNotFoundException(new SchemaTableName(handle.getSchema(), handle.getTable()));
}

client.renameColumn(table, columnHandle.getName(), target);
metadataManager.renameColumn(table, columnHandle.getName(), target);
}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
return ImmutableList.copyOf(client.getSchemaNames());
return ImmutableList.copyOf(metadataManager.getSchemaNames());
}

@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> filterSchema)
{
Set<String> schemaNames = filterSchema.<Set<String>>map(ImmutableSet::of)
.orElseGet(client::getSchemaNames);
.orElseGet(metadataManager::getSchemaNames);

ImmutableSet.Builder<SchemaTableName> builder = ImmutableSet.builder();
for (String schemaName : schemaNames) {
for (String tableName : client.getTableNames(schemaName)) {
for (String tableName : metadataManager.getTableNames(schemaName)) {
builder.add(new SchemaTableName(schemaName, tableName));
}
}
Expand Down Expand Up @@ -415,13 +415,13 @@ public void rollback()

private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
{
if (!client.getSchemaNames().contains(tableName.getSchemaName())) {
if (!metadataManager.getSchemaNames().contains(tableName.getSchemaName())) {
return null;
}

// Need to validate that SchemaTableName is a table
if (!this.listViews(Optional.ofNullable(tableName.getSchemaName())).contains(tableName)) {
AccumuloTable table = client.getTable(tableName);
AccumuloTable table = metadataManager.getTable(tableName);
if (table == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

public class AccumuloMetadataFactory
{
private final AccumuloClient client;
private final AccumuloMetadataManager metadataManager;

@Inject
public AccumuloMetadataFactory(AccumuloClient client)
public AccumuloMetadataFactory(AccumuloMetadataManager metadataManager)
{
this.client = requireNonNull(client, "client is null");
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
}

public AccumuloMetadata create()
{
return new AccumuloMetadata(client);
return new AccumuloMetadata(metadataManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@
* This class is the main access point for the Trino connector to interact with Accumulo.
* It is responsible for creating tables, dropping tables, retrieving table metadata, and getting the ConnectorSplits from a table.
*/
public class AccumuloClient
public class AccumuloMetadataManager
{
private static final Logger LOG = Logger.get(AccumuloClient.class);
private static final Logger LOG = Logger.get(AccumuloMetadataManager.class);
private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private final ZooKeeperMetadataManager metaManager;
Expand All @@ -96,7 +96,7 @@ public class AccumuloClient
private final String username;

@Inject
public AccumuloClient(
public AccumuloMetadataManager(
Connector connector,
AccumuloConfig config,
ZooKeeperMetadataManager metaManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void configure(Binder binder)
binder.bind(AccumuloConnector.class).in(Scopes.SINGLETON);
binder.bind(AccumuloMetadata.class).in(Scopes.SINGLETON);
binder.bind(AccumuloMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(AccumuloClient.class).in(Scopes.SINGLETON);
binder.bind(AccumuloMetadataManager.class).in(Scopes.SINGLETON);
binder.bind(AccumuloSplitManager.class).in(Scopes.SINGLETON);
binder.bind(AccumuloRecordSetProvider.class).in(Scopes.SINGLETON);
binder.bind(AccumuloPageSinkProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@
public class AccumuloSplitManager
implements ConnectorSplitManager
{
private final AccumuloClient client;
private final AccumuloMetadataManager metadataManager;

@Inject
public AccumuloSplitManager(AccumuloClient client)
public AccumuloSplitManager(AccumuloMetadataManager metadataManager)
{
this.client = requireNonNull(client, "client is null");
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
}

@Override
Expand All @@ -73,7 +73,7 @@ public ConnectorSplitSource getSplits(
Optional<Domain> rDom = getRangeDomain(rowIdName, handle.getConstraint());

// Call out to our client to retrieve all tablet split metadata using the row ID domain and the secondary index
List<TabletSplitMetadata> tabletSplits = client.getTabletSplits(session, schemaName, tableName, rDom, constraints, handle.getSerializerInstance());
List<TabletSplitMetadata> tabletSplits = metadataManager.getTabletSplits(session, schemaName, tableName, rDom, constraints, handle.getSerializerInstance());

// Pack the tablet split metadata into a connector split
ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.plugin.accumulo.AccumuloClient.getRangesFromDomain;
import static io.trino.plugin.accumulo.AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR;
import static io.trino.plugin.accumulo.AccumuloMetadataManager.getRangesFromDomain;
import static io.trino.plugin.accumulo.conf.AccumuloSessionProperties.getIndexCardinalityCachePollingDuration;
import static io.trino.plugin.accumulo.conf.AccumuloSessionProperties.getIndexSmallCardThreshold;
import static io.trino.plugin.accumulo.conf.AccumuloSessionProperties.getIndexThreshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package io.trino.plugin.accumulo.io;

import com.google.inject.Inject;
import io.trino.plugin.accumulo.AccumuloClient;
import io.trino.plugin.accumulo.AccumuloMetadataManager;
import io.trino.plugin.accumulo.conf.AccumuloConfig;
import io.trino.plugin.accumulo.model.AccumuloTableHandle;
import io.trino.spi.connector.ConnectorInsertTableHandle;
Expand All @@ -36,17 +36,17 @@
public class AccumuloPageSinkProvider
implements ConnectorPageSinkProvider
{
private final AccumuloClient client;
private final AccumuloMetadataManager metadataManager;
private final Connector connector;
private final String username;

@Inject
public AccumuloPageSinkProvider(
Connector connector,
AccumuloConfig config,
AccumuloClient client)
AccumuloMetadataManager metadataManager)
{
this.client = requireNonNull(client, "client is null");
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
this.connector = requireNonNull(connector, "connector is null");
this.username = config.getUsername();
}
Expand All @@ -55,7 +55,7 @@ public AccumuloPageSinkProvider(
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId)
{
AccumuloTableHandle tableHandle = (AccumuloTableHandle) outputTableHandle;
return new AccumuloPageSink(connector, client.getTable(tableHandle.toSchemaTableName()), username);
return new AccumuloPageSink(connector, metadataManager.getTable(tableHandle.toSchemaTableName()), username);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@TestInstance(PER_CLASS)
public class TestAccumuloClient
public class TestAccumuloMetadataManager
{
private AccumuloClient client;
private AccumuloMetadataManager metadataManager;
private ZooKeeperMetadataManager zooKeeperMetadataManager;

@BeforeAll
Expand All @@ -55,14 +55,14 @@ public void setUp()
Connector connector = TestingAccumuloServer.getInstance().getConnector();
config.setZooKeepers(connector.getInstance().getZooKeepers());
zooKeeperMetadataManager = new ZooKeeperMetadataManager(config, TESTING_TYPE_MANAGER);
client = new AccumuloClient(connector, config, zooKeeperMetadataManager, new AccumuloTableManager(connector), new IndexLookup(connector, new ColumnCardinalityCache(connector, config)));
metadataManager = new AccumuloMetadataManager(connector, config, zooKeeperMetadataManager, new AccumuloTableManager(connector), new IndexLookup(connector, new ColumnCardinalityCache(connector, config)));
}

@AfterAll
public void tearDown()
{
zooKeeperMetadataManager = null;
client = null;
metadataManager = null;
}

@Test
Expand All @@ -82,13 +82,13 @@ public void testCreateTableEmptyAccumuloColumn()
new AccumuloTableProperties().getTableProperties().forEach(meta -> properties.put(meta.getName(), meta.getDefaultValue()));
properties.put("external", true);
properties.put("column_mapping", "a:a:a,b::b,c:c:,d::");
client.createTable(new ConnectorTableMetadata(tableName, columns, properties));
assertThat(client.getTable(tableName)).isNotNull();
metadataManager.createTable(new ConnectorTableMetadata(tableName, columns, properties));
assertThat(metadataManager.getTable(tableName)).isNotNull();
}
finally {
AccumuloTable table = zooKeeperMetadataManager.getTable(tableName);
if (table != null) {
client.dropTable(table);
metadataManager.dropTable(table);
}
}
}
Expand Down

0 comments on commit 266e9d0

Please sign in to comment.