"""Columnar storage for DGLGraph."""from__future__importabsolute_importfromcollectionsimportnamedtuplefromcollections.abcimportMutableMappingfrom.importbackendasFfrom.baseimportdgl_warning,DGLErrorfrom.initimportzero_initializerfrom.storagesimportTensorStoragefrom.utilsimportgather_pinned_tensor_rows,pin_memory_inplaceclass_LazyIndex(object):def__init__(self,index):ifisinstance(index,list):self._indices=indexelse:self._indices=[index]def__len__(self):returnlen(self._indices[-1])defslice(self,index):"""Create a new _LazyIndex object sliced by the given index tensor."""# if our indices are in the same context, lets just slice now and free# memory, otherwise do nothing until we have toifF.context(self._indices[-1])==F.context(index):return_LazyIndex(self._indices[:-1]+[F.gather_row(self._indices[-1],index)])return_LazyIndex(self._indices+[index])defflatten(self):"""Evaluate the chain of indices, and return a single index tensor."""flat_index=self._indices[0]# here we actually need to resolve itforindexinself._indices[1:]:ifF.context(index)!=F.context(flat_index):index=F.copy_to(index,F.context(flat_index))flat_index=F.gather_row(flat_index,index)returnflat_indexdefrecord_stream(self,stream):"""Record stream for index. Parameters ---------- stream : torch.cuda.Stream. """forindexinself._indices:ifF.context(index)!=F.cpu():index.record_stream(stream)
[docs]classLazyFeature(object):"""Placeholder for feature prefetching. One can assign this object to ``ndata`` or ``edata`` of the graphs returned by various samplers' :attr:`sample` method. When DGL's dataloader receives the subgraphs returned by the sampler, it will automatically look up all the ``ndata`` and ``edata`` whose data is a LazyFeature, replacing them with the actual data of the corresponding nodes/edges from the original graph instead. In particular, for a subgraph returned by the sampler has a LazyFeature with name ``k`` in ``subgraph.ndata[key]``: .. code:: python subgraph.ndata[key] = LazyFeature(k) Assuming that ``graph`` is the original graph, DGL's dataloader will perform .. code:: python subgraph.ndata[key] = graph.ndata[k][subgraph.ndata[dgl.NID]] DGL dataloader performs similar replacement for ``edata``. For heterogeneous graphs, the replacement is: .. code:: python subgraph.nodes[ntype].data[key] = graph.nodes[ntype].data[k][ subgraph.nodes[ntype].data[dgl.NID]] For MFGs' ``srcdata`` (and similarly ``dstdata``), the replacement is .. code:: python mfg.srcdata[key] = graph.ndata[k][mfg.srcdata[dgl.NID]] Parameters ---------- name : str The name of the data in the original graph. id_ : Tensor, optional The ID tensor. """__slots__=["name","id_"]def__init__(self,name=None,id_=None):self.name=nameself.id_=id_defto(self,*args,**kwargs):# pylint: disable=invalid-name, unused-argument"""No-op. For compatibility of :meth:`Frame.to` method."""returnself@propertydefdata(self):"""No-op. For compatibility of :meth:`Frame.__repr__` method."""returnselfdefpin_memory_(self):"""No-op. For compatibility of :meth:`Frame.pin_memory_` method."""defunpin_memory_(self):"""No-op. For compatibility of :meth:`Frame.unpin_memory_` method."""defrecord_stream(self,stream):"""No-op. For compatibility of :meth:`Frame.record_stream` method."""
classScheme(namedtuple("Scheme",["shape","dtype"])):"""The column scheme. Parameters ---------- shape : tuple of int The feature shape. dtype : backend-specific type object The feature data type. """# Pickling torch dtypes could be problemetic; this is a workaround.# I also have to create data_type_dict and reverse_data_type_dict# attribute just for this bug.# I raised an issue in PyTorch bug tracker:# https://github.com/pytorch/pytorch/issues/14057def__reduce__(self):state=(self.shape,F.reverse_data_type_dict[self.dtype])returnself._reconstruct_scheme,state@classmethoddef_reconstruct_scheme(cls,shape,dtype_str):dtype=F.data_type_dict[dtype_str]returncls(shape,dtype)definfer_scheme(tensor):"""Infer column scheme from the given tensor data. Parameters --------- tensor : Tensor The tensor data. Returns ------- Scheme The column scheme. """returnScheme(tuple(F.shape(tensor)[1:]),F.dtype(tensor))classColumn(TensorStorage):"""A column is a compact store of features of multiple nodes/edges. It batches all the feature tensors together along the first dimension as one dense tensor. The column can optionally have an index tensor I. In this case, the i^th feature is stored in ``storage[index[i]]``. The column class implements a Copy-On-Read semantics -- the index select operation happens upon the first read of the feature data. This is useful when one extracts a subset of the feature data but wishes the actual index select happens on-demand. Parameters ---------- storage : Tensor The feature data storage. scheme : Scheme, optional The scheme of the column. Will be inferred if not provided. index : Tensor, optional The row index to the feature data storage. None means an identity mapping. Attributes ---------- storage : Tensor The storage tensor. The storage tensor may not be the actual data tensor of this column when the index tensor is not None. This typically happens when the column is extracted from another column using the `subcolumn` method. It can also be None, which may only happen when transmitting a not-yet-materialized subcolumn from a subprocess to the main process. In this case, the main process should already maintain the content of the storage, and is responsible for restoring the subcolumn's storage pointer. data : Tensor The actual data tensor of this column. scheme : Scheme The scheme of the column. index : Tensor Index tensor """def__init__(self,storage,*args,**kwargs):super().__init__(storage)self._init(*args,**kwargs)def__len__(self):"""The number of features (number of rows) in this column."""ifself.indexisNone:returnF.shape(self.storage)[0]else:returnlen(self.index)@propertydefshape(self):"""Return the scheme shape (feature shape) of this column."""returnself.scheme.shape@propertydefdata(self):"""Return the feature data. Perform index selecting if needed."""ifself.indexisnotNone:ifisinstance(self.index,_LazyIndex):self.index=self.index.flatten()storage_ctx=F.context(self.storage)index_ctx=F.context(self.index)# If under the special case where the storage is pinned and the index is on# CUDA, directly call UVA slicing (even if they aree not in the same context).if(storage_ctx!=index_ctxandstorage_ctx==F.cpu()andF.is_pinned(self.storage)):self.storage=gather_pinned_tensor_rows(self.storage,self.index)else:# If index and storage is not in the same context,# copy index to the same context of storage.# Copy index is usually cheaper than copy dataifstorage_ctx!=index_ctx:kwargs={}ifself.deviceisnotNone:kwargs=self.device[1]self.index=F.copy_to(self.index,storage_ctx,**kwargs)self.storage=F.gather_row(self.storage,self.index)self.index=None# move data to the right deviceifself.deviceisnotNone:self.storage=F.copy_to(self.storage,self.device[0],**self.device[1])self.device=None# convert data to the right typeifself.deferred_dtypeisnotNone:self.storage=F.astype(self.storage,self.deferred_dtype)self.deferred_dtype=Nonereturnself.storage@data.setterdefdata(self,val):"""Update the column data."""self.index=Noneself.device=Noneself.deferred_dtype=Noneself.storage=valself._data_nd=None# should unpin data if it was pinned.self.pinned_by_dgl=Falsedefto(self,device,**kwargs):# pylint: disable=invalid-name"""Return a new column with columns copy to the targeted device (cpu/gpu). Parameters ---------- device : Framework-specific device context object The context to move data to. kwargs : Key-word arguments. Key-word arguments fed to the framework copy function. Returns ------- Column A new column """col=self.clone()col.device=(device,kwargs)returncol@propertydefdtype(self):"""Return the effective data type of this Column"""ifself.deferred_dtypeisnotNone:returnself.deferred_dtypereturnself.storage.dtypedefastype(self,new_dtype):"""Return a new column such that when its data is requested, it will be converted to new_dtype. Parameters ---------- new_dtype : Framework-specific type object The type to convert the data to. Returns ------- Column A new column """col=self.clone()ifcol.dtype!=new_dtype:# If there is already a pending conversion, ensure that the pending# conversion and transfer/sampling are done before this new conversion.ifcol.deferred_dtypeisnotNone:_=col.dataif(col.deviceisNone)and(col.indexisNone):# Do the conversion immediately if no device transfer or index# sampling is pending. The assumption is that this is most# likely to be the desired behaviour, such as converting an# entire graph's feature data to float16 (half) before transfer# to device when training, or converting back to float32 (float)# after fetching the data to a device.col.storage=F.astype(col.storage,new_dtype)else:# Defer the conversion if there is a pending transfer or sampling.# This is so that feature data that never gets accessed on the# device never needs to be transferred or sampled or converted.col.deferred_dtype=new_dtypereturncoldef__getitem__(self,rowids):"""Return the feature data given the rowids. The operation triggers index selection. Parameters ---------- rowids : Tensor Row ID tensor. Returns ------- Tensor The feature data """returnF.gather_row(self.data,rowids)def__setitem__(self,rowids,feats):"""Update the feature data given the index. The update is performed out-placely so it can be used in autograd mode. The operation triggers index selection. Parameters ---------- rowids : Tensor Row IDs. feats : Tensor New features. """self.update(rowids,feats)defupdate(self,rowids,feats):"""Update the feature data given the index. Parameters ---------- rowids : Tensor Row IDs. feats : Tensor New features. """feat_scheme=infer_scheme(feats)iffeat_scheme!=self.scheme:raiseDGLError("Cannot update column of scheme %s using feature of scheme %s."%(feat_scheme,self.scheme))self.data=F.scatter_row(self.data,rowids,feats)defextend(self,feats,feat_scheme=None):"""Extend the feature data. The operation triggers index selection. Parameters ---------- feats : Tensor The new features. feat_scheme : Scheme, optional The scheme """iffeat_schemeisNone:feat_scheme=infer_scheme(feats)iffeat_scheme!=self.scheme:raiseDGLError("Cannot update column of scheme %s using feature of scheme %s."%(feat_scheme,self.scheme))self.data=F.cat([self.data,feats],dim=0)defclone(self):"""Return a shallow copy of this column."""returnColumn(self.storage,self.scheme,self.index,self.device,self.deferred_dtype,)defdeepclone(self):"""Return a deepcopy of this column. The operation triggers index selection. """returnColumn(F.clone(self.data),copy.deepcopy(self.scheme))defsubcolumn(self,rowids):"""Return a subcolumn. The resulting column will share the same storage as this column so this operation is quite efficient. If the current column is also a sub-column (i.e., the index tensor is not None), the current index tensor will be sliced by 'rowids', if they are on the same context. Otherwise, both index tensors are saved, and only applied when the data is accessed. Parameters ---------- rowids : Tensor Row IDs. Returns ------- Column Sub-column """ifself.indexisNone:returnColumn(self.storage,self.scheme,rowids,self.device,self.deferred_dtype,)else:index=self.indexifnotisinstance(index,_LazyIndex):index=_LazyIndex(self.index)index=index.slice(rowids)returnColumn(self.storage,self.scheme,index,self.device,self.deferred_dtype,)@staticmethoddefcreate(data):"""Create a new column using the given data."""ifisinstance(data,Column):returndata.clone()else:returnColumn(data)def__repr__(self):returnrepr(self.data)def__getstate__(self):ifself.storageisnotNone:# flush any deferred operations_=self.datastate=self.__dict__.copy()# data pinning does not get serialized, so we need to remove that from# the statestate["_data_nd"]=Nonestate["pinned_by_dgl"]=Falsereturnstatedef__setstate__(self,state):index=Nonedevice=Noneif"storage"instateandstate["storage"]isnotNone:assert"index"notinstateorstate["index"]isNoneassert"device"notinstateorstate["device"]isNoneelse:# we may have a column with only index information, and that is# validindex=Noneif"index"notinstateelsestate["index"]device=Noneif"device"notinstateelsestate["device"]assert"deferred_dtype"notinstateorstate["deferred_dtype"]isNoneassert"pinned_by_dgl"notinstateorstate["pinned_by_dgl"]isFalseassert"_data_nd"notinstateorstate["_data_nd"]isNoneself.__dict__=state# properly initialize this objectself._init(self.schemeifhasattr(self,"scheme")elseNone,index=index,device=device,)def_init(self,scheme=None,index=None,device=None,deferred_dtype=None):self.scheme=schemeifschemeelseinfer_scheme(self.storage)self.index=indexself.device=deviceself.deferred_dtype=deferred_dtypeself.pinned_by_dgl=Falseself._data_nd=Nonedef__copy__(self):returnself.clone()deffetch(self,indices,device,pin_memory=False,**kwargs):_=self.data# materialize in case of lazy slicing & data transferreturnsuper().fetch(indices,device,pin_memory=pin_memory,**kwargs)defpin_memory_(self):"""Pin the storage into page-locked memory. Does nothing if the storage is already pinned. """ifnotself.pinned_by_dglandnotF.is_pinned(self.data):self._data_nd=pin_memory_inplace(self.data)self.pinned_by_dgl=Truedefunpin_memory_(self):"""Unpin the storage pinned by ``pin_memory_`` method. Does nothing if the storage is not pinned by ``pin_memory_`` method, even if it is actually in page-locked memory. """ifself.pinned_by_dgl:self._data_nd.unpin_memory_()self._data_nd=Noneself.pinned_by_dgl=Falsedefrecord_stream(self,stream):"""Record stream that is using the storage. Does nothing if the backend is not PyTorch. Parameters ---------- stream : torch.cuda.Stream. """ifF.get_preferred_backend()!="pytorch":raiseDGLError("record_stream only supports the PyTorch backend.")ifself.indexisnotNoneand(isinstance(self.index,_LazyIndex)orF.context(self.index)!=F.cpu()):self.index.record_stream(stream)ifF.context(self.storage)!=F.cpu():self.storage.record_stream(stream)classFrame(MutableMapping):"""The columnar storage for node/edge features. The frame is a dictionary from feature names to feature columns. All columns should have the same number of rows (i.e. the same first dimension). Parameters ---------- data : dict-like, optional The frame data in dictionary. If the provided data is another frame, this frame will NOT share columns with the given frame. So any out-place update on one will not reflect to the other. num_rows : int, optional The number of rows in this frame. If ``data`` is provided and is not empty, ``num_rows`` will be ignored and inferred from the given data. """def__init__(self,data=None,num_rows=None):ifdataisNone:self._columns=dict()self._num_rows=0ifnum_rowsisNoneelsenum_rowselse:assertnotisinstance(data,Frame)# sanity check for code refactor# Note that we always create a new column for the given data.# This avoids two frames accidentally sharing the same column.self._columns={k:vifisinstance(v,LazyFeature)elseColumn.create(v)fork,vindata.items()}self._num_rows=num_rows# infer num_rows & sanity checkforname,colinself._columns.items():ifisinstance(col,LazyFeature):continueifself._num_rowsisNone:self._num_rows=len(col)eliflen(col)!=self._num_rows:raiseDGLError("Expected all columns to have same # rows (%d), ""got %d on %r."%(self._num_rows,len(col),name))# Initializer for empty values. Initializer is a callable.# If is none, then a warning will be raised# in the first call and zero initializer will be used later.self._initializers={}# per-column initializersself._default_initializer=Nonedef_set_zero_default_initializer(self):"""Set the default initializer to be zero initializer."""self._default_initializer=zero_initializerdefget_initializer(self,column=None):"""Get the initializer for empty values for the given column. Parameters ---------- column : str The column Returns ------- callable The initializer """returnself._initializers.get(column,self._default_initializer)defset_initializer(self,initializer,column=None):"""Set the initializer for empty values, for a given column or all future columns. Initializer is a callable that returns a tensor given the shape and data type. Parameters ---------- initializer : callable The initializer. column : str, optional The column name """ifcolumnisNone:self._default_initializer=initializerelse:self._initializers[column]=initializer@propertydefschemes(self):"""Return a dictionary of column name to column schemes."""return{k:col.schemefork,colinself._columns.items()}@propertydefnum_columns(self):"""Return the number of columns in this frame."""returnlen(self._columns)@propertydefnum_rows(self):"""Return the number of rows in this frame."""returnself._num_rowsdef__contains__(self,name):"""Return true if the given column name exists."""returnnameinself._columnsdef__getitem__(self,name):"""Return the column of the given name. Parameters ---------- name : str The column name. Returns ------- Tensor Column data. """returnself._columns[name].datadef__setitem__(self,name,data):"""Update the whole column. Parameters ---------- name : str The column name. col : Column or data convertible to Column The column data. """self.update_column(name,data)def__delitem__(self,name):"""Delete the whole column. Parameters ---------- name : str The column name. """delself._columns[name]defadd_column(self,name,scheme,ctx):"""Add a new column to the frame. The frame will be initialized by the initializer. Parameters ---------- name : str The column name. scheme : Scheme The column scheme. ctx : DGLContext The column context. """ifnameinself:dgl_warning('Column "%s" already exists. Ignore adding this column again.'%name)returnifself.get_initializer(name)isNone:self._set_zero_default_initializer()initializer=self.get_initializer(name)init_data=initializer((self.num_rows,)+scheme.shape,scheme.dtype,ctx,slice(0,self.num_rows),)self._columns[name]=Column(init_data,scheme)defadd_rows(self,num_rows):"""Add blank rows to this frame. For existing fields, the rows will be extended according to their initializers. Parameters ---------- num_rows : int The number of new rows """feat_placeholders={}forkey,colinself._columns.items():scheme=col.schemectx=F.context(col.data)ifself.get_initializer(key)isNone:self._set_zero_default_initializer()initializer=self.get_initializer(key)new_data=initializer((num_rows,)+scheme.shape,scheme.dtype,ctx,slice(self._num_rows,self._num_rows+num_rows),)feat_placeholders[key]=new_dataself._append(Frame(feat_placeholders))self._num_rows+=num_rowsdefupdate_column(self,name,data):"""Add or replace the column with the given name and data. Parameters ---------- name : str The column name. data : Column or data convertible to Column The column data. """ifisinstance(data,LazyFeature):self._columns[name]=datareturncol=Column.create(data)iflen(col)!=self.num_rows:raiseDGLError("Expected data to have %d rows, got %d."%(self.num_rows,len(col)))self._columns[name]=coldefupdate_row(self,rowids,data):"""Update the feature data of the given rows. If the data contains new keys (new columns) that do not exist in this frame, add a new column. The ``rowids`` shall not contain duplicates. Otherwise, the behavior is undefined. Parameters ---------- rowids : Tensor Row Ids. data : dict[str, Tensor] Row data. """forkey,valindata.items():ifkeynotinself:scheme=infer_scheme(val)ctx=F.context(val)self.add_column(key,scheme,ctx)forkey,valindata.items():self._columns[key].update(rowids,val)def_append(self,other):"""Append ``other`` frame to ``self`` frame."""# pad columns that are not provided in the other frame with initial valuesforkey,colinself._columns.items():ifkeyinother:continuescheme=col.schemectx=F.context(col.data)ifself.get_initializer(key)isNone:self._set_zero_default_initializer()initializer=self.get_initializer(key)new_data=initializer((other.num_rows,)+scheme.shape,scheme.dtype,ctx,slice(self._num_rows,self._num_rows+other.num_rows),)other[key]=new_data# append other to selfforkey,colinother._columns.items():ifkeynotinself._columns:# the column does not exist; init a new columnself.add_column(key,col.scheme,F.context(col.data))self._columns[key].extend(col.data,col.scheme)defappend(self,other):"""Append another frame's data into this frame. If the current frame is empty, it will just use the columns of the given frame. Otherwise, the given data should contain all the column keys of this frame. Parameters ---------- other : Frame or dict-like The frame data to be appended. """ifnotisinstance(other,Frame):other=Frame(other)self._append(other)self._num_rows+=other.num_rowsdefclear(self):"""Clear this frame. Remove all the columns."""self._columns={}self._num_rows=0def__iter__(self):"""Return an iterator of columns."""returniter(self._columns)def__len__(self):"""Return the number of columns."""returnself.num_columnsdefkeys(self):"""Return the keys."""returnself._columns.keys()defvalues(self):"""Return the values."""returnself._columns.values()defclone(self):"""Return a clone of this frame. The clone frame does not share the underlying storage with this frame, i.e., adding or removing columns will not be visible to each other. However, they still share the tensor contents so any mutable operation on the column tensor are visible to each other. Hence, the function does not allocate extra tensor memory. Use :func:`~dgl.Frame.deepclone` for cloning a frame that does not share any data. Returns ------- Frame A cloned frame. """newframe=Frame(self._columns,self._num_rows)newframe._initializers=self._initializersnewframe._default_initializer=self._default_initializerreturnnewframedefdeepclone(self):"""Return a deep clone of this frame. The clone frame has an copy of this frame and any modification to the clone frame is not visible to this frame. The function allocate new tensors and copy the contents from this frame. Use :func:`~dgl.Frame.clone` for cloning a frame that does not allocate extra tensor memory. Returns ------- Frame A deep-cloned frame. """newframe=Frame({k:col.deepclone()fork,colinself._columns.items()},self._num_rows,)newframe._initializers=self._initializersnewframe._default_initializer=self._default_initializerreturnnewframedefsubframe(self,rowids):"""Return a new frame whose columns are subcolumns of this frame. The given row IDs should be within range [0, self.num_rows), and allow duplicate IDs. Parameters ---------- rowids : Tensor Row IDs Returns ------- Frame A new subframe. """subcols={k:col.subcolumn(rowids)fork,colinself._columns.items()}subf=Frame(subcols,len(rowids))subf._initializers=self._initializerssubf._default_initializer=self._default_initializerreturnsubfdefto(self,device,**kwargs):# pylint: disable=invalid-name"""Return a new frame with columns copy to the targeted device (cpu/gpu). Parameters ---------- device : Framework-specific device context object The context to move data to. kwargs : Key-word arguments. Key-word arguments fed to the framework copy function. Returns ------- Frame A new frame """newframe=self.clone()new_columns={key:col.to(device,**kwargs)forkey,colinnewframe._columns.items()}newframe._columns=new_columnsreturnnewframedef__repr__(self):returnrepr(dict(self))defpin_memory_(self):"""Registers the data of every column into pinned memory, materializing them if necessary."""forcolumninself._columns.values():column.pin_memory_()defunpin_memory_(self):"""Unregisters the data of every column from pinned memory, materializing them if necessary."""forcolumninself._columns.values():column.unpin_memory_()defrecord_stream(self,stream):"""Record stream that is using the data of every column, materializing them if necessary."""forcolumninself._columns.values():column.record_stream(stream)def_astype_float(self,new_type):assertnew_typein[F.float64,F.float32,F.float16,F.bfloat16,],"'new_type' must be floating-point type: %s"%str(new_type)newframe=self.clone()new_columns={}forname,columninself._columns.items():dtype=column.dtypeifdtype!=new_typeanddtypein[F.float64,F.float32,F.float16,F.bfloat16,]:new_columns[name]=column.astype(new_type)else:new_columns[name]=columnnewframe._columns=new_columnsreturnnewframedefbfloat16(self):"""Return a new frame with all floating-point columns converted to bfloat16"""returnself._astype_float(F.bfloat16)defhalf(self):"""Return a new frame with all floating-point columns converted to half-precision (float16)"""returnself._astype_float(F.float16)deffloat(self):"""Return a new frame with all floating-point columns converted to single-precision (float32)"""returnself._astype_float(F.float32)defdouble(self):"""Return a new frame with all floating-point columns converted to double-precision (float64)"""returnself._astype_float(F.float64)