Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GSProcessing] Enforce re-order for node label processing during classification #1136

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

thvasilo
Copy link
Contributor

@thvasilo thvasilo commented Jan 17, 2025

Issue #, if available:

Fixes #1135 #1138

Description of changes:

  • We guarantee ordering for node label classification by ordering the transformed label DF after processing by the NODE_INT_MAPPING id, and doing the same for masks.
  • Because there is no guarantee for order when writing to Parquet from Spark even for ordered Spark DataFrames, we collect the labels and masks to a Pandas DF on the Spark leader, and write that using pyarrow.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@thvasilo thvasilo added ready able to trigger the CI gsprocessing For issues and PRs related the the GSProcessing library 0.4.1 labels Jan 17, 2025
@thvasilo thvasilo force-pushed the reorder-node-ids branch 2 times, most recently from 4d0afcd to 865d943 Compare January 17, 2025 19:28
@thvasilo thvasilo marked this pull request as ready for review January 17, 2025 19:42
@thvasilo thvasilo requested a review from jalencato January 17, 2025 19:42
@thvasilo thvasilo self-assigned this Jan 17, 2025
@thvasilo thvasilo added this to the 0.4.1 release milestone Jan 17, 2025
Copy link
Collaborator

@jalencato jalencato left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any performance number to compare between the built-in spark method and pandaUDF method?

.alias(self.label_column)
.cast("long")
.alias(self.label_column),
*original_cols,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we add all the original columns here for the label value? Will it overkill?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, well here we want to ensure that the order_col is preserved in the output, but DistSingleLabelTransformation is not aware of order_col. We could modify the constructor to also provide that, and then return just the label and order column.

The argument against would be that these two cols are selected downstream anyway, and the other columns are not materialized, so there shouldn't be any real performance penalty.

I'm good with either option.

bucket,
f"{s3_prefix}/{output_file}",
)

def run(self) -> None:
"""
Executes the Spark processing job.
Executes the Spark processing job, optional repartition job, and uploads any metadata files
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Executes the Spark processing job, optional repartition job, and uploads any metadata files
Executes the Spark processing job, optional repartition job, and uploads transformed metadata files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also uploads perf_counters.json , not sure if we would qualify that as a "transformed" metadata file.

if self.filesystem_type == FilesystemType.LOCAL:
os.makedirs(os.path.dirname(out_path), exist_ok=True)

pq.write_table(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we move this part to line 666? And if emr-serverless support directly use pyarrow to write to S3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move it yes.

emr-serverless support directly use pyarrow to write to S3

Yes we already use pyarrow to write and modify files on EMRS during the re-partition stage that runs on the Spark leader.

split_metadata = {}

def write_masks_numpy(np_mask_arrays: Sequence[np.ndarray]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about put these two functions into utils.py?

Copy link
Contributor Author

@thvasilo thvasilo Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My rule of thumb to make a function public is : "is this function used in at least two places in the codebase?".

e.g. that's why I moved _create_metadata_entry from an inner function to be part DGHL in this PR.

Right now the answer is no for these functions, if we see a need to re-use them we can pull them out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
0.4.1 gsprocessing For issues and PRs related the the GSProcessing library ready able to trigger the CI
Projects
None yet
Development

Successfully merging this pull request may close these issues.

GSProcessing Local Run does not output file list in alphabetic order
2 participants