diff --git a/python/cugraph/cugraph/dask/sampling/node2vec_random_walks.py b/python/cugraph/cugraph/dask/sampling/node2vec_random_walks.py index d70601841c..18171eda62 100644 --- a/python/cugraph/cugraph/dask/sampling/node2vec_random_walks.py +++ b/python/cugraph/cugraph/dask/sampling/node2vec_random_walks.py @@ -48,30 +48,31 @@ def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False): return cudf.Series(cp_paths) -def _call_plc_node2vec_random_walks(sID, mg_graph_x, st_x, max_depth, compress_result, p, q): +def _call_plc_node2vec_random_walks(sID, mg_graph_x, st_x, max_depth, p, q, random_state): return pylibcugraph_node2vec_random_walks( resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), graph=mg_graph_x, seed_array=st_x, max_depth=max_depth, - compress_result=compress_result, p=p, - q=q + q=q, + random_state=random_state ) +# FIXME: Add type anotation def node2vec_random_walks( input_graph, start_vertices=None, max_depth=None, - compress_result=True, p=1.0, - q=1.0 + q=1.0, + random_state=None ): """ Computes random walks for each node in 'start_vertices', under the - node2vec_random_walks sampling framework. + node2vec sampling framework. parameters ---------- @@ -87,10 +88,6 @@ def node2vec_random_walks( The maximum depth of the random walks. If not specified, the maximum depth is set to 1. - compress_result: bool, optional (default=True) - If True, coalesced paths are returned with a sizes array with offsets. - Otherwise padded paths are returned with an empty sizes array. - p: float, optional (default=1.0, [0 < p]) Return factor, which represents the likelihood of backtracking to a previous node in the walk. A higher value makes it less likely to @@ -103,6 +100,9 @@ def node2vec_random_walks( is likelier to visit nodes closer to the outgoing node. If q < 1, the random walk is likelier to visit nodes further from the outgoing node. A positive float. + + random_state: int, optional + Random seed to use when making sampling calls. Returns ------- @@ -112,9 +112,6 @@ def node2vec_random_walks( edge_weight_paths: dask_cudf.Series Series containing the edge weights of edges represented by the returned vertex_paths - - sizes : dask_cudf.Series - The path size or sizes in case of coalesced paths. """ client = default_client() @@ -122,10 +119,6 @@ def node2vec_random_walks( raise ValueError( f"'max_depth' must be a positive integer, " f"got: {max_depth}" ) - if not isinstance(compress_result, bool): - raise ValueError( - f"'compress_result' must be a bool, " f"got: {compress_result}" - ) if (not isinstance(p, float)) or (p <= 0.0): raise ValueError(f"'p' must be a positive float, got: {p}") if (not isinstance(q, float)) or (q <= 0.0): @@ -158,9 +151,6 @@ def node2vec_random_walks( start_vertices, client, return_type="dict" ) - #print("start vertex_type = ", start_vertices_type) - #print("edgelist type = ", input_graph.edgelist.edgelist_df) - result = [ client.submit( _call_plc_node2vec_random_walks, @@ -168,9 +158,9 @@ def node2vec_random_walks( input_graph._plc_graph[w], start_v[0] if start_v else cudf.Series(dtype=start_vertices_type), max_depth, - compress_result=compress_result, p=p, q=q, + random_state=random_state, workers=[w], allow_other_workers=False, ) @@ -181,7 +171,6 @@ def node2vec_random_walks( result_vertex_paths = [client.submit(op.getitem, f, 0) for f in result] result_edge_wgt_paths = [client.submit(op.getitem, f, 1) for f in result] - result_sizes = [client.submit(op.getitem, f, 2) for f in result] cudf_vertex_paths = [ client.submit(convert_to_cudf, cp_vertex_paths, input_graph.renumber_map, True) @@ -193,26 +182,17 @@ def node2vec_random_walks( for cp_edge_wgt_paths in result_edge_wgt_paths ] - cudf_sizes = [ - client.submit(convert_to_cudf, cp_sizes) - for cp_sizes in result_sizes - ] - - wait([cudf_vertex_paths, cudf_edge_wgt_paths, cudf_sizes]) + wait([cudf_vertex_paths, cudf_edge_wgt_paths]) - ddf_vertex_paths = dask_cudf.from_delayed(cudf_vertex_paths).persist() ddf_edge_wgt_paths = dask_cudf.from_delayed(cudf_edge_wgt_paths).persist() - ddf_sizes = dask_cudf.from_delayed(cudf_sizes).persist() - #wait([ddf_vertex_paths, ddf_edge_wgt_paths]) - # Wait until the inactive futures are released wait( [ (r.release(), c_v.release(), c_e.release()) - for r, c_v, c_e, c_s in zip(result, cudf_vertex_paths, cudf_edge_wgt_paths, cudf_sizes) + for r, c_v, c_e in zip(result, cudf_vertex_paths, cudf_edge_wgt_paths) ] ) - return ddf_vertex_paths, ddf_edge_wgt_paths, ddf_sizes + return ddf_vertex_paths, ddf_edge_wgt_paths