From 7aa2d940a8dd3b46a799b6f32508896175202932 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 11 Oct 2022 15:21:10 +0800 Subject: [PATCH 01/22] [close #657] add production-readiness document (#658) Co-authored-by: Xiaoguang Sun --- docs/src/SUMMARY.md | 2 ++ docs/src/production-readiness.md | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 docs/src/production-readiness.md diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 36db8fa621c..1916a51b1a0 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -4,6 +4,8 @@ - [Introduction](./introduction/introduction.md) +- [Production Readiness](./production-readiness.md) + - [Start With Examples](./examples/introduction.md) - [Quick Start](./examples/quick-start.md) - [Interact with TiKV RawKV API](./examples/rawkv.md) diff --git a/docs/src/production-readiness.md b/docs/src/production-readiness.md new file mode 100644 index 00000000000..6604363a302 --- /dev/null +++ b/docs/src/production-readiness.md @@ -0,0 +1,18 @@ +# Production Readiness + +In general, the latest [release](https://github.com/tikv/client-java/releases) of TiKV Java Client is ready for production use. But it is not battle-tested as a full-featured client for TiKV in all use cases. This page will give you more details. + +## RawKV +All RawKV APIs are covered by [CI](https://github.com/tikv/client-java/actions/workflows/ci.yml). + +At this time, RawKV has been used in the production environment of some commercial customers in latency-sensitive systems. But they only use part of the RawKV APIs (mainly including `raw_put`, `raw_get`, `raw_compare_and_swap`, and `raw_batch_put`). + +## TxnKV +All TxnKV APIs are covered by [CI](https://github.com/tikv/client-java/actions/workflows/ci.yml). + +In addition, TxnKV has been used in the [TiSpark](https://docs.pingcap.com/tidb/stable/tispark-overview) and [TiBigData](https://github.com/tidb-incubator/TiBigData) projects to integrate data from TiDB into the Big Data ecosystem. 
TiSpark and TiBigData are used in the production system of some commercial customers and internet companies. + +Similar to RawKV, only part of the APIs are used in this scenario (mainly including `prewrite/commit` and `coprocessor`). This use case cares about throughput and reliability rather than latency. + +## TiDB Cloud +Directly using TiKV is not possible on TiDB Cloud because the client has to access the whole cluster, which raises security issues. A managed TiKV service is not coming soon, as it is not on the [roadmap](https://docs.pingcap.com/tidbcloud/tidb-cloud-roadmap) yet. From 8cb7a8294c4c26f0080192e338ad0b9188d50e31 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 1 Nov 2022 15:55:18 +0800 Subject: [PATCH 02/22] [close #663] Select TiFlash Stores Round-Robin (#662) --- .github/workflows/license-checker.yml | 2 +- .../org/tikv/common/region/RegionManager.java | 12 +++-- src/test/java/org/tikv/common/GrpcUtils.java | 5 ++ .../org/tikv/common/RegionManagerTest.java | 51 ++++++++++++++++++- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/.github/workflows/license-checker.yml b/.github/workflows/license-checker.yml index 4e1cf90a4fe..cd5c12f84c4 100644 --- a/.github/workflows/license-checker.yml +++ b/.github/workflows/license-checker.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Check License Header - uses: apache/skywalking-eyes@main + uses: apache/skywalking-eyes@v0.3.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 44c81375107..45cfc5160d9 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; @@ -68,6 +69,7 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; + private AtomicInteger tiflashStoreIndex = new AtomicInteger(0); public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { @@ -191,17 +193,21 @@ public Pair getRegionStorePairByKey( Peer peer = region.getCurrentReplica(); store = getStoreById(peer.getStoreId(), backOffer); } else { - outerLoop: + List tiflashStores = new ArrayList<>(); for (Peer peer : region.getLearnerList()) { TiStore s = getStoreById(peer.getStoreId(), backOffer); for (Metapb.StoreLabel label : s.getStore().getLabelsList()) { if (label.getKey().equals(storeType.getLabelKey()) && label.getValue().equals(storeType.getLabelValue())) { - store = s; - break outerLoop; + tiflashStores.add(s); } } } + // select a tiflash with RR strategy + if (tiflashStores.size() > 0) { + store = tiflashStores.get(tiflashStoreIndex.getAndIncrement() % tiflashStores.size()); + } + if (store == null) { // clear the region cache, so we may get the learner peer next time cache.invalidateRegion(region); diff --git a/src/test/java/org/tikv/common/GrpcUtils.java b/src/test/java/org/tikv/common/GrpcUtils.java index e6793f01f1d..e7a268f6c31 100644 --- a/src/test/java/org/tikv/common/GrpcUtils.java +++ b/src/test/java/org/tikv/common/GrpcUtils.java @@ -24,6 +24,7 @@ import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataOutput; import org.tikv.kvproto.Metapb.Peer; +import org.tikv.kvproto.Metapb.PeerRole; import org.tikv.kvproto.Metapb.Region; import org.tikv.kvproto.Metapb.RegionEpoch; import org.tikv.kvproto.Metapb.Store; @@ -61,6 +62,10 @@ public static Peer makePeer(long id, long storeId) { return 
Peer.newBuilder().setStoreId(storeId).setId(id).build(); } + public static Peer makeLearnerPeer(long id, long storeId) { + return Peer.newBuilder().setRole(PeerRole.Learner).setStoreId(storeId).setId(id).build(); + } + public static ByteString encodeKey(byte[] key) { CodecDataOutput cdo = new CodecDataOutput(); BytesCodec.writeBytes(cdo, key); diff --git a/src/test/java/org/tikv/common/RegionManagerTest.java b/src/test/java/org/tikv/common/RegionManagerTest.java index 6052640f9bc..eddd22a6c65 100644 --- a/src/test/java/org/tikv/common/RegionManagerTest.java +++ b/src/test/java/org/tikv/common/RegionManagerTest.java @@ -31,6 +31,7 @@ import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiStore; +import org.tikv.common.region.TiStoreType; import org.tikv.common.util.KeyRangeUtils; import org.tikv.common.util.Pair; import org.tikv.kvproto.Metapb; @@ -135,7 +136,7 @@ public void getStoreByKey() { Pair pair = mgr.getRegionStorePairByKey(searchKey); assertEquals(pair.first.getId(), regionId); - assertEquals(pair.first.getId(), storeId); + assertEquals(pair.second.getId(), 10); } @Test @@ -179,4 +180,52 @@ public void getStoreById() { } catch (Exception ignored) { } } + + @Test + public void getRegionStorePairByKeyWithTiFlash() { + + ByteString startKey = ByteString.copyFrom(new byte[] {1}); + ByteString endKey = ByteString.copyFrom(new byte[] {10}); + ByteString searchKey = ByteString.copyFrom(new byte[] {5}); + String testAddress = "testAddress"; + long firstStoreId = 233; + long secondStoreId = 234; + int confVer = 1026; + int ver = 1027; + long regionId = 233; + leader.addGetRegionListener( + request -> + GrpcUtils.makeGetRegionResponse( + leader.getClusterId(), + GrpcUtils.makeRegion( + regionId, + GrpcUtils.encodeKey(startKey.toByteArray()), + GrpcUtils.encodeKey(endKey.toByteArray()), + GrpcUtils.makeRegionEpoch(confVer, ver), + GrpcUtils.makeLearnerPeer(1, firstStoreId), + GrpcUtils.makeLearnerPeer(2, 
secondStoreId)))); + + AtomicInteger i = new AtomicInteger(0); + long[] ids = new long[] {firstStoreId, secondStoreId}; + leader.addGetStoreListener( + (request -> + GrpcUtils.makeGetStoreResponse( + leader.getClusterId(), + GrpcUtils.makeStore( + ids[i.getAndIncrement()], + testAddress, + StoreState.Up, + GrpcUtils.makeStoreLabel("engine", "tiflash"), + GrpcUtils.makeStoreLabel("k1", "v1"), + GrpcUtils.makeStoreLabel("k2", "v2"))))); + + Pair pair = mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash); + assertEquals(pair.first.getId(), regionId); + assertEquals(pair.second.getId(), firstStoreId); + + Pair secondPair = + mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash); + assertEquals(secondPair.first.getId(), regionId); + assertEquals(secondPair.second.getId(), secondStoreId); + } } From 870a9fb8bc282e4dee4bd25a0305615dafb857a3 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 2 Dec 2022 11:57:10 +0800 Subject: [PATCH 03/22] [close #666] Fix some issues in example and README (#667) Signed-off-by: pingyu Signed-off-by: pingyu --- README.md | 34 ++++++++++++++++++++++---------- docs/src/examples/quick-start.md | 2 +- docs/src/examples/rawkv.md | 4 ++-- docs/src/examples/txnkv.md | 2 +- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 0acd816deb1..b2d64294b84 100644 --- a/README.md +++ b/README.md @@ -4,18 +4,17 @@ ## TiKV JAVA Client -A Java client for [TiDB](https://github.com/pingcap/tidb)/[TiKV](https://github.com/tikv/tikv). +A Java client for [TiKV](https://github.com/tikv/tikv). It is supposed to: + Communicate via [gRPC](http://www.grpc.io/) + Talk to Placement Driver searching for a region -+ Talk to TiKV for reading/writing data and the resulted data is encoded/decoded just like what we do in TiDB. 
-+ Talk to Coprocessor for calculation pushdown ++ Talk to TiKV for reading/writing data ## Quick Start -> TiKV Java Client is designed to communicate with [pd](https://github.com/tikv/pd) and [tikv](https://github.com/tikv/tikv), please run TiKV and PD in advance. +> TiKV Java Client is designed to communicate with [PD](https://github.com/tikv/pd) and [TiKV](https://github.com/tikv/tikv), please run PD and TiKV in advance. -Build java client from source file: +Build Java client from source file: ```sh mvn clean install -Dmaven.test.skip=true @@ -27,11 +26,27 @@ Add maven dependency to `pom.xml`: org.tikv tikv-client-java - 3.1.0 + 3.3.0 ``` -Create a RawKVClient and communicates with TiKV: +Create a transactional `KVClient` and communicates with TiKV: + +```java +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.txn.KVClient; + +public class Main { + public static void main(String[] args) throws Exception { + TiConfiguration conf = TiConfiguration.createDefault(YOUR_PD_ADDRESSES); + TiSession session = TiSession.create(conf); + KVClient client = session.createKVClient(); + } +} +``` + +Or create a `RawKVClient` if you don't need the transaction semantic: ```java import org.tikv.common.TiConfiguration; @@ -39,8 +54,7 @@ import org.tikv.common.TiSession; import org.tikv.raw.RawKVClient; public class Main { - public static void main() { - // You MUST create a raw configuration if you are using RawKVClient. 
+ public static void main(String[] args) throws Exception { TiConfiguration conf = TiConfiguration.createRawDefault(YOUR_PD_ADDRESSES); TiSession session = TiSession.create(conf); RawKVClient client = session.createRawClient(); @@ -48,7 +62,7 @@ public class Main { } ``` -Find more demo in [KVRawClientTest](https://github.com/birdstorm/KVRawClientTest/) +Find more demo in [TiKV Java Client User Documents](https://tikv.github.io/client-java/examples/introduction.html) ## Documentation diff --git a/docs/src/examples/quick-start.md b/docs/src/examples/quick-start.md index 36078f26733..09c6b186457 100644 --- a/docs/src/examples/quick-start.md +++ b/docs/src/examples/quick-start.md @@ -24,7 +24,7 @@ Add maven dependency to `pom.xml`. org.tikv tikv-client-java - 3.1.0 + 3.3.0 org.slf4j diff --git a/docs/src/examples/rawkv.md b/docs/src/examples/rawkv.md index 182cc8d09c3..e4c9bcacad7 100644 --- a/docs/src/examples/rawkv.md +++ b/docs/src/examples/rawkv.md @@ -15,7 +15,7 @@ import org.tikv.raw.RawKVClient; import org.tikv.shade.com.google.protobuf.ByteString; public class Main { - public static void main() { + public static void main(String[] args) throws Exception { // You MUST create a raw configuration if you are using RawKVClient. TiConfiguration conf = TiConfiguration.createRawDefault("127.0.0.1:2379"); TiSession session = TiSession.create(conf); @@ -61,7 +61,7 @@ To enable the API V2 mode, users need to specify the API version of the client. 
import org.tikv.common.TiConfiguration.ApiVersion; public class Main { - public static void main() { + public static void main(String[] args) throws Exception { TiConfiguration conf = TiConfiguration.createRawDefault("127.0.0.1:2379"); conf.setApiVersion(ApiVersion.V2); try(TiSession session = TiSession.create(conf)) { diff --git a/docs/src/examples/txnkv.md b/docs/src/examples/txnkv.md index 9bcfb430c49..e3e2e0d4a73 100644 --- a/docs/src/examples/txnkv.md +++ b/docs/src/examples/txnkv.md @@ -23,7 +23,7 @@ import org.tikv.txn.TwoPhaseCommitter; public class App { public static void main(String[] args) throws Exception { - TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2389"); + TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379"); try (TiSession session = TiSession.create(conf)) { // two-phrase write long startTS = session.getTimestamp().getVersion(); From 8936a91a98c160ee7bfbc4ab555e1576f4388e77 Mon Sep 17 00:00:00 2001 From: Xiang Zhang Date: Tue, 20 Dec 2022 17:12:10 +0800 Subject: [PATCH 04/22] Delete stale-checker.yml (#681) Signed-off-by: zhangyangyu --- .github/workflows/stale-checker.yml | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 .github/workflows/stale-checker.yml diff --git a/.github/workflows/stale-checker.yml b/.github/workflows/stale-checker.yml deleted file mode 100644 index 478fc504f7a..00000000000 --- a/.github/workflows/stale-checker.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: 'Stale Checker' -on: - schedule: - - cron: '0 0 * * *' - -jobs: - stale: - runs-on: ubuntu-latest - steps: - - uses: actions/stale@v4 - with: - days-before-stale: 30 - stale-issue-message: 'This issue is stale because it has been open 30 days with no activity.' - stale-issue-label: 'status/stale' - days-before-issue-close: -1 - stale-pr-message: 'This PR is stale because it has been open 30 days with no activity. Remove the status/stale label or comment or this PR will be closed in 7 days.' 
- stale-pr-label: 'status/stale' - days-before-pr-close: 7 - close-pr-message: 'This PR was closed because it has been stalled for 7 days with no activity.' From 24ed9e2b8aed55e0d825121d36371806e3a8bffb Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 20 Dec 2022 20:31:42 +0800 Subject: [PATCH 05/22] [close #663] Avoid overflow (#664) Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/region/RegionManager.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 45cfc5160d9..2d84f9988e4 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -203,9 +203,11 @@ public Pair getRegionStorePairByKey( } } } - // select a tiflash with RR strategy + // select a tiflash with Round-Robin strategy if (tiflashStores.size() > 0) { - store = tiflashStores.get(tiflashStoreIndex.getAndIncrement() % tiflashStores.size()); + store = + tiflashStores.get( + Math.floorMod(tiflashStoreIndex.getAndIncrement(), tiflashStores.size())); } if (store == null) { From 1554ae5feccfddc8c67b59c417c201b322518027 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 20 Dec 2022 20:54:20 +0800 Subject: [PATCH 06/22] let column_case_insentive (#678) Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/expression/ColumnRef.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/expression/ColumnRef.java b/src/main/java/org/tikv/common/expression/ColumnRef.java index 0a6ed6e4b0f..61746cd2e11 100644 --- a/src/main/java/org/tikv/common/expression/ColumnRef.java +++ b/src/main/java/org/tikv/common/expression/ColumnRef.java @@ -123,9 +123,9 @@ public boolean equals(Object another) { @Override 
public int hashCode() { if (isResolved()) { - return Objects.hash(this.name, this.dataType); + return Objects.hash(this.name.toLowerCase(), this.dataType); } else { - return Objects.hashCode(name); + return Objects.hashCode(name.toLowerCase()); } } From 1f096b5a381aaae0696f5030117f6f6197bbd48c Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 20 Dec 2022 21:32:21 +0800 Subject: [PATCH 07/22] [close #684] Fix batch get blocked by write (#685) Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/txn/KVClient.java | 8 +- src/test/java/org/tikv/txn/BatchGetTest.java | 103 +++++++++++++++++++ src/test/java/org/tikv/txn/TXNTest.java | 2 +- 3 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/tikv/txn/BatchGetTest.java diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index dfa9b8b2962..e8c83c54463 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -50,6 +50,7 @@ public class KVClient implements AutoCloseable { private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; private final ExecutorService executorService; + private Set resolvedLocks = Collections.emptySet(); public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder, TiSession session) { Objects.requireNonNull(conf, "conf is null"); @@ -223,6 +224,10 @@ private List doSendBatchGetInBatchesWithRetry( if (oldRegion.equals(currentRegion)) { RegionStoreClient client = clientBuilder.build(batch.getRegion()); + // set resolvedLocks for the new client + if (!resolvedLocks.isEmpty()) { + client.addResolvedLocks(version, resolvedLocks); + } try { return client.batchGet(backOffer, batch.getKeys(), version); } catch (final TiKVException e) { @@ -230,7 +235,8 @@ private List doSendBatchGetInBatchesWithRetry( 
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion()); logger.warn("ReSplitting ranges for BatchGetRequest", e); - // retry + // get resolved locks and retry + resolvedLocks = client.getResolvedLocks(version); return doSendBatchGetWithRefetchRegion(backOffer, batch, version); } } else { diff --git a/src/test/java/org/tikv/txn/BatchGetTest.java b/src/test/java/org/tikv/txn/BatchGetTest.java new file mode 100644 index 00000000000..cbdff1b3920 --- /dev/null +++ b/src/test/java/org/tikv/txn/BatchGetTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.tikv.txn; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +import com.google.protobuf.ByteString; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; +import org.tikv.common.BytePairWrapper; +import org.tikv.common.ByteWrapper; +import org.tikv.common.exception.KeyException; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Kvrpcpb.KvPair; + +public class BatchGetTest extends TXNTest { + + @Test + public void BatchGetResolveLockTest() throws Exception { + long lockTTL = 20000L; + String key1 = "batchGetResolveLockTestKey1"; + String key2 = "batchGetResolveLockTestKey2"; + String val1 = "val1"; + String val2 = "val2"; + String val1_update = "val1_update"; + String val2_update = "val2_update"; + + // put key1 and key2 + putKV(key1, val1); + putKV(key2, val2); + + // run 2PC background + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try (TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter(session, startTS, lockTTL)) { + byte[] primaryKey = key1.getBytes("UTF-8"); + byte[] secondary = key2.getBytes("UTF-8"); + // prewrite primary key + twoPhaseCommitter.prewritePrimaryKey( + ConcreteBackOffer.newCustomBackOff(5000), + primaryKey, + val1_update.getBytes("UTF-8")); + List pairs = + Arrays.asList(new BytePairWrapper(secondary, val2_update.getBytes("UTF-8"))); + // prewrite secondary key + twoPhaseCommitter.prewriteSecondaryKeys(primaryKey, pairs.iterator(), 5000); + + // get commitTS + long commitTS = session.getTimestamp().getVersion(); + Thread.sleep(5000); + // commit primary key + twoPhaseCommitter.commitPrimaryKey( + ConcreteBackOffer.newCustomBackOff(5000), primaryKey, commitTS); + // commit secondary key + List keys = Arrays.asList(new ByteWrapper(secondary)); + twoPhaseCommitter.commitSecondaryKeys(keys.iterator(), commitTS, 5000); + } catch (Exception e) { + 
KeyException keyException = (KeyException) e.getCause().getCause(); + assertNotSame("", keyException.getKeyErr().getCommitTsExpired().toString()); + } + }) + .start(); + + // wait 2PC get commitTS + Thread.sleep(2000); + // batch get key1 and key2 + try (KVClient kvClient = session.createKVClient()) { + long version = session.getTimestamp().getVersion(); + ByteString k1 = ByteString.copyFromUtf8(key1); + ByteString k2 = ByteString.copyFromUtf8(key2); + + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(5000); + List kvPairs = kvClient.batchGet(backOffer, Arrays.asList(k1, k2), version); + // Since TiKV v4.0.0 write locked key will not block read. it is supported by Min Commit + // Timestamp + assertEquals(ByteString.copyFromUtf8(val1), kvPairs.get(0).getValue()); + assertEquals(ByteString.copyFromUtf8(val2), kvPairs.get(1).getValue()); + System.out.println(kvPairs); + // wait 2PC finish + Thread.sleep(10000); + } + } +} diff --git a/src/test/java/org/tikv/txn/TXNTest.java b/src/test/java/org/tikv/txn/TXNTest.java index 92af0383da1..386ad8182e0 100644 --- a/src/test/java/org/tikv/txn/TXNTest.java +++ b/src/test/java/org/tikv/txn/TXNTest.java @@ -41,7 +41,7 @@ public class TXNTest extends BaseTxnKVTest { static final int DEFAULT_TTL = 10; - private TiSession session; + public TiSession session; RegionStoreClient.RegionStoreClientBuilder builder; @Before From a459a6ed3afda103fa24ce3b71a01a89ba2a7adf Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 29 Dec 2022 15:08:33 +0800 Subject: [PATCH 08/22] [to #656] Fix scan with lock (#670) --- .../tikv/common/region/RegionStoreClient.java | 50 ++--- .../java/org/tikv/common/KVMockServer.java | 173 +++++++++++++++++- .../java/org/tikv/common/MockServerTest.java | 6 +- .../org/tikv/common/PDClientMockTest.java | 7 +- .../java/org/tikv/common/PDMockServer.java | 16 +- .../tikv/common/RegionStoreClientTest.java | 120 +++++++++++- 6 files changed, 332 insertions(+), 40 deletions(-) diff --git 
a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index ba742c872b0..22607b2bdb1 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -357,7 +357,7 @@ public List scan( this, lockResolverClient, resp -> resp.hasRegionError() ? resp.getRegionError() : null, - resp -> null, + resp -> resp.hasError() ? resp.getError() : null, resolveLockResult -> addResolvedLocks(version, resolveLockResult.getResolvedLocks()), version, forWrite); @@ -366,13 +366,14 @@ public List scan( // we need to update region after retry region = regionManager.getRegionByKey(startKey, backOffer); - if (isScanSuccess(backOffer, resp)) { - return doScan(resp); + if (handleScanResponse(backOffer, resp, version, forWrite)) { + return resp.getPairsList(); } } } - private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) { + private boolean handleScanResponse( + BackOffer backOffer, ScanResponse resp, long version, boolean forWrite) { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("ScanResponse failed without a cause"); @@ -381,28 +382,35 @@ private boolean isScanSuccess(BackOffer backOffer, ScanResponse resp) { backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError())); return false; } - return true; - } - // TODO: resolve locks after scan - private List doScan(ScanResponse resp) { - // Check if kvPair contains error, it should be a Lock if hasError is true. 
- List kvPairs = resp.getPairsList(); - List newKvPairs = new ArrayList<>(); - for (KvPair kvPair : kvPairs) { + // Resolve locks + // Note: Memory lock conflict is returned by both `ScanResponse.error` & + // `ScanResponse.pairs[0].error`, while other key errors are returned by + // `ScanResponse.pairs.error` + // See https://github.com/pingcap/kvproto/pull/697 + List locks = new ArrayList<>(); + for (KvPair kvPair : resp.getPairsList()) { if (kvPair.hasError()) { Lock lock = AbstractLockResolverClient.extractLockFromKeyErr(kvPair.getError(), codec); - newKvPairs.add( - KvPair.newBuilder() - .setError(kvPair.getError()) - .setValue(kvPair.getValue()) - .setKey(lock.getKey()) - .build()); - } else { - newKvPairs.add(codec.decodeKvPair(kvPair)); + locks.add(lock); + } + } + if (!locks.isEmpty()) { + ResolveLockResult resolveLockResult = + lockResolverClient.resolveLocks(backOffer, version, locks, forWrite); + addResolvedLocks(version, resolveLockResult.getResolvedLocks()); + + long msBeforeExpired = resolveLockResult.getMsBeforeTxnExpired(); + if (msBeforeExpired > 0) { + // if not resolve all locks, we wait and retry + backOffer.doBackOffWithMaxSleep( + BoTxnLockFast, msBeforeExpired, new KeyException(locks.toString())); } + + return false; } - return Collections.unmodifiableList(newKvPairs); + + return true; } public List scan(BackOffer backOffer, ByteString startKey, long version) { diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 69d8a55ee0e..ea09270cfc7 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -46,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.key.Key; +import org.tikv.common.meta.TiTimestamp; import org.tikv.common.region.TiRegion; import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Errorpb; @@ -67,6 +68,10 @@ public class KVMockServer extends TikvGrpc.TikvImplBase 
{ private final Map> keyErrMap = new HashMap<>(); + private final Map> lockMap = new HashMap<>(); + private final Map> txnStatusMap = + new HashMap<>(); + // for KV error public static final int ABORT = 1; public static final int RETRY = 2; @@ -117,9 +122,68 @@ public void putError(String key, Supplier builder) { regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder); } + public void removeError(String key) { + regionErrMap.remove(toRawKey(key.getBytes(StandardCharsets.UTF_8))); + } + + // putWithLock is used to "prewrite" key-value without "commit" + public void putWithLock( + ByteString key, ByteString value, ByteString primaryKey, Long startTs, Long ttl) { + put(key, value); + + Kvrpcpb.LockInfo.Builder lock = + Kvrpcpb.LockInfo.newBuilder() + .setPrimaryLock(primaryKey) + .setLockVersion(startTs) + .setKey(key) + .setLockTtl(ttl); + lockMap.put(toRawKey(key), () -> lock); + } + + public void removeLock(ByteString key) { + lockMap.remove(toRawKey(key)); + } + + public boolean hasLock(ByteString key) { + return lockMap.containsKey(toRawKey(key)); + } + + // putTxnStatus is used to save transaction status + // commitTs > 0: committed + // commitTs == 0 && key is empty: rollback + // commitTs == 0 && key not empty: locked by key + public void putTxnStatus(Long startTs, Long commitTs, ByteString key) { + if (commitTs > 0 || (commitTs == 0 && key.isEmpty())) { // committed || rollback + Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus = + Kvrpcpb.CheckTxnStatusResponse.newBuilder() + .setCommitVersion(commitTs) + .setLockTtl(0) + .setAction(Kvrpcpb.Action.NoAction); + txnStatusMap.put(startTs, () -> txnStatus); + } else { // locked + Kvrpcpb.LockInfo.Builder lock = lockMap.get(toRawKey(key)).get(); + Kvrpcpb.CheckTxnStatusResponse.Builder txnStatus = + Kvrpcpb.CheckTxnStatusResponse.newBuilder() + .setCommitVersion(commitTs) + .setLockTtl(lock.getLockTtl()) + .setAction(Kvrpcpb.Action.NoAction) + .setLockInfo(lock); + txnStatusMap.put(startTs, () 
-> txnStatus); + } + } + + // putTxnStatus is used to save transaction status + // commitTs > 0: committed + // commitTs == 0: rollback + public void putTxnStatus(Long startTs, Long commitTs) { + putTxnStatus(startTs, commitTs, ByteString.EMPTY); + } + public void clearAllMap() { dataMap.clear(); regionErrMap.clear(); + lockMap.clear(); + txnStatusMap.clear(); } private Errorpb.Error verifyContext(Context context) throws Exception { @@ -255,9 +319,12 @@ public void kvGet( return; } + Supplier lock = lockMap.get(key); Supplier errProvider = keyErrMap.remove(key); if (errProvider != null) { builder.setError(errProvider.get().build()); + } else if (lock != null) { + builder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get())); } else { ByteString value = dataMap.get(key); builder.setValue(value); @@ -299,11 +366,17 @@ public void kvScan( kvs.entrySet() .stream() .map( - kv -> - Kvrpcpb.KvPair.newBuilder() - .setKey(kv.getKey().toByteString()) - .setValue(kv.getValue()) - .build()) + kv -> { + Kvrpcpb.KvPair.Builder kvBuilder = + Kvrpcpb.KvPair.newBuilder() + .setKey(kv.getKey().toByteString()) + .setValue(kv.getValue()); + Supplier lock = lockMap.get(kv.getKey()); + if (lock != null) { + kvBuilder.setError(Kvrpcpb.KeyError.newBuilder().setLocked(lock.get())); + } + return kvBuilder.build(); + }) .collect(Collectors.toList())); } responseObserver.onNext(builder.build()); @@ -354,6 +427,96 @@ public void kvBatchGet( } } + @Override + public void kvCheckTxnStatus( + org.tikv.kvproto.Kvrpcpb.CheckTxnStatusRequest request, + io.grpc.stub.StreamObserver + responseObserver) { + logger.info("KVMockServer.kvCheckTxnStatus"); + try { + Long startTs = request.getLockTs(); + Long currentTs = request.getCurrentTs(); + logger.info("kvCheckTxnStatus for txn: " + startTs); + Kvrpcpb.CheckTxnStatusResponse.Builder builder = Kvrpcpb.CheckTxnStatusResponse.newBuilder(); + + Error e = verifyContext(request.getContext()); + if (e != null) { + 
responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + + Supplier txnStatus = txnStatusMap.get(startTs); + if (txnStatus != null) { + Kvrpcpb.CheckTxnStatusResponse resp = txnStatus.get().build(); + if (resp.getCommitVersion() == 0 + && resp.getLockTtl() > 0 + && TiTimestamp.extractPhysical(startTs) + resp.getLockInfo().getLockTtl() + < TiTimestamp.extractPhysical(currentTs)) { + ByteString key = resp.getLockInfo().getKey(); + logger.info( + String.format( + "kvCheckTxnStatus rollback expired txn: %d, remove lock: %s", + startTs, key.toStringUtf8())); + removeLock(key); + putTxnStatus(startTs, 0L, ByteString.EMPTY); + resp = txnStatusMap.get(startTs).get().build(); + } + logger.info("kvCheckTxnStatus resp: " + resp); + responseObserver.onNext(resp); + } else { + builder.setError( + Kvrpcpb.KeyError.newBuilder() + .setTxnNotFound( + Kvrpcpb.TxnNotFound.newBuilder() + .setPrimaryKey(request.getPrimaryKey()) + .setStartTs(startTs))); + logger.info("kvCheckTxnStatus, TxnNotFound"); + responseObserver.onNext(builder.build()); + } + responseObserver.onCompleted(); + } catch (Exception e) { + logger.error("kvCheckTxnStatus error: " + e); + responseObserver.onError(Status.INTERNAL.asRuntimeException()); + } + } + + @Override + public void kvResolveLock( + org.tikv.kvproto.Kvrpcpb.ResolveLockRequest request, + io.grpc.stub.StreamObserver responseObserver) { + logger.info("KVMockServer.kvResolveLock"); + try { + Long startTs = request.getStartVersion(); + Long commitTs = request.getCommitVersion(); + logger.info( + String.format( + "kvResolveLock for txn: %d, commitTs: %d, keys: %d", + startTs, commitTs, request.getKeysCount())); + Kvrpcpb.ResolveLockResponse.Builder builder = Kvrpcpb.ResolveLockResponse.newBuilder(); + + Error e = verifyContext(request.getContext()); + if (e != null) { + responseObserver.onNext(builder.setRegionError(e).build()); + responseObserver.onCompleted(); + return; + } + + if 
(request.getKeysCount() == 0) { + lockMap.entrySet().removeIf(entry -> entry.getValue().get().getLockVersion() == startTs); + } else { + for (int i = 0; i < request.getKeysCount(); i++) { + removeLock(request.getKeys(i)); + } + } + + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(Status.INTERNAL.asRuntimeException()); + } + } + @Override public void coprocessor( org.tikv.kvproto.Coprocessor.Request requestWrap, diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 02cab4c46f4..db9ae5694b4 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -39,6 +39,8 @@ public class MockServerTest extends PDMockServerTest { public void setup() throws IOException { super.setup(); + port = GrpcUtils.getFreePort(); + Metapb.Region r = Metapb.Region.newBuilder() .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) @@ -51,7 +53,7 @@ public void setup() throws IOException { List s = ImmutableList.of( Metapb.Store.newBuilder() - .setAddress("localhost:1234") + .setAddress(LOCAL_ADDR + ":" + port) .setVersion("5.0.0") .setId(13) .build()); @@ -70,6 +72,6 @@ public void setup() throws IOException { (request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); } server = new KVMockServer(); - port = server.start(region); + server.start(region, port); } } diff --git a/src/test/java/org/tikv/common/PDClientMockTest.java b/src/test/java/org/tikv/common/PDClientMockTest.java index a8074d94572..6837334feea 100644 --- a/src/test/java/org/tikv/common/PDClientMockTest.java +++ b/src/test/java/org/tikv/common/PDClientMockTest.java @@ -74,9 +74,12 @@ public void testSwitchLeader() throws Exception { @Test public void testTso() throws Exception { try (PDClient client = session.getPDClient()) { + Long current = System.currentTimeMillis(); TiTimestamp 
ts = client.getTimestamp(defaultBackOff()); - // Test pdServer is set to generate physical == logical + 1 - assertEquals(ts.getPhysical(), ts.getLogical() + 1); + // Test pdServer is set to generate physical to current, logical to 1 + assertTrue(ts.getPhysical() >= current); + assertTrue(ts.getPhysical() < current + 100); + assertEquals(ts.getLogical(), 1); } } diff --git a/src/test/java/org/tikv/common/PDMockServer.java b/src/test/java/org/tikv/common/PDMockServer.java index 723034f1e33..99ccb66bbb5 100644 --- a/src/test/java/org/tikv/common/PDMockServer.java +++ b/src/test/java/org/tikv/common/PDMockServer.java @@ -75,8 +75,17 @@ public void getMembers(GetMembersRequest request, StreamObserver tso(StreamObserver resp) { return new StreamObserver() { - private int physical = 1; - private int logical = 0; + private long physical = System.currentTimeMillis(); + private long logical = 0; + + private void updateTso() { + logical++; + if (logical >= (1 << 18)) { + logical = 0; + physical++; + } + physical = Math.max(physical, System.currentTimeMillis()); + } @Override public void onNext(TsoRequest value) {} @@ -86,7 +95,8 @@ public void onError(Throwable t) {} @Override public void onCompleted() { - resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical++, logical++)); + updateTso(); + resp.onNext(GrpcUtils.makeTsoResponse(clusterId, physical, logical)); resp.onCompleted(); } }; diff --git a/src/test/java/org/tikv/common/RegionStoreClientTest.java b/src/test/java/org/tikv/common/RegionStoreClientTest.java index 1a03ad80e26..bb288c48aee 100644 --- a/src/test/java/org/tikv/common/RegionStoreClientTest.java +++ b/src/test/java/org/tikv/common/RegionStoreClientTest.java @@ -17,15 +17,16 @@ package org.tikv.common; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import 
java.util.List; import java.util.Optional; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.exception.KeyException; import org.tikv.common.region.RegionManager; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; @@ -40,6 +41,7 @@ import org.tikv.kvproto.Metapb; public class RegionStoreClientTest extends MockServerTest { + private static final Logger logger = LoggerFactory.getLogger(MockServerTest.class); private RegionStoreClient createClientV2() { return createClient("2.1.19"); @@ -49,6 +51,10 @@ private RegionStoreClient createClientV3() { return createClient("3.0.12"); } + private RegionStoreClient createClientV4() { + return createClient("6.1.0"); + } + private RegionStoreClient createClient(String version) { Metapb.Store meta = Metapb.Store.newBuilder() @@ -161,30 +167,130 @@ public void doBatchGetTest(RegionStoreClient client) { @Test public void scanTest() { - doScanTest(createClientV3()); + doScanTest(createClientV4()); } public void doScanTest(RegionStoreClient client) { + Long startTs = session.getTimestamp().getVersion(); + server.put("key1", "value1"); server.put("key2", "value2"); server.put("key4", "value4"); server.put("key5", "value5"); - List kvs = client.scan(defaultBackOff(), ByteString.copyFromUtf8("key2"), 1); - assertEquals(3, kvs.size()); + + // put lock will expire in 1s + ByteString key6 = ByteString.copyFromUtf8("key6"); + server.putWithLock(key6, ByteString.copyFromUtf8("value6"), key6, startTs, 100L); + server.putTxnStatus(startTs, 0L, key6); + assertTrue(server.hasLock(key6)); + + List kvs = + client.scan( + defaultBackOff(), ByteString.copyFromUtf8("key2"), session.getTimestamp().getVersion()); + assertEquals(4, kvs.size()); kvs.forEach( kv -> assertEquals( kv.getKey().toStringUtf8().replace("key", "value"), kv.getValue().toStringUtf8())); + assertFalse(server.hasLock(key6)); + // put region error 
server.putError( "error1", () -> Errorpb.Error.newBuilder().setServerIsBusy(ServerIsBusy.getDefaultInstance())); try { - client.scan(defaultBackOff(), ByteString.copyFromUtf8("error1"), 1); + client.scan( + defaultBackOff(), ByteString.copyFromUtf8("error1"), session.getTimestamp().getVersion()); fail(); } catch (Exception e) { assertTrue(true); } + server.removeError("error1"); + + // put lock + Long startTs7 = session.getTimestamp().getVersion(); + ByteString key7 = ByteString.copyFromUtf8("key7"); + server.putWithLock(key7, ByteString.copyFromUtf8("value7"), key7, startTs7, 3000L); + server.putTxnStatus(startTs7, 0L, key7); + assertTrue(server.hasLock(key7)); + try { + client.scan( + defaultBackOff(), ByteString.copyFromUtf8("key2"), session.getTimestamp().getVersion()); + fail(); + } catch (Exception e) { + KeyException keyException = (KeyException) e.getCause(); + assertTrue(keyException.getMessage().contains("org.tikv.txn.Lock")); + } + assertTrue(server.hasLock(key7)); + + server.clearAllMap(); + client.close(); + } + + @Test + public void resolveLocksTest() { + doResolveLocksTest(createClientV4()); + } + + public void doResolveLocksTest(RegionStoreClient client) { + ByteString primaryKey = ByteString.copyFromUtf8("primary"); + server.put(primaryKey, ByteString.copyFromUtf8("value0")); + + // get with committed lock + { + Long startTs = session.getTimestamp().getVersion(); + Long commitTs = session.getTimestamp().getVersion(); + logger.info("startTs: " + startTs); + + ByteString key1 = ByteString.copyFromUtf8("key1"); + ByteString value1 = ByteString.copyFromUtf8("value1"); + server.putWithLock(key1, value1, primaryKey, startTs, 1L); + server.putTxnStatus(startTs, commitTs); + assertTrue(server.hasLock(key1)); + + ByteString expected1 = client.get(defaultBackOff(), key1, 200); + assertEquals(value1, expected1); + assertFalse(server.hasLock(key1)); + } + + // get with not expired lock. 
+ { + Long startTs = session.getTimestamp().getVersion(); + logger.info("startTs: " + startTs); + + ByteString key2 = ByteString.copyFromUtf8("key2"); + ByteString value2 = ByteString.copyFromUtf8("value2"); + server.putWithLock(key2, value2, key2, startTs, 3000L); + server.putTxnStatus(startTs, 0L, key2); + assertTrue(server.hasLock(key2)); + + try { + client.get(defaultBackOff(), key2, session.getTimestamp().getVersion()); + fail(); + } catch (Exception e) { + KeyException keyException = (KeyException) e.getCause(); + assertTrue(keyException.getMessage().contains("org.tikv.txn.Lock")); + } + assertTrue(server.hasLock(key2)); + } + + // get with expired lock. + { + Long startTs = session.getTimestamp().getVersion(); + logger.info("startTs: " + startTs); + + ByteString key3 = ByteString.copyFromUtf8("key3"); + ByteString value3 = ByteString.copyFromUtf8("value3"); + server.putWithLock(key3, value3, key3, startTs, 100L); + server.putTxnStatus(startTs, 0L, key3); + assertTrue(server.hasLock(key3)); + + ByteString expected3 = + client.get(defaultBackOff(), key3, session.getTimestamp().getVersion()); + assertEquals(expected3, value3); + assertFalse(server.hasLock(key3)); + } + server.clearAllMap(); client.close(); } From 3e02966ac1467ac5e9703c22703c1fb337049b52 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Thu, 29 Dec 2022 15:22:24 +0800 Subject: [PATCH 09/22] [close #693] Add test for RangeSplitter (#694) Signed-off-by: shiyuhang <1136742008@qq.com> --- .../org/tikv/common/MockRegionManager.java | 89 ++++++ .../java/org/tikv/util/RangeSplitterTest.java | 258 ++++++++++++++++++ 2 files changed, 347 insertions(+) create mode 100644 src/test/java/org/tikv/common/MockRegionManager.java create mode 100644 src/test/java/org/tikv/util/RangeSplitterTest.java diff --git a/src/test/java/org/tikv/common/MockRegionManager.java b/src/test/java/org/tikv/common/MockRegionManager.java new file mode 100644 index 00000000000..cac65312d0b 
--- /dev/null +++ b/src/test/java/org/tikv/common/MockRegionManager.java @@ -0,0 +1,89 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import static org.tikv.common.GrpcUtils.encodeKey; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.tikv.common.apiversion.RequestKeyV1TxnCodec; +import org.tikv.common.key.Key; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.common.region.TiStoreType; +import org.tikv.common.util.KeyRangeUtils; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Coprocessor.KeyRange; +import org.tikv.kvproto.Kvrpcpb.CommandPri; +import org.tikv.kvproto.Kvrpcpb.IsolationLevel; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Peer; +import org.tikv.kvproto.Metapb.Region; + +public class MockRegionManager extends RegionManager { + + private final Map mockRegionMap; + + private static TiRegion region(long id, KeyRange range) { + RequestKeyV1TxnCodec v1 = new RequestKeyV1TxnCodec(); + + TiConfiguration configuration = new TiConfiguration(); + configuration.setIsolationLevel(IsolationLevel.RC); + configuration.setCommandPriority(CommandPri.Low); + Region r = + Metapb.Region.newBuilder() + 
.setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(id) + .setStartKey(encodeKey(range.getStart().toByteArray())) + .setEndKey(encodeKey(range.getEnd().toByteArray())) + .addPeers(Peer.getDefaultInstance()) + .build(); + + List s = ImmutableList.of(Metapb.Store.newBuilder().setId(id).build()); + + return new TiRegion( + configuration, + v1.decodeRegion(r), + null, + r.getPeersList(), + s.stream().map(TiStore::new).collect(Collectors.toList())); + } + + public MockRegionManager(List ranges) { + super(null, null); + mockRegionMap = + ranges.stream().collect(Collectors.toMap(kr -> kr, kr -> region(ranges.indexOf(kr), kr))); + } + + @Override + public Pair getRegionStorePairByKey(ByteString key, TiStoreType storeType) { + for (Map.Entry entry : mockRegionMap.entrySet()) { + KeyRange range = entry.getKey(); + if (KeyRangeUtils.makeRange(range.getStart(), range.getEnd()).contains(Key.toRawKey(key))) { + TiRegion region = entry.getValue(); + return Pair.create( + region, new TiStore(Metapb.Store.newBuilder().setId(region.getId()).build())); + } + } + return null; + } +} diff --git a/src/test/java/org/tikv/util/RangeSplitterTest.java b/src/test/java/org/tikv/util/RangeSplitterTest.java new file mode 100644 index 00000000000..7207f959d43 --- /dev/null +++ b/src/test/java/org/tikv/util/RangeSplitterTest.java @@ -0,0 +1,258 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.tikv.util; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import gnu.trove.list.array.TLongArrayList; +import gnu.trove.map.hash.TLongObjectHashMap; +import java.util.ArrayList; +import java.util.List; +import org.junit.Test; +import org.tikv.common.MockRegionManager; +import org.tikv.common.codec.Codec.IntegerCodec; +import org.tikv.common.codec.CodecDataOutput; +import org.tikv.common.key.RowKey; +import org.tikv.common.key.RowKey.DecodeResult.Status; +import org.tikv.common.util.RangeSplitter; +import org.tikv.kvproto.Coprocessor.KeyRange; + +public class RangeSplitterTest { + + private static KeyRange keyRange(Long s, Long e) { + ByteString sKey = ByteString.EMPTY; + ByteString eKey = ByteString.EMPTY; + if (s != null) { + CodecDataOutput cdo = new CodecDataOutput(); + IntegerCodec.writeLongFully(cdo, s, true); + sKey = cdo.toByteString(); + } + + if (e != null) { + CodecDataOutput cdo = new CodecDataOutput(); + IntegerCodec.writeLongFully(cdo, e, true); + eKey = cdo.toByteString(); + } + + return KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build(); + } + + private static KeyRange keyRangeByHandle(long tableId, Long s, Long e) { + return keyRangeByHandle(tableId, s, Status.EQUAL, e, Status.EQUAL); + } + + private static KeyRange keyRangeByHandle(long tableId, Long s, Status ss, Long e, Status es) { + ByteString sKey = shiftByStatus(handleToByteString(tableId, s), ss); + ByteString eKey = shiftByStatus(handleToByteString(tableId, e), es); + + return KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build(); + } + + private static ByteString shiftByStatus(ByteString v, Status s) { + switch (s) { + case EQUAL: + return v; + case LESS: + return v.substring(0, v.size() - 1); + case GREATER: + return v.concat(ByteString.copyFrom(new byte[] {1, 0})); + default: + throw new IllegalArgumentException("Only EQUAL,LESS,GREATER allowed"); + } + } + + 
private static ByteString handleToByteString(long tableId, Long k) { + if (k != null) { + return RowKey.toRowKey(tableId, k).toByteString(); + } + return ByteString.EMPTY; + } + + @Test + public void splitRangeByRegionTest() { + MockRegionManager mgr = + new MockRegionManager( + ImmutableList.of(keyRange(null, 30L), keyRange(30L, 50L), keyRange(50L, null))); + RangeSplitter s = RangeSplitter.newSplitter(mgr); + List tasks = + s.splitRangeByRegion( + ImmutableList.of( + keyRange(0L, 40L), keyRange(41L, 42L), keyRange(45L, 50L), keyRange(70L, 1000L))); + + assertEquals(tasks.get(0).getRegion().getId(), 0); + assertEquals(tasks.get(0).getRanges().size(), 1); + KeyRange range = tasks.get(0).getRanges().get(0); + assertEquals(tasks.get(0).getRanges().get(0), keyRange(0L, 30L)); + + assertEquals(tasks.get(1).getRegion().getId(), 1); + assertEquals(tasks.get(1).getRanges().get(0), keyRange(30L, 40L)); + assertEquals(tasks.get(1).getRanges().get(1), keyRange(41L, 42L)); + assertEquals(tasks.get(1).getRanges().get(2), keyRange(45L, 50L)); + assertEquals(tasks.get(1).getRanges().size(), 3); + + assertEquals(tasks.get(2).getRegion().getId(), 2); + assertEquals(tasks.get(2).getRanges().size(), 1); + assertEquals(tasks.get(2).getRanges().get(0), keyRange(70L, 1000L)); + } + + @Test + public void splitAndSortHandlesByRegionTest() { + final long tableId = 1; + List handles = new ArrayList<>(); + handles.add(1L); + handles.add(5L); + handles.add(4L); + handles.add(3L); + handles.add(10L); + handles.add(2L); + handles.add(100L); + handles.add(101L); + handles.add(99L); + handles.add(88L); + handles.add(-1L); + handles.add(-255L); + handles.add(-100L); + handles.add(-99L); + handles.add(-98L); + handles.add(Long.MIN_VALUE); + handles.add(8960L); + handles.add(8959L); + handles.add(19999L); + handles.add(15001L); + + MockRegionManager mgr = + new MockRegionManager( + ImmutableList.of( + keyRangeByHandle(tableId, null, Status.EQUAL, -100L, Status.EQUAL), + keyRangeByHandle(tableId, 
-100L, Status.EQUAL, 10L, Status.GREATER), + keyRangeByHandle(tableId, 10L, Status.GREATER, 50L, Status.EQUAL), + keyRangeByHandle(tableId, 50L, Status.EQUAL, 100L, Status.GREATER), + keyRangeByHandle(tableId, 100L, Status.GREATER, 9000L, Status.LESS), + keyRangeByHandle(tableId, 0x2300L /*8960*/, Status.LESS, 16000L, Status.EQUAL), + keyRangeByHandle(tableId, 16000L, Status.EQUAL, null, Status.EQUAL))); + + RangeSplitter s = RangeSplitter.newSplitter(mgr); + List tasks = + new ArrayList<>( + s.splitAndSortHandlesByRegion( + ImmutableList.of(tableId), + new TLongArrayList(handles.stream().mapToLong(t -> t).toArray()))); + tasks.sort( + (l, r) -> { + Long regionIdLeft = l.getRegion().getId(); + Long regionIdRight = r.getRegion().getId(); + return regionIdLeft.compareTo(regionIdRight); + }); + + // [-INF, -100): [Long.MIN_VALUE, Long.MIN_VALUE + 1), [-255, -254) + assertEquals(tasks.get(0).getRegion().getId(), 0); + assertEquals(tasks.get(0).getRanges().size(), 2); + assertEquals( + tasks.get(0).getRanges().get(0), + keyRangeByHandle(tableId, Long.MIN_VALUE, Long.MIN_VALUE + 1)); + assertEquals(tasks.get(0).getRanges().get(1), keyRangeByHandle(tableId, -255L, -254L)); + + // [-100, 10.x): [-100, -97), [-1, 0), [1, 6), [10, 11) + assertEquals(tasks.get(1).getRegion().getId(), 1); + assertEquals(tasks.get(1).getRanges().size(), 4); + assertEquals(tasks.get(1).getRanges().get(0), keyRangeByHandle(tableId, -100L, -97L)); + assertEquals(tasks.get(1).getRanges().get(1), keyRangeByHandle(tableId, -1L, 0L)); + assertEquals(tasks.get(1).getRanges().get(2), keyRangeByHandle(tableId, 1L, 6L)); + assertEquals(tasks.get(1).getRanges().get(3), keyRangeByHandle(tableId, 10L, 11L)); + + // [10.x, 50): empty + // [50, 100.x): [88, 89) [99, 101) + assertEquals(tasks.get(2).getRegion().getId(), 3); + assertEquals(tasks.get(2).getRanges().size(), 2); + assertEquals(tasks.get(2).getRanges().get(0), keyRangeByHandle(tableId, 88L, 89L)); + assertEquals(tasks.get(2).getRanges().get(1), 
keyRangeByHandle(tableId, 99L, 101L)); + + // [100.x, less than 8960): [101, 102) [8959, 8960) + assertEquals(tasks.get(3).getRegion().getId(), 4); + assertEquals(tasks.get(3).getRanges().size(), 2); + assertEquals(tasks.get(3).getRanges().get(0), keyRangeByHandle(tableId, 101L, 102L)); + assertEquals(tasks.get(3).getRanges().get(1), keyRangeByHandle(tableId, 8959L, 8960L)); + + // [less than 8960, 16000): [9000, 9001), [15001, 15002) + assertEquals(tasks.get(4).getRegion().getId(), 5); + assertEquals(tasks.get(4).getRanges().size(), 2); + assertEquals(tasks.get(4).getRanges().get(0), keyRangeByHandle(tableId, 8960L, 8961L)); + assertEquals(tasks.get(4).getRanges().get(1), keyRangeByHandle(tableId, 15001L, 15002L)); + + // [16000, INF): [19999, 20000) + assertEquals(tasks.get(5).getRegion().getId(), 6); + assertEquals(tasks.get(5).getRanges().size(), 1); + assertEquals(tasks.get(5).getRanges().get(0), keyRangeByHandle(tableId, 19999L, 20000L)); + } + + @Test + public void groupByAndSortHandlesByRegionIdTest() { + final long tableId = 1; + List handles = new ArrayList<>(); + handles.add(1L); + handles.add(5L); + handles.add(4L); + handles.add(3L); + handles.add(10L); + handles.add(11L); + handles.add(12L); + handles.add(2L); + handles.add(100L); + handles.add(101L); + handles.add(99L); + handles.add(88L); + handles.add(-1L); + handles.add(-255L); + handles.add(-100L); + handles.add(-99L); + handles.add(-98L); + handles.add(Long.MIN_VALUE); + handles.add(8960L); + handles.add(8959L); + handles.add(19999L); + handles.add(15001L); + handles.add(99999999999L); + handles.add(Long.MAX_VALUE); + + MockRegionManager mgr = + new MockRegionManager( + ImmutableList.of( + keyRangeByHandle(tableId, null, Status.EQUAL, -100L, Status.EQUAL), + keyRangeByHandle(tableId, -100L, Status.EQUAL, 10L, Status.GREATER), + keyRangeByHandle(tableId, 10L, Status.GREATER, 50L, Status.EQUAL), + keyRangeByHandle(tableId, 50L, Status.EQUAL, 100L, Status.GREATER), + keyRangeByHandle(tableId, 100L, 
Status.GREATER, 9000L, Status.LESS), + keyRangeByHandle(tableId, 0x2300L /*8960*/, Status.LESS, 16000L, Status.EQUAL), + keyRangeByHandle(tableId, 16000L, Status.EQUAL, null, Status.EQUAL))); + + TLongObjectHashMap result = new TLongObjectHashMap<>(); + RangeSplitter.newSplitter(mgr) + .groupByAndSortHandlesByRegionId( + tableId, new TLongArrayList(handles.stream().mapToLong(t -> t).toArray())) + .forEach((k, v) -> result.put(k.first.getId(), v)); + assertEquals(2, result.get(0).size()); + assertEquals(10, result.get(1).size()); + assertEquals(2, result.get(2).size()); + assertEquals(3, result.get(3).size()); + assertEquals(2, result.get(4).size()); + assertEquals(2, result.get(5).size()); + assertEquals(3, result.get(6).size()); + } +} From d278e3ad19a5b4640e4a5d91de7ca47342449e0d Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Fri, 30 Dec 2022 17:22:04 +0800 Subject: [PATCH 10/22] [close #699] Rebase release 3.3.1.11 (#704) Co-authored-by: iosmanthus Co-authored-by: Xiaoguang Sun close https://github.com/tikv/client-java/issues/699 --- pom.xml | 11 ++- .../java/io/grpc/internal/ClientCallImpl.java | 22 +++-- src/main/java/io/netty/buffer/PoolArena.java | 91 +++++++++++-------- .../org/tikv/common/AbstractGRPCClient.java | 8 +- src/main/java/org/tikv/common/PDClient.java | 28 ++++-- .../common/operation/RegionErrorHandler.java | 7 ++ .../org/tikv/common/util/BackOffFunction.java | 3 +- .../tikv/common/util/ConcreteBackOffer.java | 7 ++ .../org/tikv/common/TsoBatchUsedUpTest.java | 46 ++++++++++ 9 files changed, 163 insertions(+), 60 deletions(-) create mode 100644 src/test/java/org/tikv/common/TsoBatchUsedUpTest.java diff --git a/pom.xml b/pom.xml index c771f5f4171..1b98009f300 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ 3.5.1 1.2.17 1.7.16 - 1.38.0 + 1.48.0 2.0.34.Final 2.8.9 1.6.6 @@ -75,12 +75,12 @@ com.google.protobuf protobuf-java - 3.16.1 + 3.19.6 com.google.protobuf protobuf-java-util - 3.16.1 + 3.19.6 
io.perfmark @@ -232,6 +232,11 @@ 3.9 compile + + commons-codec + commons-codec + 1.15 + org.apache.httpcomponents httpclient diff --git a/src/main/java/io/grpc/internal/ClientCallImpl.java b/src/main/java/io/grpc/internal/ClientCallImpl.java index 5b8d2f6ba25..75b769d6f80 100644 --- a/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -24,6 +24,7 @@ import static io.grpc.Status.DEADLINE_EXCEEDED; import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; +import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; import static java.lang.Math.max; @@ -33,6 +34,7 @@ import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ClientCall; +import io.grpc.ClientStreamTracer; import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.CompressorRegistry; @@ -166,6 +168,7 @@ static void prepareHeaders( DecompressorRegistry decompressorRegistry, Compressor compressor, boolean fullStreamDecompression) { + headers.discardAll(CONTENT_LENGTH_KEY); headers.discardAll(MESSAGE_ENCODING_KEY); if (compressor != Codec.Identity.NONE) { headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); @@ -260,10 +263,13 @@ public void runInContext() { effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); stream = clientStreamProvider.newStream(method, callOptions, headers, context); } else { + ClientStreamTracer[] tracers = + GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false); stream = new FailingClientStream( DEADLINE_EXCEEDED.withDescription( - "ClientCall started after deadline exceeded: " + effectiveDeadline)); + "ClientCall started after deadline exceeded: " + effectiveDeadline), + tracers); } if (callExecutorIsDirect) { @@ -363,12 +369,14 @@ private static void 
logIfContextNarrowedTimeout( StringBuilder builder = new StringBuilder( String.format( - "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout)); + Locale.US, + "Call timeout set to '%d' ns, due to context deadline.", + effectiveTimeout)); if (callDeadline == null) { builder.append(" Explicit call timeout was not set."); } else { long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS); - builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout)); + builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout)); } log.fine(builder.toString()); @@ -562,6 +570,9 @@ public void setMessageCompression(boolean enabled) { @Override public boolean isReady() { + if (halfCloseCalled) { + return false; + } return stream.isReady(); } @@ -711,11 +722,6 @@ private void runInternal() { } } - @Override - public void closed(Status status, Metadata trailers) { - closed(status, RpcProgress.PROCESSED, trailers); - } - @Override public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { PerfMark.startTask("ClientStreamListener.closed", tag); diff --git a/src/main/java/io/netty/buffer/PoolArena.java b/src/main/java/io/netty/buffer/PoolArena.java index 66ac2ac85d4..fc6cbf258ae 100644 --- a/src/main/java/io/netty/buffer/PoolArena.java +++ b/src/main/java/io/netty/buffer/PoolArena.java @@ -57,7 +57,6 @@ enum SizeClass { final int numSmallSubpagePools; final int directMemoryCacheAlignment; - final int directMemoryCacheAlignmentMask; private final PoolSubpage[] smallSubpagePools; private final PoolChunkList q050; @@ -97,7 +96,6 @@ protected PoolArena( super(pageSize, pageShifts, chunkSize, cacheAlignment); this.parent = parent; directMemoryCacheAlignment = cacheAlignment; - directMemoryCacheAlignmentMask = cacheAlignment - 1; numSmallSubpagePools = nSubpages; smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); @@ -183,9 +181,9 @@ private void tcacheAllocateSmall( return; } - 
/** - * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link - * PoolChunk#free(long)} may modify the doubly linked list as well. + /* + * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and + * {@link PoolChunk#free(long)} may modify the doubly linked list as well. */ final PoolSubpage head = smallSubpagePools[sizeIdx]; final boolean needsNormalAllocation; @@ -193,7 +191,13 @@ private void tcacheAllocateSmall( final PoolSubpage s = head.next; needsNormalAllocation = s == head; if (!needsNormalAllocation) { - assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); + assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx) + : "doNotDestroy=" + + s.doNotDestroy + + ", elemSize=" + + s.elemSize + + ", sizeIdx=" + + sizeIdx; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); @@ -221,7 +225,7 @@ private void tcacheAllocateNormal( } } - // Method must be called inside synchronized(this) { ... } block + // Method must be called inside synchronized(this) { ... } block private void allocateNormal( PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) @@ -272,7 +276,7 @@ void free( } } - private SizeClass sizeClass(long handle) { + private static SizeClass sizeClass(long handle) { return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal; } @@ -499,6 +503,25 @@ public long numActiveBytes() { return max(0, val); } + /** + * Return the number of bytes that are currently pinned to buffer instances, by the arena. The + * pinned memory is not accessible for use by any other allocation, until the buffers using have + * all been released. + */ + public long numPinnedBytes() { + long val = + activeBytesHuge + .value(); // Huge chunks are exact-sized for the buffers they were allocated to. 
+ synchronized (this) { + for (int i = 0; i < chunkListMetrics.size(); i++) { + for (PoolChunkMetric m : chunkListMetrics.get(i)) { + val += ((PoolChunk) m).pinnedBytes(); + } + } + } + return max(0, val); + } + protected abstract PoolChunk newChunk( int pageSize, int maxPageIdx, int pageShifts, int chunkSize); @@ -588,13 +611,8 @@ private void destroyPoolChunkLists(PoolChunkList... chunkLists) { static final class HeapArena extends PoolArena { - HeapArena( - PooledByteBufAllocator parent, - int pageSize, - int pageShifts, - int chunkSize, - int directMemoryCacheAlignment) { - super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment); + HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize) { + super(parent, pageSize, pageShifts, chunkSize, 0); } private static byte[] newByteArray(int size) { @@ -610,12 +628,12 @@ boolean isDirect() { protected PoolChunk newChunk( int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { return new PoolChunk( - this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); + this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx); } @Override protected PoolChunk newUnpooledChunk(int capacity) { - return new PoolChunk(this, newByteArray(capacity), capacity, 0); + return new PoolChunk(this, null, newByteArray(capacity), capacity); } @Override @@ -656,40 +674,33 @@ boolean isDirect() { return true; } - // mark as package-private, only for unit test - int offsetCacheLine(ByteBuffer memory) { - // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) - // will - // throw an NPE. - int remainder = - HAS_UNSAFE - ? 
(int) - (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask) - : 0; - - // offset = alignment - address & (alignment - 1) - return directMemoryCacheAlignment - remainder; - } - @Override protected PoolChunk newChunk( int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { if (directMemoryCacheAlignment == 0) { + ByteBuffer memory = allocateDirect(chunkSize); return new PoolChunk( - this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); + this, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx); } - final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment); + + final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment); + final ByteBuffer memory = + PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment); return new PoolChunk( - this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory)); + this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx); } @Override protected PoolChunk newUnpooledChunk(int capacity) { if (directMemoryCacheAlignment == 0) { - return new PoolChunk(this, allocateDirect(capacity), capacity, 0); + ByteBuffer memory = allocateDirect(capacity); + return new PoolChunk(this, memory, memory, capacity); } - final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment); - return new PoolChunk(this, memory, capacity, offsetCacheLine(memory)); + + final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment); + final ByteBuffer memory = + PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment); + return new PoolChunk(this, base, memory, capacity); } private static ByteBuffer allocateDirect(int capacity) { @@ -701,9 +712,9 @@ private static ByteBuffer allocateDirect(int capacity) { @Override protected void destroyChunk(PoolChunk chunk) { if (PlatformDependent.useDirectBufferNoCleaner()) { - PlatformDependent.freeDirectNoCleaner(chunk.memory); + 
PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base); } else { - PlatformDependent.freeDirectBuffer(chunk.memory); + PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base); } } diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index ac68552f7e2..6e2d9a7b5b8 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -193,13 +193,17 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin HealthCheckResponse resp = stub.check(req); return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING; } catch (Exception e) { - logger.warn("check health failed.", e); + logger.warn("check health failed, addr: {}, caused by: {}", addressStr, e.getMessage()); backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e); } } } protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { - return doCheckHealth(backOffer, addressStr, hostMapping); + try { + return doCheckHealth(backOffer, addressStr, hostMapping); + } catch (Exception e) { + return false; + } } } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 1568a78e0ef..43383bbda87 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -462,14 +462,19 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } return resp; } catch (Exception e) { - logger.warn("failed to get member from pd server.", e); + logger.warn( + "failed to get member from pd server from {}, caused by: {}", uri, e.getMessage()); backOffer.doBackOff(BackOffFuncType.BoPDRPC, e); } } } private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { - return doGetMembers(backOffer, uri); + try { + return doGetMembers(backOffer, uri); + } catch (Exception e) { + return null; + } } // return whether the 
leader has changed to target address `leaderUrlStr`. @@ -524,13 +529,16 @@ synchronized boolean createFollowerClientWrapper( public void tryUpdateLeaderOrForwardFollower() { if (updateLeaderNotify.compareAndSet(false, true)) { try { - BackOffer backOffer = defaultBackOffer(); updateLeaderService.submit( () -> { try { - updateLeaderOrForwardFollower(backOffer); + updateLeaderOrForwardFollower(); + } catch (Exception e) { + logger.info("update leader or forward follower failed", e); + throw e; } finally { updateLeaderNotify.set(false); + logger.info("updating leader finish"); } }); } catch (RejectedExecutionException e) { @@ -540,11 +548,13 @@ public void tryUpdateLeaderOrForwardFollower() { } } - private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { + private synchronized void updateLeaderOrForwardFollower() { + logger.warn("updating leader or forward follower"); if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } for (URI url : this.pdAddrs) { + BackOffer backOffer = this.probeBackOffer(); // since resp is null, we need update leader's address by walking through all pd server. GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { @@ -602,8 +612,9 @@ && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { } public void tryUpdateLeader() { + logger.info("try update leader"); for (URI url : this.pdAddrs) { - BackOffer backOffer = defaultBackOffer(); + BackOffer backOffer = this.probeBackOffer(); // since resp is null, we need update leader's address by walking through all pd server. 
GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { @@ -856,4 +867,9 @@ public RequestKeyCodec getCodec() { private static BackOffer defaultBackOffer() { return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); } + + private BackOffer probeBackOffer() { + int maxSleep = (int) getTimeout() * 2; + return ConcreteBackOffer.newCustomBackOff(maxSleep); + } } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index 7e7eff2b9dc..c30f7b6f6fc 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -31,6 +31,7 @@ import org.tikv.common.region.RegionManager; import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Metapb; @@ -168,6 +169,12 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { regionManager.clearRegionCache(); throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); } + // The tso cache is used up in TiKV servers, we should backoff and wait its cache is renewed. + else if (error.getMessage().contains("TsoBatchUsedUp")) { + logger.warn(String.format("tso batch used up for region [%s]", recv.getRegion())); + backOffer.doBackOff(BackOffFuncType.BoTsoBatchUsedUp, new GrpcException(error.getMessage())); + return true; + } logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); // For other errors, we only drop cache here. 
diff --git a/src/main/java/org/tikv/common/util/BackOffFunction.java b/src/main/java/org/tikv/common/util/BackOffFunction.java index 3eb07a725a9..33ccde27dc1 100644 --- a/src/main/java/org/tikv/common/util/BackOffFunction.java +++ b/src/main/java/org/tikv/common/util/BackOffFunction.java @@ -81,6 +81,7 @@ public enum BackOffFuncType { BoServerBusy, BoTxnNotFound, BoCheckTimeout, - BoCheckHealth + BoCheckHealth, + BoTsoBatchUsedUp } } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index cef280567c6..39b65474040 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -174,6 +174,13 @@ private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcTy case BoCheckHealth: backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter); break; + case BoTsoBatchUsedUp: + backOffFunction = + BackOffFunction.create( + TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS), + 500, + BackOffStrategy.NoJitter); + break; } return backOffFunction; } diff --git a/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java new file mode 100644 index 00000000000..cda984f823f --- /dev/null +++ b/src/test/java/org/tikv/common/TsoBatchUsedUpTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.tikv.kvproto.Errorpb.Error; +import org.tikv.raw.RawKVClient; + +public class TsoBatchUsedUpTest extends MockThreeStoresTest { + RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testTsoBatchUsedUp() { + ByteString key = ByteString.copyFromUtf8("tso"); + servers.get(0).putError("tso", () -> Error.newBuilder().setMessage("TsoBatchUsedUp")); + try (RawKVClient client = createClient()) { + try { + client.put(key, ByteString.EMPTY); + Assert.fail(); + } catch (Exception ignore) { + } + pdServers.get(0).addGetRegionListener(request -> null); + // Will not clean region cache + Assert.assertNotNull(session.getRegionManager().getRegionByKey(key)); + } + } +} From dda1029b94216ce7d97353df55ca514c1977b8e3 Mon Sep 17 00:00:00 2001 From: tomato <38561029+qidi1@users.noreply.github.com> Date: Mon, 30 Jan 2023 15:17:42 +0800 Subject: [PATCH 11/22] [close #654] Add RegionCacheInvalidCallBack (#653) * Close #654 To let the upper layers customize their own behavior when the region cache fails, Add RegionCacheInvalidCallBack. 
Signed-off-by: qidi1 <1083369179@qq.com> * change callback to list Signed-off-by: qidi1 <1083369179@qq.com> * format code Signed-off-by: qidi1 <1083369179@qq.com> * change as comment Signed-off-by: qidi1 <1083369179@qq.com> * change to synchronized Signed-off-by: qidi1 <1083369179@qq.com> * change list to copy on write Signed-off-by: qidi1 <1083369179@qq.com> * change to muti thread Signed-off-by: qidi1 <1083369179@qq.com> * format code Signed-off-by: qidi1 <1083369179@qq.com> * add comment Signed-off-by: qidi1 <1083369179@qq.com> * change to magical num Signed-off-by: qidi1 <1083369179@qq.com> * add comment Signed-off-by: qidi1 <1083369179@qq.com> * change log levle Signed-off-by: qidi1 <1083369179@qq.com> * Fmt --------- Signed-off-by: qidi1 <1083369179@qq.com> Co-authored-by: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> --- .../common/event/CacheInvalidateEvent.java | 2 + .../common/operation/RegionErrorHandler.java | 71 ++++++++-- .../org/tikv/common/region/RegionManager.java | 54 ++++++-- .../tikv/common/CacheInvalidCallBackTest.java | 130 ++++++++++++++++++ 4 files changed, 240 insertions(+), 17 deletions(-) create mode 100644 src/test/java/org/tikv/common/CacheInvalidCallBackTest.java diff --git a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java index 10d21942c91..ca7d73bac30 100644 --- a/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java +++ b/src/main/java/org/tikv/common/event/CacheInvalidateEvent.java @@ -97,6 +97,8 @@ public String toString() { public enum CacheType implements Serializable { REGION_STORE, + STORE, + REGION, REQ_FAILED, LEADER } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index c30f7b6f6fc..df95471ffa7 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ 
b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -21,10 +21,13 @@ import io.grpc.StatusRuntimeException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.codec.KeyUtils; +import org.tikv.common.event.CacheInvalidateEvent; +import org.tikv.common.event.CacheInvalidateEvent.CacheType; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiKVException; import org.tikv.common.region.RegionErrorReceiver; @@ -43,6 +46,11 @@ public class RegionErrorHandler implements ErrorHandler { private final Function getRegionError; private final RegionManager regionManager; private final RegionErrorReceiver recv; + private final List> cacheInvalidateCallBackList; + + private final ExecutorService callBackThreadPool; + private final int INVALID_STORE_ID = 0; + private final int INVALID_REGION_ID = 0; public RegionErrorHandler( RegionManager regionManager, @@ -51,6 +59,8 @@ public RegionErrorHandler( this.recv = recv; this.regionManager = regionManager; this.getRegionError = getRegionError; + this.cacheInvalidateCallBackList = regionManager.getCacheInvalidateCallbackList(); + this.callBackThreadPool = regionManager.getCallBackThreadPool(); } @Override @@ -107,6 +117,7 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { if (!retry) { this.regionManager.invalidateRegion(recv.getRegion()); + notifyRegionLeaderError(recv.getRegion()); } backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString())); @@ -116,15 +127,14 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { // this error is reported from raftstore: // store_id requested at the moment is inconsistent with that expected // Solution:re-fetch from PD - long storeId = recv.getRegion().getLeader().getStoreId(); + long storeId = 
error.getStoreNotMatch().getRequestStoreId(); long actualStoreId = error.getStoreNotMatch().getActualStoreId(); logger.warn( String.format( "Store Not Match happened with region id %d, store id %d, actual store id %d", recv.getRegion().getId(), storeId, actualStoreId)); - - this.regionManager.invalidateRegion(recv.getRegion()); - this.regionManager.invalidateStore(storeId); + // may request store which is not leader. + invalidateRegionStoreCache(recv.getRegion(), storeId); // assume this is a low probability error, do not retry, just re-split the request by // throwing it out. return false; @@ -143,8 +153,6 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) { BackOffFunction.BackOffFuncType.BoServerBusy, new StatusRuntimeException( Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()))); - backOffer.doBackOff( - BackOffFunction.BackOffFuncType.BoRegionMiss, new GrpcException(error.getMessage())); return true; } else if (error.hasStaleCommand()) { // this error is reported from raftstore: @@ -179,7 +187,7 @@ else if (error.getMessage().contains("TsoBatchUsedUp")) { logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion())); // For other errors, we only drop cache here. // Upper level may split this task. 
- invalidateRegionStoreCache(recv.getRegion()); + invalidateRegionStoreCache(recv.getRegion(), recv.getRegion().getLeader().getStoreId()); // retry if raft proposal is dropped, it indicates the store is in the middle of transition if (error.getMessage().contains("Raft ProposalDropped")) { backOffer.doBackOff( @@ -196,6 +204,7 @@ else if (error.getMessage().contains("TsoBatchUsedUp")) { private boolean onRegionEpochNotMatch(BackOffer backOffer, List currentRegions) { if (currentRegions.size() == 0) { this.regionManager.onRegionStale(recv.getRegion()); + notifyRegionCacheInvalidate(recv.getRegion()); return false; } @@ -229,6 +238,7 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List c } if (needInvalidateOld) { + notifyRegionCacheInvalidate(recv.getRegion()); this.regionManager.onRegionStale(recv.getRegion()); } @@ -271,8 +281,51 @@ public TiRegion getRegion() { return recv.getRegion(); } - private void invalidateRegionStoreCache(TiRegion ctxRegion) { + private void notifyRegionRequestError( + TiRegion ctxRegion, long storeId, CacheInvalidateEvent.CacheType type) { + CacheInvalidateEvent event; + // When store(region) id is invalid, + // it implies that the error was not caused by store(region) error. 
+ switch (type) { + case REGION: + case LEADER: + event = new CacheInvalidateEvent(ctxRegion.getId(), INVALID_STORE_ID, true, false, type); + break; + case REGION_STORE: + event = new CacheInvalidateEvent(ctxRegion.getId(), storeId, true, true, type); + break; + case REQ_FAILED: + event = new CacheInvalidateEvent(INVALID_REGION_ID, INVALID_STORE_ID, false, false, type); + break; + default: + throw new IllegalArgumentException("Unexpect invalid cache invalid type " + type); + } + if (cacheInvalidateCallBackList != null) { + for (Function cacheInvalidateCallBack : + cacheInvalidateCallBackList) { + callBackThreadPool.submit( + () -> { + try { + cacheInvalidateCallBack.apply(event); + } catch (Exception e) { + logger.error(String.format("CacheInvalidCallBack failed %s", e)); + } + }); + } + } + } + + private void invalidateRegionStoreCache(TiRegion ctxRegion, long storeId) { regionManager.invalidateRegion(ctxRegion); - regionManager.invalidateStore(ctxRegion.getLeader().getStoreId()); + regionManager.invalidateStore(storeId); + notifyRegionRequestError(ctxRegion, storeId, CacheType.REGION_STORE); + } + + private void notifyRegionCacheInvalidate(TiRegion ctxRegion) { + notifyRegionRequestError(ctxRegion, 0, CacheType.REGION); + } + + private void notifyRegionLeaderError(TiRegion ctxRegion) { + notifyRegionRequestError(ctxRegion, 0, CacheType.LEADER); } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 2d84f9988e4..37c3d73f759 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -23,14 +23,18 @@ import io.prometheus.client.Histogram; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.TiConfiguration; +import org.tikv.common.event.CacheInvalidateEvent; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.InvalidStoreException; import org.tikv.common.exception.TiClientInternalException; @@ -69,10 +73,36 @@ public class RegionManager { private final TiConfiguration conf; private final ScheduledExecutorService executor; private final StoreHealthyChecker storeChecker; + private final CopyOnWriteArrayList> + cacheInvalidateCallbackList; + private final ExecutorService callBackThreadPool; private AtomicInteger tiflashStoreIndex = new AtomicInteger(0); public RegionManager( TiConfiguration conf, ReadOnlyPDClient pdClient, ChannelFactory channelFactory) { + this(conf, pdClient, channelFactory, 1); + } + + public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { + this(conf, pdClient, 1); + } + + public RegionManager( + TiConfiguration conf, ReadOnlyPDClient pdClient, int callBackExecutorThreadNum) { + this.cache = new RegionCache(); + this.pdClient = pdClient; + this.conf = conf; + this.storeChecker = null; + this.executor = null; + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); + this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); + } + + public RegionManager( + TiConfiguration conf, + ReadOnlyPDClient pdClient, + ChannelFactory channelFactory, + int callBackExecutorThreadNum) { this.cache = new RegionCache(); this.pdClient = pdClient; this.conf = conf; @@ -83,26 +113,34 @@ public RegionManager( this.storeChecker = storeChecker; this.executor = Executors.newScheduledThreadPool(1); this.executor.scheduleAtFixedRate(storeChecker, period, period, TimeUnit.MILLISECONDS); - } - - public RegionManager(TiConfiguration conf, ReadOnlyPDClient pdClient) { - this.cache 
= new RegionCache(); - this.pdClient = pdClient; - this.conf = conf; - this.storeChecker = null; - this.executor = null; + this.cacheInvalidateCallbackList = new CopyOnWriteArrayList<>(); + this.callBackThreadPool = Executors.newFixedThreadPool(callBackExecutorThreadNum); } public synchronized void close() { if (this.executor != null) { this.executor.shutdownNow(); } + this.callBackThreadPool.shutdownNow(); } public ReadOnlyPDClient getPDClient() { return this.pdClient; } + public ExecutorService getCallBackThreadPool() { + return callBackThreadPool; + } + + public List> getCacheInvalidateCallbackList() { + return cacheInvalidateCallbackList; + } + + public void addCacheInvalidateCallback( + Function cacheInvalidateCallback) { + this.cacheInvalidateCallbackList.add(cacheInvalidateCallback); + } + public void invalidateAll() { cache.invalidateAll(); } diff --git a/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java new file mode 100644 index 00000000000..5e4f0a992a0 --- /dev/null +++ b/src/test/java/org/tikv/common/CacheInvalidCallBackTest.java @@ -0,0 +1,130 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.tikv.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import org.junit.Test; +import org.tikv.common.event.CacheInvalidateEvent; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.RegionStoreClient; +import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.Errorpb.EpochNotMatch; +import org.tikv.kvproto.Errorpb.NotLeader; +import org.tikv.kvproto.Errorpb.StoreNotMatch; +import org.tikv.kvproto.Metapb; + +public class CacheInvalidCallBackTest extends MockServerTest { + + private RegionStoreClient createClient( + String version, Function cacheInvalidateCallBack) { + Metapb.Store meta = + Metapb.Store.newBuilder() + .setAddress(LOCAL_ADDR + ":" + port) + .setId(1) + .setState(Metapb.StoreState.Up) + .setVersion(version) + .build(); + TiStore store = new TiStore(meta); + + RegionManager manager = new RegionManager(session.getConf(), session.getPDClient()); + manager.addCacheInvalidateCallback(cacheInvalidateCallBack); + RegionStoreClientBuilder builder = + new RegionStoreClientBuilder( + session.getConf(), session.getChannelFactory(), manager, session.getPDClient()); + + return builder.build(region, store); + } + + @Test + public void testcacheInvalidCallBack() { + String version = "3.0.12"; + CacheInvalidateCallBack cacheInvalidateCallBack = new CacheInvalidateCallBack(); + doRawGetTest(createClient(version, cacheInvalidateCallBack), cacheInvalidateCallBack); + } + + public void doRawGetTest( + RegionStoreClient client, CacheInvalidateCallBack cacheInvalidateCallBack) { + server.put("key1", "value1"); + Optional 
value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1")); + assertEquals(ByteString.copyFromUtf8("value1"), value.get()); + try { + server.putError( + "error1", () -> Errorpb.Error.newBuilder().setNotLeader(NotLeader.getDefaultInstance())); + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("error1")); + fail(); + } catch (Exception e) { + assertEquals(1, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.putError( + "failure", + () -> Errorpb.Error.newBuilder().setEpochNotMatch(EpochNotMatch.getDefaultInstance())); + try { + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); + fail(); + } catch (Exception e) { + sleep(1000); + assertEquals(2, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.putError( + "store_not_match", + () -> Errorpb.Error.newBuilder().setStoreNotMatch(StoreNotMatch.getDefaultInstance())); + try { + client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("failure")); + fail(); + } catch (Exception e) { + sleep(1000); + assertEquals(3, cacheInvalidateCallBack.cacheInvalidateEvents.size()); + } + server.clearAllMap(); + client.close(); + } + + private void sleep(int time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + fail(); + } + } + + private BackOffer defaultBackOff() { + return ConcreteBackOffer.newCustomBackOff(1000); + } + + static class CacheInvalidateCallBack implements Function { + + public List cacheInvalidateEvents = new ArrayList<>(); + + @Override + public Void apply(CacheInvalidateEvent cacheInvalidateEvent) { + cacheInvalidateEvents.add(cacheInvalidateEvent); + return null; + } + } +} From 2c48b4a3587ed7371fa80fc783a1580997371282 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Fri, 3 Mar 2023 23:55:40 +0800 Subject: [PATCH 12/22] fix pair equality check (#724) Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/util/Pair.java | 18 +++++ 
.../java/org/tikv/common/util/PairTest.java | 74 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 src/test/java/org/tikv/common/util/PairTest.java diff --git a/src/main/java/org/tikv/common/util/Pair.java b/src/main/java/org/tikv/common/util/Pair.java index 803880a9d14..65ae082e788 100644 --- a/src/main/java/org/tikv/common/util/Pair.java +++ b/src/main/java/org/tikv/common/util/Pair.java @@ -18,6 +18,7 @@ package org.tikv.common.util; import java.io.Serializable; +import java.util.Objects; public class Pair implements Serializable { public final F first; @@ -36,4 +37,21 @@ public static Pair create(F f, S s) { public String toString() { return String.format("[%s:%s]", first, second); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Pair pair = (Pair) o; + return Objects.equals(first, pair.first) && Objects.equals(second, pair.second); + } + + @Override + public int hashCode() { + return Objects.hash(first, second); + } } diff --git a/src/test/java/org/tikv/common/util/PairTest.java b/src/test/java/org/tikv/common/util/PairTest.java new file mode 100644 index 00000000000..b1fd0c6bc98 --- /dev/null +++ b/src/test/java/org/tikv/common/util/PairTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2023 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.tikv.common.util; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.junit.Test; +import org.tikv.common.PDMockServerTest; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Peer; + +public class PairTest extends PDMockServerTest { + + @Test + public void testPair() { + Metapb.Region r = + Metapb.Region.newBuilder() + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(233) + .setStartKey(ByteString.EMPTY) + .setEndKey(ByteString.EMPTY) + .addPeers(Peer.getDefaultInstance()) + .build(); + List s = + ImmutableList.of( + Metapb.Store.newBuilder() + .setAddress(LOCAL_ADDR + ":" + 4000) + .setVersion("5.0.0") + .setId(1) + .build()); + + TiRegion region = + new TiRegion( + session.getConf(), + r, + r.getPeers(0), + r.getPeersList(), + s.stream().map(TiStore::new).collect(Collectors.toList())); + TiStore store = new TiStore(s.get(0)); + + Map, List> groupKeyMap = new HashMap<>(); + + for (int i = 0; i < 10; i++) { + Pair pair = Pair.create(region, store); + groupKeyMap + .computeIfAbsent(pair, e -> new ArrayList<>()) + .add(ByteString.copyFromUtf8("test")); + } + Pair pair = Pair.create(region, store); + assert (groupKeyMap.get(pair).size() == 10); + } +} From bcd11f34ca2b59c4a0ec0691a40adaf6e534e183 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 7 Mar 2023 14:21:08 +0800 Subject: [PATCH 13/22] [to #773] implement UpdateServiceGCSafePoint (#723) * implement UpdateServiceGCSafePoint Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/PDClient.java | 26 +++++++++++++++++++ .../org/tikv/common/ReadOnlyPDClient.java | 12 +++++++++ 2 files changed, 38 
insertions(+) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 43383bbda87..e3695b69161 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -104,6 +104,7 @@ import org.tikv.kvproto.Pdpb.Timestamp; import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoResponse; +import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest; public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { @@ -383,6 +384,17 @@ private Supplier buildGetAllStoresReq() { return () -> GetAllStoresRequest.newBuilder().setHeader(header).build(); } + private Supplier buildUpdateServiceGCSafePointRequest( + ByteString serviceId, long ttl, long safePoint) { + return () -> + UpdateServiceGCSafePointRequest.newBuilder() + .setHeader(header) + .setSafePoint(safePoint) + .setServiceId(serviceId) + .setTTL(ttl) + .build(); + } + private PDErrorHandler buildPDErrorHandler() { return new PDErrorHandler<>( r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this); @@ -419,6 +431,20 @@ public TiConfiguration.ReplicaRead getReplicaRead() { return conf.getReplicaRead(); } + @Override + public Long updateServiceGCSafePoint( + String serviceId, long ttl, long safePoint, BackOffer backOffer) { + return callWithRetry( + backOffer, + PDGrpc.getUpdateServiceGCSafePointMethod(), + buildUpdateServiceGCSafePointRequest( + ByteString.copyFromUtf8(serviceId), ttl, safePoint), + new PDErrorHandler<>( + r -> r.getHeader().hasError() ? 
buildFromPdpbError(r.getHeader().getError()) : null, + this)) + .getMinSafePoint(); + } + @Override public void close() throws InterruptedException { etcdClient.close(); diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index ddf1855e614..58ad9b2a626 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -72,4 +72,16 @@ List scanRegions( Long getClusterId(); RequestKeyCodec getCodec(); + + /** + * Update ServiceGCSafePoint + * + * @param serviceId ServiceId + * @param ttl TTL in seconds + * @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good + * practice + * @return the MinSafePoint of all services. If this value is greater than safePoint, it means + * update failed + */ + Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer); } From c1c804c865ce6f77f241abdd6b6c593011d02ad2 Mon Sep 17 00:00:00 2001 From: Jiaming Lu Date: Tue, 7 Mar 2023 14:39:31 +0800 Subject: [PATCH 14/22] Upgrade protobuf version so that it will build on M1 Macs (#714) Signed-off-by: Jiaming Lu Co-authored-by: Jiaming Lu Co-authored-by: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1b98009f300..2254b45bb60 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,7 @@ 1.8 UTF-8 UTF-8 - 3.5.1 + 3.18.0 1.2.17 1.7.16 1.48.0 From a523312f0102ba69da7850e18161c19196c3ad7a Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Tue, 7 Mar 2023 15:26:56 +0800 Subject: [PATCH 15/22] fix byte overflow (#728) Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/codec/RowV2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/common/codec/RowV2.java 
b/src/main/java/org/tikv/common/codec/RowV2.java index 44891e4e917..6893894a7bd 100644 --- a/src/main/java/org/tikv/common/codec/RowV2.java +++ b/src/main/java/org/tikv/common/codec/RowV2.java @@ -147,7 +147,7 @@ private int binarySearch(int i, int j, long colID) { if (this.large) { v = this.colIDs32[h]; } else { - v = this.colIDs[h]; + v = this.colIDs[h] & 0xFF; } if (v < colID) { i = h + 1; From 91b143988b4504ce28848f93279d3f9565734bfe Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Thu, 27 Apr 2023 18:14:52 +0800 Subject: [PATCH 16/22] fix CI (#741) Signed-off-by: shiyuhang <1136742008@qq.com> --- .github/workflows/ci.yml | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c2fbf68aa88..354a7b5e938 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,23 +44,20 @@ jobs: - name: Start TiUP Playground run: | # Start TiKV in APIV1TTL - /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml &> raw.out 2>&1 & + /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 & # The first run of `tiup` has to download all components so it'll take longer. 
sleep 1m 30s # Start TiKV in APIV1 - /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml &> txn.out 2>&1 & + /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2381 2>&1 & sleep 30s - # Parse PD address from `tiup` output - echo "RAWKV_PD_ADDRESSES=$(cat raw.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV - echo "TXNKV_PD_ADDRESSES=$(cat txn.out | grep -oP '(?<=PD client endpoints: \[)[0-9\.:]+(?=\])')" >> $GITHUB_ENV + # Get PD address + echo "RAWKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV + echo "TXNKV_PD_ADDRESSES=127.0.0.1:2381" >> $GITHUB_ENV - # Log the output - echo "$(cat raw.out)" >&2 - echo "$(cat txn.out)" >&2 - name: Run Integration Test run: mvn clean test - name: Upload coverage From e8feb2334486ba0808cb749b70732900282620e5 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Thu, 27 Apr 2023 18:35:45 +0800 Subject: [PATCH 17/22] [close #736] Select reachable store (#737) Signed-off-by: shiyuhang <1136742008@qq.com> --- pom.xml | 2 +- .../org/tikv/common/region/RegionManager.java | 17 +++++++++++++++-- .../java/org/tikv/common/region/TiRegion.java | 8 ++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 2254b45bb60..2235c4d3cb1 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.tikv tikv-client-java - 3.3.0-SNAPSHOT + 3.3.4-SNAPSHOT jar TiKV Java Client A Java Client for TiKV diff --git a/src/main/java/org/tikv/common/region/RegionManager.java 
b/src/main/java/org/tikv/common/region/RegionManager.java index 37c3d73f759..129afaf9d50 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -228,12 +228,25 @@ public Pair getRegionStorePairByKey( TiStore store = null; if (storeType == TiStoreType.TiKV) { - Peer peer = region.getCurrentReplica(); - store = getStoreById(peer.getStoreId(), backOffer); + // check from the first replica in case it recovers + List replicaList = region.getReplicaList(); + for (int i = 0; i < replicaList.size(); i++) { + Peer peer = replicaList.get(i); + store = getStoreById(peer.getStoreId(), backOffer); + if (store.isReachable()) { + // update replica's index + region.setReplicaIdx(i); + break; + } + logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId()); + } } else { List tiflashStores = new ArrayList<>(); for (Peer peer : region.getLearnerList()) { TiStore s = getStoreById(peer.getStoreId(), backOffer); + if (!s.isReachable()) { + continue; + } for (Metapb.StoreLabel label : s.getStore().getLabelsList()) { if (label.getKey().equals(storeType.getLabelKey()) && label.getValue().equals(storeType.getLabelValue())) { diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index 3c0ce8e48d4..9db3397c5ef 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -126,6 +126,14 @@ public Peer getNextReplica() { return getCurrentReplica(); } + public void setReplicaIdx(int idx) { + replicaIdx = idx; + } + + public List getReplicaList() { + return replicaList; + } + private boolean isLeader(Peer peer) { return getLeader().equals(peer); } From 8d70ed281610be3db35959106da2c61c8dbbfa22 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 27 Apr 2023 22:51:54 +0800 Subject: [PATCH 18/22] Bump 
jackson-databind from 2.13.2.2 to 2.13.4.2 (#735) Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2235c4d3cb1..dd34f060b3f 100644 --- a/pom.xml +++ b/pom.xml @@ -62,7 +62,7 @@ 2.8.9 1.6.6 2.13.2 - 2.13.2.2 + 2.13.4.2 3.0.1 0.4.1 2.9.9 From 71676ee36933d25a65290e08aef0cc74c0732a66 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Thu, 25 May 2023 21:20:34 +0800 Subject: [PATCH 19/22] [close #749] Fix health checking issue (#748) Signed-off-by: shiyuhang <1136742008@qq.com> --- .../org/tikv/common/region/RegionManager.java | 7 ++--- .../common/region/StoreHealthyChecker.java | 30 +++++++++++++++++++ .../java/org/tikv/common/region/TiStore.java | 10 +++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 129afaf9d50..3a76aa8bebe 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -247,11 +247,8 @@ public Pair getRegionStorePairByKey( if (!s.isReachable()) { continue; } - for (Metapb.StoreLabel label : s.getStore().getLabelsList()) { - if (label.getKey().equals(storeType.getLabelKey()) - && label.getValue().equals(storeType.getLabelValue())) { - tiflashStores.add(s); - } + if (s.isTiFlash()) { + tiflashStores.add(s); } } // select a tiflash with Round-Robin strategy diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index 8d305649c4e..3ae3f40d1f5 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -20,17 +20,22 @@ import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthGrpc; +import 
io.grpc.stub.ClientCalls; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Mpp; +import org.tikv.kvproto.Mpp.IsAliveRequest; +import org.tikv.kvproto.TikvGrpc; public class StoreHealthyChecker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class); @@ -75,6 +80,30 @@ private List getValidStores() { private boolean checkStoreHealth(TiStore store) { String addressStr = store.getStore().getAddress(); + if (store.isTiFlash()) { + return checkTiFlashHealth(addressStr); + } + return checkTiKVHealth(addressStr); + } + + private boolean checkTiFlashHealth(String addressStr) { + try { + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); + TikvGrpc.TikvBlockingStub stub = + TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Supplier factory = () -> Mpp.IsAliveRequest.newBuilder().build(); + Mpp.IsAliveResponse resp = + ClientCalls.blockingUnaryCall( + stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get()); + return resp != null && resp.getAvailable(); + } catch (Exception e) { + logger.info( + "fail to check TiFlash health, regard as unhealthy. 
TiFlash address: " + addressStr, e); + return false; + } + } + + private boolean checkTiKVHealth(String addressStr) { try { ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); HealthGrpc.HealthBlockingStub stub = @@ -83,6 +112,7 @@ private boolean checkStoreHealth(TiStore store) { HealthCheckResponse resp = stub.check(req); return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING; } catch (Exception e) { + logger.info("fail to check TiKV health, regard as unhealthy. TiKV address: " + addressStr, e); return false; } } diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index 8513e2b56e1..5feaa246fe5 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -105,4 +105,14 @@ public Metapb.Store getProxyStore() { public long getId() { return this.store.getId(); } + + public boolean isTiFlash() { + for (Metapb.StoreLabel label : store.getLabelsList()) { + if (label.getKey().equals(TiStoreType.TiFlash.getLabelKey()) + && label.getValue().equals(TiStoreType.TiFlash.getLabelValue())) { + return true; + } + } + return false; + } } From cb26d58a41c30549593bdf0c2fcedbc6d2642961 Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Wed, 31 May 2023 16:50:33 +0800 Subject: [PATCH 20/22] [fix #740] Add more logs in getregionstore (#751) * optimize getregionstore logical Signed-off-by: shiyuhang <1136742008@qq.com> * decrease impact Signed-off-by: shiyuhang <1136742008@qq.com> * Update RegionManager.java Signed-off-by: shiyuhang <1136742008@qq.com> * [close #749] Fix health checking issue (#748) Signed-off-by: shiyuhang <1136742008@qq.com> * Update RegionManager.java Signed-off-by: shiyuhang <1136742008@qq.com> * add log Signed-off-by: shiyuhang <1136742008@qq.com> * change log level Signed-off-by: shiyuhang <1136742008@qq.com> --------- Signed-off-by: shiyuhang 
<1136742008@qq.com> --- src/main/java/org/tikv/common/region/RegionManager.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 3a76aa8bebe..9678d9e813b 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -177,8 +177,13 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key); region = cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); + logger.debug( + String.format( + "get region id: %d with leader: %d", + region.getId(), region.getLeader().getStoreId())); } } catch (Exception e) { + logger.warn("Get region failed: ", e); return null; } finally { requestTimer.observeDuration(); @@ -240,6 +245,10 @@ public Pair getRegionStorePairByKey( } logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId()); } + // Does not set unreachable store to null in case it is incompatible with GrpcForward + if (store == null || !store.isReachable()) { + logger.warn("No TiKV store available for region: " + region); + } } else { List tiflashStores = new ArrayList<>(); for (Peer peer : region.getLearnerList()) { From 3b503fbf4d30c8caa699ae1c16e0f8be311069f8 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 7 Oct 2023 16:01:36 +0800 Subject: [PATCH 21/22] [close #772] Fix flaky integration tests (#770) * wait for tiup playground by grep Signed-off-by: Ping Yu * test on one version first Signed-off-by: Ping Yu * wip Signed-off-by: Ping Yu * reduce tikv resource usage Signed-off-by: Ping Yu * wip Signed-off-by: Ping Yu * wait for bootstrap Signed-off-by: Ping Yu * print logs on error Signed-off-by: Ping Yu * add api v2 test suite Signed-off-by: Ping Yu * use different port Signed-off-by: Ping Yu * specify kv.port Signed-off-by: Ping Yu * 
add more versions Signed-off-by: Ping Yu --------- Signed-off-by: Ping Yu --- .github/config/tikv_rawkv.toml | 11 ++++--- .github/config/tikv_txnkv.toml | 8 +++++- .github/config/tikv_v2.toml | 17 +++++++++++ .github/workflows/ci.yml | 34 ++++++++++++++++------ .github/workflows/ci_v2.yml | 52 ++++++++++++++++++++++++++++++++++ 5 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 .github/config/tikv_v2.toml create mode 100644 .github/workflows/ci_v2.yml diff --git a/.github/config/tikv_rawkv.toml b/.github/config/tikv_rawkv.toml index 5ee1bfe07ed..c339b486398 100644 --- a/.github/config/tikv_rawkv.toml +++ b/.github/config/tikv_rawkv.toml @@ -2,17 +2,20 @@ [raftstore] # set store capacity, if no set, use disk capacity. -capacity = "8G" +capacity = "6G" pd-heartbeat-tick-interval = "2s" pd-store-heartbeat-tick-interval = "5s" split-region-check-tick-interval = "1s" -[storage] -enable-ttl = true - [rocksdb] max-open-files = 10000 [raftdb] max-open-files = 10000 +[storage.block-cache] +capacity = "128MB" + +[storage] +reserve-space = "0MB" +enable-ttl = true diff --git a/.github/config/tikv_txnkv.toml b/.github/config/tikv_txnkv.toml index c083cfa31b0..e327632e583 100644 --- a/.github/config/tikv_txnkv.toml +++ b/.github/config/tikv_txnkv.toml @@ -2,7 +2,7 @@ [raftstore] # set store capacity, if no set, use disk capacity. -capacity = "8G" +capacity = "6G" pd-heartbeat-tick-interval = "2s" pd-store-heartbeat-tick-interval = "5s" split-region-check-tick-interval = "1s" @@ -12,3 +12,9 @@ max-open-files = 10000 [raftdb] max-open-files = 10000 + +[storage.block-cache] +capacity = "128MB" + +[storage] +reserve-space = "0MB" diff --git a/.github/config/tikv_v2.toml b/.github/config/tikv_v2.toml new file mode 100644 index 00000000000..a1b5b657061 --- /dev/null +++ b/.github/config/tikv_v2.toml @@ -0,0 +1,17 @@ +# TiKV Configuration. 
+ +[raftstore] +pd-heartbeat-tick-interval = "2s" +pd-store-heartbeat-tick-interval = "5s" +split-region-check-tick-interval = "1s" + +[rocksdb] +max-open-files = 10000 + +[raftdb] +max-open-files = 10000 + +[storage] +reserve-space = "0MB" +api-version = 2 +enable-ttl = true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 354a7b5e938..6511ec63c15 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,8 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - tikv_version: [nightly, v5.0.4, v5.3.0, v5.4.0] + tikv_version: [v5.0.6, v5.3.4, v5.4.3] + fail-fast: false steps: - uses: actions/checkout@v2 - name: Set up JDK 8 @@ -40,19 +41,26 @@ jobs: java-version: '8.0' distribution: 'adopt' - name: Install TiUP - run: curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh + run: | + curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh + /home/runner/.tiup/bin/tiup install playground pd:${{ matrix.tikv_version }} tikv:${{ matrix.tikv_version }} - name: Start TiUP Playground run: | # Start TiKV in APIV1TTL - /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 & - - # The first run of `tiup` has to download all components so it'll take longer. 
- sleep 1m 30s + touch tiup-v1ttl.log + /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --host 127.0.0.1 --tag rawkv --mode tikv-slim --kv 1 --without-monitor --kv.port 20160 --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_rawkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 >> tiup-v1ttl.log & + timeout 300 grep -q "PD Endpoints:" <(tail -f tiup-v1ttl.log) + cat tiup-v1ttl.log + echo "Wait for bootstrap" + sleep 10s # Start TiKV in APIV1 - /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2381 2>&1 & - - sleep 30s + touch tiup-v1.log + /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --host 127.0.0.1 --tag txnkv --mode tikv-slim --kv 1 --without-monitor --kv.port 30160 --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_txnkv.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2381 2>&1 >> tiup-v1.log & + timeout 300 grep -q "PD Endpoints:" <(tail -f tiup-v1.log) + cat tiup-v1.log + echo "Wait for bootstrap" + sleep 10s # Get PD address echo "RAWKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV @@ -60,6 +68,14 @@ jobs: - name: Run Integration Test run: mvn clean test + - name: Print TiKV logs + if: failure() + run: | + echo "RawKV TiKV logs" + cat /home/runner/.tiup/data/rawkv/tikv-0/tikv.log + + echo "TxnKV TiKV logs" + cat /home/runner/.tiup/data/txnkv/tikv-0/tikv.log - name: Upload coverage uses: codecov/codecov-action@v2 with: diff --git a/.github/workflows/ci_v2.yml b/.github/workflows/ci_v2.yml new file mode 100644 index 00000000000..be69782a470 --- /dev/null +++ b/.github/workflows/ci_v2.yml @@ -0,0 +1,52 @@ +name: CI (APIv2) + +on: + pull_request: + push: + 
branches: + - master + +jobs: + integration-test: + name: Integration Test - ${{ matrix.tikv_version }} + runs-on: ubuntu-latest + strategy: + matrix: + tikv_version: [v6.5.3, v7.1.1, nightly] + fail-fast: false + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 8 + uses: actions/setup-java@v2 + with: + java-version: '8.0' + distribution: 'adopt' + - name: Install TiUP + run: | + curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh + /home/runner/.tiup/bin/tiup install playground pd:${{ matrix.tikv_version }} tikv:${{ matrix.tikv_version }} + - name: Start TiUP Playground + run: | + # Start TiKV in APIV2 + touch tiup.log + /home/runner/.tiup/bin/tiup playground ${{ matrix.tikv_version }} --tag kv --mode tikv-slim --kv 1 --without-monitor --kv.config /home/runner/work/client-java/client-java/.github/config/tikv_v2.toml --pd.config /home/runner/work/client-java/client-java/.github/config/pd.toml --pd.port 2379 2>&1 >> tiup.log & + timeout 300 grep -q "PD Endpoints:" <(tail -f tiup.log) + cat tiup.log + + # Get PD address + echo "RAWKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV + echo "TXNKV_PD_ADDRESSES=127.0.0.1:2379" >> $GITHUB_ENV + + - name: Run Integration Test + run: mvn clean test + - name: Print TiKV logs + if: failure() + run: | + echo "TiKV logs" + cat /home/runner/.tiup/data/kv/tikv-0/tikv.log + - name: Upload coverage + uses: codecov/codecov-action@v2 + with: + files: ${{ github.workspace }}/target/site/jacoco/jacoco.xml + fail_ci_if_error: true + verbose: true From 533dbc23a19a7f11e3dd3fbf153d62d81d03f86d Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 7 Oct 2023 16:16:49 +0800 Subject: [PATCH 22/22] [to #763] Pick bug fixes of API v2 codec to master (#769) * [cherry-pick] ignore invalid region while decoding EpochNotMatch (#765) Signed-off-by: iosmanthus Signed-off-by: Ping Yu * fix region out of keyspace (#766) * fix region decoding Signed-off-by: iosmanthus * add test for decodeRegion Signed-off-by: 
iosmanthus * add more tests for decodeRegion Signed-off-by: iosmanthus --------- Signed-off-by: iosmanthus Signed-off-by: Ping Yu * [to #763] Complement v2 codec unit test cases (#768) * complement v2 codec unit test cases. Signed-off-by: Ping Yu * fix fmt Signed-off-by: Ping Yu --------- Signed-off-by: Ping Yu --------- Signed-off-by: iosmanthus Signed-off-by: Ping Yu Co-authored-by: iosmanthus --- .../tikv/common/apiversion/CodecUtils.java | 2 +- .../common/apiversion/RequestKeyV2Codec.java | 19 ++--- .../common/operation/RegionErrorHandler.java | 11 ++- .../apiversion/RequestKeyCodecTest.java | 83 ++++++++++++++++++- 4 files changed, 101 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/tikv/common/apiversion/CodecUtils.java b/src/main/java/org/tikv/common/apiversion/CodecUtils.java index 1c6cfea9fa1..a2b0725be5f 100644 --- a/src/main/java/org/tikv/common/apiversion/CodecUtils.java +++ b/src/main/java/org/tikv/common/apiversion/CodecUtils.java @@ -23,7 +23,7 @@ import org.tikv.common.codec.CodecDataOutput; // TODO(iosmanthus): use ByteString.wrap to avoid once more copying. 
-class CodecUtils { +public class CodecUtils { public static ByteString encode(ByteString key) { CodecDataOutput cdo = new CodecDataOutput(); BytesCodec.writeBytes(cdo, key.toByteArray()); diff --git a/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java b/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java index 11db11c6414..ab86fb5e020 100644 --- a/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java +++ b/src/main/java/org/tikv/common/apiversion/RequestKeyV2Codec.java @@ -81,22 +81,21 @@ public Region decodeRegion(Region region) { if (!start.isEmpty()) { start = CodecUtils.decode(start); - if (ByteString.unsignedLexicographicalComparator().compare(start, keyPrefix) < 0) { - start = ByteString.EMPTY; - } else { - start = decodeKey(start); - } } if (!end.isEmpty()) { end = CodecUtils.decode(end); - if (ByteString.unsignedLexicographicalComparator().compare(end, infiniteEndKey) >= 0) { - end = ByteString.EMPTY; - } else { - end = decodeKey(end); - } } + if (ByteString.unsignedLexicographicalComparator().compare(start, infiniteEndKey) >= 0 + || (!end.isEmpty() + && ByteString.unsignedLexicographicalComparator().compare(end, keyPrefix) <= 0)) { + throw new IllegalArgumentException("region out of keyspace" + region.toString()); + } + + start = start.startsWith(keyPrefix) ? start.substring(keyPrefix.size()) : ByteString.EMPTY; + end = end.startsWith(keyPrefix) ? 
end.substring(keyPrefix.size()) : ByteString.EMPTY; + return builder.setStartKey(start).setEndKey(end).build(); } } diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java index df95471ffa7..debbccf7eee 100644 --- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java @@ -229,7 +229,16 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List c // If the region epoch is not ahead of TiKV's, replace region meta in region cache. for (Metapb.Region meta : currentRegions) { // The region needs to be decoded to plain format. - meta = regionManager.getPDClient().getCodec().decodeRegion(meta); + try { + meta = regionManager.getPDClient().getCodec().decodeRegion(meta); + } catch (Exception e) { + logger.warn("ignore invalid region: " + meta.toString()); + // if the region is invalid, ignore it since the following situation might appear. + // Assuming a region with range [r000, z), then it splits into: + // [r000, x) [x, z), the right region is invalid for keyspace `r000`. + // We should only care about the valid region. 
+ continue; + } TiRegion region = regionManager.createRegion(meta, backOffer); newRegions.add(region); if (recv.getRegion().getVerID() == region.getVerID()) { diff --git a/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java b/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java index 871a20cdf20..ed97fcdb81b 100644 --- a/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java +++ b/src/test/java/org/tikv/common/apiversion/RequestKeyCodecTest.java @@ -17,8 +17,7 @@ package org.tikv.common.apiversion; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; @@ -177,5 +176,85 @@ void testV2Codec(RequestKeyV2Codec v2) { decoded = v2.decodeRegion(region); assertEquals(start, decoded.getStartKey()); assertEquals(ByteString.EMPTY, decoded.getEndKey()); + + // test region out of keyspace + { + ByteString m_123 = CodecUtils.encode(ByteString.copyFromUtf8("m_123")); + ByteString m_124 = CodecUtils.encode(ByteString.copyFromUtf8("m_124")); + ByteString infiniteEndKey_0 = + CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFrom(new byte[] {0}))); + ByteString t_123 = CodecUtils.encode(ByteString.copyFromUtf8("t_123")); + ByteString y_123 = CodecUtils.encode(ByteString.copyFromUtf8("y_123")); + + ByteString[][] outOfKeyspaceCases = { + {ByteString.EMPTY, CodecUtils.encode(v2.keyPrefix)}, // ["", "r000"/"x000") + {ByteString.EMPTY, m_123}, + {m_123, m_124}, + {m_124, CodecUtils.encode(v2.keyPrefix)}, + {CodecUtils.encode(v2.infiniteEndKey), ByteString.EMPTY}, // ["r001"/"x001", "") + {CodecUtils.encode(v2.infiniteEndKey), infiniteEndKey_0}, + {infiniteEndKey_0, t_123}, + {y_123, ByteString.EMPTY}, // "y_123" is bigger than "infiniteEndKey" for both raw & txn. 
+ }; + + for (ByteString[] testCase : outOfKeyspaceCases) { + region = Region.newBuilder().setStartKey(testCase[0]).setEndKey(testCase[1]).build(); + try { + decoded = v2.decodeRegion(region); + fail(String.format("[%s,%s): %s", testCase[0], testCase[1], decoded.toString())); + } catch (Exception ignored) { + } + } + } + + // case: regionStartKey == "" < keyPrefix < regionEndKey < infiniteEndKey + region = + Region.newBuilder() + .setStartKey(ByteString.EMPTY) + .setEndKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0")))) + .build(); + decoded = v2.decodeRegion(region); + assertTrue(decoded.getStartKey().isEmpty()); + assertEquals(ByteString.copyFromUtf8("0"), decoded.getEndKey()); + + // case: "" < regionStartKey < keyPrefix < regionEndKey < infiniteEndKey < "" + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(ByteString.copyFromUtf8("m_123"))) + .setEndKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0")))) + .build(); + decoded = v2.decodeRegion(region); + assertEquals(ByteString.EMPTY, decoded.getStartKey()); + assertEquals(ByteString.copyFromUtf8("0"), decoded.getEndKey()); + + // case: "" < regionStartKey < keyPrefix < infiniteEndKey < regionEndKey < "" + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(ByteString.copyFromUtf8("m_123"))) + .setEndKey(CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFromUtf8("0")))) + .build(); + decoded = v2.decodeRegion(region); + assertEquals(ByteString.EMPTY, decoded.getStartKey()); + assertEquals(ByteString.EMPTY, decoded.getEndKey()); + + // case: keyPrefix < regionStartKey < infiniteEndKey < regionEndKey < "" + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0")))) + .setEndKey(CodecUtils.encode(v2.infiniteEndKey.concat(ByteString.copyFromUtf8("0")))) + .build(); + decoded = v2.decodeRegion(region); + assertEquals(ByteString.copyFromUtf8("0"), decoded.getStartKey()); + 
assertTrue(decoded.getEndKey().isEmpty()); + + // case: keyPrefix < regionStartKey < infiniteEndKey < regionEndKey == "" + region = + Region.newBuilder() + .setStartKey(CodecUtils.encode(v2.keyPrefix.concat(ByteString.copyFromUtf8("0")))) + .setEndKey(ByteString.EMPTY) + .build(); + decoded = v2.decodeRegion(region); + assertEquals(ByteString.copyFromUtf8("0"), decoded.getStartKey()); + assertTrue(decoded.getEndKey().isEmpty()); } }