Source code for pygod.models.radar

# -*- coding: utf-8 -*-
""" Residual Analysis for Anomaly Detection in Attributed Networks
"""
# Author: Kay Liu <zliu234@uic.edu>
# License: BSD 2 clause

import torch
import warnings
from torch import nn
from pygod.metrics import *
import torch.nn.functional as F
from torch_geometric.utils import to_dense_adj
from sklearn.utils.validation import check_is_fitted

from . import BaseDetector
from ..utils import validate_device


[docs]class Radar(BaseDetector): """ Radar (Residual Analysis for Anomaly Detection in Attributed Networks) is an anomaly detector with residual analysis. This model is transductive only. See :cite:`li2017radar` for details. Parameters ---------- gamma : float, optional Loss balance weight for attribute and structure. Default: ``1.``. weight_decay : float, optional Weight decay (alpha and beta in the original paper). Default: ``0.01``. contamination : float, optional Valid in (0., 0.5). The proportion of outliers in the data set. Used when fitting to define the threshold on the decision function. Default: ``0.1``. epoch : int, optional Maximum number of training epoch. Default: ``5``. verbose : bool Verbosity mode. Turn on to print out log information. Default: ``False``. Examples -------- >>> from pygod.models import Radar >>> model = Radar() >>> model.fit(data) # PyG graph data object >>> prediction = model.predict(None) """ def __init__(self, gamma=1., weight_decay=0.01, lr=0.004, epoch=100, gpu=0, contamination=0.1, verbose=False): super(Radar, self).__init__(contamination=contamination) # model param self.gamma = gamma self.weight_decay = weight_decay # training param self.lr = lr self.epoch = epoch self.device = validate_device(gpu) # other param self.verbose = verbose self.model = None
[docs] def fit(self, G, y_true=None): """ Fit detector with input data. Parameters ---------- G : torch_geometric.data.Data The input data. y_true : numpy.ndarray, optional The optional outlier ground truth labels used to monitor the training progress. They are not used to optimize the unsupervised model. Default: ``None``. Returns ------- self : object Fitted estimator. """ G.s = to_dense_adj(G.edge_index)[0] x, s, l, w_init, r_init = self.process_graph(G) self.model = Radar_Base(w_init, r_init) optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.weight_decay) for epoch in range(self.epoch): x_, r = self.model(x) loss = self._loss(x, x_, r, l) optimizer.zero_grad() loss.backward() optimizer.step() decision_scores = torch.sum(torch.pow(r, 2), dim=1).detach() \ .cpu().numpy() if self.verbose: print("Epoch {:04d}: Loss {:.4f}" .format(epoch, loss.item()), end='') if y_true is not None: auc = eval_roc_auc(y_true, decision_scores) print(" | AUC {:.4f}".format(auc), end='') print() self.decision_scores_ = decision_scores self._process_decision_scores() return self
[docs] def decision_function(self, G): """ Predict raw anomaly score using the fitted detector. Outliers are assigned with larger anomaly scores. Parameters ---------- G : PyTorch Geometric Data instance (torch_geometric.data.Data) The input data. Returns ------- outlier_scores : numpy.ndarray The anomaly score of shape :math:`N`. """ check_is_fitted(self, ['model']) if G is not None: warnings.warn('The model is transductive only. ' 'Training data is used to predict') outlier_scores = self.decision_scores_ return outlier_scores
def process_graph(self, G): """ Process the raw PyG data object into a tuple of sub data objects needed for the model. Parameters ---------- G : PyTorch Geometric Data instance (torch_geometric.data.Data) The input data. Returns ------- x : torch.Tensor Attribute (feature) of nodes. """ x = G.x.to(self.device) s = G.s.to(self.device) s = torch.max(s, s.T) l = self._comp_laplacian(s) w_init = torch.eye(x.shape[0]).to(self.device) r_init = torch.inverse((1 + self.weight_decay) * torch.eye(x.shape[0]).to(self.device) + self.gamma * l) @ x return x, s, l, w_init, r_init def _loss(self, x, x_, r, l): return torch.norm(x - x_ - r, 2) + \ self.gamma * torch.trace(r.T @ l @ r) def _comp_laplacian(self, adj): d = torch.diag(torch.sum(adj, dim=1)) return d - adj
class Radar_Base(nn.Module): def __init__(self, w, r): super(Radar_Base, self).__init__() self.w = nn.Parameter(w) self.r = nn.Parameter(r) def forward(self, x): return self.w @ x, self.r