图神经网络(Graph Neural Networks)最近是越来越火,很多问题都可以用图神经网络找到新的解决方法。 今天我们就来看怎么用 PyTorch 和 PyTorch Geometric (PyG) 实现图神经网络。PyG 是一款号称比 DGL 快 14 倍的基于 PyTorch 的几何深度学习框架,可以简单方便的实现图神经网络。
另外,在 PyG 的 Github 页面上,有一个现在已经在 PyG 里实现的网络列表,大家可以参考一下。
系统需求
要求至少安装 PyTorch 1.2.0 版本。
1 2 | $ python -c "import torch; print(torch.__version__)" >>> 1.2.0 |
PyTorch Geometric 基础知识
这一部分我们介绍一下 PyG 的基础知识,主要包括 torch_geometric.data 和 torch_geometric.nn 部分。另外,还会介绍怎么设计自己的 Message Passing Layer。
Data 类
torch_geometric.data 包里有一个 Data 类,通过 Data 类我们可以很方便的创建图结构。
定义一个图结构,需要以下变量:
- 每个节点(node)的 features
- 边的连接关系或者边的 features
我们以下面的图结构为例,看看怎么用 Data 类创建图结构:
在上图中,一共有四个节点 \(v_1, v_2, v_3, v_4\),其中每个节点都有一个二维的特征向量和一个标签 \(y\)。这个特征向量和标签可以用 FloatTensor
来表示:
1 2 | x = torch.tensor([[2,1], [5,6], [3,7], [12,0]], dtype=torch.float) y = torch.tensor([0, 1, 0, 1], dtype=torch.float) |
图的连接关系(边)可以用 COO 格式表示。COO 格式的维度是 [2, num_edges]
,其中第一个列表是所有边上起始节点的 index,第二个列表是对应边上目标节点的 index:
1 2 | edge_index = torch.tensor([[0, 1, 2, 0, 3], [1, 0, 1, 3, 2]], dtype=torch.long) |
注意上面的数据里定义边的顺序是无关紧要的,这个数据仅仅用来计算邻接矩阵用的,比如上面的定义和下面的定义是等价的:
1 2 | edge_index = torch.tensor([[0, 2, 1, 0, 3], [3, 1, 0, 1, 2]], dtype=torch.long) |
综上所述,我们可以这样定义上面的图结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 | import torch from torch_geometric.data import Data x = torch.tensor([[2,1], [5,6], [3,7], [12,0]], dtype=torch.float) y = torch.tensor([0, 1, 0, 1], dtype=torch.float) edge_index = torch.tensor([[0, 2, 1, 0, 3], [3, 1, 0, 1, 2]], dtype=torch.long) data = Data(x=x, y=y, edge_index=edge_index) >>> Data(edge_index=[2, 5], x=[4, 2], y=[4]) |
Dataset
PyG 里有两种数据集类型:InMemoryDataset 和 Dataset,第一种适用于可以全部放进内存中的小数据集,第二种则适用于不能一次性放进内存中的大数据集。我们以 InMemoryDataset 为例。
InMemoryDataset 中有下列四个函数需要我们实现:
raw_file_names()
返回一个包含所有未处理过的数据文件的文件名的列表。
起始也可以返回一个空列表,然后在后面要说的 process()
函数里再定义。
processed_file_names()
返回一个包含所有处理过的数据文件的文件名的列表。
download()
如果在数据加载前需要先下载,则在这里定义下载过程,下载到 self.raw_dir
中定义的文件夹位置。
如果不需要下载,返回 pass
即可。
process()
这是最重要的一个函数,我们需要在这个函数里把数据处理成一个 Data
对象。下面是官方的一个示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | import torch from torch_geometric.data import InMemoryDataset class MyOwnDataset(InMemoryDataset): def __init__(self, root, transform=None, pre_transform=None): super(MyOwnDataset, self).__init__(root, transform, pre_transform) self.data, self.slices = torch.load(self.processed_paths[0]) @property def raw_file_names(self): return ['some_file_1', 'some_file_2', ...] @property def processed_file_names(self): return ['data.pt'] def download(self): # Download to `self.raw_dir`. def process(self): # Read data into huge `Data` list. data_list = [...] if self.pre_filter is not None: data_list [data for data in data_list if self.pre_filter(data)] if self.pre_transform is not None: data_list = [self.pre_transform(data) for data in data_list] data, slices = self.collate(data_list) torch.save((data, slices), self.processed_paths[0]) |
本文接下来会介绍如何用 RecSys Challenge 2015 的数据创建一个自定义数据集。
DataLoader
这个类可以帮助我们将数据按 batch 传给 model,定义的方法如下,需要制定 batch_size
和 dataset
:
1 | loader = DataLoader(dataset, batch_size=512, shuffle=True) |
每个 loader 的循环都返回一个 Batch
对象:
1 2 3 4 | for batch in loader: batch >>> Batch(x=[1024, 21], edge_index=[2, 1568], y=[512], batch=[1024]) |
Batch
相比 Data
对象多了一个 batch
参数,告诉我们这个 batch 里都包含哪些 nodes,便于计算。
MessagePassing
Message Passing 是图网络中学习 node embedding 的重要方法。点击这里查看官方文档对这个的详细说明,我们接下来也将基于官方的说明来讲解。
Message Passing 的公示如下:
其中,\(x\) 表示表格节点的 embedding,\(e\) 表示边的特征,\(\phi\) 表示 message 函数,\(□\) 表示聚合 aggregation 函数,\(\gamma\) 表示 update 函数。上标表示层的 index,比如说,当 k = 1 时,\(x\) 则表示所有输入网络的图结构的数据。
下面是每个函数的介绍:
propagate(edge_index, size=None, **kwargs)
这个函数最终会调用 message
和 update
函数。
message(**kwargs)
这个函数定义了对于每个节点对 \((x_i, x_j)\),怎样生成信息(message)。
update(aggr_out, **kwargs)
这个函数利用聚合好的信息(message)更新每个节点的 embedding。
示例:SageConv
我们来看看怎样实现论文 “Inductive Representation Learning on Large Graphs” 中的 SageConv 层。SageConv 的 Message Passing 定义如下:
聚合函数(aggregation)我们用最大池化(max pooling),这样上述公示中的 AGGREGATE 可以写为:
上述公式中,对于每个邻居节点,都和一个 weighted matrix 相乘,并且加上一个 bias,传给一个激活函数。相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 | class SAGEConv(MessagePassing): def __init__(self, in_channels, out_channels): super(SAGEConv, self).__init__(aggr='max') self.lin = torch.nn.Linear(in_channels, out_channels) self.act = torch.nn.ReLU() def message(self, x_j): # x_j has shape [E, in_channels] x_j = self.lin(x_j) x_j = self.act(x_j) return x_j |
对于 update 方法,我们需要聚合更新每个节点的 embedding,然后加上权重矩阵和偏置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | class SAGEConv(MessagePassing): def __init__(self, in_channels, out_channels): super(SAGEConv, self).__init__(aggr='max') self.update_lin = torch.nn.Linear(in_channels + out_channels, in_channels, bias=False) self.update_act = torch.nn.ReLU() def update(self, aggr_out, x): # aggr_out has shape [N, out_channels] new_embedding = torch.cat([aggr_out, x], dim=1) new_embedding = self.update_lin(new_embedding) new_embedding = torch.update_act(new_embedding) return new_embedding |
综上所述,SageConv 层的定于方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | import torch from torch.nn import Sequential as Seq, Linear, ReLU from torch_geometric.nn import MessagePassing from torch_geometric.utils import remove_self_loops, add_self_loops class SAGEConv(MessagePassing): def __init__(self, in_channels, out_channels): super(SAGEConv, self).__init__(aggr='max') # "Max" aggregation. self.lin = torch.nn.Linear(in_channels, out_channels) self.act = torch.nn.ReLU() self.update_lin = torch.nn.Linear(in_channels + out_channels, in_channels, bias=False) self.update_act = torch.nn.ReLU() def forward(self, x, edge_index): # x has shape [N, in_channels] # edge_index has shape [2, E] edge_index, _ = remove_self_loops(edge_index) edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0)) return self.propagate(edge_index, size=(x.size(0), x.size(0)), x=x) def message(self, x_j): # x_j has shape [E, in_channels] x_j = self.lin(x_j) x_j = self.act(x_j) return x_j def update(self, aggr_out, x): # aggr_out has shape [N, out_channels] new_embedding = torch.cat([aggr_out, x], dim=1) new_embedding = self.update_lin(new_embedding) new_embedding = self.update_act(new_embedding) return new_embedding |
示例:RecSys Challenge 2015
RecSys Challenge 2015 是一个挑战赛,主要目的是创建一个 session-based recommender system。主要任务有两个:
- 预测经过一系列的点击后,是否会产生购买行为。
- 预测购买的商品。
数据下载地址在这里。数据主要包含两部分:yoochoose-clicks.dat 点击数据, 和 yoochoose-buys.dat 购买行为数据。
点击数据的示例如下:
购买行为数据示例如下:
数据预处理 Preprocessing
下载好数据后,我们先进行一些预处理:
1 2 3 4 5 6 7 8 9 10 11 | from sklearn.preprocessing import LabelEncoder df = pd.read_csv('../input/yoochoose-click.dat', header=None) df.columns=['session_id','timestamp','item_id','category'] buy_df = pd.read_csv('../input/yoochoose-buys.dat', header=None) buy_df.columns=['session_id','timestamp','item_id','price','quantity'] item_encoder = LabelEncoder() df['item_id'] = item_encoder.fit_transform(df.item_id) df.head() |
处理后的数据示例如下:
因为数据太多,我们随机进行取样以方便讲解:
1 2 3 4 | #randomly sample a couple of them sampled_session_id = np.random.choice(df.session_id.unique(), 1000000, replace=False) df = df.loc[df.session_id.isin(sampled_session_id)] df.nunique() |
取样的数据统计如下:
另外,为获取标签,即对于某个特定的 session,是否产生了购买行为,我们只需要检查文件 yoochoose-clicks.dat 中的 session_id 是否在文件 yoochoose-buys.dat 中出现即可:
1 2 | df['label'] = df.session_id.isin(buy_df.session_id) df.head() |
结果如下:
创建 Dataset
这里我们将预处理过的数据创建成为 Dataset
对象。对于每个 session,里面的每个商品(item)看作一个节点,因此每个 session 里所有的商品组成一个图。
首先,我们将数据集按照 session_id 进行分组,分组过程中 item_id 也要被重新编码,因为对于每个图,每个节点的 index 应该从 0 开始:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | import torch from torch_geometric.data import InMemoryDataset from tqdm import tqdm class YooChooseBinaryDataset(InMemoryDataset): def __init__(self, root, transform=None, pre_transform=None): super(YooChooseBinaryDataset, self).__init__(root, transform, pre_transform) self.data, self.slices = torch.load(self.processed_paths[0]) @property def raw_file_names(self): return [] @property def processed_file_names(self): return ['../input/yoochoose_click_binary_1M_sess.dataset'] def download(self): pass def process(self): data_list = [] # process by session_id grouped = df.groupby('session_id') for session_id, group in tqdm(grouped): sess_item_id = LabelEncoder().fit_transform(group.item_id) group = group.reset_index(drop=True) group['sess_item_id'] = sess_item_id node_features = group.loc[group.session_id==session_id,['sess_item_id','item_id']].sort_values('sess_item_id').item_id.drop_duplicates().values node_features = torch.LongTensor(node_features).unsqueeze(1) target_nodes = group.sess_item_id.values[1:] source_nodes = group.sess_item_id.values[:-1] edge_index = torch.tensor([source_nodes, target_nodes], dtype=torch.long) x = node_features y = torch.FloatTensor([group.label.values[0]]) data = Data(x=x, edge_index=edge_index, y=y) data_list.append(data) data, slices = self.collate(data_list) torch.save((data, slices), self.processed_paths[0]) |
然后我们对数据集进行随机排序,分成 training, validation 和 testing 三个子数据集:
1 2 3 4 5 | dataset = dataset.shuffle() train_dataset = dataset[:800000] val_dataset = dataset[800000:900000] test_dataset = dataset[900000:] len(train_dataset), len(val_dataset), len(test_dataset) |
创建图网络(Graph Neural Network)
下列代码过程参考了官方的一个示例并做了适当的修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | embed_dim = 128 from torch_geometric.nn import TopKPooling from torch_geometric.nn import global_mean_pool as gap, global_max_pool as gmp import torch.nn.functional as F class Net(torch.nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = SAGEConv(embed_dim, 128) self.pool1 = TopKPooling(128, ratio=0.8) self.conv2 = SAGEConv(128, 128) self.pool2 = TopKPooling(128, ratio=0.8) self.conv3 = SAGEConv(128, 128) self.pool3 = TopKPooling(128, ratio=0.8) self.item_embedding = torch.nn.Embedding(num_embeddings=df.item_id.max() +1, embedding_dim=embed_dim) self.lin1 = torch.nn.Linear(256, 128) self.lin2 = torch.nn.Linear(128, 64) self.lin3 = torch.nn.Linear(64, 1) self.bn1 = torch.nn.BatchNorm1d(128) self.bn2 = torch.nn.BatchNorm1d(64) self.act1 = torch.nn.ReLU() self.act2 = torch.nn.ReLU() def forward(self, data): x, edge_index, batch = data.x, data.edge_index, data.batch x = self.item_embedding(x) x = x.squeeze(1) x = F.relu(self.conv1(x, edge_index)) x, edge_index, _, batch, _ = self.pool1(x, edge_index, None, batch) x1 = torch.cat([gmp(x, batch), gap(x, batch)], dim=1) x = F.relu(self.conv2(x, edge_index)) x, edge_index, _, batch, _ = self.pool2(x, edge_index, None, batch) x2 = torch.cat([gmp(x, batch), gap(x, batch)], dim=1) x = F.relu(self.conv3(x, edge_index)) x, edge_index, _, batch, _ = self.pool3(x, edge_index, None, batch) x3 = torch.cat([gmp(x, batch), gap(x, batch)], dim=1) x = x1 + x2 + x3 x = self.lin1(x) x = self.act1(x) x = self.lin2(x) x = self.act2(x) x = F.dropout(x, p=0.5, training=self.training) x = torch.sigmoid(self.lin3(x)).squeeze(1) return x |
训练
训练过程中,我们使用 Adam 优化器,学习率 0.005,损失函数是 BCE:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | def train(): model.train() loss_all = 0 for data in train_loader: data = data.to(device) optimizer.zero_grad() output = model(data) label = data.y.to(device) loss = crit(output, label) loss.backward() loss_all += data.num_graphs * loss.item() optimizer.step() return loss_all / len(train_dataset) device = torch.device('cuda') model = Net().to(device) optimizer = torch.optim.Adam(model.parameters(), lr=0.005) crit = torch.nn.BCELoss() train_loader = DataLoader(train_dataset, batch_size=batch_size) for epoch in range(num_epochs): train() |
Validation
这个数据集非常的不平衡,因为大多数的 session 里没有购买行为。也就是说,如果一个模型将所有的结果都预测为 false,也能达到 90% 的准确率。因此,这里我们不使用 accuracy 作为评测标准,而是使用 Area Under Curve (AUC):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | def evaluate(loader): model.eval() predictions = [] labels = [] with torch.no_grad(): for data in loader: data = data.to(device) pred = model(data).detach().cpu().numpy() label = data.y.detach().cpu().numpy() predictions.append(pred) labels.append(label) |
训练结果
下面是模型训练了 1 epoch 的结果:
1 2 3 4 5 6 7 | for epoch in range(1): loss = train() train_acc = evaluate(train_loader) val_acc = evaluate(val_loader) test_acc = evaluate(test_loader) print('Epoch: {:03d}, Loss: {:.5f}, Train Auc: {:.5f}, Val Auc: {:.5f}, Test Auc: {:.5f}'. format(epoch, loss, train_acc, val_acc, test_acc)) |
可以看到,我们用非常少的数据,在只训练了 1 个 epoch 的情况下,测试集的 AUC 也能达到 0.73。如果用更多的数据训练,应该可以达到更好的结果。
本站微信群、QQ群(三群号 726282629):
dataset = dataset.shuffle()
这行的dataset 引用的是哪个头文件(dataset.shuffle())
YooChooseBinaryDataset.process(InMemoryDataset)