Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft cache source #247

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion volumina/pixelpipeline/datasourcefactories.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
hasLazyflow = True
try:
import lazyflow
from .datasources import LazyflowSource
from .datasources import LazyflowSource, CachableSource
except ImportError:
hasLazyflow = False

Expand Down Expand Up @@ -108,6 +108,7 @@ def _createDataSourceLazyflow(slot, withShape):
# has to handle Lazyflow source
src = LazyflowSource(slot)
shape = src._op5.Output.meta.shape
src = CachableSource(src)
if withShape:
return src, shape
else:
Expand Down
93 changes: 93 additions & 0 deletions volumina/pixelpipeline/datasources.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,99 @@ def __hash__(self):
# *******************************************************************************


class CachableSource(QObject):
isDirty = pyqtSignal(object)
numberOfChannelsChanged = pyqtSignal(int)

class _Request:
def __init__(self, cache, slicing, key):
self._cache = cache
self._slicing = slicing
self._key = key
self._result = None
self._rq = self._cache._source.request(self._slicing)

def wait(self):
self._result = res = self._rq.wait()
self._cache._cache[self._key] = res
return res

def getResult(self):
return self._result

def cancel(self):
print("SUBMIT")
self._rq.cancel()

def submit(self):
print("SUBMIT")
self._rq.submit()

class _CachedRequest:
def __init__(self, result):
self._result = result

def getResult(sefl):
return self._result

def wait(self):
return self._result

def cancel(self):
pass

def submit(self):
pass

def __init__(self, source):
super().__init__()
self._source = source
self._lock = threading.Lock()
self._cache = {}
self._req = {}

def cache_key(self, slicing):
parts = []

for el in slicing:
_, key_part = el.__reduce__()
parts.append(key_part)

return tuple(parts)

def request(self, slicing):
key = self.cache_key(slicing)

with self._lock:
if key in self._cache:
return self._CachedRequest(self._cache[key])

else:
if key not in self._req:
self._req[key] = self._Request(self, slicing, key)

return self._req[key]

def __getattr__(self, attr):
return getattr(self._source, attr)

def setDirty(self, slicing):
if not is_pure_slicing(slicing):
raise Exception("dirty region: slicing is not pure")
self.isDirty.emit(slicing)

def __eq__(self, other):
if other is None:
return False
return self._source is other._source

def __ne__(self, other):
return not (self == other)

def __hash__(self):
return hash(self._source)


class ConstantRequest(object):
def __init__(self, result):
self._result = result
Expand Down