# Source code for pygod.models.ocgnn

# -*- coding: utf-8 -*-
""" One-Class Graph Neural Networks for Anomaly Detection in Attributed Networks """
# Author: Xueying Ding <>
# License: BSD 2 clause

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from sklearn.utils.validation import check_is_fitted
from torch_sparse import SparseTensor

from . import BaseDetector
from ..utils.utility import validate_device
from ..utils.metric import eval_roc_auc
from torch_geometric.loader import NeighborLoader

class GCN_base(nn.Module):
    """
    Backbone GCN module.

    A stack of bias-free ``GCNConv`` layers. Every layer except the last
    is followed by the activation and dropout; the last layer's output is
    returned as-is and serves as the node embedding.

    Parameters
    ----------
    in_feats : int
        Dimension of the input node features.
    n_hidden : int
        Dimension of every hidden layer and of the output embedding.
    n_layers : int
        Number of ``n_hidden -> n_hidden`` convolutions stacked after the
        input convolution (the module holds ``n_layers + 1`` convolutions
        in total).
    dropout : float
        Dropout probability applied after each non-final layer.
    act : callable
        Activation applied after each non-final layer.
    """

    def __init__(self, in_feats, n_hidden, n_layers, dropout, act):
        super().__init__()
        self.layers = nn.ModuleList()
        # input layer
        self.layers.append(GCNConv(in_feats, n_hidden, bias=False))
        # hidden layers
        for _ in range(n_layers):
            self.layers.append(GCNConv(n_hidden, n_hidden, bias=False))
        self.dropout = nn.Dropout(p=dropout)
        self.act = act

    def forward(self, x, edge_index):
        """
        Compute node embeddings.

        Parameters
        ----------
        x : torch.Tensor
            Node features.
        edge_index : torch.Tensor
            Edge indices of the graph.

        Returns
        -------
        x : torch.Tensor
            Output of the last convolution (no activation or dropout).
        """
        for layer in self.layers[:-1]:
            x = layer(x, edge_index)
            x = self.act(x)
            x = self.dropout(x)
        # The final layer is left linear so the embedding is unconstrained.
        return self.layers[-1](x, edge_index)

class OCGNN(BaseDetector):
    """
    OCGNN (One-Class Graph Neural Networks for Anomaly Detection in
    Attributed Networks): OCGNN is an anomaly detector that measures the
    distance of anomaly to the centroid, in the similar fashion to the
    support vector machine, but in the embedding space after feeding
    towards several layers of GCN.

    See :cite:`wang2021one` for details.

    Parameters
    ----------
    n_hidden : int, optional
        Hidden dimension of model. Defaults: ``256``.
    n_layers : int, optional
        Number of hidden layers of the underlying GCN; also used as the
        number of neighbor-sampling hops. Defaults: ``4``.
    contamination : float, optional
        Valid in (0., 0.5). The proportion of outliers in the data set.
        Used when fitting to define the threshold on the decision
        function. Default: ``0.1``.
    dropout : float, optional
        Dropout rate. Defaults: ``0.3``.
    weight_decay : float, optional
        Weight decay (L2 penalty). Defaults: ``0.``.
    act : callable activation function or None, optional
        Activation function if not None.
        Defaults: ``torch.nn.functional.relu``.
    eps : float, optional
        A small valid number for determining the center and make
        sure it does not collapse to 0. Defaults: ``0.001``.
    nu : float, optional
        Regularization parameter. Defaults: ``0.5``.
    lr : float, optional
        Learning rate. Defaults: ``0.005``.
    epoch : int, optional
        Maximum number of training epoch. Defaults: ``5``.
    warmup_epoch : int, optional
        Number of epochs to update radius and center in the beginning of
        training. Defaults: ``2``.
    gpu : int
        GPU Index, -1 for using CPU. Defaults: ``0``.
    verbose : bool
        Verbosity mode. Turn on to print out log information.
        Defaults: ``False``.
    batch_size : int, optional
        Minibatch size, 0 for full batch training. Default: ``0``.
    num_neigh : int, optional
        Number of neighbors in sampling, -1 for all neighbors.
        Default: ``-1``.

    Examples
    --------
    >>> from pygod.models import OCGNN
    >>> model = OCGNN()
    >>> model.fit(data)  # PyG graph data object
    >>> prediction = model.predict(data)
    """

    def __init__(self,
                 n_hidden=256,
                 n_layers=4,
                 contamination=0.1,
                 dropout=0.3,
                 lr=0.005,
                 weight_decay=0,
                 eps=0.001,
                 nu=0.5,
                 gpu=0,
                 epoch=5,
                 warmup_epoch=2,
                 verbose=False,
                 act=F.relu,
                 batch_size=0,
                 num_neigh=-1):
        super(OCGNN, self).__init__(contamination=contamination)
        self.dropout = dropout
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.lr = lr
        self.weight_decay = weight_decay
        self.eps = eps
        self.nu = nu
        self.data_center = 0
        self.radius = 0.0
        self.epoch = epoch
        self.warmup_epoch = warmup_epoch
        self.act = act
        self.device = validate_device(gpu)
        self.batch_size = batch_size
        self.num_neigh = num_neigh

        # other param
        self.verbose = verbose
        self.model = None

    def init_center(self, x, edge_index):
        """
        Initialize hypersphere center c as the mean from
        an initial forward pass on the data.

        Parameters
        ----------
        x : torch.Tensor
            Node features.
        edge_index : torch.Tensor
            Edge indices for the graph data.

        Returns
        -------
        c : torch.Tensor
            The new centroid.
        """
        # Run the pass without dropout, then restore whatever mode the
        # model was in: this method is called from inside the training
        # loop and must not leave the model stuck in evaluation mode.
        was_training = self.model.training
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(x, edge_index)
        self.model.train(was_training)

        c = torch.mean(outputs, dim=0).to(self.device)

        # If c_i is too close to 0, set to +-eps. Reason: a zero unit can
        # be trivially matched with zero weights.
        near_zero = c.abs() < self.eps
        c[near_zero & (c < 0)] = -self.eps
        c[near_zero & (c > 0)] = self.eps
        return c

    def get_radius(self, dist):
        """
        Optimally solve for radius R via the (1-nu)-quantile of distances.

        Parameters
        ----------
        dist : torch.Tensor
            Squared distance of the data points to the center, as
            returned by ``anomaly_scores``.

        Returns
        -------
        r : numpy.float
            New radius.
        """
        distances = np.sqrt(dist.detach().cpu().numpy())
        return np.quantile(distances, 1 - self.nu)

    def anomaly_scores(self, outputs):
        """
        Calculate the anomaly score given by Euclidean distance to the
        center.

        Parameters
        ----------
        outputs : torch.Tensor
            The output in the reduced space by GCN.

        Returns
        -------
        dist : torch.Tensor
            Squared Euclidean distance of every row to the center.
        scores : torch.Tensor
            Anomaly scores (squared distance minus squared radius).
        """
        dist = torch.sum((outputs - self.data_center) ** 2, dim=1)
        scores = dist - self.radius ** 2
        return dist, scores

    def loss_function(self, outputs, update=False):
        """
        Calculate the loss in paper Equation (4).

        Parameters
        ----------
        outputs : torch.Tensor
            The output in the reduced space by GCN.
        update : bool, optional (default=False)
            If you need to update the radius, set update=True.

        Returns
        -------
        loss : torch.Tensor
            A combined loss of radius and average scores.
        dist : torch.Tensor
            Squared distance of every row to the center.
        scores : torch.Tensor
            Anomaly scores.
        """
        dist, scores = self.anomaly_scores(outputs)
        # Only points outside the hypersphere (positive score) are
        # penalized.
        hinge = torch.max(torch.zeros_like(scores), scores)
        loss = self.radius ** 2 + (1 / self.nu) * torch.mean(hinge)
        if update:
            self.radius = torch.tensor(self.get_radius(dist),
                                       device=self.device)
        return loss, dist, scores

    def fit(self, G, y_true=None):
        """
        Fit detector with input data.

        Parameters
        ----------
        G : PyTorch Geometric Data instance (torch_geometric.data.Data)
            The input data.
        y_true : numpy.ndarray, optional
            The optional outlier ground truth labels used to monitor
            the training progress. They are not used to optimize the
            unsupervised model. Default: ``None``.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        n_nodes = G.x.shape[0]
        # Lets each sampled subgraph be mapped back to original node ids.
        G.node_idx = torch.arange(n_nodes)
        if self.batch_size == 0:
            self.batch_size = n_nodes
        loader = NeighborLoader(G,
                                [self.num_neigh] * self.n_layers,
                                batch_size=self.batch_size)

        self.in_feats = G.x.shape[1]

        # initialize the model and optimizer
        self.model = GCN_base(self.in_feats,
                              self.n_hidden,
                              self.n_layers,
                              self.dropout,
                              self.act).to(self.device)
        self.optimizer = torch.optim.Adam(self.model.parameters(),
                                          lr=self.lr,
                                          weight_decay=self.weight_decay)

        self.data_center = torch.zeros(self.n_hidden, device=self.device)
        self.radius = torch.tensor(0.0, device=self.device)

        # training the model
        self.model.train()
        decision_scores = np.zeros(n_nodes)
        for cur_epoch in range(self.epoch):
            epoch_loss = 0.0
            for sampled_data in loader:
                batch_size = sampled_data.batch_size
                node_idx = sampled_data.node_idx

                x, adj, edge_index = self.process_graph(sampled_data)

                outputs = self.model(x, edge_index)
                loss, dist, score = self.loss_function(outputs)
                epoch_loss += loss.item() * batch_size

                if self.warmup_epoch is not None and \
                        cur_epoch < self.warmup_epoch:
                    self.data_center = self.init_center(x, edge_index)
                    self.radius = torch.tensor(self.get_radius(dist),
                                               device=self.device)

                # The sampled subgraph holds the seed nodes first, followed
                # by sampled neighbours; only the seeds' scores are kept.
                seed_idx = node_idx[:batch_size].cpu().numpy()
                decision_scores[seed_idx] = \
                    score[:batch_size].detach().cpu().numpy()

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

            if self.verbose:
                print("Epoch {:04d}: Loss {:.4f}"
                      .format(cur_epoch, epoch_loss / n_nodes), end='')
                if y_true is not None:
                    auc = eval_roc_auc(y_true, decision_scores)
                    print(" | AUC {:.4f}".format(auc), end='')
                print()

        self.decision_scores_ = decision_scores
        self._process_decision_scores()
        return self

    def process_graph(self, G):
        """
        Process the raw PyG data object into a tuple of sub data
        objects needed for the model.

        Parameters
        ----------
        G : PyTorch Geometric Data instance (torch_geometric.data.Data)
            The input data.

        Returns
        -------
        x : torch.Tensor
            Attribute (feature) of nodes.
        adj : torch.Tensor
            Symmetrically normalized dense adjacency matrix of the graph.
        edge_index : torch.Tensor
            Edge list of the graph.
        """
        edge_index = G.edge_index
        n_nodes = G.x.shape[0]

        # via sparse matrix operation; the size is given explicitly so the
        # dense matrix is always (n_nodes, n_nodes), even when the
        # highest-numbered nodes have no edges in one direction.
        dense_adj = SparseTensor(row=edge_index[0],
                                 col=edge_index[1],
                                 sparse_sizes=(n_nodes, n_nodes)).to_dense()

        # adjacency matrix normalization
        rowsum = dense_adj.sum(1)
        d_inv_sqrt = torch.pow(rowsum, -0.5).flatten()
        d_inv_sqrt[torch.isinf(d_inv_sqrt)] = 0.
        # Equivalent to (A @ D).T @ D with D = diag(d_inv_sqrt), written
        # with broadcasting to avoid materializing D.
        adj = d_inv_sqrt.view(-1, 1) * dense_adj.T * d_inv_sqrt.view(1, -1)

        edge_index = edge_index.to(self.device)
        adj = adj.to(self.device)
        x = G.x.to(self.device)

        # return data objects needed for the network
        return x, adj, edge_index

    def decision_function(self, G):
        """
        Predict raw anomaly score of X using the fitted detector.
        The anomaly score of an input sample is computed based on distance
        to the centroid and measurement within the radius.

        Parameters
        ----------
        G : PyTorch Geometric Data instance (torch_geometric.data.Data)
            The input data.

        Returns
        -------
        anomaly_scores : numpy.array
            The anomaly score of the input samples of shape (n_samples,).
        """
        check_is_fitted(self, ['model'])

        n_nodes = G.x.shape[0]
        # get needed data object from the input data
        G.node_idx = torch.arange(n_nodes)
        # 0 means "full batch"; fall back to the size of this graph.
        batch_size = self.batch_size if self.batch_size else n_nodes
        loader = NeighborLoader(G,
                                [self.num_neigh] * self.n_layers,
                                batch_size=batch_size)

        # enable the evaluation mode
        self.model.eval()

        # construct the vector for holding the anomaly scores
        outlier_scores = np.zeros(n_nodes)
        with torch.no_grad():
            for sampled_data in loader:
                n_seed = sampled_data.batch_size
                node_idx = sampled_data.node_idx

                x, adj, edge_index = self.process_graph(sampled_data)

                outputs = self.model(x, edge_index)
                _, score = self.anomaly_scores(outputs)

                # Keep only the seed nodes' scores (they come first).
                seed_idx = node_idx[:n_seed].cpu().numpy()
                outlier_scores[seed_idx] = score[:n_seed].cpu().numpy()
        return outlier_scores