# -*- coding: utf-8 -*-
""" One-Class Graph Neural Networks for Anomaly Detection in Attributed Networks
"""
# Author: Xueying Ding <xding2@andrew.cmu.edu>
# License: BSD 2 clause
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from sklearn.utils.validation import check_is_fitted
from torch_sparse import SparseTensor
from . import BaseDetector
from ..utils import validate_device
from ..metrics import eval_roc_auc
from torch_geometric.loader import NeighborLoader
class GCN_base(nn.Module):
"""
Describe: Backbone GCN module.
"""
def __init__(self, in_feats, n_hidden, n_layers, dropout, act):
super(GCN_base, self).__init__()
self.layers = nn.ModuleList()
# input layer
self.layers.append(GCNConv(in_feats, n_hidden, bias=False))
# hidden layers
for i in range(n_layers):
self.layers.append(GCNConv(n_hidden, n_hidden, bias=False))
self.dropout = nn.Dropout(p=dropout)
self.act = act
def forward(self, x, edge_index):
for i, layer in enumerate(self.layers[0:-1]):
x = layer(x, edge_index)
x = self.act(x)
x = self.dropout(x)
x = self.layers[-1](x, edge_index)
return x
[docs]class OCGNN(BaseDetector):
"""
OCGNN (One-Class Graph Neural Networks for Anomaly Detection in
Attributed Networks) is an anomaly detector that measures the
distance of anomaly to the centroid, in a similar fashion to the
support vector machine, but in the embedding space after feeding
towards several layers of GCN.
See :cite:`wang2021one` for details.
Parameters
----------
n_hidden : int, optional
Hidden dimension of model. Defaults: `256``.
n_layers : int, optional
Dimensions of underlying GCN. Defaults: ``4``.
contamination : float, optional
Valid in (0., 0.5). The proportion of outliers in the data set.
Used when fitting to define the threshold on the decision
function. Default: ``0.1``.
dropout : float, optional
Dropout rate. Defaults: ``0.3``.
weight_decay : float, optional
Weight decay (L2 penalty). Defaults: ``0.``.
act : callable activation function or None, optional
Activation function if not None.
Defaults: ``torch.nn.functional.relu``.
eps : float, optional
A small valid number for determining the center and make
sure it does not collapse to 0. Defaults: ``0.001``.
nu: float, optional
Regularization parameter. Defaults: ``0.5``
lr : float, optional
Learning rate. Defaults: ``0.005``.
epoch : int, optional
Maximum number of training epoch. Defaults: ``5``.
warmup_epoch : int, optional
Number of epochs to update radius and center in the beginning
of training. Defaults: ``2``.
gpu : int
GPU Index, -1 for using CPU. Defaults: ``0``.
verbose : bool
Verbosity mode. Turn on to print out log information.
Defaults: ``False``.
batch_size : int, optional
Minibatch size, 0 for full batch training. Default: ``0``.
num_neigh : int, optional
Number of neighbors in sampling, -1 for all neighbors.
Default: ``-1``.
Examples
--------
>>> from pygod.models import AnomalyDAE
>>> model = OCGNN()
>>> model.fit(data) # PyG graph data object
>>> prediction = model.predict(data)
"""
def __init__(self,
n_hidden=256,
n_layers=4,
contamination=0.1,
dropout=0.3,
lr=0.005,
weight_decay=0,
eps=0.001,
nu=0.5,
gpu=0,
epoch=5,
warmup_epoch=2,
verbose=False,
act=F.relu,
batch_size = 0,
num_neigh = -1):
super(OCGNN, self).__init__(contamination=contamination)
self.dropout = dropout
self.n_hidden = n_hidden
self.n_layers = n_layers
self.lr = lr
self.weight_decay = weight_decay
self.eps = eps
self.nu = nu
self.data_center = 0
self.radius = 0.0
self.epoch = epoch
self.warmup_epoch = warmup_epoch
self.act = act
self.device = validate_device(gpu)
self.batch_size = batch_size
self.num_neigh = num_neigh
# other param
self.verbose = verbose
self.model = None
def init_center(self, x, edge_index):
"""
Initialize hypersphere center c as the mean from
an initial forward pass on the data.
Parameters
----------
x : torch.Tensor
Node features.
edge_index : torch.Tensor
Edge indices for the graph data
Returns
----------
c : torch.Tensor
The new centroid.
"""
n_samples = 0
self.model.eval()
with torch.no_grad():
outputs = self.model(x, edge_index)
# get the inputs of the batch
n_samples = outputs.shape[0]
c = torch.sum(outputs, dim=0).to(self.device)
# print(outputs)
c /= n_samples
# If c_i is too close to 0, set to +-eps. Reason: a zero unit can be
# trivially matched with zero weights.
c[(abs(c) < self.eps) & (c < 0)] = -self.eps
c[(abs(c) < self.eps) & (c > 0)] = self.eps
return c
def get_radius(self, dist):
"""
Optimally solve for radius R via the (1-nu)-quantile of distances.
Parameters
----------
dist : torch.Tensor
Distance of the data points, calculated by the loss function.
Returns
----------
r : numpy.array
New radius.
"""
radius = np.quantile(np.sqrt(dist.clone().data.cpu().numpy()),
1 - self.nu)
return radius
def anomaly_scores(self, outputs):
"""
Calculate the anomaly score given by Euclidean distance to the center.
Parameters
----------
outputs : torch.Tensor
The output in the reduced space by GCN.
Returns
----------
dist : torch.Tensor
Average distance.
scores : torch.Tensor
Anomaly scores.
"""
dist = torch.sum((outputs - self.data_center) ** 2, dim=1)
scores = dist - self.radius ** 2
return dist, scores
def loss_function(self, outputs, update=False):
"""
Calculate the loss in paper Equation (4)
Parameters
----------
outputs : torch.Tensor
The output in the reduced space by GCN.
update : bool, optional (default=False)
If you need to update the radius, set update=True.
Returns
----------
dist : torch.Tensor
Average distance.
scores : torch.Tensor
Anomaly scores.
loss : torch.Tensor
A combined loss of radius and average scores.
"""
dist, scores = self.anomaly_scores(outputs)
loss = self.radius ** 2 + (1 / self.nu) * torch.mean(
torch.max(torch.zeros_like(scores), scores))
if update:
self.radius = torch.tensor(self.get_radius(dist), device = self.device)
return loss, dist, scores
[docs] def fit(self, G, y_true=None):
"""
Fit detector with input data.
Parameters
----------
G : torch_geometric.data.Data
The input data.
y_true : numpy.ndarray, optional
The optional outlier ground truth labels used to monitor
the training progress. They are not used to optimize the
unsupervised model. Default: ``None``.
Returns
-------
self : object
Fitted estimator.
"""
G.node_idx = torch.arange(G.x.shape[0])
if self.batch_size == 0:
self.batch_size = G.x.shape[0]
loader = NeighborLoader(G,
[self.num_neigh] * self.n_layers,
batch_size=self.batch_size)
self.in_feats = G.x.shape[1]
# initialize the model and optimizer
self.model = GCN_base(self.in_feats,
self.n_hidden,
self.n_layers,
self.dropout,
self.act)
self.optimizer = torch.optim.Adam(self.model.parameters(),
lr=self.lr,
weight_decay=self.weight_decay)
self.data_center = torch.zeros(self.n_hidden, device=self.device)
self.radius = torch.tensor(0, device=self.device)
self.model = self.model.to(self.device)
# training the model
self.model.train()
decision_scores = np.zeros(G.x.shape[0])
for cur_epoch in range(self.epoch):
epoch_loss = 0.0
for sampled_data in loader:
batch_size = sampled_data.batch_size
node_idx = sampled_data.node_idx
x, adj, edge_index = self.process_graph(sampled_data)
outputs = self.model(x, edge_index)
loss, dist, score = self.loss_function(outputs)
epoch_loss += loss.item() * batch_size
if self.warmup_epoch is not None and cur_epoch < self.warmup_epoch:
self.data_center = self.init_center(x, edge_index)
self.radius = torch.tensor(self.get_radius(dist),
device=self.device)
decision_scores[node_idx[:batch_size]] = score.detach()\
.cpu().numpy()
self.model.zero_grad()
loss.backward()
self.optimizer.step()
if self.verbose:
print("Epoch {:04d}: Loss {:.4f}"
.format(cur_epoch, epoch_loss / G.x.shape[0]), end='')
if y_true is not None:
auc = eval_roc_auc(y_true, decision_scores)
print(" | AUC {:.4f}".format(auc), end='')
print()
self.decision_scores_ = decision_scores
self._process_decision_scores()
return self
def process_graph(self, G):
"""
Process the raw PyG data object into a tuple of sub data
objects needed for the model.
Parameters
----------
G : PyTorch Geometric Data instance (torch_geometric.data.Data)
The input data.
Returns
-------
x : torch.Tensor
Attribute (feature) of nodes.
adj : torch.Tensor
Adjacency matrix of the graph.
edge_index : torch.Tensor
Edge list of the graph.
"""
edge_index = G.edge_index
# via sparse matrix operation
dense_adj \
= SparseTensor(row=edge_index[0], col=edge_index[1]).to_dense()
# adjacency matrix normalization
rowsum = dense_adj.sum(1)
d_inv_sqrt = torch.pow(rowsum, -0.5).flatten()
d_inv_sqrt[torch.isinf(d_inv_sqrt)] = 0.
d_mat_inv_sqrt = torch.diag(d_inv_sqrt)
adj = (dense_adj * d_mat_inv_sqrt).T * d_mat_inv_sqrt
edge_index = edge_index.to(self.device)
adj = adj.to(self.device)
x = G.x.to(self.device)
# return data objects needed for the network
return x, adj, edge_index
[docs] def decision_function(self, G):
"""Predict raw anomaly score of X using the fitted detector.
The anomaly score of an input sample is computed based on distance
to the centroid and measurement within the radius
Parameters
----------
G : PyTorch Geometric Data instance (torch_geometric.data.Data)
The input data.
Returns
-------
anomaly_scores : numpy.array
The anomaly score of the input samples of shape (n_samples,).
"""
check_is_fitted(self, ['model'])
# get needed data object from the input data
G.node_idx = torch.arange(G.x.shape[0])
loader = NeighborLoader(G,
[self.num_neigh] * self.n_layers,
batch_size=self.batch_size)
# enable the evaluation mode
self.model.eval()
# construct the vector for holding the reconstruction error
# outlier_scores = torch.zeros([attrs.shape[0], ])
outlier_scores = np.zeros(G.x.shape[0])
for sampled_data in loader:
batch_size = sampled_data.batch_size
node_idx = sampled_data.node_idx
x, adj, edge_index = self.process_graph(sampled_data)
outputs = self.model(x, edge_index)
loss, dist, score = self.loss_function(outputs)
outlier_scores[node_idx[:batch_size]] = score.detach() \
.cpu().numpy()
return outlier_scores