"""GPU cached feature for GraphBolt."""
import torch
from ..feature_store import Feature
from .gpu_cache import GPUCache
__all__ = ["GPUCachedFeature"]
[docs]class GPUCachedFeature(Feature):
r"""GPU cached feature wrapping a fallback feature.
Places the GPU cache to torch.cuda.current_device().
Parameters
----------
fallback_feature : Feature
The fallback feature.
cache_size : int
The capacity of the GPU cache, the number of features to store.
Examples
--------
>>> import torch
>>> from dgl import graphbolt as gb
>>> torch_feat = torch.arange(10).reshape(2, -1).to("cuda")
>>> cache_size = 5
>>> fallback_feature = gb.TorchBasedFeature(torch_feat)
>>> feature = gb.GPUCachedFeature(fallback_feature, cache_size)
>>> feature.read()
tensor([[0, 1, 2, 3, 4],
[5, 6, 7, 8, 9]], device='cuda:0')
>>> feature.read(torch.tensor([0]).to("cuda"))
tensor([[0, 1, 2, 3, 4]], device='cuda:0')
>>> feature.update(torch.tensor([[1 for _ in range(5)]]).to("cuda"),
... torch.tensor([1]).to("cuda"))
>>> feature.read(torch.tensor([0, 1]).to("cuda"))
tensor([[0, 1, 2, 3, 4],
[1, 1, 1, 1, 1]], device='cuda:0')
>>> feature.size()
torch.Size([5])
"""
def __init__(self, fallback_feature: Feature, cache_size: int):
super(GPUCachedFeature, self).__init__()
assert isinstance(fallback_feature, Feature), (
f"The fallback_feature must be an instance of Feature, but got "
f"{type(fallback_feature)}."
)
self._fallback_feature = fallback_feature
self.cache_size = cache_size
# Fetching the feature dimension from the underlying feature.
feat0 = fallback_feature.read(torch.tensor([0]))
self._feature = GPUCache((cache_size,) + feat0.shape[1:], feat0.dtype)
[docs] def read(self, ids: torch.Tensor = None):
"""Read the feature by index.
The returned tensor is always in GPU memory, no matter whether the
fallback feature is in memory or on disk.
Parameters
----------
ids : torch.Tensor, optional
The index of the feature. If specified, only the specified indices
of the feature are read. If None, the entire feature is returned.
Returns
-------
torch.Tensor
The read feature.
"""
if ids is None:
return self._fallback_feature.read()
values, missing_index, missing_keys = self._feature.query(ids)
missing_values = self._fallback_feature.read(missing_keys).to("cuda")
values[missing_index] = missing_values
self._feature.replace(missing_keys, missing_values)
return values
[docs] def size(self):
"""Get the size of the feature.
Returns
-------
torch.Size
The size of the feature.
"""
return self._fallback_feature.size()
[docs] def update(self, value: torch.Tensor, ids: torch.Tensor = None):
"""Update the feature.
Parameters
----------
value : torch.Tensor
The updated value of the feature.
ids : torch.Tensor, optional
The indices of the feature to update. If specified, only the
specified indices of the feature will be updated. For the feature,
the `ids[i]` row is updated to `value[i]`. So the indices and value
must have the same length. If None, the entire feature will be
updated.
"""
if ids is None:
self._fallback_feature.update(value)
size = min(self.cache_size, value.shape[0])
self._feature.replace(
torch.arange(0, size, device="cuda"),
value[:size].to("cuda"),
)
else:
self._fallback_feature.update(value, ids)
self._feature.replace(ids, value)