Module KGEkeras.utils
Source code
import numpy as np
from tqdm import tqdm
from scipy.stats import rankdata
from random import choice
from collections import defaultdict
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.losses import binary_crossentropy
import tensorflow as tf
from random import choices
EPSILON = 1e-6
def load_kg(path):
    # Load a knowledge graph as a list of [subject, predicate, object]
    # string triples, one whitespace-separated triple per line.
    out = []
    with open(path, 'r') as f:
        for l in f:
            out.append(l.strip().split())
    return out
def generate_negative(kg, N, negative=2, check_kg=False, corrupt_head=True, corrupt_tail=True):
    # Generate `negative` corrupted triples per true triple by sampling
    # random entity ids in [0, N) for the head and the tail. Note that
    # check_kg, corrupt_head, and corrupt_tail currently do not alter the
    # sampling: both positions are always corrupted.
    assert corrupt_head or corrupt_tail
    R = np.repeat(np.asarray([p for _, p, _ in kg]).reshape((-1, 1)), negative, axis=0)
    fs = np.random.randint(0, N, size=(negative * len(kg), 1))
    fo = np.random.randint(0, N, size=(negative * len(kg), 1))
    # Concatenate the columns into a (negative * len(kg), 3) triple array.
    negative_kg = np.concatenate([fs, R, fo], axis=1)
    return negative_kg
def oversample_data(kgs, x=None, y=None, testing=False):
    # Resample every kg (and optionally x, y) to a common length m so that
    # several inputs can be fed to Keras in parallel.
    if testing:
        kgs = [list(kg)[:len(y)] for kg in kgs]
    else:
        kgs = [list(kg) for kg in kgs]
    if y is not None:
        m = max(max(map(len, kgs)), len(y))
    else:
        m = max(map(len, kgs))
    out = []
    for kg in kgs:
        # Sample with replacement up to the common length.
        out.append(choices(kg, k=m))
    if x is not None and y is not None:
        # np.repeat needs an integer repeat count.
        k = int(np.ceil(m / len(y)))
        y = np.repeat(y, k, axis=0)[:m]
        x = np.repeat(x, k, axis=0)[:m, :]
        # Split the triples in x into subject, predicate, and object columns.
        for s in np.split(x, 3, axis=1):
            out.append(s.reshape((-1,)))
        return [np.squeeze(np.asarray(o)) for o in out], np.asarray(y)
    else:
        return [np.squeeze(np.asarray(o)) for o in out]
def pad(kg, bs):
    # Pad a kg with randomly chosen duplicate triples until its length is
    # divisible by the batch size bs.
    kg = list(kg)
    while len(kg) % bs != 0:
        kg.append(choice(kg))
    return np.asarray(kg)
def mrr(target, scores):
    # Reciprocal rank of target among (label, score) pairs,
    # sorted by descending score.
    scores = sorted(scores, key=lambda x: x[1], reverse=True)
    labels = [x for x, _ in scores]
    return 1 / (1 + labels.index(target))

def hits(target, scores, k=10):
    # 1 if target is among the k highest-scoring labels, else 0.
    scores = sorted(scores, key=lambda x: x[1], reverse=True)
    labels = [x for x, _ in scores][:k]
    return int(target in labels)
def gen_tail_data(test_data, num_entities, bs, filter_t):
    # For each test triple, yield the true triple followed by all corrupted
    # triples obtained by replacing the object with every entity not in the
    # filter set. bs is currently unused.
    for s, p, o in test_data:
        candidate_objects = list(range(num_entities))
        candidate_objects.remove(o)
        # Filtered setting: drop objects known to form true triples,
        # excluding o itself, which was already removed above.
        for oi in filter_t[(s, p)] - {o}:
            candidate_objects.remove(oi)
        subjects = np.asarray([[int(s)]] * (len(candidate_objects) + 1))
        predicates = np.asarray([[int(p)]] * (len(candidate_objects) + 1))
        objects = np.asarray([[int(o)]] + [[ent_id] for ent_id in candidate_objects])
        triples = np.concatenate((subjects, predicates, objects), axis=-1)
        yield triples.reshape((-1, 3))
def gen_head_data(test_data, num_entities, bs, filter_h):
    # Head-side counterpart of gen_tail_data: corrupt the subject instead.
    for s, p, o in test_data:
        candidate_subjects = list(range(num_entities))
        candidate_subjects.remove(s)
        for si in filter_h[(p, o)] - {s}:
            candidate_subjects.remove(si)
        objects = np.asarray([[int(o)]] * (len(candidate_subjects) + 1))
        predicates = np.asarray([[int(p)]] * (len(candidate_subjects) + 1))
        subjects = np.asarray([[int(s)]] + [[ent_id] for ent_id in candidate_subjects])
        triples = np.concatenate((subjects, predicates, objects), axis=-1)
        yield triples.reshape((-1, 3))
def validate(model, test_data, num_entities, bs, filtering_triples=None):
    # Filtered link-prediction evaluation: rank each true triple against all
    # corrupted candidates and report hits@k, mean rank (MR), and mean
    # reciprocal rank (MRR) for tail, head, and their average.
    filter_h = defaultdict(set)
    filter_t = defaultdict(set)
    # No filtering if filtering_triples is None.
    for s, p, o in filtering_triples or []:
        filter_h[(p, o)].add(s)
        filter_t[(s, p)].add(o)
    c_1, c_3, c_10 = 0, 0, 0
    mean_ranks = []
    for t in tqdm(gen_tail_data(test_data, num_entities, bs, filter_t), total=len(test_data)):
        res = np.asarray(model.predict(t)).reshape((-1,))
        # Higher score = more plausible; rankdata ranks ascending, so the
        # true triple (first row) should receive the highest rank.
        r = rankdata(res, 'max')
        target_rank = r[0]
        num_candidate = len(res)
        # Convert to a 1-based rank where 1 is best.
        real_rank = num_candidate - target_rank + 1
        c_1 += 1 if target_rank == num_candidate else 0
        c_3 += 1 if target_rank + 3 > num_candidate else 0
        c_10 += 1 if target_rank + 10 > num_candidate else 0
        mean_ranks.append(real_rank)
    tail_hit_at_1 = c_1 / float(len(test_data))
    tail_hit_at_3 = c_3 / float(len(test_data))
    tail_hit_at_10 = c_10 / float(len(test_data))
    tail_avg_rank = np.mean(mean_ranks)
    tail_mrr = np.mean([1 / m for m in mean_ranks])
    c_1, c_3, c_10 = 0, 0, 0
    mean_ranks = []
    for t in tqdm(gen_head_data(test_data, num_entities, bs, filter_h), total=len(test_data)):
        res = np.asarray(model.predict(t)).reshape((-1,))
        r = rankdata(res, 'max')
        target_rank = r[0]
        num_candidate = len(res)
        real_rank = num_candidate - target_rank + 1
        c_1 += 1 if target_rank == num_candidate else 0
        c_3 += 1 if target_rank + 3 > num_candidate else 0
        c_10 += 1 if target_rank + 10 > num_candidate else 0
        mean_ranks.append(real_rank)
    head_hit_at_1 = c_1 / float(len(test_data))
    head_hit_at_3 = c_3 / float(len(test_data))
    head_hit_at_10 = c_10 / float(len(test_data))
    head_avg_rank = np.mean(mean_ranks)
    head_mrr = np.mean([1 / m for m in mean_ranks])
    metrics = {'tail_hits@1': tail_hit_at_1,
               'tail_hits@3': tail_hit_at_3,
               'tail_hits@10': tail_hit_at_10,
               'tail_mr': tail_avg_rank,
               'tail_mrr': tail_mrr,
               'head_hits@1': head_hit_at_1,
               'head_hits@3': head_hit_at_3,
               'head_hits@10': head_hit_at_10,
               'head_mr': head_avg_rank,
               'head_mrr': head_mrr,
               'hits@1': (tail_hit_at_1 + head_hit_at_1) / 2,
               'hits@3': (tail_hit_at_3 + head_hit_at_3) / 2,
               'hits@10': (tail_hit_at_10 + head_hit_at_10) / 2,
               'mr': (tail_avg_rank + head_avg_rank) / 2,
               'mrr': (tail_mrr + head_mrr) / 2,
               }
    return metrics
class KGEValidateCallback(Callback):
    # Runs link-prediction validation every fifth epoch and stores the
    # metrics in logs under a 'val_' prefix. Assumes the model exposes a
    # num_entities attribute.
    def __init__(self, validation_data, train_data=None, *args, **kwargs):
        super(KGEValidateCallback, self).__init__(*args, **kwargs)
        self.validation_data = validation_data
        self.train_data = train_data

    def on_epoch_end(self, epoch, logs=None):
        if epoch % 5 == 0:
            logs = logs or {}
            # bs is unused by validate; train_data supplies the filter triples.
            tmp = validate(self.model,
                           self.validation_data,
                           self.model.num_entities,
                           bs=None,
                           filtering_triples=self.train_data)
            for k in tmp:
                logs['val_' + k] = tmp[k]

    def on_train_end(self, logs=None):
        # Force a final validation pass (100 % 5 == 0).
        self.on_epoch_end(100, logs=logs)
def pointwize_hinge(true, false, margin=1, negative_samples=1, reduce_mean=True):
    # Pointwise hinge: push true scores above margin and false scores below -margin.
    return tf.reduce_mean(tf.nn.relu(margin - true)) + tf.reduce_mean(tf.nn.relu(margin + false))

def pointwize_logistic(true, false, margin=1, negative_samples=1, reduce_mean=True):
    # Pointwise softplus loss; EPSILON guards the log. margin is unused.
    return tf.reduce_mean(tf.math.log(EPSILON + 1 + tf.math.exp(-true))) \
         + tf.reduce_mean(tf.math.log(EPSILON + 1 + tf.math.exp(false)))

def pointwize_square_loss(true, false, margin=1, negative_samples=1, reduce_mean=True):
    # Pointwise squared loss pulling true scores toward margin, false toward -margin.
    return tf.reduce_mean(tf.square(margin - true)) + tf.reduce_mean(tf.square(margin + false))

def pointwize_cross_entropy(true, false, margin=1, negative_samples=1, reduce_mean=True):
    # Binary cross-entropy with label 1 for true and 0 for false triples.
    return binary_crossentropy(1, true) + binary_crossentropy(0, false)

def pairwize_hinge(true, false, margin=1, negative_samples=1, reduce_mean=True):
    # Pairwise hinge: each true score should beat its negative samples by margin.
    false = tf.reshape(false, (-1, negative_samples))
    tmp = tf.nn.relu(margin + false - true)
    if reduce_mean:
        return tf.reduce_mean(tmp)
    return tmp

def pairwize_logistic(true, false, margin=0, negative_samples=1, reduce_mean=True):
    # Pairwise softplus loss on the score difference false - true. margin is unused.
    false = tf.reshape(false, (-1, negative_samples))
    tmp = tf.math.log(EPSILON + 1 + tf.math.exp(false - true))
    if reduce_mean:
        return tf.reduce_mean(tmp)
    return tmp

def pairwize_square_loss(true, false, margin=0, negative_samples=1, reduce_mean=True):
    # Negated squared score difference. margin is unused.
    false = tf.reshape(false, (-1, negative_samples))
    tmp = -tf.square(false - true)
    if reduce_mean:
        return tf.reduce_mean(tmp)
    return tmp

def loss_function_lookup(name):
    # Map a loss name string to the corresponding function.
    return {
        'pointwize_hinge': pointwize_hinge,
        'pointwize_logistic': pointwize_logistic,
        'pointwize_cross_entropy': pointwize_cross_entropy,
        'pointwize_square_loss': pointwize_square_loss,
        'pairwize_hinge': pairwize_hinge,
        'pairwize_logistic': pairwize_logistic,
        'pairwize_square_loss': pairwize_square_loss
    }[name]
Functions
def gen_head_data(test_data, num_entities, bs, filter_h)
    Generator yielding, for each test triple, the true triple followed by all head-corrupted candidates that survive filtering.
def gen_tail_data(test_data, num_entities, bs, filter_t)
    Generator yielding, for each test triple, the true triple followed by all tail-corrupted candidates that survive filtering.
def generate_negative(kg, N, negative=2, check_kg=False, corrupt_head=True, corrupt_tail=True)
    Sample `negative` random corruptions per triple in kg, drawing entity ids uniformly from [0, N).
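For example, with a made-up kg of three triples over five entities (ids invented for illustration):

from KGEkeras.utils import generate_negative

kg = [(0, 0, 1), (1, 1, 2), (3, 0, 4)]
neg = generate_negative(kg, N=5, negative=2)
print(neg.shape)  # (6, 3): two random corruptions per true triple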
def hits(target, scores, k=10)
    Return 1 if target is among the k highest-scoring labels in the (label, score) pairs, else 0.
def load_kg(path)
    Load a knowledge graph from a text file with one whitespace-separated (subject, predicate, object) triple per line.
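A minimal sketch, assuming a hypothetical train.txt in the format described above:

from KGEkeras.utils import load_kg

# train.txt (hypothetical contents):
#   alice  knows  bob
#   bob    likes  carol
triples = load_kg('train.txt')
print(triples[0])  # ['alice', 'knows', 'bob']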
def loss_function_lookup(name)
    Resolve a loss name such as 'pairwize_hinge' to the corresponding loss function defined in this module.
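A short usage sketch; the score tensors below are invented for illustration:

import tensorflow as tf
from KGEkeras.utils import loss_function_lookup

loss_fn = loss_function_lookup('pairwize_hinge')
true = tf.constant([[2.0], [1.5]])                 # scores of two true triples
false = tf.constant([[0.5], [0.1], [1.0], [0.2]])  # two negatives per true triple
loss = loss_fn(true, false, margin=1, negative_samples=2)
print(float(loss))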
def mrr(target, scores)
    Reciprocal rank of target in the (label, score) pairs, sorted by descending score.
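For example, on made-up (label, score) pairs:

from KGEkeras.utils import mrr, hits

scores = [('a', 0.9), ('b', 0.7), ('c', 0.4)]
print(mrr('b', scores))        # 0.5: 'b' ranks second
print(hits('b', scores, k=1))  # 0: 'b' is not the top-scoring label
print(hits('b', scores, k=3))  # 1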
def oversample_data(kgs, x=None, y=None, testing=False)
    Resample each kg (with replacement) to a common length, optionally repeating x and y to match, so that several inputs can be batched together; in testing mode the kgs are first truncated to len(y).
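A sketch on fabricated toy inputs:

from KGEkeras.utils import oversample_data

kg1 = [(0, 0, 1), (1, 1, 2)]
kg2 = [(0, 0, 1), (1, 1, 2), (2, 0, 3), (3, 1, 4)]
out = oversample_data([kg1, kg2])
print([o.shape for o in out])  # [(4, 3), (4, 3)]: kg1 is resampled up to length 4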
def pad(kg, bs)
    Pad kg with randomly chosen duplicate triples until its length is divisible by the batch size bs.
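For example (toy triples):

from KGEkeras.utils import pad

kg = [(0, 0, 1), (1, 1, 2), (2, 0, 3)]
print(pad(kg, bs=4).shape)  # (4, 3): one duplicate triple appended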
def pairwize_hinge(true, false, margin=1, negative_samples=1, reduce_mean=True)
    Pairwise hinge loss: penalizes negatives whose score comes within margin of the corresponding true score.
def pairwize_logistic(true, false, margin=0, negative_samples=1, reduce_mean=True)
    Pairwise logistic (softplus) loss on the score difference false - true; margin is unused.
def pairwize_square_loss(true, false, margin=0, negative_samples=1, reduce_mean=True)
    Negated squared difference between false and true scores; margin is unused.
def pointwize_cross_entropy(true, false, margin=1, negative_samples=1, reduce_mean=True)
    Binary cross-entropy treating true scores as label 1 and false scores as label 0; margin and negative_samples are unused.
def pointwize_hinge(true, false, margin=1, negative_samples=1, reduce_mean=True)
    Pointwise hinge loss pushing true scores above margin and false scores below -margin.
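A toy invocation of one of the pointwise losses (scores invented):

import tensorflow as tf
from KGEkeras.utils import pointwize_hinge

true = tf.constant([[2.0], [1.5]])
false = tf.constant([[0.3], [-0.5]])
print(float(pointwize_hinge(true, false, margin=1)))  # 0.9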
def pointwize_logistic(true, false, margin=1, negative_samples=1, reduce_mean=True)
    Pointwise logistic (softplus) loss on true and false scores separately; margin is unused.
def pointwize_square_loss(true, false, margin=1, negative_samples=1, reduce_mean=True)
    Pointwise squared loss pulling true scores toward margin and false scores toward -margin.
def validate(model, test_data, num_entities, bs, filtering_triples=None)
    Filtered link-prediction evaluation: ranks each test triple against all head- and tail-corrupted candidates and returns hits@{1,3,10}, MR, and MRR per side and averaged. bs is currently unused.
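A sketch of a direct call; model and the triple lists are placeholders, and bs is accepted but unused:

from KGEkeras.utils import validate

metrics = validate(model,
                   test_data=test_triples,           # list of (s, p, o) id triples
                   num_entities=model.num_entities,
                   bs=None,                          # unused by validate
                   filtering_triples=train_triples + test_triples)
print(metrics['mrr'], metrics['hits@10'])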
Classes
class KGEValidateCallback(validation_data, train_data=None, *args, **kwargs)
Keras callback that runs link-prediction validation (via validate()) every fifth epoch, and once more at the end of training, writing each metric into the logs dict under a 'val_' prefix. Expects the model to expose a num_entities attribute; train_data, if given, supplies the filtering triples.
Attributes
params - Dict. Training parameters (e.g. verbosity, batch size, number of epochs ...).
model - Instance of keras.models.Model. Reference of the model being trained.
The logs dictionary that callback methods take as argument will contain keys for quantities relevant to the current batch or epoch (see method-specific docstrings).
Ancestors
- tensorflow.python.keras.callbacks.Callback
Methods
def on_epoch_end(self, epoch, logs=None)
Called at the end of an epoch. Subclasses should override for any actions to run; this function should only be called during TRAIN mode. In this subclass, every fifth epoch the model is evaluated with validate() and the resulting metrics are written into logs with a 'val_' prefix.
Arguments
epoch: Integer, index of epoch.
logs: Dict, metric results for this training epoch, and for the validation epoch if validation is performed. Validation result keys are prefixed with val_.
def on_train_end(self, logs=None)
Called at the end of training. Subclasses should override for any actions to run. In this subclass, a final validation pass is triggered by calling on_epoch_end(100).
Arguments
logs: Dict. Currently the output of the last call to on_epoch_end() is passed to this argument for this method, but that may change in the future.
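A minimal sketch of wiring the callback into training; the model and triple lists are placeholders:

from KGEkeras.utils import KGEValidateCallback

# Assumes `model` is a compiled KGE Keras model exposing `num_entities`,
# and train_triples / valid_triples are lists of (s, p, o) id triples.
cb = KGEValidateCallback(validation_data=valid_triples, train_data=train_triples)
model.fit(X, y, epochs=20, callbacks=[cb])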