-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(schema): reconnect closed connection #804
Conversation
Deploying datachain-documentation with Cloudflare Pages
|
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #804 +/- ##
=======================================
Coverage 87.43% 87.44%
=======================================
Files 128 128
Lines 11332 11338 +6
Branches 1529 1530 +1
=======================================
+ Hits 9908 9914 +6
Misses 1046 1046
Partials 378 378
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
d60772f
to
8e9b78a
Compare
@@ -150,14 +151,12 @@ class DataTable: | |||
def __init__( | |||
self, | |||
name: str, | |||
engine: "Engine", | |||
metadata: Optional["sa.MetaData"] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what was the case for it to be None
and for this code below:
self.metadata: sa.MetaData = metadata if metadata is not None else sa.MetaData()
that initializes its own copy of MetaData()
I haven't found any use case so far. It is simpler this way when internals of the DatabaseEngine
are not exposed into DataTable.
Let's see if tests pass.
if self.is_closed: | ||
# Reconnect in case of being closed previously. | ||
self._reconnect() | ||
return super().get_table(name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_table
might trigger a connection and that triggers closed database
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -220,7 +219,7 @@ def dataset_select_paginated( | |||
num_yielded = 0 | |||
|
|||
# Ensure we're using a thread-local connection | |||
with self.clone() as wh: | |||
with self.clone(use_new_connection=True) as wh: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this added because clone()
could reuse the closed connection by default? Maybe this could be fixed on clickhouse backend instead of having to add this flag everywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this flag was added for CH as far as I remember, good catch @skshetry I'm not sure it is needed in this place
(not sure it worth refactoring this flag atm, I'll just revert this flag in this place)
if self.is_closed: | ||
# Reconnect in case of being closed previously. | ||
self._reconnect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, this should be automatic.
@property
def db(self):
if self._db.is_closed:
self._reconnect()
return self._db
I see executemany
, execute_str
and other methods that are not reconnecting.
Although engine
and metadata
do complicate this.
Fixes #660
Refactors
DataTable
by removing direct access to sqlachemy (and thus requiring direct access to sa.Engine, etc). This allows us to do a check and reconnect if connection was already closed.