Merge pull request #123 from ihmeuw-msca/feat/rename-pipeline-data-attr
Rename Pipeline.data to Pipeline.groupby_data
blsmxiu47 authored Dec 26, 2024
2 parents 9a8b5c1 + 1754e13 commit f2132cf
Showing 8 changed files with 21 additions and 18 deletions.
2 changes: 1 addition & 1 deletion examples/example_pipeline.json
@@ -16,7 +16,7 @@
     "coef_bounds": {}
   },
   "directory": "/path/to/experiment/directory",
-  "data": "/path/to/data.parquet",
+  "groupby_data": "/path/to/data.parquet",
   "groupby": [
     "sex_id"
   ],
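For anyone with an existing pipeline JSON config on disk, this change amounts to renaming one key. A minimal migration sketch, not part of this commit; the config path is a placeholder and it assumes the only affected key is the top-level "data" shown in the example file above:

import json
from pathlib import Path

config_path = Path("/path/to/experiment/pipeline.json")  # placeholder path
config = json.loads(config_path.read_text())

# Rename the top-level key changed by this commit: "data" -> "groupby_data".
if "data" in config:
    config["groupby_data"] = config.pop("data")

config_path.write_text(json.dumps(config, indent=2))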
2 changes: 1 addition & 1 deletion examples/pipeline_example.py
@@ -60,7 +60,7 @@ def create_pipeline(directory: str, data: str):
             "model_type": "binomial",
         },
         directory=directory,
-        data=data,
+        groupby_data=data,
         groupby=["sex_id"],
     )
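At the call site the change is a single keyword rename. Below is a minimal sketch of the new constructor usage; the import path is inferred from src/onemod/pipeline.py, the config shape is borrowed from the example and test fixtures in this commit, and the name and paths are placeholders, so treat it as an illustration rather than verified API:

from pathlib import Path

from onemod.pipeline import Pipeline  # import path inferred from the module location

directory = Path("/path/to/experiment/directory")  # placeholder
data = Path("/path/to/data.parquet")               # placeholder

# Before this commit: Pipeline(..., directory=directory, data=data, groupby=["sex_id"])
pipeline = Pipeline(
    name="example_pipeline",  # placeholder name
    config={                  # dict assumed to validate as PipelineConfig
        "id_columns": ["age_group_id", "location_id"],
        "model_type": "binomial",
    },
    directory=directory,
    groupby_data=data,        # renamed from `data`
    groupby=["sex_id"],
)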
17 changes: 10 additions & 7 deletions src/onemod/pipeline.py
@@ -30,19 +30,21 @@ class Pipeline(BaseModel):
         Pipeline configuration.
     directory : Path
         Experiment directory.
-    data : Path or None, optional
-        Input data used to create data subsets. Required for pipeline or
-        stage `groupby` attribute. Default is None.
     groupby : set of str or None, optional
         Column names used to create data subsets. Default is None.
+    groupby_data : Path or None, optional
+        Path to the data file used for creating data subsets. Default is None.
+        Required when specifying pipeline or stage `groupby` attribute.
+        All columns specified in pipeline or stage `groupby` must be present in
+        `groupby_data`.
     """

     name: str
     config: PipelineConfig
     directory: Path
-    data: Path | None = None
     groupby: set[str] | None = None
+    groupby_data: Path | None = None
     id_subsets: dict[str, list[Any]] | None = None
     _stages: dict[str, Stage] = {}  # set by add_stage

@@ -262,7 +264,7 @@ def build(self, id_subsets: dict[str, list[Any]] | None = None) -> None:
         config_path = self.directory / (self.name + ".json")
         for stage in self.stages.values():
             stage.set_dataif(config_path)
-            stage.dataif.add_path("pipeline_data", self.data)
+            stage.dataif.add_path("pipeline_groupby_data", self.groupby_data)

             # Create data subsets
             if isinstance(stage, ModelStage):
@@ -272,10 +274,11 @@
                 else:
                     stage.groupby.update(self.groupby)
                 if stage.groupby:
-                    if self.data is None:
+                    if self.groupby_data is None:
                         raise AttributeError("Data is required for groupby")
                     stage.create_stage_subsets(
-                        data_key="pipeline_data", id_subsets=self.id_subsets
+                        data_key="pipeline_groupby_data",
+                        id_subsets=self.id_subsets,
                     )
             # Create parameter sets
             if stage.config.crossable_params:
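The build() changes above only rename the path key and the guard; how subsets are actually derived lives in create_stage_subsets and is not shown in this diff. As a rough standalone illustration of the contract described in the new docstring (read the file at groupby_data and require every groupby column to be present before forming subsets), here is a pandas sketch with placeholder paths and columns, not OneMod's implementation:

import pandas as pd

# Placeholders standing in for Pipeline.groupby_data and Pipeline.groupby.
groupby_data = "/path/to/data.parquet"
groupby = {"sex_id", "age_group_id"}

df = pd.read_parquet(groupby_data)

# Docstring contract: all `groupby` columns must be present in `groupby_data`.
missing = groupby - set(df.columns)
if missing:
    raise ValueError(f"groupby columns missing from groupby_data: {sorted(missing)}")

# One plausible notion of "data subsets": unique combinations of the groupby columns.
subsets = df[sorted(groupby)].drop_duplicates().reset_index(drop=True)
print(subsets)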
2 changes: 1 addition & 1 deletion tests/e2e/test_e2e_dummy_pipeline_sequential.py
@@ -26,7 +26,7 @@ def test_dummy_pipeline(small_input_data, test_base_dir, method):

     assert dummy_pipeline_dict["name"] == "dummy_pipeline"
     assert dummy_pipeline_dict["directory"] == str(test_base_dir)
-    assert dummy_pipeline_dict["data"] == str(small_input_data)
+    assert dummy_pipeline_dict["groupby_data"] == str(small_input_data)
     assert dummy_pipeline_dict["groupby"] == ["sex_id"]
     assert_equal_unordered(
         dummy_pipeline_dict["config"],
2 changes: 1 addition & 1 deletion tests/e2e/test_e2e_onemod_example1_sequential.py
@@ -126,7 +126,7 @@ def test_e2e_onemod_example1_sequential(test_base_dir):
             mtype="binomial",
         ),
         directory=test_base_dir,
-        data=Path(test_base_dir, "data", "data.parquet"),
+        groupby_data=Path(test_base_dir, "data", "data.parquet"),
         groupby=["sex_id"],
     )
4 changes: 2 additions & 2 deletions tests/helpers/dummy_pipeline.py
@@ -94,7 +94,7 @@ def setup_dummy_pipeline(test_input_data, test_base_dir):
             model_type="binomial",
         ),
         directory=test_base_dir,
-        data=test_input_data,
+        groupby_data=test_input_data,
         groupby={"sex_id"},
     )

@@ -111,7 +111,7 @@ def setup_dummy_pipeline(test_input_data, test_base_dir):
     )

     # Define dependencies
-    preprocessing(data=dummy_pipeline.data)
+    preprocessing(data=dummy_pipeline.groupby_data)
     covariate_selection(data=preprocessing.output["data"])
     global_model(
         data=preprocessing.output["data"],
6 changes: 3 additions & 3 deletions tests/integration/test_integration_pipeline_build.py
@@ -159,7 +159,7 @@ def pipeline_with_single_stage(test_base_dir, stage_1):
             id_columns=["age_group_id", "location_id"], model_type="binomial"
         ),
         directory=test_base_dir,
-        data=test_base_dir / "data" / "data.parquet",
+        groupby_data=test_base_dir / "data" / "data.parquet",
         groupby=["age_group_id"],
     )
     pipeline.add_stage(stage_1)
@@ -176,7 +176,7 @@ def pipeline_with_multiple_stages(test_base_dir, stage_1, stage_2):
             id_columns=["age_group_id", "location_id"], model_type="binomial"
         ),
         directory=test_base_dir,
-        data=test_base_dir / "data" / "data.parquet",
+        groupby_data=test_base_dir / "data" / "data.parquet",
         groupby=["age_group_id"],
     )
     pipeline.add_stages([stage_1, stage_2])
@@ -197,7 +197,7 @@ def test_pipeline_build_single_stage(test_base_dir, pipeline_with_single_stage):
     pipeline_dict_expected = {
         "name": "test_pipeline",
         "directory": str(test_base_dir),
-        "data": str(test_base_dir / "data" / "data.parquet"),
+        "groupby_data": str(test_base_dir / "data" / "data.parquet"),
         "groupby": ["age_group_id"],
         "config": {
             "id_columns": ["age_group_id", "location_id"],
4 changes: 2 additions & 2 deletions tests/integration/test_integration_pipeline_evaluate.py
@@ -160,14 +160,14 @@ def test_evaluate_with_id_subsets(test_base_dir, sample_data):
             model_type="binomial",
         ),
         directory=test_base_dir,
-        data=sample_input_data,
+        groupby_data=sample_input_data,
         groupby={"age_group_id"},
     )
     test_stage = MultiplyByTwoStage(
         name="multiply_by_two", config=ModelConfig()
     )
     test_pipeline.add_stages([test_stage])
-    test_stage(data=test_pipeline.data)
+    test_stage(data=test_pipeline.groupby_data)

     # Ensure input data is as expected for the test
     assert sample_input_data.exists()
