#!/usr/bin/env python3
from __future__ import annotations
import copy
import hashlib
import json
import os
import pickle
import random
import re
import sys
import tempfile
import time
import urllib.request
from datetime import timedelta
from typing import TYPE_CHECKING, Optional, Union
from urllib.error import HTTPError
import astropy.units as u # type: ignore
import numpy as np
from packaging.version import Version
import amespahdbpythonsuite as suite
from amespahdbpythonsuite.xmlparser import XMLparser
if TYPE_CHECKING:
from amespahdbpythonsuite.geometry import Geometry
from amespahdbpythonsuite.laboratory import Laboratory
from amespahdbpythonsuite.species import Species
from amespahdbpythonsuite.transitions import Transitions
[docs]
class AmesPAHdb:
"""
AmesPAHdbPythonSuite main class.
Contains methods to parse the database, perform search based on query,
and retrieve UIDs.
Calls classes:
:class:`amespahdbpythonsuite.transitions.Transitions`,
:class:`amespahdbpythonsuite.laboratory.Laboratory`,
:class:`amespahdbpythonsuite.species.Species`,
:class:`amespahdbpythonsuite.geometry.Geometry`,
to retrieve the respective instances.
"""
def __init__(self, **keywords) -> None:
"""
Initialize amespahdbpythonsuite class. Prints basic PAHdb info
and calls the :meth:`amespahdbpythonsuite.amespahdb.parsefile`
method to parse or restore the database.
"""
intro = [
"AmesPAHdbPythonSuite\n",
"by\n",
"Dr. Christiaan Boersma\n",
"and\n",
"Dr. Alexandros Maragkoudakis\n",
"Dr. Matthew J. Shannanon\n",
"Dr. Joseph E. Roser\n",
]
self.message(intro)
self.message(f"SUITE VERSION: {suite.__version__}")
if keywords.get("update", False) or (
keywords.get("update", True) and random.randint(0, 4) == 4
):
self.message("CHECKING FOR UPDATE")
github = "http://api.github.com/repos/pahdb/amespahdbpythonsuite/tags"
try:
with urllib.request.urlopen(github) as url:
data = json.load(url)
versions = [Version(tag["name"]) for tag in data]
versions.sort()
if Version(suite.__version__) < versions[-1]:
update = versions[-1].public
self.message(f"V{update} UPDATE AVAILABLE")
else:
self.message("NO UPDATE AVAILABLE")
except HTTPError:
self.message("FAILED TO CHECK FOR UPDATE")
self.message("WEBSITE: WWW.ASTROCHEM.ORG/PAHDB/")
self.message("CONTACT: CHRISTIAAN.BOERSMA@NASA.GOV")
filename = keywords.get("filename")
if not filename:
filename = os.environ.get("AMESPAHDEFAULTDB")
if not filename:
msg = [
"DATABASE NOT FOUND:",
"SET SYSTEM AMESPAHDEFAULTDB",
"ENVIRONMENT VARIABLE",
]
self.message(" ".join(msg))
# TODO: Turn the sys.exit into exceptions.
sys.exit(1)
if (
not filename
or not os.path.isfile(filename)
or not os.access(filename, os.R_OK)
):
self.message(f"UNABLE TO READ: {filename}")
# TODO: Turn the sys.exit into exceptions.
sys.exit(2)
self.__data: dict = dict()
self._joined = None
self.parsefile(
filename,
cache=keywords.get("cache", True),
check=keywords.get("check", True),
)
[docs]
def parsefile(self, filename: str, **keywords) -> None:
"""
Method to parse or restore the database from cache.
Called by :meth:`amespahdbpythonsuite.amespahdb.__init__` method.
Parameters:
filname : str
String of the PAHdb filename and path.
keywords
Arbitrary keyword arguments.
"""
# Create MD5 hash function pickle.
md5 = (
tempfile.gettempdir()
+ "/"
+ hashlib.md5(open(filename, "r").read().encode()).hexdigest()
+ ".pkl"
)
# Check if database is dumped in cache and restore it.
if (
keywords.get("cache", True)
and os.path.isfile(md5)
and os.access(md5, os.R_OK)
):
self.message("RESTORING DATABASE FROM CACHE")
# Start timer.
tstart = time.perf_counter()
# Open and read the dumped database.
with open(md5, "rb") as f:
self.__data = pickle.load(f)
# Store the dumped database filename.
self.__data["filename"] = md5
# Stop timer and calculate elapsed time.
elapsed = timedelta(seconds=(time.perf_counter() - tstart))
info = [
f"FILENAME : {md5}",
f"ORIGNINAL FILENAME : {filename}",
f"PARSE TIME : {elapsed}",
f'DATABASE : {self.__data["database"]}',
f'VERSION (DATE) : {self.__data["version"]} ({self.__data["date"]})',
f'COMMENT : {self.__data["comment"]}',
]
self.message(info, space=0)
else:
self.message("PARSING DATABASE: THIS MAY TAKE A FEW MINUTES")
# Start timer.
tstart = time.perf_counter()
# Call XMLparser module to parse the database.
parser = XMLparser(filename=filename, validate=keywords.get("check", True))
# Store the database into self.__self.data.
self.__data = parser.to_pahdb_dict()
# Dump the database into pickle in the cache directory.
if keywords.get("cache", True):
with open(md5, "wb") as f:
pickle.dump(self.__data, f, pickle.HIGHEST_PROTOCOL)
# Store the dumped database filename.
self.__data["filename"] = filename
# Stop timer and calculate elapsed time.
elapsed = timedelta(seconds=(time.perf_counter() - tstart))
info = [
f'FILENAME : {self.__data["filename"]}',
f"PARSE TIME : {elapsed}",
f'DATABASE : {self.__data["database"]}',
f'VERSION (DATE) : {self.__data["version"]} ({self.__data["date"]})',
f'COMMENT : {self.__data["comment"]}',
]
self.message(info, space=0)
def __repr__(self) -> str:
"""
Class representation.
"""
return f"{self.__class__.__name__}(" f"filename={self.__data['filename']})"
def __str__(self) -> str:
"""
A description of the instance.
"""
return "AmesPAHdbPythonSuite AmesPAHdb instance."
def __getkeybyuids(self, key: str, uids: list[int]) -> dict:
"""
Get a dictionary of PAHdb properties
retrieved by keyword for provided UIDs.
Parameters:
key : str
Database keyword.
uids : list of integers
List of UIDs.
Returns:
Dictionary of retrieved properties with UIDs as keys.
"""
if key == "species":
return copy.deepcopy(
dict(
(uid, self.__data["species"][uid])
for uid in uids
if uid in self.__data["species"].keys()
)
)
else:
return copy.deepcopy(
dict(
(uid, self.__data["species"][uid][key])
for uid in uids
if uid in self.__data["species"].keys()
)
)
[docs]
def gettransitionsbyuid(self, uids: Union[list[int], int]) -> Transitions:
"""
Retrieve and return transitions instance based on UIDs input.
UIDs should be a list, e.g. the output of search method.
Calls the :class:`amespahdbpythonsuite.transitions.Transitions` class.
Parameters:
uids : list of integers
List of UIDs.
Returns:
transitions instance
"""
uids_list = list()
if isinstance(uids, int):
uids_list.append(uids)
else:
uids_list = uids
d = self.__getkeybyuids("transitions", uids_list)
from amespahdbpythonsuite import transitions
return transitions.Transitions(
database=self.__data["database"],
version=self.__data["version"],
data=d,
pahdb=self.__data,
uids=list(d.keys()),
model={"type": "zerokelvin_m", "temperature": 0.0, "description": ""},
units={
"abscissa": {
"unit": u.cm**-1,
"label": "frequency",
},
"ordinate": {"unit": u.km / u.mol, "label": "integrated cross-section"},
},
)
[docs]
def getlaboratorybyuid(self, uids: Union[list[int], int]) -> Optional[Laboratory]:
"""
Retrieve and return laboratory database instance based on UIDs input.
UIDs should be a list, e.g. the output of search method.
Calls the :class:`amespahdbpythonsuite.laboratory.Laboratory` class.
Parameters:
uids : list of integers
List of UIDs.
Returns:
laboratory database instance
"""
# Check if the experimental database is loaded.
if self.__data["database"] != "experimental":
self.message("EXPERIMENTAL DATABASE REQUIRED")
return None
uids_list = list()
if isinstance(uids, int):
uids_list.append(uids)
else:
uids_list = uids
d = self.__getkeybyuids("laboratory", uids_list)
from amespahdbpythonsuite import laboratory
return laboratory.Laboratory(
database=self.__data["database"],
version=self.__data["version"],
data=d,
pahdb=self.__data,
uids=list(d.keys()),
model={"type": "laboratory_m", "temperature": 0.0, "description": ""},
units={
"abscissa": {"unit": u.cm**-1, "label": "frequency"},
"ordinate": {
"unit": u.def_unit(
"absorbance",
format={"latex": r"-\log(I/I_{0})"},
doc="Absorbance",
),
"label": "absorbance",
},
},
)
[docs]
def getspeciesbyuid(self, uids: Union[list[int], int]) -> Species:
"""
Retrieve and return species instance based on UIDs input.
UIDs should be a list, e.g. the output of search method.
Calls the :class:`amespahdbpythonsuite.species.Species` class.
Parameters:
uids : list of integers
List of UIDs.
Returns:
species instance
"""
uids_list = list()
if isinstance(uids, int):
uids_list.append(uids)
else:
uids_list = uids
d = self.__getkeybyuids("species", uids_list)
from amespahdbpythonsuite import species
return species.Species(
database=self.__data["database"],
version=self.__data["version"],
data=d,
pahdb=self.__data,
uids=list(d.keys()),
references=self.__getkeybyuids("references", uids_list),
comments=self.__getkeybyuids("comments", uids_list),
)
[docs]
def getgeometrybyuid(self, uids: Union[list[int], int]) -> Geometry:
"""
Retrieve and return geometry instance based on UIDs input.
UIDs should be a list, e.g. the output of search method.
Calls the :class:`amespahdbpythonsuite.geometry.Geometry` class
and :meth:`amespahdbpythonsuite.amespahdb.__getkeybyuids` method.
Parameters:
uids : list of integers
List of UIDs.
Returns:
geometry instance
"""
uids_list = list()
if isinstance(uids, int):
uids_list.append(uids)
else:
uids_list = uids
d = self.__getkeybyuids("geometry", uids_list)
from amespahdbpythonsuite import geometry
return geometry.Geometry(
database=self.__data["database"],
version=self.__data["version"],
data=d,
pahdb=self.__data,
uids=list(d.keys()),
)
[docs]
def search(self, query: str) -> Optional[list]:
"""
Search the database based on query input.
Parameters:
query : str
String containing search query.
Returns:
List of UIDs
Example:
``search('magnesium=0 oxygen=0 iron=0
silicium=0 chx=0 ch2=0 c>20 h>0')``
"""
if not query:
return None
words = list()
n = len(query)
i = 0
while True:
if i == n:
break
while query[i] == " ":
i += 1
token = query[i]
if token in ["=", "<", ">", "(", ")"]:
i += 1
if i < n and query[i] == "=":
token += query[i]
i += 1
elif token == "&":
i += 1
if i < n and query[i] == "&":
token += query[i]
i += 1
elif token == "|":
i += 1
if i < n and query[i] == "|":
token += query[i]
i += 1
elif token == "!":
i += 1
if i < n and query[i] == "=":
token += query[i]
i += 1
else:
i += 1
while i < n and query[i] not in [
" ",
"=",
"<",
">",
"&",
"|",
"(",
")",
"!",
]:
token += query[i]
i += 1
words.append(token)
tokens = list()
for word in words:
tokens.append(self._tokenize(word))
code = self._parsetokens(tokens)
if not code:
return None
found = eval(
f"[item[0] for item in _AmesPAHdb__data['species'].items() if ({code})]",
self.__dict__ | {"np": np},
)
return found
def _tokenize(self, word: str) -> dict:
"""
A method called by :sec:`amespahdbpythonsuite.amespahdb.search`
that creates a dictionary with keys based on the input word/category.
Parameters:
word : str
"""
word = word.lower()
token = {"type": "", "translation": "", "valid": False}
charge = {
"anion": 'item[1]["charge"] < 0',
"cation": 'item[1]["charge"] > 0',
"neutral": 'item[1]["charge"] == 0',
"positive": 'item[1]["charge"] > 0',
"negative": 'item[1]["charge"] < 0',
"-": 'item[1]["charge"] == -1',
"+": 'item[1]["charge"] == 1',
"++": 'item[1]["charge"] == 2',
"+++": 'item[1]["charge"] == 3',
"---": 'item[1]["charge"] == -3',
}
identities = {
"uid": "item[0]",
"identifier": "item[0]",
"hydrogen": 'item[1]["n_h"]',
"carbon": 'item[1]["n_c"]',
"nitrogen": 'item[1]["n_n"]',
"oxygen": 'item[1]["n_o"]',
"magnesium": 'item[1]["n_mg"]',
"silicium": 'item[1]["n_si"]',
"iron": 'item[1]["n_fe"]',
"h": 'item[1]["n_h"]',
"c": 'item[1]["n_c"]',
"n": 'item[1]["n_n"]',
"o": 'item[1]["n_o"]',
"mg": 'item[1]["n_mg"]',
"si": 'item[1]["n_si"]',
"fe": 'item[1]["n_fe"]',
"ch": 'item[1]["n_ch"]',
"ch2": 'item[1]["n_ch2"]',
"ch3": 'item[1]["n_ch3"]',
"chx": 'item[1]["n_chx"]',
"solo": 'item[1]["n_solo"]',
"duo": 'item[1]["n_duo"]',
"trio": 'item[1]["n_trio"]',
"quartet": 'item[1]["n_quartet"]',
"quintet": 'item[1]["n_quintet"]',
"charge": 'item[1]["charge"]',
"symmetry": 'item[1]["symmetry"]',
"weight": 'item[1]["weight"]',
"scale": 'item[1]["scale"]',
"energy": 'item[1]["total_e"]',
"zeropoint": 'item[1]["vib_e"]',
"experiment": 'item[1]["exp"]',
}
strings: list[str] = []
if self.__data["database"] == "clusters/theoretical":
identities.update(
{
"monomers": 'item[1]["monomers"]',
"type": 'item[1]["type"]',
"conformation": 'item[1]["conformation"]',
}
)
strings.extend(
set([s["monomers"] for s in self.__data["species"].values()])
)
strings.extend(set([s["type"] for s in self.__data["species"].values()]))
strings.extend(
set([s["conformation"] for s in self.__data["species"].values()])
)
composed = {
"wavenumber": 'np.array([t["frequency"] {operator} {operand} for t in item[1]["transitions"]])',
"absorbance": 'np.array([t["intensity"] {operator} {operand} for t in item[1]["transitions"]])',
"frequency": 'np.array([t["frequency"] {operator} {operand} for t in item[1]["transitions"]])',
"intensity": 'np.array([t["intensity"] {operator} {operand} for t in item[1]["transitions"]])',
}
logical = {"and": "and", "or": "or", "|": "or", "&": "and"}
comparison = {
"<": "<",
"lt": "<",
">": ">",
"gt": ">",
"=": "==",
"eq": "==",
"<=": "<=",
"le": "<=",
">=": ">=",
"ge": ">=",
"with": "and",
"ne": "!=",
"!=": "!=",
}
transfer = {"(": "(", ")": ")"}
if word.isnumeric():
token["type"] = "NUMERIC"
token["translation"] = word
elif word in charge:
token["type"] = "CHARGE"
token["translation"] = charge[word]
elif word in identities:
token["type"] = "IDENTITY"
token["translation"] = identities[word]
elif word in composed:
token["type"] = "COMPOSED"
token["translation"] = composed[word]
elif word in logical:
token["type"] = "LOGICAL"
token["translation"] = logical[word]
elif word in comparison:
token["type"] = "COMPARISON"
token["translation"] = comparison[word]
elif word in transfer:
token["type"] = "TRANSFER"
token["translation"] = transfer[word]
elif word in strings:
token["type"] = "STRING"
token["translation"] = f"'{word}'"
elif re.search(
"(mg+|si+|fe+|[chno]+)([0-9]*)(mg+|si+|fe+|[chno]+)([0-9]*)(mg+|si+|fe+|[chno]*)([0-9]*)",
word,
):
token["type"] = "FORMULA"
token["translation"] = f"item[1]['formula'] == '{word.upper()}'"
else:
# TODO: add search by compound name
token["type"] = "IGNORE"
token["translation"] = word
return token
token["valid"] = True
return token
def _parsetokens(self, tokens: list) -> str:
"""
Parse the dictionary of tokens created by
:sec:`amespahdbpythonsuite.amespahdb.tokenize`
and return string of expressions.
Parameters:
tokens : list
List of dictionaries.
Returns:
parsed : str
String of expressions based on tokens.
"""
ntokens = len(tokens)
prev = -1
current = 0
next = 1 if ntokens > 1 else -1
parsed = ""
sub = ""
while current != -1:
if tokens[current]["type"] == "FORMULA":
if prev > -1:
if not (
tokens[prev]["type"] != "LOGICAL" and tokens[prev]["valid"]
):
parsed += " or "
parsed += tokens[current]["translation"]
elif tokens[current]["type"] == "IDENTITY":
if prev > -1:
if not (
tokens[prev]["type"] == "LOGICAL"
or tokens[prev]["type"] == "TRANSFER"
and tokens[prev]["valid"]
):
parsed += " and "
if next > -1:
if tokens[next]["type"] == "COMPARISON":
parsed += " " + tokens[current]["translation"]
else:
parsed += " " + tokens[current]["translation"] + " > 0"
elif tokens[current]["type"] == "COMPOSED":
if prev > -1:
if not (
tokens[prev]["type"] == "LOGICAL"
or tokens[prev]["type"] == "TRANSFER"
and tokens[prev]["valid"]
):
if sub:
sub += " & "
else:
parsed += " and "
if next > -1:
if tokens[next]["type"] != "COMPARISON":
print("EXPECTING OPERATOR")
return ""
partial = tokens[current]["translation"].replace(
"{operator}", tokens[next]["translation"]
)
prev = current
current = next
if next:
if next == ntokens - 1:
next = -1
else:
next += 1
if next > -1 and tokens[next]["type"] != "NUMERIC":
print("EXPECTING OPERAND")
return ""
sub += " " + partial.replace(
"{operand}", tokens[next]["translation"]
)
prev = current
current = next
if next:
if next == ntokens - 1:
next = -1
else:
next += 1
elif tokens[current]["type"] == "NUMERIC":
if prev > -1:
if tokens[prev]["type"] == "COMPARISON" and tokens[prev]["valid"]:
parsed += " " + tokens[current]["translation"]
else:
tokens[current]["valid"] = False
elif tokens[current]["type"] == "LOGICAL":
if prev > -1:
if (
tokens[prev]["type"] == "IDENTITY"
or tokens[prev]["type"] == "NUMERIC"
or tokens[prev]["type"] == "FORMULA"
or tokens[prev]["type"] == "CHARGE"
and tokens[prev]["valid"]
):
if next > -1:
if tokens[next]["type"] == "TRANSFER":
parsed += tokens[current]["translation"]
elif tokens[next]["type"] == "COMPOSED" and sub:
sub += {"and": " & ", "or": " | "}[
tokens[current]["translation"]
]
elif (
tokens[next]["type"] == "IDENTITY"
or tokens[next]["type"] == "NUMERIC"
or tokens[next]["type"] == "FORMULA"
or tokens[next]["type"] == "CHARGE"
):
parsed += " " + tokens[current]["translation"]
else:
tokens[current]["valid"] = False
elif tokens[current]["type"] == "COMPARISON":
if prev > -1:
if tokens[prev]["type"] == "IDENTITY" and tokens[prev]["valid"]:
if next is not None:
if tokens[next]["type"] in ["NUMERIC", "STRING"]:
parsed += " " + tokens[current]["translation"]
else:
tokens[current]["valid"] = False
elif tokens[current]["type"] == "CHARGE":
if prev > -1:
if not (
tokens[prev]["type"] == "LOGICAL" and tokens[prev]["valid"]
):
parsed += " and "
parsed += " " + tokens[current]["translation"]
elif tokens[current]["type"] == "TRANSFER":
parsed += tokens[current]["translation"]
elif tokens[current]["type"] == "NAME":
if prev > -1:
if not (
tokens[prev]["type"] == "LOGICAL" and tokens[prev]["valid"]
):
parsed += " and "
# TODO implement name
# parsed += f"item[1]['comments'] == {tokens[current]['translation']}"
elif tokens[current]["type"] == "STRING":
if prev > -1:
if tokens[prev]["type"] in ["COMPARISON", "NUMERIC"] and tokens[prev]["valid"]:
parsed += tokens[current]["translation"]
else:
print("EXPECTING COMPARISON")
tokens[current]["valid"] = False
else:
print("EXPECTING COMPARISON")
tokens[current]["valid"] = False
elif tokens[current]["type"] == "IGNORE":
print(f"'{tokens[current]['translation']}' NOT UNDERSTOOD")
return ""
prev = current
current = next
if next:
if next == ntokens - 1:
next = -1
else:
next += 1
if sub:
parsed += f" np.any({sub})"
print(parsed)
return parsed
[docs]
def getversion(self) -> str:
"""
Method to retrieve the PAHdb version.
Returns:
String of PAHdb version.
"""
return self.__data["version"]
[docs]
def checkversion(self, version: str) -> bool:
"""
Method to check against a PAHdb version.
Returns:
Boolean whether a provided version matched the PAHdb version.
"""
return version == self.__data["version"]
[docs]
def gettype(self) -> str:
"""
Method to retrieve the PAHdb type.
Returns:
String of PAHdb type.
"""
return self.__data["database"]
[docs]
def getdatabaseref(self) -> dict:
"""
Method to retrieve the database.
Returns:
Dictionary containing the parsed database.
"""
return self.__data
[docs]
def extenddatabase(self, **keywords) -> None:
"""
Method to extend the database.
Returns:
None
"""
if not self.__data:
self.message("NO VALID DATABASE")
if keywords.get("species", False):
self.__data["species"].update(keywords["species"])
[docs]
@staticmethod
def message(text, space: int = 55) -> None:
"""
A method to print terminal message.
Parameters:
text : string or list of strings.
Text to be displayed.
space : integer
Number to indent the text.
"""
line = (space + 2) * "="
print(line)
if isinstance(text, list):
for t in text:
print(t.center(space))
else:
print(text.center(space))
print(line)
print()