"""GraphBolt OnDiskDataset."""
import bisect
import json
import os
import shutil
import textwrap
from copy import deepcopy
from typing import Dict, List, Union
import numpy as np
import torch
import yaml
from ...base import dgl_warning
from ...data.utils import download, extract_archive
from ..base import etype_str_to_tuple, ORIGINAL_EDGE_ID
from ..dataset import Dataset, Task
from ..internal import (
calculate_dir_hash,
check_dataset_change,
copy_or_convert_data,
get_attributes,
read_data,
read_edges,
)
from ..itemset import ItemSet, ItemSetDict
from ..sampling_graph import SamplingGraph
from .fused_csc_sampling_graph import (
fused_csc_sampling_graph,
FusedCSCSamplingGraph,
)
from .ondisk_metadata import (
OnDiskGraphTopology,
OnDiskMetaData,
OnDiskTaskData,
OnDiskTVTSet,
)
from .torch_based_feature_store import TorchBasedFeatureStore
__all__ = ["OnDiskDataset", "preprocess_ondisk_dataset", "BuiltinDataset"]
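# Names of item-set fields that hold node IDs. When node IDs fit into int32
# and ``auto_cast_to_optimal_dtype`` is enabled, these item sets are flagged
# (via ``within_int32``) so they can be stored with the narrower dtype too.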
NAMES_INDICATING_NODE_IDS = [
"seed_nodes",
"node_pairs",
"seeds",
"negative_srcs",
"negative_dsts",
]
def _graph_data_to_fused_csc_sampling_graph(
dataset_dir: str,
graph_data: Dict,
include_original_edge_id: bool,
auto_cast_to_optimal_dtype: bool,
) -> FusedCSCSamplingGraph:
"""Convert the raw graph data into FusedCSCSamplingGraph.
Parameters
----------
dataset_dir : str
The path to the dataset directory.
graph_data : Dict
The raw data read from yaml file.
include_original_edge_id : bool
Whether to include the original edge id in the FusedCSCSamplingGraph.
    auto_cast_to_optimal_dtype : bool
        Whether to cast the dtypes of tensors in the dataset to the smallest
        possible dtypes, reducing storage requirements and potentially
        improving performance.
Returns
-------
sampling_graph : FusedCSCSamplingGraph
The FusedCSCSamplingGraph constructed from the raw data.
"""
from ...sparse import spmatrix
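    # A dataset is treated as homogeneous when it declares exactly one
    # untyped node set and one untyped edge set.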
is_homogeneous = (
len(graph_data["nodes"]) == 1
and len(graph_data["edges"]) == 1
and "type" not in graph_data["nodes"][0]
and "type" not in graph_data["edges"][0]
)
if is_homogeneous:
# Homogeneous graph.
edge_fmt = graph_data["edges"][0]["format"]
edge_path = graph_data["edges"][0]["path"]
src, dst = read_edges(dataset_dir, edge_fmt, edge_path)
num_nodes = graph_data["nodes"][0]["num"]
num_edges = len(src)
coo_tensor = torch.tensor(np.array([src, dst]))
sparse_matrix = spmatrix(coo_tensor, shape=(num_nodes, num_nodes))
del coo_tensor
indptr, indices, edge_ids = sparse_matrix.csc()
del sparse_matrix
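        # Downcast the CSC index tensors to int32 when the node/edge counts
        # fit, roughly halving their memory footprint.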
if auto_cast_to_optimal_dtype:
if num_nodes <= torch.iinfo(torch.int32).max:
indices = indices.to(torch.int32)
if num_edges <= torch.iinfo(torch.int32).max:
indptr = indptr.to(torch.int32)
edge_ids = edge_ids.to(torch.int32)
node_type_offset = None
type_per_edge = None
node_type_to_id = None
edge_type_to_id = None
node_attributes = {}
edge_attributes = {}
if include_original_edge_id:
edge_attributes[ORIGINAL_EDGE_ID] = edge_ids
else:
# Heterogeneous graph.
# Sort graph_data by ntype/etype lexicographically to ensure ordering.
graph_data["nodes"].sort(key=lambda x: x["type"])
graph_data["edges"].sort(key=lambda x: x["type"])
# Construct node_type_offset and node_type_to_id.
node_type_offset = [0]
node_type_to_id = {}
for ntype_id, node_info in enumerate(graph_data["nodes"]):
node_type_to_id[node_info["type"]] = ntype_id
node_type_offset.append(node_type_offset[-1] + node_info["num"])
total_num_nodes = node_type_offset[-1]
# Construct edge_type_offset, edge_type_to_id and coo_tensor.
edge_type_offset = [0]
edge_type_to_id = {}
coo_src_list = []
coo_dst_list = []
coo_etype_list = []
for etype_id, edge_info in enumerate(graph_data["edges"]):
edge_type_to_id[edge_info["type"]] = etype_id
edge_fmt = edge_info["format"]
edge_path = edge_info["path"]
src, dst = read_edges(dataset_dir, edge_fmt, edge_path)
edge_type_offset.append(edge_type_offset[-1] + len(src))
src_type, _, dst_type = etype_str_to_tuple(edge_info["type"])
src += node_type_offset[node_type_to_id[src_type]]
dst += node_type_offset[node_type_to_id[dst_type]]
coo_src_list.append(torch.tensor(src))
coo_dst_list.append(torch.tensor(dst))
coo_etype_list.append(torch.full((len(src),), etype_id))
total_num_edges = edge_type_offset[-1]
coo_src = torch.cat(coo_src_list)
del coo_src_list
coo_dst = torch.cat(coo_dst_list)
del coo_dst_list
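        # Pick the smallest integer dtype (uint8/int16/int32/int64) that can
        # represent every edge type ID, shrinking ``type_per_edge``.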
if auto_cast_to_optimal_dtype:
dtypes = [torch.uint8, torch.int16, torch.int32, torch.int64]
dtype_maxes = [torch.iinfo(dtype).max for dtype in dtypes]
dtype_id = bisect.bisect_left(dtype_maxes, len(edge_type_to_id) - 1)
etype_dtype = dtypes[dtype_id]
coo_etype_list = [
tensor.to(etype_dtype) for tensor in coo_etype_list
]
coo_etype = torch.cat(coo_etype_list)
del coo_etype_list
sparse_matrix = spmatrix(
indices=torch.stack((coo_src, coo_dst), dim=0),
shape=(total_num_nodes, total_num_nodes),
)
del coo_src, coo_dst
indptr, indices, edge_ids = sparse_matrix.csc()
del sparse_matrix
if auto_cast_to_optimal_dtype:
if total_num_nodes <= torch.iinfo(torch.int32).max:
indices = indices.to(torch.int32)
if total_num_edges <= torch.iinfo(torch.int32).max:
indptr = indptr.to(torch.int32)
edge_ids = edge_ids.to(torch.int32)
node_type_offset = torch.tensor(node_type_offset, dtype=indices.dtype)
type_per_edge = torch.index_select(coo_etype, dim=0, index=edge_ids)
del coo_etype
node_attributes = {}
edge_attributes = {}
if include_original_edge_id:
# If uint8 or int16 was chosen above for etypes, we cast to int.
temp_etypes = (
type_per_edge.int()
if type_per_edge.element_size() < 4
else type_per_edge
)
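            # ``edge_ids`` index into the type-wise concatenated COO, so
            # subtract each edge type's starting offset to recover the
            # original per-type edge IDs.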
edge_ids -= torch.index_select(
torch.tensor(edge_type_offset, dtype=edge_ids.dtype),
dim=0,
index=temp_etypes,
)
del temp_etypes
edge_attributes[ORIGINAL_EDGE_ID] = edge_ids
# Load the sampling related node/edge features and add them to
# the sampling-graph.
if graph_data.get("feature_data", None):
if is_homogeneous:
# Homogeneous graph.
for graph_feature in graph_data["feature_data"]:
                in_memory = graph_feature.get("in_memory", True)
if graph_feature["domain"] == "node":
node_data = read_data(
os.path.join(dataset_dir, graph_feature["path"]),
graph_feature["format"],
in_memory=in_memory,
)
assert node_data.shape[0] == num_nodes
node_attributes[graph_feature["name"]] = node_data
elif graph_feature["domain"] == "edge":
edge_data = read_data(
os.path.join(dataset_dir, graph_feature["path"]),
graph_feature["format"],
in_memory=in_memory,
)
assert edge_data.shape[0] == num_edges
edge_attributes[graph_feature["name"]] = edge_data
else:
# Heterogeneous graph.
node_feature_collector = {}
edge_feature_collector = {}
for graph_feature in graph_data["feature_data"]:
                in_memory = graph_feature.get("in_memory", True)
if graph_feature["domain"] == "node":
node_data = read_data(
os.path.join(dataset_dir, graph_feature["path"]),
graph_feature["format"],
in_memory=in_memory,
)
if graph_feature["name"] not in node_feature_collector:
node_feature_collector[graph_feature["name"]] = {}
node_feature_collector[graph_feature["name"]][
graph_feature["type"]
] = node_data
elif graph_feature["domain"] == "edge":
edge_data = read_data(
os.path.join(dataset_dir, graph_feature["path"]),
graph_feature["format"],
in_memory=in_memory,
)
if graph_feature["name"] not in edge_feature_collector:
edge_feature_collector[graph_feature["name"]] = {}
edge_feature_collector[graph_feature["name"]][
graph_feature["type"]
] = edge_data
            # For heterogeneous graphs, a node/edge feature must cover all
            # node/edge types.
all_node_types = set(node_type_to_id.keys())
for feat_name, feat_data in node_feature_collector.items():
existing_node_type = set(feat_data.keys())
assert all_node_types == existing_node_type, (
f"Node feature {feat_name} does not cover all node types. "
f"Existing types: {existing_node_type}. "
f"Expected types: {all_node_types}."
)
all_edge_types = set(edge_type_to_id.keys())
for feat_name, feat_data in edge_feature_collector.items():
existing_edge_type = set(feat_data.keys())
assert all_edge_types == existing_edge_type, (
f"Edge feature {feat_name} does not cover all edge types. "
f"Existing types: {existing_edge_type}. "
f"Expected types: {all_edge_types}."
)
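            # Stitch the per-type feature tensors into a single tensor laid
            # out in the same node ordering as the sampling graph, using the
            # type offsets computed above.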
for feat_name, feat_data in node_feature_collector.items():
_feat = next(iter(feat_data.values()))
feat_tensor = torch.empty(
([total_num_nodes] + list(_feat.shape[1:])),
dtype=_feat.dtype,
)
for ntype, feat in feat_data.items():
feat_tensor[
node_type_offset[
node_type_to_id[ntype]
] : node_type_offset[node_type_to_id[ntype] + 1]
] = feat
node_attributes[feat_name] = feat_tensor
del node_feature_collector
for feat_name, feat_data in edge_feature_collector.items():
_feat = next(iter(feat_data.values()))
feat_tensor = torch.empty(
([total_num_edges] + list(_feat.shape[1:])),
dtype=_feat.dtype,
)
for etype, feat in feat_data.items():
feat_tensor[
edge_type_offset[
edge_type_to_id[etype]
] : edge_type_offset[edge_type_to_id[etype] + 1]
] = feat
edge_attributes[feat_name] = feat_tensor
del edge_feature_collector
if not bool(node_attributes):
node_attributes = None
if not bool(edge_attributes):
edge_attributes = None
# Construct the FusedCSCSamplingGraph.
return fused_csc_sampling_graph(
csc_indptr=indptr,
indices=indices,
node_type_offset=node_type_offset,
type_per_edge=type_per_edge,
node_type_to_id=node_type_to_id,
edge_type_to_id=edge_type_to_id,
node_attributes=node_attributes,
edge_attributes=edge_attributes,
)
def preprocess_ondisk_dataset(
dataset_dir: str,
include_original_edge_id: bool = False,
force_preprocess: bool = None,
auto_cast_to_optimal_dtype: bool = True,
) -> str:
"""Preprocess the on-disk dataset. Parse the input config file,
load the data, and save the data in the format that GraphBolt supports.
Parameters
----------
dataset_dir : str
The path to the dataset directory.
include_original_edge_id : bool, optional
Whether to include the original edge id in the FusedCSCSamplingGraph.
    force_preprocess : bool, optional
        Whether to force re-preprocessing of the on-disk dataset. If None,
        the dataset is re-preprocessed only when the raw data or the
        ``include_original_edge_id`` setting has changed since the last
        preprocessing.
    auto_cast_to_optimal_dtype : bool, optional
        Whether to cast the dtypes of tensors in the dataset to the smallest
        possible dtypes, reducing storage requirements and potentially
        improving performance. Default is True.
Returns
-------
output_config_path : str
The path to the output config file.
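    Examples
    --------
    A minimal usage sketch, assuming ``./example_dataset`` contains a valid
    ``metadata.yaml`` (the directory name is illustrative):

    >>> yaml_path = gb.preprocess_ondisk_dataset(
    ...     "./example_dataset", include_original_edge_id=True
    ... )
    >>> # The preprocessed data and its config live under
    >>> # ``./example_dataset/preprocessed/``.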
"""
# Check if the dataset path is valid.
if not os.path.exists(dataset_dir):
raise RuntimeError(f"Invalid dataset path: {dataset_dir}")
# Check if the dataset_dir is a directory.
if not os.path.isdir(dataset_dir):
raise RuntimeError(
f"The dataset must be a directory. But got {dataset_dir}"
)
# 0. Check if the dataset is already preprocessed.
processed_dir_prefix = "preprocessed"
preprocess_metadata_path = os.path.join(
processed_dir_prefix, "metadata.yaml"
)
if os.path.exists(os.path.join(dataset_dir, preprocess_metadata_path)):
if force_preprocess is None:
with open(
os.path.join(dataset_dir, preprocess_metadata_path), "r"
) as f:
preprocess_config = yaml.safe_load(f)
if (
preprocess_config.get("include_original_edge_id", None)
== include_original_edge_id
):
force_preprocess = check_dataset_change(
dataset_dir, processed_dir_prefix
)
else:
force_preprocess = True
if force_preprocess:
shutil.rmtree(os.path.join(dataset_dir, processed_dir_prefix))
print(
"The on-disk dataset is re-preprocessing, so the existing "
+ "preprocessed dataset has been removed."
)
else:
print("The dataset is already preprocessed.")
return os.path.join(dataset_dir, preprocess_metadata_path)
print("Start to preprocess the on-disk dataset.")
# Check if the metadata.yaml exists.
metadata_file_path = os.path.join(dataset_dir, "metadata.yaml")
if not os.path.exists(metadata_file_path):
raise RuntimeError("metadata.yaml does not exist.")
# Read the input config.
with open(metadata_file_path, "r") as f:
input_config = yaml.safe_load(f)
# 1. Make `processed_dir_abs` directory if it does not exist.
os.makedirs(os.path.join(dataset_dir, processed_dir_prefix), exist_ok=True)
output_config = deepcopy(input_config)
# 2. Load the data and create a FusedCSCSamplingGraph.
if "graph" not in input_config:
raise RuntimeError("Invalid config: does not contain graph field.")
sampling_graph = _graph_data_to_fused_csc_sampling_graph(
dataset_dir,
input_config["graph"],
include_original_edge_id,
auto_cast_to_optimal_dtype,
)
# 3. Record value of include_original_edge_id.
output_config["include_original_edge_id"] = include_original_edge_id
# 4. Save the FusedCSCSamplingGraph and modify the output_config.
output_config["graph_topology"] = {}
output_config["graph_topology"]["type"] = "FusedCSCSamplingGraph"
output_config["graph_topology"]["path"] = os.path.join(
processed_dir_prefix, "fused_csc_sampling_graph.pt"
)
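    # Remember whether node IDs were downcast to int32 so that node-ID item
    # sets in the tasks below can be stored with the same dtype.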
node_ids_within_int32 = (
sampling_graph.indices.dtype == torch.int32
and auto_cast_to_optimal_dtype
)
torch.save(
sampling_graph,
os.path.join(
dataset_dir,
output_config["graph_topology"]["path"],
),
)
del sampling_graph
del output_config["graph"]
# 5. Load the node/edge features and do necessary conversion.
if input_config.get("feature_data", None):
has_edge_feature_data = False
for feature, out_feature in zip(
input_config["feature_data"], output_config["feature_data"]
):
# Always save the feature in numpy format.
out_feature["format"] = "numpy"
out_feature["path"] = os.path.join(
processed_dir_prefix, feature["path"].replace("pt", "npy")
)
            in_memory = feature.get("in_memory", True)
if not has_edge_feature_data and feature["domain"] == "edge":
has_edge_feature_data = True
copy_or_convert_data(
os.path.join(dataset_dir, feature["path"]),
os.path.join(dataset_dir, out_feature["path"]),
feature["format"],
output_format=out_feature["format"],
in_memory=in_memory,
is_feature=True,
)
if has_edge_feature_data and not include_original_edge_id:
dgl_warning("Edge feature is stored, but edge IDs are not saved.")
# 6. Save tasks and train/val/test split according to the output_config.
if input_config.get("tasks", None):
for input_task, output_task in zip(
input_config["tasks"], output_config["tasks"]
):
for set_name in ["train_set", "validation_set", "test_set"]:
if set_name not in input_task:
continue
for input_set_per_type, output_set_per_type in zip(
input_task[set_name], output_task[set_name]
):
for input_data, output_data in zip(
input_set_per_type["data"], output_set_per_type["data"]
):
                        # Always save the set data in numpy format.
output_data["format"] = "numpy"
output_data["path"] = os.path.join(
processed_dir_prefix,
input_data["path"].replace("pt", "npy"),
)
                        name = input_data.get("name", None)
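                        # Item sets holding node IDs (see
                        # NAMES_INDICATING_NODE_IDS) are stored as int32 when
                        # the graph's node IDs fit into int32.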
copy_or_convert_data(
os.path.join(dataset_dir, input_data["path"]),
os.path.join(dataset_dir, output_data["path"]),
input_data["format"],
output_data["format"],
within_int32=node_ids_within_int32
and name in NAMES_INDICATING_NODE_IDS,
)
# 7. Save the output_config.
output_config_path = os.path.join(dataset_dir, preprocess_metadata_path)
with open(output_config_path, "w") as f:
yaml.dump(output_config, f)
print("Finish preprocessing the on-disk dataset.")
# 8. Calculate and save the hash value of the dataset directory.
hash_value_file = "dataset_hash_value.txt"
hash_value_file_path = os.path.join(
dataset_dir, processed_dir_prefix, hash_value_file
)
if os.path.exists(hash_value_file_path):
os.remove(hash_value_file_path)
dir_hash = calculate_dir_hash(dataset_dir)
with open(hash_value_file_path, "w") as f:
f.write(json.dumps(dir_hash, indent=4))
# 9. Return the absolute path of the preprocessing yaml file.
return output_config_path
class OnDiskTask:
"""An on-disk task.
    An on-disk task is used by ``OnDiskDataset``. It contains the metadata
    and the train/validation/test sets.
"""
def __init__(
self,
metadata: Dict,
train_set: Union[ItemSet, ItemSetDict],
validation_set: Union[ItemSet, ItemSetDict],
test_set: Union[ItemSet, ItemSetDict],
):
"""Initialize a task.
Parameters
----------
metadata : Dict
Metadata.
train_set : Union[ItemSet, ItemSetDict]
Training set.
validation_set : Union[ItemSet, ItemSetDict]
Validation set.
test_set : Union[ItemSet, ItemSetDict]
Test set.
"""
self._metadata = metadata
self._train_set = train_set
self._validation_set = validation_set
self._test_set = test_set
@property
def metadata(self) -> Dict:
"""Return the task metadata."""
return self._metadata
@property
def train_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the training set."""
return self._train_set
@property
def validation_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the validation set."""
return self._validation_set
@property
def test_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the test set."""
return self._test_set
def __repr__(self) -> str:
ret = "{Classname}({attributes})"
attributes_str = ""
attributes = get_attributes(self)
attributes.reverse()
for attribute in attributes:
if attribute[0] == "_":
continue
value = getattr(self, attribute)
attributes_str += f"{attribute}={value},\n"
attributes_str = textwrap.indent(
attributes_str, " " * len("OnDiskTask(")
).strip()
return ret.format(
Classname=self.__class__.__name__, attributes=attributes_str
)
class OnDiskDataset(Dataset):
"""An on-disk dataset which reads graph topology, feature data and
Train/Validation/Test set from disk.
Due to limited resources, the data which are too large to fit into RAM will
remain on disk while others reside in RAM once ``OnDiskDataset`` is
initialized. This behavior could be controled by user via ``in_memory``
field in YAML file. All paths in YAML file are relative paths to the
dataset directory.
A full example of YAML file is as follows:
.. code-block:: yaml
dataset_name: graphbolt_test
graph:
nodes:
- type: paper # could be omitted for homogeneous graph.
num: 1000
- type: author
num: 1000
edges:
- type: author:writes:paper # could be omitted for homogeneous graph.
format: csv # Can be csv only.
path: edge_data/author-writes-paper.csv
- type: paper:cites:paper
format: csv
path: edge_data/paper-cites-paper.csv
feature_data:
- domain: node
type: paper # could be omitted for homogeneous graph.
name: feat
format: numpy
in_memory: false # If not specified, default to true.
path: node_data/paper-feat.npy
- domain: edge
type: "author:writes:paper"
name: feat
format: numpy
in_memory: false
path: edge_data/author-writes-paper-feat.npy
tasks:
- name: "edge_classification"
num_classes: 10
train_set:
- type: paper # could be omitted for homogeneous graph.
data: # multiple data sources could be specified.
- name: node_pairs
format: numpy # Can be numpy or torch.
in_memory: true # If not specified, default to true.
path: set/paper-train-node_pairs.npy
- name: labels
format: numpy
path: set/paper-train-labels.npy
validation_set:
- type: paper
data:
- name: node_pairs
format: numpy
path: set/paper-validation-node_pairs.npy
- name: labels
format: numpy
path: set/paper-validation-labels.npy
test_set:
- type: paper
data:
- name: node_pairs
format: numpy
path: set/paper-test-node_pairs.npy
- name: labels
format: numpy
path: set/paper-test-labels.npy
Parameters
----------
path: str
The YAML file path.
include_original_edge_id: bool, optional
Whether to include the original edge id in the FusedCSCSamplingGraph.
    force_preprocess: bool, optional
        Whether to force re-preprocessing of the on-disk dataset. If None,
        the dataset is re-preprocessed only when the raw data or the
        ``include_original_edge_id`` setting has changed since the last
        preprocessing.
    auto_cast_to_optimal_dtype: bool, optional
        Whether to cast the dtypes of tensors in the dataset to the smallest
        possible dtypes, reducing storage requirements and potentially
        improving performance. Default is True.
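    Examples
    --------
    A minimal usage sketch, assuming ``./example_dataset`` follows the YAML
    layout above (the directory name is illustrative):

    >>> dataset = gb.OnDiskDataset("./example_dataset").load()
    >>> graph = dataset.graph
    >>> feature = dataset.feature
    >>> train_set = dataset.tasks[0].train_set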
"""
def __init__(
self,
path: str,
include_original_edge_id: bool = False,
force_preprocess: bool = None,
auto_cast_to_optimal_dtype: bool = True,
) -> None:
# Always call the preprocess function first. If already preprocessed,
# the function will return the original path directly.
self._dataset_dir = path
yaml_path = preprocess_ondisk_dataset(
path,
include_original_edge_id,
force_preprocess,
auto_cast_to_optimal_dtype,
)
with open(yaml_path) as f:
self._yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
self._loaded = False
def _convert_yaml_path_to_absolute_path(self):
"""Convert the path in YAML file to absolute path."""
if "graph_topology" in self._yaml_data:
self._yaml_data["graph_topology"]["path"] = os.path.join(
self._dataset_dir, self._yaml_data["graph_topology"]["path"]
)
if "feature_data" in self._yaml_data:
for feature in self._yaml_data["feature_data"]:
feature["path"] = os.path.join(
self._dataset_dir, feature["path"]
)
if "tasks" in self._yaml_data:
for task in self._yaml_data["tasks"]:
for set_name in ["train_set", "validation_set", "test_set"]:
if set_name not in task:
continue
for set_per_type in task[set_name]:
for data in set_per_type["data"]:
data["path"] = os.path.join(
self._dataset_dir, data["path"]
)
    def load(self, tasks: List[str] = None):
"""Load the dataset.
Parameters
----------
        tasks: List[str], optional
            The names of the tasks to load. A single task can be given either
            as a string or as a list with one string; multiple tasks must be
            given as a list of strings.
Examples
--------
1. Loading via single task name "node_classification".
>>> dataset = gb.OnDiskDataset(base_dir).load(
... tasks="node_classification")
>>> len(dataset.tasks)
1
>>> dataset.tasks[0].metadata["name"]
"node_classification"
2. Loading via single task name ["node_classification"].
>>> dataset = gb.OnDiskDataset(base_dir).load(
... tasks=["node_classification"])
>>> len(dataset.tasks)
1
>>> dataset.tasks[0].metadata["name"]
"node_classification"
3. Loading via multiple task names ["node_classification",
"link_prediction"].
>>> dataset = gb.OnDiskDataset(base_dir).load(
... tasks=["node_classification","link_prediction"])
>>> len(dataset.tasks)
2
>>> dataset.tasks[0].metadata["name"]
"node_classification"
>>> dataset.tasks[1].metadata["name"]
"link_prediction"
"""
self._convert_yaml_path_to_absolute_path()
self._meta = OnDiskMetaData(**self._yaml_data)
self._dataset_name = self._meta.dataset_name
self._graph = self._load_graph(self._meta.graph_topology)
self._feature = TorchBasedFeatureStore(self._meta.feature_data)
self._tasks = self._init_tasks(self._meta.tasks, tasks)
self._all_nodes_set = self._init_all_nodes_set(self._graph)
self._loaded = True
return self
@property
def yaml_data(self) -> Dict:
"""Return the YAML data."""
return self._yaml_data
@property
def tasks(self) -> List[Task]:
"""Return the tasks."""
self._check_loaded()
return self._tasks
@property
def graph(self) -> SamplingGraph:
"""Return the graph."""
self._check_loaded()
return self._graph
@property
def feature(self) -> TorchBasedFeatureStore:
"""Return the feature."""
self._check_loaded()
return self._feature
@property
def dataset_name(self) -> str:
"""Return the dataset name."""
self._check_loaded()
return self._dataset_name
@property
def all_nodes_set(self) -> Union[ItemSet, ItemSetDict]:
"""Return the itemset containing all nodes."""
self._check_loaded()
return self._all_nodes_set
def _init_tasks(
self, tasks: List[OnDiskTaskData], selected_tasks: List[str]
) -> List[OnDiskTask]:
"""Initialize the tasks."""
if isinstance(selected_tasks, str):
selected_tasks = [selected_tasks]
if selected_tasks and not isinstance(selected_tasks, list):
raise TypeError(
f"The type of selected_task should be list, but got {type(selected_tasks)}"
)
ret = []
if tasks is None:
return ret
task_names = set()
for task in tasks:
task_name = task.extra_fields.get("name", None)
if selected_tasks is None or task_name in selected_tasks:
ret.append(
OnDiskTask(
task.extra_fields,
self._init_tvt_set(task.train_set),
self._init_tvt_set(task.validation_set),
self._init_tvt_set(task.test_set),
)
)
if selected_tasks:
task_names.add(task_name)
if selected_tasks:
not_found_tasks = set(selected_tasks) - task_names
if len(not_found_tasks):
dgl_warning(
f"Below tasks are not found in YAML: {not_found_tasks}. Skipped."
)
return ret
def _check_loaded(self):
assert self._loaded, (
"Please ensure that you have called the OnDiskDataset.load() method"
+ " to properly load the data."
)
def _load_graph(
self, graph_topology: OnDiskGraphTopology
) -> FusedCSCSamplingGraph:
"""Load the graph topology."""
if graph_topology is None:
return None
if graph_topology.type == "FusedCSCSamplingGraph":
return torch.load(graph_topology.path)
raise NotImplementedError(
f"Graph topology type {graph_topology.type} is not supported."
)
def _init_tvt_set(
self, tvt_set: List[OnDiskTVTSet]
) -> Union[ItemSet, ItemSetDict]:
"""Initialize the TVT set."""
ret = None
if (tvt_set is None) or (len(tvt_set) == 0):
return ret
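        # A missing ``type`` means the sets belong to a homogeneous graph and
        # are wrapped in a single ItemSet; otherwise one ItemSet per node/edge
        # type is collected into an ItemSetDict.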
if tvt_set[0].type is None:
assert (
len(tvt_set) == 1
), "Only one TVT set is allowed if type is not specified."
ret = ItemSet(
tuple(
read_data(data.path, data.format, data.in_memory)
for data in tvt_set[0].data
),
names=tuple(data.name for data in tvt_set[0].data),
)
else:
data = {}
for tvt in tvt_set:
data[tvt.type] = ItemSet(
tuple(
read_data(data.path, data.format, data.in_memory)
for data in tvt.data
),
names=tuple(data.name for data in tvt.data),
)
ret = ItemSetDict(data)
return ret
def _init_all_nodes_set(self, graph) -> Union[ItemSet, ItemSetDict]:
if graph is None:
dgl_warning(
"`all_nodes_set` is returned as None, since graph is None."
)
return None
num_nodes = graph.num_nodes
dtype = graph.indices.dtype
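        # Passing a scalar tensor to ItemSet yields the ID range
        # [0, num_nodes), i.e. every node of the (typed) graph.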
if isinstance(num_nodes, int):
return ItemSet(
torch.tensor(num_nodes, dtype=dtype),
names="seed_nodes",
)
else:
data = {
node_type: ItemSet(
torch.tensor(num_node, dtype=dtype),
names="seed_nodes",
)
for node_type, num_node in num_nodes.items()
}
return ItemSetDict(data)
class BuiltinDataset(OnDiskDataset):
"""A utility class to download built-in dataset from AWS S3 and load it as
:class:`OnDiskDataset`.
Available built-in datasets include:
**cora**
The cora dataset is a homogeneous citation network dataset, which is
designed for the node classification task.
**ogbn-mag**
The ogbn-mag dataset is a heterogeneous network composed of a subset of
the Microsoft Academic Graph (MAG). See more details in
`ogbn-mag <https://ogb.stanford.edu/docs/nodeprop/#ogbn-mag>`_.
.. note::
Reverse edges are added to the original graph and duplicated
edges are removed.
**ogbl-citation2**
The ogbl-citation2 dataset is a directed graph, representing the
citation network between a subset of papers extracted from MAG. See
more details in `ogbl-citation2
<https://ogb.stanford.edu/docs/linkprop/#ogbl-citation2>`_.
.. note::
Reverse edges are added to the original graph and duplicated
edges are removed.
**ogbn-arxiv**
The ogbn-arxiv dataset is a directed graph, representing the citation
network between all Computer Science (CS) arXiv papers indexed by MAG.
See more details in `ogbn-arxiv
<https://ogb.stanford.edu/docs/nodeprop/#ogbn-arxiv>`_.
.. note::
Reverse edges are added to the original graph and duplicated
edges are removed.
**ogbn-papers100M**
        The ogbn-papers100M dataset is a directed graph, representing the
        citation network between all papers indexed by MAG.
See more details in `ogbn-papers100M
<https://ogb.stanford.edu/docs/nodeprop/#ogbn-papers100M>`_.
.. note::
Reverse edges are added to the original graph and duplicated
edges are removed.
**ogbn-products**
The ogbn-products dataset is an undirected and unweighted graph,
representing an Amazon product co-purchasing network. See more details
in `ogbn-products
<https://ogb.stanford.edu/docs/nodeprop/#ogbn-products>`_.
.. note::
Reverse edges are added to the original graph.
Node features are stored as float32.
**ogb-lsc-mag240m**
The ogb-lsc-mag240m dataset is a heterogeneous academic graph extracted
from the Microsoft Academic Graph (MAG). See more details in
`ogb-lsc-mag240m <https://ogb.stanford.edu/docs/lsc/mag240m/>`_.
.. note::
Reverse edges are added to the original graph.
Parameters
----------
name : str
The name of the builtin dataset.
root : str, optional
        The root directory of the dataset. Defaults to ``datasets``.
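    Examples
    --------
    A usage sketch; the first call downloads and preprocesses the dataset
    under ``./datasets`` (network access assumed):

    >>> dataset = gb.BuiltinDataset("ogbn-arxiv").load()
    >>> graph = dataset.graph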
"""
    # For datasets smaller than 30GB, we use the base URL.
    # Otherwise, we use the accelerated URL.
_base_url = "https://data.dgl.ai/dataset/graphbolt/"
_accelerated_url = (
"https://dgl-data.s3-accelerate.amazonaws.com/dataset/graphbolt/"
)
_datasets = [
"cora",
"ogbn-mag",
"ogbl-citation2",
"ogbn-products",
"ogbn-arxiv",
]
_large_datasets = ["ogb-lsc-mag240m", "ogbn-papers100M"]
_all_datasets = _datasets + _large_datasets
    def __init__(self, name: str, root: str = "datasets") -> None:
dataset_dir = os.path.join(root, name)
if not os.path.exists(dataset_dir):
if name not in self._all_datasets:
raise RuntimeError(
f"Dataset {name} is not available. Available datasets are "
f"{self._all_datasets}."
)
url = (
self._accelerated_url
if name in self._large_datasets
else self._base_url
)
url += name + ".zip"
os.makedirs(root, exist_ok=True)
zip_file_path = os.path.join(root, name + ".zip")
download(url, path=zip_file_path)
extract_archive(zip_file_path, root, overwrite=True)
os.remove(zip_file_path)
super().__init__(dataset_dir, force_preprocess=False)