Ignore nodes not allowed for shards, when planning rebalance steps (#6887)

Colocation groups whose shard group count is less than the worker node count are handled with a method different from the usual rebalancer logic; see #6739.
When deciding whether to use this method, we should have ignored nodes that are marked `shouldhaveshards = false`. This PR excludes those nodes from that decision.
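
As context (not part of the commit message): the node count the rebalancer should compare against can be inspected directly from coordinator metadata. A minimal sketch, assuming the standard pg_dist_node columns; it mirrors the shardAllowedNodeCount loop added in shard_rebalancer.c below:

-- count only active nodes that are allowed to hold shards
SELECT count(*) AS shard_allowed_node_count
FROM pg_dist_node
WHERE isactive AND shouldhaveshards;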

Adds a test such that:
 coordinator: []
 worker 1: [1_1, 1_2]
 worker 2: [2_1, 2_2]
(rebalance)
 coordinator: []
 worker 1: [1_1, 2_1]
 worker 2: [1_2, 2_2]

If we take the coordinator into account, the rebalancer considers the first state balanced and does nothing (because shard_count < worker_count).
With this PR, the coordinator is ignored because it has shouldhaveshards = false, so the rebalancer distributes each colocation group across both workers.
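
For illustration only, not part of the diff: outside of regression tests, a node is usually excluded from shard placements via citus_set_node_property (the test below simply updates pg_dist_node.shouldhaveshards directly), after which a plain rebalance spreads each two-shard colocation group across both workers. A hedged sketch, assuming a coordinator at localhost:5432:

-- mark the coordinator as not eligible for shards, then rebalance
SELECT citus_set_node_property('localhost', 5432, 'shouldhaveshards', false);
SELECT rebalance_table_shards(shard_transfer_mode := 'block_writes');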

Also fixes an unrelated flaky test in the same file.

(cherry picked from commit 59ccf36)
agedemenli authored and JelteF committed May 1, 2023
1 parent 262c335 commit 12c27ac
Showing 3 changed files with 177 additions and 3 deletions.
13 changes: 11 additions & 2 deletions src/backend/distributed/operations/shard_rebalancer.c
@@ -515,6 +515,16 @@ GetRebalanceSteps(RebalanceOptions *options)

/* sort the lists to make the function more deterministic */
List *activeWorkerList = SortedActiveWorkers();
int shardAllowedNodeCount = 0;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, activeWorkerList)
{
if (workerNode->shouldHaveShards)
{
shardAllowedNodeCount++;
}
}

List *activeShardPlacementListList = NIL;
List *unbalancedShards = NIL;

@@ -532,8 +542,7 @@
shardPlacementList, options->workerNode);
}

if (list_length(activeShardPlacementListForRelation) >= list_length(
activeWorkerList))
if (list_length(activeShardPlacementListForRelation) >= shardAllowedNodeCount)
{
activeShardPlacementListList = lappend(activeShardPlacementListList,
activeShardPlacementListForRelation);
110 changes: 109 additions & 1 deletion src/test/regress/expected/shard_rebalancer.out
@@ -20,13 +20,14 @@ SELECT create_distributed_table('dist_table_test', 'a');
CREATE TABLE postgres_table_test(a int primary key);
-- make sure that all rebalance operations works fine when
-- reference tables are replicated to the coordinator
SET client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
?column?
---------------------------------------------------------------------
1
(1 row)

RESET client_min_messages;
-- should just be noops even if we add the coordinator to the pg_dist_node
SELECT rebalance_table_shards('dist_table_test');
rebalance_table_shards
@@ -2713,6 +2714,113 @@ SELECT sh.logicalrelid, pl.nodeport
(5 rows)

DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
-- test the same with coordinator shouldhaveshards = false and shard_count = 2
-- so that the shard allowed node count would be 2 when rebalancing
-- for such cases, we only count the nodes that are allowed for shard placements
UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
create table two_shard_colocation_1a (a int primary key);
create table two_shard_colocation_1b (a int primary key);
SET citus.shard_replication_factor = 1;
select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------

(1 row)

select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
create_distributed_table
---------------------------------------------------------------------

(1 row)

create table two_shard_colocation_2a (a int primary key);
create table two_shard_colocation_2b (a int primary key);
select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
create_distributed_table
---------------------------------------------------------------------

(1 row)

select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- move shards of colocation group 1 to worker1
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
AND pl.nodeport = :worker_2_port
LIMIT 1;
citus_move_shard_placement
---------------------------------------------------------------------

(1 row)

-- move shards of colocation group 2 to worker2
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
AND pl.nodeport = :worker_1_port
LIMIT 1;
citus_move_shard_placement
---------------------------------------------------------------------

(1 row)

-- current state:
-- coordinator: []
-- worker 1: [1_1, 1_2]
-- worker 2: [2_1, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;
logicalrelid | nodeport
---------------------------------------------------------------------
two_shard_colocation_1a | 57637
two_shard_colocation_1a | 57637
two_shard_colocation_1b | 57637
two_shard_colocation_1b | 57637
two_shard_colocation_2a | 57638
two_shard_colocation_2a | 57638
two_shard_colocation_2b | 57638
two_shard_colocation_2b | 57638
(8 rows)

-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
select rebalance_table_shards(shard_transfer_mode:='block_writes');
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------

(1 row)

-- final state:
-- coordinator: []
-- worker 1: [1_1, 2_1]
-- worker 2: [1_2, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;
logicalrelid | nodeport
---------------------------------------------------------------------
two_shard_colocation_1a | 57637
two_shard_colocation_1a | 57638
two_shard_colocation_1b | 57637
two_shard_colocation_1b | 57638
two_shard_colocation_2a | 57637
two_shard_colocation_2a | 57638
two_shard_colocation_2b | 57637
two_shard_colocation_2b | 57638
(8 rows)

-- cleanup
DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
-- verify we detect if one of the tables do not have a replica identity or primary key
-- and error out in case of shard transfer mode = auto
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
57 changes: 57 additions & 0 deletions src/test/regress/sql/shard_rebalancer.sql
@@ -13,7 +13,9 @@ CREATE TABLE postgres_table_test(a int primary key);

-- make sure that all rebalance operations works fine when
-- reference tables are replicated to the coordinator
SET client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
RESET client_min_messages;

-- should just be noops even if we add the coordinator to the pg_dist_node
SELECT rebalance_table_shards('dist_table_test');
@@ -1497,6 +1499,61 @@ SELECT sh.logicalrelid, pl.nodeport

DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;

-- test the same with coordinator shouldhaveshards = false and shard_count = 2
-- so that the shard allowed node count would be 2 when rebalancing
-- for such cases, we only count the nodes that are allowed for shard placements
UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;

create table two_shard_colocation_1a (a int primary key);
create table two_shard_colocation_1b (a int primary key);
SET citus.shard_replication_factor = 1;

select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');

create table two_shard_colocation_2a (a int primary key);
create table two_shard_colocation_2b (a int primary key);
select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');

-- move shards of colocation group 1 to worker1
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
AND pl.nodeport = :worker_2_port
LIMIT 1;
-- move shards of colocation group 2 to worker2
SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
AND pl.nodeport = :worker_1_port
LIMIT 1;

-- current state:
-- coordinator: []
-- worker 1: [1_1, 1_2]
-- worker 2: [2_1, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;

-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
select rebalance_table_shards(shard_transfer_mode:='block_writes');

-- final state:
-- coordinator: []
-- worker 1: [1_1, 2_1]
-- worker 2: [1_2, 2_2]
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
ORDER BY sh.logicalrelid, pl.nodeport;

-- cleanup
DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;

-- verify we detect if one of the tables do not have a replica identity or primary key
-- and error out in case of shard transfer mode = auto
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
