
Data structure for filters #2168

Merged

107 commits, merged Jan 15, 2025. The diff below shows changes from 22 of the 107 commits.

Commits
9ee7b29
rename validator
ychiucco Jan 7, 2025
c64b3ed
add accept_none=True to valdict_scalarvalues
ychiucco Jan 7, 2025
a3b1f91
change dataset model and add migration
ychiucco Jan 7, 2025
a6a0ce1
rename variable
ychiucco Jan 7, 2025
6977433
modify validator
ychiucco Jan 7, 2025
bee422a
update dataset schemas
ychiucco Jan 7, 2025
42c6bc7
fix type hint
ychiucco Jan 7, 2025
22949ad
remaining dataset schemas
ychiucco Jan 7, 2025
1bd9a8f
split filters in workflow task model
ychiucco Jan 7, 2025
50713fb
split filters in workflowtask schemas
ychiucco Jan 7, 2025
10dd941
remove default factory from dataset read
ychiucco Jan 7, 2025
5016b3f
attribute_filters in jobv2
ychiucco Jan 7, 2025
32bdef0
attribute_filters in jobv2 schemas
ychiucco Jan 7, 2025
3d89275
fix test_schemas_dataset_v2
ychiucco Jan 7, 2025
1d33e67
fix test_workflow_task_dump
ychiucco Jan 7, 2025
d306002
remove old columns
ychiucco Jan 7, 2025
d0faae0
new filters validators
ychiucco Jan 8, 2025
2b77e1b
fix test api dataset
ychiucco Jan 8, 2025
6d2b5c7
remove input_filters
ychiucco Jan 8, 2025
f2a4af8
remove Filters schema
ychiucco Jan 8, 2025
6527b8a
remove Filters
ychiucco Jan 8, 2025
643be06
remove Filters from runner
ychiucco Jan 8, 2025
ffb9670
reintroduce old columns
ychiucco Jan 8, 2025
d7f8bf9
validators to ImageQuery
ychiucco Jan 8, 2025
d37b8c6
fix migration
ychiucco Jan 8, 2025
40afc04
revamp validators
ychiucco Jan 8, 2025
845ea28
homogeneus types in list
ychiucco Jan 8, 2025
4144d77
remove check from match filter
ychiucco Jan 8, 2025
63c0328
remove valdict_scalarvalues
ychiucco Jan 8, 2025
0b7061b
Merge branch 'main' into 2153-data-structure-for-filters
ychiucco Jan 9, 2025
d7f9ba9
task output validator
ychiucco Jan 9, 2025
b3ec844
Update task_interface.py
tcompa Jan 9, 2025
27a6c13
Update runner with new filter logic
tcompa Jan 9, 2025
0527cd7
add validator to task output type filters
ychiucco Jan 9, 2025
efd4080
Introduce `AttributeFiltersType` and accept `None` as a value for att…
tcompa Jan 9, 2025
7a4ceb9
Extend usage of `AttributeFiltersType`
tcompa Jan 9, 2025
51e1259
Extend usage of `AttributeFiltersType`
tcompa Jan 9, 2025
459638c
Adapt test_filter_image_list_with_filters
tcompa Jan 9, 2025
c372804
Add FIXME
tcompa Jan 9, 2025
4c32260
Update `validate_attribute_filters` (accepting `values=None`)
tcompa Jan 9, 2025
eb34d54
Add `test_attribute_filters_set_to_none`
tcompa Jan 9, 2025
bb3fa2f
Add job_attribute_filters to all runner backends
tcompa Jan 9, 2025
7c94711
Fix version migration script
tcompa Jan 9, 2025
dac96c9
Fix version migration script
tcompa Jan 9, 2025
e49efea
test image models
ychiucco Jan 9, 2025
984182f
test unit image tools
ychiucco Jan 9, 2025
c4a8d9f
fix test benchmark
ychiucco Jan 9, 2025
d422fe2
fix query image endpoint
ychiucco Jan 13, 2025
f8e91c8
fix test_api_dataset_images
ychiucco Jan 13, 2025
faefc52
fix dataset dump
ychiucco Jan 13, 2025
aaf6a25
exclude none from DatasetUpdate
ychiucco Jan 13, 2025
604049d
test update dataset filters
ychiucco Jan 13, 2025
14ef8eb
fix test_filters
ychiucco Jan 14, 2025
49635bc
fail with None filters
ychiucco Jan 14, 2025
fe8a0a3
run ci for pr to dev-2.11
ychiucco Jan 14, 2025
e52cd6f
fix test_patch_dataset
ychiucco Jan 14, 2025
31e249e
fix test_schemas_dataset_v2
ychiucco Jan 14, 2025
60a02ac
Merge branch 'dev-2.11' into 2153-data-structure-for-filters
ychiucco Jan 14, 2025
9b3b563
Fix use of match_filters in runner - cc @ychiucco
tcompa Jan 14, 2025
d43f99a
Update tests 04
tcompa Jan 14, 2025
3b3e369
migration script
ychiucco Jan 14, 2025
2d2bff2
do not subscript wft
ychiucco Jan 14, 2025
0aab408
Fix tests 08
tcompa Jan 14, 2025
f562ae9
Merge branch '2153-data-structure-for-filters' of github.com:fractal-…
tcompa Jan 14, 2025
0748645
FIXME
tcompa Jan 14, 2025
6c06e92
do not use extra field in dump
ychiucco Jan 14, 2025
a38093a
Merge remote-tracking branch 'refs/remotes/origin/2153-data-structure…
ychiucco Jan 14, 2025
71f2420
Fix test 09
tcompa Jan 14, 2025
f9fe9d4
Merge branch '2153-data-structure-for-filters' of github.com:fractal-…
tcompa Jan 14, 2025
7fc266b
pop datasetdump.filters
ychiucco Jan 14, 2025
43c88cb
Review use of filters in `merge_outputs`
tcompa Jan 14, 2025
acf9695
Remove obsolete file
tcompa Jan 14, 2025
097fbac
server default to None
ychiucco Jan 14, 2025
8c352a4
Merge remote-tracking branch 'refs/remotes/origin/2153-data-structure…
ychiucco Jan 14, 2025
db1fee2
Merge remote-tracking branch 'refs/remotes/origin/2153-data-structure…
ychiucco Jan 14, 2025
6dc89f4
default_server="null"
ychiucco Jan 14, 2025
3bae409
default_server="null" in models
ychiucco Jan 14, 2025
eda21b8
move definitions outside list comprehension
ychiucco Jan 14, 2025
09e097b
match_filter accept only kwargs
ychiucco Jan 14, 2025
efc6925
set level info
ychiucco Jan 14, 2025
c2b2069
use info logging
ychiucco Jan 14, 2025
d95590e
Improve match_filter
tcompa Jan 14, 2025
3e22cad
docstring and warning message
ychiucco Jan 14, 2025
872dfdd
Merge remote-tracking branch 'refs/remotes/origin/2153-data-structure…
ychiucco Jan 14, 2025
955d665
use kwargs in match_filter
ychiucco Jan 14, 2025
b574106
changes from comments
ychiucco Jan 14, 2025
5824523
create module _filter_validators.py
ychiucco Jan 14, 2025
fac902f
comment and docstring
ychiucco Jan 14, 2025
f206847
comment
ychiucco Jan 14, 2025
2e74813
Add some validation tests
tcompa Jan 15, 2025
27d03b9
Extend some validation tests
tcompa Jan 15, 2025
337be2c
remove outdated comment
ychiucco Jan 15, 2025
4cda1f1
unit test merge_outputs
ychiucco Jan 15, 2025
c18194c
refactor merge_outputs
ychiucco Jan 15, 2025
f1d067f
improve unit test
ychiucco Jan 15, 2025
6a67061
test update_legacy_filters
ychiucco Jan 15, 2025
ca2aba1
simplify check
ychiucco Jan 15, 2025
ef05293
deduplicate image_list_removals
ychiucco Jan 15, 2025
e77d32e
assert correct order
ychiucco Jan 15, 2025
d23efb0
Comment [skip ci]
tcompa Jan 15, 2025
80801f9
Add case to unit test
tcompa Jan 15, 2025
87f4fc6
Only use `job_attribute_filters` in runner - ref #2155
tcompa Jan 15, 2025
837ed30
Extend unit test
tcompa Jan 15, 2025
898c561
Rename schema
tcompa Jan 15, 2025
02d5526
remove default from image_ok and image_fail
ychiucco Jan 15, 2025
45d573b
look fok unexisting image
ychiucco Jan 15, 2025
0df4d4a
CHANGELOG [skip ci]
tcompa Jan 15, 2025
19 changes: 3 additions & 16 deletions benchmarks/runner/mocks.py
@@ -1,5 +1,4 @@
from typing import Any
from typing import Literal
from typing import Optional

from pydantic import BaseModel
@@ -13,21 +12,14 @@ class DatasetV2Mock(BaseModel):
name: str
zarr_dir: str
images: list[dict[str, Any]] = Field(default_factory=list)
filters: dict[Literal["types", "attributes"], dict[str, Any]] = Field(
default_factory=dict
)
type_filters: dict[str, bool] = Field(default_factory=dict)
attribute_filters: dict[str, list[Any]] = Field(default_factory=dict)
history: list = Field(default_factory=list)

@property
def image_zarr_urls(self) -> list[str]:
return [image["zarr_urls"] for image in self.images]

@validator("filters", always=True)
def _default_filters(cls, value):
if value == {}:
return {"types": {}, "attributes": {}}
return value


class TaskV2Mock(BaseModel):
id: int
@@ -77,13 +69,8 @@ class WorkflowTaskV2Mock(BaseModel):
meta_parallel: Optional[dict[str, Any]] = Field()
meta_non_parallel: Optional[dict[str, Any]] = Field()
task: TaskV2Mock = None
input_filters: dict[str, Any] = Field(default_factory=dict)
type_filters: dict[str, bool] = Field(default_factory=dict)
order: int
id: int
workflow_id: int = 0
task_id: int

@validator("input_filters", always=True)
def _default_filters(cls, value):
if value == {}:
return {"types": {}, "attributes": {}}
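The change visible in this first diff is the core of the PR: the single nested `filters` dict (keyed by `"types"`/`"attributes"`) is split into two flat, independently typed fields, and attribute filters now map each key to a list of accepted values. A minimal sketch of the new shape using plain dataclasses (not the actual Pydantic/SQLModel models):

```python
from dataclasses import dataclass, field
from typing import Any

# Old structure: one nested dict keyed by "types" / "attributes"
old_filters = {"types": {"3D": True}, "attributes": {"well": "A01"}}


@dataclass
class DatasetFilters:
    # Type filters are strict booleans
    type_filters: dict[str, bool] = field(default_factory=dict)
    # Attribute filters now map each key to a *list* of accepted values
    attribute_filters: dict[str, list[Any]] = field(default_factory=dict)


filters = DatasetFilters(
    type_filters=old_filters["types"],
    # A single accepted value becomes a one-element list
    attribute_filters={k: [v] for k, v in old_filters["attributes"].items()},
)
```

With two separate fields, the custom `_default_filters` validator that injected `{"types": {}, "attributes": {}}` becomes unnecessary: each field gets an ordinary empty-dict default.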
12 changes: 5 additions & 7 deletions fractal_server/app/models/v2/dataset.py
@@ -1,6 +1,5 @@
from datetime import datetime
from typing import Any
from typing import Literal
from typing import Optional

from sqlalchemy import Column
@@ -41,12 +40,11 @@ class Config:
sa_column=Column(JSON, server_default="[]", nullable=False)
)

filters: dict[Literal["attributes", "types"], dict[str, Any]] = Field(
sa_column=Column(
JSON,
nullable=False,
server_default='{"attributes": {}, "types": {}}',
)
type_filters: dict[str, bool] = Field(
sa_column=Column(JSON, nullable=False, server_default="{}")
)
attribute_filters: dict[str, list[Any]] = Field(
sa_column=Column(JSON, nullable=False, server_default="{}")
)

@property
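Splitting the `filters` column into `type_filters` and `attribute_filters` means existing rows must be converted by the migration (the commit list mentions `update_legacy_filters`, which is not shown in these hunks). A hypothetical sketch of that conversion, assuming scalar attribute values become one-element lists:

```python
import json
from typing import Any


def split_legacy_filters(old: dict[str, Any]) -> tuple[dict[str, bool], dict[str, list[Any]]]:
    """Split a legacy {'types': ..., 'attributes': ...} dict into the two
    new column values (hypothetical helper, named after the tested function)."""
    type_filters = dict(old.get("types", {}))
    attribute_filters = {
        key: value if isinstance(value, list) else [value]
        for key, value in old.get("attributes", {}).items()
    }
    return type_filters, attribute_filters


# Example legacy row content, as stored in the old JSON column
legacy = json.loads('{"types": {"3D": true}, "attributes": {"well": "A01"}}')
type_filters, attribute_filters = split_legacy_filters(legacy)
```

This matches the new columns' `server_default="{}"`: rows with empty legacy filters simply become two empty dicts.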
4 changes: 4 additions & 0 deletions fractal_server/app/models/v2/job.py
@@ -49,3 +49,7 @@ class Config:
)
status: str = JobStatusTypeV2.SUBMITTED
log: Optional[str] = None

attribute_filters: dict[str, list[Any]] = Field(
sa_column=Column(JSON, nullable=False, server_default="{}")
)
11 changes: 2 additions & 9 deletions fractal_server/app/models/v2/workflowtask.py
@@ -1,5 +1,4 @@
from typing import Any
from typing import Literal
from typing import Optional

from sqlalchemy import Column
@@ -25,14 +24,8 @@ class Config:
args_parallel: Optional[dict[str, Any]] = Field(sa_column=Column(JSON))
args_non_parallel: Optional[dict[str, Any]] = Field(sa_column=Column(JSON))

input_filters: dict[
Literal["attributes", "types"], dict[str, Any]
] = Field(
sa_column=Column(
JSON,
nullable=False,
server_default='{"attributes": {}, "types": {}}',
)
type_filters: dict[str, bool] = Field(
sa_column=Column(JSON, nullable=False, server_default="{}")
)

# Task
13 changes: 3 additions & 10 deletions fractal_server/app/routes/api/v2/_aux_functions.py
@@ -21,7 +21,6 @@
from ....models.v2 import WorkflowTaskV2
from ....models.v2 import WorkflowV2
from ....schemas.v2 import JobStatusTypeV2
from fractal_server.images import Filters


async def _get_project_check_owner(
@@ -336,7 +335,7 @@ async def _workflow_insert_task(
meta_non_parallel: Optional[dict[str, Any]] = None,
args_non_parallel: Optional[dict[str, Any]] = None,
args_parallel: Optional[dict[str, Any]] = None,
input_filters: Optional[Filters] = None,
type_filters: Optional[dict[str, bool]] = None,
db: AsyncSession,
) -> WorkflowTaskV2:
"""
@@ -350,7 +349,7 @@
meta_non_parallel:
args_non_parallel:
args_parallel:
input_filters:
type_filters:
db:
"""
db_workflow = await db.get(WorkflowV2, workflow_id)
@@ -376,12 +375,6 @@
if final_meta_non_parallel == {}:
final_meta_non_parallel = None

# Prepare input_filters attribute
if input_filters is None:
input_filters_kwarg = {}
else:
input_filters_kwarg = dict(input_filters=input_filters)

# Create DB entry
wf_task = WorkflowTaskV2(
task_type=task_type,
@@ -390,7 +383,7 @@
args_parallel=args_parallel,
meta_parallel=final_meta_parallel,
meta_non_parallel=final_meta_non_parallel,
**input_filters_kwarg,
type_filters=(type_filters or dict()),
)
db_workflow.task_list.append(wf_task)
flag_modified(db_workflow, "task_list")
13 changes: 9 additions & 4 deletions fractal_server/app/routes/api/v2/images.py
@@ -15,7 +15,6 @@
from fractal_server.app.db import get_async_db
from fractal_server.app.models import UserOAuth
from fractal_server.app.routes.auth import current_active_user
from fractal_server.images import Filters
from fractal_server.images import SingleImage
from fractal_server.images import SingleImageUpdate
from fractal_server.images.tools import find_image_by_zarr_url
@@ -38,7 +37,8 @@ class ImagePage(BaseModel):

class ImageQuery(BaseModel):
zarr_url: Optional[str]
filters: Filters = Field(default_factory=Filters)
type_filters: dict[str, bool] = Field(default_factory=dict)
attribute_filters: dict[str, list[Any]] = Field(default_factory=dict)


@router.post(
@@ -124,7 +124,11 @@ async def query_dataset_images(
images = [
image
for image in images
if match_filter(image, Filters(**dataset.filters))
if match_filter(
image,
type_filters=dataset.type_filters,
attribute_filters=dataset.attribute_filters,
)
]

attributes = {}
@@ -160,7 +164,8 @@ async def query_dataset_images(
for image in images
if match_filter(
image,
Filters(**query.filters.dict()),
type_filters=query.type_filters,
attribute_filters=query.attribute_filters,
)
]

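As the endpoint changes above show, `match_filter` is now called with keyword-only `type_filters` and `attribute_filters` instead of a `Filters` object (the commit list includes "match_filter accept only kwargs"). A simplified stand-in for the matching rule, as far as it can be inferred from these hunks — the real implementation lives in `fractal_server.images.tools`:

```python
from typing import Any


def match_filter(
    image: dict[str, Any],
    *,
    type_filters: dict[str, bool],
    attribute_filters: dict[str, list[Any]],
) -> bool:
    """Simplified stand-in for fractal_server.images.tools.match_filter."""
    # Type filters: an image type missing from the image counts as False
    for key, required in type_filters.items():
        if image.get("types", {}).get(key, False) != required:
            return False
    # Attribute filters: the image's value must be one of the accepted values
    for key, accepted in attribute_filters.items():
        if image.get("attributes", {}).get(key) not in accepted:
            return False
    return True


image = {"zarr_url": "/tmp/a", "types": {"3D": True}, "attributes": {"well": "A01"}}
ok = match_filter(image, type_filters={"3D": True}, attribute_filters={"well": ["A01", "A02"]})
ko = match_filter(image, type_filters={"2D": True}, attribute_filters={})
```

The list-valued attribute filters are what lets a single query accept several values per attribute, which the old scalar-valued `Filters.attributes` could not express.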
6 changes: 3 additions & 3 deletions fractal_server/app/routes/api/v2/workflowtask.py
@@ -109,7 +109,7 @@ async def replace_workflowtask(
task_type=task.type,
task=task,
# old-task values
input_filters=old_workflow_task.input_filters,
type_filters=old_workflow_task.type_filters,
# possibly new values
args_non_parallel=_args_non_parallel,
args_parallel=_args_parallel,
@@ -183,7 +183,7 @@ async def create_workflowtask(
meta_parallel=new_task.meta_parallel,
args_non_parallel=new_task.args_non_parallel,
args_parallel=new_task.args_parallel,
input_filters=new_task.input_filters,
type_filters=new_task.type_filters,
db=db,
)

@@ -274,7 +274,7 @@ async def update_workflowtask(
if not actual_args:
actual_args = None
setattr(db_wf_task, key, actual_args)
elif key in ["meta_parallel", "meta_non_parallel", "input_filters"]:
elif key in ["meta_parallel", "meta_non_parallel", "type_filters"]:
setattr(db_wf_task, key, value)
else:
raise HTTPException(
3 changes: 2 additions & 1 deletion fractal_server/app/runner/filenames.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
HISTORY_FILENAME = "history.json"
FILTERS_FILENAME = "filters.json"
TYPE_FILTERS_FILENAME = "type_filters.json"
ATTRIBUTE_FILTERS_FILENAME = "attribute_filters.json"
IMAGES_FILENAME = "images.json"
METADATA_FILENAME = "metadata.json"
SHUTDOWN_FILENAME = "shutdown"
39 changes: 13 additions & 26 deletions fractal_server/app/runner/v2/runner.py
@@ -7,15 +7,14 @@
from typing import Callable
from typing import Optional

from ....images import Filters
from ....images import SingleImage
from ....images.tools import filter_image_list
from ....images.tools import find_image_by_zarr_url
from ....images.tools import match_filter
from ..exceptions import JobExecutionError
from ..filenames import FILTERS_FILENAME
from ..filenames import HISTORY_FILENAME
from ..filenames import IMAGES_FILENAME
from ..filenames import TYPE_FILTERS_FILENAME
from .runner_functions import no_op_submit_setup_call
from .runner_functions import run_v2_task_compound
from .runner_functions import run_v2_task_non_parallel
@@ -47,7 +46,7 @@ def execute_tasks_v2(
# Initialize local dataset attributes
zarr_dir = dataset.zarr_dir
tmp_images = deepcopy(dataset.images)
tmp_filters = deepcopy(dataset.filters)
tmp_type_filters = deepcopy(dataset.type_filters)
tmp_history = []

for wftask in wf_task_list:
@@ -58,19 +57,14 @@
# PRE TASK EXECUTION

# Get filtered images
pre_filters = dict(
types=copy(tmp_filters["types"]),
attributes=copy(tmp_filters["attributes"]),
)
pre_filters["types"].update(wftask.input_filters["types"])
pre_filters["attributes"].update(wftask.input_filters["attributes"])
pre_type_filters = copy(tmp_type_filters)
pre_type_filters.update(wftask.type_filters)
filtered_images = filter_image_list(
images=tmp_images,
filters=Filters(**pre_filters),
images=tmp_images, type_filters=pre_type_filters
)
# Verify that filtered images comply with task input_types
for image in filtered_images:
if not match_filter(image, Filters(types=task.input_types)):
if not match_filter(image, type_filters=task.input_types):
raise JobExecutionError(
"Invalid filtered image list\n"
f"Task input types: {task.input_types=}\n"
@@ -249,19 +243,12 @@
else:
tmp_images.pop(img_search["index"])

# Update filters.attributes:
# current + (task_output: not really, in current examples..)
if current_task_output.filters is not None:
tmp_filters["attributes"].update(
current_task_output.filters.attributes
)

# Find manifest output types
types_from_manifest = task.output_types

# Find task-output types
if current_task_output.filters is not None:
types_from_task = current_task_output.filters.types
if current_task_output.type_filters is not None:
types_from_task = current_task_output.type_filters
else:
types_from_task = {}

@@ -279,8 +266,8 @@
)

# Update filters.types
tmp_filters["types"].update(types_from_manifest)
tmp_filters["types"].update(types_from_task)
tmp_type_filters.update(types_from_manifest)
tmp_type_filters.update(types_from_task)

# Update history (based on _DatasetHistoryItemV2)
history_item = _DatasetHistoryItemV2(
@@ -299,8 +286,8 @@
# information
with open(workflow_dir_local / HISTORY_FILENAME, "w") as f:
json.dump(tmp_history, f, indent=2)
with open(workflow_dir_local / FILTERS_FILENAME, "w") as f:
json.dump(tmp_filters, f, indent=2)
with open(workflow_dir_local / TYPE_FILTERS_FILENAME, "w") as f:
json.dump(tmp_type_filters, f, indent=2)
with open(workflow_dir_local / IMAGES_FILENAME, "w") as f:
json.dump(tmp_images, f, indent=2)

@@ -311,7 +298,7 @@
# represent the new attributes (to replace the original ones)
result = dict(
history=tmp_history,
filters=tmp_filters,
type_filters=tmp_type_filters,
images=tmp_images,
)
return result
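The pre-task logic in this runner diff merges the dataset-level type filters with the workflow task's own `type_filters` (task-level entries win), filters the image list, and later folds manifest and task output types back into the running filters. A sketch of that merge step, with `filter_image_list` replaced by a hypothetical helper built on the same matching rule:

```python
from copy import copy
from typing import Any


def filter_image_list(
    images: list[dict[str, Any]], *, type_filters: dict[str, bool]
) -> list[dict[str, Any]]:
    # Keep images whose types satisfy every filter (missing types count as False)
    return [
        img
        for img in images
        if all(img.get("types", {}).get(k, False) == v for k, v in type_filters.items())
    ]


dataset_type_filters = {"3D": True}
wftask_type_filters = {"illumination_corrected": True}  # hypothetical type name

# Task-level filters extend/override dataset-level ones, as in execute_tasks_v2
pre_type_filters = copy(dataset_type_filters)
pre_type_filters.update(wftask_type_filters)

images = [
    {"zarr_url": "/a", "types": {"3D": True, "illumination_corrected": True}},
    {"zarr_url": "/b", "types": {"3D": True}},
]
filtered = filter_image_list(images, type_filters=pre_type_filters)
```

Note that attribute filters no longer participate in this merge: per the runner change above (and commit "Only use `job_attribute_filters` in runner - ref #2155"), only type filters are carried along between tasks.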
4 changes: 2 additions & 2 deletions fractal_server/app/runner/v2/task_interface.py
@@ -6,7 +6,6 @@
from pydantic import validator

from ....images import SingleImageTaskOutput
from fractal_server.images import Filters
from fractal_server.urls import normalize_url


@@ -16,7 +15,8 @@ class TaskOutput(BaseModel, extra=Extra.forbid):
default_factory=list
)
image_list_removals: list[str] = Field(default_factory=list)
filters: Filters = Field(default_factory=Filters)
type_filters: dict[str, bool] = Field(default_factory=dict)
attribute_filters: dict[str, list[Any]] = Field(default_factory=dict)

def check_zarr_urls_are_unique(self) -> None:
zarr_urls = [img.zarr_url for img in self.image_list_updates]
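With the `Filters` schema removed, a task's output carries the two filter dicts directly. A minimal stand-in for the new `TaskOutput` shape (plain dataclass rather than the actual Pydantic model, and without its validators):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TaskOutput:
    image_list_updates: list[dict[str, Any]] = field(default_factory=list)
    image_list_removals: list[str] = field(default_factory=list)
    type_filters: dict[str, bool] = field(default_factory=dict)
    attribute_filters: dict[str, list[Any]] = field(default_factory=dict)


# A task that only declares an output type, e.g. "registered"
out = TaskOutput(type_filters={"registered": True})
```

Replacing the nested `filters: Filters` field with flat defaults also simplifies merging outputs from parallel tasks (see the `merge_outputs` commits), since each dict can be combined independently.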