Counts files to db migration (#295)
* comments to refactor

* old changes

* minor fixes

* replaced json file with indexes with db

* fixed blob reading

* updated writing to DB

* updated writing to DB for centroids

* pickling fix

* refactor expressions data array

* db refactoring

* refactoring providers

* Continue refactoring fast cache assessor etc. for database-backed cache files.

* Fix new bug related to apt and postgres docker image.

* Remove file-based fast cache assessor tests.

* Deprecate file-based caching test.

* Remove unused artifacts

* Update test and fix/update imports.

* Add one minimal test

* Deprecate reference to unused files

* Fix bug that was clearing out studies-loaded-in-progress prematurely.

* Make study vs measurement study consistent for centroids

* Fix bug with premature clearing of expressions locally.

* Make study vs measurement study consistent in another place.

* Deprecate reference to indexed samples

* Update logger info messages.

* Deprecate volume mounting for ondemand container.

* Refactor to make CLI cache pulling the more advanced operation, reasonable to run after data upload.

* Start updating data loaded containers to have cache prebuilt.

* Fix missing parameter in import dataset scripts

* Make caching possibly study specific

* Fix some mistakes in extracting from study.json

* Remove old invocation of caching

* Version bump

---------

Co-authored-by: Grigory Frantsuzov <[email protected]>
Co-authored-by: Jimmy Mathews <[email protected]>
Co-authored-by: James Mathews <[email protected]>
4 people authored Feb 27, 2024
1 parent 242b14a commit a65f533
Showing 56 changed files with 430 additions and 878 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -334,7 +334,7 @@ data-loaded-image-%: ${BUILD_LOCATION_ABSOLUTE}/db/docker.built ${BUILD_SCRIPTS_
docker container start temporary-spt-db-preloading && \
bash ${BUILD_SCRIPTS_LOCATION_ABSOLUTE}/poll_container_readiness_direct.sh temporary-spt-db-preloading && \
pipeline_cmd="cd /working_dir; cp -r /mount_sources/build .; cp -r /mount_sources/test .; bash build/build_scripts/import_test_dataset$*.sh "; \
docker run \
docker container run \
--rm \
--network container:temporary-spt-db-preloading \
--mount type=bind,src=${PWD},dst=/mount_sources \
6 changes: 0 additions & 6 deletions build/apiserver/compose.yaml
@@ -25,12 +25,6 @@ services:
SINGLE_CELL_DATABASE_HOST: spt-db-testing-only
SINGLE_CELL_DATABASE_USER: postgres
SINGLE_CELL_DATABASE_PASSWORD: postgres
USE_ALTERNATIVE_TESTING_DATABASE: 1
DISABLE_FAST_CACHE_RECREATION: 1
volumes:
- type: bind
source: ../../test/ondemand/test_expression_data
target: /ondemand/source_data/
pull_policy: never

testing-api-server:
4 changes: 0 additions & 4 deletions build/ondemand/Makefile
@@ -32,23 +32,20 @@ teardown-unit-testing: ${UNIT_TESTS}
>@docker compose down && docker compose rm --force --stop ; echo "$$?" > status_code
>@${MESSAGE} end "Down." "Error."
>@rm -f status_code
>@rm -f ${TEST_LOCATION_ABSOLUTE}/ondemand/module_tests/expected_binary_expression_data_cohoused/.spt_db.config.generated
>@rm -f ${TEST_LOCATION_ABSOLUTE}/ondemand/test_expression_data/.spt_db.config.generated

teardown-module-testing: ${MODULE_TESTS}
>@${MESSAGE} start "\u2517\u2501"
>@docker compose down && docker compose rm --force --stop ; echo "$$?" > status_code
>@${MESSAGE} end "Down." "Error."
>@rm -f status_code
>@rm -f ${TEST_LOCATION_ABSOLUTE}/ondemand/module_tests/expected_binary_expression_data_cohoused/.spt_db.config.generated
>@rm -f ${TEST_LOCATION_ABSOLUTE}/ondemand/test_expression_data/.spt_db.config.generated

teardown-testing: ${TESTS}
>@${MESSAGE} start "\u2517\u2501"
>@docker compose down && docker compose rm --force --stop ; echo "$$?" > status_code
>@${MESSAGE} end "Down." "Error."
>@rm -f status_code
>@rm -f ${TEST_LOCATION_ABSOLUTE}/ondemand/module_tests/expected_binary_expression_data_cohoused/.spt_db.config.generated
>@rm -f ${TEST_LOCATION_ABSOLUTE}/ondemand/test_expression_data/.spt_db.config.generated

${TESTS}: setup-testing
@@ -87,6 +84,5 @@ clean:
>@rm -f ${WHEEL_FILENAME}
>@rm -f status_code
>@rm -rf ../../test/ondemand/expression_data/
>@rm -f ../../test/ondemand/module_tests/expected_binary_expression_data_cohoused/.spt_db.config.generated
>@rm -rf ../../test/test_data/fast_cache_testing
>@for f in dlogs.db.txt dlogs.api.txt dlogs.od.txt; do rm -f $$f; done;
6 changes: 0 additions & 6 deletions build/ondemand/compose.yaml
@@ -12,12 +12,6 @@ services:
SINGLE_CELL_DATABASE_HOST: spt-db-testing-only
SINGLE_CELL_DATABASE_USER: postgres
SINGLE_CELL_DATABASE_PASSWORD: postgres
USE_ALTERNATIVE_TESTING_DATABASE: 1
DISABLE_FAST_CACHE_RECREATION: 1
volumes:
- type: bind
source: ../../test/ondemand/module_tests/expected_binary_expression_data_cohoused
target: /ondemand/source_data/
pull_policy: never

testing-database:
6 changes: 6 additions & 0 deletions spatialprofilingtoolbox/db/data_model/performance_tweaks.sql
@@ -44,3 +44,9 @@ CREATE TABLE pending_feature_computation (
feature_specification VARCHAR(512) REFERENCES feature_specification(identifier),
time_initiated VARCHAR(512)
);

CREATE TABLE ondemand_studies_index (
specimen VARCHAR(512),
blob_type VARCHAR(512),
blob_contents bytea
);
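
Editor's note: the new ondemand_studies_index table acts as a per-study blob store. blob_type distinguishes payload kinds (this commit uses 'expressions_index' and 'feature_matrix'), specimen is set for per-specimen payloads and left NULL otherwise, and blob_contents holds the raw bytes. A minimal sketch of a summary query against it, using the DBCursor wrapper that appears later in this diff; the helper name summarize_cached_blobs is illustrative, not part of the commit:

# Sketch only: tally cached blobs per blob_type for one study.
# 'summarize_cached_blobs' is a hypothetical helper, not from this commit.
from spatialprofilingtoolbox.db.database_connection import DBCursor

def summarize_cached_blobs(database_config_file: str, study: str) -> dict[str, int]:
    with DBCursor(database_config_file=database_config_file, study=study) as cursor:
        cursor.execute('''
            SELECT blob_type, COUNT(*)
            FROM ondemand_studies_index
            GROUP BY blob_type ;
        ''')
        return {str(blob_type): int(count) for blob_type, count in cursor.fetchall()}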
15 changes: 13 additions & 2 deletions spatialprofilingtoolbox/db/database_connection.py
@@ -163,14 +163,14 @@ def retrieve_study_names(database_config_file: str | None) -> list[str]:
with DBCursor(database_config_file=database_config_file) as cursor:
cursor.execute('SELECT study FROM study_lookup;')
rows = cursor.fetchall()
return sorted([row[0] for row in rows])
return sorted([str(row[0]) for row in rows])


def get_specimen_names(cursor) -> list[str]:
query = 'SELECT specimen FROM specimen_collection_process;'
cursor.execute(query)
rows = cursor.fetchall()
return sorted([row[0] for row in rows])
return sorted([str(row[0]) for row in rows])


def retrieve_study_from_specimen(database_config_file: str | None, specimen: str) -> str:
@@ -189,6 +189,17 @@ def retrieve_study_from_specimen(database_config_file: str | None, specimen: str
return study


def retrieve_primary_study(database_config_file: str, component_study: str) -> str | None:
studies = retrieve_study_names(database_config_file)
for study in studies:
with DBCursor(database_config_file=database_config_file, study=study) as cursor:
query = 'SELECT COUNT(*) FROM study_component sc WHERE sc.component_study=%s ;'
cursor.execute(query, (component_study,))
count = tuple(cursor.fetchall())[0][0]
if count == 1:
return study
return None

def create_database(database_config_file: str | None, database_name: str) -> None:
if database_config_file is None:
message = 'Data import requires a database configuration file.'
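
Editor's note: the new retrieve_primary_study helper scans each study's study_component records to map a component study (such as a specimen measurement study) back to the primary study that owns it, returning None when no primary study matches. A hedged usage sketch; the config path and study name below are placeholders, not values from this commit:

# Sketch only: resolve the primary study owning a measurement study.
# '.spt_db.config' and 'Example measurement study' are placeholders.
from spatialprofilingtoolbox.db.database_connection import retrieve_primary_study

primary = retrieve_primary_study('.spt_db.config', 'Example measurement study')
if primary is None:
    print('No primary study references this component study.')
else:
    print(f'Component study belongs to: {primary}')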
53 changes: 53 additions & 0 deletions spatialprofilingtoolbox/db/ondemand_studies_index.py
@@ -0,0 +1,53 @@
"""Count number of cached files of given types in cached-files area of database."""

from spatialprofilingtoolbox.db.database_connection import DBCursor
from spatialprofilingtoolbox.db.database_connection import retrieve_study_names

def get_counts(database_config_file: str, blob_type: str, study: str | None = None) -> dict[str, int]:
if study is None:
studies = tuple(retrieve_study_names(database_config_file))
else:
studies = (study,)
counts: dict[str, int] = {}
for study in studies:
with DBCursor(database_config_file=database_config_file, study=study) as cursor:
cursor.execute(f'''
SELECT COUNT(*) FROM ondemand_studies_index osi
WHERE osi.blob_type='{blob_type}' ;
''')
count = tuple(cursor.fetchall())[0][0]
counts[study] = count
return counts


def drop_cache_files(database_config_file: str, blob_type: str, study: str | None = None) -> None:
if study is None:
studies = tuple(retrieve_study_names(database_config_file))
else:
studies = (study,)
for _study in studies:
with DBCursor(database_config_file=database_config_file, study=_study) as cursor:
cursor.execute(f'''
DELETE FROM ondemand_studies_index osi
WHERE osi.blob_type='{blob_type}' ;
''')


def retrieve_expressions_index(database_config_file: str, study: str) -> str:
with DBCursor(database_config_file=database_config_file, study=study) as cursor:
cursor.execute('''
SELECT blob_contents FROM ondemand_studies_index osi
WHERE osi.blob_type='expressions_index' ;
''')
result_blob = bytearray(tuple(cursor.fetchall())[0][0])
return result_blob.decode(encoding='utf-8')


def retrieve_indexed_samples(database_config_file: str, study: str) -> tuple[str, ...]:
with DBCursor(database_config_file=database_config_file, study=study) as cursor:
cursor.execute('''
SELECT specimen FROM ondemand_studies_index osi
WHERE osi.blob_type='feature_matrix' ;
''')
specimens = tuple(r[0] for r in cursor.fetchall())
return specimens
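
Editor's note: taken together, this new module lets callers inspect cache state without touching the filesystem. A minimal usage sketch under assumed placeholder names; the config path and study name are not from this commit:

# Sketch only: verify that every indexed sample has a cached feature
# matrix blob. The config path and study name are placeholders.
from spatialprofilingtoolbox.db.ondemand_studies_index import get_counts
from spatialprofilingtoolbox.db.ondemand_studies_index import retrieve_indexed_samples

config = '.spt_db.config'
study = 'Example study'
counts = get_counts(config, 'feature_matrix', study=study)
samples = retrieve_indexed_samples(config, study)
# One table row per cached feature matrix, so these two numbers should agree.
assert counts[study] == len(samples)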
127 changes: 63 additions & 64 deletions spatialprofilingtoolbox/ondemand/compressed_matrix_writer.py
@@ -2,88 +2,87 @@

from typing import cast
import json
from os.path import isfile
from os.path import join
from os import getcwd

from spatialprofilingtoolbox.ondemand.defaults import EXPRESSIONS_INDEX_FILENAME
from spatialprofilingtoolbox.db.ondemand_studies_index import get_counts
from spatialprofilingtoolbox.db.database_connection import DBCursor
from spatialprofilingtoolbox.db.database_connection import retrieve_primary_study
from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger

logger = colorized_logger(__name__)


class CompressedMatrixWriter:
"""Write the compressed in-memory binary format matrices to file."""
database_config_file: str

@classmethod
def write_specimen(cls, data: dict[int, int], study_index: int, specimen_index: int) -> None:
cls._write_data_array(data, study_index, specimen_index)
def __init__(self, database_config_file: str | None) -> None:
self.database_config_file = cast(str, database_config_file)

@classmethod
def _write_data_array(cls,
def write_specimen(self, data: dict[int, int], study_name: str, specimen: str) -> None:
self._write_data_array(data, study_name, specimen)

def _write_data_array(self,
data_array: dict[int, int],
study_index: int,
specimen_index: int,
study_name: str,
specimen: str,
) -> None:
filename = cls._format_filename(study_index, specimen_index)
cls._write_data_array_to_file(cast(dict[int, int], data_array), filename)

@classmethod
def _format_filename(cls, study_index: int, specimen_index: int) -> str:
return '.'.join([
cls.get_data_array_filename_base(),
str(study_index),
str(specimen_index),
'bin',
])
self._write_data_array_to_db(data_array, study_name, specimen)

@classmethod
def write_index(cls,
def write_index(self,
specimens_by_measurement_study: dict[str, list[str]],
target_index_lookups: dict,
target_by_symbols: dict,
) -> None:
index = []
study_names = sorted(list(specimens_by_measurement_study.keys()))
for study_index, study_name in enumerate(sorted(study_names)):
for measurement_study_name in sorted(study_names):
index_item: dict[str, str | list] = {}
index_item['specimen measurement study name'] = study_name
index_item['expressions files'] = []
specimens = sorted(specimens_by_measurement_study[study_name])
for specimen_index, specimen in enumerate(specimens):
filename = '.'.join([
cls.get_data_array_filename_base(),
str(study_index),
str(specimen_index),
'bin',
])
index_item['expressions files'].append({
'specimen': specimen,
'filename': filename,
})
index_item['target index lookup'] = target_index_lookups[study_name]
index_item['target by symbol'] = target_by_symbols[study_name]
index.append(index_item)
filename = join(cls.get_data_directory(), EXPRESSIONS_INDEX_FILENAME)
with open(filename, 'wt', encoding='utf-8') as index_file:
index_file.write(json.dumps({'': index}, indent=4))
logger.debug('Wrote expression index file %s .', filename)

@classmethod
def get_data_directory(cls) -> str:
return getcwd()
index_item['specimen measurement study name'] = measurement_study_name
index_item['target index lookup'] = target_index_lookups[measurement_study_name]
index_item['target by symbol'] = target_by_symbols[measurement_study_name]
index_str = json.dumps({'': [index_item]})
index_str_as_bytes = index_str.encode('utf-8')
study = retrieve_primary_study(self.database_config_file, measurement_study_name)
with DBCursor(database_config_file=self.database_config_file, study=study) as cursor:
insert_query = '''
INSERT INTO
ondemand_studies_index (
specimen,
blob_type,
blob_contents
)
VALUES (%s, %s, %s) ;
'''
cursor.execute(insert_query, (None, 'expressions_index', index_str_as_bytes))
cursor.close()
logger.debug(f'Wrote expression index to database {study} .')

@classmethod
def _write_data_array_to_file(cls, data_array: dict[int, int], filename: str) -> None:
with open(filename, 'wb') as file:
for histological_structure_id, entry in data_array.items():
file.write(histological_structure_id.to_bytes(8, 'little'))
file.write(entry.to_bytes(8, 'little'))

@classmethod
def already_exists(cls, data_directory: str):
return isfile(join(data_directory, EXPRESSIONS_INDEX_FILENAME))
def _write_data_array_to_db(
self,
data_array: dict[int, int],
measurement_study_name: str,
specimen: str,
) -> None:
blob = bytearray()
for histological_structure_id, entry in data_array.items():
blob.extend(histological_structure_id.to_bytes(8, 'little'))
blob.extend(entry.to_bytes(8, 'little'))
study_name = retrieve_primary_study(self.database_config_file, measurement_study_name)
with DBCursor(database_config_file=self.database_config_file, study=study_name) as cursor:
insert_query = '''
INSERT INTO
ondemand_studies_index (
specimen,
blob_type,
blob_contents)
VALUES (%s, %s, %s) ;
'''
cursor.execute(insert_query, (specimen, 'feature_matrix', blob))
cursor.close()

@classmethod
def get_data_array_filename_base(cls):
return 'expression_data_array'
def expressions_indices_already_exist(self, study: str | None = None):
counts = get_counts(self.database_config_file, 'expressions_index', study=study)
for _study, count in counts.items():
if count > 1:
message = f'Too many ({count}) expression index files for study {_study}.'
raise ValueError(message)
return all(count == 1 for count in counts.values())
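
Editor's note: _write_data_array_to_db serializes each data array as a flat sequence of records, two little-endian 64-bit integers per entry (the histological structure id, then the entry value). A reader counterpart is not shown in this diff; the following decoding sketch is inferred from the writer above and is not part of the commit:

# Sketch only: decode a 'feature_matrix' blob back into the mapping the
# writer consumes. The 16-byte record layout (two little-endian uint64s)
# is taken from _write_data_array_to_db above.
def decode_feature_matrix_blob(blob: bytes) -> dict[int, int]:
    if len(blob) % 16 != 0:
        raise ValueError('Blob length is not a multiple of the 16-byte record size.')
    data_array: dict[int, int] = {}
    for offset in range(0, len(blob), 16):
        structure_id = int.from_bytes(blob[offset:offset + 8], 'little')
        entry = int.from_bytes(blob[offset + 8:offset + 16], 'little')
        data_array[structure_id] = entry
    return data_array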
