[SYNPY-1316] Updating to caching logic to take in MD5 key (#1008)
* Updating to caching logic to take in MD5
BryanFauble authored Nov 6, 2023
1 parent a81545f commit e283f88
Showing 4 changed files with 281 additions and 36 deletions.
11 changes: 0 additions & 11 deletions synapseclient/client.py
@@ -1070,7 +1070,6 @@ def _getWithEntityBundle(self, entityBundle, entity=None, **kwargs):
entity,
ifcollision,
submission,
file_handle.get("contentMd5", None),
)
else: # no filehandle means that we do not have DOWNLOAD permission
warning_message = (
@@ -1103,7 +1102,6 @@ def _download_file_entity(
entity: Entity,
ifcollision: str,
submission: str,
expected_md5: typing.Union[str, None],
):
# set the initial local state
entity.path = None
@@ -1115,15 +1113,6 @@
# downloaded the file
cached_file_path = self.cache.get(entity.dataFileHandleId, downloadLocation)

# This handles cases where the file name in the cache and the requested
# file name match but the content differs.
if (
expected_md5 is not None
and cached_file_path is not None
and utils.md5_for_file(cached_file_path).hexdigest() != expected_md5
):
cached_file_path = None

# location in .synapseCache where the file would be corresponding to its FileHandleId
synapseCache_location = self.cache.get_cache_dir(entity.dataFileHandleId)

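With the inline MD5 guard removed above, the equivalent protection now lives in the cache layer (see synapseclient/core/cache.py below): Cache.add() records a content_md5 alongside the modified time, and Cache.get() treats a content mismatch as a cache miss. A minimal, hedged sketch of the caller-visible behavior; the file handle id and download path are made up:

from synapseclient.core.cache import Cache

cache = Cache()  # defaults to ~/.synapseCache
# If a file with the requested name exists in the download location but its
# content no longer matches the recorded MD5, get() returns None and the
# client falls through to a fresh download.
cached = cache.get(file_handle_id=123456, path="/data/downloads")
if cached is None:
    ...  # re-download the file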
145 changes: 121 additions & 24 deletions synapseclient/core/cache.py
@@ -14,14 +14,16 @@
import datetime
import json
import math
import operator
import os
import re
import shutil
import typing

from synapseclient.core.lock import Lock
from synapseclient.core import utils
from opentelemetry import trace

tracer = trace.get_tracer("synapseclient")

CACHE_ROOT_DIR = os.path.join("~", ".synapseCache")

@@ -94,7 +96,9 @@ def __init__(self, cache_root_dir=CACHE_ROOT_DIR, fanout=1000):
self.fanout = fanout
self.cache_map_file_name = ".cacheMap"

def get_cache_dir(self, file_handle_id):
def get_cache_dir(
self, file_handle_id: typing.Union[collections.abc.Mapping, str]
) -> str:
if isinstance(file_handle_id, collections.abc.Mapping):
if "dataFileHandleId" in file_handle_id:
file_handle_id = file_handle_id["dataFileHandleId"]
@@ -112,7 +116,7 @@ def get_cache_dir(self, file_handle_id):
str(file_handle_id),
)

def _read_cache_map(self, cache_dir):
def _read_cache_map(self, cache_dir: str) -> dict:
cache_map_file = os.path.join(cache_dir, self.cache_map_file_name)

if not os.path.exists(cache_map_file):
@@ -128,7 +132,7 @@ def _read_cache_map(self, cache_dir):

return cache_map

def _write_cache_map(self, cache_dir, cache_map):
def _write_cache_map(self, cache_dir: str, cache_map: dict) -> None:
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

@@ -138,7 +142,63 @@ def _write_cache_map(self, cache_dir, cache_map):
json.dump(cache_map, f)
f.write("\n") # For compatibility with R's JSON parser

def contains(self, file_handle_id, path):
def _get_cache_modified_time(
self, cache_map_entry: typing.Union[str, dict, None]
) -> typing.Union[str, None]:
"""
Retrieve the `modified_time` from the cache_map_entry. This needs to be
backwards-compatible with cache entries that do not use the new JSON
structure: if the cache_map_entry has a `modified_time` key it is a
new-style entry and that value is returned; otherwise it is an old-style
entry and the cache_map_entry itself is returned.
:param cache_map_entry: The entry from the cache map
:returns: The modified time if it exists, otherwise the cache_map_entry
"""
if cache_map_entry is not None and "modified_time" in cache_map_entry:
return cache_map_entry.get("modified_time", None)
else:
return cache_map_entry

def _get_cache_content_md5(
self, cache_map_entry: typing.Union[str, dict, None]
) -> typing.Union[str, None]:
"""
Retrieve the `content_md5` from the cache_map_entry.
:param cache_map_entry: The entry from the cache map
:returns: The content md5 if it exists, otherwise None
"""
if cache_map_entry is not None and "content_md5" in cache_map_entry:
return cache_map_entry.get("content_md5", None)
else:
return None

def _cache_item_unmodified(
self, cache_map_entry: typing.Union[str, dict], path: str
) -> bool:
"""
Determine if the cache_map_entry is unmodified by comparing the modified_time
and content_md5 to the file at the given path.
:param cache_map_entry: The entry from the cache map
:param path: The path to the file to compare to
:returns: True if the cache_map_entry is unmodified, otherwise False
"""
cached_time = self._get_cache_modified_time(cache_map_entry)
cached_md5 = self._get_cache_content_md5(cache_map_entry)

# compare_timestamps has an implicit check for whether the path exists
return compare_timestamps(_get_modified_time(path), cached_time) and (
cached_md5 is None or cached_md5 == utils.md5_for_file(path).hexdigest()
)
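
For illustration, a hedged sketch of the two .cacheMap entry shapes these helpers reconcile; the timestamp and MD5 values are made up, and the private helpers are called directly only to show the fallback behavior:

from synapseclient.core.cache import Cache

old_entry = "2023-11-06T12:00:00.000Z"  # pre-existing entries are bare ISO timestamps
new_entry = {  # entries written by add() after this change
    "modified_time": "2023-11-06T12:00:00.000Z",
    "content_md5": "d41d8cd98f00b204e9800998ecf8427e",
}

cache = Cache()
# Both shapes resolve to the same modified time.
assert cache._get_cache_modified_time(old_entry) == old_entry
assert cache._get_cache_modified_time(new_entry) == new_entry["modified_time"]
# Only new-style entries carry an MD5; old entries yield None, so
# _cache_item_unmodified() skips the checksum comparison for them.
assert cache._get_cache_content_md5(old_entry) is None
assert cache._get_cache_content_md5(new_entry) == new_entry["content_md5"]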

def contains(
self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
) -> bool:
"""
Given a file and file_handle_id, return True if an unmodified cached
copy of the file exists at the exact path given or False otherwise.
@@ -154,12 +214,18 @@ def contains(self, file_handle_id, path):

path = utils.normalize_path(path)

cached_time = cache_map.get(path, None)
cached_time = self._get_cache_modified_time(cache_map.get(path, None))

if cached_time:
return compare_timestamps(_get_modified_time(path), cached_time)
return False

def get(self, file_handle_id, path=None):
@tracer.start_as_current_span("cache::get")
def get(
self,
file_handle_id: typing.Union[collections.abc.Mapping, str],
path: str = None,
) -> typing.Union[str, None]:
"""
Retrieve a file with the given file handle from the cache.
@@ -174,7 +240,14 @@ def get(self, file_handle_id, path=None):
exists in the specified location or None if it does not
"""
cache_dir = self.get_cache_dir(file_handle_id)
trace.get_current_span().set_attributes(
{
"synapse.cache.dir": cache_dir,
"synapse.cache.file_handle_id": file_handle_id,
}
)
if not os.path.exists(cache_dir):
trace.get_current_span().set_attributes({"synapse.cache.hit": False})
return None

with Lock(self.cache_map_file_name, dir=cache_dir):
@@ -196,11 +269,10 @@ def get(self, file_handle_id, path=None):
)

# iterate a copy of cache_map to allow modifying original cache_map
for cached_file_path, cached_time in dict(cache_map).items():
for cached_file_path, cache_map_entry in dict(cache_map).items():
if path == os.path.dirname(cached_file_path):
# compare_timestamps has an implicit check for whether the path exists
if compare_timestamps(
_get_modified_time(cached_file_path), cached_time
if self._cache_item_unmodified(
cache_map_entry, cached_file_path
):
# "break" instead of "return" to write removed invalid entries to disk if necessary
matching_unmodified_directory = cached_file_path
@@ -216,30 +288,44 @@ def get(self, file_handle_id, path=None):
self._write_cache_map(cache_dir, cache_map)

if matching_unmodified_directory is not None:
trace.get_current_span().set_attributes(
{"synapse.cache.hit": True}
)
return matching_unmodified_directory

# if we're given a full file path, look up a matching file in the cache
else:
cached_time = cache_map.get(path, None)
if cached_time:
return (
cache_map_entry = cache_map.get(path, None)
if cache_map_entry:
matching_file_path = (
path
if compare_timestamps(_get_modified_time(path), cached_time)
if self._cache_item_unmodified(cache_map_entry, path)
else None
)
trace.get_current_span().set_attributes(
{"synapse.cache.hit": matching_file_path is not None}
)
return matching_file_path

# return most recently cached and unmodified file OR
# None if there are no unmodified files
for cached_file_path, cached_time in sorted(
cache_map.items(), key=operator.itemgetter(1), reverse=True
for cached_file_path, cache_map_entry in sorted(
cache_map.items(),
key=lambda item: item[1]["modified_time"]
if isinstance(item[1], dict)
else item[1],
reverse=True,
):
if compare_timestamps(
_get_modified_time(cached_file_path), cached_time
):
if self._cache_item_unmodified(cache_map_entry, cached_file_path):
trace.get_current_span().set_attributes({"synapse.cache.hit": True})
return cached_file_path

trace.get_current_span().set_attributes({"synapse.cache.hit": False})
return None
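
The mixed-type sort key above normalizes old-style string entries and new-style dict entries to an ISO-8601 timestamp, which sorts lexicographically in chronological order. A standalone sketch with made-up paths and values:

cache_map = {
    "/data/old_copy.txt": "2023-01-01T00:00:00.000Z",
    "/data/new_copy.txt": {
        "modified_time": "2023-11-06T12:00:00.000Z",
        "content_md5": "d41d8cd98f00b204e9800998ecf8427e",
    },
}
ordered = sorted(
    cache_map.items(),
    key=lambda item: item[1]["modified_time"] if isinstance(item[1], dict) else item[1],
    reverse=True,
)
assert ordered[0][0] == "/data/new_copy.txt"  # the most recently cached entry is tried first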

def add(self, file_handle_id, path):
def add(
self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
) -> dict:
"""
Add a file to the cache
"""
@@ -252,19 +338,30 @@ def add(self, file_handle_id, path):

path = utils.normalize_path(path)
# write .000 milliseconds for backward compatibility
cache_map[path] = epoch_time_to_iso(math.floor(_get_modified_time(path)))
cache_map[path] = {
"modified_time": epoch_time_to_iso(
math.floor(_get_modified_time(path))
),
"content_md5": utils.md5_for_file(path).hexdigest(),
}
self._write_cache_map(cache_dir, cache_map)

return cache_map

def remove(self, file_handle_id, path=None, delete=None):
def remove(
self,
file_handle_id: typing.Union[collections.abc.Mapping, str],
path: str = None,
delete: bool = None,
) -> typing.List[str]:
"""
Remove a file from the cache.
:param file_handle_id: Will also extract file handle id from either a File or file handle
:param path: If the given path is None, remove (and potentially delete)
all cached copies. If the path is that of a file in the
.cacheMap file, remove it.
:param delete: If True, delete the file from disk as well as removing it from the cache
:returns: A list of files removed
"""
@@ -318,7 +415,7 @@ def purge(
before_date: typing.Union[datetime.datetime, int] = None,
after_date: typing.Union[datetime.datetime, int] = None,
dry_run: bool = False,
):
) -> int:
"""
Purge the cache. Use with caution. Deletes files whose cache maps were last updated in a specified period.
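
A hedged usage sketch of purge() with its new return annotation; the cutoff date is arbitrary, a temporary directory stands in for a real cache root, and dry_run=True is used so nothing is actually deleted:

import datetime
import os
import tempfile
from synapseclient.core.cache import Cache

with tempfile.TemporaryDirectory() as tmp:
    cache_root = os.path.join(tmp, ".synapseCache")
    os.makedirs(cache_root)
    cache = Cache(cache_root_dir=cache_root)
    cutoff = datetime.datetime(2023, 1, 1)
    # dry_run=True only reports what would be removed; the int return value
    # (per the annotation added in this commit) counts the files selected.
    count = cache.purge(before_date=cutoff, dry_run=True)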