Source code for egc.model.node_embedding.vgae

"""GAE & VGEA

"""
import copy
from typing import Tuple

import dgl
import numpy as np
import scipy.sparse as sp
import torch
import torch.nn.functional as F
from dgl.nn.pytorch.conv import GraphConv
from torch import nn
from torch import optim

from ...module import GCN
from ...module.layers import InnerProductDecoder
from ...utils import init_weights
from ...utils import load_model
from ...utils import normal_reparameterize
from ...utils import normalize_feature
from ...utils import save_model
from ...utils import sparse_mx_to_torch_sparse_tensor
from ...utils import symmetrically_normalize_adj


class Encoder(nn.Module):
    """Encoder for VGAE

    Args:
        in_features (int): input feature dimension.
        hidden_units_1 (int): hidden units size of gcn_1. Defaults to 32.
        hidden_units_2 (int): hidden units size of gcn_2. Defaults to 16.
        activation (str, optional): activation of gcn layer_1. Defaults to 'relu'.
    """

    def __init__(
        self,
        in_features: int,
        hidden_units_1: int = 32,
        hidden_units_2: int = 16,
        activation: str = "relu",
    ):
        super().__init__()
        self.in_features = in_features
        self.gcn_feat = GCN(in_features, hidden_units_1, activation)
        self.gcn_mu = GCN(hidden_units_1, hidden_units_2, lambda x: x)
        self.gcn_sigma = GCN(hidden_units_1, hidden_units_2, lambda x: x)

    def forward(
        self,
        features_norm: torch.Tensor,
        adj_norm: torch.Tensor,
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """forward

        Args:
            features_norm (torch.Tensor): row-normalized feature matrix.
            adj_norm (torch.Tensor): symmetrically normalized adjacency matrix.

        Returns:
            Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: (mu, log_sigma, feat_hidden)
        """
        feat, _ = self.gcn_feat(features_norm, adj_norm)
        mu, _ = self.gcn_mu(feat, adj_norm)
        log_sigma, _ = self.gcn_sigma(feat, adj_norm)
        return mu, log_sigma, feat
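

# Not part of the original module: a minimal sketch showing how this encoder is
# instantiated (it mirrors what VGAE.__init__ below does). The feature
# dimension 1433 is an arbitrary, Cora-like placeholder.
def _encoder_construction_sketch() -> Encoder:
    """Illustrative only: build an Encoder the same way VGAE does."""
    # hidden sizes 32 -> 16 match the VGAE defaults; mu and log_sigma share the
    # 16-dimensional latent space
    return Encoder(in_features=1433, hidden_units_1=32,
                   hidden_units_2=16, activation="relu")

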
# pylint: disable=no-self-use
class Decoder(nn.Module):
    """Decoder for VGAE"""

    def forward(
        self,
        mu: torch.Tensor,
        log_sigma: torch.Tensor,
        training: bool = True,
    ) -> torch.Tensor:
        """Decoder

        Samples latent codes with the reparameterization trick and reconstructs
        the adjacency matrix as the inner product of the samples.

        Args:
            mu (torch.Tensor): latent mean.
            log_sigma (torch.Tensor): latent log standard deviation.
            training (bool): whether sampling noise is applied (training mode).

        Returns:
            (torch.Tensor): A_hat, the reconstructed adjacency logits.
        """
        z = normal_reparameterize(mu, log_sigma, training)
        return torch.mm(torch.squeeze(z, 0), torch.squeeze(z, 0).t())
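

# Not part of the original module: a minimal sketch of what the project utility
# `normal_reparameterize` is assumed to compute (the standard VAE
# reparameterization trick), based on how the surrounding losses treat
# `log_sigma` as a log standard deviation. The real implementation lives in
# `...utils` and may differ in details.
def _reparameterize_sketch(
    mu: torch.Tensor,
    log_sigma: torch.Tensor,
    training: bool = True,
) -> torch.Tensor:
    """Illustrative only: z = mu + exp(log_sigma) * eps, with eps ~ N(0, I)."""
    if not training:
        # at evaluation time the mean is used deterministically
        return mu
    eps = torch.randn_like(log_sigma)
    return mu + torch.exp(log_sigma) * eps

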
class VGAE(nn.Module):
    """VGAE

    Args:
        in_features (int): input feature dimension.
        hidden_units_1 (int): hidden units size of gcn_1. Defaults to 32.
        hidden_units_2 (int): hidden units size of gcn_2. Defaults to 16.
        n_epochs (int, optional): number of embedding training epochs. Defaults to 200.
        early_stopping_epoch (int, optional): early stopping threshold. Defaults to 20.
        lr (float, optional): learning rate. Defaults to 0.01.
        l2_coef (float, optional): weight decay. Defaults to 0.0.
        activation (str, optional): activation of gcn layer_1. Defaults to 'relu'.
        model_filename (str, optional): path to save best model parameters. Defaults to `vgae`.
    """

    def __init__(
        self,
        in_features: int,
        hidden_units_1: int = 32,
        hidden_units_2: int = 16,
        n_epochs: int = 200,
        early_stopping_epoch: int = 20,
        lr: float = 0.01,
        l2_coef: float = 0.0,
        activation: str = "relu",
        model_filename: str = "vgae",
    ) -> None:
        super().__init__()
        self.n_epochs = n_epochs
        self.early_stopping_epoch = early_stopping_epoch
        self.model_filename = model_filename

        self.encoder = Encoder(
            in_features,
            hidden_units_1,
            hidden_units_2,
            activation,
        )
        self.decoder = Decoder()
        self.optimizer = torch.optim.Adam(
            self.parameters(),
            lr=lr,
            weight_decay=l2_coef,
        )

        self.features_norm = None
        self.adj_norm = None
        self.adj_label = None
        self.n_nodes = None
        self.pos_weight = None
        self.norm = None

        for module in self.modules():
            init_weights(module)

    def _calculate_loss(
        self,
        adj_hat: torch.Tensor,
        mu: torch.Tensor,
        log_sigma: torch.Tensor,
    ) -> torch.Tensor:
        """Calculate the loss.

        Args:
            adj_hat (torch.Tensor): reconstructed adjacency logits.
            mu (torch.Tensor): latent mean.
            log_sigma (torch.Tensor): latent log standard deviation.

        Returns:
            torch.Tensor: loss
        """
        reconstruct_loss = self.norm * torch.mean(
            F.binary_cross_entropy_with_logits(
                adj_hat, self.adj_label, pos_weight=self.pos_weight))
        kl_divergence = (-0.5 / self.n_nodes * torch.mean(
            torch.sum(
                1 + 2 * log_sigma - torch.pow(mu, 2) -
                torch.exp(2 * log_sigma), 1)))
        return reconstruct_loss + kl_divergence

    def forward(self):
        """forward

        Returns:
            loss (torch.Tensor): loss
        """
        mu, log_sigma, _ = self.encoder(self.features_norm, self.adj_norm)
        adj_hat = self.decoder(mu, log_sigma, self.training)
        # NOTE: the `squeeze(0)` is required; if it is left out, the extra
        # batch dimension changes the result of `torch.mean` in the KL term and
        # the loss comes out smaller than it should.
        return self._calculate_loss(adj_hat, mu.squeeze(0),
                                    log_sigma.squeeze(0))

    def fit(
        self,
        features: sp.lil_matrix,
        adj_orig: sp.csr_matrix,
    ) -> None:
        """fit

        Args:
            features (sp.lil_matrix): 2D sparse feature matrix.
            adj_orig (sp.csr_matrix): 2D sparse adjacency matrix.
        """
        self.features_norm = torch.FloatTensor(
            normalize_feature(features)[np.newaxis])
        self.adj_label = adj_orig + sp.eye(adj_orig.shape[0])
        self.adj_norm = sparse_mx_to_torch_sparse_tensor(
            symmetrically_normalize_adj(self.adj_label))
        self.n_nodes = adj_orig.shape[0]

        adj_sum = adj_orig.sum()
        self.norm = self.n_nodes * self.n_nodes / float(
            2 * (self.n_nodes * self.n_nodes - adj_sum))
        self.pos_weight = float(self.n_nodes * self.n_nodes -
                                adj_sum) / adj_sum

        # densify the label matrix and wrap pos_weight as a tensor so the loss
        # can be computed on CPU as well as on GPU
        self.adj_label = torch.FloatTensor(self.adj_label.todense())
        self.pos_weight = torch.FloatTensor([self.pos_weight])

        if torch.cuda.is_available():
            print("GPU available: VGAE Embedding Using CUDA")
            self.cuda()
            self.features_norm = self.features_norm.cuda()
            self.adj_norm = self.adj_norm.cuda()
            self.adj_label = self.adj_label.cuda()
            self.pos_weight = self.pos_weight.cuda()

        best = 1e9
        cnt_wait = 0
        for epoch in range(self.n_epochs):
            self.train()
            loss_epoch = 0
            self.optimizer.zero_grad()

            loss = self.forward()
            loss.backward()
            self.optimizer.step()
            loss_epoch = loss_epoch + loss.item()

            print(f"Epoch:{epoch+1} Loss:{loss.item()}")

            if loss < best:
                best = loss
                cnt_wait = 0
                save_model(
                    self.model_filename,
                    self,
                    self.optimizer,
                    epoch,
                    loss_epoch,
                )
            else:
                cnt_wait += 1

            if cnt_wait == self.early_stopping_epoch:
                print("Early stopping!")
                break
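
    # Worked example (not in the original code) of how `norm` and `pos_weight`
    # are derived in `fit` above, for a hypothetical 4-node graph with two
    # undirected edges (so adj_orig.sum() == 4, each edge counted in both
    # directions):
    #   n_nodes    = 4, so n_nodes**2 = 16
    #   pos_weight = (16 - 4) / 4        = 3.0   (up-weights the sparse positive entries)
    #   norm       = 16 / (2 * (16 - 4)) = 0.667 (rescales the mean BCE term)
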
    def get_embedding(
        self,
        model_filename: str = None,
    ) -> torch.Tensor:
        """Get the embeddings (graph or node level).

        Args:
            model_filename (str, optional): Model file to load. Defaults to
                None, which falls back to ``self.model_filename``.

        Returns:
            (torch.Tensor): embedding.
        """
        self, _, _, _ = load_model(
            model_filename
            if model_filename is not None else self.model_filename,
            self,
            self.optimizer,
        )
        mu, _, _ = self.encoder(self.features_norm, self.adj_norm)
        return mu.detach()
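

# Not part of the original module: a minimal end-to-end usage sketch for VGAE
# on a toy graph. The graph, feature dimension, and epoch count are
# illustrative, and the run assumes the project utilities (normalize_feature,
# save_model, load_model, ...) behave the way `fit`/`get_embedding` expect.
def _vgae_usage_sketch() -> torch.Tensor:
    """Illustrative only: fit VGAE on a 3-node path graph and read the embedding."""
    # symmetric adjacency of a 3-node path graph (edges 0-1 and 1-2)
    adj_orig = sp.csr_matrix(
        np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]], dtype=np.float32))
    # random node features as a sparse lil_matrix, 8 features per node
    features = sp.lil_matrix(np.random.rand(3, 8).astype(np.float32))
    model = VGAE(in_features=8, hidden_units_2=4, n_epochs=5)
    model.fit(features, adj_orig)
    return model.get_embedding()

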
def loss_function(preds, labels, mu, logvar, n_nodes, norm, pos_weight):
    """Weighted reconstruction loss plus KL divergence for the VGAE models."""
    pos_weight = torch.FloatTensor([pos_weight])
    cost = norm * F.binary_cross_entropy_with_logits(
        preds,
        labels,
        pos_weight=pos_weight,
    )

    # see Appendix B from VAE paper:
    # Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
    # https://arxiv.org/abs/1312.6114
    # 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    KLD = (-0.5 / n_nodes * torch.mean(
        torch.sum(1 + 2 * logvar - mu.pow(2) - logvar.exp().pow(2), 1)))
    return cost + KLD
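

# Not part of the original module: a self-contained sanity check for
# `loss_function` on random tensors, illustrating the expected shapes
# (N x N adjacency logits/labels, N x D latent statistics). All numbers are
# made up for illustration.
def _loss_function_sketch() -> torch.Tensor:
    """Illustrative only: call loss_function with toy shapes (N=4, D=2)."""
    n_nodes = 4
    preds = torch.randn(n_nodes, n_nodes)  # reconstructed adjacency logits
    labels = torch.eye(n_nodes)            # toy adjacency labels (self-loops only)
    mu = torch.randn(n_nodes, 2)           # latent means
    logvar = torch.zeros(n_nodes, 2)       # log std = 0, i.e. unit variance
    # norm / pos_weight as they would be computed from the label density
    return loss_function(preds, labels, mu, logvar, n_nodes,
                         norm=1.0, pos_weight=3.0)

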
# -------- DGL_VGAE ------
class DGL_VGAE(nn.Module):
    """DGL_VGAE

    Args:
        epochs (int): number of embedding training epochs.
        n_clusters (int): cluster num.
        fead_dim (int): dimension of the input node features.
        n_nodes (int): number of nodes.
        hidden_dim1 (int): hidden units size of gcn_1. Defaults to 32.
        hidden_dim2 (int): hidden units size of gcn_2. Defaults to 16.
        dropout (float, optional): Dropout rate (1 - keep probability). Defaults to 0.0.
        lr (float, optional): learning rate. Defaults to 0.01.
        early_stop (int, optional): early stopping threshold. Defaults to 10.
        activation (str, optional): activation of gcn layer_1. Defaults to 'relu'.
    """

    def __init__(
        self,
        epochs: int,
        n_clusters: int,
        fead_dim: int,
        n_nodes: int,
        hidden_dim1: int = 32,
        hidden_dim2: int = 16,
        dropout: float = 0.0,
        lr: float = 0.01,
        early_stop: int = 10,
        activation: str = "relu",
    ):
        super().__init__()
        # ---------------Parameters---------------
        self.epochs = epochs
        self.n_clusters = n_clusters
        self.n_nodes = n_nodes
        self.lr = lr
        self.estop_steps = early_stop
        if activation == "prelu":
            self.activation = nn.PReLU()
        elif activation == "relu":
            self.activation = nn.ReLU()
        else:
            self.activation = activation
        # self.embedding = None
        # self.memberships = None
        self.features = None
        self.adj_orig_graph = None
        self.norm = None
        self.pos_weight = None
        self.best_model = None
        self.device = None

        # ----------------Layers---------------
        self.gc1 = GraphConv(fead_dim, hidden_dim1, activation=self.activation)
        self.gc2 = GraphConv(hidden_dim1, hidden_dim2)
        self.gc3 = GraphConv(hidden_dim1, hidden_dim2)
        # must have activation
        self.dc = InnerProductDecoder(dropout, act=lambda x: x)

    def encode(self, g, feat):
        """Encoder for VGAE

        Args:
            g (dgl.DGLGraph): graph data in DGL format.
            feat (torch.Tensor): node features.

        Returns:
            mu (torch.Tensor): latent mean, ``self.gc2(g, hidden1)``.
            logvar (torch.Tensor): latent log variance, ``self.gc3(g, hidden1)``.
        """
        hidden1 = self.gc1(g, feat)
        return self.gc2(g, hidden1), self.gc3(g, hidden1)

    def reparameterize(self, mu, logvar):
        """Reparameterization trick

        Note that ``logvar`` is used here as a log standard deviation
        (``std = exp(logvar)``), which matches the ``2 * logvar`` term in
        ``loss_function``.

        Args:
            mu (torch.Tensor): latent mean.
            logvar (torch.Tensor): latent log variance.

        Returns:
            (torch.Tensor): a sample ``mu + std * eps`` during training,
            otherwise ``mu``.
        """
        if self.training:
            std = torch.exp(logvar)
            eps = torch.randn_like(std)
            return eps.mul(std).add_(mu)
        return mu

    def forward(self):
        """Forward propagation

        Returns:
            self.dc(z) (torch.Tensor): reconstructed adjacency matrix.
            mu (torch.Tensor): latent mean.
            logvar (torch.Tensor): latent log variance.
        """
        mu, logvar = self.encode(self.adj_orig_graph, self.features)
        z = self.reparameterize(mu, logvar)
        return self.dc(z), mu, logvar

# pylint: disable=too-many-locals
    def fit(self, adj_csr, features):
        """Fitting a VGAE model

        Args:
            adj_csr (sp.csr_matrix): 2D sparse adjacency matrix.
            features (torch.Tensor): node features.
        """
        # ------------------Data--------------
        self.features = features
        # remove diagonal entries
        adj_orig = adj_csr - sp.dia_matrix(
            (adj_csr.diagonal()[np.newaxis, :], [0]), shape=adj_csr.shape)
        adj_orig.eliminate_zeros()

        adj_label = adj_orig + sp.eye(adj_orig.shape[0])
        adj_label = torch.FloatTensor(adj_label.toarray())

        adj_orig = adj_orig + sp.eye(adj_orig.shape[0])
        self.adj_orig_graph = dgl.from_scipy(adj_orig)

        self.pos_weight = float(adj_csr.shape[0] * adj_csr.shape[0] -
                                adj_csr.sum()) / adj_csr.sum()
        self.norm = (adj_csr.shape[0] * adj_csr.shape[0] / float(
            (adj_csr.shape[0] * adj_csr.shape[0] - adj_csr.sum()) * 2))

        best_loss = 1e9
        cnt = 0
        best_epoch = 0
        optimizer = optim.Adam(self.parameters(), lr=self.lr)

        if torch.cuda.is_available():
            self.device = torch.cuda.current_device()
            print(f"GPU available: VGAE Embedding Using {self.device}")
            self.cuda()
            self.adj_orig_graph = self.adj_orig_graph.to(self.device)
            self.features = self.features.cuda()
        else:
            self.device = torch.device("cpu")

        for epoch in range(self.epochs):
            self.train()
            optimizer.zero_grad()
            recovered, mu, logvar = self.forward()
            loss = loss_function(
                preds=recovered.cpu(),
                labels=adj_label,
                mu=mu.cpu(),
                logvar=logvar.cpu(),
                n_nodes=self.n_nodes,
                norm=self.norm,
                pos_weight=self.pos_weight,
            )
            loss.backward()
            cur_loss = loss.item()
            optimizer.step()

            # kmeans = KMeans(n_clusters=self.n_clusters,
            #                 n_init=20).fit(mu.data.cpu().numpy())
            # (
            #     ARI_score,
            #     NMI_score,
            #     ACC_score,
            #     Micro_F1_score,
            #     Macro_F1_score,
            # ) = evaluation(label, kmeans.labels_)

            print(
                f"Epoch_{epoch}",
                # f":ARI {ARI_score:.4f}",
                # f", NMI {NMI_score:.4f}",
                # f", ACC {ACC_score:.4f}",
                # f", Micro_F1 {Micro_F1_score:.4f}",
                # f", Macro_F1 {Macro_F1_score:.4f}",
                f", Loss {cur_loss}",
            )

            # if epoch < 10:
            #     continue

            # early stopping
            if cur_loss < best_loss:
                cnt = 0
                best_epoch = epoch
                best_loss = cur_loss
                del self.best_model
                self.best_model = copy.deepcopy(self.to(self.device))
                # self.embedding = mu.data.cpu().numpy()
                # self.memberships = kmeans.labels_
            else:
                cnt += 1
                print(f"loss increase count:{cnt}")
                if cnt >= self.estop_steps:
                    print(f"early stopping, best epoch: {best_epoch}")
                    break
        print("Optimization Finished!")

    def get_embedding(self):
        """Get the node embedding.

        Returns:
            (torch.Tensor): latent means from the best saved model.
        """
        _, mu, _ = self.best_model()
        return mu.detach()

    def get_memberships(self):
        """Get cluster memberships.

        Returns:
            numpy.ndarray: cluster labels. Note that ``self.memberships`` is
            only populated by the (currently commented-out) KMeans step in
            ``fit``; until that step is enabled this attribute is not set.
        """
        return self.memberships
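

# Not part of the original module: a minimal usage sketch for DGL_VGAE on a toy
# graph. The adjacency matrix, feature dimension, and epoch count are all
# illustrative placeholders.
def _dgl_vgae_usage_sketch() -> torch.Tensor:
    """Illustrative only: train DGL_VGAE on a 3-node path graph."""
    # symmetric adjacency of a 3-node path graph (edges 0-1 and 1-2)
    adj_csr = sp.csr_matrix(
        np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]], dtype=np.float32))
    features = torch.rand(3, 8)  # dense node features, 8 per node
    model = DGL_VGAE(
        epochs=5,
        n_clusters=2,
        fead_dim=8,
        n_nodes=3,
        hidden_dim2=4,
    )
    model.fit(adj_csr, features)
    return model.get_embedding()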