Source code for dgl.distributed.dist_tensor

"""Define distributed tensor."""

import os

from .. import backend as F

from .dist_context import is_initialized
from .kvstore import get_kvstore
from .role import get_role
from .rpc import get_group_id
from .utils import totensor


def _default_init_data(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())


# These IDs can identify the anonymous distributed tensors.
DIST_TENSOR_ID = 0


[docs]class DistTensor: """Distributed tensor. ``DistTensor`` references to a distributed tensor sharded and stored in a cluster of machines. It has the same interface as Pytorch Tensor to access its metadata (e.g., shape and data type). To access data in a distributed tensor, it supports slicing rows and writing data to rows. It does not support any operators of a deep learning framework, such as addition and multiplication. Currently, distributed tensors are designed to store node data and edge data of a distributed graph. Therefore, their first dimensions have to be the number of nodes or edges in the graph. The tensors are sharded in the first dimension based on the partition policy of nodes or edges. When a distributed tensor is created, the partition policy is automatically determined based on the first dimension if the partition policy is not provided. If the first dimension matches the number of nodes of a node type, ``DistTensor`` will use the partition policy for this particular node type; if the first dimension matches the number of edges of an edge type, ``DistTensor`` will use the partition policy for this particular edge type. If DGL cannot determine the partition policy automatically (e.g., multiple node types or edge types have the same number of nodes or edges), users have to explicity provide the partition policy. A distributed tensor can be ether named or anonymous. When a distributed tensor has a name, the tensor can be persistent if ``persistent=True``. Normally, DGL destroys the distributed tensor in the system when the ``DistTensor`` object goes away. However, a persistent tensor lives in the system even if the ``DistTenor`` object disappears in the trainer process. The persistent tensor has the same life span as the DGL servers. DGL does not allow an anonymous tensor to be persistent. When a ``DistTensor`` object is created, it may reference to an existing distributed tensor or create a new one. A distributed tensor is identified by the name passed to the constructor. If the name exists, ``DistTensor`` will reference the existing one. In this case, the shape and the data type must match the existing tensor. If the name doesn't exist, a new tensor will be created in the kvstore. When a distributed tensor is created, its values are initialized to zero. Users can define an initialization function to control how the values are initialized. The init function has two input arguments: shape and data type and returns a tensor. Below shows an example of an init function: .. highlight:: python .. code-block:: python def init_func(shape, dtype): return torch.ones(shape=shape, dtype=dtype) Parameters ---------- shape : tuple The shape of the tensor. The first dimension has to be the number of nodes or the number of edges of a distributed graph. dtype : dtype The dtype of the tensor. The data type has to be the one in the deep learning framework. name : string, optional The name of the embeddings. The name can uniquely identify embeddings in a system so that another ``DistTensor`` object can referent to the distributed tensor. init_func : callable, optional The function to initialize data in the tensor. If the init function is not provided, the values of the embeddings are initialized to zero. part_policy : PartitionPolicy, optional The partition policy of the rows of the tensor to different machines in the cluster. Currently, it only supports node partition policy or edge partition policy. The system determines the right partition policy automatically. persistent : bool Whether the created tensor lives after the ``DistTensor`` object is destroyed. is_gdata : bool Whether the created tensor is a ndata/edata or not. attach : bool Whether to attach group ID into name to be globally unique. Examples -------- >>> init = lambda shape, dtype: th.ones(shape, dtype=dtype) >>> arr = dgl.distributed.DistTensor((g.num_nodes(), 2), th.int32, init_func=init) >>> print(arr[0:3]) tensor([[1, 1], [1, 1], [1, 1]], dtype=torch.int32) >>> arr[0:3] = th.ones((3, 2), dtype=th.int32) * 2 >>> print(arr[0:3]) tensor([[2, 2], [2, 2], [2, 2]], dtype=torch.int32) Note ---- The creation of ``DistTensor`` is a synchronized operation. When a trainer process tries to create a ``DistTensor`` object, the creation succeeds only when all trainer processes do the same. """ def __init__( self, shape, dtype, name=None, init_func=None, part_policy=None, persistent=False, is_gdata=True, attach=True, ): self.kvstore = get_kvstore() assert ( self.kvstore is not None ), "Distributed module is not initialized. Please call dgl.distributed.initialize." self._shape = shape self._dtype = dtype self._attach = attach self._is_gdata = is_gdata part_policies = self.kvstore.all_possible_part_policy # If a user doesn't provide a partition policy, we should find one based on # the input shape. if part_policy is None: for policy_name in part_policies: policy = part_policies[policy_name] if policy.get_size() == shape[0]: # If multiple partition policies match the input shape, we cannot # decide which is the right one automatically. We should ask users # to provide one. assert part_policy is None, ( "Multiple partition policies match the input shape. " + "Please provide a partition policy explicitly." ) part_policy = policy assert part_policy is not None, ( "Cannot find a right partition policy. It is either because " + "its first dimension does not match the number of nodes or edges " + "of a distributed graph or there does not exist a distributed graph." ) self._part_policy = part_policy assert ( part_policy.get_size() == shape[0] ), "The partition policy does not match the input shape." if init_func is None: init_func = _default_init_data exist_names = self.kvstore.data_name_list() # If a user doesn't provide a name, we generate a name ourselves. # We need to generate the name in a deterministic way. if name is None: assert ( not persistent ), "We cannot generate anonymous persistent distributed tensors" global DIST_TENSOR_ID # All processes of the same role should create DistTensor synchronously. # Thus, all of them should have the same IDs. name = "anonymous-" + get_role() + "-" + str(DIST_TENSOR_ID) DIST_TENSOR_ID += 1 assert isinstance(name, str), "name {} is type {}".format( name, type(name) ) name = self._attach_group_id(name) self._tensor_name = name data_name = part_policy.get_data_name(name) self._name = str(data_name) self._persistent = persistent if self._name not in exist_names: self._owner = True self.kvstore.init_data( self._name, shape, dtype, part_policy, init_func, is_gdata ) else: self._owner = False dtype1, shape1, _ = self.kvstore.get_data_meta(self._name) assert ( dtype == dtype1 ), "The dtype does not match with the existing tensor" assert ( shape == shape1 ), "The shape does not match with the existing tensor" def __del__(self): initialized = ( os.environ.get("DGL_DIST_MODE", "standalone") == "standalone" or is_initialized() ) if not self._persistent and self._owner and initialized: self.kvstore.delete_data(self._name) def __getitem__(self, idx): idx = totensor(idx) return self.kvstore.pull(name=self._name, id_tensor=idx) def __setitem__(self, idx, val): idx = totensor(idx) # TODO(zhengda) how do we want to support broadcast (e.g., G.ndata['h'][idx] = 1). self.kvstore.push(name=self._name, id_tensor=idx, data_tensor=val) @property def kvstore_key(self): """Return the key string of this DistTensor in the associated KVStore.""" return self._name @property def local_partition(self): """Return the local partition of this DistTensor.""" return self.kvstore.data_store[self._name] def __or__(self, other): new_dist_tensor = DistTensor( self._shape, self._dtype, part_policy=self._part_policy, persistent=self._persistent, is_gdata=self._is_gdata, attach=self._attach, ) kvstore = self.kvstore kvstore.union(self._name, other._name, new_dist_tensor._name) return new_dist_tensor def __len__(self): return self._shape[0] @property def part_policy(self): """Return the partition policy Returns ------- PartitionPolicy The partition policy of the distributed tensor. """ return self._part_policy @property def shape(self): """Return the shape of the distributed tensor. Returns ------- tuple The shape of the distributed tensor. """ return self._shape @property def dtype(self): """Return the data type of the distributed tensor. Returns ------ dtype The data type of the tensor. """ return self._dtype @property def name(self): """Return the name of the distributed tensor Returns ------- str The name of the tensor. """ return self._detach_group_id(self._name) @property def tensor_name(self): """Return the tensor name Returns ------- str The name of the tensor. """ return self._detach_group_id(self._tensor_name) def count_nonzero(self): """Count and return the number of nonzero value Returns ------- int the number of nonzero value """ return self.kvstore.count_nonzero(name=self._name) def _attach_group_id(self, name): """Attach group ID if needed Returns ------- str new name with group ID attached """ if not self._attach: return name return "{}_{}".format(name, get_group_id()) def _detach_group_id(self, name): """Detach group ID if needed Returns ------- str original name without group ID """ if not self._attach: return name suffix = "_{}".format(get_group_id()) return name[: -len(suffix)]