Skip to content

Commit

Permalink
Disallow SendCommandListToWorkerInSingleTransaction when modification…
Browse files Browse the repository at this point in the history
…s have occurred
  • Loading branch information
marcoslot authored and Metin Doslu committed Nov 3, 2016
1 parent 259252f commit 3c7c794
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/backend/distributed/transaction/worker_transaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char
PGresult *queryResult = NULL;
ListCell *commandCell = NULL;

if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}

workerConnection = ConnectToNode(nodeName, nodePort, nodeUser);
if (workerConnection == NULL)
{
Expand Down
32 changes: 32 additions & 0 deletions src/test/regress/expected/multi_repair_shards.out
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,38 @@ SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_e
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_2_port;
-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
ALTER TABLE customer_engagements ADD COLUMN value float;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
BEGIN;
INSERT INTO customer_engagements VALUES (4, '04-01-2015', 'fourth event');
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
-- modifications after reparing a shard are fine (will use new metadata)
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------

(1 row)

ALTER TABLE customer_engagements ADD COLUMN value float;
ROLLBACK;
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------

(1 row)

INSERT INTO customer_engagements VALUES (4, '04-01-2015', 'fourth event');
ROLLBACK;
-- add a fake healthy placement for the tests
INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
VALUES ('dummyhost', :worker_2_port, :newshardid, 1, 0);
Expand Down
22 changes: 22 additions & 0 deletions src/test/regress/sql/multi_repair_shards.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,28 @@ SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_e
-- now, update the second placement as unhealthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_2_port;

-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
ALTER TABLE customer_engagements ADD COLUMN value float;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ROLLBACK;

BEGIN;
INSERT INTO customer_engagements VALUES (4, '04-01-2015', 'fourth event');
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ROLLBACK;

-- modifications after reparing a shard are fine (will use new metadata)
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ALTER TABLE customer_engagements ADD COLUMN value float;
ROLLBACK;

BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
INSERT INTO customer_engagements VALUES (4, '04-01-2015', 'fourth event');
ROLLBACK;

-- add a fake healthy placement for the tests
INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
VALUES ('dummyhost', :worker_2_port, :newshardid, 1, 0);
Expand Down

0 comments on commit 3c7c794

Please sign in to comment.