Source code for pygod.models.gaan

# -*- coding: utf-8 -*-
"""Generative Adversarial Attributed Network Anomaly Detection (GAAN)"""
# Author: Ruitong Zhang <rtzhang@buaa.edu.cn>
# License: BSD 2 clause

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.utils import to_dense_adj
from torch_geometric.loader import NeighborLoader
from sklearn.utils.validation import check_is_fitted

from . import BaseDetector
from ..utils.utility import validate_device
from ..utils.metric import eval_roc_auc


class GAAN(BaseDetector):
    """
    GAAN (Generative Adversarial Attributed Network Anomaly Detection)
    GAAN is a generative adversarial attributed network anomaly
    detection framework, including a generator module, an encoder
    module, and a discriminator module. Its anomaly score combines the
    sample reconstruction error with the discriminator's confidence in
    recognizing real samples. See :cite:`chen2020generative` for
    details.

    Parameters
    ----------
    noise_dim : int, optional
        Dimension of the Gaussian random noise. Default: ``32``.
    latent_dim : int, optional
        Dimension of the latent space. Default: ``32``.
    hid_dim1 : int, optional
        Hidden dimension of MLP layer 1. Default: ``32``.
    hid_dim2 : int, optional
        Hidden dimension of MLP layer 2. Default: ``64``.
    hid_dim3 : int, optional
        Hidden dimension of MLP layer 3. Default: ``128``.
    num_layers : int, optional
        Total number of layers in the model. Default: ``2``.
    dropout : float, optional
        Dropout rate. Default: ``0.3``.
    weight_decay : float, optional
        Weight decay (L2 penalty). Default: ``0.``.
    act : callable activation function or None, optional
        Activation function if not None.
        Default: ``torch.nn.functional.relu``.
    alpha : float, optional
        Loss balance weight between attribute and structure errors.
        Default: ``0.2``.
    contamination : float, optional
        Valid in (0., 0.5). The proportion of outliers in the data set.
        Used when fitting to define the threshold on the decision
        function. Default: ``0.1``.
    lr : float, optional
        Learning rate. Default: ``0.005``.
    epoch : int, optional
        Maximum number of training epochs. Default: ``10``.
    gpu : int
        GPU index, -1 for using CPU. Default: ``0``.
    batch_size : int, optional
        Minibatch size, 0 for full batch training. Default: ``0``.
    num_neigh : int, optional
        Number of neighbors in sampling, -1 for all neighbors.
        Default: ``-1``.
    verbose : bool
        Verbosity mode. Turn on to print out log information.
        Default: ``False``.

    Examples
    --------
    >>> from pygod.models import GAAN
    >>> model = GAAN()
    >>> model.fit(data)  # PyG graph data object
    >>> prediction = model.predict(data)
    """

    def __init__(self,
                 noise_dim=32,
                 latent_dim=32,
                 hid_dim1=32,
                 hid_dim2=64,
                 hid_dim3=128,
                 num_layers=2,
                 dropout=0.3,
                 weight_decay=0.,
                 act=F.relu,
                 alpha=0.2,
                 contamination=0.1,
                 lr=5e-3,
                 epoch=10,
                 gpu=0,
                 batch_size=0,
                 num_neigh=-1,
                 verbose=False):
        super(GAAN, self).__init__(contamination=contamination)

        # model param
        self.noise_dim = noise_dim
        self.latent_dim = latent_dim
        self.hid_dim1 = hid_dim1
        self.hid_dim2 = hid_dim2
        self.hid_dim3 = hid_dim3
        self.num_layers = num_layers
        self.dropout = dropout
        self.weight_decay = weight_decay
        self.act = act
        self.alpha = alpha

        # training param
        self.lr = lr
        self.epoch = epoch
        self.device = validate_device(gpu)
        self.batch_size = batch_size
        self.num_neigh = num_neigh

        # other param
        self.verbose = verbose

        # model
        self.generator = None
        self.encoder = None
        self.discriminator = None
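    # Illustrative note (not part of the original pygod source): the final
    # per-node anomaly score combines the attribute reconstruction error
    # with the discriminator's confusion on the node's outgoing edges,
    #
    #     score_i = alpha * ||x_i - x_i'||_2 + (1 - alpha) * bce_i
    #
    # where x_i' is the generator's fake attribute vector for node i and
    # bce_i is the binary cross-entropy of the discriminator on node i's
    # edges. See ``score_function`` below for the exact computation.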
    def fit(self, G, y_true=None):
        """
        Fit detector with input data.

        Parameters
        ----------
        G : torch_geometric.data.Data
            The input data.
        y_true : numpy.ndarray, optional
            The optional outlier ground truth labels used to monitor
            the training progress. They are not used to optimize the
            unsupervised model. Default: ``None``.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        G.node_idx = torch.arange(G.x.shape[0])
        G.s = to_dense_adj(G.edge_index)[0]
        if self.batch_size == 0:
            self.batch_size = G.x.shape[0]
        loader = NeighborLoader(G,
                                [self.num_neigh] * self.num_layers,
                                batch_size=self.batch_size)

        # initialize the model
        self.generator = Generator(in_dim=self.noise_dim,
                                   hid_dim1=self.hid_dim1,
                                   hid_dim2=self.hid_dim2,
                                   hid_dim3=self.hid_dim3,
                                   out_dim=G.x.shape[1],
                                   act=self.act).to(self.device)

        self.encoder = Encoder(in_dim=G.x.shape[1],
                               hid_dim1=self.hid_dim1,
                               hid_dim2=self.hid_dim2,
                               hid_dim3=self.hid_dim3,
                               out_dim=self.latent_dim,
                               act=self.act).to(self.device)

        self.discriminator = Discriminator().to(self.device)

        # initialize the optimizers
        optimizer_GE = torch.optim.Adam(
            params=list(self.generator.parameters()) +
                   list(self.encoder.parameters()),
            lr=self.lr,
            weight_decay=self.weight_decay)
        optimizer_D = torch.optim.Adam(
            params=self.discriminator.parameters(),
            weight_decay=self.weight_decay)

        # enable the training mode
        self.generator.train()
        self.encoder.train()
        self.discriminator.train()

        # initialize the criterion
        criterion = torch.nn.BCELoss()

        scores = np.zeros(G.x.shape[0])
        for epoch in range(self.epoch):
            epoch_loss_D, epoch_loss_GE = 0, 0
            for sampled_data in loader:
                batch_size = sampled_data.batch_size
                node_idx = sampled_data.node_idx
                X, edge_index = self.process_graph(sampled_data)

                # generate noise for constructing the fake attribute
                gaussian_noise = torch.randn(X.shape[0],
                                             self.noise_dim).to(self.device)

                optimizer_D.zero_grad()
                optimizer_GE.zero_grad()

                # train the model
                X_, Y_true_pre, Y_fake_pre = self.train_model(
                    X, gaussian_noise, edge_index)

                # keep only the edges whose source node is in the batch
                edge_index_i = (
                    edge_index[0] < batch_size).nonzero().to(self.device)
                X, X_ = X[:batch_size], X_[:batch_size]
                Y_true_pre = torch.reshape(
                    Y_true_pre, [Y_true_pre.shape[0]])[edge_index_i]
                Y_fake_pre = torch.reshape(
                    Y_fake_pre, [Y_fake_pre.shape[0]])[edge_index_i]
                edge_index = torch.index_select(
                    edge_index, 1,
                    torch.reshape(edge_index_i, [edge_index_i.shape[0]]))

                # get loss
                loss_D, loss_GE = self.loss_function(X, X_,
                                                     Y_true_pre,
                                                     Y_fake_pre,
                                                     edge_index,
                                                     criterion)

                epoch_loss_D += loss_D.item() * batch_size
                epoch_loss_GE += loss_GE.item() * batch_size

                loss_D.backward(retain_graph=True)
                loss_GE.backward()
                optimizer_D.step()
                optimizer_GE.step()

                score = self.score_function(X, X_,
                                            Y_true_pre,
                                            Y_fake_pre,
                                            edge_index,
                                            criterion)
                scores[node_idx[:batch_size]] = score.detach().cpu().numpy()

            # print out log information
            if self.verbose:
                print("Epoch {:04d}: Loss GE {:.4f} | Loss D {:.4f}"
                      .format(epoch,
                              epoch_loss_GE / G.x.shape[0],
                              epoch_loss_D / G.x.shape[0]), end='')
                if y_true is not None:
                    auc = eval_roc_auc(y_true, scores)
                    print(" | AUC {:.4f}".format(auc))
                else:
                    print()

        self.decision_scores_ = scores
        self._process_decision_scores()
        return self
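    # Usage sketch (illustrative, mirroring the Examples section in the
    # class docstring; the tensor shapes below are made up for the example
    # and torch_geometric is assumed to be installed):
    #
    #     import torch
    #     from torch_geometric.data import Data
    #     x = torch.randn(100, 16)                      # 100 nodes, 16 features
    #     edge_index = torch.randint(0, 100, (2, 400))  # random edges
    #     data = Data(x=x, edge_index=edge_index)
    #     model = GAAN(epoch=5, gpu=-1)                 # train on CPU
    #     model.fit(data)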
    def decision_function(self, G):
        """
        Predict raw anomaly score using the fitted detector.
        Outliers are assigned with larger anomaly scores.

        Parameters
        ----------
        G : PyTorch Geometric Data instance (torch_geometric.data.Data)
            The input data.

        Returns
        -------
        outlier_scores : numpy.ndarray
            The anomaly score of shape :math:`N`.
        """
        check_is_fitted(self, ['generator', 'encoder', 'discriminator'])

        # get needed data object from the input data
        G.node_idx = torch.arange(G.x.shape[0])
        G.s = to_dense_adj(G.edge_index)[0]
        loader = NeighborLoader(G,
                                [self.num_neigh] * self.num_layers,
                                batch_size=self.batch_size)

        # enable the evaluation mode
        self.generator.eval()
        self.encoder.eval()
        self.discriminator.eval()

        criterion = torch.nn.BCELoss()

        scores = np.zeros(G.x.shape[0])
        for sampled_data in loader:
            batch_size = sampled_data.batch_size
            node_idx = sampled_data.node_idx
            X, edge_index = self.process_graph(sampled_data)

            # generate noise for constructing the fake attribute
            gaussian_noise = torch.randn(X.shape[0],
                                         self.noise_dim).to(self.device)

            X_, Y_true_pre, Y_fake_pre = self.train_model(X,
                                                          gaussian_noise,
                                                          edge_index)

            # keep only the edges whose source node is in the batch
            edge_index_i = (
                edge_index[0] < batch_size).nonzero().to(self.device)
            X, X_ = X[:batch_size], X_[:batch_size]
            Y_true_pre = torch.reshape(
                Y_true_pre, [Y_true_pre.shape[0]])[edge_index_i]
            Y_fake_pre = torch.reshape(
                Y_fake_pre, [Y_fake_pre.shape[0]])[edge_index_i]
            edge_index = torch.index_select(
                edge_index, 1,
                torch.reshape(edge_index_i, [edge_index_i.shape[0]]))

            score = self.score_function(X, X_,
                                        Y_true_pre,
                                        Y_fake_pre,
                                        edge_index,
                                        criterion)
            scores[node_idx[:batch_size]] = score.detach().cpu().numpy()

        return scores
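    # Continuing the sketch above (illustrative): after fitting, raw scores
    # and binary labels (thresholded via ``contamination``) are obtained as
    #
    #     scores = model.decision_function(data)  # larger => more anomalous
    #     labels = model.predict(data)            # 0 = inlier, 1 = outlier
    #
    # ``predict`` is inherited from ``BaseDetector``, as shown in the class
    # docstring Examples.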
    def train_model(self, X, gaussian_noise, edge_index):
        """
        Complete the entire process from noise to generator, to
        encoder, and finally to discriminator.

        Parameters
        ----------
        X : torch.Tensor
            Attribute (feature) of nodes.
        gaussian_noise : torch.Tensor
            Gaussian noise for the generator.
        edge_index : torch.Tensor
            Edge list of the graph.

        Returns
        -------
        X_ : torch.Tensor
            Fake attribute (feature) of nodes.
        Y_true_pre : torch.Tensor
            Labels predicted from the true attribute.
        Y_fake_pre : torch.Tensor
            Labels predicted from the fake attribute.
        """
        # get fake attribute matrix
        X_ = self.generator(gaussian_noise)

        # get latent embedding matrices
        Z = self.encoder(X)
        Z_ = self.encoder(X_)

        # get link probability matrices
        Y_true_pre = self.discriminator(Z, edge_index)
        Y_fake_pre = self.discriminator(Z_, edge_index)

        return X_, Y_true_pre, Y_fake_pre

    def loss_function(self, X, X_, Y_true_pre, Y_fake_pre, edge_index,
                      criterion):
        """
        Obtain the generator and discriminator losses separately.

        Parameters
        ----------
        X : torch.Tensor
            Attribute (feature) of nodes.
        X_ : torch.Tensor
            Fake attribute (feature) of nodes.
        Y_true_pre : torch.Tensor
            Labels predicted from the true attribute.
        Y_fake_pre : torch.Tensor
            Labels predicted from the fake attribute.
        edge_index : torch.Tensor
            Edge list of the graph.
        criterion : torch.nn.modules.loss.BCELoss
            Binary cross-entropy loss.

        Returns
        -------
        loss_D : torch.Tensor
            Discriminator loss.
        loss_GE : torch.Tensor
            Generator (and encoder) loss.
        """
        # attribute reconstruction loss
        diff_attribute = torch.pow(X - X_, 2)
        attribute_errors = torch.sqrt(torch.sum(diff_attribute, 1))

        # structure reconstruction loss
        Y_true = torch.ones((edge_index.shape[1]), 1).to(self.device)
        Y_fake = torch.zeros((edge_index.shape[1]), 1).to(self.device)
        structure_errors = criterion(Y_true_pre, Y_true) + \
            criterion(Y_fake_pre, Y_fake)

        loss_D = structure_errors
        loss_GE = torch.mean(attribute_errors)

        return loss_D, loss_GE

    def score_function(self, X, X_, Y_true_pre, Y_fake_pre, edge_index,
                       criterion):
        """
        Get the anomaly score after model training by weighting the
        context reconstruction loss and the structure discriminator
        loss.

        Parameters
        ----------
        X : torch.Tensor
            Attribute (feature) of nodes.
        X_ : torch.Tensor
            Fake attribute (feature) of nodes.
        Y_true_pre : torch.Tensor
            Labels predicted from the true attribute.
        Y_fake_pre : torch.Tensor
            Labels predicted from the fake attribute.
        edge_index : torch.Tensor
            Edge list of the graph.
        criterion : torch.nn.modules.loss.BCELoss
            Binary cross-entropy loss.

        Returns
        -------
        score : torch.Tensor
            Anomaly score.
        """
        # attribute reconstruction score
        diff_attribute = torch.pow(X - X_, 2)
        attribute_errors = torch.sqrt(torch.sum(diff_attribute, 1))

        # structure reconstruction score
        structure_errors = torch.zeros((X.shape[0])).to(self.device)
        for i in range(X.shape[0]):
            # edges whose source is node i
            edge_index_i = (edge_index[0] == i).nonzero().to(self.device)
            Y_true = torch.ones((edge_index_i.shape[0]), 1).to(self.device)
            Y_fake = torch.zeros((edge_index_i.shape[0]), 1).to(self.device)
            Y_true_pre_i = torch.reshape(
                Y_true_pre, [Y_true_pre.shape[0]])[edge_index_i]
            Y_fake_pre_i = torch.reshape(
                Y_fake_pre, [Y_fake_pre.shape[0]])[edge_index_i]
            structure_errors[i] = criterion(Y_true_pre_i, Y_true) + \
                criterion(Y_fake_pre_i, Y_fake)

        score = self.alpha * attribute_errors + \
            (1 - self.alpha) * structure_errors
        return score

    def process_graph(self, G):
        """
        Process the raw PyG data object into a tuple of sub data
        objects needed for the model.

        Parameters
        ----------
        G : PyTorch Geometric Data instance (torch_geometric.data.Data)
            The input data.

        Returns
        -------
        X : torch.Tensor
            Attribute (feature) of nodes.
        edge_index : torch.Tensor
            Edge list of the graph.
        """
        # data objects needed for the network
        edge_index = G.edge_index.to(self.device)
        X = G.x.to(self.device)

        return X, edge_index
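# A minimal, self-contained sketch of the per-node score computed in
# ``GAAN.score_function`` above. ``_toy_score`` is purely illustrative and
# not part of the pygod API; it assumes the per-node structure (BCE) errors
# have already been computed.
def _toy_score(X, X_, bce_per_node, alpha=0.2):
    # per-node L2 attribute reconstruction error
    attribute_errors = torch.sqrt(torch.sum(torch.pow(X - X_, 2), 1))
    # weighted combination of attribute and structure errors
    return alpha * attribute_errors + (1 - alpha) * bce_per_node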
class Generator(nn.Module):
    def __init__(self, in_dim, hid_dim1, hid_dim2, hid_dim3, out_dim, act):
        super(Generator, self).__init__()

        # four-layer MLP with three hidden layers
        self.fc1 = nn.Linear(in_dim, hid_dim1)
        self.fc2 = nn.Linear(hid_dim1, hid_dim2)
        self.fc3 = nn.Linear(hid_dim2, hid_dim3)
        self.fc4 = nn.Linear(hid_dim3, out_dim)
        self.act = act

    def forward(self, noise):
        # input the low-dimensional prior Gaussian noise
        hidden1 = self.act(self.fc1(noise))
        hidden2 = self.act(self.fc2(hidden1))
        hidden3 = self.act(self.fc3(hidden2))
        # output the fake attribute matrix
        X_ = self.act(self.fc4(hidden3))
        return X_


class Encoder(nn.Module):
    def __init__(self, in_dim, hid_dim1, hid_dim2, hid_dim3, out_dim, act):
        super(Encoder, self).__init__()

        # four-layer MLP with three hidden layers
        self.fc1 = nn.Linear(in_dim, hid_dim1)
        self.fc2 = nn.Linear(hid_dim1, hid_dim2)
        self.fc3 = nn.Linear(hid_dim2, hid_dim3)
        self.fc4 = nn.Linear(hid_dim3, out_dim)
        self.act = act

    def forward(self, X):
        # input the original attribute matrix or the fake attribute matrix
        hidden1 = self.act(self.fc1(X))
        hidden2 = self.act(self.fc2(hidden1))
        hidden3 = self.act(self.fc3(hidden2))
        # output the low-dimensional latent embedding of the attribute matrix
        Z = self.act(self.fc4(hidden3))
        return Z


class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        # single linear layer followed by a sigmoid binary classifier
        self.fc1 = nn.Linear(1, 1)
        self.act = torch.sigmoid

    def forward(self, Z, edge_index):
        # dot product of the embedding output
        dot_product = Z.mm(Z.t())
        # per-edge score passed through the linear layer and sigmoid
        edge_prob = torch.reshape(dot_product[edge_index[0], edge_index[1]],
                                  [edge_index.shape[1], 1])
        Y = self.act(self.fc1(edge_prob))
        return Y
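# A quick, self-contained smoke test of the modules above (illustrative
# only; the dimensions are made up, and because of the relative imports at
# the top this guard should be run as part of the package, e.g. with
# ``python -m pygod.models.gaan``):
if __name__ == "__main__":
    torch.manual_seed(0)
    gen = Generator(in_dim=32, hid_dim1=32, hid_dim2=64, hid_dim3=128,
                    out_dim=16, act=F.relu)
    enc = Encoder(in_dim=16, hid_dim1=32, hid_dim2=64, hid_dim3=128,
                  out_dim=32, act=F.relu)
    disc = Discriminator()

    noise = torch.randn(10, 32)                 # Gaussian noise for 10 nodes
    X_fake = gen(noise)                         # fake attributes, shape (10, 16)
    Z = enc(X_fake)                             # latent embeddings, shape (10, 32)
    edge_index = torch.randint(0, 10, (2, 20))  # 20 random edges
    probs = disc(Z, edge_index)                 # edge probabilities in (0, 1)
    print(probs.shape)                          # torch.Size([20, 1])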