-
Notifications
You must be signed in to change notification settings - Fork 24
Fault-tolerant Cluster database operations #224
Changes from all commits
07bd641
61b2811
880fdbf
fc93885
2b3fb37
5c2e077
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,9 +23,8 @@ | |
from typedb.api.client import TypeDBClusterClient | ||
from typedb.api.options import TypeDBOptions, TypeDBClusterOptions | ||
from typedb.api.session import SessionType | ||
from typedb.cluster.database import _ClusterDatabase | ||
from typedb.cluster.database import _ClusterDatabase, _FailsafeTask | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this needed? |
||
from typedb.cluster.database_manager import _ClusterDatabaseManager | ||
from typedb.cluster.failsafe_task import _FailsafeTask | ||
from typedb.cluster.session import _ClusterSession | ||
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_UNABLE_TO_CONNECT | ||
from typedb.common.rpc.request_builder import cluster_server_manager_all_req | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,33 +18,39 @@ | |
# specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
from abc import ABC, abstractmethod | ||
from time import sleep | ||
from typing import Dict, Optional, Set, TYPE_CHECKING | ||
|
||
import typedb_protocol.cluster.cluster_database_pb2 as cluster_database_proto | ||
|
||
from typedb.api.database import ClusterDatabase | ||
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_REPLICA_NOT_PRIMARY, \ | ||
CLUSTER_UNABLE_TO_CONNECT | ||
from typedb.common.rpc.request_builder import cluster_database_manager_get_req | ||
from typedb.core.database import _CoreDatabase | ||
|
||
if TYPE_CHECKING: | ||
from typedb.cluster.database_manager import _ClusterDatabaseManager | ||
from typedb.cluster.client import _ClusterClient | ||
|
||
|
||
class _ClusterDatabase(ClusterDatabase): | ||
|
||
def __init__(self, database: str, cluster_database_mgr: "_ClusterDatabaseManager"): | ||
def __init__(self, database: str, client: "_ClusterClient"): | ||
self._name = database | ||
self._database_mgr = cluster_database_mgr | ||
self._client = client | ||
self._databases: Dict[str, _CoreDatabase] = {} | ||
self._replicas: Set["_ClusterDatabase.Replica"] = set() | ||
for address in cluster_database_mgr.database_mgrs(): | ||
core_database_mgr = cluster_database_mgr.database_mgrs()[address] | ||
cluster_db_mgr = client.databases() | ||
for address in cluster_db_mgr.database_mgrs(): | ||
core_database_mgr = cluster_db_mgr.database_mgrs()[address] | ||
self._databases[address] = _CoreDatabase(core_database_mgr.stub(), name=database) | ||
|
||
@staticmethod | ||
def of(proto_db: cluster_database_proto.ClusterDatabase, cluster_database_mgr: "_ClusterDatabaseManager") -> "_ClusterDatabase": | ||
def of(proto_db: cluster_database_proto.ClusterDatabase, client: "_ClusterClient") -> "_ClusterDatabase": | ||
assert proto_db.replicas | ||
database: str = proto_db.name | ||
database_cluster_rpc = _ClusterDatabase(database, cluster_database_mgr) | ||
database_cluster_rpc = _ClusterDatabase(database, client) | ||
for proto_replica in proto_db.replicas: | ||
database_cluster_rpc.replicas().add(_ClusterDatabase.Replica.of(proto_replica, database_cluster_rpc)) | ||
print("Discovered database cluster: %s" % database_cluster_rpc) | ||
|
@@ -57,9 +63,8 @@ def schema(self) -> str: | |
return next(iter(self._databases.values())).schema() | ||
|
||
def delete(self) -> None: | ||
for address in self._databases: | ||
if self._database_mgr.database_mgrs()[address].contains(self._name): | ||
self._databases[address].delete() | ||
delete_db_task = _DeleteDatabaseFailsafeTask(self._client, self._name, self._databases) | ||
delete_db_task.run_primary_replica() | ||
|
||
def replicas(self): | ||
return self._replicas | ||
|
@@ -142,3 +147,99 @@ def __hash__(self): | |
|
||
def __str__(self): | ||
return "%s/%s" % (self._address, self._database) | ||
|
||
|
||
# This class has to live here because of circular class creation between ClusterDatabase and FailsafeTask | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is highly unpleasant but I can't find any workaround that is any better. Because an instance method of |
||
class _FailsafeTask(ABC): | ||
|
||
PRIMARY_REPLICA_TASK_MAX_RETRIES = 10 | ||
FETCH_REPLICAS_MAX_RETRIES = 10 | ||
WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS: float = 2 | ||
|
||
def __init__(self, client: "_ClusterClient", database: str): | ||
self.client = client | ||
self.database = database | ||
|
||
@abstractmethod | ||
def run(self, replica: "_ClusterDatabase.Replica"): | ||
pass | ||
|
||
def rerun(self, replica: "_ClusterDatabase.Replica"): | ||
return self.run(replica) | ||
|
||
def run_primary_replica(self): | ||
if self.database not in self.client.database_by_name() or not self.client.database_by_name()[self.database].primary_replica(): | ||
self._seek_primary_replica() | ||
replica = self.client.database_by_name()[self.database].primary_replica() | ||
retries = 0 | ||
while True: | ||
try: | ||
return self.run(replica) if retries == 0 else self.rerun(replica) | ||
except TypeDBClientException as e: | ||
if e.error_message in [CLUSTER_REPLICA_NOT_PRIMARY, UNABLE_TO_CONNECT]: | ||
print("Unable to open a session or transaction, retrying in 2s... %s" % str(e)) | ||
sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS) | ||
replica = self._seek_primary_replica() | ||
else: | ||
raise e | ||
retries += 1 | ||
if retries > self.PRIMARY_REPLICA_TASK_MAX_RETRIES: | ||
raise self._cluster_not_available_exception() | ||
|
||
def run_any_replica(self): | ||
if self.database in self.client.database_by_name(): | ||
cluster_database = self.client.database_by_name()[self.database] | ||
else: | ||
cluster_database = self._fetch_database_replicas() | ||
|
||
replicas = [cluster_database.preferred_replica()] + [replica for replica in cluster_database.replicas() if not replica.is_preferred()] | ||
retries = 0 | ||
for replica in replicas: | ||
try: | ||
return self.run(replica) if retries == 0 else self.rerun(replica) | ||
except TypeDBClientException as e: | ||
if e.error_message is UNABLE_TO_CONNECT: | ||
print("Unable to open a session or transaction to %s. Attempting next replica. %s" % (str(replica.replica_id()), str(e))) | ||
else: | ||
raise e | ||
retries += 1 | ||
raise self._cluster_not_available_exception() | ||
|
||
def _seek_primary_replica(self) -> "_ClusterDatabase.Replica": | ||
retries = 0 | ||
while retries < self.FETCH_REPLICAS_MAX_RETRIES: | ||
cluster_database = self._fetch_database_replicas() | ||
if cluster_database.primary_replica(): | ||
return cluster_database.primary_replica() | ||
else: | ||
sleep(self.WAIT_FOR_PRIMARY_REPLICA_SELECTION_SECONDS) | ||
retries += 1 | ||
raise self._cluster_not_available_exception() | ||
|
||
def _fetch_database_replicas(self) -> "_ClusterDatabase": | ||
for server_address in self.client.cluster_members(): | ||
try: | ||
print("Fetching replica info from %s" % server_address) | ||
res = self.client.stub(server_address).databases_get(cluster_database_manager_get_req(self.database)) | ||
cluster_database = _ClusterDatabase.of(res.database, self.client) | ||
self.client.database_by_name()[self.database] = cluster_database | ||
return cluster_database | ||
except TypeDBClientException as e: | ||
if e.error_message is UNABLE_TO_CONNECT: | ||
print("Unable to fetch replica info for database '%s' from %s. Attempting next address. %s" % (self.database, server_address, str(e))) | ||
else: | ||
raise e | ||
raise self._cluster_not_available_exception() | ||
|
||
def _cluster_not_available_exception(self) -> TypeDBClientException: | ||
return TypeDBClientException.of(CLUSTER_UNABLE_TO_CONNECT, str([str(addr) for addr in self.client.cluster_members()])) | ||
|
||
|
||
class _DeleteDatabaseFailsafeTask(_FailsafeTask): | ||
|
||
def __init__(self, client: "_ClusterClient", database: str, databases: Dict[str, _CoreDatabase]): | ||
super().__init__(client, database) | ||
self.databases = databases | ||
|
||
def run(self, replica: _ClusterDatabase.Replica): | ||
self.databases.get(replica.address()).delete() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,18 @@ | |
# specific language governing permissions and limitations | ||
# under the License. | ||
# | ||
from typing import Dict, List, TYPE_CHECKING | ||
from typing import Dict, List, TYPE_CHECKING, Callable, TypeVar | ||
|
||
from typedb.api.database import ClusterDatabaseManager | ||
from typedb.cluster.database import _ClusterDatabase | ||
from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED | ||
from typedb.cluster.database import _ClusterDatabase, _FailsafeTask | ||
from typedb.common.exception import TypeDBClientException, CLUSTER_ALL_NODES_FAILED, CLUSTER_REPLICA_NOT_PRIMARY, \ | ||
DB_DOES_NOT_EXIST | ||
from typedb.common.rpc.request_builder import cluster_database_manager_get_req, cluster_database_manager_all_req | ||
from typedb.common.rpc.stub import TypeDBClusterStub | ||
from typedb.core.database_manager import _CoreDatabaseManager | ||
|
||
T = TypeVar("T") | ||
|
||
if TYPE_CHECKING: | ||
from typedb.cluster.client import _ClusterClient | ||
|
||
|
@@ -37,38 +41,48 @@ def __init__(self, client: "_ClusterClient"): | |
self._database_mgrs: Dict[str, _CoreDatabaseManager] = {addr: client.databases() for (addr, client) in client.core_clients().items()} | ||
|
||
def contains(self, name: str) -> bool: | ||
errors = [] | ||
for address in self._database_mgrs: | ||
try: | ||
return self._database_mgrs[address].contains(name) | ||
except TypeDBClientException as e: | ||
errors.append("- %s: %s\n" % (address, e)) | ||
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors])) | ||
return self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.contains(name)) | ||
|
||
def create(self, name: str) -> None: | ||
for database_manager in self._database_mgrs.values(): | ||
if not database_manager.contains(name): | ||
database_manager.create(name) | ||
self._failsafe_task(name, lambda stub, core_db_mgr: core_db_mgr.create(name)) | ||
|
||
def get(self, name: str) -> _ClusterDatabase: | ||
errors = [] | ||
for address in self._database_mgrs: | ||
try: | ||
res = self._client.stub(address).databases_get(cluster_database_manager_get_req(name)) | ||
return _ClusterDatabase.of(res.database, self) | ||
except TypeDBClientException as e: | ||
errors.append("- %s: %s\n" % (address, e)) | ||
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors])) | ||
return self._failsafe_task(name, lambda stub, core_db_mgr: self._get_database_task(name, stub)) | ||
|
||
def _get_database_task(self, name: str, stub: TypeDBClusterStub): | ||
if self.contains(name): | ||
res = stub.databases_get(cluster_database_manager_get_req(name)) | ||
return _ClusterDatabase.of(res.database, self._client) | ||
raise TypeDBClientException.of(DB_DOES_NOT_EXIST, name) | ||
|
||
def all(self) -> List[_ClusterDatabase]: | ||
errors = [] | ||
for address in self._database_mgrs: | ||
try: | ||
res = self._client.stub(address).databases_all(cluster_database_manager_all_req()) | ||
return [_ClusterDatabase.of(db, self) for db in res.databases] | ||
return [_ClusterDatabase.of(db, self._client) for db in res.databases] | ||
except TypeDBClientException as e: | ||
errors.append("- %s: %s\n" % (address, e)) | ||
raise TypeDBClientException.of(CLUSTER_ALL_NODES_FAILED, str([str(e) for e in errors])) | ||
|
||
def database_mgrs(self) -> Dict[str, _CoreDatabaseManager]: | ||
return self._database_mgrs | ||
|
||
def _failsafe_task(self, name: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is where the bulk of the fault-tolerant DB operations logic lies |
||
failsafe_task = _DatabaseManagerFailsafeTask(self._client, name, task) | ||
try: | ||
return failsafe_task.run_any_replica() | ||
except TypeDBClientException as e: | ||
if e.error_message == CLUSTER_REPLICA_NOT_PRIMARY: | ||
return failsafe_task.run_primary_replica() | ||
raise e | ||
|
||
|
||
class _DatabaseManagerFailsafeTask(_FailsafeTask): | ||
|
||
def __init__(self, client: "_ClusterClient", database: str, task: Callable[[TypeDBClusterStub, _CoreDatabaseManager], T]): | ||
super().__init__(client, database) | ||
self.task = task | ||
|
||
def run(self, replica: _ClusterDatabase.Replica) -> T: | ||
return self.task(self.client.stub(replica.address()), self.client.core_client(replica.address()).databases()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've created an issue to extract the TypeDB setup logic into
TypeDBRunner
andTypeDBClusterRunner
, just like in Java: typedb/typedb-dependencies#286. I've also created one for NodeJS: typedb/typedb-dependencies#287.