Module KGEkeras.utils

Expand source code
import numpy as np
from tqdm import tqdm
from scipy.stats import rankdata
from random import choice
from collections import defaultdict

from tensorflow.keras.callbacks import Callback
from tensorflow.keras.losses import binary_crossentropy
import tensorflow as tf
from random import choices
EPSILON = 1e-6

def load_kg(path):
    """Load a knowledge graph from a whitespace-delimited text file.

    Every line is stripped and split on whitespace; the resulting token
    lists are returned in file order. Tokens stay as strings, and a blank
    line yields an empty list.
    """
    with open(path, 'r') as handle:
        return [line.strip().split() for line in handle]

def generate_negative(kg, N, negative=2, check_kg=False, corrupt_head=True, corrupt_tail=True):
    """Sample corrupted (negative) triples for every triple in ``kg``.

    Each input triple contributes ``negative`` consecutive rows that keep its
    predicate. The head and/or tail are replaced by entity ids drawn
    uniformly from ``[0, N)``.

    Args:
        kg: Sequence of (subject, predicate, object) triples.
        N: Exclusive upper bound of the sampled entity ids.
        negative: Number of negatives generated per input triple.
        check_kg: Accepted for interface compatibility; not used, so a
            sampled triple may coincide with a true one.
        corrupt_head: If true the subject is replaced by a random id,
            otherwise the original subject is kept.
        corrupt_tail: If true the object is replaced by a random id,
            otherwise the original object is kept.

    Returns:
        Array of shape ``(negative * len(kg), 3, 1)`` ordered as
        (subject, predicate, object) along axis 1.
    """
    assert corrupt_head or corrupt_tail

    subjects, predicates, objects = [], [], []
    for s, p, o in kg:
        subjects.append(s)
        predicates.append(p)
        objects.append(o)

    def _repeat_column(values):
        # One row per generated negative, grouped by source triple.
        return np.repeat(np.asarray(values).reshape((-1, 1)), negative, axis=0)

    n_rows = negative * len(kg)
    R = _repeat_column(predicates)
    # Heads are drawn before tails so the default call consumes the random
    # stream in the same order as before.
    if corrupt_head:
        fs = np.random.randint(0, N, size=(n_rows, 1))
    else:
        fs = _repeat_column(subjects)
    if corrupt_tail:
        fo = np.random.randint(0, N, size=(n_rows, 1))
    else:
        fo = _repeat_column(objects)

    # Stacking (n, 1) columns along axis 1 gives the (n, 3, 1) layout.
    return np.stack([fs, R, fo], axis=1)

def oversample_data(kgs,x=None,y=None,testing=False):
    """Resample several datasets (and optionally x/y) to one common length.

    Every entry of ``kgs`` is resampled with replacement to ``m`` items,
    where ``m`` is the length of the longest kg (or ``len(y)`` if larger).

    Args:
        kgs: Iterable of datasets; each is converted to a list.
        x: Optional 2-D array whose columns are split into three outputs.
        y: Optional labels aligned with the rows of ``x``.
        testing: If true, each kg is first truncated to ``len(y)`` items
            (``y`` must then be given).

    Returns:
        A list of squeezed arrays (one per kg, followed by the three column
        groups of ``x`` when ``x`` and ``y`` are given). When ``x`` and
        ``y`` are given the result is a tuple ``(arrays, y)``.
    """
    if testing:
        kgs = [list(kg)[:len(y)] for kg in kgs]
    else:
        kgs = [list(kg) for kg in kgs]

    m = max(map(len, kgs), default=0)
    if y is not None:
        m = max(m, len(y))

    out = [choices(kg, k=m) for kg in kgs]

    if x is None or y is None:
        return [np.squeeze(np.asarray(o)) for o in out]

    # np.repeat needs an integer count; np.ceil alone returns a float.
    k = int(np.ceil(m / len(y)))
    # NOTE(review): np.repeat duplicates each row consecutively before the
    # truncation to m, so trailing rows of x/y can be dropped when m is not
    # a multiple of len(y) -- confirm this is intended.
    y = np.repeat(y, k, axis=0)[:m]
    x = np.repeat(x, k, axis=0)[:m, :]
    for s in np.split(x, 3, axis=1):
        out.append(s.reshape((-1,)))
    return [np.squeeze(np.asarray(o)) for o in out], np.asarray(y)

def pad(kg,bs):
    """Pad ``kg`` with randomly re-drawn items up to a multiple of ``bs``.

    The input is copied into a list and not modified. Each filler is drawn
    with ``random.choice`` from the list as it stands at that moment, so
    earlier fillers can be drawn again. Returns a numpy array.
    """
    items = list(kg)
    remainder = len(items) % bs
    while remainder != 0:
        items.append(choice(items))
        remainder = len(items) % bs
    return np.asarray(items)
        
def mrr(target, scores):
    """Return the reciprocal rank of ``target`` among scored labels.

    ``scores`` is an iterable of (label, score) pairs. Labels are ordered
    by descending score (ties keep input order) and the result is
    ``1 / rank`` with rank starting at 1. Raises ValueError if ``target``
    is not among the labels.
    """
    ranked = sorted(scores, key=lambda pair: pair[1], reverse=True)
    ordered_labels = [label for label, _ in ranked]
    position = ordered_labels.index(target)
    return 1 / (position + 1)

def hits(target, scores, k=10):
    """Return 1 if ``target`` is among the ``k`` best-scored labels, else 0.

    ``scores`` is an iterable of (label, score) pairs; labels are ordered
    by descending score, with ties keeping their input order.
    """
    ranked = sorted(scores, key=lambda pair: pair[1], reverse=True)
    top_labels = [label for label, _ in ranked][:k]
    if target in top_labels:
        return 1
    return 0

def gen_tail_data(test_data,num_entities,bs,filter_t):
    """Yield tail-corruption candidate batches, one per test triple.

    For each (s, p, o) the yielded integer array has shape (k, 3). Row 0 is
    the true triple; the remaining rows replace ``o`` with every entity id
    in ``range(num_entities)`` (ascending) that is neither ``o`` nor listed
    in ``filter_t[(s, p)]``.

    Args:
        test_data: Iterable of (s, p, o) triples.
        num_entities: Number of entity ids to enumerate as candidates.
        bs: Unused; kept for interface compatibility.
        filter_t: Mapping from (s, p) to a collection of known true tails.
    """
    for s, p, o in test_data:
        # A set makes exclusion O(1) per id and tolerates the target (or an
        # out-of-range id) appearing among the filtered tails; .get avoids
        # inserting keys into a defaultdict.
        excluded = set(filter_t.get((s, p), ()))
        excluded.add(o)
        candidate_objects = [
            ent_id for ent_id in range(num_entities) if ent_id not in excluded
        ]

        objects = np.asarray([int(o)] + candidate_objects).reshape((-1, 1))
        subjects = np.full_like(objects, int(s))
        predicates = np.full_like(objects, int(p))

        yield np.concatenate((subjects, predicates, objects), axis=-1)
        
def gen_head_data(test_data,num_entities,bs,filter_h):
    """Yield head-corruption candidate batches, one per test triple.

    For each (s, p, o) the yielded integer array has shape (k, 3). Row 0 is
    the true triple; the remaining rows replace ``s`` with every entity id
    in ``range(num_entities)`` (ascending) that is neither ``s`` nor listed
    in ``filter_h[(p, o)]``.

    Args:
        test_data: Iterable of (s, p, o) triples.
        num_entities: Number of entity ids to enumerate as candidates.
        bs: Unused; kept for interface compatibility.
        filter_h: Mapping from (p, o) to a collection of known true heads.
    """
    for s, p, o in test_data:
        # A set makes exclusion O(1) per id and tolerates the target (or an
        # out-of-range id) appearing among the filtered heads; .get avoids
        # inserting keys into a defaultdict.
        excluded = set(filter_h.get((p, o), ()))
        excluded.add(s)
        candidate_subjects = [
            ent_id for ent_id in range(num_entities) if ent_id not in excluded
        ]

        subjects = np.asarray([int(s)] + candidate_subjects).reshape((-1, 1))
        predicates = np.full_like(subjects, int(p))
        objects = np.full_like(subjects, int(o))

        yield np.concatenate((subjects, predicates, objects), axis=-1)
        
        
def _rank_metrics(model, batches, total):
    """Score candidate batches and summarise the rank of each batch's first row.

    Returns a dict with keys 'hits@1', 'hits@3', 'hits@10', 'mr' and 'mrr'.
    """
    hit_1, hit_3, hit_10 = 0, 0, 0
    ranks = []
    for batch in tqdm(batches, total=total):
        scores = np.asarray(model.predict(batch)).reshape((-1,))
        # rankdata ranks ascending; row 0 is the true triple. With 'max',
        # scores tied with the target are resolved in the target's favour.
        ascending_rank = rankdata(scores, 'max')[0]
        # Convert to a descending rank where 1 means the highest score.
        rank = len(scores) - ascending_rank + 1
        hit_1 += int(rank <= 1)
        hit_3 += int(rank <= 3)
        hit_10 += int(rank <= 10)
        ranks.append(rank)

    return {
        'hits@1': hit_1 / float(total),
        'hits@3': hit_3 / float(total),
        'hits@10': hit_10 / float(total),
        'mr': np.mean(ranks),
        'mrr': np.mean([1 / r for r in ranks]),
    }


def validate(model, test_data, num_entities, bs, filtering_triples = None):
    """Compute filtered link-prediction metrics for ``model`` on ``test_data``.

    Every test triple is ranked against all tail corruptions and then all
    head corruptions, skipping candidates that form a triple listed in
    ``filtering_triples``.

    Args:
        model: Object whose ``predict`` accepts an (n, 3) integer array of
            triples and returns one score per row.
        test_data: Sized iterable of (s, p, o) triples.
        num_entities: Number of entity ids used as corruption candidates.
        bs: Forwarded to the candidate generators, which do not use it.
        filtering_triples: Iterable of known true triples to exclude from
            the candidates; ``None`` means no filtering.

    Returns:
        Dict with 'hits@1', 'hits@3', 'hits@10', 'mr' and 'mrr', each also
        prefixed with 'tail_' and 'head_'; unprefixed values are the mean
        of the two sides.
    """
    filter_h = defaultdict(set)
    filter_t = defaultdict(set)
    # The default None would otherwise fail on iteration.
    for s, p, o in (filtering_triples if filtering_triples is not None else ()):
        filter_h[(p, o)].add(s)
        filter_t[(s, p)].add(o)

    total = len(test_data)
    tail = _rank_metrics(
        model, gen_tail_data(test_data, num_entities, bs, filter_t), total)
    head = _rank_metrics(
        model, gen_head_data(test_data, num_entities, bs, filter_h), total)

    metrics = {}
    for side, result in (('tail', tail), ('head', head)):
        for key, value in result.items():
            metrics[side + '_' + key] = value
    for key in tail:
        metrics[key] = (tail[key] + head[key]) / 2

    return metrics

        
class KGEValidateCallback(Callback):
    """Keras callback that runs link-prediction validation every fifth epoch.

    The metrics returned by ``validate`` are written into the epoch ``logs``
    with a ``val_`` prefix. The model is expected to expose a
    ``num_entities`` attribute.

    Args:
        validation_data: Triples to evaluate on.
        train_data: Optional known true triples used to filter candidates.
    """
    def __init__(self, validation_data, train_data=None, *args, **kwargs):
        # Zero-argument super() runs Callback.__init__; super(Callback, self)
        # would skip it.
        super().__init__(*args, **kwargs)
        self.validation_data = validation_data
        self.train_data = train_data

    def on_epoch_end(self, epoch, logs = None):
        """Validate when ``epoch`` is a multiple of 5 and store results in ``logs``."""
        if epoch % 5 != 0:
            return
        # Only replace a missing dict: an empty dict passed by the caller
        # must still receive the results.
        if logs is None:
            logs = {}
        filtering = self.train_data if self.train_data is not None else []
        # validate's fourth positional parameter is ``bs``, which validate and
        # its generators never read; the filter triples go in by keyword.
        tmp = validate(self.model,
                       self.validation_data,
                       self.model.num_entities,
                       None,
                       filtering_triples=filtering)

        for k, value in tmp.items():
            logs['val_'+k] = value

    def on_train_end(self, logs=None):
        """Force a final validation pass (100 is a multiple of 5)."""
        self.on_epoch_end(100,logs=logs)
        

def pointwize_hinge(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Pointwise hinge loss: mean relu(margin - true) + mean relu(margin + false).

    ``negative_samples`` and ``reduce_mean`` are accepted but not read.
    """
    positive_term = tf.reduce_mean(tf.nn.relu(margin - true))
    negative_term = tf.reduce_mean(tf.nn.relu(margin + false))
    return positive_term + negative_term

def pointwize_logistic(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Pointwise logistic loss over true and false scores.

    Sums mean log(EPSILON + 1 + exp(-true)) and mean
    log(EPSILON + 1 + exp(false)). ``margin``, ``negative_samples`` and
    ``reduce_mean`` are accepted but not read.
    """
    positive_term = tf.reduce_mean(tf.math.log(EPSILON + 1 + tf.math.exp(-true)))
    negative_term = tf.reduce_mean(tf.math.log(EPSILON + 1 + tf.math.exp(false)))
    return positive_term + negative_term

def pointwize_square_loss(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Pointwise squared loss: mean (margin - true)^2 + mean (margin + false)^2.

    ``negative_samples`` and ``reduce_mean`` are accepted but not read.
    """
    positive_term = tf.reduce_mean(tf.square(margin - true))
    negative_term = tf.reduce_mean(tf.square(margin + false))
    return positive_term + negative_term

def pointwize_cross_entropy(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Binary cross-entropy of true scores against 1 plus false scores against 0.

    ``margin``, ``negative_samples`` and ``reduce_mean`` are accepted but
    not read.
    """
    positive_term = binary_crossentropy(1, true)
    negative_term = binary_crossentropy(0, false)
    return positive_term + negative_term

def pairwize_hinge(true,false,margin=1, negative_samples=1, reduce_mean = True):
    """Pairwise hinge loss relu(margin + false - true).

    ``false`` is reshaped to (-1, negative_samples) before being compared
    with ``true``. Returns the mean when ``reduce_mean`` is true, otherwise
    the element-wise tensor.
    """
    negatives = tf.reshape(false, (-1, negative_samples))
    violations = tf.nn.relu(margin + negatives - true)
    return tf.reduce_mean(violations) if reduce_mean else violations

def pairwize_logistic(true,false,margin=0, negative_samples=1, reduce_mean = True):
    """Pairwise logistic loss log(EPSILON + 1 + exp(false - true)).

    ``false`` is reshaped to (-1, negative_samples) first; ``margin`` is
    accepted but not read. Returns the mean when ``reduce_mean`` is true,
    otherwise the element-wise tensor.
    """
    negatives = tf.reshape(false, (-1, negative_samples))
    per_pair = tf.math.log(EPSILON + 1 + tf.math.exp(negatives - true))
    return tf.reduce_mean(per_pair) if reduce_mean else per_pair

def pairwize_square_loss(true,false,margin=0, negative_samples=1, reduce_mean = True):
    """Pairwise loss equal to the negated squared difference (false - true).

    ``false`` is reshaped to (-1, negative_samples) first; ``margin`` is
    accepted but not read. Returns the mean when ``reduce_mean`` is true,
    otherwise the element-wise tensor.
    """
    negatives = tf.reshape(false, (-1, negative_samples))
    per_pair = - tf.square(negatives - true)
    return tf.reduce_mean(per_pair) if reduce_mean else per_pair

def loss_function_lookup(name):
    """Return the loss function registered under ``name``.

    Raises KeyError if ``name`` is not one of the registered loss names.
    """
    registry = {
        'pointwize_hinge': pointwize_hinge,
        'pointwize_logistic': pointwize_logistic,
        'pointwize_cross_entropy': pointwize_cross_entropy,
        'pointwize_square_loss': pointwize_square_loss,
        'pairwize_hinge': pairwize_hinge,
        'pairwize_logistic': pairwize_logistic,
        'pairwize_square_loss': pairwize_square_loss,
    }
    return registry[name]

Functions

def gen_head_data(test_data, num_entities, bs, filter_h)
Expand source code
def gen_head_data(test_data,num_entities,bs,filter_h):
    """Yield head-corruption candidate batches, one per test triple.

    For each (s, p, o) the yielded integer array has shape (k, 3). Row 0 is
    the true triple; the remaining rows replace ``s`` with every entity id
    in ``range(num_entities)`` (ascending) that is neither ``s`` nor listed
    in ``filter_h[(p, o)]``.

    Args:
        test_data: Iterable of (s, p, o) triples.
        num_entities: Number of entity ids to enumerate as candidates.
        bs: Unused; kept for interface compatibility.
        filter_h: Mapping from (p, o) to a collection of known true heads.
    """
    for s, p, o in test_data:
        # A set makes exclusion O(1) per id and tolerates the target (or an
        # out-of-range id) appearing among the filtered heads; .get avoids
        # inserting keys into a defaultdict.
        excluded = set(filter_h.get((p, o), ()))
        excluded.add(s)
        candidate_subjects = [
            ent_id for ent_id in range(num_entities) if ent_id not in excluded
        ]

        subjects = np.asarray([int(s)] + candidate_subjects).reshape((-1, 1))
        predicates = np.full_like(subjects, int(p))
        objects = np.full_like(subjects, int(o))

        yield np.concatenate((subjects, predicates, objects), axis=-1)
def gen_tail_data(test_data, num_entities, bs, filter_t)
Expand source code
def gen_tail_data(test_data,num_entities,bs,filter_t):
    """Yield tail-corruption candidate batches, one per test triple.

    For each (s, p, o) the yielded integer array has shape (k, 3). Row 0 is
    the true triple; the remaining rows replace ``o`` with every entity id
    in ``range(num_entities)`` (ascending) that is neither ``o`` nor listed
    in ``filter_t[(s, p)]``.

    Args:
        test_data: Iterable of (s, p, o) triples.
        num_entities: Number of entity ids to enumerate as candidates.
        bs: Unused; kept for interface compatibility.
        filter_t: Mapping from (s, p) to a collection of known true tails.
    """
    for s, p, o in test_data:
        # A set makes exclusion O(1) per id and tolerates the target (or an
        # out-of-range id) appearing among the filtered tails; .get avoids
        # inserting keys into a defaultdict.
        excluded = set(filter_t.get((s, p), ()))
        excluded.add(o)
        candidate_objects = [
            ent_id for ent_id in range(num_entities) if ent_id not in excluded
        ]

        objects = np.asarray([int(o)] + candidate_objects).reshape((-1, 1))
        subjects = np.full_like(objects, int(s))
        predicates = np.full_like(objects, int(p))

        yield np.concatenate((subjects, predicates, objects), axis=-1)
def generate_negative(kg, N, negative=2, check_kg=False, corrupt_head=True, corrupt_tail=True)
Expand source code
def generate_negative(kg, N, negative=2, check_kg=False, corrupt_head=True, corrupt_tail=True):
    """Sample corrupted (negative) triples for every triple in ``kg``.

    Each input triple contributes ``negative`` consecutive rows that keep its
    predicate. The head and/or tail are replaced by entity ids drawn
    uniformly from ``[0, N)``.

    Args:
        kg: Sequence of (subject, predicate, object) triples.
        N: Exclusive upper bound of the sampled entity ids.
        negative: Number of negatives generated per input triple.
        check_kg: Accepted for interface compatibility; not used, so a
            sampled triple may coincide with a true one.
        corrupt_head: If true the subject is replaced by a random id,
            otherwise the original subject is kept.
        corrupt_tail: If true the object is replaced by a random id,
            otherwise the original object is kept.

    Returns:
        Array of shape ``(negative * len(kg), 3, 1)`` ordered as
        (subject, predicate, object) along axis 1.
    """
    assert corrupt_head or corrupt_tail

    subjects, predicates, objects = [], [], []
    for s, p, o in kg:
        subjects.append(s)
        predicates.append(p)
        objects.append(o)

    def _repeat_column(values):
        # One row per generated negative, grouped by source triple.
        return np.repeat(np.asarray(values).reshape((-1, 1)), negative, axis=0)

    n_rows = negative * len(kg)
    R = _repeat_column(predicates)
    # Heads are drawn before tails so the default call consumes the random
    # stream in the same order as before.
    if corrupt_head:
        fs = np.random.randint(0, N, size=(n_rows, 1))
    else:
        fs = _repeat_column(subjects)
    if corrupt_tail:
        fo = np.random.randint(0, N, size=(n_rows, 1))
    else:
        fo = _repeat_column(objects)

    # Stacking (n, 1) columns along axis 1 gives the (n, 3, 1) layout.
    return np.stack([fs, R, fo], axis=1)
def hits(target, scores, k=10)
Expand source code
def hits(target, scores, k=10):
    """Return 1 if ``target`` is among the ``k`` best-scored labels, else 0.

    ``scores`` is an iterable of (label, score) pairs; labels are ordered
    by descending score, with ties keeping their input order.
    """
    ranked = sorted(scores, key=lambda pair: pair[1], reverse=True)
    top_labels = [label for label, _ in ranked][:k]
    if target in top_labels:
        return 1
    return 0
def load_kg(path)
Expand source code
def load_kg(path):
    """Load a knowledge graph from a whitespace-delimited text file.

    Every line is stripped and split on whitespace; the resulting token
    lists are returned in file order. Tokens stay as strings, and a blank
    line yields an empty list.
    """
    with open(path, 'r') as handle:
        return [line.strip().split() for line in handle]
def loss_function_lookup(name)
Expand source code
def loss_function_lookup(name):
    """Return the loss function registered under ``name``.

    Raises KeyError if ``name`` is not one of the registered loss names.
    """
    registry = {
        'pointwize_hinge': pointwize_hinge,
        'pointwize_logistic': pointwize_logistic,
        'pointwize_cross_entropy': pointwize_cross_entropy,
        'pointwize_square_loss': pointwize_square_loss,
        'pairwize_hinge': pairwize_hinge,
        'pairwize_logistic': pairwize_logistic,
        'pairwize_square_loss': pairwize_square_loss,
    }
    return registry[name]
def mrr(target, scores)
Expand source code
def mrr(target, scores):
    """Return the reciprocal rank of ``target`` among scored labels.

    ``scores`` is an iterable of (label, score) pairs. Labels are ordered
    by descending score (ties keep input order) and the result is
    ``1 / rank`` with rank starting at 1. Raises ValueError if ``target``
    is not among the labels.
    """
    ranked = sorted(scores, key=lambda pair: pair[1], reverse=True)
    ordered_labels = [label for label, _ in ranked]
    position = ordered_labels.index(target)
    return 1 / (position + 1)
def oversample_data(kgs, x=None, y=None, testing=False)
Expand source code
def oversample_data(kgs,x=None,y=None,testing=False):
    """Resample several datasets (and optionally x/y) to one common length.

    Every entry of ``kgs`` is resampled with replacement to ``m`` items,
    where ``m`` is the length of the longest kg (or ``len(y)`` if larger).

    Args:
        kgs: Iterable of datasets; each is converted to a list.
        x: Optional 2-D array whose columns are split into three outputs.
        y: Optional labels aligned with the rows of ``x``.
        testing: If true, each kg is first truncated to ``len(y)`` items
            (``y`` must then be given).

    Returns:
        A list of squeezed arrays (one per kg, followed by the three column
        groups of ``x`` when ``x`` and ``y`` are given). When ``x`` and
        ``y`` are given the result is a tuple ``(arrays, y)``.
    """
    if testing:
        kgs = [list(kg)[:len(y)] for kg in kgs]
    else:
        kgs = [list(kg) for kg in kgs]

    m = max(map(len, kgs), default=0)
    if y is not None:
        m = max(m, len(y))

    out = [choices(kg, k=m) for kg in kgs]

    if x is None or y is None:
        return [np.squeeze(np.asarray(o)) for o in out]

    # np.repeat needs an integer count; np.ceil alone returns a float.
    k = int(np.ceil(m / len(y)))
    # NOTE(review): np.repeat duplicates each row consecutively before the
    # truncation to m, so trailing rows of x/y can be dropped when m is not
    # a multiple of len(y) -- confirm this is intended.
    y = np.repeat(y, k, axis=0)[:m]
    x = np.repeat(x, k, axis=0)[:m, :]
    for s in np.split(x, 3, axis=1):
        out.append(s.reshape((-1,)))
    return [np.squeeze(np.asarray(o)) for o in out], np.asarray(y)
def pad(kg, bs)
Expand source code
def pad(kg,bs):
    """Pad ``kg`` with randomly re-drawn items up to a multiple of ``bs``.

    The input is copied into a list and not modified. Each filler is drawn
    with ``random.choice`` from the list as it stands at that moment, so
    earlier fillers can be drawn again. Returns a numpy array.
    """
    items = list(kg)
    remainder = len(items) % bs
    while remainder != 0:
        items.append(choice(items))
        remainder = len(items) % bs
    return np.asarray(items)
def pairwize_hinge(true, false, margin=1, negative_samples=1, reduce_mean=True)
Expand source code
def pairwize_hinge(true,false,margin=1, negative_samples=1, reduce_mean = True):
    """Pairwise hinge loss relu(margin + false - true).

    ``false`` is reshaped to (-1, negative_samples) before being compared
    with ``true``. Returns the mean when ``reduce_mean`` is true, otherwise
    the element-wise tensor.
    """
    negatives = tf.reshape(false, (-1, negative_samples))
    violations = tf.nn.relu(margin + negatives - true)
    return tf.reduce_mean(violations) if reduce_mean else violations
def pairwize_logistic(true, false, margin=0, negative_samples=1, reduce_mean=True)
Expand source code
def pairwize_logistic(true,false,margin=0, negative_samples=1, reduce_mean = True):
    """Pairwise logistic loss log(EPSILON + 1 + exp(false - true)).

    ``false`` is reshaped to (-1, negative_samples) first; ``margin`` is
    accepted but not read. Returns the mean when ``reduce_mean`` is true,
    otherwise the element-wise tensor.
    """
    negatives = tf.reshape(false, (-1, negative_samples))
    per_pair = tf.math.log(EPSILON + 1 + tf.math.exp(negatives - true))
    return tf.reduce_mean(per_pair) if reduce_mean else per_pair
def pairwize_square_loss(true, false, margin=0, negative_samples=1, reduce_mean=True)
Expand source code
def pairwize_square_loss(true,false,margin=0, negative_samples=1, reduce_mean = True):
    """Pairwise loss equal to the negated squared difference (false - true).

    ``false`` is reshaped to (-1, negative_samples) first; ``margin`` is
    accepted but not read. Returns the mean when ``reduce_mean`` is true,
    otherwise the element-wise tensor.
    """
    negatives = tf.reshape(false, (-1, negative_samples))
    per_pair = - tf.square(negatives - true)
    return tf.reduce_mean(per_pair) if reduce_mean else per_pair
def pointwize_cross_entropy(true, false, margin=1, negative_samples=1, reduce_mean=True)
Expand source code
def pointwize_cross_entropy(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Binary cross-entropy of true scores against 1 plus false scores against 0.

    ``margin``, ``negative_samples`` and ``reduce_mean`` are accepted but
    not read.
    """
    positive_term = binary_crossentropy(1, true)
    negative_term = binary_crossentropy(0, false)
    return positive_term + negative_term
def pointwize_hinge(true, false, margin=1, negative_samples=1, reduce_mean=True)
Expand source code
def pointwize_hinge(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Pointwise hinge loss: mean relu(margin - true) + mean relu(margin + false).

    ``negative_samples`` and ``reduce_mean`` are accepted but not read.
    """
    positive_term = tf.reduce_mean(tf.nn.relu(margin - true))
    negative_term = tf.reduce_mean(tf.nn.relu(margin + false))
    return positive_term + negative_term
def pointwize_logistic(true, false, margin=1, negative_samples=1, reduce_mean=True)
Expand source code
def pointwize_logistic(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Pointwise logistic loss over true and false scores.

    Sums mean log(EPSILON + 1 + exp(-true)) and mean
    log(EPSILON + 1 + exp(false)). ``margin``, ``negative_samples`` and
    ``reduce_mean`` are accepted but not read.
    """
    positive_term = tf.reduce_mean(tf.math.log(EPSILON + 1 + tf.math.exp(-true)))
    negative_term = tf.reduce_mean(tf.math.log(EPSILON + 1 + tf.math.exp(false)))
    return positive_term + negative_term
def pointwize_square_loss(true, false, margin=1, negative_samples=1, reduce_mean=True)
Expand source code
def pointwize_square_loss(true,false,margin=1,negative_samples=1, reduce_mean = True):
    """Pointwise squared loss: mean (margin - true)^2 + mean (margin + false)^2.

    ``negative_samples`` and ``reduce_mean`` are accepted but not read.
    """
    positive_term = tf.reduce_mean(tf.square(margin - true))
    negative_term = tf.reduce_mean(tf.square(margin + false))
    return positive_term + negative_term
def validate(model, test_data, num_entities, bs, filtering_triples=None)
Expand source code
def _rank_metrics(model, batches, total):
    """Score candidate batches and summarise the rank of each batch's first row.

    Returns a dict with keys 'hits@1', 'hits@3', 'hits@10', 'mr' and 'mrr'.
    """
    hit_1, hit_3, hit_10 = 0, 0, 0
    ranks = []
    for batch in tqdm(batches, total=total):
        scores = np.asarray(model.predict(batch)).reshape((-1,))
        # rankdata ranks ascending; row 0 is the true triple. With 'max',
        # scores tied with the target are resolved in the target's favour.
        ascending_rank = rankdata(scores, 'max')[0]
        # Convert to a descending rank where 1 means the highest score.
        rank = len(scores) - ascending_rank + 1
        hit_1 += int(rank <= 1)
        hit_3 += int(rank <= 3)
        hit_10 += int(rank <= 10)
        ranks.append(rank)

    return {
        'hits@1': hit_1 / float(total),
        'hits@3': hit_3 / float(total),
        'hits@10': hit_10 / float(total),
        'mr': np.mean(ranks),
        'mrr': np.mean([1 / r for r in ranks]),
    }


def validate(model, test_data, num_entities, bs, filtering_triples = None):
    """Compute filtered link-prediction metrics for ``model`` on ``test_data``.

    Every test triple is ranked against all tail corruptions and then all
    head corruptions, skipping candidates that form a triple listed in
    ``filtering_triples``.

    Args:
        model: Object whose ``predict`` accepts an (n, 3) integer array of
            triples and returns one score per row.
        test_data: Sized iterable of (s, p, o) triples.
        num_entities: Number of entity ids used as corruption candidates.
        bs: Forwarded to the candidate generators, which do not use it.
        filtering_triples: Iterable of known true triples to exclude from
            the candidates; ``None`` means no filtering.

    Returns:
        Dict with 'hits@1', 'hits@3', 'hits@10', 'mr' and 'mrr', each also
        prefixed with 'tail_' and 'head_'; unprefixed values are the mean
        of the two sides.
    """
    filter_h = defaultdict(set)
    filter_t = defaultdict(set)
    # The default None would otherwise fail on iteration.
    for s, p, o in (filtering_triples if filtering_triples is not None else ()):
        filter_h[(p, o)].add(s)
        filter_t[(s, p)].add(o)

    total = len(test_data)
    tail = _rank_metrics(
        model, gen_tail_data(test_data, num_entities, bs, filter_t), total)
    head = _rank_metrics(
        model, gen_head_data(test_data, num_entities, bs, filter_h), total)

    metrics = {}
    for side, result in (('tail', tail), ('head', head)):
        for key, value in result.items():
            metrics[side + '_' + key] = value
    for key in tail:
        metrics[key] = (tail[key] + head[key]) / 2

    return metrics

Classes

class KGEValidateCallback (validation_data, train_data=None, *args, **kwargs)

Abstract base class used to build new callbacks.

Attributes

params
Dict. Training parameters (e.g. verbosity, batch size, number of epochs, …).
model
Instance of keras.models.Model. Reference of the model being trained.

The logs dictionary that callback methods take as argument will contain keys for quantities relevant to the current batch or epoch (see method-specific docstrings).

Expand source code
class KGEValidateCallback(Callback):
    """Keras callback that runs link-prediction validation every fifth epoch.

    The metrics returned by ``validate`` are written into the epoch ``logs``
    with a ``val_`` prefix. The model is expected to expose a
    ``num_entities`` attribute.

    Args:
        validation_data: Triples to evaluate on.
        train_data: Optional known true triples used to filter candidates.
    """
    def __init__(self, validation_data, train_data=None, *args, **kwargs):
        # Zero-argument super() runs Callback.__init__; super(Callback, self)
        # would skip it.
        super().__init__(*args, **kwargs)
        self.validation_data = validation_data
        self.train_data = train_data

    def on_epoch_end(self, epoch, logs = None):
        """Validate when ``epoch`` is a multiple of 5 and store results in ``logs``."""
        if epoch % 5 != 0:
            return
        # Only replace a missing dict: an empty dict passed by the caller
        # must still receive the results.
        if logs is None:
            logs = {}
        filtering = self.train_data if self.train_data is not None else []
        # validate's fourth positional parameter is ``bs``, which validate and
        # its generators never read; the filter triples go in by keyword.
        tmp = validate(self.model,
                       self.validation_data,
                       self.model.num_entities,
                       None,
                       filtering_triples=filtering)

        for k, value in tmp.items():
            logs['val_'+k] = value

    def on_train_end(self, logs=None):
        """Force a final validation pass (100 is a multiple of 5)."""
        self.on_epoch_end(100,logs=logs)

Ancestors

  • tensorflow.python.keras.callbacks.Callback

Methods

def on_epoch_end(self, epoch, logs=None)

Called at the end of an epoch.

Subclasses should override for any actions to run. This function should only be called during TRAIN mode.

Arguments

epoch: Integer, index of epoch. logs: Dict, metric results for this training epoch, and for the validation epoch if validation is performed. Validation result keys are prefixed with val_.

Expand source code
def on_epoch_end(self, epoch, logs = None):
    """Validate when ``epoch`` is a multiple of 5 and store results in ``logs``.

    Metrics returned by ``validate`` are written into ``logs`` under keys
    prefixed with ``val_``. Other epochs are a no-op.
    """
    if epoch % 5 != 0:
        return
    # Only replace a missing dict: an empty dict passed by the caller must
    # still receive the results.
    if logs is None:
        logs = {}
    filtering = self.train_data if self.train_data is not None else []
    # validate's fourth positional parameter is ``bs``; the filter triples
    # have to be passed by keyword or they end up in that slot.
    tmp = validate(self.model,
                   self.validation_data,
                   self.model.num_entities,
                   None,
                   filtering_triples=filtering)

    for k, value in tmp.items():
        logs['val_'+k] = value
def on_train_end(self, logs=None)

Called at the end of training.

Subclasses should override for any actions to run.

Arguments

logs: Dict. Currently the output of the last call to on_epoch_end() is passed to this argument for this method but that may change in the future.

Expand source code
def on_train_end(self, logs=None):
    """Trigger one last validation pass through ``on_epoch_end``."""
    # 100 is a multiple of 5, so on_epoch_end's epoch check passes.
    final_epoch = 100
    self.on_epoch_end(final_epoch, logs=logs)