From 2713739b95a4607225a43ccde6f947c74bf8994a Mon Sep 17 00:00:00 2001 From: Matt Spilchen Date: Fri, 24 Jan 2025 10:39:38 -0400 Subject: [PATCH 1/4] cli/pgx: Update pgx versions in go.mod This commit updates the versions of specific packages from the github.com/jackc repository in go.mod to address known security vulnerabilities. The updated versions include fixes for these vulnerabilities. Notably, the signature of the Encode() function in the pgx library has changed. It now returns both the encoded message and an error. Most of the changes in this commit are updates to handle the new function signature appropriately. Epic: none Release note: none Informs #137595 --- DEPS.bzl | 30 ++++++------- build/bazelutil/distdir_files.bzl | 10 ++--- go.mod | 10 ++--- go.sum | 17 +++++--- pkg/ccl/sqlproxyccl/authentication_test.go | 24 ++++++++--- pkg/ccl/sqlproxyccl/backend_dialer.go | 6 ++- pkg/ccl/sqlproxyccl/conn_migration.go | 30 ++++++++----- pkg/ccl/sqlproxyccl/conn_migration_test.go | 13 ++++-- pkg/ccl/sqlproxyccl/forwarder_test.go | 43 +++++++++++++------ pkg/ccl/sqlproxyccl/frontend_admitter_test.go | 16 +++++-- .../interceptor/backend_conn_test.go | 3 +- pkg/ccl/sqlproxyccl/interceptor/base_test.go | 6 ++- .../interceptor/frontend_conn_test.go | 3 +- .../interceptor/interceptor_test.go | 16 +++++-- .../sqlproxyccl/interceptor/pg_conn_test.go | 9 ++-- pkg/ccl/sqlproxyccl/proxy.go | 6 ++- pkg/ccl/sqlproxyccl/proxy_handler_test.go | 3 +- pkg/ccl/sqlproxyccl/query_cancel.go | 5 ++- 18 files changed, 167 insertions(+), 83 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 1d913d4db17e..7a9792c4dc75 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5155,10 +5155,10 @@ def go_deps(): name = "com_github_jackc_pgconn", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgconn", - sha256 = "d7935c04e5ee85f2dcab090692d2c24008071865a3af3581580bcc331a8f41d5", - strip_prefix = "github.com/jackc/pgconn@v1.14.0", + sha256 = "164dbb661090368062498701530fcb1f62d6acc06558859646b62d97128ac06f", + strip_prefix = "github.com/jackc/pgconn@v1.14.3", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.14.0.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.14.3.zip", ], ) go_repository( @@ -5205,10 +5205,10 @@ def go_deps(): name = "com_github_jackc_pgproto3_v2", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgproto3/v2", - sha256 = "e6bafa5c3522534557a818f56939dde7b496c6669a5db9a74a0c91ab5290612c", - strip_prefix = "github.com/jackc/pgproto3/v2@v2.3.2", + sha256 = "53ea236cbfe241693b439092e2d51b404c2a635ee3fe64ea7aad1527cb715189", + strip_prefix = "github.com/jackc/pgproto3/v2@v2.3.3", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.2.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.3.zip", ], ) go_repository( @@ -5225,10 +5225,10 @@ def go_deps(): name = "com_github_jackc_pgtype", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgtype", - sha256 = "7e67f5b944cbd401025c6473e624ef61ed196e3ed0f77ed437b0621cf820c0c9", - strip_prefix = "github.com/jackc/pgtype@v1.14.0", + sha256 = "3acb69a66e7e432c010d503425810620d04c304166c45083fa8a96feca13054d", + strip_prefix = "github.com/jackc/pgtype@v1.14.1", urls = [ - 
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.14.0.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.14.1.zip", ], ) go_repository( @@ -5245,10 +5245,10 @@ def go_deps(): name = "com_github_jackc_pgx_v5", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/pgx/v5", - sha256 = "198eebac7e875b51616090fe2e3694e6f27e335afcc3c47ae9991497ade7f52a", - strip_prefix = "github.com/jackc/pgx/v5@v5.4.2", + sha256 = "003bc28d696865bd7cb2819d3902c0415bee4d276b129f92a8438330e6619f4d", + strip_prefix = "github.com/jackc/pgx/v5@v5.5.4", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.4.2.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.5.4.zip", ], ) go_repository( @@ -5265,10 +5265,10 @@ def go_deps(): name = "com_github_jackc_puddle_v2", build_file_proto_mode = "disable_global", importpath = "github.com/jackc/puddle/v2", - sha256 = "b99ea95df0c0298caf2be786c9eba511bfde2046eccfaa06e89b3e460ab406b0", - strip_prefix = "github.com/jackc/puddle/v2@v2.2.0", + sha256 = "6698895617fabb929fa1ac868ad5253e02a997888bf5c6004379c5b29eedee58", + strip_prefix = "github.com/jackc/puddle/v2@v2.2.1", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.0.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.1.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 5f1adc38f236..a27672c25d42 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -675,18 +675,18 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/j-keck/arping/com_github_j_keck_arping-v0.0.0-20160618110441-2cf9dc699c56.zip": "6001c94a8c4eed55718f627346cb685cce67369ca5c29ae059f58f7abd8bd8a7", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/chunkreader/com_github_jackc_chunkreader-v1.0.0.zip": "e204c917e2652ffe047f5c8b031192757321f568654e3df8408bf04178df1408", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/chunkreader/v2/com_github_jackc_chunkreader_v2-v2.0.1.zip": "6e3f4b7d9647f31061f6446ae10de71fc1407e64f84cd0949afac0cd231e8dd2", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.14.0.zip": "d7935c04e5ee85f2dcab090692d2c24008071865a3af3581580bcc331a8f41d5", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgconn/com_github_jackc_pgconn-v1.14.3.zip": "164dbb661090368062498701530fcb1f62d6acc06558859646b62d97128ac06f", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgio/com_github_jackc_pgio-v1.0.0.zip": "1a83c03d53f6a40339364cafcbbabb44238203c79ca0c9b98bf582d0df0e0468", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgmock/com_github_jackc_pgmock-v0.0.0-20210724152146-4ad1a8207f65.zip": "0fffd0a7a67dbdfafa04297e51028c6d2d08cd6691f3b6d78d7ae6502d3d4cf2", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgpassfile/com_github_jackc_pgpassfile-v1.0.0.zip": "1cc79fb0b80f54b568afd3f4648dd1c349f746ad7c379df8d7f9e0eb1cac938b", 
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/com_github_jackc_pgproto3-v1.1.0.zip": "e3766bee50ed74e49a067b2c4797a2c69015cf104bf3f3624cd483a9e940b4ee", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.2.zip": "e6bafa5c3522534557a818f56939dde7b496c6669a5db9a74a0c91ab5290612c", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgproto3/v2/com_github_jackc_pgproto3_v2-v2.3.3.zip": "53ea236cbfe241693b439092e2d51b404c2a635ee3fe64ea7aad1527cb715189", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgservicefile/com_github_jackc_pgservicefile-v0.0.0-20221227161230-091c0ba34f0a.zip": "1f8bdf75b2a0d750e56c2a94b1d1b0b5be4b29d6df056aebd997162c29bfd8ab", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.14.0.zip": "7e67f5b944cbd401025c6473e624ef61ed196e3ed0f77ed437b0621cf820c0c9", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgtype/com_github_jackc_pgtype-v1.14.1.zip": "3acb69a66e7e432c010d503425810620d04c304166c45083fa8a96feca13054d", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v4/com_github_jackc_pgx_v4-v4.18.1.zip": "5ca92c5bf58979d9e978f6b849e02eb319d2587565375fe875a29d10d84cfadc", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.4.2.zip": "198eebac7e875b51616090fe2e3694e6f27e335afcc3c47ae9991497ade7f52a", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/pgx/v5/com_github_jackc_pgx_v5-v5.5.4.zip": "003bc28d696865bd7cb2819d3902c0415bee4d276b129f92a8438330e6619f4d", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/com_github_jackc_puddle-v1.3.0.zip": "b1eb42bb3cf9a430146af79cb183860b9dddfca51844c2d4b447dc2f43becc55", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.0.zip": "b99ea95df0c0298caf2be786c9eba511bfde2046eccfaa06e89b3e460ab406b0", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jackc/puddle/v2/com_github_jackc_puddle_v2-v2.2.1.zip": "6698895617fabb929fa1ac868ad5253e02a997888bf5c6004379c5b29eedee58", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jaegertracing/jaeger/com_github_jaegertracing_jaeger-v1.18.1.zip": "256a95b2a52a66494aca6d354224bb450ff38ce3ea1890af46a7c8dc39203891", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jawher/mow.cli/com_github_jawher_mow_cli-v1.2.0.zip": "4f8d43c8f2aa44524480ab57d8fbb63a607569ea11ff6a2eea7b46622104f717", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/jcmturner/aescts/v2/com_github_jcmturner_aescts_v2-v2.0.0.zip": "717a211ad4aac248cf33cadde73059c13f8e9462123a0ab2fed5c5e61f7739d7", diff --git a/go.mod b/go.mod index a2dd881e7e99..fee1f29459b8 100644 --- a/go.mod +++ b/go.mod @@ -78,12 +78,12 @@ require ( // the SQL team. 
require ( github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.14.0 + github.com/jackc/pgconn v1.14.3 github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgproto3/v2 v2.3.2 + github.com/jackc/pgproto3/v2 v2.3.3 github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/pgtype v1.14.0 + github.com/jackc/pgtype v1.14.1 github.com/jackc/pgx/v4 v4.18.1 ) @@ -179,7 +179,7 @@ require ( github.com/guptarohit/asciigraph v0.7.3 github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 github.com/irfansharif/recorder v0.0.0-20211218081646-a21b46510fd6 - github.com/jackc/pgx/v5 v5.4.2 + github.com/jackc/pgx/v5 v5.5.4 github.com/jaegertracing/jaeger v1.18.1 github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible github.com/jordanlewis/gcassert v0.0.0-20240401195008-3141cbd028c0 @@ -365,7 +365,7 @@ require ( github.com/imdario/mergo v0.3.13 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect - github.com/jackc/puddle/v2 v2.2.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect diff --git a/go.sum b/go.sum index bf373fe15119..a84e549ec2f2 100644 --- a/go.sum +++ b/go.sum @@ -1456,8 +1456,9 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= -github.com/jackc/pgconn v1.14.0 h1:vrbA9Ud87g6JdFWkHTJXppVce58qPIdP7N8y0Ml/A7Q= github.com/jackc/pgconn v1.14.0/go.mod h1:9mBNlny0UvkgJdCDvdVHYSjI+8tD2rnKK69Wz8ti++E= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= @@ -1473,8 +1474,9 @@ github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvW github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgproto3/v2 v2.3.2 h1:7eY55bdBeCz1F2fTzSz69QC+pG46jYq9/jtSPiJ5nn0= github.com/jackc/pgproto3/v2 v2.3.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile 
v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= @@ -1482,23 +1484,24 @@ github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01C github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM= -github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgtype v1.14.1 h1:LyDar7M2K0tShCWqzJ/ctzF1QC3Wzc9c8a6cHE0PFdc= +github.com/jackc/pgtype v1.14.1/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= github.com/jackc/pgx/v4 v4.18.1 h1:YP7G1KABtKpB5IHrO9vYwSrCOhs7p3uqhvhhQBptya0= github.com/jackc/pgx/v4 v4.18.1/go.mod h1:FydWkUyadDmdNH/mHnGob881GawxeEm7TcMCzkb+qQE= -github.com/jackc/pgx/v5 v5.4.2 h1:u1gmGDwbdRUZiwisBm/Ky2M14uQyUP65bG8+20nnyrg= -github.com/jackc/pgx/v5 v5.4.2/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY= +github.com/jackc/pgx/v5 v5.5.4 h1:Xp2aQS8uXButQdnCMWNmvx6UysWQQC+u1EoizjguY+8= +github.com/jackc/pgx/v5 v5.5.4/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0= github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= -github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jaegertracing/jaeger v1.18.1 h1:eFqjEpTKq2FfiZ/YX53oxeCePdIZyWvDfXaTAGj0r5E= github.com/jaegertracing/jaeger v1.18.1/go.mod h1:WRzMFH62rje1VgbShlgk6UbWUNoo08uFFvs/x50aZKk= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= diff --git a/pkg/ccl/sqlproxyccl/authentication_test.go b/pkg/ccl/sqlproxyccl/authentication_test.go index 1676b272cc45..0c1e7296024e 100644 --- a/pkg/ccl/sqlproxyccl/authentication_test.go +++ b/pkg/ccl/sqlproxyccl/authentication_test.go @@ -267,7 +267,9 @@ func TestReadTokenAuthResult(t *testing.T) { cli, srv := net.Pipe() go func() { - _, err := srv.Write((&pgproto3.BindComplete{}).Encode(nil)) + buf, err := (&pgproto3.BindComplete{}).Encode(nil) + require.NoError(t, err) + _, err = srv.Write(buf) require.NoError(t, err) }() @@ -280,7 +282,9 @@ func TestReadTokenAuthResult(t *testing.T) { cli, srv := 
net.Pipe() go func() { - _, err := srv.Write((&pgproto3.ErrorResponse{Severity: "FATAL", Code: "foo"}).Encode(nil)) + buf, err := (&pgproto3.ErrorResponse{Severity: "FATAL", Code: "foo"}).Encode(nil) + require.NoError(t, err) + _, err = srv.Write(buf) require.NoError(t, err) }() @@ -294,16 +298,24 @@ func TestReadTokenAuthResult(t *testing.T) { crdbBackendKeyData := &pgproto3.BackendKeyData{ProcessID: 42, SecretKey: 99} go func() { - _, err := srv.Write((&pgproto3.AuthenticationOk{}).Encode(nil)) + buf, err := (&pgproto3.AuthenticationOk{}).Encode(nil) + require.NoError(t, err) + _, err = srv.Write(buf) require.NoError(t, err) - _, err = srv.Write((&pgproto3.ParameterStatus{Name: "Server Version", Value: "1.3"}).Encode(nil)) + buf, err = (&pgproto3.ParameterStatus{Name: "Server Version", Value: "1.3"}).Encode(nil) + require.NoError(t, err) + _, err = srv.Write(buf) require.NoError(t, err) - _, err = srv.Write(crdbBackendKeyData.Encode(nil)) + buf, err = crdbBackendKeyData.Encode(nil) + require.NoError(t, err) + _, err = srv.Write(buf) require.NoError(t, err) - _, err = srv.Write((&pgproto3.ReadyForQuery{}).Encode(nil)) + buf, err = (&pgproto3.ReadyForQuery{}).Encode(nil) + require.NoError(t, err) + _, err = srv.Write(buf) require.NoError(t, err) }() diff --git a/pkg/ccl/sqlproxyccl/backend_dialer.go b/pkg/ccl/sqlproxyccl/backend_dialer.go index 15a2fb6c7700..3ab1747dad8e 100644 --- a/pkg/ccl/sqlproxyccl/backend_dialer.go +++ b/pkg/ccl/sqlproxyccl/backend_dialer.go @@ -56,7 +56,11 @@ var BackendDial = func( } // Forward startup message to the backend connection. - if _, err := conn.Write(msg.Encode(nil)); err != nil { + buf, err := msg.Encode(nil) + if err != nil { + return errors.Wrapf(err, "encoding StartingMessage for target server %v", serverAddress) + } + if _, err := conn.Write(buf); err != nil { return errors.Wrapf(err, "relaying StartupMessage to target server %v", serverAddress) } return nil diff --git a/pkg/ccl/sqlproxyccl/conn_migration.go b/pkg/ccl/sqlproxyccl/conn_migration.go index 1bee0199919c..7b55f7f3a3f9 100644 --- a/pkg/ccl/sqlproxyccl/conn_migration.go +++ b/pkg/ccl/sqlproxyccl/conn_migration.go @@ -400,10 +400,10 @@ var waitForShowTransferState = func( } // 2. Read DataRow. - if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, size int) bool { + if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, size int) (bool, error) { // This has to be 4 since we validated RowDescription earlier. if len(msg.Values) != 4 { - return false + return false, nil } // Validate transfer key. It is possible that the end-user uses the SHOW @@ -411,7 +411,7 @@ var waitForShowTransferState = func( // for external usage, so it is fine to just terminate here if the // transfer key does not match. if string(msg.Values[3]) != transferKey { - return false + return false, nil } // NOTE: We have to cast to string and copy here since the slice @@ -423,7 +423,7 @@ var waitForShowTransferState = func( if metrics != nil { metrics.ConnMigrationTransferResponseMessageSize.RecordValue(int64(size)) } - return true + return true, nil }); err != nil { return "", "", "", errors.Wrap(err, "expecting DataRow") } @@ -490,8 +490,8 @@ var runAndWaitForDeserializeSession = func( } // 2. Read DataRow. 
- if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, _ int) bool { - return len(msg.Values) == 1 && string(msg.Values[0]) == "t" + if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, _ int) (bool, error) { + return len(msg.Values) == 1 && string(msg.Values[0]) == "t", nil }); err != nil { return errors.Wrap(err, "expecting DataRow") } @@ -512,7 +512,11 @@ var runAndWaitForDeserializeSession = func( // writeQuery writes a SimpleQuery to the given writer w. func writeQuery(w io.Writer, format string, a ...interface{}) error { query := &pgproto3.Query{String: fmt.Sprintf(format, a...)} - _, err := w.Write(query.Encode(nil)) + buf, err := query.Encode(nil) + if err != nil { + return errors.Wrap(err, "encoding SimpleQuery") + } + _, err = w.Write(buf) return err } @@ -587,7 +591,11 @@ func waitForSmallRowDescription( // Matching fails, so forward the message back to the client, and // continue searching. - if _, err := clientConn.Write(msg.Encode(nil)); err != nil { + buf, err := msg.Encode(nil) + if err != nil { + return errors.Wrap(err, "encoding message") + } + if _, err := clientConn.Write(buf); err != nil { return errors.Wrap(err, "writing message") } } @@ -607,7 +615,7 @@ func waitForSmallRowDescription( func expectDataRow( ctx context.Context, serverConn *interceptor.FrontendConn, - validateFn func(*pgproto3.DataRow, int) bool, + validateFn func(*pgproto3.DataRow, int) (bool, error), ) error { if ctx.Err() != nil { return ctx.Err() @@ -624,7 +632,9 @@ func expectDataRow( if !ok { return errors.Newf("unexpected message: %v", jsonOrRaw(msg)) } - if !validateFn(pgMsg, size) { + if valid, err := validateFn(pgMsg, size); err != nil { + return errors.Wrap(err, "validation failure") + } else if !valid { return errors.Newf("validation failed for message: %v", jsonOrRaw(msg)) } return nil diff --git a/pkg/ccl/sqlproxyccl/conn_migration_test.go b/pkg/ccl/sqlproxyccl/conn_migration_test.go index f5dd23ec2de2..2314bff2a71e 100644 --- a/pkg/ccl/sqlproxyccl/conn_migration_test.go +++ b/pkg/ccl/sqlproxyccl/conn_migration_test.go @@ -1118,7 +1118,7 @@ func TestExpectDataRow(t *testing.T) { testutilsccl.ServerlessOnly(t) ctx := context.Background() - falseValidateFn := func(m *pgproto3.DataRow, s int) bool { return false } + falseValidateFn := func(m *pgproto3.DataRow, s int) (bool, error) { return false, nil } t.Run("context_cancelled", func(t *testing.T) { tCtx, cancel := context.WithCancel(ctx) @@ -1176,10 +1176,14 @@ func TestExpectDataRow(t *testing.T) { err := expectDataRow( ctx, interceptor.NewFrontendConn(r), - func(m *pgproto3.DataRow, size int) bool { + func(m *pgproto3.DataRow, size int) (bool, error) { + buf, err := msg.Encode(nil) + if err != nil { + return false, err + } return len(m.Values) == 1 && string(m.Values[0]) == "foo" && - len(msg.Encode(nil)) == size + len(buf) == size, nil }, ) require.Nil(t, err) @@ -1298,7 +1302,8 @@ func TestExpectReadyForQuery(t *testing.T) { } func writeServerMsg(w io.Writer, msg pgproto3.BackendMessage) { - _, _ = w.Write(msg.Encode(nil)) + buf, _ := msg.Encode(nil) + _, _ = w.Write(buf) } func expectMsg(t *testing.T, msgCh <-chan pgproto3.BackendMessage, match string) { diff --git a/pkg/ccl/sqlproxyccl/forwarder_test.go b/pkg/ccl/sqlproxyccl/forwarder_test.go index 12a47e8715bf..2ea6c603e02d 100644 --- a/pkg/ccl/sqlproxyccl/forwarder_test.go +++ b/pkg/ccl/sqlproxyccl/forwarder_test.go @@ -110,24 +110,33 @@ func TestForward(t *testing.T) { errChan := make(chan error, 1) go func() { <-sendEventCh - if _, err := 
client.Write((&pgproto3.Query{ + if buf, err := (&pgproto3.Query{ String: "SELECT 1", - }).Encode(nil)); err != nil { + }).Encode(nil); err != nil { + errChan <- err + return + } else if _, err := client.Write(buf); err != nil { errChan <- err return } <-sendEventCh - if _, err := client.Write((&pgproto3.Execute{ + if buf, err := (&pgproto3.Execute{ Portal: "foobar", MaxRows: 42, - }).Encode(nil)); err != nil { + }).Encode(nil); err != nil { + errChan <- err + return + } else if _, err := client.Write(buf); err != nil { errChan <- err return } <-sendEventCh - if _, err := client.Write((&pgproto3.Close{ + if buf, err := (&pgproto3.Close{ ObjectType: 'P', - }).Encode(nil)); err != nil { + }).Encode(nil); err != nil { + errChan <- err + return + } else if _, err := client.Write(buf); err != nil { errChan <- err return } @@ -255,17 +264,23 @@ func TestForward(t *testing.T) { errChan := make(chan error, 1) go func() { <-recvEventCh - if _, err := server.Write((&pgproto3.ErrorResponse{ + if buf, err := (&pgproto3.ErrorResponse{ Code: "100", Message: "foobarbaz", - }).Encode(nil)); err != nil { + }).Encode(nil); err != nil { + errChan <- err + return + } else if _, err := server.Write(buf); err != nil { errChan <- err return } <-recvEventCh - if _, err := server.Write((&pgproto3.ReadyForQuery{ + if buf, err := (&pgproto3.ReadyForQuery{ TxStatus: 'I', - }).Encode(nil)); err != nil { + }).Encode(nil); err != nil { + errChan <- err + return + } else if _, err := server.Write(buf); err != nil { errChan <- err return } @@ -425,7 +440,8 @@ func TestForwarder_replaceServerConn(t *testing.T) { require.NoError(t, f.resumeProcessors()) // Check that we can receive messages from newServer. - q := (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(nil) + q, err := (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(nil) + require.NoError(t, err) go func() { _, _ = newServer.Write(q) }() @@ -685,7 +701,8 @@ func TestSuspendResumeProcessor(t *testing.T) { // Client writes messages to be forwarded. buf := new(bytes.Buffer) - q := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + q, err := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + require.NoError(t, err) for i := 0; i < queryCount; i++ { // Alternate between SELECT 1 and 2 to ensure correctness. if i%2 == 0 { @@ -763,7 +780,7 @@ func TestSuspendResumeProcessor(t *testing.T) { // have been forwarded. go func(p *processor) { _ = p.resume(ctx) }(p) - err := p.waitResumed(ctx) + err = p.waitResumed(ctx) require.NoError(t, err) // Now read all the messages on the server for correctness. 
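The hunks above and below all make the same mechanical change: with pgproto3 v2.3.3, Encode returns both the encoded bytes and an error, so callers must check the error before writing. A minimal sketch of that caller-side pattern, assuming only the signature shown in these diffs; the package name sketch and helper writeMsg are illustrative and not part of the patch:

package sketch

import (
	"io"

	"github.com/jackc/pgproto3/v2"
)

// writeMsg encodes a backend message and forwards the bytes to w. Because
// Encode now returns ([]byte, error), the error is checked before writing
// instead of passing the result of Encode straight to Write as the
// pre-upgrade code did.
func writeMsg(w io.Writer, msg pgproto3.BackendMessage) error {
	buf, err := msg.Encode(nil)
	if err != nil {
		return err
	}
	_, err = w.Write(buf)
	return err
}

The tests in this patch inline the same steps with require.NoError rather than introducing such a helper.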
diff --git a/pkg/ccl/sqlproxyccl/frontend_admitter_test.go b/pkg/ccl/sqlproxyccl/frontend_admitter_test.go index b425ab599a98..89a6e223c897 100644 --- a/pkg/ccl/sqlproxyccl/frontend_admitter_test.go +++ b/pkg/ccl/sqlproxyccl/frontend_admitter_test.go @@ -136,7 +136,9 @@ func TestFrontendAdmitRequireEncryption(t *testing.T) { ProtocolVersion: pgproto3.ProtocolVersionNumber, Parameters: map[string]string{"key": "val"}, } - _, err := cli.Write(startup.Encode([]byte{})) + buf, err := startup.Encode([]byte{}) + require.NoError(t, err) + _, err = cli.Write(buf) require.NoError(t, err) }() @@ -166,7 +168,9 @@ func TestFrontendAdmitWithCancel(t *testing.T) { go func() { cancelRequest := pgproto3.CancelRequest{ProcessID: 1, SecretKey: 2} - _, err := cli.Write(cancelRequest.Encode([]byte{})) + buf, err := cancelRequest.Encode([]byte{}) + require.NoError(t, err) + _, err = cli.Write(buf) require.NoError(t, err) }() @@ -193,7 +197,9 @@ func TestFrontendAdmitWithSSLAndCancel(t *testing.T) { go func() { sslRequest := pgproto3.SSLRequest{} - _, err := cli.Write(sslRequest.Encode([]byte{})) + buf, err := sslRequest.Encode([]byte{}) + require.NoError(t, err) + _, err = cli.Write(buf) require.NoError(t, err) b := []byte{0} n, err := cli.Read(b) @@ -201,7 +207,9 @@ func TestFrontendAdmitWithSSLAndCancel(t *testing.T) { require.NoError(t, err) cli = tls.Client(cli, &tls.Config{InsecureSkipVerify: true}) cancelRequest := pgproto3.CancelRequest{ProcessID: 1, SecretKey: 2} - _, err = cli.Write(cancelRequest.Encode([]byte{})) + buf, err = cancelRequest.Encode([]byte{}) + require.NoError(t, err) + _, err = cli.Write(buf) require.NoError(t, err) }() diff --git a/pkg/ccl/sqlproxyccl/interceptor/backend_conn_test.go b/pkg/ccl/sqlproxyccl/interceptor/backend_conn_test.go index cc65da9c19ce..de4d2f45d013 100644 --- a/pkg/ccl/sqlproxyccl/interceptor/backend_conn_test.go +++ b/pkg/ccl/sqlproxyccl/interceptor/backend_conn_test.go @@ -24,7 +24,8 @@ func TestBackendConn(t *testing.T) { defer leaktest.AfterTest(t)() testutilsccl.ServerlessOnly(t) - q := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + q, err := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + require.NoError(t, err) t.Run("PeekMsg returns the right message type", func(t *testing.T) { w, r := net.Pipe() diff --git a/pkg/ccl/sqlproxyccl/interceptor/base_test.go b/pkg/ccl/sqlproxyccl/interceptor/base_test.go index 6b338a9312dc..657c07cdd7e6 100644 --- a/pkg/ccl/sqlproxyccl/interceptor/base_test.go +++ b/pkg/ccl/sqlproxyccl/interceptor/base_test.go @@ -495,13 +495,17 @@ func (rw *errReadWriter) Write(p []byte) (int, error) { // testSelect1Bytes represents the bytes for a SELECT 1 query. This will always // be 14 bytes (5 (header) + 8 (query) + 1 (null terminator)). -var testSelect1Bytes = (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) +var testSelect1Bytes []byte // buildSrc generates a buffer with count test queries which alternates between // SELECT 1 and SELECT 2. func buildSrc(t *testing.T, count int) *bytes.Buffer { t.Helper() + var err error + testSelect1Bytes, err = (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + require.NoError(t, err) + // Reset bytes back to SELECT 1. 
defer func() { testSelect1Bytes[12] = '1' diff --git a/pkg/ccl/sqlproxyccl/interceptor/frontend_conn_test.go b/pkg/ccl/sqlproxyccl/interceptor/frontend_conn_test.go index 812615e0e8d7..896aa3eb6f34 100644 --- a/pkg/ccl/sqlproxyccl/interceptor/frontend_conn_test.go +++ b/pkg/ccl/sqlproxyccl/interceptor/frontend_conn_test.go @@ -24,7 +24,8 @@ func TestFrontendConn(t *testing.T) { defer leaktest.AfterTest(t)() testutilsccl.ServerlessOnly(t) - q := (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(nil) + q, err := (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(nil) + require.NoError(t, err) t.Run("PeekMsg returns the right message type", func(t *testing.T) { w, r := net.Pipe() diff --git a/pkg/ccl/sqlproxyccl/interceptor/interceptor_test.go b/pkg/ccl/sqlproxyccl/interceptor/interceptor_test.go index fb706bc04b9b..ac19debd9294 100644 --- a/pkg/ccl/sqlproxyccl/interceptor/interceptor_test.go +++ b/pkg/ccl/sqlproxyccl/interceptor/interceptor_test.go @@ -49,7 +49,9 @@ func TestSimpleProxy(t *testing.T) { errCh := make(chan error, len(queries)) go func() { for _, msg := range queries { - _, err := client.Write(msg.Encode(nil)) + buf, err := msg.Encode(nil) + require.NoError(t, err) + _, err = client.Write(buf) errCh <- err } }() @@ -79,7 +81,9 @@ func TestSimpleProxy(t *testing.T) { if typ == pgwirebase.ClientMsgTerminate { // Right before we terminate, we could also craft a custom // message, and send it to the server. - _, err := serverConn.Write(customQuery.Encode(nil)) + buf, err := customQuery.Encode(nil) + require.NoError(t, err) + _, err = serverConn.Write(buf) require.NoError(t, err) break } @@ -127,7 +131,9 @@ func TestSimpleProxy(t *testing.T) { errCh := make(chan error, len(queries)) go func() { for _, msg := range queries { - _, err := server.Write(msg.Encode(nil)) + buf, err := msg.Encode(nil) + require.NoError(t, err) + _, err = server.Write(buf) errCh <- err } }() @@ -177,7 +183,9 @@ func TestSimpleProxy(t *testing.T) { // the client. dmsg.SecretKey = 100 - _, err = clientConn.Write(dmsg.Encode(nil)) + buf, err := dmsg.Encode(nil) + require.NoError(t, err) + _, err = clientConn.Write(buf) require.NoError(t, err) default: // Forward message that we're not interested to the client. 
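A note on the base_test.go change above: because Encode can now fail, the package-level initializer for testSelect1Bytes was replaced with lazy initialization inside buildSrc. A sketch of an alternative that keeps the package-level value, assuming the file's existing pgproto3 import and accepting a panic on an encode error that should not occur for a constant query; mustEncodeQuery is a hypothetical helper, not something the patch adds:

// mustEncodeQuery encodes a simple Query message, panicking on the
// (unexpected) encode error so the result can seed a package-level var.
func mustEncodeQuery(q string) []byte {
	buf, err := (&pgproto3.Query{String: q}).Encode(nil)
	if err != nil {
		panic(err)
	}
	return buf
}

// testSelect1Bytes represents the bytes for a SELECT 1 query.
var testSelect1Bytes = mustEncodeQuery("SELECT 1")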
diff --git a/pkg/ccl/sqlproxyccl/interceptor/pg_conn_test.go b/pkg/ccl/sqlproxyccl/interceptor/pg_conn_test.go index 198b1c5618a7..f9445e0bcf20 100644 --- a/pkg/ccl/sqlproxyccl/interceptor/pg_conn_test.go +++ b/pkg/ccl/sqlproxyccl/interceptor/pg_conn_test.go @@ -23,7 +23,8 @@ func TestPGConn(t *testing.T) { defer leaktest.AfterTest(t)() testutilsccl.ServerlessOnly(t) - q := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + q, err := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + require.NoError(t, err) t.Run("net.Conn/Write", func(t *testing.T) { external, proxy := net.Pipe() @@ -63,7 +64,8 @@ func TestPGConn_ToFrontendConn(t *testing.T) { defer leaktest.AfterTest(t)() testutilsccl.ServerlessOnly(t) - q := (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(nil) + q, err := (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(nil) + require.NoError(t, err) external, proxy := net.Pipe() errCh := writeAsync(t, external, q) @@ -83,7 +85,8 @@ func TestPGConn_ToBackendConn(t *testing.T) { defer leaktest.AfterTest(t)() testutilsccl.ServerlessOnly(t) - q := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + q, err := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil) + require.NoError(t, err) external, proxy := net.Pipe() errCh := writeAsync(t, external, q) diff --git a/pkg/ccl/sqlproxyccl/proxy.go b/pkg/ccl/sqlproxyccl/proxy.go index 159c7235b25d..aea291c10061 100644 --- a/pkg/ccl/sqlproxyccl/proxy.go +++ b/pkg/ccl/sqlproxyccl/proxy.go @@ -67,5 +67,9 @@ var SendErrToClient = func(conn net.Conn, err error) { if err == nil || conn == nil { return } - _, _ = conn.Write(toPgError(err).Encode(nil)) + buf, err := toPgError(err).Encode(nil) + if err != nil { + return // void function - eat the error + } + _, _ = conn.Write(buf) } diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index 736fd0602286..b6ad0f617497 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -1810,7 +1810,8 @@ func TestCancelQuery(t *testing.T) { ProcessID: 1, SecretKey: conn.PgConn().SecretKey() + 1, } - buf := crdbRequest.Encode(nil /* buf */) + buf, err := crdbRequest.Encode(nil /* buf */) + require.NoError(t, err) proxyAddr := conn.PgConn().Conn().RemoteAddr() cancelConn, err := net.Dial(proxyAddr.Network(), proxyAddr.String()) require.NoError(t, err) diff --git a/pkg/ccl/sqlproxyccl/query_cancel.go b/pkg/ccl/sqlproxyccl/query_cancel.go index 45cf83b9d2ad..191660daf83f 100644 --- a/pkg/ccl/sqlproxyccl/query_cancel.go +++ b/pkg/ccl/sqlproxyccl/query_cancel.go @@ -116,7 +116,10 @@ func (c *cancelInfo) sendCancelToBackend(requestClientIP net.IP) error { ProcessID: origBackendKeyData.ProcessID, SecretKey: origBackendKeyData.SecretKey, } - buf := crdbRequest.Encode(nil /* buf */) + buf, err := crdbRequest.Encode(nil /* buf */) + if err != nil { + return err + } if _, err := cancelConn.Write(buf); err != nil { return err } From ed9f0e10e1b8cd78c98edff5f27736a78f5ab83d Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Thu, 23 Jan 2025 11:00:52 -0500 Subject: [PATCH 2/4] catalog/lease: optimize WaitForInitialVersion for multiple objects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the WaitForInitialVersion serially went over objects one at time, which was sub-optimal. This meant that a lot of round trips were required to wait when a large number of objects were generated within a single txn. 
To address this, this patch changes WaitForInitialVersion to iterate on a
per-schema basis and to combine waits for multiple descriptors.

benchdiff with these changes:

name                                old time/op    new time/op    delta
GenerateObjects/10000_tables-10     5.83s ± 1%     0.73s ± 2%     -87.51%  (p=0.000 n=8+9)
GenerateObjects/1000_tables-10      517ms ± 3%     76ms ± 3%      -85.39%  (p=0.000 n=9+10)
GenerateObjects/10x100_tables-10    519ms ± 3%     89ms ± 1%      -82.85%  (p=0.000 n=10+10)
GenerateObjects/10x10x10_tables-10  572ms ± 7%     170ms ± 3%     -70.26%  (p=0.000 n=9+9)

name                                old alloc/op   new alloc/op   delta
GenerateObjects/10000_tables-10     6.24GB ± 2%    1.24GB ±10%    -80.20%  (p=0.000 n=8+10)
GenerateObjects/1000_tables-10      343MB ± 1%     116MB ± 0%     -66.09%  (p=0.000 n=9+10)
GenerateObjects/10x100_tables-10    344MB ± 0%     130MB ± 1%     -62.25%  (p=0.000 n=8+10)
GenerateObjects/10x10x10_tables-10  365MB ± 0%     175MB ± 1%     -52.21%  (p=0.000 n=8+10)

name                                old allocs/op  new allocs/op  delta
GenerateObjects/10000_tables-10     38.5M ± 4%     6.5M ±15%      -83.04%  (p=0.000 n=9+10)
GenerateObjects/1000_tables-10      2.06M ± 0%     0.60M ± 0%     -70.87%  (p=0.000 n=8+10)

Fixes: #139495

Release note: None
---
 pkg/sql/catalog/lease/lease.go | 287 +++++++++++++++++++--------------
 pkg/sql/conn_executor_jobs.go  |  17 +-
 2 files changed, 171 insertions(+), 133 deletions(-)

diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 6bc5f017a858..2b7544622bc6 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -212,37 +212,42 @@ func (m *Manager) WaitForNoVersion(
 	return nil
 }
 
-// maybeGetDescriptorWithoutValidation gets a descriptor without validating
+// maybeGetDescriptorsWithoutValidation gets a descriptor without validating
 // from the KV layer.
-func (m *Manager) maybeGetDescriptorWithoutValidation(
-	ctx context.Context, id descpb.ID,
-) (desc catalog.Descriptor, err error) {
+func (m *Manager) maybeGetDescriptorsWithoutValidation(
+	ctx context.Context, ids descpb.IDs, existenceExpected bool,
+) (descs catalog.Descriptors, err error) {
 	err = m.storage.db.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
 		const isDescriptorRequired = false
 		cr := m.storage.newCatalogReader(ctx)
-		c, err := cr.GetByIDs(ctx, txn, []descpb.ID{id}, isDescriptorRequired, catalog.Any)
+		c, err := cr.GetByIDs(ctx, txn, ids, isDescriptorRequired, catalog.Any)
 		if err != nil {
 			return err
 		}
-		desc = c.LookupDescriptor(id)
-		if desc == nil {
-			// Descriptor was dropped on us, so return a structured error. If
-			// WaitForInitialVersion is the caller then this indicates the descriptor
-			// was dropped right after creation.
-			return errors.Wrapf(catalog.ErrDescriptorNotFound, "descriptor %d could not be fetched to count leases", id)
+		descs = make(catalog.Descriptors, 0, len(ids))
+		for _, id := range ids {
+			desc := c.LookupDescriptor(id)
+			if desc == nil {
+				// Descriptor was dropped on us, so return a structured error.
+				if existenceExpected {
+					return errors.Wrapf(catalog.ErrDescriptorNotFound, "descriptor %d could not be fetched to count leases", id)
+				}
+			} else {
+				descs = append(descs, desc)
+			}
 		}
 		return nil
 	})
-	return desc, err
+	return descs, err
 }
 
 // countDescriptorsHeldBySessionIDs can be used to make sure certain nodes
-// (sessions) observe the existence of a given descriptor. Assuming the given
+// (sessions) observe the existence of a given set of descriptors. Assuming the given
 // sessions are still alive.
func countDescriptorsHeldBySessionIDs( ctx context.Context, txn isql.Txn, - descID descpb.ID, + descIDs descpb.IDs, region string, sessionIDs []sqlliveness.SessionID, ) (int, error) { @@ -257,17 +262,24 @@ func countDescriptorsHeldBySessionIDs( } b.WriteString(fmt.Sprintf("x'%s'", sessionID.String())) } + d := strings.Builder{} + for _, descID := range descIDs { + if d.Len() > 0 { + d.WriteString(",") + } + d.WriteString(fmt.Sprintf("%d", descID)) + } // Query the count from the region. row, err := txn.QueryRow(ctx, "wait-for-new-descriptor", txn.KV(), fmt.Sprintf( ` SELECT count(*) FROM system.lease - WHERE desc_id = %d + WHERE desc_id IN (%s) AND session_id IN (%s) AND crdb_internal.sql_liveness_is_alive(session_id) %s;`, - descID, + d.String(), b.String(), regionClause, ), @@ -310,7 +322,7 @@ SELECT DISTINCT session_id FROM system.lease WHERE desc_id=%d AND crdb_internal. // (i.e. the optimizer memo will use the generation value as short circuit). func (m *Manager) WaitForInitialVersion( ctx context.Context, - id descpb.ID, + descriptorsIds descpb.IDs, retryOpts retry.Options, regions regionliveness.CachedDatabaseRegions, ) error { @@ -322,13 +334,15 @@ func (m *Manager) WaitForInitialVersion( defer wsTracker.end() decrAfterWait := m.IncGaugeAfterLeaseDuration(GaugeWaitForInitialVersion) defer decrAfterWait() + // Track the set of descriptors, this will have descriptors removed on + // partial fulfillment (i.e. some subset having the initial version set). + var ids catalog.DescriptorIDSet + for _, id := range descriptorsIds { + ids.Add(id) + } for lastCount, r := 0, retry.Start(retryOpts); r.Next(); { - desc, err := m.maybeGetDescriptorWithoutValidation(ctx, id) + descs, err := m.maybeGetDescriptorsWithoutValidation(ctx, ids.Ordered(), false /* existenceExpected */) if err != nil { - if errors.Is(err, catalog.ErrDescriptorNotFound) { - log.Infof(ctx, "descriptor %d was dropped before initial leases was acquired", id) - return nil - } return err } // If the object no longer exists or isn't fully created then skip this @@ -339,134 +353,157 @@ func (m *Manager) WaitForInitialVersion( // We don't need to worry about functions because their signature is stored // inside the schema descriptor, which will cause an implicit bump invalidating // cached metadata. - if desc == nil || - (desc.DescriptorType() != catalog.Table && desc.DescriptorType() != catalog.Type) || - desc.Dropped() || - desc.Adding() { + descsToProcess := make([]catalog.Descriptor, 0, len(descs)) + idsPerSchema := make(map[descpb.ID]descpb.IDs) + + for _, desc := range descs { + if (desc.DescriptorType() != catalog.Table && desc.DescriptorType() != catalog.Type) || + desc.Dropped() || + desc.Adding() { + continue + } + descsToProcess = append(descsToProcess, desc) + idsPerSchema[desc.GetParentSchemaID()] = append(idsPerSchema[desc.GetParentSchemaID()], desc.GetID()) + } + // No schemas to wait for. + if len(idsPerSchema) == 0 { return nil } - // Check to see if there are any leases that still exist on the previous - // version of the descriptor. - now := m.storage.clock.Now() - var count int - db := m.storage.db - // Get a list of sessions that had the schema leased out when this descriptor - // was created / modified. 
- var sessionsPerRegion map[string][]sqlliveness.SessionID - expectedSessions := 0 - if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - txn.KV().SetDebugName("wait-for-initial-lease-count-schema-leases") - // Look at what was leasing the schema at them modification time, we expect - // this be before the GC TTL because: - // 1) The defaults settings on the system database are not aggressive - // 2) We expect this to be a short wait in the recent past. - // If for some reason this timestamp is outside the GC TTL the transaction - // will get an error, which is a potential caveat here. - if err := txn.KV().SetFixedTimestamp(ctx, desc.GetModificationTime()); err != nil { - return err - } - expectedSessions = 0 - sessionsPerRegion = make(map[string][]sqlliveness.SessionID) + // Go over each schema and set of descriptor IDs + totalCount := 0 + totalExpectedCount := 0 + for schemaID, descIDsForSchema := range idsPerSchema { + // Check to see if there are any leases that still exist on the previous + // version of the descriptor. + now := m.storage.clock.Now() + var count int + db := m.storage.db + // Get a list of sessions that had the schema leased out when this descriptor + // was created / modified. + var sessionsPerRegion map[string][]sqlliveness.SessionID + expectedSessions := 0 + if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + txn.KV().SetDebugName("wait-for-initial-lease-count-schema-leases") + // Look at what was leasing the schema at them modification time, we expect + // this be before the GC TTL because: + // 1) The defaults settings on the system database are not aggressive + // 2) We expect this to be a short wait in the recent past. + // If for some reason this timestamp is outside the GC TTL the transaction + // will get an error, which is a potential caveat here. + if err := txn.KV().SetFixedTimestamp(ctx, descsToProcess[0].GetModificationTime()); err != nil { + return err + } + expectedSessions = 0 + sessionsPerRegion = make(map[string][]sqlliveness.SessionID) - prober := regionliveness.NewLivenessProber(db.KV(), m.storage.codec, regions, m.settings) - regionMap, err := prober.QueryLiveness(ctx, txn.KV()) - if err != nil { - return err - } - // On single region clusters we can query everything at once. - if regionMap == nil { - sessionIDs, err := getSessionsHoldingDescriptor(ctx, txn, desc.GetParentSchemaID(), "") + prober := regionliveness.NewLivenessProber(db.KV(), m.storage.codec, regions, m.settings) + regionMap, err := prober.QueryLiveness(ctx, txn.KV()) if err != nil { return err } - sessionsPerRegion[""] = sessionIDs - expectedSessions += len(sessionIDs) - } - // Otherwise, process active schema leases by region, and use the - // region liveness subsystem to detect offline regions. - return regionMap.ForEach(func(region string) error { - var sessionIDs []sqlliveness.SessionID - var err error - if hasTimeout, timeout := prober.GetProbeTimeout(); hasTimeout { - err = timeutil.RunWithTimeout(ctx, "active-schema-leases-by-region", timeout, func(ctx context.Context) error { - var err error - sessionIDs, err = getSessionsHoldingDescriptor(ctx, txn, desc.GetParentSchemaID(), region) + // On single region clusters we can query everything at once. 
+ if regionMap == nil { + sessionIDs, err := getSessionsHoldingDescriptor(ctx, txn, schemaID, "") + if err != nil { return err - }) - } else { - sessionIDs, err = getSessionsHoldingDescriptor(ctx, txn, desc.GetParentSchemaID(), region) - } - if err != nil { - return handleRegionLivenessErrors(ctx, prober, region, err) + } + sessionsPerRegion[""] = sessionIDs + expectedSessions += len(sessionIDs) } - sessionsPerRegion[region] = sessionIDs - expectedSessions += len(sessionIDs) - return nil - }) - }); err != nil { - return err - } - // Next ensure the initial version exists on all nodes that have the schema - // leased out. - if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - count = 0 - txn.KV().SetDebugName("wait-for-initial-lease") - if err := txn.KV().SetFixedTimestamp(ctx, now); err != nil { - return err - } - prober := regionliveness.NewLivenessProber(db.KV(), m.storage.codec, regions, m.settings) - regionMap, err := prober.QueryLiveness(ctx, txn.KV()) - if err != nil { - return err - } - // On multi-region we are going to process initial version on a per-region - // basis, which will allow us to query / inform region liveness of offline - // regions. - if regions != nil && regions.IsMultiRegion() { + // Otherwise, process active schema leases by region, and use the + // region liveness subsystem to detect offline regions. return regionMap.ForEach(func(region string) error { - sessionIDs := sessionsPerRegion[region] - // Skip any regions without sessions. - if len(sessionIDs) == 0 { - return nil - } - - var regionCount int + var sessionIDs []sqlliveness.SessionID var err error if hasTimeout, timeout := prober.GetProbeTimeout(); hasTimeout { - err = timeutil.RunWithTimeout(ctx, "wait-for-new-descriptor-by-region", timeout, func(ctx context.Context) error { + err = timeutil.RunWithTimeout(ctx, "active-schema-leases-by-region", timeout, func(ctx context.Context) error { var err error - regionCount, err = countDescriptorsHeldBySessionIDs(ctx, txn, desc.GetID(), region, sessionIDs) + sessionIDs, err = getSessionsHoldingDescriptor(ctx, txn, schemaID, region) return err }) } else { - regionCount, err = countDescriptorsHeldBySessionIDs(ctx, txn, desc.GetID(), region, sessionIDs) + sessionIDs, err = getSessionsHoldingDescriptor(ctx, txn, schemaID, region) } - if err := handleRegionLivenessErrors(ctx, prober, region, err); err != nil { - return err + if err != nil { + return handleRegionLivenessErrors(ctx, prober, region, err) } - count += regionCount + sessionsPerRegion[region] = sessionIDs + expectedSessions += len(sessionIDs) return nil }) - } else { - // Otherwise, we can query the initial versions directly. - count, err = countDescriptorsHeldBySessionIDs(ctx, txn, desc.GetID(), "", sessionsPerRegion[""]) + }); err != nil { return err } - }); err != nil { - return err + // Next ensure the initial version exists on all nodes that have the schema + // leased out. + if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + count = 0 + txn.KV().SetDebugName("wait-for-initial-lease") + if err := txn.KV().SetFixedTimestamp(ctx, now); err != nil { + return err + } + prober := regionliveness.NewLivenessProber(db.KV(), m.storage.codec, regions, m.settings) + regionMap, err := prober.QueryLiveness(ctx, txn.KV()) + if err != nil { + return err + } + // On multi-region we are going to process initial version on a per-region + // basis, which will allow us to query / inform region liveness of offline + // regions. 
+ if regions != nil && regions.IsMultiRegion() { + return regionMap.ForEach(func(region string) error { + sessionIDs := sessionsPerRegion[region] + // Skip any regions without sessions. + if len(sessionIDs) == 0 { + return nil + } + + var regionCount int + var err error + if hasTimeout, timeout := prober.GetProbeTimeout(); hasTimeout { + err = timeutil.RunWithTimeout(ctx, "wait-for-new-descriptor-by-region", timeout, func(ctx context.Context) error { + var err error + regionCount, err = countDescriptorsHeldBySessionIDs(ctx, txn, descIDsForSchema, region, sessionIDs) + return err + }) + } else { + regionCount, err = countDescriptorsHeldBySessionIDs(ctx, txn, descIDsForSchema, region, sessionIDs) + } + if err := handleRegionLivenessErrors(ctx, prober, region, err); err != nil { + return err + } + count += regionCount + return nil + }) + } else { + // Otherwise, we can query the initial versions directly. + count, err = countDescriptorsHeldBySessionIDs(ctx, txn, descIDsForSchema, "", sessionsPerRegion[""]) + return err + } + }); err != nil { + return err + } + if count == expectedSessions*len(descIDsForSchema) { + // Remove descriptors that have had their wait completed. + for _, id := range descIDsForSchema { + ids.Remove(id) + } + } + totalCount += count + totalExpectedCount += expectedSessions * len(descIDsForSchema) } - if count == expectedSessions { + // All the expected sessions are there now. + if totalCount == totalExpectedCount { break } - if count != lastCount { - log.Infof(ctx, "waiting for descriptor %d to appear on %d nodes. Last count was %d", desc.GetID(), expectedSessions, lastCount) + if totalCount != lastCount { + log.Infof(ctx, "waiting for descriptors %v to appear on %d nodes. Last count was %d", ids.Ordered(), totalExpectedCount, totalCount) wsTracker.updateProgress(countDetail{ - count: count, - targetCount: expectedSessions, + count: totalCount, + targetCount: totalExpectedCount, }) } - lastCount = count - + lastCount = totalCount } return nil } @@ -496,9 +533,11 @@ func (m *Manager) WaitForOneVersion( defer wsTracker.end() for lastCount, r := 0, retry.Start(retryOpts); r.Next(); { var err error - if desc, err = m.maybeGetDescriptorWithoutValidation(ctx, id); err != nil { + var descArr catalog.Descriptors + if descArr, err = m.maybeGetDescriptorsWithoutValidation(ctx, descpb.IDs{id}, true); err != nil { return nil, err } + desc = descArr[0] // Check to see if there are any leases that still exist on the previous // version of the descriptor. now := m.storage.clock.Now() diff --git a/pkg/sql/conn_executor_jobs.go b/pkg/sql/conn_executor_jobs.go index 750042ba4e08..0ac4d3694664 100644 --- a/pkg/sql/conn_executor_jobs.go +++ b/pkg/sql/conn_executor_jobs.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/regions" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" @@ -61,19 +62,17 @@ func (ex *connExecutor) waitForInitialVersionForNewDescriptors( // nodes that have leased the schema for them out are aware of the new object. // This guarantees that any cached optimizer memos are discarded once the // user transaction completes. 
+ descriptorIDs := make(descpb.IDs, 0, len(ex.extraTxnState.descCollection.GetUncommittedTables())) for _, tbl := range ex.extraTxnState.descCollection.GetUncommittedTables() { if tbl.GetVersion() == 1 { - err := ex.planner.LeaseMgr().WaitForInitialVersion(ex.Ctx(), tbl.GetID(), retry.Options{ - InitialBackoff: time.Millisecond, - MaxBackoff: time.Second, - Multiplier: 1.5, - }, cachedRegions) - if err != nil { - return err - } + descriptorIDs = append(descriptorIDs, tbl.GetID()) } } - return nil + return ex.planner.LeaseMgr().WaitForInitialVersion(ex.Ctx(), descriptorIDs, retry.Options{ + InitialBackoff: time.Millisecond, + MaxBackoff: time.Second, + Multiplier: 1.5, + }, cachedRegions) } // descIDsInSchemaChangeJobs returns all descriptor IDs with which schema change From f112ef2b94120be265e474d214199e308680f94c Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Thu, 23 Jan 2025 15:20:27 -0500 Subject: [PATCH 3/4] catalog/lease: add concurrency testing for WaitForInitialVersion Previously, we found a regression where leases being acquired while doing a WaitForInitialVersion could throw things off. This was missed because we had limited concurrency testing for objects being created and used. To address this, this patch adds a test focused on creating objects and accessing them as a basic sanity test. Release note: None --- pkg/sql/catalog/lease/lease_test.go | 78 ++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index d5e54599aa01..a0b28fd57159 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -2580,7 +2580,7 @@ func TestLeaseWithOfflineTables(t *testing.T) { // descriptor txn will not wait for the initial version. if expected == descpb.DescriptorState_DROP && next == descpb.DescriptorState_PUBLIC { require.NoError(t, - execCfg.LeaseManager.WaitForInitialVersion(ctx, testTableID(), retry.Options{}, nil)) + execCfg.LeaseManager.WaitForInitialVersion(ctx, descpb.IDs{testTableID()}, retry.Options{}, nil)) } // Wait for the lease manager's refresh worker to have processed the // descriptor update. @@ -4095,3 +4095,79 @@ func TestLongLeaseWaitMetrics(t *testing.T) { require.Equal(t, int64(0), srv.MustGetSQLCounter("sql.leases.long_wait_for_two_version_invariant")) require.Equal(t, int64(0), srv.MustGetSQLCounter("sql.leases.long_wait_for_one_version")) } + +// TestWaitForInitialVersionConcurrent this test is a basic sanity test that +// intentionally has leases modified or used while the WaitForInitialVersion +// is happening. The goal of this test is to find any concurrency related +// bugs in the leasing subsystem with this logic. +func TestWaitForInitialVersionConcurrent(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + const numIter = 100 + const numThreads = 4 + + expectedErrors := map[pgcode.Code]struct{}{ + pgcode.UndefinedTable: {}, // table isn't created yet + } + tableCreator := func(ctx context.Context, index int) error { + stopTableUser := make(chan struct{}) + defer close(stopTableUser) + tableUser := func(ctx context.Context, tblName string) error { + execLoop := true + // Execute a tight loop during the table creation + // process. + iter := 0 + for execLoop { + select { + // Detect if we should move to the next + // table. 
+ case <-stopTableUser: + execLoop = false + default: + } + var err error + // Execute DDL / DML we expect this to succeed or not notice the + // table. + if iter%2 == 0 { + _, err = sqlDB.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN j%d INT", tblName, iter)) + } else { + _, err = sqlDB.Exec(fmt.Sprintf("SELECT * FROM %s", tblName)) + } + // We expect failures when the table doesn't exist. + if err != nil { + var pqErr *pq.Error + if !errors.As(err, &pqErr) { + return err + } + if _, ok := expectedErrors[pgcode.MakeCode(string(pqErr.Code))]; !ok { + return err + } + } + iter += 1 + } + return nil + } + tablePrefix := fmt.Sprintf("table%d_", index) + for i := 0; i < numIter; i++ { + tblName := fmt.Sprintf("%s%d", tablePrefix, i) + grp := ctxgroup.WithContext(ctx) + grp.GoCtx(func(ctx context.Context) error { + return tableUser(ctx, tblName) + }) + _, err := sqlDB.Exec(fmt.Sprintf("CREATE TABLE %s(n int)", tblName)) + if err != nil { + return err + } + stopTableUser <- struct{}{} + if err := grp.Wait(); err != nil { + return err + } + } + return nil + } + require.NoError(t, ctxgroup.GroupWorkers(ctx, numThreads, tableCreator)) +} From a86c17d64fdfa6e2111caec138b4837a0fd94612 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Fri, 24 Jan 2025 10:06:19 -0500 Subject: [PATCH 4/4] workload/schemachanger: limit size of datums during insert Previously, the insert statement in the randomized schema changer workload could have unbounded sizes for individual datums. This patch adds size limit for datums in the insert operation, where datums will be regenerated a limited number of times until they are below a maximum size. Fixes: #139494 Release note: None --- .../schemachange/operation_generator.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index e9249c516009..db3069188d88 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -2935,7 +2935,23 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *o for i := 0; i < numRows; i++ { var row []string for _, col := range nonGeneratedCols { - d := randgen.RandDatum(og.params.rng, col.typ, col.nullable) + // Limit the size of columns being generated. + const maxSize = 1024 * 1024 + maxAttempts := 32 + var d tree.Datum + for i := 0; i < maxAttempts; i++ { + d = randgen.RandDatum(og.params.rng, col.typ, col.nullable) + // Retry if we exceed the maximum size. + if d.Size() < maxSize { + break + } + } + if d.Size() > maxSize { + og.LogMessage(fmt.Sprintf("datum of type %s exceeds size limit (%d / %d)", + col.typ.SQLString(), + d.Size(), + maxSize)) + } // Unfortunately, RandDatum for OIDs only selects random values, which will // always fail validation. So, for OIDs we will select a random known type // instead.
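The loop added to insertRow above amounts to a bounded regenerate-until-small pattern: draw a random datum, retry up to a fixed number of attempts while it exceeds the cap, and keep (and log) the last attempt if it is still oversized. A sketch of that pattern in isolation, assuming the surrounding package's tree import and that Datum.Size reports a uintptr byte count; genBoundedDatum and gen are hypothetical names, not part of the patch:

// genBoundedDatum calls gen until the produced datum fits under maxSize or
// maxAttempts is exhausted. The final attempt is returned either way, which
// mirrors insertRow's behaviour of logging oversized datums and proceeding.
func genBoundedDatum(gen func() tree.Datum, maxSize uintptr, maxAttempts int) tree.Datum {
	var d tree.Datum
	for i := 0; i < maxAttempts; i++ {
		d = gen()
		if d.Size() < maxSize {
			break
		}
	}
	return d
}

In insertRow, gen would correspond to the existing randgen.RandDatum(og.params.rng, col.typ, col.nullable) call.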