diff --git a/examples/example_pipeline.json b/examples/example_pipeline.json index 7def30cd..e1f876e4 100644 --- a/examples/example_pipeline.json +++ b/examples/example_pipeline.json @@ -16,7 +16,7 @@ "coef_bounds": {} }, "directory": "/path/to/experiment/directory", - "data": "/path/to/data.parquet", + "groupby_data": "/path/to/data.parquet", "groupby": [ "sex_id" ], diff --git a/examples/pipeline_example.py b/examples/pipeline_example.py index d32421b8..90752d13 100644 --- a/examples/pipeline_example.py +++ b/examples/pipeline_example.py @@ -60,7 +60,7 @@ def create_pipeline(directory: str, data: str): "model_type": "binomial", }, directory=directory, - data=data, + groupby_data=data, groupby=["sex_id"], ) diff --git a/src/onemod/pipeline.py b/src/onemod/pipeline.py index 96139fc8..b2e400ca 100644 --- a/src/onemod/pipeline.py +++ b/src/onemod/pipeline.py @@ -30,19 +30,21 @@ class Pipeline(BaseModel): Pipeline configuration. directory : Path Experiment directory. - data : Path or None, optional - Input data used to create data subsets. Required for pipeline or - stage `groupby` attribute. Default is None. groupby : set of str or None, optional Column names used to create data subsets. Default is None. + groupby_data : Path or None, optional + Path to the data file used for creating data subsets. Default is None. + Required when specifying pipeline or stage `groupby` attribute. + All columns specified in pipeline or stage `groupby` must be present in + `groupby_data`. """ name: str config: PipelineConfig directory: Path - data: Path | None = None groupby: set[str] | None = None + groupby_data: Path | None = None id_subsets: dict[str, list[Any]] | None = None _stages: dict[str, Stage] = {} # set by add_stage @@ -262,7 +264,7 @@ def build(self, id_subsets: dict[str, list[Any]] | None = None) -> None: config_path = self.directory / (self.name + ".json") for stage in self.stages.values(): stage.set_dataif(config_path) - stage.dataif.add_path("pipeline_data", self.data) + stage.dataif.add_path("pipeline_groupby_data", self.groupby_data) # Create data subsets if isinstance(stage, ModelStage): @@ -272,10 +274,11 @@ def build(self, id_subsets: dict[str, list[Any]] | None = None) -> None: else: stage.groupby.update(self.groupby) if stage.groupby: - if self.data is None: + if self.groupby_data is None: raise AttributeError("Data is required for groupby") stage.create_stage_subsets( - data_key="pipeline_data", id_subsets=self.id_subsets + data_key="pipeline_groupby_data", + id_subsets=self.id_subsets, ) # Create parameter sets if stage.config.crossable_params: diff --git a/tests/e2e/test_e2e_dummy_pipeline_sequential.py b/tests/e2e/test_e2e_dummy_pipeline_sequential.py index 781c77d6..cd88a630 100644 --- a/tests/e2e/test_e2e_dummy_pipeline_sequential.py +++ b/tests/e2e/test_e2e_dummy_pipeline_sequential.py @@ -26,7 +26,7 @@ def test_dummy_pipeline(small_input_data, test_base_dir, method): assert dummy_pipeline_dict["name"] == "dummy_pipeline" assert dummy_pipeline_dict["directory"] == str(test_base_dir) - assert dummy_pipeline_dict["data"] == str(small_input_data) + assert dummy_pipeline_dict["groupby_data"] == str(small_input_data) assert dummy_pipeline_dict["groupby"] == ["sex_id"] assert_equal_unordered( dummy_pipeline_dict["config"], diff --git a/tests/e2e/test_e2e_onemod_example1_sequential.py b/tests/e2e/test_e2e_onemod_example1_sequential.py index c81e3044..1fe9cada 100644 --- a/tests/e2e/test_e2e_onemod_example1_sequential.py +++ b/tests/e2e/test_e2e_onemod_example1_sequential.py @@ -126,7 +126,7 @@ def test_e2e_onemod_example1_sequential(test_base_dir): mtype="binomial", ), directory=test_base_dir, - data=Path(test_base_dir, "data", "data.parquet"), + groupby_data=Path(test_base_dir, "data", "data.parquet"), groupby=["sex_id"], ) diff --git a/tests/helpers/dummy_pipeline.py b/tests/helpers/dummy_pipeline.py index d3def417..2563f533 100644 --- a/tests/helpers/dummy_pipeline.py +++ b/tests/helpers/dummy_pipeline.py @@ -94,7 +94,7 @@ def setup_dummy_pipeline(test_input_data, test_base_dir): model_type="binomial", ), directory=test_base_dir, - data=test_input_data, + groupby_data=test_input_data, groupby={"sex_id"}, ) @@ -111,7 +111,7 @@ def setup_dummy_pipeline(test_input_data, test_base_dir): ) # Define dependencies - preprocessing(data=dummy_pipeline.data) + preprocessing(data=dummy_pipeline.groupby_data) covariate_selection(data=preprocessing.output["data"]) global_model( data=preprocessing.output["data"], diff --git a/tests/integration/test_integration_pipeline_build.py b/tests/integration/test_integration_pipeline_build.py index 878aa81a..9fd65f8e 100644 --- a/tests/integration/test_integration_pipeline_build.py +++ b/tests/integration/test_integration_pipeline_build.py @@ -159,7 +159,7 @@ def pipeline_with_single_stage(test_base_dir, stage_1): id_columns=["age_group_id", "location_id"], model_type="binomial" ), directory=test_base_dir, - data=test_base_dir / "data" / "data.parquet", + groupby_data=test_base_dir / "data" / "data.parquet", groupby=["age_group_id"], ) pipeline.add_stage(stage_1) @@ -176,7 +176,7 @@ def pipeline_with_multiple_stages(test_base_dir, stage_1, stage_2): id_columns=["age_group_id", "location_id"], model_type="binomial" ), directory=test_base_dir, - data=test_base_dir / "data" / "data.parquet", + groupby_data=test_base_dir / "data" / "data.parquet", groupby=["age_group_id"], ) pipeline.add_stages([stage_1, stage_2]) @@ -197,7 +197,7 @@ def test_pipeline_build_single_stage(test_base_dir, pipeline_with_single_stage): pipeline_dict_expected = { "name": "test_pipeline", "directory": str(test_base_dir), - "data": str(test_base_dir / "data" / "data.parquet"), + "groupby_data": str(test_base_dir / "data" / "data.parquet"), "groupby": ["age_group_id"], "config": { "id_columns": ["age_group_id", "location_id"], diff --git a/tests/integration/test_integration_pipeline_evaluate.py b/tests/integration/test_integration_pipeline_evaluate.py index 3b733799..77ff9202 100644 --- a/tests/integration/test_integration_pipeline_evaluate.py +++ b/tests/integration/test_integration_pipeline_evaluate.py @@ -160,14 +160,14 @@ def test_evaluate_with_id_subsets(test_base_dir, sample_data): model_type="binomial", ), directory=test_base_dir, - data=sample_input_data, + groupby_data=sample_input_data, groupby={"age_group_id"}, ) test_stage = MultiplyByTwoStage( name="multiply_by_two", config=ModelConfig() ) test_pipeline.add_stages([test_stage]) - test_stage(data=test_pipeline.data) + test_stage(data=test_pipeline.groupby_data) # Ensure input data is as expected for the test assert sample_input_data.exists()