Commit: Merge pull request mmcdermott#229 from mmcdermott/dev

Release Candidate

mmcdermott authored Dec 4, 2024
2 parents f0a773b + f54ea5a commit 2f88863

Showing 15 changed files with 95 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -271,7 +271,7 @@ In this structure, `$INPUT_FILE_STEM` is the stem of the input file name, `$EVEN
particular kind of event that can be extracted from the input file, `$CODE` is the code for the event, either
as a constant string or (with the syntax `"col($COLUMN)"` the name of the column in the raw data to be read to
get the code), and `$TIME` is the time for the event, either as `null` to indicate the event has a
-null time (e.g., a static measurement) or with the `"col($COLUMN)"` syntax refenced above, and all
+null time (e.g., a static measurement) or with the `"col($COLUMN)"` syntax referenced above, and all
subsequent key-value pairs are mappings from the MEDS column name to the raw column name in the input data.
Here, these mappings can _only_ point to columns in the input data, not constant values, and the input data
columns must be either string or categorical types (in which case they will be converted to categorical) or
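A hypothetical event-conversion entry following the structure this hunk describes; the file, event, and
column names are invented for illustration, not drawn from the repository:

```yaml
admissions:                      # $INPUT_FILE_STEM: stem of the raw input file
  admission:                     # $EVENT_NAME: one kind of extractable event
    code: "col(admission_type)"  # $CODE read from a raw data column
    time: "col(admit_ts)"        # $TIME read from a raw data column
    text_value: admission_note   # extra MEDS column <- raw column mapping
  eye_color:
    code: EYE_COLOR              # $CODE as a constant string
    time: null                   # static (null-time) measurement
```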
2 changes: 1 addition & 1 deletion docs/preprocessing_operation_prototypes.md
@@ -69,7 +69,7 @@ simplicity).

##### Operation Steps

-1. Per-shard, filter the pateint data to satisfy desired set of patient or other data critieria.
+1. Per-shard, filter the pateint data to satisfy desired set of patient or other data criteria.
2. Per-shard, group by code and collect some aggregate statistics. Optionally also compute statistics across
   all codes.
3. Reduce the per-shard aggregate files into a unified `metadata/codes.parquet` file.
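A minimal polars sketch of the filter/aggregate/reduce pattern these steps describe; the shard layout,
filter criterion, and single count aggregation are illustrative assumptions, not the package's actual
stage implementation:

```python
from pathlib import Path

import polars as pl

# Hypothetical shard layout.
shard_fps = sorted(Path("data/shards").glob("*.parquet"))

# Steps 1-2: per shard, filter rows, then aggregate statistics per code.
shard_aggs = [
    pl.scan_parquet(fp)
    .filter(pl.col("numeric_value").is_not_null())  # example criterion
    .group_by("code")
    .agg(pl.len().alias("code/n_occurrences"))
    for fp in shard_fps
]

# Step 3: reduce the per-shard aggregates into one unified file.
(
    pl.concat(shard_aggs)
    .group_by("code")
    .agg(pl.col("code/n_occurrences").sum())
    .collect()
    .write_parquet("metadata/codes.parquet")
)
```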
2 changes: 1 addition & 1 deletion docs/terminology.md
@@ -16,7 +16,7 @@ output layer logit identification, etc.
A single measurable quantity observed about the patient during their care. These observations can take on many
forms, such as observing a diagnostic code being applied to the patient, observing a patient's admission or
transfer from one unit to another, observing a laboratory test result, but always correspond to a single
-measureable unit about a single patient. They are encoded in MEDS datasets as a single row in the main MEDS
+measurable unit about a single patient. They are encoded in MEDS datasets as a single row in the main MEDS
schema.

#### An _event_ or _patient event_
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
-"polars~=1.6.0", "pyarrow", "nested_ragged_tensors==0.0.6", "loguru", "hydra-core", "numpy", "meds==0.3.3",
+"polars~=1.14", "pyarrow", "nested_ragged_tensors>=0.0.8", "loguru", "hydra-core", "numpy", "meds==0.3.3",
]

[tool.setuptools_scm]
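For reference, the compatible-release specifiers above expand as follows under PEP 440 (paraphrased
bounds, not lines from the project file):

```toml
# "polars~=1.6.0" allowed  >=1.6.0, <1.7.0  (patch releases only)
# "polars~=1.14"  allows   >=1.14,  <2.0    (any 1.x from 1.14 on)
# "nested_ragged_tensors>=0.0.8" replaces the exact ==0.0.6 pin
```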
69 changes: 65 additions & 4 deletions src/MEDS_transforms/aggregate_code_metadata.py
@@ -443,6 +443,39 @@ def mapper_fntr(
│ C    ┆ 1         ┆ [5.0, 7.5]        │
│ D    ┆ null      ┆ []                │
└──────┴───────────┴───────────────────┘
+
+Empty dataframes are handled as you would expect
+>>> df_empty = pl.DataFrame({
+... "code": [],
+... "modifier1": [],
+... "modifier_ignored": [],
+... "subject_id": [],
+... "numeric_value": [],
+... }, schema=df.schema)
+>>> stage_cfg = DictConfig({"aggregations": ["values/sum_sqd", "values/min", "values/max"]})
+>>> mapper = mapper_fntr(stage_cfg, code_modifiers)
+>>> mapper(df_empty.lazy()).collect()
+shape: (0, 5)
+┌──────┬───────────┬────────────────┬────────────┬────────────┐
+│ code ┆ modifier1 ┆ values/sum_sqd ┆ values/min ┆ values/max │
+│ ---  ┆ ---       ┆ ---            ┆ ---        ┆ ---        │
+│ str  ┆ i64       ┆ f64            ┆ f64        ┆ f64        │
+╞══════╪═══════════╪════════════════╪════════════╪════════════╡
+└──────┴───────────┴────────────────┴────────────┴────────────┘
+>>> stage_cfg = DictConfig({
+... "aggregations": ["values/sum_sqd", "values/min", "values/max"],
+... "do_summarize_over_all_codes": True,
+... })
+>>> mapper = mapper_fntr(stage_cfg, code_modifiers)
+>>> mapper(df_empty.lazy()).collect()
+shape: (1, 5)
+┌──────┬───────────┬────────────────┬────────────┬────────────┐
+│ code ┆ modifier1 ┆ values/sum_sqd ┆ values/min ┆ values/max │
+│ ---  ┆ ---       ┆ ---            ┆ ---        ┆ ---        │
+│ str  ┆ i64       ┆ f64            ┆ f64        ┆ f64        │
+╞══════╪═══════════╪════════════════╪════════════╪════════════╡
+│ null ┆ null      ┆ 0.0            ┆ null       ┆ null       │
+└──────┴───────────┴────────────────┴────────────┴────────────┘
"""

code_key_columns = validate_args_and_get_code_cols(stage_cfg, code_modifiers)
@@ -542,10 +575,38 @@ def reducer_fntr(
... "values/max": [2],
... "values/quantiles": [[]],
... })
+>>> df_empty = pl.DataFrame({
+... "code": [],
+... "modifier1": [],
+... "code/n_subjects": [],
+... "code/n_occurrences": [],
+... "values/n_subjects": [],
+... "values/n_occurrences": [],
+... "values/n_ints": [],
+... "values/sum": [],
+... "values/sum_sqd": [],
+... "values/min": [],
+... "values/max": [],
+... "values/quantiles": [],
+... }, schema=df_3.schema)
+>>> df_null_empty = pl.DataFrame({
+... "code": [None],
+... "modifier1": [None],
+... "code/n_subjects": [0],
+... "code/n_occurrences": [0],
+... "values/n_subjects": [0],
+... "values/n_occurrences": [0],
+... "values/n_ints": [0],
+... "values/sum": [0],
+... "values/sum_sqd": [0],
+... "values/min": [None],
+... "values/max": [None],
+... "values/quantiles": [None],
+... }, schema=df_3.schema)
>>> code_modifiers = ["modifier1"]
>>> stage_cfg = DictConfig({"aggregations": ["code/n_subjects", "values/n_ints"]})
>>> reducer = reducer_fntr(stage_cfg, code_modifiers)
->>> reducer(df_1, df_2, df_3)
+>>> reducer(df_1, df_2, df_3, df_empty, df_null_empty)
shape: (7, 4)
┌──────┬───────────┬─────────────────┬───────────────┐
│ code ┆ modifier1 ┆ code/n_subjects ┆ values/n_ints │
... })
>>> stage_cfg = DictConfig({"aggregations": ["code/n_occurrences", "values/sum"]})
>>> reducer = reducer_fntr(stage_cfg, code_modifiers)
->>> reducer(df_1, df_2, df_3)
+>>> reducer(df_1, df_2, df_3, df_empty, df_null_empty)
shape: (7, 4)
┌──────┬───────────┬────────────────────┬────────────┐
│ code ┆ modifier1 ┆ code/n_occurrences ┆ values/sum │
Expand All @@ -589,7 +650,7 @@ def reducer_fntr(
└──────┴───────────┴────────────────────┴────────────┘
>>> stage_cfg = DictConfig({"aggregations": ["values/n_subjects", "values/n_occurrences"]})
>>> reducer = reducer_fntr(stage_cfg, code_modifiers)
->>> reducer(df_1, df_2, df_3)
+>>> reducer(df_1, df_2, df_3, df_empty, df_null_empty)
shape: (7, 4)
┌──────┬───────────┬───────────────────┬──────────────────────┐
│ code ┆ modifier1 ┆ values/n_subjects ┆ values/n_occurrences │
@@ -629,7 +690,7 @@
... "aggregations": [{"name": "values/quantiles", "quantiles": [0.25, 0.5, 0.75]}],
... })
>>> reducer = reducer_fntr(stage_cfg, code_modifiers)
->>> reducer(df_1, df_2, df_3).unnest("values/quantiles")
+>>> reducer(df_1, df_2, df_3, df_empty, df_null_empty).unnest("values/quantiles")
shape: (7, 5)
┌──────┬───────────┬──────────────────────┬─────────────────────┬──────────────────────┐
│ code ┆ modifier1 ┆ values/quantile/0.25 ┆ values/quantile/0.5 ┆ values/quantile/0.75 │
1 change: 1 addition & 0 deletions src/MEDS_transforms/mapreduce/utils.py
@@ -196,6 +196,7 @@ def rwlock_wrap(
Traceback (most recent call last):
...
polars.exceptions.ColumnNotFoundError: unable to find column "d"; valid columns: ["a", "b", "c"]
+...
>>> assert not out_fp.is_file() # Out file should not be created when the process crashes
If we check the locks during computation, one should be present
5 changes: 2 additions & 3 deletions src/MEDS_transforms/parser.py
@@ -28,6 +28,7 @@
- Two key-value pairs, where the first key is `"output"` and the value is the column expression and the
second key is `"matcher"` and the value is the matcher dictionary.
"""

from __future__ import annotations

import re
@@ -295,9 +296,7 @@ def to_pl_expr(cls, expr_type: ColExprType, expr_val: Any) -> tuple[pl.Expr, set
['baz', 'foo']
>>> expr, cols = ColExprType.to_pl_expr(ColExprType.LITERAL, ListConfig(["foo", "bar"]))
>>> print(expr)
-Series[literal]
->>> pl.select(expr).item().to_list()
-['foo', 'bar']
+["foo", "bar"]
>>> cols
set()
>>> expr, cols = ColExprType.to_pl_expr(ColExprType.EXTRACT, {"from": "foo", "regex": "bar"})
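A hypothetical config fragment in the two-key form the parser.py docstring above describes; the output
expression and matcher fields are invented:

```yaml
output: "col(discharge_location)"  # the column expression
matcher:                           # the matcher dictionary
  patient_class: inpatient
```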
1 change: 0 additions & 1 deletion tests/MEDS_Extract/test_finalize_MEDS_metadata.py
@@ -4,7 +4,6 @@
scripts.
"""


from datetime import datetime

import polars as pl
1 change: 0 additions & 1 deletion tests/MEDS_Transforms/test_fit_vocabulary_indices.py
@@ -4,7 +4,6 @@
scripts.
"""


from tests.MEDS_Transforms import FIT_VOCABULARY_INDICES_SCRIPT
from tests.MEDS_Transforms.transform_tester_base import parse_code_metadata_csv, single_stage_transform_tester

5 changes: 2 additions & 3 deletions tests/MEDS_Transforms/test_multi_stage_preprocess_pipeline.py
@@ -17,7 +17,6 @@
The stage configuration arguments will be as given in the yaml block below:
"""


from datetime import datetime

import polars as pl
@@ -436,7 +435,7 @@
1195293,"06/20/2010, 20:12:31",AGE,32.002988771458945,true
1195293,"06/20/2010, 20:12:31",HR,112.5,true
1195293,"06/20/2010, 20:12:31",TEMP,99.8,true
1195293,"06/20/2010, 20:24:44","TIME_OF_DAY//[18,24)",
1195293,"06/20/2010, 20:24:44","TIME_OF_DAY//[18,24)",,
1195293,"06/20/2010, 20:24:44",AGE,32.00301199932335,true
1195293,"06/20/2010, 20:24:44",HR,107.7,true
1195293,"06/20/2010, 20:24:44",TEMP,100.0,true
@@ -821,7 +820,7 @@
1195293,"06/20/2010, 20:12:31",2,nan
1195293,"06/20/2010, 20:12:31",8,0.5879650115966797
1195293,"06/20/2010, 20:12:31",9,-1.1555582284927368
1195293,"06/20/2010, 20:24:44",12
1195293,"06/20/2010, 20:24:44",12,
1195293,"06/20/2010, 20:24:44",2,nan
1195293,"06/20/2010, 20:24:44",8,-1.2583553791046143
1195293,"06/20/2010, 20:24:44",9,-0.0889078751206398
1 change: 0 additions & 1 deletion tests/MEDS_Transforms/test_occlude_outliers.py
@@ -4,7 +4,6 @@
scripts.
"""


import polars as pl

from tests.MEDS_Transforms import OCCLUDE_OUTLIERS_SCRIPT
1 change: 0 additions & 1 deletion tests/MEDS_Transforms/test_reorder_measurements.py
@@ -4,7 +4,6 @@
scripts.
"""


from tests.MEDS_Transforms import REORDER_MEASUREMENTS_SCRIPT
from tests.MEDS_Transforms.transform_tester_base import single_stage_transform_tester
from tests.utils import parse_meds_csvs
1 change: 0 additions & 1 deletion tests/MEDS_Transforms/test_tensorization.py
@@ -6,7 +6,6 @@
scripts.
"""


from nested_ragged_tensors.ragged_numpy import JointNestedRaggedTensorDict

from tests.MEDS_Transforms import TENSORIZATION_SCRIPT
64 changes: 20 additions & 44 deletions tests/test_with_runner.py
@@ -91,8 +91,8 @@
- _preprocess
- _self_
-input_dir: {{input_dir}}
-cohort_dir: {{cohort_dir}}
+input_dir: {input_dir}
+cohort_dir: {cohort_dir}
"""

PIPELINE_YAML = f"""
@@ -176,48 +176,42 @@


def test_pipeline():
+shared_kwargs = {
+"config_name": "runner",
+"stage_name": None,
+"stage_kwargs": None,
+"do_pass_stage_name": False,
+"do_use_config_yaml": False,
+"do_include_dirs": False,
+"hydra_verbose": False,
+}
+
single_stage_tester(
script=str(RUNNER_SCRIPT) + " -h",
config_name="runner",
stage_name=None,
stage_kwargs=None,
do_pass_stage_name=False,
do_use_config_yaml=False,
input_files={},
want_outputs={},
assert_no_other_outputs=True,
should_error=False,
test_name="Runner Help Test",
-do_include_dirs=False,
-hydra_verbose=False,
stdout_regex=exact_str_regex(NO_ARGS_HELP_STR.strip()),
+**shared_kwargs,
)

single_stage_tester(
script=str(RUNNER_SCRIPT) + " -h",
config_name="runner",
stage_name=None,
stage_kwargs=None,
do_pass_stage_name=False,
do_use_config_yaml=False,
input_files={"pipeline.yaml": partial(add_params, PIPELINE_YAML)},
want_outputs={},
assert_no_other_outputs=True,
should_error=False,
pipeline_config_fp="{input_dir}/pipeline.yaml",
test_name="Runner Help Test",
-do_include_dirs=False,
-hydra_verbose=False,
stdout_regex=exact_str_regex(WITH_CONFIG_HELP_STR.strip()),
+**shared_kwargs,
)

shared_kwargs["script"] = RUNNER_SCRIPT

single_stage_tester(
-script=RUNNER_SCRIPT,
-config_name="runner",
-stage_name=None,
-stage_kwargs=None,
-do_pass_stage_name=False,
-do_use_config_yaml=False,
input_files={
**{f"data/{k}": v for k, v in MEDS_SHARDS.items()},
code_metadata_filepath: MEDS_CODE_METADATA,
@@ -242,17 +236,11 @@ def test_pipeline():
pipeline_config_fp="{input_dir}/pipeline.yaml",
stage_runner_fp="{input_dir}/stage_runner.yaml",
test_name="Runner Test",
-do_include_dirs=False,
df_check_kwargs={"check_column_order": False},
+**shared_kwargs,
)

single_stage_tester(
-script=RUNNER_SCRIPT,
-config_name="runner",
-stage_name=None,
-stage_kwargs=None,
-do_pass_stage_name=False,
-do_use_config_yaml=False,
input_files={
**{f"data/{k}": v for k, v in MEDS_SHARDS.items()},
code_metadata_filepath: MEDS_CODE_METADATA,
pipeline_config_fp="{input_dir}/pipeline.yaml",
stage_runner_fp="{input_dir}/stage_runner.yaml",
test_name="Runner Test with parallelism",
-do_include_dirs=False,
df_check_kwargs={"check_column_order": False},
+**shared_kwargs,
)

single_stage_tester(
-script=RUNNER_SCRIPT,
-config_name="runner",
-stage_name=None,
-stage_kwargs=None,
-do_pass_stage_name=False,
-do_use_config_yaml=False,
input_files={
**{f"data/{k}": v for k, v in MEDS_SHARDS.items()},
code_metadata_filepath: MEDS_CODE_METADATA,
subject_splits_filepath: SPLITS_DF,
"_preprocess.yaml": partial(add_params, PIPELINE_YAML),
},
-do_include_dirs=False,
should_error=True,
pipeline_config_fp="{input_dir}/_preprocess.yaml",
test_name="Runner should fail if the pipeline config has an invalid name",
+**shared_kwargs,
)

single_stage_tester(
-script=RUNNER_SCRIPT,
-config_name="runner",
-stage_name=None,
-stage_kwargs=None,
-do_pass_stage_name=False,
-do_use_config_yaml=False,
input_files={
**{f"data/{k}": v for k, v in MEDS_SHARDS.items()},
code_metadata_filepath: MEDS_CODE_METADATA,
subject_splits_filepath: SPLITS_DF,
"pipeline.yaml": partial(add_params, PIPELINE_NO_STAGES_YAML),
},
-do_include_dirs=False,
should_error=True,
pipeline_config_fp="{input_dir}/pipeline.yaml",
test_name="Runner should fail if the pipeline has no stages",
+**shared_kwargs,
)
2 changes: 1 addition & 1 deletion tests/utils.py
@@ -247,7 +247,7 @@ def check_NRT_output(
want_nrt: JointNestedRaggedTensorDict,
msg: str,
):
-got_nrt = JointNestedRaggedTensorDict.load(output_fp)
+got_nrt = JointNestedRaggedTensorDict(tensors_fp=output_fp)

# assert got_nrt.schema == want_nrt.schema, (
# f"Expected the schema of the NRT at {output_fp} to be equal to the target.\n"
Expand Down
