From 01293cee918e1b7798eb2544f28a9f84c8b093b2 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 13 Jan 2025 10:04:23 +0800 Subject: [PATCH] intro PipePluginAnnotationTest & fix show --- iotdb-core/confignode/pom.xml | 5 + .../pipe/plugin/PipePluginTableResp.java | 22 ++- .../persistence/pipe/PipePluginInfo.java | 5 +- .../annotation/PipePluginAnnotationTest.java | 144 ++++++++++++++++++ .../sync/IoTDBDataNodeSyncConnector.java | 4 + .../annotation/PipePluginAnnotationTest.java | 144 ++++++++++++++++++ iotdb-core/node-commons/pom.xml | 5 + .../pipe/agent/plugin/PipePluginAgent.java | 4 +- .../connector/PlaceholderConnector.java | 4 + .../iotdb/airgap/IoTDBAirGapConnector.java | 2 + .../PipeConsensusAsyncConnector.java | 4 + .../thrift/IoTDBLegacyPipeConnector.java | 2 + .../thrift/IoTDBThriftAsyncConnector.java | 5 + .../iotdb/thrift/IoTDBThriftConnector.java | 4 + .../iotdb/thrift/IoTDBThriftSslConnector.java | 4 + .../thrift/IoTDBThriftSyncConnector.java | 5 + .../connector/opcua/OpcUaConnector.java | 2 + .../websocket/WebSocketConnector.java | 2 + .../writeback/WriteBackConnector.java | 4 + .../extractor/iotdb/IoTDBExtractor.java | 4 + .../plugin/meta/PipePluginMetaKeeper.java | 4 +- .../connector/protocol/IoTDBConnector.java | 4 + .../protocol/IoTDBSslSyncConnector.java | 4 + .../pipe/extractor/IoTDBExtractor.java | 4 + .../annotation/PipePluginAnnotationTest.java | 144 ++++++++++++++++++ 25 files changed, 522 insertions(+), 13 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/pipe/annotation/PipePluginAnnotationTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/plugin/annotation/PipePluginAnnotationTest.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/annotation/PipePluginAnnotationTest.java diff --git a/iotdb-core/confignode/pom.xml b/iotdb-core/confignode/pom.xml index 3b7998f091e8..73cfd568d521 100644 --- a/iotdb-core/confignode/pom.xml +++ b/iotdb-core/confignode/pom.xml @@ -144,6 +144,11 @@ com.google.code.findbugs jsr305 + + org.reflections + reflections + test + junit junit diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java index 5c784ba277e4..534b824ef47d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class PipePluginTableResp implements DataSet { @@ -56,13 +57,18 @@ public TGetPipePluginTableResp convertToThriftResponse() throws IOException { } public PipePluginTableResp filter(final boolean isTableModel) { - allPipePluginMeta.removeIf( - meta -> { - final String pipePluginName = meta.getPluginName(); - final Visibility visibility = - pipePluginNameToVisibilityMap.getOrDefault(pipePluginName, Visibility.TREE_ONLY); - return !VisibilityUtils.isCompatible(visibility, isTableModel); - }); - return this; + return new PipePluginTableResp( + status, + allPipePluginMeta.stream() + .filter( + meta -> { + final String pipePluginName = meta.getPluginName(); + final Visibility visibility = + pipePluginNameToVisibilityMap.getOrDefault( + pipePluginName, Visibility.TREE_ONLY); + return VisibilityUtils.isCompatible(visibility, isTableModel); + }) + .collect(Collectors.toList()), + pipePluginNameToVisibilityMap); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index f4b4151d6a63..baecc0407353 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -60,6 +60,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR; import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_EXTRACTOR; @@ -274,7 +276,8 @@ public TSStatus dropPipePlugin(final DropPipePluginPlan dropPipePluginPlan) { public DataSet showPipePlugins() { return new PipePluginTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta()), + StreamSupport.stream(pipePluginMetaKeeper.getAllPipePluginMeta().spliterator(), false) + .collect(Collectors.toList()), pipePluginMetaKeeper.getPipePluginNameToVisibilityMap()); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/pipe/annotation/PipePluginAnnotationTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/pipe/annotation/PipePluginAnnotationTest.java new file mode 100644 index 000000000000..554a343c98d0 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/pipe/annotation/PipePluginAnnotationTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.pipe.annotation; + +import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; +import org.apache.iotdb.pipe.api.PipePlugin; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.SubTypesScanner; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils.calculateFromPluginClass; +import static org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils.isCompatible; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class PipePluginAnnotationTest { + + @Test + public void testPipePluginVisibility() { + // Use the Reflections library to scan the classpath + final Reflections reflections = + new Reflections("org.apache.iotdb.confignode", new SubTypesScanner(false)); + final Set> subTypes = reflections.getSubTypesOf(PipePlugin.class); + + // Create root node + final TreeNode root = new TreeNode(PipePlugin.class); + + // Build the tree + final Map, TreeNode> nodeMap = new HashMap<>(); + nodeMap.put(PipePlugin.class, root); + + for (final Class subType : subTypes) { + final TreeNode node = new TreeNode(subType); + nodeMap.put(subType, node); + + if (subType.isInterface()) { + // Handle super interfaces of the interface + final Class[] superInterfaces = subType.getInterfaces(); + for (Class superInterface : superInterfaces) { + while (!nodeMap.containsKey(superInterface) && superInterface != null) { + superInterface = getSuperInterface(superInterface); + } + + if (superInterface != null) { + nodeMap.get(superInterface).children.put(subType, node); + } + } + } else { + // Handle superclass of the class + Class superClass = subType.getSuperclass(); + if (superClass == Object.class) { + // If the superclass is Object, check the implemented interfaces + final Class[] interfaces = subType.getInterfaces(); + for (final Class iface : interfaces) { + if (nodeMap.containsKey(iface)) { + nodeMap.get(iface).children.put(subType, node); + } + } + } else { + while (!nodeMap.containsKey(superClass) && superClass != null) { + superClass = superClass.getSuperclass(); + } + + if (superClass != null) { + nodeMap.get(superClass).children.put(subType, node); + } + } + } + } + + root.visibility = Visibility.BOTH; + for (TreeNode node : root.children.values()) { + node.visibility = Visibility.BOTH; + } + + // Validate the correctness of the tree + assertNotNull(root); + + // Validate the visibility compatibility of the tree + assertTrue(validateTreeNode(root)); + } + + private static class TreeNode { + Class clazz; + Visibility visibility; + Map, TreeNode> children = new HashMap<>(); + + TreeNode(final Class clazz) { + this.clazz = clazz; + this.visibility = calculateFromPluginClass(clazz); + } + } + + // Get the super interface of an interface + private Class getSuperInterface(final Class interfaceClass) { + final Class[] superInterfaces = interfaceClass.getInterfaces(); + return superInterfaces.length > 0 ? superInterfaces[0] : null; + } + + private static boolean validateTreeNode(final TreeNode node) { + for (final TreeNode child : node.children.values()) { + if (!isCompatible(node.visibility, child.visibility)) { + assertTrue( + "Incompatible visibility detected:\n" + + "Parent class: " + + node.clazz.getName() + + ", Visibility: " + + node.visibility + + "\n" + + "Child class: " + + child.clazz.getName() + + ", Visibility: " + + child.visibility, + isCompatible(node.visibility, child.visibility)); + } + if (!validateTreeNode(child)) { + return false; + } + } + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index 03754dff4363..d3386a3eb934 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -26,6 +26,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.slf4j.Logger; @@ -37,6 +39,8 @@ import java.util.Set; import java.util.stream.Collectors; +@TreeModel +@TableModel public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/plugin/annotation/PipePluginAnnotationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/plugin/annotation/PipePluginAnnotationTest.java new file mode 100644 index 000000000000..52db8c346eb6 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/plugin/annotation/PipePluginAnnotationTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.plugin.annotation; + +import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; +import org.apache.iotdb.pipe.api.PipePlugin; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.SubTypesScanner; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils.calculateFromPluginClass; +import static org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils.isCompatible; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class PipePluginAnnotationTest { + + @Test + public void testPipePluginVisibility() { + // Use the Reflections library to scan the classpath + final Reflections reflections = + new Reflections("org.apache.iotdb.db", new SubTypesScanner(false)); + final Set> subTypes = reflections.getSubTypesOf(PipePlugin.class); + + // Create root node + final TreeNode root = new TreeNode(PipePlugin.class); + + // Build the tree + final Map, TreeNode> nodeMap = new HashMap<>(); + nodeMap.put(PipePlugin.class, root); + + for (final Class subType : subTypes) { + final TreeNode node = new TreeNode(subType); + nodeMap.put(subType, node); + + if (subType.isInterface()) { + // Handle super interfaces of the interface + final Class[] superInterfaces = subType.getInterfaces(); + for (Class superInterface : superInterfaces) { + while (!nodeMap.containsKey(superInterface) && superInterface != null) { + superInterface = getSuperInterface(superInterface); + } + + if (superInterface != null) { + nodeMap.get(superInterface).children.put(subType, node); + } + } + } else { + // Handle superclass of the class + Class superClass = subType.getSuperclass(); + if (superClass == Object.class) { + // If the superclass is Object, check the implemented interfaces + final Class[] interfaces = subType.getInterfaces(); + for (final Class iface : interfaces) { + if (nodeMap.containsKey(iface)) { + nodeMap.get(iface).children.put(subType, node); + } + } + } else { + while (!nodeMap.containsKey(superClass) && superClass != null) { + superClass = superClass.getSuperclass(); + } + + if (superClass != null) { + nodeMap.get(superClass).children.put(subType, node); + } + } + } + } + + root.visibility = Visibility.BOTH; + for (TreeNode node : root.children.values()) { + node.visibility = Visibility.BOTH; + } + + // Validate the correctness of the tree + assertNotNull(root); + + // Validate the visibility compatibility of the tree + assertTrue(validateTreeNode(root)); + } + + private static class TreeNode { + Class clazz; + Visibility visibility; + Map, TreeNode> children = new HashMap<>(); + + TreeNode(final Class clazz) { + this.clazz = clazz; + this.visibility = calculateFromPluginClass(clazz); + } + } + + // Get the super interface of an interface + private Class getSuperInterface(final Class interfaceClass) { + final Class[] superInterfaces = interfaceClass.getInterfaces(); + return superInterfaces.length > 0 ? superInterfaces[0] : null; + } + + private static boolean validateTreeNode(final TreeNode node) { + for (final TreeNode child : node.children.values()) { + if (!isCompatible(node.visibility, child.visibility)) { + assertTrue( + "Incompatible visibility detected:\n" + + "Parent class: " + + node.clazz.getName() + + ", Visibility: " + + node.visibility + + "\n" + + "Child class: " + + child.clazz.getName() + + ", Visibility: " + + child.visibility, + isCompatible(node.visibility, child.visibility)); + } + if (!validateTreeNode(child)) { + return false; + } + } + return true; + } +} diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml index 2243f340bfcf..cfbb3487da9b 100644 --- a/iotdb-core/node-commons/pom.xml +++ b/iotdb-core/node-commons/pom.xml @@ -167,6 +167,11 @@ com.github.luben zstd-jni + + org.reflections + reflections + test + org.mockito mockito-core diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java index fe986ead0ca6..352c3be12838 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java @@ -37,12 +37,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public abstract class PipePluginAgent { @@ -155,7 +155,7 @@ protected PipeConnector validateConnector( */ public final List getSubProcessorNamesWithSpecifiedParent( Class parentClass) throws PipeException { - return Arrays.stream(pipePluginMetaKeeper.getAllPipePluginMeta()) + return StreamSupport.stream(pipePluginMetaKeeper.getAllPipePluginMeta().spliterator(), false) .map(pipePluginMeta -> pipePluginMeta.getPluginName().toLowerCase()) .filter( pluginName -> { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/PlaceholderConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/PlaceholderConnector.java index 2ad6e893d3cc..39c2676120d2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/PlaceholderConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/PlaceholderConnector.java @@ -20,6 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector; import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -34,6 +36,8 @@ * server module will replace this class with the real implementation when initializing the IoTDB * pipe connector. */ +@TreeModel +@TableModel public class PlaceholderConnector implements PipeConnector { private static final String PLACEHOLDER_ERROR_MSG = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/airgap/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/airgap/IoTDBAirGapConnector.java index 641a0c993a61..28e7583b2f57 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/airgap/IoTDBAirGapConnector.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.airgap; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the IoTDB Air Gap @@ -27,4 +28,5 @@ * pipe agent in the server module will replace this class with the real implementation when * initializing the IoTDB Air Gap connector. */ +@TreeModel public class IoTDBAirGapConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/consensus/PipeConsensusAsyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/consensus/PipeConsensusAsyncConnector.java index fb55bc71a10a..583cdf1fecd2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/consensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/consensus/PipeConsensusAsyncConnector.java @@ -20,6 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.consensus; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the @@ -27,4 +29,6 @@ * imported here. The pipe agent in the server module will replace this class with the real * implementation when initializing the PipeConsensusAsyncConnector. */ +@TreeModel +@TableModel public class PipeConsensusAsyncConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBLegacyPipeConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBLegacyPipeConnector.java index ec5e73f5298e..81d7efef3751 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBLegacyPipeConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBLegacyPipeConnector.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the IoTDB legacy pipe @@ -27,4 +28,5 @@ * imported here. The pipe agent in the server module will replace this class with the real * implementation when initializing the IoTDB legacy pipe connector. */ +@TreeModel public class IoTDBLegacyPipeConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftAsyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftAsyncConnector.java index 8f3d02d5adaa..220083974647 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftAsyncConnector.java @@ -19,10 +19,15 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; + /** * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift async * connector. There is a real implementation in the server module but cannot be imported here. The * pipe agent in the server module will replace this class with the real implementation when * initializing the IoTDB Thrift async connector. */ +@TreeModel +@TableModel public class IoTDBThriftAsyncConnector extends IoTDBThriftConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftConnector.java index a2786eba6164..4fb4178a6c96 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftConnector.java @@ -20,6 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift @@ -27,4 +29,6 @@ * pipe agent in the server module will replace this class with the real implementation when * initializing the IoTDB Thrift connector. */ +@TreeModel +@TableModel public class IoTDBThriftConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSslConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSslConnector.java index 165d91707d7a..a1092f1faabf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSslConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSslConnector.java @@ -20,6 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the IoTDB SSL connector. @@ -27,4 +29,6 @@ * in the server module will replace this class with the real implementation when initializing the * IoTDB SSL connector. */ +@TreeModel +@TableModel public class IoTDBThriftSslConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSyncConnector.java index ee80ef63c17a..6ef726b871c8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/iotdb/thrift/IoTDBThriftSyncConnector.java @@ -19,10 +19,15 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; + /** * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift sync * connector. There is a real implementation in the server module but cannot be imported here. The * pipe agent in the server module will replace this class with the real implementation when * initializing the IoTDB Thrift sync connector. */ +@TreeModel +@TableModel public class IoTDBThriftSyncConnector extends IoTDBThriftConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcua/OpcUaConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcua/OpcUaConnector.java index a6448d4209c2..ccc2b2fd67a2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcua/OpcUaConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcua/OpcUaConnector.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcua; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the OPC UA connector. @@ -27,4 +28,5 @@ * in the server module will replace this class with the real implementation when initializing the * OPC UA connector. */ +@TreeModel public class OpcUaConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/websocket/WebSocketConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/websocket/WebSocketConnector.java index a687a9c79a3e..0d6e1de3c570 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/websocket/WebSocketConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/websocket/WebSocketConnector.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.websocket; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the Web Socket @@ -27,4 +28,5 @@ * pipe agent in the server module will replace this class with the real implementation when * initializing the Web Socket connector. */ +@TreeModel public class WebSocketConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/writeback/WriteBackConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/writeback/WriteBackConnector.java index 11ab79200f41..f32471a88567 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/writeback/WriteBackConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/writeback/WriteBackConnector.java @@ -20,6 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.writeback; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; /** * This class is a placeholder and should not be initialized. It represents the Write Back @@ -27,4 +29,6 @@ * pipe agent in the server module will replace this class with the real implementation when * initializing the Write Back connector. */ +@TreeModel +@TableModel public class WriteBackConnector extends PlaceholderConnector {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/iotdb/IoTDBExtractor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/iotdb/IoTDBExtractor.java index 53f048f148d5..368f1c2fce4d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/iotdb/IoTDBExtractor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/iotdb/IoTDBExtractor.java @@ -20,6 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.iotdb; import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -31,6 +33,8 @@ * imported here. The pipe agent in the server module will replace this class with the real * implementation when initializing the extractor. */ +@TreeModel +@TableModel public class IoTDBExtractor implements PipeExtractor { private static final String PLACEHOLDER_ERROR_MSG = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java index 91538cdbc494..6f4ee1caa2f9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java @@ -68,8 +68,8 @@ public PipePluginMeta getPipePluginMeta(String pluginName) { return pipePluginNameToMetaMap.get(pluginName.toUpperCase()); } - public PipePluginMeta[] getAllPipePluginMeta() { - return pipePluginNameToMetaMap.values().toArray(new PipePluginMeta[0]); + public Iterable getAllPipePluginMeta() { + return pipePluginNameToMetaMap.values(); } public boolean containsPipePlugin(String pluginName) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 8c116d70ce99..3539d25ddfbe 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -29,6 +29,8 @@ import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -115,6 +117,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY; +@TreeModel +@TableModel public abstract class IoTDBConnector implements PipeConnector { private static final String PARSE_URL_ERROR_FORMATTER = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index b56230109de3..329bbd031b82 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -58,6 +60,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY; +@TreeModel +@TableModel public abstract class IoTDBSslSyncConnector extends IoTDBConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSslSyncConnector.class); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java index cecee61c7fae..39ea9f9b60f8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBExtractor.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -41,6 +43,8 @@ import static org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.hasAtLeastOneOption; import static org.apache.iotdb.commons.pipe.datastructure.options.PipeInclusionOptions.optionsAreAllLegal; +@TreeModel +@TableModel public abstract class IoTDBExtractor implements PipeExtractor { // Record these variables to provide corresponding value to tag key of monitoring metrics diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/annotation/PipePluginAnnotationTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/annotation/PipePluginAnnotationTest.java new file mode 100644 index 000000000000..32ecc7801445 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/plugin/annotation/PipePluginAnnotationTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.plugin.annotation; + +import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; +import org.apache.iotdb.pipe.api.PipePlugin; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.SubTypesScanner; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils.calculateFromPluginClass; +import static org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils.isCompatible; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class PipePluginAnnotationTest { + + @Test + public void testPipePluginVisibility() { + // Use the Reflections library to scan the classpath + final Reflections reflections = + new Reflections("org.apache.iotdb.commons", new SubTypesScanner(false)); + final Set> subTypes = reflections.getSubTypesOf(PipePlugin.class); + + // Create root node + final TreeNode root = new TreeNode(PipePlugin.class); + + // Build the tree + final Map, TreeNode> nodeMap = new HashMap<>(); + nodeMap.put(PipePlugin.class, root); + + for (final Class subType : subTypes) { + final TreeNode node = new TreeNode(subType); + nodeMap.put(subType, node); + + if (subType.isInterface()) { + // Handle super interfaces of the interface + final Class[] superInterfaces = subType.getInterfaces(); + for (Class superInterface : superInterfaces) { + while (!nodeMap.containsKey(superInterface) && superInterface != null) { + superInterface = getSuperInterface(superInterface); + } + + if (superInterface != null) { + nodeMap.get(superInterface).children.put(subType, node); + } + } + } else { + // Handle superclass of the class + Class superClass = subType.getSuperclass(); + if (superClass == Object.class) { + // If the superclass is Object, check the implemented interfaces + final Class[] interfaces = subType.getInterfaces(); + for (final Class iface : interfaces) { + if (nodeMap.containsKey(iface)) { + nodeMap.get(iface).children.put(subType, node); + } + } + } else { + while (!nodeMap.containsKey(superClass) && superClass != null) { + superClass = superClass.getSuperclass(); + } + + if (superClass != null) { + nodeMap.get(superClass).children.put(subType, node); + } + } + } + } + + root.visibility = Visibility.BOTH; + for (TreeNode node : root.children.values()) { + node.visibility = Visibility.BOTH; + } + + // Validate the correctness of the tree + assertNotNull(root); + + // Validate the visibility compatibility of the tree + assertTrue(validateTreeNode(root)); + } + + private static class TreeNode { + Class clazz; + Visibility visibility; + Map, TreeNode> children = new HashMap<>(); + + TreeNode(final Class clazz) { + this.clazz = clazz; + this.visibility = calculateFromPluginClass(clazz); + } + } + + // Get the super interface of an interface + private Class getSuperInterface(final Class interfaceClass) { + final Class[] superInterfaces = interfaceClass.getInterfaces(); + return superInterfaces.length > 0 ? superInterfaces[0] : null; + } + + private static boolean validateTreeNode(final TreeNode node) { + for (final TreeNode child : node.children.values()) { + if (!isCompatible(node.visibility, child.visibility)) { + assertTrue( + "Incompatible visibility detected:\n" + + "Parent class: " + + node.clazz.getName() + + ", Visibility: " + + node.visibility + + "\n" + + "Child class: " + + child.clazz.getName() + + ", Visibility: " + + child.visibility, + isCompatible(node.visibility, child.visibility)); + } + if (!validateTreeNode(child)) { + return false; + } + } + return true; + } +}