"""Module for graph partition utilities."""
import os
import re
import time
import numpy as np
from . import backend as F, utils
from ._ffi.function import _init_api
from .base import EID, ETYPE, NID, NTYPE
from .heterograph import DGLGraph
from .ndarray import NDArray
from .subgraph import edge_subgraph
__all__ = [
"metis_partition",
"metis_partition_assignment",
"partition_graph_with_halo",
]
def reorder_nodes(g, new_node_ids):
"""Generate a new graph with new node IDs.
We assign each node in the input graph with a new node ID. This results in
a new graph.
Parameters
----------
g : DGLGraph
The input graph
new_node_ids : a tensor
The new node IDs
Returns
-------
DGLGraph
The graph with new node IDs.
"""
assert (
len(new_node_ids) == g.num_nodes()
), "The number of new node ids must match #nodes in the graph."
new_node_ids = utils.toindex(new_node_ids)
sorted_ids, idx = F.sort_1d(new_node_ids.tousertensor())
assert (
F.asnumpy(sorted_ids[0]) == 0
and F.asnumpy(sorted_ids[-1]) == g.num_nodes() - 1
), "The new node IDs are incorrect."
new_gidx = _CAPI_DGLReorderGraph_Hetero(
g._graph, new_node_ids.todgltensor()
)
new_g = DGLGraph(gidx=new_gidx, ntypes=["_N"], etypes=["_E"])
new_g.ndata["orig_id"] = idx
return new_g
def _get_halo_heterosubgraph_inner_node(halo_subg):
return _CAPI_GetHaloSubgraphInnerNodes_Hetero(halo_subg)
def reshuffle_graph(g, node_part=None):
"""Reshuffle node ids and edge IDs of a graph.
This function reshuffles nodes and edges in a graph so that all nodes/edges of the same type
have contiguous IDs. If a graph is partitioned and nodes are assigned to different partitions,
all nodes/edges in a partition should
get contiguous IDs; within a partition, all nodes/edges of the same type have contigous IDs.
Parameters
----------
g : DGLGraph
The input graph.
node_part : Tensor
This is a vector whose length is the same as the number of nodes in the input graph.
Each element indicates the partition ID the corresponding node is assigned to.
Returns
-------
(DGLGraph, Tensor)
The graph whose nodes and edges are reshuffled.
The 1D tensor that indicates the partition IDs of the nodes in the reshuffled graph.
"""
# In this case, we don't need to reshuffle node IDs and edge IDs.
if node_part is None:
g.ndata["orig_id"] = F.arange(0, g.num_nodes())
g.edata["orig_id"] = F.arange(0, g.num_edges())
return g, None
start = time.time()
if node_part is not None:
node_part = utils.toindex(node_part)
node_part = node_part.tousertensor()
if NTYPE in g.ndata:
is_hetero = len(F.unique(g.ndata[NTYPE])) > 1
else:
is_hetero = False
if is_hetero:
num_node_types = F.max(g.ndata[NTYPE], 0) + 1
if node_part is not None:
sorted_part, new2old_map = F.sort_1d(
node_part * num_node_types + g.ndata[NTYPE]
)
else:
sorted_part, new2old_map = F.sort_1d(g.ndata[NTYPE])
sorted_part = F.floor_div(sorted_part, num_node_types)
elif node_part is not None:
sorted_part, new2old_map = F.sort_1d(node_part)
else:
g.ndata["orig_id"] = g.ndata[NID]
g.edata["orig_id"] = g.edata[EID]
return g, None
new_node_ids = np.zeros((g.num_nodes(),), dtype=np.int64)
new_node_ids[F.asnumpy(new2old_map)] = np.arange(0, g.num_nodes())
# If the input graph is homogneous, we only need to create an empty array, so that
# _CAPI_DGLReassignEdges_Hetero knows how to handle it.
etype = (
g.edata[ETYPE]
if ETYPE in g.edata
else F.zeros((0), F.dtype(sorted_part), F.cpu())
)
g = reorder_nodes(g, new_node_ids)
node_part = utils.toindex(sorted_part)
# We reassign edges in in-CSR. In this way, after partitioning, we can ensure
# that all edges in a partition are in the contiguous ID space.
etype_idx = utils.toindex(etype)
orig_eids = _CAPI_DGLReassignEdges_Hetero(
g._graph, etype_idx.todgltensor(), node_part.todgltensor(), True
)
orig_eids = utils.toindex(orig_eids)
orig_eids = orig_eids.tousertensor()
g.edata["orig_id"] = orig_eids
print(
"Reshuffle nodes and edges: {:.3f} seconds".format(time.time() - start)
)
return g, node_part.tousertensor()
[docs]def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
"""Partition a graph.
Based on the given node assignments for each partition, the function splits
the input graph into subgraphs. A subgraph may contain HALO nodes which does
not belong to the partition of a subgraph but are connected to the nodes
in the partition within a fixed number of hops.
If `reshuffle` is turned on, the function reshuffles node IDs and edge IDs
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous ID range in the input graph.
The partitioend subgraphs have node data 'orig_id', which stores the node IDs
in the original input graph.
Parameters
------------
g: DGLGraph
The graph to be partitioned
node_part: 1D tensor
Specify which partition a node is assigned to. The length of this tensor
needs to be the same as the number of nodes of the graph. Each element
indicates the partition ID of a node.
extra_cached_hops: int
The number of hops a HALO node can be accessed.
reshuffle : bool
Resuffle nodes so that nodes in the same partition are in the same ID range.
Returns
--------
a dict of DGLGraphs
The key is the partition ID and the value is the DGLGraph of the partition.
Tensor
1D tensor that stores the mapping between the reshuffled node IDs and
the original node IDs if 'reshuffle=True'. Otherwise, return None.
Tensor
1D tensor that stores the mapping between the reshuffled edge IDs and
the original edge IDs if 'reshuffle=True'. Otherwise, return None.
"""
assert len(node_part) == g.num_nodes()
if reshuffle:
g, node_part = reshuffle_graph(g, node_part)
orig_nids = g.ndata["orig_id"]
orig_eids = g.edata["orig_id"]
node_part = utils.toindex(node_part)
start = time.time()
subgs = _CAPI_DGLPartitionWithHalo_Hetero(
g._graph, node_part.todgltensor(), extra_cached_hops
)
# g is no longer needed. Free memory.
g = None
print("Split the graph: {:.3f} seconds".format(time.time() - start))
subg_dict = {}
node_part = node_part.tousertensor()
start = time.time()
# This function determines whether an edge belongs to a partition.
# An edge is assigned to a partition based on its destination node. If its destination node
# is assigned to a partition, we assign the edge to the partition as well.
def get_inner_edge(subg, inner_node):
inner_edge = F.zeros((subg.num_edges(),), F.int8, F.cpu())
inner_nids = F.nonzero_1d(inner_node)
# TODO(zhengda) we need to fix utils.toindex() to avoid the dtype cast below.
inner_nids = F.astype(inner_nids, F.int64)
inner_eids = subg.in_edges(inner_nids, form="eid")
inner_edge = F.scatter_row(
inner_edge,
inner_eids,
F.ones((len(inner_eids),), F.dtype(inner_edge), F.cpu()),
)
return inner_edge
# This creaets a subgraph from subgraphs returned from the CAPI above.
def create_subgraph(subg, induced_nodes, induced_edges, inner_node):
subg1 = DGLGraph(gidx=subg.graph, ntypes=["_N"], etypes=["_E"])
# If IDs are shuffled, we should shuffled edges. This will help us collect edge data
# from the distributed graph after training.
if reshuffle:
# When we shuffle edges, we need to make sure that the inner edges are assigned with
# contiguous edge IDs and their ID range starts with 0. In other words, we want to
# place these edge IDs in the front of the edge list. To ensure that, we add the IDs
# of outer edges with a large value, so we will get the sorted list as we want.
max_eid = F.max(induced_edges[0], 0) + 1
inner_edge = get_inner_edge(subg1, inner_node)
eid = F.astype(induced_edges[0], F.int64) + max_eid * F.astype(
inner_edge == 0, F.int64
)
_, index = F.sort_1d(eid)
subg1 = edge_subgraph(subg1, index, relabel_nodes=False)
subg1.ndata[NID] = induced_nodes[0]
subg1.edata[EID] = F.gather_row(induced_edges[0], index)
else:
subg1.ndata[NID] = induced_nodes[0]
subg1.edata[EID] = induced_edges[0]
return subg1
for i, subg in enumerate(subgs):
inner_node = _get_halo_heterosubgraph_inner_node(subg)
inner_node = F.zerocopy_from_dlpack(inner_node.to_dlpack())
subg = create_subgraph(
subg, subg.induced_nodes, subg.induced_edges, inner_node
)
subg.ndata["inner_node"] = inner_node
subg.ndata["part_id"] = F.gather_row(node_part, subg.ndata[NID])
if reshuffle:
subg.ndata["orig_id"] = F.gather_row(orig_nids, subg.ndata[NID])
subg.edata["orig_id"] = F.gather_row(orig_eids, subg.edata[EID])
if extra_cached_hops >= 1:
inner_edge = get_inner_edge(subg, inner_node)
else:
inner_edge = F.ones((subg.num_edges(),), F.int8, F.cpu())
subg.edata["inner_edge"] = inner_edge
subg_dict[i] = subg
print("Construct subgraphs: {:.3f} seconds".format(time.time() - start))
if reshuffle:
return subg_dict, orig_nids, orig_eids
else:
return subg_dict, None, None
def get_peak_mem():
"""Get the peak memory size.
Returns
-------
float
The peak memory size in GB.
"""
if not os.path.exists("/proc/self/status"):
return 0.0
for line in open("/proc/self/status", "r"):
if "VmPeak" in line:
mem = re.findall(r"\d+", line)[0]
return int(mem) / 1024 / 1024
return 0.0
[docs]def metis_partition_assignment(
g, k, balance_ntypes=None, balance_edges=False, mode="k-way", objtype="cut"
):
"""This assigns nodes to different partitions with Metis partitioning algorithm.
When performing Metis partitioning, we can put some constraint on the partitioning.
Current, it supports two constrants to balance the partitioning. By default, Metis
always tries to balance the number of nodes in each partition.
* `balance_ntypes` balances the number of nodes of different types in each partition.
* `balance_edges` balances the number of edges in each partition.
To balance the node types, a user needs to pass a vector of N elements to indicate
the type of each node. N is the number of nodes in the input graph.
After the partition assignment, we construct partitions.
Parameters
----------
g : DGLGraph
The graph to be partitioned
k : int
The number of partitions.
balance_ntypes : tensor
Node type of each node
balance_edges : bool
Indicate whether to balance the edges.
mode : str, "k-way" or "recursive"
Whether use multilevel recursive bisection or multilevel k-way paritioning.
objtype : str, "cut" or "vol"
Set the objective as edge-cut minimization or communication volume minimization. This
argument is used by the Metis algorithm.
Returns
-------
a 1-D tensor
A vector with each element that indicates the partition ID of a vertex.
"""
assert mode in (
"k-way",
"recursive",
), "'mode' can only be 'k-way' or 'recursive'"
assert (
g.idtype == F.int64
), "IdType of graph is required to be int64 for now."
# METIS works only on symmetric graphs.
# The METIS runs on the symmetric graph to generate the node assignment to partitions.
start = time.time()
sym_gidx = _CAPI_DGLMakeSymmetric_Hetero(g._graph)
sym_g = DGLGraph(gidx=sym_gidx)
print(
"Convert a graph into a bidirected graph: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
vwgt = []
# To balance the node types in each partition, we can take advantage of the vertex weights
# in Metis. When vertex weights are provided, Metis will tries to generate partitions with
# balanced vertex weights. A vertex can be assigned with multiple weights. The vertex weights
# are stored in a vector of N * w elements, where N is the number of vertices and w
# is the number of weights per vertex. Metis tries to balance the first weight, and then
# the second weight, and so on.
# When balancing node types, we use the first weight to indicate the first node type.
# if a node belongs to the first node type, its weight is set to 1; otherwise, 0.
# Similary, we set the second weight for the second node type and so on. The number
# of weights is the same as the number of node types.
start = time.time()
if balance_ntypes is not None:
assert (
len(balance_ntypes) == g.num_nodes()
), "The length of balance_ntypes should be equal to #nodes in the graph"
balance_ntypes = F.tensor(balance_ntypes)
uniq_ntypes = F.unique(balance_ntypes)
for ntype in uniq_ntypes:
vwgt.append(F.astype(balance_ntypes == ntype, F.int64))
# When balancing edges in partitions, we use in-degree as one of the weights.
if balance_edges:
if balance_ntypes is None:
vwgt.append(F.astype(g.in_degrees(), F.int64))
else:
for ntype in uniq_ntypes:
nids = F.asnumpy(F.nonzero_1d(balance_ntypes == ntype))
degs = np.zeros((g.num_nodes(),), np.int64)
degs[nids] = F.asnumpy(g.in_degrees(nids))
vwgt.append(F.zerocopy_from_numpy(degs))
# The vertex weights have to be stored in a vector.
if len(vwgt) > 0:
vwgt = F.stack(vwgt, 1)
shape = (
np.prod(
F.shape(vwgt),
),
)
vwgt = F.reshape(vwgt, shape)
vwgt = F.to_dgl_nd(vwgt)
else:
vwgt = F.zeros((0,), F.int64, F.cpu())
vwgt = F.to_dgl_nd(vwgt)
print(
"Construct multi-constraint weights: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
start = time.time()
node_part = _CAPI_DGLMetisPartition_Hetero(
sym_g._graph, k, vwgt, mode, (objtype == "cut")
)
print(
"Metis partitioning: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
if len(node_part) == 0:
return None
else:
node_part = utils.toindex(node_part)
return node_part.tousertensor()
[docs]def metis_partition(
g,
k,
extra_cached_hops=0,
reshuffle=False,
balance_ntypes=None,
balance_edges=False,
mode="k-way",
):
"""This is to partition a graph with Metis partitioning.
Metis assigns vertices to partitions. This API constructs subgraphs with the vertices assigned
to the partitions and their incoming edges. A subgraph may contain HALO nodes which does
not belong to the partition of a subgraph but are connected to the nodes
in the partition within a fixed number of hops.
When performing Metis partitioning, we can put some constraint on the partitioning.
Current, it supports two constrants to balance the partitioning. By default, Metis
always tries to balance the number of nodes in each partition.
* `balance_ntypes` balances the number of nodes of different types in each partition.
* `balance_edges` balances the number of edges in each partition.
To balance the node types, a user needs to pass a vector of N elements to indicate
the type of each node. N is the number of nodes in the input graph.
If `reshuffle` is turned on, the function reshuffles node IDs and edge IDs
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous ID range in the input graph.
The partitioend subgraphs have node data 'orig_id', which stores the node IDs
in the original input graph.
The partitioned subgraph is stored in DGLGraph. The DGLGraph has the `part_id`
node data that indicates the partition a node belongs to. The subgraphs do not contain
the node/edge data in the input graph.
Parameters
------------
g: DGLGraph
The graph to be partitioned
k: int
The number of partitions.
extra_cached_hops: int
The number of hops a HALO node can be accessed.
reshuffle : bool
Resuffle nodes so that nodes in the same partition are in the same ID range.
balance_ntypes : tensor
Node type of each node
balance_edges : bool
Indicate whether to balance the edges.
mode : str, "k-way" or "recursive"
Whether use multilevel recursive bisection or multilevel k-way paritioning.
Returns
--------
a dict of DGLGraphs
The key is the partition ID and the value is the DGLGraph of the partition.
"""
assert mode in (
"k-way",
"recursive",
), "'mode' can only be 'k-way' or 'recursive'"
node_part = metis_partition_assignment(
g, k, balance_ntypes, balance_edges, mode
)
if node_part is None:
return None
# Then we split the original graph into parts based on the METIS partitioning results.
return partition_graph_with_halo(
g, node_part, extra_cached_hops, reshuffle
)[0]
class NDArrayPartition(object):
"""Create a new partition of an NDArray. That is, an object which assigns
each row of an NDArray to a specific partition.
Parameters
----------
array_size : int
The first dimension of the array being partitioned.
num_parts : int
The number of parts to divide the array into.
mode : String
The type of partition. Currently, the only valid values are
'remainder' and 'range'.
'remainder' assigns rows based on remainder when dividing the row id by the
number of parts (e.g., i % num_parts).
'range' assigns rows based on which part of the range 'part_ranges'
they fall into.
part_ranges : Tensor or dgl.NDArray, Optional
Should only be specified when the mode is 'range'. Should be of the
length `num_parts + 1`, and be the exclusive prefix-sum of the number
of nodes in each partition. That is, for 3 partitions, we could have
the list [0, a, b, 'array_size'], and all rows with index less
than 'a' are assigned to partition 0, all rows with index greater than
or equal to 'a' and less than 'b' are in partition 1, and all rows
with index greater or equal to 'b' are in partition 2. Should have
the same context as the partitioned NDArray (i.e., be on the same GPU).
Examples
--------
A partition of a homgeonous graph `g`, where the vertices are
striped across processes can be generated via:
>>> from dgl.partition import NDArrayPartition
>>> part = NDArrayPartition(g.num_nodes(), num_parts, mode='remainder' )
A range based partition of a homogenous graph `g`'s nodes, where
the nodes are stored in contiguous memory. This converts an existing
range based partitioning (e.g. from a
dgl.distributed.graph_partition_book.RangePartitionBook)
'max_node_map', to an NDArrayPartition 'part'.
>>> part_range = [0]
>>> for part in part_book.metadata():
>>> part_range.append(part_range[-1] + part['num_nodes'])
>>> part = NDArrayPartition(g.num_nodes(), num_parts, mode='range',
... part_ranges=part_range)
"""
def __init__(
self, array_size, num_parts, mode="remainder", part_ranges=None
):
assert num_parts > 0, 'Invalid "num_parts", must be > 0.'
if mode == "remainder":
assert part_ranges is None, (
"When using remainder-based "
'partitioning, "part_ranges" should not be specified.'
)
self._partition = _CAPI_DGLNDArrayPartitionCreateRemainderBased(
array_size, num_parts
)
elif mode == "range":
assert part_ranges is not None, (
"When using range-based "
'partitioning, "part_ranges" must not be None.'
)
assert part_ranges[0] == 0 and part_ranges[-1] == array_size, (
"part_ranges[0] must be 0, and part_ranges[-1] must be "
'"array_size".'
)
if F.is_tensor(part_ranges):
part_ranges = F.zerocopy_to_dgl_ndarray(part_ranges)
assert isinstance(part_ranges, NDArray), (
'"part_ranges" must ' "be Tensor or dgl.NDArray."
)
self._partition = _CAPI_DGLNDArrayPartitionCreateRangeBased(
array_size, num_parts, part_ranges
)
else:
assert False, 'Unknown partition mode "{}"'.format(mode)
self._array_size = array_size
self._num_parts = num_parts
def num_parts(self):
"""Get the number of partitions."""
return self._num_parts
def array_size(self):
"""Get the total size of the first dimension of the partitioned array."""
return self._array_size
def get(self):
"""Get the C-handle for this object."""
return self._partition
def get_local_indices(self, part, ctx):
"""Get the set of global indices in this given partition."""
return self.map_to_global(
F.arange(0, self.local_size(part), ctx=ctx), part
)
def local_size(self, part):
"""Get the number of rows/items assigned to the given part."""
return _CAPI_DGLNDArrayPartitionGetPartSize(self._partition, part)
def map_to_local(self, idxs):
"""Convert the set of global indices to local indices"""
return F.zerocopy_from_dgl_ndarray(
_CAPI_DGLNDArrayPartitionMapToLocal(
self._partition, F.zerocopy_to_dgl_ndarray(idxs)
)
)
def map_to_global(self, idxs, part_id):
"""Convert the set of local indices ot global indices"""
return F.zerocopy_from_dgl_ndarray(
_CAPI_DGLNDArrayPartitionMapToGlobal(
self._partition, F.zerocopy_to_dgl_ndarray(idxs), part_id
)
)
def generate_permutation(self, idxs):
"""Produce a scheme that maps the given indices to separate partitions
and the counts of how many indices are in each partition.
Parameters
----------
idxs: torch.Tensor.
A tensor with shape (`num_indices`,), representing global indices.
Return
------
torch.Tensor.
A tensor with shape (`num_indices`,), representing the permutation
to re-order the indices by partition.
torch.Tensor.
A tensor with shape (`num_partition`,), representing the number of
indices per partition.
Examples
--------
>>> import torch
>>> from dgl.partition import NDArrayPartition
>>> part = NDArrayPartition(10, 2, mode="remainder")
>>> idx = torch.tensor([0, 2, 4, 5, 8, 8, 9], device="cuda:0")
>>> perm, splits_sum = part.generate_permutation(idx)
>>> perm
tensor([0, 1, 2, 4, 5, 3, 6], device='cuda:0')
>>> splits_sum
tensor([5, 2], device='cuda:0')
"""
ret = _CAPI_DGLNDArrayPartitionGeneratePermutation(
self._partition, F.zerocopy_to_dgl_ndarray(idxs)
)
return F.zerocopy_from_dgl_ndarray(ret(0)), F.zerocopy_from_dgl_ndarray(
ret(1)
)
_init_api("dgl.partition")