#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
===============================================================================
Created on : Fri Jun 25 21:00:10 2021
Author : Alejandro Martínez León
Mail : [alejandro.martinezleon@uni-saarland.de, ale94mleon@gmail.com]
Affiliation : Jochen Hub's Biophysics Group
Affiliation : Faculty of NS, University of Saarland, Saarbrücken, Germany
===============================================================================
DESCRIPTION :
DEPENDENCIES :
===============================================================================
"""
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
import os, subprocess, shutil, datetime, tempfile, time, tqdm
from collections import OrderedDict
import numpy as np
import pandas as pd
import multiprocessing as mp
from glob import glob
from kmodes.kmodes import KModes
import prolif as plf
import MDAnalysis as mda
from MDAnalysis.topology.guessers import guess_types
from rdkit.Chem import DataStructs, AllChem
from rdkit.DataStructs import ExplicitBitVect
from rdkit import Chem
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from scipy.cluster.vq import vq
from scipy.interpolate import interp1d
from palmiche.utils import xvg
from palmiche.home import home
from typing import Union
import tarfile
PathLike = Union[os.PathLike, str, bytes]
#=======================================================================================
# Miscellanea tools
#=======================================================================================
[docs]
class CTE:
"""Some important physical constants
"""
Kb = 8.314462618E-3 #kJ/(mol⋅K) (kNA)
R = 8.314462618E-3 #kJ/(mol⋅K)
[docs]
def get_palmiche_data(file:PathLike, out_dir:PathLike = '.') -> None:
"""Get data from the data directory of Palmiche
Parameters
----------
file : PathLike
this is the path relative to the data directory of Palmiche. For example,
To get the amber99sb-star-ildn force field, you must provided
GROMACS.ff/amber99sb-star-ildn.ff.tar.gz
out_dir : PathLike, optional
Where the file will be decompress, by default '.'
"""
fname = home(file)
tar = tarfile.open(fname, "r:gz")
tar.extractall(out_dir)
tar.close()
[docs]
def RUS(data, ref_index_data = 0, NumbPoints = None, InterpolationKind = 'cubic'):
"""
RUS stands for Residues Unsigned Sums
RUS = sum_{i}^{n} (M_i - R_i)
Where M_i is the error value of the magnitude on i of the model and R_i for the reference.
RUS < 0, The model presents lower errors than the reference (in general)
RUS = 0, The model has the same errors as the reference or they cancell out
RUS > 0, The model presents worst errors than the reference
"""
# GEtting the number of points for the interpolation
if not NumbPoints:
NumbPoints = data[0].shape[0]
# Setting the reference
ref = data[ref_index_data]
# Getting the interpolation data
f_interp_data = [interp1d(ref[:,0], item[:,1], kind=InterpolationKind) for item in data]
minimum = max([item[:,0].min() for item in data])
maximum = min([item[:,0].max() for item in data])
# In this way we ensure only take those values for which we already have points in both data set
# Interpolating data nad having the same points (newx)
newx = np.linspace(minimum, maximum, num=NumbPoints, endpoint=True)
interp_data = [f_interp(newx) for f_interp in f_interp_data]
# import matplotlib.pyplot as plt
# for i in range(len(data)):
# plt.scatter(data[i][:,0], data[i][:,1], label = f'row_data{i}')
# plt.plot(newx, interp_data[i],'->', label = f'interp_data{i}', ms = 4)
# plt.legend()
# plt.show()
# Getting RSS values
RUS_values = []
for i in range(len(interp_data)):
RUS_values.append(((interp_data[i] - interp_data[ref_index_data])).sum())
return RUS_values
[docs]
def sign_change_index(values: list) -> int:
"""This function check wheter the sign of the values change.
For example, for the list [-3,-2,0,1,2], the return value will be 2.
On value "0" the sig change.
Parameters
----------
values : list
Iterable of numerical values
Returns
-------
int
The index. In case that the sign does not change or values has less than 2 elements,
it will return None.
"""
if len(values) <= 1:
return None
else:
for i in range(1, len(values)):
if values[i] * values[i-1] < 0:
return i
return None
def zerolistmaker(n):
return [0]*n
[docs]
def unit_vector(vector):
""" Returns the unit vector of the vector. """
return vector / np.linalg.norm(vector)
[docs]
def angle_between(v1, v2):
""" Returns the angle in radians between vectors 'v1' and 'v2'::
# >>> angle_between((1, 0, 0), (0, 1, 0))
# 1.5707963267948966
# >>> angle_between((1, 0, 0), (1, 0, 0))
# 0.0
# >>> angle_between((1, 0, 0), (-1, 0, 0))
# 3.141592653589793
"""
v1_u = unit_vector(v1)
v2_u = unit_vector(v2)
return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
[docs]
def aovec(vectors, round = None):
"""Return the average oriented vector
Calculate all the angles respect to the cartessina axis and average the angles,andthen return the unitary vector
ang(vec, x) = mean(ang(vectors_i, x))
ang(vec, y) = mean(ang(vectors_i, y))
ang(vec, z) = mean(ang(vectors_i, z))
vec = (cos(ang(vec, x); cos(ang(vec, y); cos(ang(vec, z))
Args:
vectors ([type]): [description]
round ([type]): [description]
Returns:
[type]: [description]
"""
vectors = [np.asarray(v) for v in vectors]
#Dimension of the vectors, how many components
vec = np.empty(vectors[0].shape)
for i in range(vectors[0].shape[0]):
zero_vector = np.zeros(vectors[0].shape)
zero_vector[i] = 1
vec[i] = np.cos(np.mean([angle_between(vector, zero_vector) for vector in vectors]))
vec = unit_vector(vec)
if round: vec = np.round(vec, round)
return vec
[docs]
def get_vec_COM(conf, tpr, ndx, group1, group2, round = None):
"""Take two groups of a index file and return the vector formed by the coordinates of group2, group1 in this order.py
Args:
conf (path): the configuration file [gro, pdb, xtc, etc..]
tpr (path): tpr file, or gro, pdb, etc...
ndx (path): index file, need to have the defined groups
group1 (str): name of the first group in the index file.
group2 (str): name of the second group in the index file.
round (int or None): the specification to round.
Returns:
np.array: the unitary vector formed by group2, group1
"""
with tempfile.NamedTemporaryFile(suffix=".xvg") as xvgtmp1:
run(f"export GMX_MAXBACKUP=-1;echo {group1} | gmx traj -f {conf} -s {tpr} -n {ndx} -com -ox {xvgtmp1.name}")
coord1 = xvg.XVG(xvgtmp1.name).data[0,1:]# First row from column 1 to the end, the column 0 is time.
with tempfile.NamedTemporaryFile(suffix=".xvg") as xvgtmp2:
run(f"export GMX_MAXBACKUP=-1;echo {group2} | gmx traj -f {conf} -s {tpr} -n {ndx} -com -ox {xvgtmp2.name}")
coord2 = xvg.XVG(xvgtmp2.name).data[0,1:]# First row from column 1 to the end, the column 0 is time.
vector = unit_vector(coord2 - coord1)
if round: vector = np.round(vector, round)
return vector
def timeit(method):
def timed(*args, **kw):
ts = time.time()
result = method(*args, **kw)
te = time.time()
if 'log_time' in kw:
name = kw.get('log_name', method.__name__.upper())
kw['log_time'][name] = int((te - ts) * 1000)
else:
print('%r %2.2f ms' % \
(method.__name__, (te - ts) * 1000))
return result
return timed
#=======================================================================================
# Tools for execution
#=======================================================================================
[docs]
def multi_run(commands, nPar, shell = True, executable = '/bin/bash'):
"""This will run as many runs as nPar.
Args:
commands (list): A list of string to be run in the specified shell.
nPar (int): How many processes are running simultaneously
shell (bool, optional): belongs to run(). Defaults to True.
executable (str, optional): belongs to tun(). Defaults to '/bin/bash'.
Popen (bool, optional): belongs to run(). Defaults to False.
"""
tmp_cmds = []
for cmd in commands:
tmp_cmds.append(run(cmd, shell = shell, executable = executable, Popen = True))
if len(tmp_cmds) >= nPar:
print(f"{len(tmp_cmds)} running, now waiting...")
for tmp_cmd in tmp_cmds:
tmp_cmd.wait()
tmp_cmds = []
def run(command, shell = True, executable = '/bin/bash', Popen = False):
#Here I could make some modification in order that detect the operator system
#NAd make the command compatible with the operator system
#the function eval could be an option if some modification to the variable command
#need to be done.... SOme flight ideas...
if Popen:
#In this case you could access the pid as: run.pid
process = subprocess.Popen(command, shell = shell, executable = executable, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text = True)
else:
process = subprocess.run(command, shell = shell, executable = executable, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text = True)
returncode = process.returncode
if returncode != 0:
print(f'Command {command} returned non-zero exit status {returncode}')
raise RuntimeError(process.stderr)
return process
[docs]
class CHECK:
"""
This class test the normal ending of a GROMACS simulation using the log file.
E.g.
let say that we have the md.log of a simulation
# CHECK = CHECK('md.log')
# if CHECK.performance:
# #The simulation had a good ending
# else:
# #The simulations crashed.
"""
[docs]
def __init__(self, logfile, lisfile = None):
self.logfile = logfile
self.lisfile = lisfile
self.performance = None #in ns/day, this attribute could be used to check if the simulation ended
self.nsteps = None
self.final_step = 0
self.__check_normal_end__()
def __check_normal_end__(self):
with open(self.logfile, 'r') as f:
lines = f.readlines()
for i in range(len(lines)):
if 'nsteps' in lines[i]:
self.nsteps = int(lines[i].split('=')[-1])
if lines[i].startswith(' Step Time'):
self.final_step = int(lines[i+1].split()[0])
if 'Finished mdrun on rank' in lines[i]:
try:
self.performance = float(lines[i - 1].split()[1])
except:pass
# last check. It could be that the last part is printed but the job is not finished. Received ter signal
if self.nsteps and self.performance:
if self.final_step < self.nsteps:
print(f"({self.logfile}, {self.lisfile}) simulation, even finishing with proper output, only {self.final_step} / {self.nsteps} steps were reached. Therefore, performance = None")
self.performance = None
[docs]
def estimated_end_datetime(self):
"""Read an md.lis looking for the last instance of the string with the estimation of the ended time.
"step 24940000, will finish Tue Oct 5 10:38:53 2021"
Args:
self.lisfile (path): md.lis, the output of the gromacs execution.
Returns:
datetime: The estimated end of the simulation.
"""
with open(self.lisfile, "r") as f:
lines = f.readlines()
datetime_string = None
for line in reversed(lines):
if "will finish" in line:
datetime_string = line.split("will finish")[-1].strip()
break
if datetime_string:
return datetime.datetime.strptime(datetime_string, "%a %b %d %H:%M:%S %Y")
else:
return None
def get_startdatetime_from_log(self):
with open(self.logfile, "r") as f:
lines = f.readlines()
datetime_string = None
for line in reversed(lines):
if 'Started mdrun on rank' in line: # Will take the last
datetime_string = " ".join(line.split()[-5:])
break
if datetime_string:
return datetime.datetime.strptime(datetime_string, "%a %b %d %H:%M:%S %Y")
else:
return None
[docs]
def daysdiff(self, startdatetime = None):
"""Take the difference between the estimated ended time of lisfile and datetime_start
Args:
lisfile (path): [description]
datetime_start (datetime): A date time.
Returns:
float: Days' difference.
"""
if not startdatetime:
startdatetime = self.get_startdatetime_from_log()
print("The starting data time of the log file was used becasue user did not provided a start_datetime")
end_datetime = self.estimated_end_datetime()
if startdatetime and end_datetime:
return (end_datetime - startdatetime).total_seconds() / 86400
else:
return None
[docs]
def checkrun(user = '$USER', partition = 'deflt'):
"""Check for the running process on the cluster.
Returns:
list of integers: The integers that identify the running process on the cluster.
"""
stdout = run(f"squeue -p {partition} -u {user} --format=%.i").stdout.split()[1:]#The first line is the string "JOBID"
return [int(item) for item in stdout]
[docs]
def job_launch_list(job_path_list, shell="sbatch"):
"""
Same as job_launch, but the path to the jobs are provided
Args:
job_path_list (_type_): _description_
shell (str, optional): _description_. Defaults to "sbatch".
Returns:
_type_: _description_
"""
cwd = os.getcwd()
JOBIDs = []
for job_path in job_path_list:
path = os.path.dirname(job_path)
script_name = os.path.basename(job_path)
os.chdir(path)
if shell == "sbatch":
JOBIDs.append(int(run(f"{shell} --parsable {script_name}").stdout))
else:
try:
JOBIDs.append(int(run(f"{shell} {script_name}").stdout))
except:
pass
os.chdir(cwd)
return JOBIDs
def launch_wait_check_repeat(partition, jobpaths, logfile, lisfile, jobsh_name, logfile_start_datetime = None):
maximum_days = {
'deflt': 2,
'long': 5,
'debug': 0.02,
'medium': 2,
'fat+': 2,
'fat': 2,
'int': 2,
'gpu': 2,
'gpu:test': 0.02,
'gpu-hub':2,
'uds-hub':2,
}
# partition = 'deflt'
# jobs2launch = [full_path_jobsh]
# logfile = "production.log"
# lisfile = 'production.lis'
# jobsh_name = 'job.sh'
# First launch (normal)
# I will check for the cpt file based on the logfile name
JOBIDs = job_launch_list([os.path.join(path, jobsh_name) for path in jobpaths])
while len(set(JOBIDs).difference(checkrun(partition=partition))) != len(JOBIDs):
time.sleep(np.random.randint(20,60))
# Relaunch if needed.
jobs2relaunch = {}
for path in jobpaths:
check = CHECK(os.path.join(path,logfile), os.path.join(path,lisfile))
cpt_file = os.path.join(path, os.path.splitext(logfile)[0] + '.cpt')
# Check from the correct end
print(f"The simulation {os.path.join(path,logfile)} presented a performance of {check.performance}")
if not check.performance:
if os.path.exists(cpt_file):
if logfile_start_datetime:
tmp_check = CHECK(os.path.join(path,logfile_start_datetime))
daysdiff = check.daysdiff(tmp_check.get_startdatetime_from_log())
else:
daysdiff = check.daysdiff()
# Check if the estimated time is larger than the reservation time in the corresponded partition.
if daysdiff:
if partition not in maximum_days:
maximum_days[partition] = 2
if daysdiff > maximum_days[partition]:
# Get how many time the simulation must be relaunch
repeat = round((daysdiff / maximum_days[partition]) - 1)
if repeat == 0:repeat = 1
# Add path as key and number of repetition as value
jobs2relaunch[path] = repeat
else:
print(f'We were not able to guess what is the problem with the run of the job {path}')
else:
print(f'We were not able to guess what is the problem with the run of the job {path}')
else:
print(f'Unable to continue job {path}, {cpt_file} doe not exist or is unaccesible')
# Only if there are some jobs to relaunch
if jobs2relaunch:
while sum(jobs2relaunch.values()):
# Launch the jobs if repeat is greater than zero
to_launch = [os.path.join(path, jobsh_name) for path in jobs2relaunch if jobs2relaunch[path] > 0]
JOBIDs = job_launch_list(to_launch)
while len(list(set(JOBIDs).difference(checkrun(partition=partition)))) != len(JOBIDs):
time.sleep(np.random.randint(20,60))
# Update the repetitions
for path in jobs2relaunch:
check = CHECK(os.path.join(path,logfile), os.path.join(path,lisfile))
if check.performance:
jobs2relaunch[path] == 0
if jobs2relaunch[path] > 0:
jobs2relaunch[path] -= 1
print(f'Performance of the relaunched jobs:')
for path in jobs2relaunch:
check = CHECK(os.path.join(path,logfile), os.path.join(path,lisfile))
print(f"The simulation {os.path.join(path,logfile)} presented a performance of {check.performance}")
[docs]
def job_launch(shell="sbatch", script_name = "job.sh"):
"""
Parameters
----------
shell : TYPE, optional
DESCRIPTION. The default is "sbatch".
script_name : TYPE, optional
DESCRIPTION. The default is "job.sh".
If a regular expresion is provided
then the function will execute the first ordered alphabetically. E.g:
job.* was provided and there job.sh and job.bash. Then it will use job.bash.
Returns
-------
JOBIDs : list of integers.
DESCRIPTION. The ID of the launch in case of sbatch was used as shell
"""
cwd = os.getcwd()
JOBIDs = []
for (root, _, _) in list(os.walk(cwd)):
os.chdir(root)
if glob(script_name):
script_name = sorted(glob(script_name))[0]
if shell == "sbatch":
JOBIDs.append(int(run(f"{shell} --parsable {script_name}").stdout))
else:
try:
JOBIDs.append(int(run(f"{shell} {script_name}").stdout))
except:
pass
os.chdir(cwd)
return JOBIDs
[docs]
def backoff(file_path):
"""
Parameters
----------
file : TYPE string
DESCRIPTION: The name or the path f or the specific file
Returns
-------
None.
If the file already exist. it will made a back up to ./#{file}.{str(i)}#,
Where i is an integer.
"""
basname = os.path.basename(file_path)
dirname = os.path.dirname(file_path)
if os.path.exists(file_path):
new_basname = basname
i = 1
while(os.path.exists(os.path.join(dirname, new_basname))):
new_basname = f"./#{basname}.{str(i)}#"
i += 1
print(f"Back Off! I just backed up {file_path} to {os.path.join(dirname, new_basname)}")
shutil.copy2(file_path, os.path.join(dirname, new_basname))
#=======================================================================================
# Tools for working with files
#=======================================================================================
def makedirs(path):
if os.path.exists(path):
pass
else:
os.makedirs(path,exist_ok=True)
[docs]
def rm(pattern, r = False):
"""
Parameters
----------
patterns : string
input-like Unix rm
r : TYPE, bool
DESCRIPTION. The default is False.
If True delete also directories
Returns
-------
None.
"""
if glob(pattern):
for p in glob(pattern):
if r:
if os.path.isdir(p):
shutil.rmtree(p)
elif os.path.isfile(p):
os.remove(p)
else:
if os.path.isfile(p):
os.remove(p)
elif os.path.isdir(p):
print(f"The directory '{p}' was not deleted, set the flag r to True")
else:
print(f"rm: '{pattern}' doesn't exist")
[docs]
def cp(src, dest, r = False):
"""
This function makes use of the possible multiple CPU of the machine.
Parameters
----------
src : TYPE: string
DESCRIPTION:
Source Path to a directory or a file. regular expresion are accepted. The librery glob is used for that.
dest : TYPE: string
DESCRIPTION:
Destination Path
r : TYPE, optional
DESCRIPTION. The default is False:
If True, Also directories will be copy. If not, and a directory was
given as src, a Raise Exception will be printed
Another Raise Exception will be printed if the destination path doesn't
exist.
Returns
-------
None.
"""
src = os.path.abspath(src)
dest = os.path.abspath(dest)
#This scheme doesn't consider the possibilities of other item except files, dir or regualr expresions
if glob(src) and os.path.exists(os.path.dirname(dest)):
if os.path.isdir(src):
if r == True:
basename_src = os.path.basename(src)
for root, dirs, files in os.walk(src):
path2copy = root.split(src)[1]
try:
if path2copy[0] == "/":
path2copy = path2copy[1:]
except: pass
path_dest = os.path.join(dest,basename_src,path2copy)
if dirs:
pool = mp.Pool(mp.cpu_count())
pool.map(makedirs, [os.path.join(path_dest, d) for d in dirs])
pool.close()
if files:
makedirs(path_dest)
pool = mp.Pool(mp.cpu_count())
pool.starmap(shutil.copy2, [(os.path.join(root, f),path_dest) for f in files])
pool.close()
else:
print(f"If you need to copy directories, set the 'r' flag to True. {src} is a directory")
elif os.path.isfile(src):
shutil.copy2(src,dest)
#if not os.path.exists(dest):
# print(f"The file '{src} was not copy to '{dest}' becasue doesn't exist or is not accesible")
else:#Here we are in regular expresions
to_copy = [file for file in glob(src) if os.path.isfile(file)]
pool = mp.Pool(mp.cpu_count())
pool.starmap(shutil.copy2, [(pattern,dest) for pattern in to_copy])
pool.close()
elif not glob(src) and not os.path.exists(os.path.dirname(dest)):
print(f"cp: neither {src}, nor {dest} exist.")
elif not glob(src):
print(f"cp: The source file {src} doesn't exist")
else:
print(f"cp: cannot create regular file '{dest}': Not a directory")
def mv(src, dest, r = False):
cp(src, dest, r = r)
rm(src,r=r)
def touch(filepath):
open(filepath, "w").close()
def list_if_dir(path = '.'):
return [item for item in os.listdir(path) if os.path.isdir(os.path.join(path, item))]
def list_if_file(path = '.'):
return [item for item in os.listdir(path) if os.path.isfile(os.path.join(path, item))]
#=======================================================================================
# Tools for working with index and topology files
#=======================================================================================
[docs]
class Mol2:
"""Read a mol2 molecule and storge the corresponded RDKit molecule in
the attribute 'mol'
Example
-------
.. ipython:: python
from palmiche.utils.tools import Mol2, get_palmiche_data
import os
get_palmiche_data(file="samples/mol2.tar.gz", out_dir='.')
molecule = Mol2('mol2/test.mol2')
for atom in molecule.mol.GetAtoms():
# Two extra properties added to the atom class
print(atom.GetProp('atom_type'), atom.GetDoubleProp('charge'))
"""
[docs]
def __init__(self, mol2_path:PathLike) -> None:
self.mol2_path = mol2_path
self.mol = None
self._name_changed_ = False
self.read()
[docs]
def read(self) -> None:
"""This function read the file and create
the RDKit molecule and stores it in the attribute
`mol`.
"""
self.mol = Chem.MolFromMol2File(self.mol2_path, removeHs = False)
# Trying first conversion
if self.mol:
atom_types = [atom.GetSymbol() for atom in self.mol.GetAtoms()]
# Trying first conversion
else:
print('Trying with atom types change...')
with open(self.mol2_path, 'r') as f:
lines = f.readlines()
atom_types = []
for i in range(len(lines)):
if lines[i].startswith('@<TRIPOS>ATOM'):
for j in range(i+1, len(lines)):
if lines[j].startswith('@'):
break
line_split = lines[j].split()
atom_types.append(line_split[5])
# Remove all digits of atom and set the atom as atom type
line_split[5] = line_split[1].translate({ord(i): None for i in '0123456789'})
lines[j] = '\t'.join(line_split) + '\n'
break
string = "".join(lines)
self.mol = Chem.MolFromMol2Block(string, removeHs = False)
if self.mol:
with open(self.mol2_path, 'r') as f:
lines = f.readlines()
user_formal_charges = []
for i in range(len(lines)):
if lines[i].startswith('@<TRIPOS>ATOM'):
for j in range(i+1, len(lines)):
if lines[j].startswith('@'):
break
line_split = lines[j].split()
user_formal_charges.append(float(line_split[-1]))
break
for atom, atom_type, user_formal_charge in zip(self.mol.GetAtoms(), atom_types, user_formal_charges):
atom.SetDoubleProp('charge', float(user_formal_charge))
atom.SetProp('atom_type',atom_type)
print("The mol was created succesfully!")
else:
print("\nIt is not possible to convert the mol2 to RDKit molecule.\n")
[docs]
def ChangeName(self, mol_name:str) -> None:
"""Change name of the molecule
Parameters
----------
mol_name : str
Code name to the molecule. It is wise to use a code
no larger than 3 characters.
"""
self._name_changed_ = True
self.mol.SetProp('_Name', mol_name)
[docs]
def write(self, out_file:PathLike) -> None:
"""Write the molecue in mol2 format.
Parameters
----------
out_file : PathLike
The path to the file.
"""
with open(self.mol2_path, 'r') as f:
lines = f.readlines()
for i in range(len(lines)):
if lines[i].startswith('@<TRIPOS>ATOM'):
atom_idx = 0
for j in range(i+1, len(lines)):
if lines[j].startswith('@'):
break
line_split = lines[j].split()
line_split[2:5] = [coord for coord in self.mol.GetConformer().GetAtomPosition(atom_idx)]
if self._name_changed_:
line_split[7] = self.mol.GetProp('_Name')
for column, item in enumerate(line_split):
if column in [0,1,5,6,7]:
line_split[column] = f"{line_split[column]:>5}"
elif column in [2,3,4]:
line_split[2:5] = [float(coord) for coord in line_split[2:5]]
line_split[2:5] = [f"{coord:10.4f}" for coord in line_split[2:5]]
else:
line_split[column] = f"{line_split[column]:>10}"
lines[j] = ''.join(line_split) + '\n'
atom_idx += 1
break
if lines[i].startswith('@<TRIPOS>MOLECULE') and self._name_changed_:
lines[i+1] = f"{self.mol.GetProp('_Name'):>7}\n"
with open(out_file, 'w') as f:
f.write("".join(lines))
# import openbabel
# obConversion = openbabel.OBConversion()
# obConversion.SetInAndOutFormats("mol", "mol2")
# obabel_mol = openbabel.OBMol()
# obConversion.ReadString(obabel_mol, Chem.MolToMolBlock(self.mol))
# obConversion.WriteFile(obabel_mol, out_file_name)
# for idx, atom in enumerate(self.mol.GetAtoms()):
# obabel_mol.GetAtom(idx +1).SetPartialCharge(atom.GetDoubleProp('charge'))
# obConversion.WriteFile(obabel_mol, 'taka')
[docs]
def get_atom_index(file_path, H_atoms = True):
"""
Parameters
----------
file_path : TYPE, str
DESCRIPTION. The file with the atoms, Could be an itp, gro or pdb.
H_atoms : TYPE, optional
DESCRIPTION. The default is True. :
True:
Return all the list of atoms.
False:
Just return the heavy atoms (non hydrogens).
Returns
-------
atom_index : TYPE
DESCRIPTION.
"""
cwd = os.getcwd()
tmpdir = tempfile.TemporaryDirectory()
#Guessing the extension file
if not os.path.exists(file_path):
raise FileNotFoundError(f"The file {file_path} doesn't exist or is not accessible")
file_path = os.path.abspath(file_path)
basename = os.path.basename(file_path)
ext = basename.split('.')[-1].lower()
os.chdir(tmpdir.name)
atom_index = []
if ext == 'itp':
atoms_section = get_top_sections(file_path, dictionary=True)['atoms']
for line in atoms_section:
if not line.startswith(";") and not line == "\n":
split = line.split()
if H_atoms:
atom_index.append(int(split[0]))
else:
if split[4].strip()[0] not in 'Hh': #This look for the first character of the atom name, if it is H or h, then is a hydrogen atom
atom_index.append(int(split[0]))
elif ext in ['gro', 'pdb']:
with open("ndx.opt", "w") as opt:
opt.write("""
"System" group System;
"System-H" group System and ! name "H*" and ! name "h*"
""")
run(f"""
echo 'q'| gmx make_ndx -f {file_path} -o tmp.ndx
gmx select -s {file_path} -sf ndx.opt -n tmp.ndx -on tmp
""")
#deleting the line _f0_t0.000 in the file
with open("tmp.ndx", "r") as index:
data = index.read()
data = data.replace("_f0_t0.000","")
with open("tmp.ndx", "w") as index:
index.write(data)
ndx = NDX("tmp.ndx").data
if H_atoms:
atom_index = ndx["System"]
else:
atom_index = ndx["System-H"]
# Returning to the original directory
os.chdir(cwd)
return atom_index
# Both functions could be put together in a class NDX, that has one method, write, because always read. But this create the disatventage to incompatibiliti with ndx rectifivcation
# En otras palabras que se pierde la ventaja de copiar un dictionario nen formatop NDX, se tendria que crear un NDX vacio (un archivo vacio , porque siempre lo va a leer) y asignar la data al objecto,
class NDX:
def __init__(self, file):
self.file = file
self.data = {}
self.read()
def read(self):
"""
Da bateo cuando el nombre del indice no tiene ningu atomo, dime algo!!!!
Tengo que revisar en todos lo lugares que llamo a NDX, porque le voy a quitar que convierta a entero
Y tengo que ver como hago mas general la idea de indices sin atomos
Returns
-------
dictionary : TYPE: dict
A dictionary with the group index in the form
{ group_name1: [atom_idndex1, atom_idndex2, ...]
group_name2: [atom_idndex1, atom_idndex2, ...]
...
}
"""
with open(self.file,'r') as f:
lines = f.readlines()
positions = [i for (i, line) in enumerate(lines) if line.startswith('[')]
for i in range(len(positions)):
key = lines[positions[i]][2:-3]
self.data[key] = []
try:
chunk_lines = lines[positions[i]+1:positions[i+1]]
except:
chunk_lines = lines[positions[i]+1:]
for line in chunk_lines:
self.data[key] += [int(value) for value in line.split()]
def write(self, out_name = "index.ndx", backup = True):
"""
Write a GROMACS index file
Parameters
----------
dictionary : dict
The keys of the dictionary are the names of the groups.
The values are the corresponded list of atoms
out_name : TYPE, str
DESCRIPTION. The default is "index.ndx".
The name of the output file
Returns
-------
None.
"""
data = ""
items_per_row = 15
for key in self.data:
data += f"[ {key} ]\n"
i = 0
while i < len(self.data[key]):
string = [str(item) for item in self.data[key][i:i+items_per_row]]
i += items_per_row
data += " ".join(string) + "\n"
if backup: backoff(out_name)
out_file = open(out_name,'w')
out_file.write(data)
out_file.close()
[docs]
def get_top_sections(topology, dictionary = False):
"""
The flag dictionary is available because if in the topology there are two
sections with the same name, then the last one is the only one that will
be save.
IN the case that
Parameters
----------
topology : path to a gromacs topology file
DESCRIPTION.
dictionary : TYPE, optional
DESCRIPTION. The default is False.
If True instead of a list return a dict with keywords the sections
If not, a list with elements [key, info], ...
Returns
-------
sections : list or dict depend on dictionary
DESCRIPTION.
The info of the topology file
"""
with open(topology,"r") as top:
top_lines = top.readlines()
sections = [] #is not possible to use dict because there is some repeated sections, like dihedrals, I don't know why
for i in range(len(top_lines)):
if top_lines[i].startswith("["):
key = ""
for char in top_lines[i]:
if char == "]":
break
if char not in "[ ":
key += char
key.strip()
info = []
for j in range(i + 1, len(top_lines)):
if top_lines[j].startswith("["):
i = j - 1
break
info.append(top_lines[j])
sections.append([key.strip(), info])
if dictionary: return dict(sections)
return sections
[docs]
def KbT(absolute_temperature):
"""Return the value of Kb*T in kJ/mol
Args:
absolute_temperature (float): The absolute temperature in kelvin
"""
return absolute_temperature*CTE.Kb
def beta(absolute_temperature):
return 1 / KbT(absolute_temperature)
def BoltzmannFactor(E,T):
return np.exp(-E/KbT(T))
def get_force_cte(T, sigma):
return KbT(T)/sigma**2 # Kj/(mol [sigma^2])
def get_sigma(T, force_constant):
return np.sqrt(KbT(T)/force_constant)
#======================================================================================================
# Here I will introduce some nice tools for working with advances python modules for molecular dynamic
#======================================================================================================
[docs]
def my_guess_types(names):
"""Guess the type of the atoms. It will simply remove the number of each string. Useful for the guessing of the ligands atoms
Args:
names (list): a list of atoms names
"""
def f(array):
for item in array:
return ''.join(filter(str.isalpha, item))
return np.vectorize(f)(names)
def array_to_bv(s):
bv = ExplicitBitVect(len(s))
on_bits = np.where(s == True)[0].tolist()
bv.SetBitsFromList(on_bits)
return bv
class classifier_ifp:
def __init__(self, tpr, xtc, ligand_selection = 'resname LIA', protein_selection = 'protein', elapsed_steps = 0, load_fingerprint = False, ifp_file = 'ifp.pkl', ifp_atoms_file = 'ifp_atoms.pkl', n_clusters = 2) -> None:
self.tpr = tpr
self.xtc = xtc
self.ligand_selection = ligand_selection
self.protein_selection = protein_selection
self.elapsed_steps = elapsed_steps
self.load_fingerprint = load_fingerprint
self.ifp_file = ifp_file
self.ifp_atoms_file = ifp_atoms_file
self.n_clusters = n_clusters
self.parse()
def __split(self, string):
def f(s):
if 'chain' in s:
return s.split('_')[-1]
elif "LI" in s:
return s.split('_')[-1][-1]
else:
return ""
return np.vectorize(f)(string)
def cluster(self, n_clusters=2, max_iter=500, init='Cao', n_init=40, verbose=0, random_state=None, n_jobs=1):
self.n_clusters = n_clusters
## Becasue all the data is True or False, every variable contribute with the same weight, so there is no need to rescale it.
## Initialize the model
self.model = KModes(
n_clusters=self.n_clusters,
max_iter=max_iter,
init=init,
n_init=n_init,
verbose=verbose,
random_state=random_state,
n_jobs=n_jobs
)
## Run the model
self.model.fit(self.ifp_df)
# Group frames on clusters
self.membership = {}
for i in range(self.n_clusters):
self.membership[i] = np.where(self.model.labels_ == i)[0]
def parse(self):
if self.load_fingerprint:
self.ifp_df = pd.read_pickle(self.ifp_file).astype(np.uint8)
try:
self.ifp_df_atoms = pd.read_pickle(self.ifp_atoms_file)
except:
print('This object does not contains the atribute self.ifp_df_atoms')
else:
tmp_pdb = tempfile.NamedTemporaryFile(suffix='.pdb')
run(f"export GMX_MAXBACKUP=-1; gmx editconf -f {self.tpr} -o {tmp_pdb.name}")
ref = mda.Universe(tmp_pdb.name) # This has the correct numeration for the residues
u = mda.Universe(self.tpr, self.xtc)#, guess_bonds=True
guessed_elements = guess_types(my_guess_types(u.atoms.names))
u.add_TopologyAttr('names', guessed_elements)
u.add_TopologyAttr('elements', guessed_elements)
u.add_TopologyAttr('types', guessed_elements)
u.add_TopologyAttr('resid', ref.residues.resids)
u.add_TopologyAttr('chainID', self.__split(u.atoms.segids)) # Puede pensar en algo mas general con la funcion que tengo dentro del pdb para adivinar las cadenas
self.ligand = u.atoms.select_atoms(self.ligand_selection)
self.protein = u.atoms.select_atoms(self.protein_selection)
# Initialize fingerprint
ifp = plf.Fingerprint()
# run on a slice of frames from begining to end with a step of 10
if self.elapsed_steps:
ifp.run(u.trajectory[::self.elapsed_steps], self.ligand, self.protein)
else:
ifp.run(u.trajectory, self.ligand, self.protein)
self.ifp_df = ifp.to_dataframe()
self.ifp_df_atoms = ifp.to_dataframe(return_atoms = True)
# Saving the fingerprint
self.ifp_df.to_pickle(self.ifp_file)
# Also the ifp_atoms will be exported
self.ifp_df_atoms.to_pickle(self.ifp_atoms_file)
tmp_pdb.close()
# Run cluster with the default values nad number of cluster
self.cluster(n_clusters=self.n_clusters)
def get_closers(self, user_membership = None):
"""Return the closest frames for each cluster in self.membership or user_memebership if provided.
Args:
user_membership (dict, optional): The memebership, the keys will be the cluster identifier and the values the frames taht belongs to each cluster. Defaults to None.
Returns:
_type_: _description_
"""
bvs_centroids = [array_to_bv(centroids) for centroids in self.model.cluster_centroids_]
bvs_frames = plf.to_bitvectors(self.ifp_df)
if user_membership:
membership = user_membership
else:
membership = self.membership
closest_frames = {}
for cluster in membership:
bvs_selected_frames = [bvs_frames[frame] for frame in membership[cluster]]
# If a cluster only have one member
if len(bvs_selected_frames) == 1:
# BulkTanimotoSimilarity needs more than one fingerprint, this is just a trick, and probably alwasy be avoid.
bvs_selected_frames += bvs_selected_frames
similarities = DataStructs.BulkTanimotoSimilarity(bvs_centroids[cluster], bvs_selected_frames)
index = similarities.index(max(similarities))
closest_frames[cluster] = membership[cluster][index]
return closest_frames
class classifier_dist_contacts:
def __init__(self, tpr, xtc, chains = ('A', 'B', 'C', 'D', 'E'), elapsed_steps = 0, n_clusters = 2) -> None:
self.tpr = tpr
self.xtc = xtc
self.chains = chains
self.elapsed_steps = elapsed_steps
self.n_clusters = n_clusters
self.data = pd.DataFrame()
self.parse()
def cluster(self, n_clusters=2, init='k-means++', n_init=50, max_iter=500, tol=0.0001, verbose=0, random_state=None, copy_x=True, algorithm='auto'):
features = ['AveNumResi', 'AveDistZ']
self.n_clusters = n_clusters
pipe = Pipeline(
[
("scaler", MinMaxScaler()),
('kmeans',
KMeans(
n_clusters=self.n_clusters,
init=init,
n_init=n_init,
max_iter=max_iter,
tol=tol,
verbose=verbose,
random_state=random_state,
copy_x=copy_x,
algorithm=algorithm
)
)
]
)
pipe.fit(self.data[features])
self.model = pipe['kmeans']
#Think about if implement all the method in a athomatic way to improve the selction of the clusters
# Group frames on clusters
self.membership = OrderedDict()
for i in range(self.n_clusters):
self.membership[i] = np.where(self.model.labels_ == i)[0]
def parse(self):
u = mda.Universe(self.tpr, self.xtc)
close_contacts_frame_0 = {chain:u.atoms.select_atoms(f'(backbone and segid *Protein_chain_{chain}) and around 4 resname LI{chain}') for chain in self.chains}
if self.elapsed_steps:
iterator = u.trajectory[::self.elapsed_steps]
Frame = range(0,u.trajectory.n_frames,self.elapsed_steps)
else:
iterator = u.trajectory
Frame = range(u.trajectory.n_frames)
AveNumResi = []
AveDistZ = []
for ts in tqdm.tqdm(iterator):
NumResi = []
DistZ = []
for chain in self.chains:
NumResi.append(len(u.atoms.select_atoms(f'(protein and segid *Protein_chain_{chain}) and around 3 resname LI{chain}').residues))
DistZ.append(np.abs(close_contacts_frame_0[chain].center_of_mass(pbc=True, compound='group')[-1] - u.atoms.select_atoms(f'resname LI{chain}').center_of_mass(pbc=True, compound='group')[-1]))
AveNumResi.append(np.mean(NumResi))
AveDistZ.append(np.mean(DistZ))
self.data = pd.DataFrame(
{
'Frame': Frame,
'AveNumResi': AveNumResi,
'AveDistZ': AveDistZ,
}
)
# Run cluster with the default values nad number of cluster
self.cluster(n_clusters=self.n_clusters)
def get_closers(self):
# https://stackoverflow.com/questions/26795535/output-50-samples-closest-to-each-cluster-center-using-scikit-learn-k-means-libr
# This is like on the paper was done, but I need to do the scalling first
scale = MinMaxScaler()
scale.fit(self.data[['AveNumResi', 'AveDistZ']])
closest_frames, distances = vq(self.model.cluster_centers_, scale.transform(self.data[['AveNumResi', 'AveDistZ']]))
return closest_frames
if __name__ == '__main__':
pass