Source code for egc.model.node_embedding.agc

"""
AGC Embedding
"""
import scipy.sparse as sp
import scipy.sparse.linalg  # noqa: F401  -- `sp.linalg.svds` is used below; import it explicitly
import torch
from sklearn.cluster import KMeans
from torch import nn

from ...utils.evaluation import evaluation


class AGCEmbed(nn.Module):
    """
    AGC Embedding
    """

    def __init__(
        self,
        adj: torch.sparse.Tensor,
        feature: torch.Tensor,
        labels: torch.Tensor,
        epochs: int = 60,
        n_clusters: int = 7,
        rep: int = 10,
    ):
        super().__init__()
        self.A = adj
        self.feature = feature
        self.labels = labels
        self.epochs = epochs
        self.D = torch.sum(adj.to_dense(), 1)
        self.n_clusters = n_clusters
        self.rep = rep
        self.best_feature = None

    def forward(self):
        # AGC is training-free: the features are propagated in `fit`, so the
        # nn.Module forward pass is intentionally a no-op.
        pass

    def fit(self):
        if torch.cuda.is_available():
            self.A = self.A.cuda()
            self.feature = self.feature.cuda()
            self.labels = self.labels.cuda()
            self.D = self.D.cuda()
        tt = 0
        adj_normalized = self.normalize_adj()
        # Sentinel value so that the first power always counts as an improvement.
        intra_list = [10000]
        feature = self.feature
        while tt <= self.epochs:
            tt = tt + 1
            power = tt
            intraD = torch.zeros(self.rep)
            ac = torch.zeros(self.rep)
            nm = torch.zeros(self.rep)
            f1 = torch.zeros(self.rep)
            # One more application of the filter: X <- G X.
            prev_feature = feature
            feature = torch.mm(adj_normalized, feature)
            u, _, _ = sp.linalg.svds(feature.cpu().numpy(),
                                     k=self.n_clusters,
                                     which="LM")
            for i in range(self.rep):
                kmeans = KMeans(n_clusters=self.n_clusters).fit(u)
                predict_labels = kmeans.predict(u)
                predict_labels_tensor = torch.IntTensor(predict_labels)
                if torch.cuda.is_available():
                    predict_labels_tensor = predict_labels_tensor.cuda()
                intraD[i] = self.square_dist(predict_labels_tensor, feature)
                _, NMI_score, AMI_score, ACC_score, Micro_F1_score, _, _ = evaluation(
                    self.labels.cpu().numpy(), predict_labels)
                ac[i], nm[i], f1[i] = ACC_score, NMI_score, Micro_F1_score
                print(f"ACC_score: {ACC_score}, NMI_score: {NMI_score}, "
                      f"AMI_score: {AMI_score}, Micro_F1_score: {Micro_F1_score}")
            intramean = torch.mean(intraD)
            acc_means = torch.mean(ac)
            nmi_means = torch.mean(nm)
            f1_means = torch.mean(f1)
            intra_list.append(intramean)
            print(f"power: {power}, intra_dist: {intramean}, acc_mean: {acc_means}, "
                  f"nmi_mean: {nmi_means}, f1_mean: {f1_means}")
            # Stop once the mean intra-cluster distance gets worse: the
            # previous power was the best one, so keep its features rather
            # than the ones that triggered the stop.
            if intra_list[tt] > intra_list[tt - 1]:
                print(f"bestpower: {tt - 1}")
                feature = prev_feature
                break
        self.best_feature = feature

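    # Note (added commentary, not in the original module): at iteration t the
    # cluster assignments are computed from G^t X, i.e. from the t-order graph
    # convolution of the raw features. KMeans is restarted `rep` times per
    # power, and the averaged intra-cluster distance is the criterion that
    # adaptively selects the filter order.
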
    def normalize_adj(self):
        In = torch.eye(self.A.shape[0])
        if torch.cuda.is_available():
            In = In.cuda()
        # Self-loop-augmented adjacency and degrees: A' = A + I, D' = D + 1.
        A_prime = In + self.A.to_dense()
        D_prime = self.D + 1
        D_inv = torch.diag(torch.pow(D_prime, -0.5))
        # Convolution operator G = (I + D'^{-1/2} A' D'^{-1/2}) / 2.
        conv_operator = (torch.mm(torch.mm(D_inv, A_prime), D_inv) + In) / 2
        return conv_operator

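    # Note (added commentary, not in the original module): the returned
    # operator can be rewritten as G = I - L_sym / 2, where
    # L_sym = I - D'^{-1/2} A' D'^{-1/2} is the symmetrically normalized
    # Laplacian of the self-loop-augmented graph. The eigenvalues of G lie in
    # [0, 1], so repeated multiplication by G acts as a low-pass filter on
    # the node features.
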
    def to_onehot(self, prelabel):
        label = torch.zeros([prelabel.shape[0], self.n_clusters])
        for i, v in enumerate(prelabel):
            label[i, v.item()] = 1
        label = label.T
        return label

    def square_dist(self, prelabel, feature):
        onehot = self.to_onehot(prelabel)
        if torch.cuda.is_available():
            onehot = onehot.cuda()
        m, _ = onehot.shape
        # Cluster sizes; empty clusters are clamped to 1 to avoid division
        # by zero.
        count = onehot.sum(1).reshape(m, 1)
        count[count == 0] = 1
        mean = torch.mm(onehot, feature) / count
        a2 = (torch.mm(onehot, feature * feature) / count).sum(1, keepdim=True)
        # pdist2[i, j] = a2[i] + a2[j] - 2 <mean_i, mean_j>. keepdim=True keeps
        # a2 a column vector so that a2 + a2.T broadcasts to an m x m matrix;
        # on a 1-D tensor, .T would be a no-op and the off-diagonal entries
        # (used by inter_dist) would be wrong.
        pdist2 = a2 + a2.T - 2 * torch.mm(mean, mean.T)
        intra_dist = torch.trace(pdist2)
        inter_dist = pdist2.sum() - intra_dist
        intra_dist /= m
        inter_dist /= m * (m - 1)
        return intra_dist

    def get_embedding(self):
        u, _, _ = sp.linalg.svds(
            self.best_feature.cpu().numpy(),
            k=self.n_clusters,
            which="LM",
        )
        return torch.FloatTensor(u.copy())
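

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module; run with
# `python -m egc.model.node_embedding.agc` so the relative imports resolve).
# The graph, features, and labels below are random stand-ins; in the real
# pipeline they come from the dataset loader.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    n, d, k = 100, 16, 7
    dense = (torch.rand(n, n) < 0.05).float()
    dense = ((dense + dense.T) > 0).float()  # symmetrize the random graph
    adj = dense.to_sparse()
    feature = torch.rand(n, d)
    labels = torch.randint(0, k, (n,))  # random "ground truth", demo only

    model = AGCEmbed(adj, feature, labels, n_clusters=k)
    model.fit()  # raises the filter power until intra-cluster distance worsens
    emb = model.get_embedding()  # (n, n_clusters) node embedding
    print(emb.shape)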