Source code for dgl.frame

"""Columnar storage for DGLGraph."""
from __future__ import absolute_import

import copy
from collections import namedtuple
from collections.abc import MutableMapping

from . import backend as F
from .base import dgl_warning, DGLError
from .init import zero_initializer
from .storages import TensorStorage
from .utils import gather_pinned_tensor_rows, pin_memory_inplace


class _LazyIndex(object):
    def __init__(self, index):
        if isinstance(index, list):
            self._indices = index
        else:
            self._indices = [index]

    def __len__(self):
        return len(self._indices[-1])

    def slice(self, index):
        """Create a new _LazyIndex object sliced by the given index tensor."""
        # If our indices are in the same context, let's slice now and free
        # the memory; otherwise do nothing until we have to.
        if F.context(self._indices[-1]) == F.context(index):
            return _LazyIndex(
                self._indices[:-1] + [F.gather_row(self._indices[-1], index)]
            )
        return _LazyIndex(self._indices + [index])

    def flatten(self):
        """Evaluate the chain of indices, and return a single index tensor."""
        flat_index = self._indices[0]
        # here we actually need to resolve it
        for index in self._indices[1:]:
            if F.context(index) != F.context(flat_index):
                index = F.copy_to(index, F.context(flat_index))
            flat_index = F.gather_row(flat_index, index)
        return flat_index

    def record_stream(self, stream):
        """Record the stream that is using the index tensors.

        Parameters
        ----------
        stream : torch.cuda.Stream
            The stream that is using the tensors.
        """
        for index in self._indices:
            if F.context(index) != F.cpu():
                index.record_stream(stream)
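
# A minimal sketch (assuming the PyTorch backend) of how _LazyIndex defers
# chained slicing: same-context slices are fused eagerly, while cross-context
# slices are queued until flatten() resolves them.
#
#   import torch
#   idx = _LazyIndex(torch.tensor([0, 2, 4, 6]))
#   idx = idx.slice(torch.tensor([1, 3]))  # both on CPU: fused immediately
#   idx.flatten()                          # tensor([2, 6])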


class LazyFeature(object):
    """Placeholder for feature prefetching.

    One can assign this object to ``ndata`` or ``edata`` of the graphs
    returned by various samplers' :attr:`sample` method.  When DGL's
    dataloader receives the subgraphs returned by the sampler, it will
    automatically look up all the ``ndata`` and ``edata`` whose data is a
    LazyFeature, replacing them with the actual data of the corresponding
    nodes/edges from the original graph instead.

    In particular, if a subgraph returned by the sampler has a LazyFeature
    with name ``k`` in ``subgraph.ndata[key]``:

    .. code:: python

       subgraph.ndata[key] = LazyFeature(k)

    Assuming that ``graph`` is the original graph, DGL's dataloader will
    perform

    .. code:: python

       subgraph.ndata[key] = graph.ndata[k][subgraph.ndata[dgl.NID]]

    DGL's dataloader performs a similar replacement for ``edata``.
    For heterogeneous graphs, the replacement is:

    .. code:: python

       subgraph.nodes[ntype].data[key] = graph.nodes[ntype].data[k][
           subgraph.nodes[ntype].data[dgl.NID]]

    For MFGs' ``srcdata`` (and similarly ``dstdata``), the replacement is

    .. code:: python

       mfg.srcdata[key] = graph.ndata[k][mfg.srcdata[dgl.NID]]

    Parameters
    ----------
    name : str
        The name of the data in the original graph.
    id_ : Tensor, optional
        The ID tensor.
    """

    __slots__ = ["name", "id_"]

    def __init__(self, name=None, id_=None):
        self.name = name
        self.id_ = id_

    def to(self, *args, **kwargs):  # pylint: disable=invalid-name, unused-argument
        """No-op. For compatibility of :meth:`Frame.to` method."""
        return self

    @property
    def data(self):
        """No-op. For compatibility of :meth:`Frame.__repr__` method."""
        return self

    def pin_memory_(self):
        """No-op. For compatibility of :meth:`Frame.pin_memory_` method."""

    def unpin_memory_(self):
        """No-op. For compatibility of :meth:`Frame.unpin_memory_` method."""

    def record_stream(self, stream):
        """No-op. For compatibility of :meth:`Frame.record_stream` method."""
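
# A usage sketch: a custom sampler can tag its output subgraph with a
# LazyFeature so that the dataloader fetches the data lazily.  The names
# `g`, `subg`, and "feat" below are illustrative, not part of this module.
#
#   subg.ndata["x"] = LazyFeature("feat")
#   # After the dataloader resolves it, this is equivalent to:
#   # subg.ndata["x"] = g.ndata["feat"][subg.ndata[dgl.NID]]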
class Scheme(namedtuple("Scheme", ["shape", "dtype"])):
    """The column scheme.

    Parameters
    ----------
    shape : tuple of int
        The feature shape.
    dtype : backend-specific type object
        The feature data type.
    """

    # Pickling torch dtypes could be problematic; this is a workaround.
    # I also have to create data_type_dict and reverse_data_type_dict
    # attributes just for this bug.
    # I raised an issue in the PyTorch bug tracker:
    # https://github.com/pytorch/pytorch/issues/14057
    def __reduce__(self):
        state = (self.shape, F.reverse_data_type_dict[self.dtype])
        return self._reconstruct_scheme, state

    @classmethod
    def _reconstruct_scheme(cls, shape, dtype_str):
        dtype = F.data_type_dict[dtype_str]
        return cls(shape, dtype)


def infer_scheme(tensor):
    """Infer column scheme from the given tensor data.

    Parameters
    ----------
    tensor : Tensor
        The tensor data.

    Returns
    -------
    Scheme
        The column scheme.
    """
    return Scheme(tuple(F.shape(tensor)[1:]), F.dtype(tensor))
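
# For instance (assuming the PyTorch backend), a batch of 10 nodes each
# carrying a 3x4 feature yields a scheme that drops the batch dimension:
#
#   import torch
#   infer_scheme(torch.zeros(10, 3, 4))
#   # Scheme(shape=(3, 4), dtype=torch.float32)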
class Column(TensorStorage):
    """A column is a compact store of features of multiple nodes/edges.

    It batches all the feature tensors together along the first dimension
    as one dense tensor.

    The column can optionally have an index tensor I. In this case, the i^th
    feature is stored in ``storage[index[i]]``. The column class implements
    copy-on-read semantics -- the index select operation happens upon the
    first read of the feature data. This is useful when one extracts a
    subset of the feature data but wishes the actual index select to happen
    on demand.

    Parameters
    ----------
    storage : Tensor
        The feature data storage.
    scheme : Scheme, optional
        The scheme of the column. Will be inferred if not provided.
    index : Tensor, optional
        The row index to the feature data storage. None means an identity
        mapping.

    Attributes
    ----------
    storage : Tensor
        The storage tensor. The storage tensor may not be the actual data
        tensor of this column when the index tensor is not None. This
        typically happens when the column is extracted from another column
        using the `subcolumn` method.

        It can also be None, which may only happen when transmitting a
        not-yet-materialized subcolumn from a subprocess to the main process.
        In this case, the main process should already maintain the content of
        the storage, and is responsible for restoring the subcolumn's storage
        pointer.
    data : Tensor
        The actual data tensor of this column.
    scheme : Scheme
        The scheme of the column.
    index : Tensor
        The index tensor.
    """

    def __init__(self, storage, *args, **kwargs):
        super().__init__(storage)
        self._init(*args, **kwargs)

    def __len__(self):
        """The number of features (number of rows) in this column."""
        if self.index is None:
            return F.shape(self.storage)[0]
        else:
            return len(self.index)

    @property
    def shape(self):
        """Return the scheme shape (feature shape) of this column."""
        return self.scheme.shape

    @property
    def data(self):
        """Return the feature data. Perform index selecting if needed."""
        if self.index is not None:
            if isinstance(self.index, _LazyIndex):
                self.index = self.index.flatten()
            storage_ctx = F.context(self.storage)
            index_ctx = F.context(self.index)
            # Special case: if the storage is pinned and the index is on
            # CUDA, directly call UVA slicing (even though they are not in
            # the same context).
            if (
                storage_ctx != index_ctx
                and storage_ctx == F.cpu()
                and F.is_pinned(self.storage)
            ):
                self.storage = gather_pinned_tensor_rows(
                    self.storage, self.index
                )
            else:
                # If the index and the storage are not in the same context,
                # copy the index to the storage's context. Copying the index
                # is usually cheaper than copying the data.
                if storage_ctx != index_ctx:
                    kwargs = {}
                    if self.device is not None:
                        kwargs = self.device[1]
                    self.index = F.copy_to(self.index, storage_ctx, **kwargs)
                self.storage = F.gather_row(self.storage, self.index)
            self.index = None
        # move data to the right device
        if self.device is not None:
            self.storage = F.copy_to(
                self.storage, self.device[0], **self.device[1]
            )
            self.device = None
        # convert data to the right type
        if self.deferred_dtype is not None:
            self.storage = F.astype(self.storage, self.deferred_dtype)
            self.deferred_dtype = None
        return self.storage

    @data.setter
    def data(self, val):
        """Update the column data."""
        self.index = None
        self.device = None
        self.deferred_dtype = None
        self.storage = val
        self._data_nd = None
        # should unpin data if it was pinned
        self.pinned_by_dgl = False

    def to(self, device, **kwargs):  # pylint: disable=invalid-name
        """Return a new column with the data copied to the targeted device
        (cpu/gpu).

        Parameters
        ----------
        device : Framework-specific device context object
            The context to move data to.
        kwargs : Key-word arguments.
            Key-word arguments fed to the framework copy function.

        Returns
        -------
        Column
            A new column.
        """
        col = self.clone()
        col.device = (device, kwargs)
        return col

    @property
    def dtype(self):
        """Return the effective data type of this column."""
        if self.deferred_dtype is not None:
            return self.deferred_dtype
        return self.storage.dtype

    def astype(self, new_dtype):
        """Return a new column such that when its data is requested, it will
        be converted to ``new_dtype``.

        Parameters
        ----------
        new_dtype : Framework-specific type object
            The type to convert the data to.

        Returns
        -------
        Column
            A new column.
        """
        col = self.clone()
        if col.dtype != new_dtype:
            # If there is already a pending conversion, ensure that the
            # pending conversion and transfer/sampling are done before this
            # new conversion.
            if col.deferred_dtype is not None:
                _ = col.data
            if (col.device is None) and (col.index is None):
                # Do the conversion immediately if no device transfer or
                # index sampling is pending. The assumption is that this is
                # most likely to be the desired behaviour, such as converting
                # an entire graph's feature data to float16 (half) before
                # transfer to device when training, or converting back to
                # float32 (float) after fetching the data to a device.
                col.storage = F.astype(col.storage, new_dtype)
            else:
                # Defer the conversion if there is a pending transfer or
                # sampling. This is so that feature data that never gets
                # accessed on the device never needs to be transferred,
                # sampled, or converted.
                col.deferred_dtype = new_dtype
        return col

    def __getitem__(self, rowids):
        """Return the feature data given the rowids.

        The operation triggers index selection.

        Parameters
        ----------
        rowids : Tensor
            Row ID tensor.

        Returns
        -------
        Tensor
            The feature data.
        """
        return F.gather_row(self.data, rowids)

    def __setitem__(self, rowids, feats):
        """Update the feature data given the index.

        The update is performed out-of-place so it can be used in autograd
        mode. The operation triggers index selection.

        Parameters
        ----------
        rowids : Tensor
            Row IDs.
        feats : Tensor
            New features.
        """
        self.update(rowids, feats)

    def update(self, rowids, feats):
        """Update the feature data given the index.

        Parameters
        ----------
        rowids : Tensor
            Row IDs.
        feats : Tensor
            New features.
        """
        feat_scheme = infer_scheme(feats)
        if feat_scheme != self.scheme:
            raise DGLError(
                "Cannot update column of scheme %s using feature of scheme %s."
                % (feat_scheme, self.scheme)
            )
        self.data = F.scatter_row(self.data, rowids, feats)

    def extend(self, feats, feat_scheme=None):
        """Extend the feature data.

        The operation triggers index selection.

        Parameters
        ----------
        feats : Tensor
            The new features.
        feat_scheme : Scheme, optional
            The scheme of the new features. Will be inferred if not provided.
        """
        if feat_scheme is None:
            feat_scheme = infer_scheme(feats)
        if feat_scheme != self.scheme:
            raise DGLError(
                "Cannot update column of scheme %s using feature of scheme %s."
                % (feat_scheme, self.scheme)
            )
        self.data = F.cat([self.data, feats], dim=0)

    def clone(self):
        """Return a shallow copy of this column."""
        return Column(
            self.storage,
            self.scheme,
            self.index,
            self.device,
            self.deferred_dtype,
        )

    def deepclone(self):
        """Return a deep copy of this column.

        The operation triggers index selection.
        """
        return Column(F.clone(self.data), copy.deepcopy(self.scheme))

    def subcolumn(self, rowids):
        """Return a subcolumn.

        The resulting column will share the same storage as this column, so
        this operation is quite efficient. If the current column is also a
        sub-column (i.e., the index tensor is not None), the current index
        tensor will be sliced by ``rowids`` if they are in the same context.
        Otherwise, both index tensors are saved, and only applied when the
        data is accessed.

        Parameters
        ----------
        rowids : Tensor
            Row IDs.

        Returns
        -------
        Column
            The sub-column.
        """
        if self.index is None:
            return Column(
                self.storage,
                self.scheme,
                rowids,
                self.device,
                self.deferred_dtype,
            )
        else:
            index = self.index
            if not isinstance(index, _LazyIndex):
                index = _LazyIndex(self.index)
            index = index.slice(rowids)
            return Column(
                self.storage,
                self.scheme,
                index,
                self.device,
                self.deferred_dtype,
            )

    @staticmethod
    def create(data):
        """Create a new column using the given data."""
        if isinstance(data, Column):
            return data.clone()
        else:
            return Column(data)

    def __repr__(self):
        return repr(self.data)

    def __getstate__(self):
        if self.storage is not None:
            # flush any deferred operations
            _ = self.data
        state = self.__dict__.copy()
        # Data pinning does not get serialized, so we need to remove that
        # from the state.
        state["_data_nd"] = None
        state["pinned_by_dgl"] = False
        return state

    def __setstate__(self, state):
        index = None
        device = None
        if "storage" in state and state["storage"] is not None:
            assert "index" not in state or state["index"] is None
            assert "device" not in state or state["device"] is None
        else:
            # We may have a column with only index information, and that is
            # valid.
            index = None if "index" not in state else state["index"]
            device = None if "device" not in state else state["device"]
        assert "deferred_dtype" not in state or state["deferred_dtype"] is None
        assert "pinned_by_dgl" not in state or state["pinned_by_dgl"] is False
        assert "_data_nd" not in state or state["_data_nd"] is None
        self.__dict__ = state
        # properly initialize this object
        self._init(
            self.scheme if hasattr(self, "scheme") else None,
            index=index,
            device=device,
        )

    def _init(self, scheme=None, index=None, device=None, deferred_dtype=None):
        self.scheme = scheme if scheme else infer_scheme(self.storage)
        self.index = index
        self.device = device
        self.deferred_dtype = deferred_dtype
        self.pinned_by_dgl = False
        self._data_nd = None

    def __copy__(self):
        return self.clone()

    def fetch(self, indices, device, pin_memory=False, **kwargs):
        _ = self.data  # materialize in case of lazy slicing & data transfer
        return super().fetch(indices, device, pin_memory=pin_memory, **kwargs)

    def pin_memory_(self):
        """Pin the storage into page-locked memory.

        Does nothing if the storage is already pinned.
        """
        if not self.pinned_by_dgl and not F.is_pinned(self.data):
            self._data_nd = pin_memory_inplace(self.data)
            self.pinned_by_dgl = True

    def unpin_memory_(self):
        """Unpin the storage pinned by the ``pin_memory_`` method.

        Does nothing if the storage is not pinned by the ``pin_memory_``
        method, even if it is actually in page-locked memory.
        """
        if self.pinned_by_dgl:
            self._data_nd.unpin_memory_()
            self._data_nd = None
            self.pinned_by_dgl = False

    def record_stream(self, stream):
        """Record the stream that is using the storage.

        Only the PyTorch backend is supported; raises DGLError otherwise.

        Parameters
        ----------
        stream : torch.cuda.Stream
            The stream that is using the storage.
        """
        if F.get_preferred_backend() != "pytorch":
            raise DGLError("record_stream only supports the PyTorch backend.")
        if self.index is not None and (
            isinstance(self.index, _LazyIndex)
            or F.context(self.index) != F.cpu()
        ):
            self.index.record_stream(stream)
        if F.context(self.storage) != F.cpu():
            self.storage.record_stream(stream)
class Frame(MutableMapping):
    """The columnar storage for node/edge features.

    The frame is a dictionary from feature names to feature columns.
    All columns should have the same number of rows (i.e. the same first
    dimension).

    Parameters
    ----------
    data : dict-like, optional
        The frame data in a dictionary. If the provided data is another
        frame, this frame will NOT share columns with the given frame. So
        any out-of-place update on one will not be reflected in the other.
    num_rows : int, optional
        The number of rows in this frame. If ``data`` is provided and is not
        empty, ``num_rows`` will be ignored and inferred from the given data.
    """

    def __init__(self, data=None, num_rows=None):
        if data is None:
            self._columns = dict()
            self._num_rows = 0 if num_rows is None else num_rows
        else:
            assert not isinstance(data, Frame)  # sanity check for code refactor
            # Note that we always create a new column for the given data.
            # This avoids two frames accidentally sharing the same column.
            self._columns = {
                k: v if isinstance(v, LazyFeature) else Column.create(v)
                for k, v in data.items()
            }
            self._num_rows = num_rows
            # infer num_rows & sanity check
            for name, col in self._columns.items():
                if isinstance(col, LazyFeature):
                    continue
                if self._num_rows is None:
                    self._num_rows = len(col)
                elif len(col) != self._num_rows:
                    raise DGLError(
                        "Expected all columns to have same # rows (%d), "
                        "got %d on %r." % (self._num_rows, len(col), name)
                    )
        # Initializers for empty values. An initializer is a callable.
        # If it is None, a warning will be raised on the first call and the
        # zero initializer will be used thereafter.
        self._initializers = {}  # per-column initializers
        self._default_initializer = None

    def _set_zero_default_initializer(self):
        """Set the default initializer to be the zero initializer."""
        self._default_initializer = zero_initializer

    def get_initializer(self, column=None):
        """Get the initializer for empty values for the given column.

        Parameters
        ----------
        column : str
            The column name.

        Returns
        -------
        callable
            The initializer.
        """
        return self._initializers.get(column, self._default_initializer)

    def set_initializer(self, initializer, column=None):
        """Set the initializer for empty values, for a given column or all
        future columns.

        The initializer is a callable that returns a tensor given the shape
        and data type.

        Parameters
        ----------
        initializer : callable
            The initializer.
        column : str, optional
            The column name.
        """
        if column is None:
            self._default_initializer = initializer
        else:
            self._initializers[column] = initializer

    @property
    def schemes(self):
        """Return a dictionary mapping column names to column schemes."""
        return {k: col.scheme for k, col in self._columns.items()}

    @property
    def num_columns(self):
        """Return the number of columns in this frame."""
        return len(self._columns)

    @property
    def num_rows(self):
        """Return the number of rows in this frame."""
        return self._num_rows

    def __contains__(self, name):
        """Return true if the given column name exists."""
        return name in self._columns

    def __getitem__(self, name):
        """Return the column of the given name.

        Parameters
        ----------
        name : str
            The column name.

        Returns
        -------
        Tensor
            Column data.
        """
        return self._columns[name].data

    def __setitem__(self, name, data):
        """Update the whole column.

        Parameters
        ----------
        name : str
            The column name.
        data : Column or data convertible to Column
            The column data.
        """
        self.update_column(name, data)

    def __delitem__(self, name):
        """Delete the whole column.

        Parameters
        ----------
        name : str
            The column name.
        """
        del self._columns[name]

    def add_column(self, name, scheme, ctx):
        """Add a new column to the frame.

        The column will be initialized by the initializer.

        Parameters
        ----------
        name : str
            The column name.
        scheme : Scheme
            The column scheme.
        ctx : DGLContext
            The column context.
        """
        if name in self:
            dgl_warning(
                'Column "%s" already exists. Ignore adding this column again.'
                % name
            )
            return
        if self.get_initializer(name) is None:
            self._set_zero_default_initializer()
        initializer = self.get_initializer(name)
        init_data = initializer(
            (self.num_rows,) + scheme.shape,
            scheme.dtype,
            ctx,
            slice(0, self.num_rows),
        )
        self._columns[name] = Column(init_data, scheme)

    def add_rows(self, num_rows):
        """Add blank rows to this frame.

        For existing fields, the rows will be extended according to their
        initializers.

        Parameters
        ----------
        num_rows : int
            The number of new rows.
        """
        feat_placeholders = {}
        for key, col in self._columns.items():
            scheme = col.scheme
            ctx = F.context(col.data)
            if self.get_initializer(key) is None:
                self._set_zero_default_initializer()
            initializer = self.get_initializer(key)
            new_data = initializer(
                (num_rows,) + scheme.shape,
                scheme.dtype,
                ctx,
                slice(self._num_rows, self._num_rows + num_rows),
            )
            feat_placeholders[key] = new_data
        self._append(Frame(feat_placeholders))
        self._num_rows += num_rows

    def update_column(self, name, data):
        """Add or replace the column with the given name and data.

        Parameters
        ----------
        name : str
            The column name.
        data : Column or data convertible to Column
            The column data.
        """
        if isinstance(data, LazyFeature):
            self._columns[name] = data
            return
        col = Column.create(data)
        if len(col) != self.num_rows:
            raise DGLError(
                "Expected data to have %d rows, got %d."
                % (self.num_rows, len(col))
            )
        self._columns[name] = col

    def update_row(self, rowids, data):
        """Update the feature data of the given rows.

        If the data contains new keys (new columns) that do not exist in
        this frame, add a new column.

        The ``rowids`` shall not contain duplicates. Otherwise, the behavior
        is undefined.

        Parameters
        ----------
        rowids : Tensor
            Row IDs.
        data : dict[str, Tensor]
            Row data.
        """
        for key, val in data.items():
            if key not in self:
                scheme = infer_scheme(val)
                ctx = F.context(val)
                self.add_column(key, scheme, ctx)
        for key, val in data.items():
            self._columns[key].update(rowids, val)

    def _append(self, other):
        """Append the ``other`` frame to this frame."""
        # pad columns that are not provided in the other frame with initial
        # values
        for key, col in self._columns.items():
            if key in other:
                continue
            scheme = col.scheme
            ctx = F.context(col.data)
            if self.get_initializer(key) is None:
                self._set_zero_default_initializer()
            initializer = self.get_initializer(key)
            new_data = initializer(
                (other.num_rows,) + scheme.shape,
                scheme.dtype,
                ctx,
                slice(self._num_rows, self._num_rows + other.num_rows),
            )
            other[key] = new_data
        # append other to self
        for key, col in other._columns.items():
            if key not in self._columns:
                # the column does not exist; init a new column
                self.add_column(key, col.scheme, F.context(col.data))
            self._columns[key].extend(col.data, col.scheme)

    def append(self, other):
        """Append another frame's data into this frame.

        If the current frame is empty, it will just use the columns of the
        given frame. Otherwise, the given data should contain all the column
        keys of this frame.

        Parameters
        ----------
        other : Frame or dict-like
            The frame data to be appended.
        """
        if not isinstance(other, Frame):
            other = Frame(other)
        self._append(other)
        self._num_rows += other.num_rows

    def clear(self):
        """Clear this frame. Remove all the columns."""
        self._columns = {}
        self._num_rows = 0

    def __iter__(self):
        """Return an iterator of columns."""
        return iter(self._columns)

    def __len__(self):
        """Return the number of columns."""
        return self.num_columns

    def keys(self):
        """Return the keys."""
        return self._columns.keys()

    def values(self):
        """Return the values."""
        return self._columns.values()

    def clone(self):
        """Return a clone of this frame.

        The cloned frame does not share the underlying storage with this
        frame, i.e., adding or removing columns in one will not be visible
        in the other. However, they still share the tensor contents, so any
        mutable operation on a column tensor is visible to both. Hence, the
        function does not allocate extra tensor memory.

        Use :func:`~dgl.Frame.deepclone` to clone a frame that does not
        share any data.

        Returns
        -------
        Frame
            A cloned frame.
        """
        newframe = Frame(self._columns, self._num_rows)
        newframe._initializers = self._initializers
        newframe._default_initializer = self._default_initializer
        return newframe

    def deepclone(self):
        """Return a deep clone of this frame.

        The cloned frame holds a copy of this frame's data, and any
        modification to the cloned frame is not visible to this frame. The
        function allocates new tensors and copies the contents from this
        frame.

        Use :func:`~dgl.Frame.clone` to clone a frame without allocating
        extra tensor memory.

        Returns
        -------
        Frame
            A deep-cloned frame.
        """
        newframe = Frame(
            {k: col.deepclone() for k, col in self._columns.items()},
            self._num_rows,
        )
        newframe._initializers = self._initializers
        newframe._default_initializer = self._default_initializer
        return newframe

    def subframe(self, rowids):
        """Return a new frame whose columns are subcolumns of this frame.

        The given row IDs should be within range [0, self.num_rows);
        duplicate IDs are allowed.

        Parameters
        ----------
        rowids : Tensor
            Row IDs.

        Returns
        -------
        Frame
            A new subframe.
        """
        subcols = {
            k: col.subcolumn(rowids) for k, col in self._columns.items()
        }
        subf = Frame(subcols, len(rowids))
        subf._initializers = self._initializers
        subf._default_initializer = self._default_initializer
        return subf

    def to(self, device, **kwargs):  # pylint: disable=invalid-name
        """Return a new frame with the columns copied to the targeted device
        (cpu/gpu).

        Parameters
        ----------
        device : Framework-specific device context object
            The context to move data to.
        kwargs : Key-word arguments.
            Key-word arguments fed to the framework copy function.

        Returns
        -------
        Frame
            A new frame.
        """
        newframe = self.clone()
        new_columns = {
            key: col.to(device, **kwargs)
            for key, col in newframe._columns.items()
        }
        newframe._columns = new_columns
        return newframe

    def __repr__(self):
        return repr(dict(self))

    def pin_memory_(self):
        """Registers the data of every column into pinned memory,
        materializing them if necessary."""
        for column in self._columns.values():
            column.pin_memory_()

    def unpin_memory_(self):
        """Unregisters the data of every column from pinned memory,
        materializing them if necessary."""
        for column in self._columns.values():
            column.unpin_memory_()

    def record_stream(self, stream):
        """Record the stream that is using the data of every column,
        materializing them if necessary."""
        for column in self._columns.values():
            column.record_stream(stream)

    def _astype_float(self, new_type):
        assert new_type in [
            F.float64,
            F.float32,
            F.float16,
            F.bfloat16,
        ], "'new_type' must be a floating-point type: %s" % str(new_type)
        newframe = self.clone()
        new_columns = {}
        for name, column in self._columns.items():
            dtype = column.dtype
            if dtype != new_type and dtype in [
                F.float64,
                F.float32,
                F.float16,
                F.bfloat16,
            ]:
                new_columns[name] = column.astype(new_type)
            else:
                new_columns[name] = column
        newframe._columns = new_columns
        return newframe

    def bfloat16(self):
        """Return a new frame with all floating-point columns converted to
        bfloat16."""
        return self._astype_float(F.bfloat16)

    def half(self):
        """Return a new frame with all floating-point columns converted to
        half-precision (float16)."""
        return self._astype_float(F.float16)

    def float(self):
        """Return a new frame with all floating-point columns converted to
        single-precision (float32)."""
        return self._astype_float(F.float32)

    def double(self):
        """Return a new frame with all floating-point columns converted to
        double-precision (float64)."""
        return self._astype_float(F.float64)