Module tera.DataIntegration

A set of classes for aligning data aggregated with tools in DataAggregation.
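A typical entry point is one of the mapping classes below. The following is a minimal usage sketch, assuming the package is importable and the Wikidata endpoint is reachable; the file path is hypothetical and the identifier is only illustrative:

from tera.DataIntegration import InchikeyToCas, DownloadedWikidata

# Live mapping: fetched from Wikidata the first time convert() is called.
cas = InchikeyToCas(verbose=True)
print(cas.convert('XLYOFNOQVPJJNP-UHFFFAOYSA-N'))   # CAS number without dashes, or 'no mapping'

# Offline mapping read from a pre-downloaded file (hypothetical path),
# merged with the live mapping via '+'.
offline = DownloadedWikidata('downloads/inchikey_to_cas.csv')
combined = offline + cas
print(len(combined))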

Expand source code
"""
A set of classes for aligning data aggregated with tools in DataAggregation.
"""
from rdflib import Graph, Namespace, Literal, URIRef
from rdflib.namespace import RDF, OWL, RDFS
import pandas as pd
import validators
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
from collections import defaultdict
import tera.utils as ut
import copy
from tqdm import tqdm

class Alignment:
    def __init__(self, verbose = False, name = 'Alignment'):
        """Base class for alignment of two data sets. 
        
        Parameters
        ----------
        name : str        
        """
        self.name = name
        self.verbose = verbose
    
    def load(self):
        """Loading mappings. 
        
        Raises
        ------
        NotImplementedError
            * If not implemented in sub-class.
        """
        raise NotImplementedError
    
    def _to_defaultdict(self):
        self.mappings = defaultdict(lambda :'no mapping', self.mappings)
    
    def _mapping(self, x, reverse = False):
        """
        Maps from one id type to another. 
        
        Parameters
        ----------
        x : rdflib.URIRef or str 
            URI/identifier to map from. 
            
        reverse : bool 
            Reverse the direction of mapping. 
                
        Returns
        -------
        str 
            If no mapping exists, returns 'no mapping'
        """
        if not hasattr(self, 'mappings'):
            self.load()
        if not hasattr(self, 'reverse_mappings'):
            self.reverse_mappings = {}
            for k,i in self.mappings.items():
                for j in i:
                    self.reverse_mappings[j] = [k]
            
        if reverse:
            tmp = self.reverse_mappings
        else:
            tmp = self.mappings
        
        x = str(x)
        if x in tmp:
            if len(tmp[x]) > 1 and self.verbose:
                print('Mapping from %s is not unique.' % x)
            try:
                return tmp[x].pop(0)
            except:
                return 'no mapping'
            
        return 'no mapping'
    
    def __len__(self):
        return len(self.mappings)
    
    def __add__(self,other):
        self.load()
        other.load()
        self.mappings = {**self.mappings,**other.mappings}
        return self
        
    @ut.do_recursively_in_class
    def convert(self, id_, reverse=False, strip=False):
        """
        Convert a set of ids into new identifiers.
        
        Parameters
        ----------
        id_ : rdflib.URIRef, str, list, set 
            URI(s)/identifier(s)  
        
        reverse : bool 
            Reverse the direction of mapping. 
            
        strip : bool 
            Remove namespace.
                
        Returns
        -------
        str or dict
            Mapped values.
        """
        if strip:
            id_ = ut.strip_namespace(str(id_),['/','#'])
        return self._mapping(id_,reverse)

class EndpointMapping(Alignment):
    def __init__(self, endpoint, verbose=False):
        """Class for loading mappings based on owl:sameAs property.
        
        Parameters
        ----------
        endpoint : str 
            SPARQL endpoint URL.
        """
        super(EndpointMapping, self).__init__(verbose=verbose)
        self.endpoint = endpoint
    
    def load(self):
        query = """
        SELECT ?s ?o WHERE {
            ?s <http://www.w3.org/2002/07/owl#sameAs> ?o .
        } 
        """
        res = ut.query_endpoint(self.endpoint, query, var = ['s','o'])
        self.mappings = {str(s):[str(o)] for s,o in res}

class WikidataMapping(Alignment):
    def __init__(self, query, verbose=False):
        """
        Class for loading mappings from wikidata.
        
        Parameters
        ----------
        query : str 
            Wikidata query with two variables. 
            
            e.g. from inchikey to cas: 
            
            SELECT ?from ?to {  
            ?compound wdt:P235 ?from . 
            ?compound wdt:P231 ?to .} 
        """
        super(WikidataMapping, self).__init__(verbose=verbose)
        self.query = query
        
    def load(self):
        res = ut.query_endpoint('https://query.wikidata.org/sparql', 
                             self.query, 
                             var = ['from', 'to'])
        self.mappings = {str(f):[str(t)] for f,t in res}

class LogMapMapping(Alignment):
    def __init__(self, filename, threshold=0.95, unique=False,  verbose=False, strip=True):
        """
        Class for using LogMap (or other system) alignments. 
        
        Parameters
        ----------
        filename : str 
            Path to logmap output file (.rdf) 
        
        threshold : float 
            Alignment threshold.
        """
        super(LogMapMapping, self).__init__(verbose=verbose)
        
        self.threshold = threshold
        self.filename = filename
        self.strip = strip
        self.unique = unique
        
    def load(self):
        if self.filename[-3:] == 'rdf':
            self.load_rdf()
        else:
            self.load_txt()
    
    def load_rdf(self):
        out = defaultdict(list)
        scores = defaultdict(lambda : 0.0)
        g = Graph()
        g.parse(self.filename)
        o = URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmentCell')
        for s in g.subjects(predicate=RDF.type, object = o):
            e1 = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmententity1'))).pop(0)
            e2 = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmententity2'))).pop(0)
            score = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmentmeasure'))).pop(0)
            
            score = float(score)
            if score >= self.threshold and (score > scores[(e1,e2)] or not self.unique):
                scores[(e1,e2)] = score
                e1 = str(e1)
                e2 = str(e2)
                if self.strip:
                    e1 = ut.strip_namespace(e1,['/','#','CID'])
                    e2 = ut.strip_namespace(e2,['/','#','CID'])
                out[e1].append(e2)
        
        self.mappings = out
        self.scores = scores
        
    def load_txt(self):
        out = defaultdict(list)
        scores = defaultdict(lambda : 0.0)
        try:
            df = pd.read_csv(self.filename, sep='|', header=0, names=['e1','e2','type','score','is_instance'])
        except:
            df = pd.read_csv(self.filename, sep='|', header=0, names=['e1','e2','score'])
            
        for e1,e2,score in zip(df['e1'],df['e2'],df['score']):
            score = float(score)
            if score >= self.threshold and (score > scores[(e1,e2)] or not self.unique):
                scores[(e1,e2)] = score
                e1 = str(e1)
                e2 = str(e2)
                if self.strip:
                    e1 = ut.strip_namespace(e1,['/','#','CID'])
                    e2 = ut.strip_namespace(e2,['/','#','CID'])
                out[e1].append(e2)
        self.mappings = out
        self.scores = scores
        
class StringMatchingMapping(Alignment):
    def __init__(self, dict1, dict2, threshold = 0.95, verbose=False):
        """
        Class for creating a mapping between two label dictionaries using string matching. 
        
        Parameters
        ----------
        dict1 : dict 
            Dictionary of the form {entity: list of labels} 
        
        dict2 : dict 
            Same as dict1.
            
        threshold : float 
            Alignment threshold.
        """
        super(StringMatchingMapping, self).__init__(verbose=verbose)
        
        self.threshold = threshold
        self.dict1 = dict1
        self.dict2 = dict2
    
    def load(self):
        tmp = defaultdict(float)
        for k1 in self.dict1:
            for k2 in self.dict2:
                try:
                    _, score = process.extractOne(self.dict1[k1],self.dict2[k2])
                except TypeError:
                    score = 0
                    
                if score >= self.threshold:
                    tmp[k1,k2] = max(tmp[k1,k2],score)
        
        self.mappings = {k1:[k2] for k1,k2 in tmp}
    
class DownloadedWikidata(Alignment):
    def __init__(self, filename, verbose = False):
        """
        Class for creating mappings from downloaded wikidata files. 
        
        Parameters
        ----------
        filename : str 
            Path to file with header = ['from','to']
            
        """
        super(DownloadedWikidata, self).__init__(verbose=verbose)
        self.filename = filename
    
    def load(self):
        df = pd.read_csv(self.filename,dtype=str)
        self.mappings = {k1:[k2] for k1,k2 in zip(df['from'],df['to'])}
    
class StringGraphMapping(Alignment):
    def __init__(self, g1, g2, threshold = 0.95, verbose=False):
        """
        Class for creating a mapping between two graphs using string matching. 
        
        Parameters
        ----------
        g1 : rdflib.Graph 
        
        g2 : rdflib.Graph 
        
        threshold : float 
            Alignment threshold.
                
        """
        super(StringGraphMapping, self).__init__(verbose=verbose)
        
        self.threshold = threshold
        self.g1 = g1
        self.g2 = g2
    
    def load(self):
        dict1 = ut.graph_to_dict(self.g1)
        dict2 = ut.graph_to_dict(self.g2)
        
        tmp = defaultdict(float)
        for k1 in dict1:
            for k2 in dict2:
                try:
                    _, score = process.extractOne(dict1[k1],dict2[k2])
                except TypeError:
                    score = 0
                    
                if score >= self.threshold:
                    tmp[k1,k2] = max(tmp[k1,k2],score)
        
        self.mappings = {k1:[k2] for k1,k2 in tmp}

class InchikeyToCas(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to cas mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P231 ?tmp .
              BIND(REPLACE(?tmp, "-", "", "i") AS ?to)
            }
        """
        super(InchikeyToCas, self).__init__(query=query, verbose=verbose)
    
class InchikeyToPubChem(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to pubchem mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P662 ?to .
            }
        """
        super(InchikeyToPubChem, self).__init__(query=query, verbose=verbose)
    
class InchikeyToChEBI(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to chebi mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P683 ?to .
            }
        """
        super(InchikeyToChEBI, self).__init__(query=query, verbose=verbose)

class InchikeyToChEMBL(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to chemble mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P592 ?to .
            }
        """
        super(InchikeyToChEMBL, self).__init__(query=query, verbose=verbose)
        
class InchikeyToMeSH(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to mesh mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ; 
               wdt:P235 ?from ;
               wdt:P486 ?to .
            }
        """
        super(InchikeyToMeSH, self).__init__(query=query, verbose=verbose)

class NCBIToEOL(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates ncbi to eol mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q16521 ; 
               wdt:P685 ?from ;
               wdt:P830 ?to .
            }
        """
        super(NCBIToEOL, self).__init__(query=query, verbose=verbose)
        
        
#TODO change ncbi -> ecotox mapping to consensus mappings.
class NCBIToEcotox(StringGraphMapping):
    def __init__(self, dataobject1, dataobject2, verbose=False):
        """Class which creates ncbi to ecotox mapping."""
        super(NCBIToEcotox, self).__init__(dataobject1.graph,
                                           dataobject2.graph, 
                                           verbose=verbose)
        

Classes

class Alignment (verbose=False, name='Alignment')

Base class for alignment of two data sets.

Parameters

name : str
 
Expand source code
class Alignment:
    def __init__(self, verbose = False, name = 'Alignment'):
        """Base class for alignment of two data sets. 
        
        Parameters
        ----------
        name : str        
        """
        self.name = name
        self.verbose = verbose
    
    def load(self):
        """Loading mappings. 
        
        Raises
        ------
        NotImplementedError
            * If not implemented in sub-class.
        """
        raise NotImplementedError
    
    def _to_defaultdict(self):
        self.mappings = defaultdict(lambda :'no mapping', self.mappings)
    
    def _mapping(self, x, reverse = False):
        """
        Maps from one id type to another. 
        
        Parameters
        ----------
        x : rdflib.URIRef or str 
            URI/identifier to map from. 
            
        reverse : bool 
            Reverse the direction of mapping. 
                
        Returns
        -------
        str 
            If no mapping exists, returns 'no mapping'
        """
        if not hasattr(self, 'mappings'):
            self.load()
        if not hasattr(self, 'reverse_mappings'):
            self.reverse_mappings = {}
            for k,i in self.mappings.items():
                for j in i:
                    self.reverse_mappings[j] = [k]
            
        if reverse:
            tmp = self.reverse_mappings
        else:
            tmp = self.mappings
        
        x = str(x)
        if x in tmp:
            if len(tmp[x]) > 1 and self.verbose:
                print('Mapping from %s is not unique.' % x)
            try:
                return tmp[x].pop(0)
            except:
                return 'no mapping'
            
        return 'no mapping'
    
    def __len__(self):
        return len(self.mappings)
    
    def __add__(self,other):
        self.load()
        other.load()
        self.mappings = {**self.mappings,**other.mappings}
        return self
        
    @ut.do_recursively_in_class
    def convert(self, id_, reverse=False, strip=False):
        """
        Convert a set of ids into new identifiers.
        
        Parameters
        ----------
        id_ : rdflib.URIRef, str, list, set 
            URI(s)/identifier(s)  
        
        reverse : bool 
            Reverse the direction of mapping. 
            
        strip : bool 
            Remove namespace.
                
        Returns
        -------
        str or dict
            Mapped values.
        """
        if strip:
            id_ = ut.strip_namespace(str(id_),['/','#'])
        return self._mapping(id_,reverse)
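Subclasses only need to implement load() and fill self.mappings with a dict of the form {id: list of ids}; convert(), reverse lookup and merging are inherited. A minimal illustrative subclass with made-up identifiers:

from tera.DataIntegration import Alignment

class ToyMapping(Alignment):
    """Toy subclass with a hard-coded mapping table."""
    def load(self):
        # One source id can map to several target ids.
        self.mappings = {'a1': ['b1'], 'a2': ['b2', 'b3']}

m = ToyMapping()
print(m.convert('a1'))                # 'b1'
print(m.convert('b2', reverse=True))  # 'a2'
print(m.convert('missing'))           # 'no mapping'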

Subclasses

DownloadedWikidata, EndpointMapping, LogMapMapping, StringGraphMapping, StringMatchingMapping, WikidataMapping

Methods

def convert(self, id_, reverse=False, strip=False)

Convert a set of ids into new identifiers.

Parameters

id_ : rdflib.URIRef, str, list, set
URI(s)/identifier(s)
reverse : bool
Reverse the direction of mapping.
strip : bool
Remove namespace.

Returns

str or dict
Mapped values.
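A sketch of the call pattern (network access to Wikidata is assumed; the InChIKeys are only illustrative, and the dict return for list/set input follows from the do_recursively_in_class decorator):

from tera.DataIntegration import InchikeyToPubChem

m = InchikeyToPubChem()   # mappings are loaded lazily on first use

one = m.convert('XLYOFNOQVPJJNP-UHFFFAOYSA-N')            # single id -> str (or 'no mapping')
many = m.convert(['QTBSBXVTEAMEQO-UHFFFAOYSA-N',
                  'LFQSCWFLJHTTHZ-UHFFFAOYSA-N'])          # list/set -> one result per input id
back = m.convert(one, reverse=True)                       # map a PubChem CID back to an InChIKey
stripped = m.convert('http://example.org/ns#ABC', strip=True)   # remove the namespace before lookup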
Expand source code
@ut.do_recursively_in_class
def convert(self, id_, reverse=False, strip=False):
    """
    Convert a set of ids into new identifiers.
    
    Parameters
    ----------
    id_ : rdflib.URIRef, str, list, set 
        URI(s)/identifier(s)  
    
    reverse : bool 
        Reverse the direction of mapping. 
        
    strip : bool 
        Remove namespace.
            
    Returns
    -------
    str or dict
        Mapped values.
    """
    if strip:
        id_ = ut.strip_namespace(str(id_),['/','#'])
    return self._mapping(id_,reverse)
def load(self)

Loading mappings.

Raises

NotImplementedError
  • If not implemented in sub-class.
Expand source code
def load(self):
    """Loading mappings. 
    
    Raises
    ------
    NotImplementedError
        * If not implemented in sub-class.
    """
    raise NotImplementedError
class DownloadedWikidata (filename, verbose=False)

Class for creating mappings from downloaded wikidata files.

Parameters

filename : str
Path to file with header = ['from','to']
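A sketch of the expected layout and usage; the file name and its single row are illustrative:

import pandas as pd
from tera.DataIntegration import DownloadedWikidata

# Write a tiny mapping file with the required 'from'/'to' header.
pd.DataFrame({'from': ['XLYOFNOQVPJJNP-UHFFFAOYSA-N'],
              'to': ['962']}).to_csv('inchikey_to_cid.csv', index=False)

conv = DownloadedWikidata(filename='inchikey_to_cid.csv')
print(conv.convert('XLYOFNOQVPJJNP-UHFFFAOYSA-N'))   # '962'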
Expand source code
class DownloadedWikidata(Alignment):
    def __init__(self, filename, verbose = False):
        """
        Class for creating mappings from downloaded wikidata files. 
        
        Parameters
        ----------
        filename : str 
            Path to file with header = ['from','to']
            
        """
        super(DownloadedWikidata, self).__init__(verbose=verbose)
        self.filename = filename
    
    def load(self):
        df = pd.read_csv(self.filename,dtype=str)
        self.mappings = {k1:[k2] for k1,k2 in zip(df['from'],df['to'])}

Ancestors

Alignment

Inherited members

Alignment: convert

class EndpointMapping (endpoint, verbose=False)

Class for loading mappings based on owl:sameAs property.

Parameters

endpoint : str
SPARQL endpoint URL.
Expand source code
class EndpointMapping(Alignment):
    def __init__(self, endpoint, verbose=False):
        """Class for loading mappings based on owl:sameAs property.
        
        Parameters
        ----------
        endpoint : str 
            SPARQL endpoint URL.
        """
        super(EndpointMapping, self).__init__(verbose=verbose)
        self.endpoint = endpoint
    
    def load(self):
        query = """
        SELECT ?s ?o WHERE {
            ?s <http://www.w3.org/2002/07/owl#sameAs> ?o .
        } 
        """
        res = ut.query_endpoint(self.endpoint, query, var = ['s','o'])
        self.mappings = {str(s):[str(o)] for s,o in res}
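A usage sketch, assuming a SPARQL endpoint at a hypothetical local URL that serves owl:sameAs triples; the queried URI is illustrative:

from tera.DataIntegration import EndpointMapping

mapping = EndpointMapping('http://localhost:3030/tera/sparql')   # hypothetical endpoint
mapping.load()                      # pulls every owl:sameAs pair from the endpoint
print(len(mapping))                 # number of subjects with a mapping
print(mapping.convert('http://example.org/chemical/15972608'))   # mapped URI or 'no mapping'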

Ancestors

Alignment

Inherited members

Alignment: convert

class InchikeyToCas (verbose=False)

Class which creates inchikey to cas mapping.
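For example (network access to Wikidata is assumed, and the InChIKey is only illustrative; the underlying query removes the dashes from the CAS number):

from tera.DataIntegration import InchikeyToCas

mapping = InchikeyToCas()
# Forward: InChIKey -> CAS number (returned without dashes, or 'no mapping').
print(mapping.convert('XLYOFNOQVPJJNP-UHFFFAOYSA-N'))
# Reverse: CAS number (without dashes) -> InChIKey.
print(mapping.convert('7732185', reverse=True))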

Expand source code
class InchikeyToCas(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to cas mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P231 ?tmp .
              BIND(REPLACE(?tmp, "-", "", "i") AS ?to)
            }
        """
        super(InchikeyToCas, self).__init__(query=query, verbose=verbose)

Ancestors

WikidataMapping, Alignment

Inherited members

Alignment: convert

class InchikeyToChEBI (verbose=False)

Class which creates inchikey to chebi mapping.

Expand source code
class InchikeyToChEBI(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to chebi mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P683 ?to .
            }
        """
        super(InchikeyToChEBI, self).__init__(query=query, verbose=verbose)

Ancestors

WikidataMapping, Alignment

Inherited members

Alignment: convert

class InchikeyToChEMBL (verbose=False)

Class which creates inchikey to ChEMBL mapping.

Expand source code
class InchikeyToChEMBL(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to chemble mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P592 ?to .
            }
        """
        super(InchikeyToChEMBL, self).__init__(query=query, verbose=verbose)

Ancestors

WikidataMapping, Alignment

Inherited members

Alignment: convert

class InchikeyToMeSH (verbose=False)

Class which creates inchikey to mesh mapping.

Expand source code
class InchikeyToMeSH(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to mesh mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ; 
               wdt:P235 ?from ;
               wdt:P486 ?to .
            }
        """
        super(InchikeyToMeSH, self).__init__(query=query, verbose=verbose)

Ancestors

WikidataMapping, Alignment

Inherited members

Alignment: convert

class InchikeyToPubChem (verbose=False)

Class which creates inchikey to pubchem mapping.

Expand source code
class InchikeyToPubChem(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates inchikey to pubchem mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q11173 ;
               wdt:P235 ?from ;
               wdt:P662 ?to .
            }
        """
        super(InchikeyToPubChem, self).__init__(query=query, verbose=verbose)

Ancestors

WikidataMapping, Alignment

Inherited members

Alignment: convert

class LogMapMapping (filename, threshold=0.95, unique=False, verbose=False, strip=True)

Class for using LogMap (or other system) alignments.

Parameters

filename : str
Path to logmap output file (.rdf)
threshold : float
Alignment threshold.
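A usage sketch with a hypothetical output file; files ending in .rdf are parsed as RDF alignment cells, anything else is read as '|'-separated text:

from tera.DataIntegration import LogMapMapping

aligned = LogMapMapping('output/logmap_mappings.rdf', threshold=0.9)   # hypothetical path
aligned.load()                        # fills .mappings and .scores
print(len(aligned))                   # number of aligned source entities
print(aligned.convert('some_entity_id'))   # illustrative id; 'no mapping' if not aligned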
Expand source code
class LogMapMapping(Alignment):
    def __init__(self, filename, threshold=0.95, unique=False,  verbose=False, strip=True):
        """
        Class for using LogMap (or other system) alignments. 
        
        Parameters
        ----------
        filename : str 
            Path to logmap output file (.rdf) 
        
        threshold : float 
            Alignment threshold.
        """
        super(LogMapMapping, self).__init__(verbose=verbose)
        
        self.threshold = threshold
        self.filename = filename
        self.strip = strip
        self.unique = unique
        
    def load(self):
        if self.filename[-3:] == 'rdf':
            self.load_rdf()
        else:
            self.load_txt()
    
    def load_rdf(self):
        out = defaultdict(list)
        scores = defaultdict(lambda : 0.0)
        g = Graph()
        g.parse(self.filename)
        o = URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmentCell')
        for s in g.subjects(predicate=RDF.type, object = o):
            e1 = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmententity1'))).pop(0)
            e2 = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmententity2'))).pop(0)
            score = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmentmeasure'))).pop(0)
            
            score = float(score)
            if score >= self.threshold and (score > scores[(e1,e2)] or not self.unique):
                scores[(e1,e2)] = score
                e1 = str(e1)
                e2 = str(e2)
                if self.strip:
                    e1 = ut.strip_namespace(e1,['/','#','CID'])
                    e2 = ut.strip_namespace(e2,['/','#','CID'])
                out[e1].append(e2)
        
        self.mappings = out
        self.scores = scores
        
    def load_txt(self):
        out = defaultdict(list)
        scores = defaultdict(lambda : 0.0)
        try:
            df = pd.read_csv(self.filename, sep='|', header=0, names=['e1','e2','type','score','is_instance'])
        except:
            df = pd.read_csv(self.filename, sep='|', header=0, names=['e1','e2','score'])
            
        for e1,e2,score in zip(df['e1'],df['e2'],df['score']):
            score = float(score)
            if score >= self.threshold and (score > scores[(e1,e2)] or not self.unique):
                scores[(e1,e2)] = score
                e1 = str(e1)
                e2 = str(e2)
                if self.strip:
                    e1 = ut.strip_namespace(e1,['/','#','CID'])
                    e2 = ut.strip_namespace(e2,['/','#','CID'])
                out[e1].append(e2)
        self.mappings = out
        self.scores = scores

Ancestors

Alignment

Methods

def load_rdf(self)
Expand source code
def load_rdf(self):
    out = defaultdict(list)
    scores = defaultdict(lambda : 0.0)
    g = Graph()
    g.parse(self.filename)
    o = URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmentCell')
    for s in g.subjects(predicate=RDF.type, object = o):
        e1 = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmententity1'))).pop(0)
        e2 = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmententity2'))).pop(0)
        score = list(g.objects(subject=s,predicate=URIRef('http://knowledgeweb.semanticweb.org/heterogeneity/alignmentmeasure'))).pop(0)
        
        score = float(score)
        if score >= self.threshold and (score > scores[(e1,e2)] or not self.unique):
            scores[(e1,e2)] = score
            e1 = str(e1)
            e2 = str(e2)
            if self.strip:
                e1 = ut.strip_namespace(e1,['/','#','CID'])
                e2 = ut.strip_namespace(e2,['/','#','CID'])
            out[e1].append(e2)
    
    self.mappings = out
    self.scores = scores
def load_txt(self)
Expand source code
def load_txt(self):
    out = defaultdict(list)
    scores = defaultdict(lambda : 0.0)
    try:
        df = pd.read_csv(self.filename, sep='|', header=0, names=['e1','e2','type','score','is_instance'])
    except:
        df = pd.read_csv(self.filename, sep='|', header=0, names=['e1','e2','score'])
        
    for e1,e2,score in zip(df['e1'],df['e2'],df['score']):
        score = float(score)
        if score >= self.threshold and (score > scores[(e1,e2)] or not self.unique):
            scores[(e1,e2)] = score
            e1 = str(e1)
            e2 = str(e2)
            if self.strip:
                e1 = ut.strip_namespace(e1,['/','#','CID'])
                e2 = ut.strip_namespace(e2,['/','#','CID'])
            out[e1].append(e2)
    self.mappings = out
    self.scores = scores

Inherited members

Alignment: convert

class NCBIToEOL (verbose=False)

Class which creates ncbi to eol mapping.

Expand source code
class NCBIToEOL(WikidataMapping):
    def __init__(self, verbose=False):
        """Class which creates ncbi to eol mapping."""
        query = """
        SELECT ?from ?to WHERE
            { 
            [] wdt:P31 wd:Q16521 ; 
               wdt:P685 ?from ;
               wdt:P830 ?to .
            }
        """
        super(NCBIToEOL, self).__init__(query=query, verbose=verbose)

Ancestors

WikidataMapping, Alignment

Inherited members

Alignment: convert

class NCBIToEcotox (dataobject1, dataobject2, verbose=False)

Class which creates ncbi to ecotox mapping.

Expand source code
class NCBIToEcotox(StringGraphMapping):
    def __init__(self, dataobject1, dataobject2, verbose=False):
        """Class which creates ncbi to ecotox mapping."""
        super(NCBIToEcotox, self).__init__(dataobject1.graph,
                                           dataobject2.graph, 
                                           verbose=verbose)

Ancestors

StringGraphMapping, Alignment

Inherited members

Alignment: convert

class StringGraphMapping (g1, g2, threshold=0.95, verbose=False)

Class for creating a mapping between two graphs using string matching.

Parameters

g1 : rdflib.Graph
 
g2 : rdflib.Graph
 
threshold : float
Alignment threshold.
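An illustrative sketch, assuming ut.graph_to_dict gathers the label literals attached to each entity; the scores produced by fuzzywuzzy are on a 0-100 scale, so the threshold is chosen accordingly:

from rdflib import Graph, Literal, URIRef
from rdflib.namespace import RDFS
from tera.DataIntegration import StringGraphMapping

g1, g2 = Graph(), Graph()
g1.add((URIRef('http://example.org/a/salmon'), RDFS.label, Literal('Atlantic salmon')))
g2.add((URIRef('http://example.org/b/4023'), RDFS.label, Literal('Atlantic Salmon')))

mapping = StringGraphMapping(g1, g2, threshold=90)
mapping.load()
print(mapping.convert('http://example.org/a/salmon'))   # aligned entity or 'no mapping'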
Expand source code
class StringGraphMapping(Alignment):
    def __init__(self, g1, g2, threshold = 0.95, verbose=False):
        """
        Class for creating a mapping between two graphs using string matching. 
        
        Parameters
        ----------
        g1 : rdflib.Graph 
        
        g2 : rdflib.Graph 
        
        threshold : float 
            Alignment threshold.
                
        """
        super(StringGraphMapping, self).__init__(verbose=verbose)
        
        self.threshold = threshold
        self.g1 = g1
        self.g2 = g2
    
    def load(self):
        dict1 = ut.graph_to_dict(self.g1)
        dict2 = ut.graph_to_dict(self.g2)
        
        tmp = defaultdict(float)
        for k1 in dict1:
            for k2 in dict2:
                try:
                    _, score = process.extractOne(dict1[k1],dict2[k2])
                except TypeError:
                    score = 0
                    
                if score >= self.threshold:
                    tmp[k1,k2] = max(tmp[k1,k2],score)
        
        self.mappings = {k1:[k2] for k1,k2 in tmp}

Ancestors

Alignment

Subclasses

NCBIToEcotox

Inherited members

Alignment: convert

class StringMatchingMapping (dict1, dict2, threshold=0.95, verbose=False)

Class for creating a mapping between two label dictionaries using string matching.

Parameters

dict1 : dict
Dictionary of the form {entity: list of labels}
dict2 : dict
Same as dict1.
threshold : float
Alignment threshold.
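An illustrative sketch with made-up entity identifiers; the fuzzy-matching scores are on a 0-100 scale, so the threshold is chosen accordingly:

from tera.DataIntegration import StringMatchingMapping

dict1 = {'ecotox:1': ['Daphnia magna'], 'ecotox:2': ['Oncorhynchus mykiss']}
dict2 = {'ncbi:A': ['Daphnia magna'], 'ncbi:B': ['rainbow trout', 'Oncorhynchus mykiss']}

mapping = StringMatchingMapping(dict1, dict2, threshold=90)
mapping.load()
print(mapping.convert('ecotox:1'))   # 'ncbi:A' if the labels match above the threshold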
Expand source code
class StringMatchingMapping(Alignment):
    def __init__(self, dict1, dict2, threshold = 0.95, verbose=False):
        """
        Class for creating a mapping between two label dictionaries using string matching. 
        
        Parameters
        ----------
        dict1 : dict 
            Dictionary of the form {entity: list of labels} 
        
        dict2 : dict 
            Same as dict1.
            
        threshold : float 
            Alignment threshold.
        """
        super(StringMatchingMapping, self).__init__(verbose=verbose)
        
        self.threshold = threshold
        self.dict1 = dict1
        self.dict2 = dict2
    
    def load(self):
        tmp = defaultdict(float)
        for k1 in self.dict1:
            for k2 in self.dict2:
                try:
                    _, score = process.extractOne(self.dict1[k1],self.dict2[k2])
                except TypeError:
                    score = 0
                    
                if score >= self.threshold:
                    tmp[k1,k2] = max(tmp[k1,k2],score)
        
        self.mappings = {k1:[k2] for k1,k2 in tmp}

Ancestors

Alignment

Inherited members

Alignment: convert

class WikidataMapping (query, verbose=False)

Class for loading mappings from wikidata.

Parameters

query : str

Wikidata query with two variables.

e.g. from inchikey to cas:

SELECT ?from ?to {
?compound wdt:P235 ?from . ?compound wdt:P231 ?to .}
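Any pair of Wikidata properties can be mapped this way. A sketch (P235 is the InChIKey property and P662 the PubChem CID; network access to the Wikidata endpoint is assumed and the InChIKey is illustrative):

from tera.DataIntegration import WikidataMapping

query = """
SELECT ?from ?to WHERE {
    ?compound wdt:P235 ?from .
    ?compound wdt:P662 ?to .
}
"""
mapping = WikidataMapping(query)
print(mapping.convert('XLYOFNOQVPJJNP-UHFFFAOYSA-N'))   # PubChem CID or 'no mapping'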

Expand source code
class WikidataMapping(Alignment):
    def __init__(self, query, verbose=False):
        """
        Class for loading mappings from wikidata.
        
        Parameters
        ----------
        query : str 
            Wikidata query with two variables. 
            
            e.g. from inchikey to cas: 
            
            SELECT ?from ?to {  
            ?compound wdt:P235 ?from . 
            ?compound wdt:P231 ?to .} 
        """
        super(WikidataMapping, self).__init__(verbose=verbose)
        self.query = query
        
    def load(self):
        res = ut.query_endpoint('https://query.wikidata.org/sparql', 
                             self.query, 
                             var = ['from', 'to'])
        self.mappings = {str(f):[str(t)] for f,t in res}

Ancestors

Alignment

Subclasses

InchikeyToCas, InchikeyToChEBI, InchikeyToChEMBL, InchikeyToMeSH, InchikeyToPubChem, NCBIToEOL

Inherited members

Alignment: convert