Module tera.utils
Utilities used by other modules.
Expand source code
"""
Utilities used by other modules.
"""
from SPARQLWrapper import SPARQLWrapper, JSON
from functools import wraps
from rdflib import Literal
from collections import defaultdict
import warnings
from tqdm import tqdm
from quantulum3 import parser
from itertools import combinations
# Tokens listed as "not a value"; presumably used by callers when cleaning raw
# data (verify against the modules that import it). The list is reproduced
# element-for-element, including the repeated -1.
nan_values = [
    'nan', float('nan'), '--', '-X', 'NA', 'NC', -1, '',
    'sp.', -1, 'sp,', 'var.', 'variant', 'NR',
]

# Unit abbreviation -> name fragment used by unit_parser.
# Unknown abbreviations resolve to the empty string on item access.
unit_lookup = defaultdict(
    lambda: '',
    {
        'mg': 'Milligram',
        'ug': 'Microgram',
        'kg': 'Kilogram',
        'mM': 'Millimol',
        'ng': 'Nanogram',
        'g': 'Gram',
        'µg': 'Microgram',
        'L': 'Litre',
        '%': 'Percent',
        'cm': 'Centimetre',
        'mm': 'Millimetre',
        'nm': 'Nanometre',
        'deg': 'Degree',
        'C': 'Celcius',
        'K': 'Kelvin',
        'l': 'Litre',
        'psu': 'PracticalSalinityUnit',
        'h': 'Hour',
        'd': 'Day',
        'w': 'Week',
    },
)

# Lower-case prefix name -> multiplicative factor relative to the base unit.
prefix_table = dict(
    kilo=1000,
    hekto=100,
    deka=10,
    desi=0.1,
    centi=0.01,
    milli=1e-3,
    micro=1e-6,
    nano=1e-9,
    percent=0.01,
)

# Lower-case unit names that _to_base_unit maps to a factor of 1.
base_units = 'gram mol litre metre'.split()
def unit_parser(string):
    """
    Convert a unit string to a UNIT namespace style name.

    eg. mg/L -> MilligramPerLitre
    Filters out assumed misprints, eg. mg%/L -> MilligramPerLitre.

    The string is split recursively on '/', '^2', '^3' and ' ' (in that
    order), each part is looked up in ``unit_lookup``, and the parts are
    joined with 'Per', 'Squared', 'Cubed' or nothing respectively. A part
    that is not in the lookup table is replaced by the result for its
    longest (leftmost on ties) proper substring that parses to a name
    longer than one character.

    Parameters
    ----------
    string : str
        Unit string.

    Returns
    -------
    str
        The composed unit name, or '' if nothing in the string could be
        recognised.
    """
    # A single character is only a unit if the lookup table knows it.
    if len(string) < 2 and string not in unit_lookup:
        return ''

    # Cubic decimetre is a litre. str.replace returns a new string, so the
    # result has to be re-bound (the previous code discarded it).
    string = string.replace('dm^3', 'L').replace('dm3', 'L')

    for elem, name in (('/', 'Per'), ('^2', 'Squared'), ('^3', 'Cubed'), (' ', '')):
        if elem in string:
            a, b = string.split(elem, 1)
            return unit_parser(a) + name + unit_parser(b)

    # Negative exponent notation, e.g. 'L-1', is rewritten as a division.
    if '-1' in string:
        return unit_parser(string.replace('-1', '/'))

    if string in unit_lookup:
        return unit_lookup[string]

    # Fallback for misprints: try proper substrings from longest to
    # shortest, left to right, and stop at the first one that parses.
    size = len(string)
    for width in range(size - 1, 0, -1):
        for start in range(size - width + 1):
            parsed = unit_parser(string[start:start + width])
            if len(parsed) > 1:
                return parsed
    return ''
def _units_of_same_type(unit1, unit2):
unit1 = unit1.lower()
unit2 = unit2.lower()
for prefix in ['milli','nano','micro','kilo','centi']:
unit1 = unit1.replace(prefix,'')
unit2 = unit2.replace(prefix,'')
unit1 = unit1.replace('mol','gram')
unit2 = unit2.replace('mol','gram')
if 'per' in unit1 and 'per' in unit2:
a1,b1 = unit1.split('per',1)
a2,b2 = unit2.split('per',1)
return _units_of_same_type(a1,a2) and _units_of_same_type(b1,b2)
if unit1 == unit2:
return True
return False
def _to_base_unit(unit):
    """
    Return the factor that scales ``unit`` to its base unit.

    The name is lower-cased and resolved recursively: names in
    ``base_units`` give 1, names in ``prefix_table`` give their factor,
    'per' divides, 'squared' / 'cubed' raise the preceding part to the
    second / third power, and a name containing a base unit is resolved by
    its remaining prefix.

    Parameters
    ----------
    unit : str
        Unit name without namespace, e.g. 'MilligramPerLitre'.

    Returns
    -------
    int or float
        The scaling factor, or 0 if the unit cannot be resolved.
    """
    unit = unit.lower()
    if unit in base_units:
        return 1

    # Exact prefix names are checked before splitting on 'per', because
    # 'percent' itself contains 'per' and would otherwise become 0 / 0.
    if unit in prefix_table:
        return prefix_table[unit]

    if 'per' in unit:
        a, b = unit.split('per', 1)
        denominator = _to_base_unit(b)
        if denominator == 0:
            # Unknown denominator: report "no conversion" instead of raising.
            return 0
        return _to_base_unit(a) / denominator

    for marker, power in (('squared', 2), ('cubed', 3)):
        if marker in unit:
            a, b = unit.split(marker, 1)
            # Nothing after the exponent (e.g. 'metresquared') is a neutral
            # factor, not an unknown unit.
            remainder = _to_base_unit(b) if b else 1
            return _to_base_unit(a) ** power * remainder

    # Remove the base unit names; whatever is left should be a prefix.
    stripped = unit
    for bs in base_units:
        stripped = stripped.replace(bs, '')
    if stripped != unit:
        return _to_base_unit(stripped)

    return 0
def unit_conversion(from_unit, to_unit, molecular_mass=None):
    """
    Calculates the conversion factor from one unit to another.

    Parameters
    ----------
    from_unit : URIRef
    to_unit : URIRef
    molecular_mass : float
        If converting to or from MOL this is needed.

    Returns
    -------
    factor : float
        The conversion ratio between from_unit and to_unit.
        new_scalar = old_scalar*factor.
        Returns 1 if the two units are equal and 0 if no conversion is found.

    Raises
    ------
    AssertionError:
        * If from_unit and to_unit are not of the same form, e.g.
          MillimolPerLitre and MillimetrePerLitre raise an error, while
          MillimolPerLitre and MilligramPerLitre do not.
        * If either unit contains mol and molecular_mass is not given.
    """
    if from_unit == to_unit:
        return 1

    # Only the local name after the last '/' or '#' is compared.
    from_unit = strip_namespace(from_unit, ['/', '#'])
    to_unit = strip_namespace(to_unit, ['/', '#'])

    # Raised explicitly (rather than with ``assert``) so the checks are not
    # stripped when Python runs with -O; the exception type is unchanged.
    if not _units_of_same_type(from_unit, to_unit):
        raise AssertionError(
            'Units are not of the same type: ' + from_unit + ', ' + to_unit)

    from_unit = from_unit.lower()
    to_unit = to_unit.lower()

    if ('mol' in from_unit or 'mol' in to_unit) and not molecular_mass:
        raise AssertionError('molecular_mass is required to convert to or from mol.')

    # Mol is converted via gram, using the molecular mass as the factor.
    mm_f = 1
    mm_t = 1
    if 'mol' in from_unit:
        mm_f = molecular_mass
        from_unit = from_unit.replace('mol', 'gram')
    if 'mol' in to_unit:
        mm_t = molecular_mass
        to_unit = to_unit.replace('mol', 'gram')

    denominator = mm_t * _to_base_unit(to_unit)
    if denominator == 0:
        # Target unit could not be resolved: no conversion found.
        return 0
    return (mm_f * _to_base_unit(from_unit)) / denominator
def tanimoto(fp1, fp2):
    """
    Calculate tanimoto similarity between two chemical fingerprints.

    Parameters
    ----------
    fp1 : str
        Chemical fingerprint on binary form.
    fp2 : str
        Chemical fingerprint on binary form.

    Returns
    -------
    float
        Number of positions set in both fingerprints divided by the number
        of positions set in at least one. 0.0 if neither fingerprint has
        any position set. Fingerprints of different length are compared
        over the length of the shorter one.
    """
    fp1_count = fp1.count('1')
    fp2_count = fp2.count('1')
    try:
        # Kept for fingerprint objects that implement '&' themselves.
        both_count = (fp1 & fp2).count('1')
    except TypeError:
        # Plain strings do not support '&'; count shared '1' positions.
        both_count = sum(1 for a, b in zip(fp1, fp2) if a == '1' and b == '1')

    union_count = fp1_count + fp2_count - both_count
    if union_count == 0:
        return 0.0
    return float(both_count) / union_count
def test_endpoint(endpoint):
    """
    Test SPARQL endpoint.

    Sends a small SELECT query and reports whether it could be executed and
    its JSON result converted.

    Parameters
    ----------
    endpoint : str
        SPARQL endpoint URL. ex: https://query.wikidata.org/sparql

    Returns
    -------
    bool
        True if the query succeeded, False if it raised an exception.
    """
    sparql = SPARQLWrapper(endpoint)
    q = """
    SELECT ?s ?p ?o
    WHERE {?s ?p ?o}
    LIMIT 100
    """
    sparql.setQuery(q)
    sparql.setReturnFormat(JSON)
    try:
        sparql.query().convert()
    except Exception:
        # Any query/transport/parse failure means the endpoint is unusable.
        # KeyboardInterrupt and SystemExit are deliberately not caught.
        return False
    return True
def query_endpoint(endpoint, q, var = 'p'):
    """
    Wrapper for querying SPARQL endpoint with SPARQLWrapper.

    Parameters
    ----------
    endpoint : str
        SPARQL endpoint URL.
    q : str
        SPARQL query.
    var : str or list
        Query variables to return.

    Returns
    -------
    set
        Set of tuple query results. Tuple in order specified in input var.
        A variable that is unbound in a result row is returned as None for
        that row. An empty set is returned (and a UserWarning issued) if
        the query fails.
    """
    if not isinstance(var, list):
        var = [var]

    sparql = SPARQLWrapper(endpoint)
    try:
        sparql.setQuery(q)
        sparql.setReturnFormat(JSON)
        results = sparql.query().convert()
        bindings = results['results']['bindings']
        # One column per requested variable (repeated names collapse to one,
        # keeping first-seen order). Missing values are filled per row so a
        # single unbound row does not blank out the whole column.
        columns = [
            [row.get(v, {}).get('value') for row in bindings]
            for v in dict.fromkeys(var)
        ]
        return set(zip(*columns))
    except Exception as e:
        # Best effort: report the failure and fall through to an empty set.
        warnings.warn('Query failed:\n' + q + '\n' + str(e), UserWarning)
    return set()
def query_graph(graph, q):
    """
    Query rdflib.Graph.

    Parameters
    ----------
    graph : rdflib.Graph
    q : str
        SPARQL query

    Returns
    -------
    set
        Set of result rows. An empty set is returned (and a UserWarning
        issued) if the query raises an exception.
    """
    try:
        return set(graph.query(q))
    except Exception as e:
        # Best effort, but do not hide the failure from the caller entirely.
        warnings.warn('Query failed:\n' + str(q) + '\n' + str(e), UserWarning)
    return set()
def prefixes(initNs):
    """
    Build the PREFIX header of a SPARQL query.

    Parameters
    ----------
    initNs : dict
        Mapping from prefix to namespace,
        ex : {'ex':'http://example.org'}

    Returns
    -------
    str
        One tab-separated ``PREFIX`` line per entry, each ending in a
        newline; '' for an empty mapping.
    """
    return ''.join(
        "PREFIX\t" + name + ':\t' + '<' + str(uri) + '>\n'
        for name, uri in initNs.items()
    )
def strip_namespace(string, var = ['/']):
    """
    Return the local name of a URI.

    For every separator the text after its last occurrence is taken; the
    shortest of these candidates (or the whole string if none is shorter)
    is returned.

    Parameters
    ----------
    string : str
        URI
    var : str or list
        Symbols to split string. ex. / or #.

    Returns
    -------
    str
    """
    separators = var if isinstance(var, list) else [var]
    text = str(string)
    # The full text comes first so it wins ties, then one tail per separator.
    candidates = [text] + [text.split(sep)[-1] for sep in separators]
    return min(candidates, key=len)
def do_recursively_in_class(func):
"""Enables function to take either element or iterable as input.
Returns
-------
function
"""
@wraps(func)
def call_recursively(my_class_instance, x, **kwargs):
if isinstance(x, (list,set,tuple)):
f = lambda x: func(my_class_instance, x, **kwargs)
out = {}
pbar = lambda x: x
if hasattr(my_class_instance, 'verbose'):
if my_class_instance.verbose:
pbar = lambda x: tqdm(x)
return dict(zip(x,map(f,pbar(x))))
else:
return func(my_class_instance, x, **kwargs)
return call_recursively
def graph_to_dict(graph):
    """
    Map entities in graph to a dict.

    Parameters
    ----------
    graph : rdflib.Graph

    Returns
    -------
    dict
        On the form {entity : list of literals connected to entity}.
        Every subject of the graph is a key; literals are converted to str.
        The returned object is a defaultdict(list).
    """
    d = defaultdict(list)
    # graph.subjects() can yield the same subject more than once;
    # dict.fromkeys removes repeats while keeping first-seen order, so each
    # subject's literals are collected only once.
    for e in dict.fromkeys(graph.subjects()):
        d[e] = [str(o) for o in graph.objects(subject=e) if isinstance(o, Literal)]
    return d
Functions
def do_recursively_in_class(func)
-
Enables function to take either element or iterable as input.
Returns
function
Expand source code
def do_recursively_in_class(func): """Enables function to take either element or iterable as input. Returns ------- function """ @wraps(func) def call_recursively(my_class_instance, x, **kwargs): if isinstance(x, (list,set,tuple)): f = lambda x: func(my_class_instance, x, **kwargs) out = {} pbar = lambda x: x if hasattr(my_class_instance, 'verbose'): if my_class_instance.verbose: pbar = lambda x: tqdm(x) return dict(zip(x,map(f,pbar(x)))) else: return func(my_class_instance, x, **kwargs) return call_recursively
def graph_to_dict(graph)
-
Map entities in graph to a dict.
Parameters
graph : rdflib.Graph
Returns
dict
- On the form {entity : list of literals connected to entity}
Expand source code
def graph_to_dict(graph): """ Map entities in graph to a dict. Parameters ---------- graph : rdflib.Graph Returns ------- dict On the form {entity : list of literals connected to entity} """ entities = graph.subjects() d = defaultdict(list) for e in entities: d[e] = [str(o) for o in graph.objects(subject=e) if isinstance(o,Literal)] return d
def prefixes(initNs)
-
Format prefixes for SPARQL.
Parameters
initNs : dict ex : {'ex':'http://example.org'}
Returns
str
Expand source code
def prefixes(initNs): """ Format prefixes for SPARQL. Parameters ---------- initNs : dict ex : {'ex':'http://example.org'} Returns ------- str """ q = '' for k,i in initNs.items(): q += "PREFIX\t"+k+':\t' + '<'+str(i)+'>\n' return q
def query_endpoint(endpoint, q, var='p')
-
Wrapper for querying SPARQL endpoint with SPARQLWrapper.
Parameters
endpoint : str SPARQL endpoint URL.
q : str SPARQL query.
var : str or list Query variables to return.
Returns
set
- Set of tuple query results. Tuple in order specified in input var.
Expand source code
def query_endpoint(endpoint, q, var = 'p'): """ Wrapper for quering SPARQL endpoint with SPARQLWrapper. Parameters ---------- endpoint : str SPARQL endpoint URL. q : str SPARQL query. var : str or list Query variables to return. Returns ------- set Set of tuple query results. Tuple in order specified in input var. """ if not isinstance(var, list): var = [var] sparql = SPARQLWrapper(endpoint) out = {} try: sparql.setQuery(q) sparql.setReturnFormat(JSON) results = sparql.query().convert() for v in var: try: out[v] = [r[v]['value'] for r in results['results']['bindings']] except KeyError: out[v] = [None] * len(results['results']['bindings']) return set(zip(*[out[k] for k in out])) except Exception as e: print(e) warnings.warn('Query failed:\n' + q, UserWarning) return set()
def query_graph(graph, q)
-
Query rdflib.Graph.
Parameters
graph : rdflib.Graph
q : str SPARQL query
Returns
set
Expand source code
def query_graph(graph, q): """ Query rdflib.Graph. Parameters ---------- graph : rdflib.Graph q : str SPARQL query Returns ------- set """ try: return set(graph.query(q)) except Exception as e: return set()
def strip_namespace(string, var=['/'])
-
Remove namespace from URI.
Parameters
string : str URI var : str or list Symbols to split string. ex. / or #.
Returns
str
Expand source code
def strip_namespace(string, var = ['/']): """ Remove namespace from URI. Parameters ---------- string : str URI var : str or list Symbols to split string. ex. / or #. Returns ------- str """ if not isinstance(var,list): var = [var] tmp1 = str(string) for v in var: tmp2 = str(string).split(v)[-1] if len(tmp2) < len(tmp1): tmp1 = tmp2 return tmp1
def tanimoto(fp1, fp2)
-
Calculate tanimoto similarity between two chemical fingerprints.
Parameters
fp1 : str Chemical fingerprint on binary form.
fp2 : str Chemical fingerprint on binary form.
Returns
float
Expand source code
def tanimoto(fp1, fp2): """ Calculate tanimoto similarity between two chemical fingerprints. Parameters ---------- fp1 : str Chemical fingerprint on binary form. fp2 : str Chemical fingerprint on binary form. Returns ------- float """ fp1_count = fp1.count('1') fp2_count = fp2.count('1') both_count = (fp1 & fp2).count('1') return float(both_count) / (fp1_count + fp2_count - both_count)
def test_endpoint(endpoint)
-
Test SPARQL endpoint.
Parameters
endpoint : str SPARQL endpoint URL. ex: https://query.wikidata.org/sparql
Returns
bool
Expand source code
def test_endpoint(endpoint): """ Test SPARQL endpoint. Parameters ---------- endpoint : str SPARQL endpoint URL. ex: https://query.wikidata.org/sparql Returns ------- bool """ sparql = SPARQLWrapper(endpoint) q = """ SELECT ?s ?p ?o WHERE {?s ?p ?o} LIMIT 100 """ sparql.setQuery(q) sparql.setReturnFormat(JSON) try: results = sparql.query().convert() return True except: return False
def unit_conversion(from_unit, to_unit, molecular_mass=None)
-
Calculates the conversion factor from one unit to another.
Parameters
from_unit
:URIRef
to_unit
:URIRef
molecular_mass
:float
- If converting to or from MOL this is needed.
Returns
factor
:float
- The conversion ratio between from_unit and to_unit. new_scalar = old_scalar*factor . Returns 0 if no conversion is found.
Raises
Assertionerror
- If from_unit and to_unit are not of the same form, e.g. MillimolPerLitre and MillimetrePerLitre raise an error, while MillimolPerLitre and MilligramPerLitre do not.
- If either unit contains mol and molecular_mass is not given.
Keyerror
- If conversion is not in prefix table.
Expand source code
def unit_conversion(from_unit, to_unit, molecular_mass=None): """ Calculates the conversion factor from one unit to another. Parameters ---------- from_unit : URIRef to_unit : URIRef molecular_mass : float If converting to or from MOL this is needed. Returns ------- factor : float The conversion ratio between from_unit and to_unit. new_scalar = old_scalar*factor . Returns 0 if no conversion is found. Raises ------ AssertionError: * If from_unit and to_unit is not on the same form. eg. MillimolPerLitre and MillimetrePerLiter raises error, while MillimolPerLitre and MilligramPerLiter does not. * If either unit contails mol, without input of molecular_mass. KeyError: * If conversion is not in prefix table. """ if from_unit == to_unit: return 1 from_unit = strip_namespace(from_unit,['/','#']) to_unit = strip_namespace(to_unit,['/','#']) assert _units_of_same_type(from_unit, to_unit) from_unit = from_unit.lower() to_unit = to_unit.lower() mm_f = 1 mm_t = 1 if 'mol' in from_unit: assert molecular_mass mm_f = molecular_mass from_unit = from_unit.replace('mol','gram') if 'mol' in to_unit: assert molecular_mass mm_t = molecular_mass to_unit = to_unit.replace('mol','gram') return (mm_f * _to_base_unit(from_unit)) / (mm_t * _to_base_unit(to_unit))
def unit_parser(string)
-
Takes a unit string and converts it to a UNIT namespace string, e.g. mg/L -> MilligramPerLitre. Filters out assumed misprints, e.g. mg%/L -> MilligramPerLitre.
Parameters
string
:str
- Unit string.
Returns
str
Raises
Expand source code
def unit_parser(string): """ Takes a unit string and converts to UNIT namespace string. eg. mg/L -> MilligramPerLitre Filters out assumed missprints, eg. mg%/L -> MilligramPerLitre. Parameters ---------- string : str Unit string. Returns ------- str Raises ------ """ if len(string) < 2 and string not in unit_lookup: return '' if 'dm^3' in string: string.replace('dm^3','L') if 'dm3' in string: string.replace('dm3','L') for elem,name in zip(['/','^2','^3',' '],['Per','Squared','Cubed','']): if elem in string: a,b = string.split(elem, 1) return unit_parser(a) + name + unit_parser(b) if '-1' in string: return unit_parser(string.replace('-1','/')) if string in unit_lookup: return unit_lookup[string] else: res1 = [string[x:y] for x, y in combinations(range(len(string) + 1), r = 2)] res1.remove(string) res2 = map(unit_parser, res1) res = zip(res2,res1) res = [(a,b) for a,b in res if len(a) > 1] if res: u,_ = sorted(res, key=lambda x:len(x[1]),reverse=True).pop(0) return u return ''