Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Fault-tolerant Cluster database operations #224

Merged
merged 6 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tests/behaviour/typeql/typeql_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,10 @@ def step_impl(context: Context):
for answer in context.answers:
query = apply_query_template(template=context.text, answer=answer)
assert_that(list(context.tx().query().match(query)), has_length(1))


@step("each answer does not satisfy")
def step_impl(context: Context):
    """Assert that templating each answer into the step's query yields no matches."""
    for ans in context.answers:
        templated = apply_query_template(template=context.text, answer=ans)
        matches = list(context.tx().query().match(templated))
        assert_that(matches, has_length(0))
6 changes: 3 additions & 3 deletions tests/integration/test_cluster_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ def test_put_entity_type_to_crashed_primary_replica(self):
print("Primary replica is hosted by server with PID %s" % primary_replica_server_pid)
subprocess.check_call(["kill", "-9", primary_replica_server_pid])
print("Primary replica stopped successfully.")
sleep(0.5)
sleep(1)
with client.session("typedb", SCHEMA) as session, session.transaction(READ) as tx:
person = tx.concepts().get_entity_type("person")
print("Retrieved entity type with label '%s' from new primary replica." % person.get_label())
assert person.get_label().name() == "person"
idx = str(primary_replica.address())[10]
subprocess.Popen(["./%s/typedb" % idx, "server", "--data", "server/data", "--address", "127.0.0.1:%s1729:%s1730" % (idx, idx), "--peer", "127.0.0.1:11729:11730", "--peer", "127.0.0.1:21729:21730", "--peer", "127.0.0.1:31729:31730"])
subprocess.Popen(["./%s/typedb" % idx, "server", "--data", "server/data", "--address", "127.0.0.1:%s1729:%s1730:%s1731" % (idx, idx, idx), "--peer", "127.0.0.1:11729:11730:11731", "--peer", "127.0.0.1:21729:21730:21731", "--peer", "127.0.0.1:31729:31730:31731"])
lsof = None
live_check_iteration = 0
while not lsof and live_check_iteration < 60:
Expand All @@ -94,7 +94,7 @@ def test_put_entity_type_to_crashed_primary_replica(self):
lsof = subprocess.check_output(["lsof", "-i", ":%s" % port])
except subprocess.CalledProcessError:
pass
sleep(0.5)
sleep(1)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to fail intermittently without this sleep, perhaps due to getting an unexpected error as the Cluster node is in the process of starting up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you still see the errors? can you paste them here?



if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion tools/behave_rule.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _rule_implementation(ctx):
if [[ $PRODUCT == "Core" ]]; then
./typedb_distribution/"$DIRECTORY"/typedb server --port $PORT --data typedb_test &
else
./typedb_distribution/"$DIRECTORY"/typedb server --address "127.0.0.1:$PORT:$(($PORT+1))" --data typedb_test &
./typedb_distribution/"$DIRECTORY"/typedb server --address "127.0.0.1:$PORT:$(($PORT+1)):$(($PORT+2))" --data typedb_test &
fi

POLL_INTERVAL_SECS=0.5
Expand Down
8 changes: 4 additions & 4 deletions tools/cluster_test_rule.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ def _rule_implementation(ctx):
echo Successfully unarchived TypeDB distribution. Creating 3 copies.
cp -r typedb_distribution/$TYPEDB/ 1 && cp -r typedb_distribution/$TYPEDB/ 2 && cp -r typedb_distribution/$TYPEDB/ 3
echo Starting 3 TypeDB servers.
./1/typedb server --data server/data --address 127.0.0.1:11729:11730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 &
./2/typedb server --data server/data --address 127.0.0.1:21729:21730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 &
./3/typedb server --data server/data --address 127.0.0.1:31729:31730 --peer 127.0.0.1:11729:11730 --peer 127.0.0.1:21729:21730 --peer 127.0.0.1:31729:31730 &
./1/typedb server --data server/data --address 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 &
Copy link
Member

@lolski lolski May 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created an issue to extract the TypeDB setup logic into TypeDBRunner and TypeDBClusterRunner, just like in Java: typedb/typedb-dependencies#286. I've also created one for NodeJS: typedb/typedb-dependencies#287.

./2/typedb server --data server/data --address 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 &
./3/typedb server --data server/data --address 127.0.0.1:31729:31730:31731 --peer 127.0.0.1:11729:11730:11731 --peer 127.0.0.1:21729:21730:21731 --peer 127.0.0.1:31729:31730:31731 &

POLL_INTERVAL_SECS=0.5
MAX_RETRIES=60
Expand Down Expand Up @@ -88,7 +88,7 @@ def _rule_implementation(ctx):
cmd += """
echo Tests concluded with exit value $RESULT
echo Stopping servers.
kill $(jps | awk '/TypeDBServer/ {print $1}' | paste -sd " " -)
kill $(jps | awk '/TypeDBNode/ {print $1}' | paste -sd " " -)
exit $RESULT
"""

Expand Down
3 changes: 1 addition & 2 deletions typedb/cluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
from typedb.api.client import TypeDBClusterClient
from typedb.api.options import TypeDBOptions, TypeDBClusterOptions
from typedb.api.session import SessionType
from typedb.cluster.database import _ClusterDatabase
from typedb.cluster.database import _ClusterDatabase, _FailsafeTask
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this needed?

from typedb.cluster.database_manager import _ClusterDatabaseManager
from typedb.cluster.failsafe_task import _FailsafeTask
from typedb.cluster.session import _ClusterSession
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_UNABLE_TO_CONNECT
from typedb.common.rpc.request_builder import cluster_server_manager_all_req
Expand Down
121 changes: 111 additions & 10 deletions typedb/cluster/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,39 @@
# specific language governing permissions and limitations
# under the License.
#
from abc import ABC, abstractmethod
from time import sleep
from typing import Dict, Optional, Set, TYPE_CHECKING

import typedb_protocol.cluster.cluster_database_pb2 as cluster_database_proto

from typedb.api.database import ClusterDatabase
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_REPLICA_NOT_PRIMARY, \
CLUSTER_UNABLE_TO_CONNECT
from typedb.common.rpc.request_builder import cluster_database_manager_get_req
from typedb.core.database import _CoreDatabase

if TYPE_CHECKING:
from typedb.cluster.database_manager import _ClusterDatabaseManager
from typedb.cluster.client import _ClusterClient


class _ClusterDatabase(ClusterDatabase):

def __init__(self, database: str, cluster_database_mgr: "_ClusterDatabaseManager"):
def __init__(self, database: str, client: "_ClusterClient"):
self._name = database
self._database_mgr = cluster_database_mgr
self._client = client
self._databases: Dict[str, _CoreDatabase] = {}
self._replicas: Set["_ClusterDatabase.Replica"] = set()
for address in cluster_database_mgr.database_mgrs():
core_database_mgr = cluster_database_mgr.database_mgrs()[address]
cluster_db_mgr = client.databases()
for address in cluster_db_mgr.database_mgrs():
core_database_mgr = cluster_db_mgr.database_mgrs()[address]
self._databases[address] = _CoreDatabase(core_database_mgr.stub(), name=database)

@staticmethod
def of(proto_db: cluster_database_proto.ClusterDatabase, cluster_database_mgr: "_ClusterDatabaseManager") -> "_ClusterDatabase":
def of(proto_db: cluster_database_proto.ClusterDatabase, client: "_ClusterClient") -> "_ClusterDatabase":
assert proto_db.replicas
database: str = proto_db.name
database_cluster_rpc = _ClusterDatabase(database, cluster_database_mgr)
database_cluster_rpc = _ClusterDatabase(database, client)
for proto_replica in proto_db.replicas:
database_cluster_rpc.replicas().add(_ClusterDatabase.Replica.of(proto_replica, database_cluster_rpc))
print("Discovered database cluster: %s" % database_cluster_rpc)
Expand All @@ -57,9 +63,8 @@ def schema(self) -> str:
return next(iter(self._databases.values())).schema()

def delete(self) -> None:
for address in self._databases:
if self._database_mgr.database_mgrs()[address].contains(self._name):
self._databases[address].delete()
delete_db_task = _DeleteDatabaseFailsafeTask(self._client, self._name, self._databases)
delete_db_task.run_primary_replica()

def replicas(self):
return self._replicas
Expand Down Expand Up @@ -142,3 +147,99 @@ def __hash__(self):

def __str__(self):
return "%s/%s" % (self._address, self._database)


# This class has to live here because of circular class creation between ClusterDatabase and FailsafeTask
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is highly unpleasant but I can't find any workaround that is any better. Because an instance method of FailsafeTask creates a ClusterDatabase, and an instance method of ClusterDatabase creates a FailsafeTask, it is physically impossible to decouple the two in Python. (This may also be the case in NodeJS)

class _FailsafeTask(ABC):
    """Template for a Cluster operation that is retried across replicas.

    Subclasses implement ``run`` (the actual operation against one replica);
    callers invoke ``run_primary_replica`` or ``run_any_replica`` to get
    automatic failover when a replica is unreachable or is no longer primary.
    """

    # Maximum attempts against the primary replica before giving up.
    PRIMARY_REPLICA_TASK_MAX_RETRIES = 10
    # Maximum attempts to fetch the replica set before giving up.
    FETCH_REPLICAS_MAX_RETRIES = 10
    # Delay between retries, giving the cluster time to elect a new primary.
    WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS: float = 2

    def __init__(self, client: "_ClusterClient", database: str):
        self.client = client
        self.database = database

    @abstractmethod
    def run(self, replica: "_ClusterDatabase.Replica"):
        """Perform the operation against *replica*; implemented by subclasses."""
        pass

    def rerun(self, replica: "_ClusterDatabase.Replica"):
        # Hook for subclasses that need different behaviour on retry;
        # by default a retry is identical to the first attempt.
        return self.run(replica)

    def run_primary_replica(self):
        """Run the task against the primary replica, re-seeking the primary and
        retrying when the replica reports NOT_PRIMARY or is unreachable."""
        # Ensure we know a primary replica before the first attempt.
        if self.database not in self.client.database_by_name() or not self.client.database_by_name()[self.database].primary_replica():
            self._seek_primary_replica()
        replica = self.client.database_by_name()[self.database].primary_replica()
        retries = 0
        while True:
            try:
                return self.run(replica) if retries == 0 else self.rerun(replica)
            except TypeDBClientException as e:
                if e.error_message in [CLUSTER_REPLICA_NOT_PRIMARY, UNABLE_TO_CONNECT]:
                    print("Unable to open a session or transaction, retrying in 2s... %s" % str(e))
                    sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS)
                    # The primary may have changed while we slept; look it up again.
                    replica = self._seek_primary_replica()
                else:
                    raise e
            retries += 1
            if retries > self.PRIMARY_REPLICA_TASK_MAX_RETRIES:
                raise self._cluster_not_available_exception()

    def run_any_replica(self):
        """Run the task against any replica, preferring the preferred replica
        and falling through the rest on connection failure."""
        if self.database in self.client.database_by_name():
            cluster_database = self.client.database_by_name()[self.database]
        else:
            cluster_database = self._fetch_database_replicas()

        # Try the preferred replica first, then every other replica in turn.
        replicas = [cluster_database.preferred_replica()] + [replica for replica in cluster_database.replicas() if not replica.is_preferred()]
        retries = 0
        for replica in replicas:
            try:
                return self.run(replica) if retries == 0 else self.rerun(replica)
            except TypeDBClientException as e:
                # NOTE(review): identity comparison assumes the error-message
                # constants are singletons — confirm against the exception module.
                if e.error_message is UNABLE_TO_CONNECT:
                    print("Unable to open a session or transaction to %s. Attempting next replica. %s" % (str(replica.replica_id()), str(e)))
                else:
                    raise e
            retries += 1
        raise self._cluster_not_available_exception()

    def _seek_primary_replica(self) -> "_ClusterDatabase.Replica":
        """Poll the cluster until a primary replica is reported, or give up."""
        retries = 0
        while retries < self.FETCH_REPLICAS_MAX_RETRIES:
            cluster_database = self._fetch_database_replicas()
            if cluster_database.primary_replica():
                return cluster_database.primary_replica()
            else:
                # No primary yet (e.g. an election in progress); wait and retry.
                sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS)
                retries += 1
        raise self._cluster_not_available_exception()

    def _fetch_database_replicas(self) -> "_ClusterDatabase":
        """Ask each known cluster member for the replica set of this database,
        caching and returning the first successful answer."""
        for server_address in self.client.cluster_members():
            try:
                print("Fetching replica info from %s" % server_address)
                res = self.client.stub(server_address).databases_get(cluster_database_manager_get_req(self.database))
                cluster_database = _ClusterDatabase.of(res.database, self.client)
                self.client.database_by_name()[self.database] = cluster_database
                return cluster_database
            except TypeDBClientException as e:
                # NOTE(review): identity comparison assumes the error-message
                # constants are singletons — confirm against the exception module.
                if e.error_message is UNABLE_TO_CONNECT:
                    print("Unable to fetch replica info for database '%s' from %s. Attempting next address. %s" % (self.database, server_address, str(e)))
                else:
                    raise e
        raise self._cluster_not_available_exception()

    def _cluster_not_available_exception(self) -> TypeDBClientException:
        """Build the exception reported when every cluster member has failed."""
        return TypeDBClientException.of(CLUSTER_UNABLE_TO_CONNECT, str([str(addr) for addr in self.client.cluster_members()]))


class _DeleteDatabaseFailsafeTask(_FailsafeTask):
    """Failsafe task that deletes this database on whichever replica is targeted."""

    def __init__(self, client: "_ClusterClient", database: str, databases: Dict[str, _CoreDatabase]):
        super().__init__(client, database)
        # Per-server core database handles, keyed by server address.
        self.databases = databases

    def run(self, replica: _ClusterDatabase.Replica):
        # NOTE(review): dict.get returns None for an unknown address, which would
        # raise AttributeError rather than a TypeDB client exception — confirm the
        # replica addresses always appear in self.databases.
        self.databases.get(replica.address()).delete()
58 changes: 36 additions & 22 deletions typedb/cluster/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
# specific language governing permissions and limitations
# under the License.
#
from typing import Dict, List, TYPE_CHECKING
from typing import Dict, List, TYPE_CHECKING, Callable, TypeVar

from typedb.api.database import ClusterDatabaseManager
from typedb.cluster.database import _ClusterDatabase
from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED
from typedb.cluster.database import _ClusterDatabase, _FailsafeTask
from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED, CLUSTER_REPLICA_NOT_PRIMARY, \
DB_DOES_NOT_EXIST
from typedb.common.rpc.request_builder import cluster_database_manager_get_req, cluster_database_manager_all_req
from typedb.common.rpc.stub import TypeDBClusterStub
from typedb.core.database_manager import _CoreDatabaseManager

T = TypeVar("T")

if TYPE_CHECKING:
from typedb.cluster.client import _ClusterClient

Expand All @@ -37,38 +41,48 @@ def __init__(self, client: "_ClusterClient"):
self._database_mgrs: Dict[str, _CoreDatabaseManager] = {addr: client.databases() for (addr, client) in client.core_clients().items()}

def contains(self, name: str) -> bool:
errors = []
for address in self._database_mgrs:
try:
return self._database_mgrs[address].contains(name)
except TypeDBClientException as e:
errors.append("- %s: %s\n" % (address, e))
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors]))
return self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.contains(name))

def create(self, name: str) -> None:
for database_manager in self._database_mgrs.values():
if not database_manager.contains(name):
database_manager.create(name)
self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.create(name))

def get(self, name: str) -> _ClusterDatabase:
errors = []
for address in self._database_mgrs:
try:
res = self._client.stub(address).databases_get(cluster_database_manager_get_req(name))
return _ClusterDatabase.of(res.database, self)
except TypeDBClientException as e:
errors.append("- %s: %s\n" % (address, e))
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors]))
return self._failsafe_task(name, lambda stub, core_db_mgr: self._get_database_task(name, stub))

def _get_database_task(self, name: str, stub: TypeDBClusterStub):
    """Fetch the cluster database named *name* via *stub*.

    Raises a DB_DOES_NOT_EXIST client exception when the database is absent.
    """
    # NOTE(review): self.contains(name) itself runs a failsafe task, so this
    # incurs an extra round of cluster calls before the get — confirm intended.
    if self.contains(name):
        res = stub.databases_get(cluster_database_manager_get_req(name))
        return _ClusterDatabase.of(res.database, self._client)
    raise TypeDBClientException.of(DB_DOES_NOT_EXIST, name)

def all(self) -> List[_ClusterDatabase]:
errors = []
for address in self._database_mgrs:
try:
res = self._client.stub(address).databases_all(cluster_database_manager_all_req())
return [_ClusterDatabase.of(db, self) for db in res.databases]
return [_ClusterDatabase.of(db, self._client) for db in res.databases]
except TypeDBClientException as e:
errors.append("- %s: %s\n" % (address, e))
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors]))

def database_mgrs(self) -> Dict[str, _CoreDatabaseManager]:
    """Return the per-server core database managers, keyed by server address."""
    return self._database_mgrs

def _failsafe_task(self, name: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]):
    """Run *task* with failover: try any replica first, and fall back to the
    primary replica when a replica reports it is not the primary."""
    wrapped = _DatabaseManagerFailsafeTask(self._client, name, task)
    try:
        return wrapped.run_any_replica()
    except TypeDBClientException as e:
        if e.error_message != CLUSTER_REPLICA_NOT_PRIMARY:
            raise e
        return wrapped.run_primary_replica()


class _DatabaseManagerFailsafeTask(_FailsafeTask):
    """Failsafe task adapter that runs an arbitrary database-manager callable."""

    def __init__(self, client: "_ClusterClient", database: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]):
        super().__init__(client, database)
        # Callable invoked with the (cluster stub, core database manager) of the chosen replica.
        self.task = task

    def run(self, replica: _ClusterDatabase.Replica) -> T:
        return self.task(self.client.stub(replica.address()), self.client.core_client(replica.address()).databases())
Loading