From ab2e6a805a5266968185b39a1907356687ce36b3 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 2 May 2022 20:31:01 -0700 Subject: [PATCH] Removed usage of `sample/standalone/ns1` namespaces in standalone (#15186) --- bin/pulsar-managed-ledger-admin | 8 +- faq.md | 24 +++--- .../org/apache/pulsar/PulsarStandalone.java | 79 +++++-------------- .../broker/stats/PrometheusMetricsTest.java | 4 +- .../worker/PulsarFunctionTestUtils.java | 4 +- pulsar-client-cpp/docs/MainPage.md | 4 +- .../pulsar-test-service-start.sh | 8 +- .../impl/schema/DefaultSchemasTest.java | 2 +- .../pulsar/functions/sink/PulsarSinkTest.java | 2 +- .../kubernetes/KubernetesRuntimeTest.java | 6 +- .../runtime/process/ProcessRuntimeTest.java | 4 +- .../worker/rest/api/FunctionsImplTest.java | 2 +- .../api/v2/FunctionApiV2ResourceTest.java | 4 +- .../api/v3/FunctionApiV3ResourceTest.java | 4 +- .../rest/api/v3/SinkApiV3ResourceTest.java | 2 +- .../server/ProxyPrometheusMetricsTest.java | 4 +- site2/docs/sql-getting-started.md | 5 +- 17 files changed, 63 insertions(+), 103 deletions(-) diff --git a/bin/pulsar-managed-ledger-admin b/bin/pulsar-managed-ledger-admin index 9ed5d69bb132d..36ff51569cd80 100755 --- a/bin/pulsar-managed-ledger-admin +++ b/bin/pulsar-managed-ledger-admin @@ -195,7 +195,7 @@ mlPath : str managed-ledger path eg: -print-managed-ledger --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test +print-managed-ledger --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test ''' def printManagedLedgerCommand(zk, mlPath): print(getManagedLedgerInfo(zk, mlPath)) @@ -213,7 +213,7 @@ cursorName : str managed-cursor path eg: -print-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 +print-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1 ''' def printManagedCursorCommand(zk, mlPath, cursorName): try: @@ -236,7 +236,7 @@ mlPath : str deleteLedgerIds : str comma separated deleting ledger-ids (eg: 123,124) eg: -delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --ledgerIds 3 +delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --ledgerIds 3 ''' def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds): try: @@ -266,7 +266,7 @@ markDeletePosition: str markDeletePosition combination of : (eg. 123:1) eg: -update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1 +update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1 --cursorMarkDelete 0:1 ''' def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition): try: diff --git a/faq.md b/faq.md index 190aee51a8f86..3a33e77fda863 100644 --- a/faq.md +++ b/faq.md @@ -94,8 +94,8 @@ There is regex subscription coming up in Pulsar 2.0. See [PIP-13](https://github ### Does Pulsar have, or plan to have, a concept of log compaction where only the latest message with the same key will be kept ? Yes, see [PIP-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction) for more details. -### When I use an exclusive subscription to a partitioned topic, is the subscription attached to the "whole topic" or to a "topic partition"? -On a partitioned topic, you can use all the 3 supported subscription types (exclusive, failover, shared), same as with non partitioned topics. +### When I use an exclusive subscription to a partitioned topic, is the subscription attached to the "whole topic" or to a "topic partition"? +On a partitioned topic, you can use all the 3 supported subscription types (exclusive, failover, shared), same as with non partitioned topics. The “subscription” concept is roughly similar to a “consumer-group” in Kafka. You can have multiple of them in the same topic, with different names. If you use “exclusive”, a consumer will try to consume from all partitions, or fail if any partition is already being consumed. @@ -105,7 +105,7 @@ The mode similar to Kafka is “failover” subscription. In this case, you have ### What is the proxy component? It’s a component that was introduced recently. Essentially it’s a stateless proxy that speaks that Pulsar binary protocol. The motivation is to avoid (or overcome the impossibility) of direct connection between clients and brokers. ---- +--- ## Usage and Configuration ### Can I manually change the number of bundles after creating namespaces? @@ -119,7 +119,7 @@ Yes, you can use the cli tool `bin/pulsar-admin persistent unsubscribe $TOPIC -s ### How are subscription modes set? Can I create new subscriptions over the WebSocket API? Yes, you can set most of the producer/consumer configuration option in websocket, by passing them as HTTP query parameters like: -`ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub?subscriptionType=Shared` +`ws://localhost:8080/ws/consumer/persistent/public/default/my-topic/my-sub?subscriptionType=Shared` see [the doc](http://pulsar.apache.org/docs/latest/clients/WebSocket/#RunningtheWebSocketservice-1fhsvp). @@ -153,7 +153,7 @@ There is no currently "infinite" retention, other than setting to very high valu The key is that you should use different subscriptions for each consumer. Each subscription is completely independent from others. ### The default when creating a consumer, is it to "tail" from "now" on the topic, or from the "last acknowledged" or something else? -So when you spin up a consumer, it will try to subscribe to the topic, if the subscription doesn't exist, a new one will be created, and it will be positioned at the end of the topic ("now"). +So when you spin up a consumer, it will try to subscribe to the topic, if the subscription doesn't exist, a new one will be created, and it will be positioned at the end of the topic ("now"). Once you reconnect, the subscription will still be there and it will be positioned on the last acknowledged messages from the previous session. @@ -190,16 +190,16 @@ What’s your use case for timeout on the `receiveAsync()`? Could that be achiev ### Why do we choose to use bookkeeper to store consumer offset instead of zookeeper? I mean what's the benefits? ZooKeeper is a “consensus” system that while it exposes a key/value interface is not meant to support a large volume of writes per second. -ZK is not an “horizontally scalable” system, because every node receive every transaction and keeps the whole data set. Effectively, ZK is based on a single “log” that is replicated consistently across the participants. +ZK is not an “horizontally scalable” system, because every node receive every transaction and keeps the whole data set. Effectively, ZK is based on a single “log” that is replicated consistently across the participants. -The max throughput we have observed on a well configured ZK on good hardware was around ~10K writes/s. If you want to do more than that, you would have to shard it.. +The max throughput we have observed on a well configured ZK on good hardware was around ~10K writes/s. If you want to do more than that, you would have to shard it.. To store consumers cursor positions, we need to write potentially a large number of updates per second. Typically we persist the cursor every 1 second, though the rate is configurable and if you want to reduce the amount of potential duplicates, you can increase the persistent frequency. With BookKeeper it’s very efficient to have a large throughput across a huge number of different “logs”. In our case, we use 1 log per cursor, and it becomes feasible to persist every single cursor update. ### I'm facing some issue using `.receiveAsync` that it seems to be related with `UnAckedMessageTracker` and `PartitionedConsumerImpl`. We are consuming messages with `receiveAsync`, doing instant `acknowledgeAsync` when message is received, after that the process will delay the next execution of itself. In such scenario we are consuming a lot more messages (repeated) than the num of messages produced. We are using Partitioned topics with setAckTimeout 30 seconds and I believe this issue could be related with `PartitionedConsumerImpl` because the same test in a non-partitioned topic does not generate any repeated message. -PartitionedConsumer is composed of a set of regular consumers, one per partition. To have a single `receive()` abstraction, messages from all partitions are then pushed into a shared queue. +PartitionedConsumer is composed of a set of regular consumers, one per partition. To have a single `receive()` abstraction, messages from all partitions are then pushed into a shared queue. The thing is that the unacked message tracker works at the partition level.So when the timeout happens, it’s able to request redelivery for the messages and clear them from the queue when that happens, but if the messages were already pushed into the shared queue, the “clearing” part will not happen. @@ -229,8 +229,8 @@ A final option is to check the topic stats. This is a tiny bit involved, because There’s not currently an option for “infinite” (though it sounds a good idea! maybe we could use `-1` for that). The only option now is to use INT_MAX for `retentionTimeInMinutes` and LONG_MAX for `retentionSizeInMB`. It’s not “infinite” but 4085 years of retention should probably be enough! ### Is there a profiling option in Pulsar, so that we can breakdown the time costed in every stage? For instance, message A stay in queue 1ms, bk writing time 2ms(interval between sending to bk and receiving ack from bk) and so on. -There are latency stats at different stages. In the client (eg: reported every 1min in info logs). -In the broker: accessible through the broker metrics, and finally in bookies where there are several different latency metrics. +There are latency stats at different stages. In the client (eg: reported every 1min in info logs). +In the broker: accessible through the broker metrics, and finally in bookies where there are several different latency metrics. In broker there’s just the write latency on BK, because there is no other queuing involved in the write path. @@ -242,7 +242,7 @@ you can create reader with `MessageId.earliest` yes, broker performs auth&auth while creating producer/consumer and this information presents under namespace policies.. so, if auth is enabled then broker does validation ### From what I’ve seen so far, it seems that I’d instead want to do a partitioned topic when I want a firehose/mix of data, and shuffle that firehose in to specific topics per entity when I’d have more discrete consumers. Is that accurate? -Precisely, you can use either approach, and even combine them, depending on what is more convenient for the use case. The general traits to choose one or the other are: +Precisely, you can use either approach, and even combine them, depending on what is more convenient for the use case. The general traits to choose one or the other are: - Partitions -> Maintain a single “logical” topic but scale throughput to multiple machines. Also, ability to consume in order for a “partition” of the keys. In general, consumers are assigned a partition (and thus a subset of keys) without specifying anything. @@ -258,7 +258,7 @@ Main difference: a reader can be used when manually managing the offset/messageI ### Hey, question on routing mode for partitioned topics. What is the default configuration and what is used in the Kafka adaptor? -The default is to use the hash of the key on a message. If the message has no key, the producer will use a “default” partition (picks 1 random partition and use it for all the messages it publishes). +The default is to use the hash of the key on a message. If the message has no key, the producer will use a “default” partition (picks 1 random partition and use it for all the messages it publishes). This is to maintain the same ordering guarantee when no partitions are there: per-producer ordering. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index f8d1548c842fa..085ce94b9f36f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -24,20 +24,20 @@ import com.google.common.collect.Sets; import java.io.File; import java.nio.file.Paths; -import java.util.List; +import java.util.Collections; import java.util.Optional; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.service.WorkerServiceLoader; @@ -301,18 +301,11 @@ public void start() throws Exception { admin = broker.getAdminClient(); - ClusterData clusterData = ClusterData.builder() - .serviceUrl(broker.getWebServiceAddress()) - .serviceUrlTls(broker.getWebServiceAddressTls()) - .brokerServiceUrl(broker.getBrokerServiceUrl()) - .brokerServiceUrlTls(broker.getBrokerServiceUrlTls()) - .build(); - createSampleNameSpace(clusterData, cluster); - //create default namespace - createNameSpace(cluster, TopicName.PUBLIC_TENANT, TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE); + createNameSpace(cluster, TopicName.PUBLIC_TENANT, + NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE)); //create pulsar system namespace - createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString()); + createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE); if (config.isTransactionCoordinatorEnabled()) { NamespaceResources.PartitionedTopicResources partitionedTopicResources = broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources(); @@ -327,52 +320,22 @@ public void start() throws Exception { log.debug("--- setup completed ---"); } - private void createNameSpace(String cluster, String publicTenant, String defaultNamespace) { - try { - if (!admin.tenants().getTenants().contains(publicTenant)) { - admin.tenants().createTenant(publicTenant, - TenantInfo.builder() - .adminRoles(Sets.newHashSet(config.getSuperUserRoles())) - .allowedClusters(Sets.newHashSet(cluster)) - .build()); - } - if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) { - admin.namespaces().createNamespace(defaultNamespace); - admin.namespaces().setNamespaceReplicationClusters( - defaultNamespace, Sets.newHashSet(config.getClusterName())); - } - } catch (PulsarAdminException e) { - log.info(e.getMessage(), e); - } - } + private void createNameSpace(String cluster, String publicTenant, NamespaceName ns) throws Exception { + TenantResources tr = broker.getPulsarResources().getTenantResources(); + NamespaceResources nsr = broker.getPulsarResources().getNamespaceResources(); - private void createSampleNameSpace(ClusterData clusterData, String cluster) { - // Create a sample namespace - final String tenant = "sample"; - final String globalCluster = "global"; - final String namespace = tenant + "/ns1"; - try { - List clusters = admin.clusters().getClusters(); - if (!clusters.contains(cluster)) { - admin.clusters().createCluster(cluster, clusterData); - } else { - admin.clusters().updateCluster(cluster, clusterData); - } - // Create marker for "global" cluster - if (!clusters.contains(globalCluster)) { - admin.clusters().createCluster(globalCluster, ClusterData.builder().build()); - } - - if (!admin.tenants().getTenants().contains(tenant)) { - admin.tenants().createTenant(tenant, - new TenantInfoImpl(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster))); - } + if (!tr.tenantExists(publicTenant)) { + tr.createTenant(publicTenant, + TenantInfo.builder() + .adminRoles(Sets.newHashSet(config.getSuperUserRoles())) + .allowedClusters(Sets.newHashSet(cluster)) + .build()); + } - if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) { - admin.namespaces().createNamespace(namespace); - } - } catch (PulsarAdminException e) { - log.warn(e.getMessage(), e); + if (!nsr.namespaceExists(ns)) { + Policies nsp = new Policies(); + nsp.replication_clusters = Collections.singleton(config.getClusterName()); + nsr.createPolicies(ns, nsp); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 2a6754bfb1c37..2686b6451ed13 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1457,8 +1457,8 @@ public static Multimap parseMetrics(String metrics) { // Example of lines are // jvm_threads_current{cluster="standalone",} 203.0 // or - // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1", - // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897 + // pulsar_subscriptions_count{cluster="standalone", namespace="public/default", + // topic="persistent://public/default/test-2"} 0.0 1517945780897 Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$"); Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java index 46aa37da61de3..08708813be9e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java @@ -73,8 +73,8 @@ public static Map parseMetrics(String metrics) { // Example of lines are // jvm_threads_current{cluster="standalone",} 203.0 // or - // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1", - // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897 + // pulsar_subscriptions_count{cluster="standalone", namespace="public/default", + // topic="persistent://public/default/test-2"} 0.0 1517945780897 Pattern pattern = Pattern.compile("^(\\w+)(\\{[^\\}]+\\})?\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$"); Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); Arrays.asList(metrics.split("\n")).forEach(line -> { diff --git a/pulsar-client-cpp/docs/MainPage.md b/pulsar-client-cpp/docs/MainPage.md index 72dece6f88c5e..168c2e682b027 100644 --- a/pulsar-client-cpp/docs/MainPage.md +++ b/pulsar-client-cpp/docs/MainPage.md @@ -111,7 +111,7 @@ $ make Client client("pulsar://localhost:6650"); Consumer consumer; -Result result = client.subscribe("persistent://sample/standalone/ns1/my-topic", "my-subscribtion-name", consumer); +Result result = client.subscribe("persistent://public/default/my-topic", "my-subscribtion-name", consumer); if (result != ResultOk) { LOG_ERROR("Failed to subscribe: " << result); return -1; @@ -136,7 +136,7 @@ client.close(); Client client("pulsar://localhost:6650"); Producer producer; -Result result = client.createProducer("persistent://sample/standalone/ns1/my-topic", producer); +Result result = client.createProducer("persistent://public/default/my-topic", producer); if (result != ResultOk) { LOG_ERROR("Error creating producer: " << result); return -1; diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh index 2bee18e64b9c5..928f72b4a4b70 100755 --- a/pulsar-client-cpp/pulsar-test-service-start.sh +++ b/pulsar-client-cpp/pulsar-test-service-start.sh @@ -74,12 +74,10 @@ $PULSAR_DIR/bin/pulsar-admin clusters create \ --broker-url pulsar://localhost:6650/ \ --broker-url-secure pulsar+ssl://localhost:6651/ -# Create "public" tenant -$PULSAR_DIR/bin/pulsar-admin tenants create public -r "anonymous" -c "standalone" +# Update "public" tenant +$PULSAR_DIR/bin/pulsar-admin tenants update public -r "anonymous" -c "standalone" -# Create "public/default" with no auth required -$PULSAR_DIR/bin/pulsar-admin namespaces create public/default \ - --clusters standalone +# Update "public/default" with no auth required $PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default \ --actions produce,consume \ --role "anonymous" diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java index 4d48e95b4e7f0..459376a4f33b0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java @@ -35,7 +35,7 @@ public class DefaultSchemasTest { private PulsarClient client; - private static final String TEST_TOPIC = "persistent://sample/standalone/ns1/test-topic"; + private static final String TEST_TOPIC = "test-topic"; @BeforeClass public void setup() throws PulsarClientException { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 48a6d1767d765..94878e33c5788 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -75,7 +75,7 @@ @Slf4j public class PulsarSinkTest { - private static final String TOPIC = "persistent://sample/standalone/ns1/test_result"; + private static final String TOPIC = "test_result"; public static class TestSerDe implements SerDe { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 7c64b66e530d9..18e3d9954aaad 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -87,8 +87,8 @@ public class KubernetesRuntimeTest { private static final String narExtractionDirectory = "/tmp/foo"; static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", ""); - topicsToSchema.put("persistent://sample/standalone/ns1/test_src", + topicsToSerDeClassName.put("test_src", ""); + topicsToSchema.put("test_src", ConsumerSpec.newBuilder().setSerdeClassName("").setIsRegexPattern(false).build()); } @@ -857,7 +857,7 @@ private void verifyGolangInstance(InstanceConfig config) throws Exception { assertEquals(goInstanceConfig.get("killAfterIdleMs"), 0); assertEquals(goInstanceConfig.get("parallelism"), 0); assertEquals(goInstanceConfig.get("className"), ""); - assertEquals(goInstanceConfig.get("sourceSpecsTopic"), "persistent://sample/standalone/ns1/test_src"); + assertEquals(goInstanceConfig.get("sourceSpecsTopic"), "test_src"); assertEquals(goInstanceConfig.get("sourceSchemaType"), ""); assertEquals(goInstanceConfig.get("sinkSpecsTopic"), TEST_NAME + "-output"); assertEquals(goInstanceConfig.get("clusterName"), "standalone"); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index bcbb0a4841612..955f2bb5399ef 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -100,8 +100,8 @@ public Type getSecretObjectType() { private static final Map topicsToSerDeClassName = new HashMap<>(); private static final Map topicsToSchema = new HashMap<>(); static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", ""); - topicsToSchema.put("persistent://sample/standalone/ns1/test_src", + topicsToSerDeClassName.put("test_src", ""); + topicsToSchema.put("test_src", ConsumerSpec.newBuilder().setSerdeClassName("").setIsRegexPattern(false).build()); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index 36e7d76ebd5cd..3a98416fe813f 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -89,7 +89,7 @@ public String process(String input, Context context) { private Function.SubscriptionType subscriptionType = Function.SubscriptionType.FAILOVER; private static final Map topicsToSerDeClassName = new HashMap<>(); static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE); + topicsToSerDeClassName.put("test_src", TopicSchema.DEFAULT_SERDE); } private static final int parallelism = 1; private static final String workerId = "worker-0"; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 09d0bfd2b828d..2bad2bd266eaa 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -114,7 +114,7 @@ public String process(String input, Context context) { private SubscriptionType subscriptionType = SubscriptionType.FAILOVER; private static final Map topicsToSerDeClassName = new HashMap<>(); static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE); + topicsToSerDeClassName.put("persistent://public/default/test_src", TopicSchema.DEFAULT_SERDE); } private static final int parallelism = 1; @@ -420,7 +420,7 @@ public void testRegisterFunctionWrongParallelism() { } @Test(expectedExceptions = RestException.class, - expectedExceptionsMessageRegExp = "Output topic persistent://sample/standalone/ns1/test_src is also being used as an input topic \\(topics must be one or the other\\)") + expectedExceptionsMessageRegExp = "Output topic persistent://public/default/test_src is also being used as an input topic \\(topics must be one or the other\\)") public void testRegisterFunctionSameInputOutput() { try { testRegisterFunctionMissingArguments( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index f6f2771d12920..fae1ae6d975d3 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -121,7 +121,7 @@ public void accept(String s) { private SubscriptionType subscriptionType = SubscriptionType.FAILOVER; private static final Map topicsToSerDeClassName = new HashMap<>(); static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE); + topicsToSerDeClassName.put("persistent://public/default/test_src", TopicSchema.DEFAULT_SERDE); } private static final int parallelism = 1; @@ -425,7 +425,7 @@ public void testRegisterFunctionWrongParallelism() { } @Test(expectedExceptions = RestException.class, - expectedExceptionsMessageRegExp = "Output topic persistent://sample/standalone/ns1/test_src is also being used as an input topic \\(topics must be one or the other\\)") + expectedExceptionsMessageRegExp = "Output topic persistent://public/default/test_src is also being used as an input topic \\(topics must be one or the other\\)") public void testRegisterFunctionSameInputOutput() { try { testRegisterFunctionMissingArguments( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index 8a06da1ac6351..1e588e4801644 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -101,7 +101,7 @@ public class SinkApiV3ResourceTest { private static final Map topicsToSerDeClassName = new HashMap<>(); static { - topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", DEFAULT_SERDE); + topicsToSerDeClassName.put("test_src", DEFAULT_SERDE); } private static final String subscriptionName = "test-subscription"; diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java index 63ac43d321056..86e0b8727ab61 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java @@ -131,8 +131,8 @@ public static Multimap parseMetrics(String metrics) { // Example of lines are // jvm_threads_current{cluster="standalone",} 203.0 // or - // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1", - // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897 + // pulsar_subscriptions_count{cluster="standalone", namespace="public/default", + // topic="persistent://public/default/test-2"} 0.0 1517945780897 Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$"); Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); diff --git a/site2/docs/sql-getting-started.md b/site2/docs/sql-getting-started.md index 6c9319b9248a2..a4e9da069f9d7 100644 --- a/site2/docs/sql-getting-started.md +++ b/site2/docs/sql-getting-started.md @@ -51,9 +51,8 @@ presto> show schemas in pulsar; ----------------------- information_schema public/default - public/functions - sample/standalone/ns1 -(4 rows) + public/functions +(3 rows) Query 20180829_211818_00005_7qpwh, FINISHED, 1 node Splits: 19 total, 19 done (100.00%)