From 07bd64127ba8bb56e4d4137dff234ee47005a2c5 Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Tue, 18 May 2021 10:40:39 +0100 Subject: [PATCH 1/6] Fix match:test-core --- tests/behaviour/typeql/typeql_steps.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/behaviour/typeql/typeql_steps.py b/tests/behaviour/typeql/typeql_steps.py index 3c6b2159..0d8c5001 100644 --- a/tests/behaviour/typeql/typeql_steps.py +++ b/tests/behaviour/typeql/typeql_steps.py @@ -474,3 +474,10 @@ def step_impl(context: Context): for answer in context.answers: query = apply_query_template(template=context.text, answer=answer) assert_that(list(context.tx().query().match(query)), has_length(1)) + + +@step("each answer does not satisfy") +def step_impl(context: Context): + for answer in context.answers: + query = apply_query_template(template=context.text, answer=answer) + assert_that(list(context.tx().query().match(query)), has_length(0)) From 61b2811175f24d1aee22561ff9a1636d3da09520 Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Wed, 19 May 2021 11:38:30 +0100 Subject: [PATCH 2/6] Fault-tolerant database creation --- typedb/cluster/client.py | 3 +- typedb/cluster/database.py | 121 ++++++++++++++++++++++++++--- typedb/cluster/database_manager.py | 58 ++++++++------ typedb/cluster/failsafe_task.py | 116 --------------------------- typedb/cluster/session.py | 3 +- 5 files changed, 149 insertions(+), 152 deletions(-) delete mode 100644 typedb/cluster/failsafe_task.py diff --git a/typedb/cluster/client.py b/typedb/cluster/client.py index 87f4fd16..89a867a8 100644 --- a/typedb/cluster/client.py +++ b/typedb/cluster/client.py @@ -23,9 +23,8 @@ from typedb.api.client import TypeDBClusterClient from typedb.api.options import TypeDBOptions, TypeDBClusterOptions from typedb.api.session import SessionType -from typedb.cluster.database import _ClusterDatabase +from typedb.cluster.database import _ClusterDatabase, _FailsafeTask from typedb.cluster.database_manager import _ClusterDatabaseManager -from typedb.cluster.failsafe_task import _FailsafeTask from typedb.cluster.session import _ClusterSession from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_UNABLE_TO_CONNECT from typedb.common.rpc.request_builder import cluster_server_manager_all_req diff --git a/typedb/cluster/database.py b/typedb/cluster/database.py index 108a57a3..7f972562 100644 --- a/typedb/cluster/database.py +++ b/typedb/cluster/database.py @@ -18,33 +18,39 @@ # specific language governing permissions and limitations # under the License. # +from abc import ABC, abstractmethod +from time import sleep from typing import Dict, Optional, Set, TYPE_CHECKING import typedb_protocol.cluster.cluster_database_pb2 as cluster_database_proto from typedb.api.database import ClusterDatabase +from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_REPLICA_NOT_PRIMARY, \ + CLUSTER_UNABLE_TO_CONNECT +from typedb.common.rpc.request_builder import cluster_database_manager_get_req from typedb.core.database import _CoreDatabase if TYPE_CHECKING: - from typedb.cluster.database_manager import _ClusterDatabaseManager + from typedb.cluster.client import _ClusterClient class _ClusterDatabase(ClusterDatabase): - def __init__(self, database: str, cluster_database_mgr: "_ClusterDatabaseManager"): + def __init__(self, database: str, client: "_ClusterClient"): self._name = database - self._database_mgr = cluster_database_mgr + self._client = client self._databases: Dict[str, _CoreDatabase] = {} self._replicas: Set["_ClusterDatabase.Replica"] = set() - for address in cluster_database_mgr.database_mgrs(): - core_database_mgr = cluster_database_mgr.database_mgrs()[address] + cluster_db_mgr = client.databases() + for address in cluster_db_mgr.database_mgrs(): + core_database_mgr = cluster_db_mgr.database_mgrs()[address] self._databases[address] = _CoreDatabase(core_database_mgr.stub(), name=database) @staticmethod - def of(proto_db: cluster_database_proto.ClusterDatabase, cluster_database_mgr: "_ClusterDatabaseManager") -> "_ClusterDatabase": + def of(proto_db: cluster_database_proto.ClusterDatabase, client: "_ClusterClient") -> "_ClusterDatabase": assert proto_db.replicas database: str = proto_db.name - database_cluster_rpc = _ClusterDatabase(database, cluster_database_mgr) + database_cluster_rpc = _ClusterDatabase(database, client) for proto_replica in proto_db.replicas: database_cluster_rpc.replicas().add(_ClusterDatabase.Replica.of(proto_replica, database_cluster_rpc)) print("Discovered database cluster: %s" % database_cluster_rpc) @@ -57,9 +63,8 @@ def schema(self) -> str: return next(iter(self._databases.values())).schema() def delete(self) -> None: - for address in self._databases: - if self._database_mgr.database_mgrs()[address].contains(self._name): - self._databases[address].delete() + delete_db_task = _DeleteDatabaseFailsafeTask(self._client, self._name, self._databases) + delete_db_task.run_primary_replica() def replicas(self): return self._replicas @@ -142,3 +147,99 @@ def __hash__(self): def __str__(self): return "%s/%s" % (self._address, self._database) + + +# This class has to live here because of circular class creation between ClusterDatabase and FailsafeTask +class _FailsafeTask(ABC): + + PRIMARY_REPLICA_TASK_MAX_RETRIES = 10 + FETCH_REPLICAS_MAX_RETRIES = 10 + WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS: float = 2 + + def __init__(self, client: "_ClusterClient", database: str): + self.client = client + self.database = database + + @abstractmethod + def run(self, replica: "_ClusterDatabase.Replica"): + pass + + def rerun(self, replica: "_ClusterDatabase.Replica"): + return self.run(replica) + + def run_primary_replica(self): + if self.database not in self.client.database_by_name() or not self.client.database_by_name()[self.database].primary_replica(): + self._seek_primary_replica() + replica = self.client.database_by_name()[self.database].primary_replica() + retries = 0 + while True: + try: + return self.run(replica) if retries == 0 else self.rerun(replica) + except TypeDBClientException as e: + if e.error_message in [CLUSTER_REPLICA_NOT_PRIMARY, UNABLE_TO_CONNECT]: + print("Unable to open a session or transaction, retrying in 2s... %s" % str(e)) + sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS) + replica = self._seek_primary_replica() + else: + raise e + retries += 1 + if retries > self.PRIMARY_REPLICA_TASK_MAX_RETRIES: + raise self._cluster_not_available_exception() + + def run_any_replica(self): + if self.database in self.client.database_by_name(): + cluster_database = self.client.database_by_name()[self.database] + else: + cluster_database = self._fetch_database_replicas() + + replicas = [cluster_database.preferred_replica()] + [replica for replica in cluster_database.replicas() if not replica.is_preferred()] + retries = 0 + for replica in replicas: + try: + return self.run(replica) if retries == 0 else self.rerun(replica) + except TypeDBClientException as e: + if e.error_message is UNABLE_TO_CONNECT: + print("Unable to open a session or transaction to %s. Attempting next replica. %s" % (str(replica.replica_id()), str(e))) + else: + raise e + retries += 1 + raise self._cluster_not_available_exception() + + def _seek_primary_replica(self) -> "_ClusterDatabase.Replica": + retries = 0 + while retries < self.FETCH_REPLICAS_MAX_RETRIES: + cluster_database = self._fetch_database_replicas() + if cluster_database.primary_replica(): + return cluster_database.primary_replica() + else: + sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS) + retries += 1 + raise self._cluster_not_available_exception() + + def _fetch_database_replicas(self) -> "_ClusterDatabase": + for server_address in self.client.cluster_members(): + try: + print("Fetching replica info from %s" % server_address) + res = self.client.stub(server_address).databases_get(cluster_database_manager_get_req(self.database)) + cluster_database = _ClusterDatabase.of(res.database, self.client) + self.client.database_by_name()[self.database] = cluster_database + return cluster_database + except TypeDBClientException as e: + if e.error_message is UNABLE_TO_CONNECT: + print("Unable to fetch replica info for database '%s' from %s. Attempting next address. %s" % (self.database, server_address, str(e))) + else: + raise e + raise self._cluster_not_available_exception() + + def _cluster_not_available_exception(self) -> TypeDBClientException: + return TypeDBClientException.of(CLUSTER_UNABLE_TO_CONNECT, str([str(addr) for addr in self.client.cluster_members()])) + + +class _DeleteDatabaseFailsafeTask(_FailsafeTask): + + def __init__(self, client: "_ClusterClient", database: str, databases: Dict[str, _CoreDatabase]): + super().__init__(client, database) + self.databases = databases + + def run(self, replica: _ClusterDatabase.Replica): + self.databases.get(replica.address()).delete() diff --git a/typedb/cluster/database_manager.py b/typedb/cluster/database_manager.py index 3ef9b685..6be57bda 100644 --- a/typedb/cluster/database_manager.py +++ b/typedb/cluster/database_manager.py @@ -18,14 +18,18 @@ # specific language governing permissions and limitations # under the License. # -from typing import Dict, List, TYPE_CHECKING +from typing import Dict, List, TYPE_CHECKING, Callable, TypeVar from typedb.api.database import ClusterDatabaseManager -from typedb.cluster.database import _ClusterDatabase -from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED +from typedb.cluster.database import _ClusterDatabase, _FailsafeTask +from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED, CLUSTER_REPLICA_NOT_PRIMARY, \ + DB_DOES_NOT_EXIST from typedb.common.rpc.request_builder import cluster_database_manager_get_req, cluster_database_manager_all_req +from typedb.common.rpc.stub import TypeDBClusterStub from typedb.core.database_manager import _CoreDatabaseManager +T = TypeVar("T") + if TYPE_CHECKING: from typedb.cluster.client import _ClusterClient @@ -37,38 +41,48 @@ def __init__(self, client: "_ClusterClient"): self._database_mgrs: Dict[str, _CoreDatabaseManager] = {addr: client.databases() for (addr, client) in client.core_clients().items()} def contains(self, name: str) -> bool: - errors = [] - for address in self._database_mgrs: - try: - return self._database_mgrs[address].contains(name) - except TypeDBClientException as e: - errors.append("- %s: %s\n" % (address, e)) - raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors])) + return self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.contains(name)) def create(self, name: str) -> None: - for database_manager in self._database_mgrs.values(): - if not database_manager.contains(name): - database_manager.create(name) + self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.create(name)) def get(self, name: str) -> _ClusterDatabase: - errors = [] - for address in self._database_mgrs: - try: - res = self._client.stub(address).databases_get(cluster_database_manager_get_req(name)) - return _ClusterDatabase.of(res.database, self) - except TypeDBClientException as e: - errors.append("- %s: %s\n" % (address, e)) - raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors])) + return self._failsafe_task(name, lambda stub, core_db_mgr: self._get_database_task(name, stub)) + + def _get_database_task(self, name: str, stub: TypeDBClusterStub): + if self.contains(name): + res = stub.databases_get(cluster_database_manager_get_req(name)) + return _ClusterDatabase.of(res.database, self._client) + raise TypeDBClientException.of(DB_DOES_NOT_EXIST, name) def all(self) -> List[_ClusterDatabase]: errors = [] for address in self._database_mgrs: try: res = self._client.stub(address).databases_all(cluster_database_manager_all_req()) - return [_ClusterDatabase.of(db, self) for db in res.databases] + return [_ClusterDatabase.of(db, self._client) for db in res.databases] except TypeDBClientException as e: errors.append("- %s: %s\n" % (address, e)) raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors])) def database_mgrs(self) -> Dict[str, _CoreDatabaseManager]: return self._database_mgrs + + def _failsafe_task(self, name: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]): + failsafe_task = _DatabaseManagerFailsafeTask(self._client, name, task) + try: + return failsafe_task.run_any_replica() + except TypeDBClientException as e: + if e.error_message == CLUSTER_REPLICA_NOT_PRIMARY: + return failsafe_task.run_primary_replica() + raise e + + +class _DatabaseManagerFailsafeTask(_FailsafeTask): + + def __init__(self, client: "_ClusterClient", database: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]): + super().__init__(client, database) + self.task = task + + def run(self, replica: _ClusterDatabase.Replica) -> T: + return self.task(self.client.stub(replica.address()), self.client.core_client(replica.address()).databases()) diff --git a/typedb/cluster/failsafe_task.py b/typedb/cluster/failsafe_task.py deleted file mode 100644 index d3e2d6f3..00000000 --- a/typedb/cluster/failsafe_task.py +++ /dev/null @@ -1,116 +0,0 @@ -# -# Copyright (C) 2021 Vaticle -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from abc import ABC, abstractmethod -from time import sleep -from typing import TYPE_CHECKING - -from typedb.cluster.database import _ClusterDatabase -from typedb.common.exception import TypeDBClientException, CLUSTER_REPLICA_NOT_PRIMARY, UNABLE_TO_CONNECT, \ - CLUSTER_UNABLE_TO_CONNECT -from typedb.common.rpc.request_builder import cluster_database_manager_get_req - -if TYPE_CHECKING: - from typedb.cluster.client import _ClusterClient - - -class _FailsafeTask(ABC): - - PRIMARY_REPLICA_TASK_MAX_RETRIES = 10 - FETCH_REPLICAS_MAX_RETRIES = 10 - WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS: float = 2 - - def __init__(self, client: "_ClusterClient", database: str): - self.client = client - self.database = database - - @abstractmethod - def run(self, replica: _ClusterDatabase.Replica): - pass - - def rerun(self, replica: _ClusterDatabase.Replica): - return self.run(replica) - - def run_primary_replica(self): - if self.database not in self.client.database_by_name() or not self.client.database_by_name()[self.database].primary_replica(): - self._seek_primary_replica() - replica = self.client.database_by_name()[self.database].primary_replica() - retries = 0 - while True: - try: - return self.run(replica) if retries == 0 else self.rerun(replica) - except TypeDBClientException as e: - if e.error_message in [CLUSTER_REPLICA_NOT_PRIMARY, UNABLE_TO_CONNECT]: - print("Unable to open a session or transaction, retrying in 2s... %s" % str(e)) - sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS) - replica = self._seek_primary_replica() - else: - raise e - retries += 1 - if retries > self.PRIMARY_REPLICA_TASK_MAX_RETRIES: - raise self._cluster_not_available_exception() - - def run_any_replica(self): - if self.database in self.client.database_by_name(): - cluster_database = self.client.database_by_name()[self.database] - else: - cluster_database = self._fetch_database_replicas() - - replicas = [cluster_database.preferred_replica()] + [replica for replica in cluster_database.replicas() if not replica.is_preferred()] - retries = 0 - for replica in replicas: - try: - return self.run(replica) if retries == 0 else self.rerun(replica) - except TypeDBClientException as e: - if e.error_message is UNABLE_TO_CONNECT: - print("Unable to open a session or transaction to %s. Attempting next replica. %s" % (str(replica.replica_id()), str(e))) - else: - raise e - retries += 1 - raise self._cluster_not_available_exception() - - def _seek_primary_replica(self) -> _ClusterDatabase.Replica: - retries = 0 - while retries < self.FETCH_REPLICAS_MAX_RETRIES: - cluster_database = self._fetch_database_replicas() - if cluster_database.primary_replica(): - return cluster_database.primary_replica() - else: - sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS) - retries += 1 - raise self._cluster_not_available_exception() - - def _fetch_database_replicas(self) -> _ClusterDatabase: - for server_address in self.client.cluster_members(): - try: - print("Fetching replica info from %s" % server_address) - res = self.client.stub(server_address).databases_get(cluster_database_manager_get_req(self.database)) - cluster_database = _ClusterDatabase.of(res.database, self.client.databases()) - self.client.database_by_name()[self.database] = cluster_database - return cluster_database - except TypeDBClientException as e: - if e.error_message is UNABLE_TO_CONNECT: - print("Unable to fetch replica info for database '%s' from %s. Attempting next address. %s" % (self.database, server_address, str(e))) - else: - raise e - raise self._cluster_not_available_exception() - - def _cluster_not_available_exception(self) -> TypeDBClientException: - return TypeDBClientException.of(CLUSTER_UNABLE_TO_CONNECT, str([str(addr) for addr in self.client.cluster_members()])) diff --git a/typedb/cluster/session.py b/typedb/cluster/session.py index c37d48a8..f49c0951 100644 --- a/typedb/cluster/session.py +++ b/typedb/cluster/session.py @@ -23,8 +23,7 @@ from typedb.api.options import TypeDBClusterOptions, TypeDBOptions from typedb.api.session import TypeDBSession, SessionType from typedb.api.transaction import TransactionType -from typedb.cluster.database import _ClusterDatabase -from typedb.cluster.failsafe_task import _FailsafeTask +from typedb.cluster.database import _ClusterDatabase, _FailsafeTask from typedb.core.database import _CoreDatabase from typedb.core.transaction import _CoreTransaction From 880fdbfa31481295bd7141fa813c732aecc021d7 Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Wed, 19 May 2021 11:38:47 +0100 Subject: [PATCH 3/6] Fix all Cluster tests --- tests/integration/test_cluster_failover.py | 8 ++++---- tools/behave_rule.bzl | 2 +- tools/cluster_test_rule.bzl | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_cluster_failover.py b/tests/integration/test_cluster_failover.py index 97e2a4db..c7da53cf 100644 --- a/tests/integration/test_cluster_failover.py +++ b/tests/integration/test_cluster_failover.py @@ -69,7 +69,7 @@ def test_put_entity_type_to_crashed_primary_replica(self): print("Retrieved entity type with label '%s' from primary replica." % person.get_label()) assert person.get_label().name() == "person" iteration = 0 - while iteration < 10: + while iteration < 2: iteration += 1 primary_replica = self.get_primary_replica(client.databases()) print("Stopping primary replica (test %d/10)..." % iteration) @@ -79,13 +79,13 @@ def test_put_entity_type_to_crashed_primary_replica(self): print("Primary replica is hosted by server with PID %s" % primary_replica_server_pid) subprocess.check_call(["kill", "-9", primary_replica_server_pid]) print("Primary replica stopped successfully.") - sleep(0.5) + sleep(1) with client.session("typedb", SCHEMA) as session, session.transaction(READ) as tx: person = tx.concepts().get_entity_type("person") print("Retrieved entity type with label '%s' from new primary replica." % person.get_label()) assert person.get_label().name() == "person" idx = str(primary_replica.address())[10] - subprocess.Popen(["./%s/typedb" % idx, "server", "--data", "server/data", "--address", "127.0.0.1:%s1729:%s1730" % (idx, idx), "--peer", "127.0.0.1:11729:11730", "--peer", "127.0.0.1:21729:21730", "--peer", "127.0.0.1:31729:31730"]) + subprocess.Popen(["./%s/typedb" % idx, "server", "--data", "server/data", "--address", "127.0.0.1:%s1729:%s1730:%s1731" % (idx, idx, idx), "--peer", "127.0.0.1:11729:11730:11731", "--peer", "127.0.0.1:21729:21730:21731", "--peer", "127.0.0.1:31729:31730:31731"]) lsof = None live_check_iteration = 0 while not lsof and live_check_iteration < 60: @@ -94,7 +94,7 @@ def test_put_entity_type_to_crashed_primary_replica(self): lsof = subprocess.check_output(["lsof", "-i", ":%s" % port]) except subprocess.CalledProcessError: pass - sleep(0.5) + sleep(1) if __name__ == "__main__": diff --git a/tools/behave_rule.bzl b/tools/behave_rule.bzl index 23147024..0ec0a51d 100644 --- a/tools/behave_rule.bzl +++ b/tools/behave_rule.bzl @@ -83,7 +83,7 @@ def _rule_implementation(ctx): if [[ $PRODUCT == "Core" ]]; then ./typedb_distribution/"$DIRECTORY"/typedb server --port $PORT --data typedb_test & else - ./typedb_distribution/"$DIRECTORY"/typedb server --address "127.0.0.1:$PORT:$(($PORT+1))" --data typedb_test & + ./typedb_distribution/"$DIRECTORY"/typedb server --address "127.0.0.1:$PORT:$(($PORT+1)):$(($PORT+2))" --data typedb_test & fi POLL_INTERVAL_SECS=0.5 diff --git a/tools/cluster_test_rule.bzl b/tools/cluster_test_rule.bzl index 7c834bdf..77f45392 100644 --- a/tools/cluster_test_rule.bzl +++ b/tools/cluster_test_rule.bzl @@ -56,9 +56,9 @@ def _rule_implementation(ctx): echo Successfully unarchived TypeDB distribution. Creating 3 copies. cp -r typedb_distribution/$TYPEDB/ 1 && cp -r typedb_distribution/$TYPEDB/ 2 && cp -r typedb_distribution/$TYPEDB/ 3 echo Starting 3 TypeDB servers. - ./1/typedb server --data server/data --address 127.0.0.1:11729:11730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 & - ./2/typedb server --data server/data --address 127.0.0.1:21729:21730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 & - ./3/typedb server --data server/data --address 127.0.0.1:31729:31730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 & + ./1/typedb server --data server/data --address 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 & + ./2/typedb server --data server/data --address 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 & + ./3/typedb server --data server/data --address 127.0.0.1:31729:31730:31731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 & POLL_INTERVAL_SECS=0.5 MAX_RETRIES=60 @@ -88,7 +88,7 @@ def _rule_implementation(ctx): cmd += """ echo Tests concluded with exit value $RESULT echo Stopping servers. - kill $(jps | awk '/TypeDBServer/ {print $1}' | paste -sd " " -) + kill $(jps | awk '/TypeDBNode/ {print $1}' | paste -sd " " -) exit $RESULT """ From fc938854d6fbea4039e582c1460be971aac13ebf Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Wed, 19 May 2021 11:42:34 +0100 Subject: [PATCH 4/6] bump From 2b3fb373131feb92aa9ca537386e108a95a64d6e Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Wed, 19 May 2021 11:59:54 +0100 Subject: [PATCH 5/6] Set cluster failover test iteration count to 10 --- tests/integration/test_cluster_failover.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_cluster_failover.py b/tests/integration/test_cluster_failover.py index c7da53cf..63689f6b 100644 --- a/tests/integration/test_cluster_failover.py +++ b/tests/integration/test_cluster_failover.py @@ -69,7 +69,7 @@ def test_put_entity_type_to_crashed_primary_replica(self): print("Retrieved entity type with label '%s' from primary replica." % person.get_label()) assert person.get_label().name() == "person" iteration = 0 - while iteration < 2: + while iteration < 10: iteration += 1 primary_replica = self.get_primary_replica(client.databases()) print("Stopping primary replica (test %d/10)..." % iteration) From 5c2e077601352324ee77b6c5992ad0ffce0d7f4c Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Wed, 19 May 2021 13:04:32 +0100 Subject: [PATCH 6/6] Add comment to sleep statement in failover test --- tests/integration/test_cluster_failover.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/test_cluster_failover.py b/tests/integration/test_cluster_failover.py index 63689f6b..31800b9b 100644 --- a/tests/integration/test_cluster_failover.py +++ b/tests/integration/test_cluster_failover.py @@ -79,7 +79,7 @@ def test_put_entity_type_to_crashed_primary_replica(self): print("Primary replica is hosted by server with PID %s" % primary_replica_server_pid) subprocess.check_call(["kill", "-9", primary_replica_server_pid]) print("Primary replica stopped successfully.") - sleep(1) + sleep(5) # TODO: This ensures the server is actually shut down, but it's odd that it needs to be so long with client.session("typedb", SCHEMA) as session, session.transaction(READ) as tx: person = tx.concepts().get_entity_type("person") print("Retrieved entity type with label '%s' from new primary replica." % person.get_label()) @@ -94,7 +94,6 @@ def test_put_entity_type_to_crashed_primary_replica(self): lsof = subprocess.check_output(["lsof", "-i", ":%s" % port]) except subprocess.CalledProcessError: pass - sleep(1) if __name__ == "__main__":