Skip to content

Commit

Permalink
apply comments for gspartition
Browse files Browse the repository at this point in the history
  • Loading branch information
jalencato committed Nov 11, 2024
1 parent 2e47f2d commit e631d2c
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 204 deletions.
2 changes: 1 addition & 1 deletion graphstorm-processing/tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
{
"column": "author",
"name": "hard_negative",
"transformation": {"name": "edge_dst_hard_negative", "separator": ";"},
"transformation": {"name": "edge_dst_hard_negative", "kwargs": {"separator": ";"}},
},
]
assert edges_output["labels"] == [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_hard_negative_example_list(spark: SparkSession, check_df_schema, tmp_pa

for idx, row in enumerate(output_data):
np.testing.assert_equal(
row[0], expected_output[idx], decimal=3, err_msg=f"Row {idx} is not equal"
row[0], expected_output[idx], err_msg=f"Row {idx} is not equal"
)


Expand All @@ -91,7 +91,7 @@ def test_hard_negative_example_str(spark: SparkSession, check_df_schema, tmp_pat
mapping_column = [NODE_MAPPING_STR, NODE_MAPPING_INT]
mapping_df = spark.createDataFrame(mapping_data, schema=mapping_column)
mapping_df.repartition(1).write.parquet(f"{tmp_path}/raw_id_mappings/dst_type/parquet")
edge_mapping_dict = {
hard_node_mapping_dict = {
"edge_type": "src_type:relation:dst_type",
"mapping_path": f"{tmp_path}/raw_id_mappings/",
"format_name": "parquet",
Expand All @@ -108,5 +108,5 @@ def test_hard_negative_example_str(spark: SparkSession, check_df_schema, tmp_pat

for idx, row in enumerate(output_data):
np.testing.assert_equal(
row[0], expected_output[idx], decimal=3, err_msg=f"Row {idx} is not equal"
row[0], expected_output[idx], err_msg=f"Row {idx} is not equal"
)
27 changes: 9 additions & 18 deletions python/graphstorm/gpartition/dist_partition_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,15 @@ def main():
)

# Hard Negative Mapping
if args.gsprocessing_config:
gsprocessing_config = args.gsprocessing_config
shuffle_hard_negative_nids(f"{args.input_path}/{gsprocessing_config}",
args.num_parts, args.output_path)
else:
for filename in os.listdir(args.input_path):
if filename.endswith("_with_transformations.json"):
gsprocessing_config = filename
shuffle_hard_negative_nids(f"{args.input_path}/{gsprocessing_config}",
args.num_parts, args.output_path)
break
else:
# Did not raise error here for not introducing the break change,
# but will raise warning here to warn customers.
logging.info("Skip the hard negative node ID mapping, "
"please upgrade to the latest GSProcessing.")
# Load the GSProcessing config from the launch_arguments file generated by GSProcessing.
# The generated GSProcessing config will have the _with_transformations suffix.
with open(os.path.join(args.input_path, "launch_arguments.json"),
"r", encoding="utf-8") as f:
gsprocessing_launch_arguments: Dict = json.load(f)
gsprocessing_config = gsprocessing_launch_arguments["config_filename"]
gsprocessing_config = gsprocessing_config.replace(".json", "_with_transformations.json")
shuffle_hard_negative_nids(f"{args.input_path}/{gsprocessing_config}",
args.num_parts, args.output_path)


def parse_args() -> argparse.Namespace:
Expand All @@ -215,8 +208,6 @@ def parse_args() -> argparse.Namespace:
+ "or regression tasks")
argparser.add_argument("--input-path", type=str, required=True,
help="Path to input DGL chunked data.")
argparser.add_argument("--gsprocessing-config", type=str,
help="Path to the input GSProcessing config data.")
argparser.add_argument("--metadata-filename", type=str, default="metadata.json",
help="Name for the chunked DGL data metadata file.")
argparser.add_argument("--output-path", type=str, required=True,
Expand Down
14 changes: 7 additions & 7 deletions python/graphstorm/gpartition/post_hard_negative.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def load_hard_negative_config(gsprocessing_config):

# Hard Negative only supports link prediction
edges_config = config['graph']['edges']
mapping_edge_list = []
hard_neg_list = []
for single_edge_config in edges_config:
if "features" not in single_edge_config:
continue
Expand All @@ -47,13 +47,13 @@ def load_hard_negative_config(gsprocessing_config):
single_edge_config["relation"]["type"],
single_edge_config["dest"]["type"]])
hard_neg_feat_name = single_feature['name']
mapping_edge_list.append({"dst_node_type": single_edge_config["dest"]["type"],
hard_neg_list.append({"dst_node_type": single_edge_config["dest"]["type"],
"edge_type": edge_type,
"hard_neg_feat_name": hard_neg_feat_name})
return mapping_edge_list
return hard_neg_list


def shuffle_hard_negative_nids(gsprocessing_config, num_parts, output_path):
def shuffle_hard_negative_nids(gsprocessing_config, num_parts, graph_path):
"""Shuffle hard negative edge feature ids with int-to-int node id mapping.
The function here aligns with shuffle_hard_nids in graphstorm.gconstruct.utils.
Create an additional function to handle the id mappings under the distributed setting.
Expand All @@ -64,7 +64,7 @@ def shuffle_hard_negative_nids(gsprocessing_config, num_parts, output_path):
Path to the gsprocessing config.
num_parts: int
Number of parts.
output_path: str
graph_path: str
Path to the output DGL graph.
"""
shuffled_edge_config = load_hard_negative_config(gsprocessing_config)
Expand All @@ -73,7 +73,7 @@ def shuffle_hard_negative_nids(gsprocessing_config, num_parts, output_path):
for single_shuffled_edge_config in shuffled_edge_config:
node_type = single_shuffled_edge_config["dst_node_type"]
node_type_list.append(node_type)
node_mapping = load_dist_nid_map(f"{output_path}/dist_graph", node_type_list)
node_mapping = load_dist_nid_map(f"{graph_path}/dist_graph", node_type_list)
gnid2pnid_mapping = {}

def get_gnid2pnid_map(ntype):
Expand All @@ -89,7 +89,7 @@ def get_gnid2pnid_map(ntype):

# iterate all the partitions to convert hard negative node ids.
for i in range(num_parts):
part_path = os.path.join(f"{output_path}/dist_graph", f"part{i}")
part_path = os.path.join(f"{graph_path}/dist_graph", f"part{i}")
edge_feat_path = os.path.join(part_path, "edge_feat.dgl")

# load edge features first
Expand Down
14 changes: 13 additions & 1 deletion python/graphstorm/model/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,19 @@ def _exchange_node_id_mapping(rank, world_size, device,
return gather_list[0].to(th.device("cpu"))

def load_dist_nid_map(node_id_mapping_file, ntypes):
    """ Load id mapping files in dist partition format.

    Thin public wrapper around the private ``_load_dist_nid_map`` helper.

    Parameters
    ----------
    node_id_mapping_file: str
        Node mapping directory.
    ntypes: list[str]
        List of node types.

    Returns
    -------
    id_mappings: dict
        Node mapping dictionary.
    """
    return _load_dist_nid_map(node_id_mapping_file, ntypes)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{
"graph": {
"nodes": [
{
"data": {
"format": "parquet",
"files": [
"./nodes/author.parquet"
]
},
"type": "author",
"column": "node_id"
},
{
"data": {
"format": "parquet",
"files": [
"./nodes/paper.parquet"
]
},
"type": "paper",
"column": "node_id"
}
],
"edges": [
{
"data": {
"format": "parquet",
"files": [
"./edges/author_writing_paper_hard_negative.parquet"
]
},
"source": {
"column": "source_id",
"type": "author"
},
"dest": {
"column": "dest_id",
"type": "paper"
},
"relation": {
"type": "writing"
},
"features": [
{
"column": "hard_neg",
"name": "hard_neg_feat",
"transformation": {
"name": "edge_dst_hard_negative",
"kwargs": {
"separator": ";"
}
}
}
]
},
{
"data": {
"format": "parquet",
"files": [
"./edges/paper_citing_paper.parquet"
]
},
"source": {
"column": "source_id",
"type": "paper"
},
"dest": {
"column": "dest_id",
"type": "paper"
},
"relation": {
"type": "citing"
}
}
]
},
"version": "gsprocessing-v1.0"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"graph": {
"nodes": [
{
"data": {
"format": "parquet",
"files": [
"./nodes/author.parquet"
]
},
"type": "author",
"column": "node_id"
},
{
"data": {
"format": "parquet",
"files": [
"./nodes/paper.parquet"
]
},
"type": "paper",
"column": "node_id"
}
],
"edges": [
{
"data": {
"format": "parquet",
"files": [
"./edges/author_writing_paper_hard_negative.parquet"
]
},
"source": {
"column": "source_id",
"type": "author"
},
"dest": {
"column": "dest_id",
"type": "paper"
},
"relation": {
"type": "writing"
}
},
{
"data": {
"format": "parquet",
"files": [
"./edges/paper_citing_paper.parquet"
]
},
"source": {
"column": "source_id",
"type": "paper"
},
"dest": {
"column": "dest_id",
"type": "paper"
},
"relation": {
"type": "citing"
}
}
]
},
"version": "gsprocessing-v1.0"
}
Loading

0 comments on commit e631d2c

Please sign in to comment.