Source code for biobb_structure_checking.structure_checking

"""
    Class for Structure Checking functionality
"""
__author__ = "gelpi"
__date__ = "$26-jul-2018 14:34:51$"

import importlib
import sys
import os
import time
from numpy import sqrt

import biobb_structure_checking.constants as cts

from biobb_structure_checking.io.json_writer import JSONWriter
from biobb_structure_checking.io.param_input import ParamInput, NoDialogAvailableError

import biobb_structure_checking.structure_manager as stm
import biobb_structure_checking.modelling.utils as mu


# Main class
[docs]class StructureChecking(): """ | biobb_structure_checking.StructureChecking | Main class to control structure checking functionality | Provides support for to check_structure command line | Load directly for Jupyter Notebook or python scripts. Args: base_dir_path (str): Base directory path where application resides. args (dict): Arguments dictionary see https://biobb-structure-checking.readthedocs.io/en/latest/command_line_usage.html. """ def __init__(self, base_dir_path, args): if args is None: args = {} self.args = cts.set_defaults(base_dir_path, args) self.summary = {} if self.args['debug'] or args['time_limit']: import psutil self.start_time = time.time() if self.args['debug']: self.timings = [] self.summary['elapsed_times'] = {} self.summary['memsize'] = [] try: self.strucm = self._load_structure( self.args['input_structure_path'], self.args['fasta_seq_path'] ) except IOError: sys.exit( "ERROR: fetching/parsing structure from " f"{self.args['input_structure_path']}" ) except ( stm.WrongServerError, stm.UnknownFileTypeError, stm.ParseError ) as err: sys.exit(err.message) if self.args['debug']: self.timings.append(['load', time.time() - self.start_time]) process = psutil.Process(os.getpid()) memsize = process.memory_info().rss/1024/1024 self.summary['memsize'].append(['load', memsize]) print(f"#DEBUG Memory used after structure load: {memsize:f} MB ") if self.args['time_limit'] and self._check_time_limit(): sys.exit(1) # if self.args['atom_limit'] and \ # self.strucm.st_data.stats['num_ats'] > self.args['atom_limit']: # sys.exit( # cts.MSGS['ATOM_LIMIT'].format( # self.strucm.st_data.stats['num_ats'], # self.args['atom_limit'] # ) # )
[docs] def launch(self): """ StructureChecking.launch Method run from the command line invocation """ if self.args['command'] == 'command_list': self.command_list(self.args['options']) elif self.args['command'] == 'checkall': self.checkall(self.args['options']) elif self.args['command'] == 'fixall': self.fixall(self.args['options']) elif self.args['command'] == 'load': if self.args['nocache'] and \ not self.args['force_save'] and \ not self.args['copy_input']: print( "WARNING: load with --nocache will not " "have any effect unless --copy_input is set" ) else: self._run_method(self.args['command'], self.args['options']) if not self.args['check_only'] or self.args['force_save']: if self.strucm.modified or self.args['force_save']: if not self.strucm.modified: print(cts.MSGS['FORCE_SAVE_STRUCTURE']) if not self.args['check_only']: self.print_stats('Final') self.summary['final_stats'] = self.strucm.get_stats() try: output_structure_path = self._save_structure( self.args['output_structure_path'], self.args['rename_terms'], split_models='--save_split' in self.args['options'] ) print(cts.MSGS['STRUCTURE_SAVED'], output_structure_path) self.summary['saved_structure'] = output_structure_path except OSError: print( 'ERROR: unable to save PDB data on ', output_structure_path, file=sys.stderr ) except stm.OutputPathNotProvidedError as err: print(err.message, file=sys.stderr) elif not self.strucm.modified: print(cts.MSGS['NON_MODIFIED_STRUCTURE']) self.summary['modified_structure'] = self.strucm.modified if self.args['debug']: total = time.time() - self.start_time self.summary['elapsed_times']['total'] = total print("#DEBUG TIMINGS") print("#DEBUG =======") ant = 0. for operation, timing in self.timings: elapsed = timing - ant self.summary['elapsed_times'][operation] = elapsed print( f"#DEBUG {operation:15s}: " f"{elapsed:10.4f} s " f"({elapsed / total * 100:6.2f}%)" ) ant = timing print(f"#DEBUG TOTAL : {total:10.4F} s") print() print("#DEBUG MEMORY USAGE EVOLUTION") print("#DEBUG ======================") for operation, memsize in self.summary['memsize']: print(f"#DEBUG {operation:15s}: {memsize:.2f} MB") if self.args['json_output_path'] is not None: json_writer = JSONWriter() json_writer.data = self.summary try: json_writer.save(self.args['json_output_path']) print( cts.MSGS['JSON_SAVED'], self.args['json_output_path'] ) except IOError: print( cts.MSGS['JSON_NOT_SAVED'], self.args['json_output_path'] )
[docs] def print_stats(self, prefix=None): """ StructureChecking.print_stats Print statistics on the loaded structure Args: prefix (str): (None) Prefix to add to the output lines for identification. """ self.strucm.st_data.calc_stats() if prefix is None: prefix = '' self.strucm.print_stats(prefix)
[docs] def command_list(self, opts): """ StructureChecking.command_list Manages command_list workflows Args: opts (str | list(str)): Command options as str, file or str list (';' separated). """ try: opts = cts.DIALOGS.get_parameter('command_list', opts) op_list = opts['op_list'] except NoDialogAvailableError as err: print(err.message) if not op_list: if not self.args['non_interactive']: op_list = ParamInput('Command List File', False).run(op_list) else: sys.exit('ERROR: command list not provided and non_interactive') if os.path.isfile(op_list): command_list = [] try: with open(op_list, "r") as list_file_h: for line in list_file_h: if line == "\n" or line[0:1] == '#': continue command_list.append(line) except OSError: sys.exit(f"{cts.MSGS['ERROR_OPEN_FILE']} {op_list}") else: command_list = op_list.split(';') i = 1 for line in command_list: if not self.args['quiet']: print(f"\nStep {i}: {line}") data = line.split() command = data[0] opts = data[1:] if self._run_method(command, opts): break i += 1 print(cts.MSGS['COMMAND_LIST_COMPLETED'])
[docs] def checkall(self, opts=None): """ StructureChecking.checkall Predefined workflow for complete checking """ # Required to allow for interactive run in Notebooks old_check_only = self.args['check_only'] self.args['check_only'] = True for meth in cts.AVAILABLE_METHODS: if self._run_method(meth, None): break self.args['check_only'] = old_check_only
[docs] def fixall(self, opts=None): """ StructureChecking.fixall Fix all using defaults. Not implemented (yet) """ # TODO Implement method fixall print("Fixall not implemented (yet)")
[docs] def revert_changes(self): """ StructureChecking.revert_changes Reload original structure. Used in Pipelines or Notebooks to revert changes. """ self.strucm = self._load_structure( self.args['input_structure_path'], self.args['fasta_seq_path'] ) self.summary = {} print(cts.MSGS['ALL_UNDO'])
def _run_method(self, command, opts): """ Private. StructureChecking._run_method Run check and fix methods for specific command Args: command (str): Command to run opts (str | list(str) | dict): Command options, passed from callers """ try: importlib.import_module( f'biobb_structure_checking.commands.{command}' ) f_check = sys.modules[f'biobb_structure_checking.commands.{command}'].check f_fix = sys.modules[f'biobb_structure_checking.commands.{command}'].fix except ImportError as e: print(command, e) sys.exit(cts.MSGS['COMMAND_NOT_FOUND'].format(command)) if command not in self.summary: self.summary[command] = {} msg = f"Running {command}." if opts: if isinstance(opts, list): opts_str = ' '.join(opts) elif isinstance(opts, dict): opts_str = str(opts) else: opts_str = opts msg += f' Options: {opts_str}' self.summary[command]['opts'] = opts_str if not self.args['quiet'] or self.args['verbose']: print(msg.strip()) # Running checking method data_to_fix = f_check(self) # Running fix method if needed if self.args['check_only'] or opts in (None, ''): if self.args['verbose']: print(cts.MSGS['CHECK_ONLY_DONE']) elif data_to_fix: if isinstance(opts, (str, list)): if cts.DIALOGS.exists(command): opts = cts.DIALOGS.get_parameter(command, opts) else: opts = {} else: # Adding default parameters if cts.DIALOGS.exists(command): defaults = cts.DIALOGS.get_parameter(command, '') for k in defaults: if k not in opts: opts[k] = defaults[k] error_status = f_fix(self, opts, data_to_fix) if error_status: if isinstance(error_status, tuple): if error_status[1] is None: error_status = [error_status[0]] print('ERROR', ' '.join(error_status), file=sys.stderr) self.summary[command]['error'] = ' '.join(error_status) if self.args['debug']: import psutil self.timings.append([command, time.time() - self.start_time]) process = psutil.Process(os.getpid()) memsize = process.memory_info().rss/1024/1024 self.summary['memsize'].append([command, memsize]) print(f"#DEBUG Memory used after {command}: {memsize:f} MB ") return self.args['time_limit'] and self._check_time_limit() # ============================================================================== def _load_structure( self, input_structure_path, fasta_seq_path=None, verbose=True, print_stats=True ): """ Private. StructureChecking._load_structure Prepares Structure Manager and load structure Args: input_structure_path (str): Path to structure file or pdb:{pdbid] fasta_seq_path (str): (None) Path to sequence FASTA file verbose (bool): (True) Output progress information. print_stats (bool): (True) Print structure statistics """ input_line = ParamInput( "Enter input structure path (PDB, mmcif | pdb:pdbid)", self.args['non_interactive'] ) input_structure_path = input_line.run(input_structure_path) strucm = stm.StructureManager( input_structure_path, self.args['data_library_path'], self.args['res_library_path'], pdb_server=self.args['pdb_server'], cache_dir=self.args['cache_dir_path'], file_format=self.args['file_format'], nocache=self.args['nocache'], copy_dir=self.args['copy_input'], fasta_sequence_path=fasta_seq_path, nowarn=not self.args['build_warnings'], coords_only=self.args['coords_only'], overwrite=self.args['overwrite'], atom_limit=self.args['atom_limit'] ) self.summary['loaded_structure'] = input_structure_path if verbose: print(cts.MSGS['STRUCTURE_LOADED'].format(input_structure_path)) strucm.st_data.print_headers() print() self.summary['headers'] = strucm.st_data.meta if print_stats: strucm.print_stats() self.summary['stats'] = strucm.get_stats() return strucm
[docs] def save_structure( self, output_structure_path, rename_terms=False, split_models=False ): """ StuctureChecking.save_structure Saving the current structure in a the output file Args: output_structure_path (str): Path to saved File rename_terms (bool): (False) Rename terminal residues as NXXX, CXXX split_models (bool): (False) Save models in separated output files """ return self._save_structure( output_structure_path, rename_terms=rename_terms, split_models=split_models )
# Kept for back compatibility def _save_structure( self, output_structure_path, rename_terms=False, split_models=False ): """ Private. StuctureChecking._save_structure Saving the current structure in a the output file Args: output_structure_path (str): Path to saved File rename_terms (bool): (False) Rename terminal residues as NXXX, CXXX split_models (bool): (False) Save models in separated output files """ input_line = ParamInput( "Enter output structure path", self.args['non_interactive'], set_none='fixed_structure.pdb' ) output_structure_path = input_line.run(output_structure_path) if self.args['output_format']: output_format = self.args['output_format'] else: output_format = os.path.splitext(output_structure_path)[1][1:] if output_format == 'mmCif': output_format = 'cif' if not split_models: self.strucm.save_structure( output_structure_path, rename_terms=rename_terms, output_format=output_format, keep_resnames=self.args['keep_canonical'] ) else: for mod in self.strucm.st.get_models(): output_path = f'{output_structure_path}_{mod.serial_num}.{output_format}' self.strucm.save_structure( output_path, mod_id=mod.id, rename_terms=rename_terms, output_format=output_format, keep_resnames=self.args['keep_canonical'] ) print(cts.MSGS["SPLIT_MODELS"]) return output_structure_path
[docs] def check_report_clashes(self, residue_list=None, contact_types=None): """ StructureChecking.check_report_clashes Check and reports clashes Args: residue_list (res (list)) : Residues to check contact_types (int (list)): Types of contacts to consider """ if contact_types is None: contact_types = mu.ALL_CONTACT_TYPES if not residue_list: residue_list = self.strucm.st_data.all_residues return self._clash_report( contact_types, self.strucm.check_r_list_clashes(residue_list, contact_types) )
def _clash_report(self, contact_types, clash_list): summary = {} for cls in contact_types: summary[cls] = [] if clash_list[cls]: print( cts.MSGS['CLASHES_DETECTED'].format( len(clash_list[cls]), cls ) ) for rkey in sorted( clash_list[cls], key=lambda x: mu.key_sort_atom_pairs(clash_list[cls][x]) ): print( f" {mu.atom_id(clash_list[cls][rkey][0]):12}" f" {mu.atom_id(clash_list[cls][rkey][1]):12}" f" {sqrt(clash_list[cls][rkey][2]):8.3f} A" ) summary[cls].append({ 'at1':mu.atom_id(clash_list[cls][rkey][0]), 'at2':mu.atom_id(clash_list[cls][rkey][1]), 'dist': round(float(sqrt(clash_list[cls][rkey][2])), 4) }) else: if not self.args['quiet']: print(cts.MSGS['NO_CLASHES_DETECTED'].format(cls)) return summary def _check_time_limit(self): if time.time() - self.start_time > self.args['time_limit']: print(cts.MSGS['TIME_LIMIT'].format(self.args['time_limit']), file=sys.stderr) return True return False # ============================================================================== # Entry points for direct notebook calls and Docstrings for commands' help
[docs] def help(self, command=None): """ StructureChecking.help Provides help on StructureChecking commands Args: command (str) : (None) Requested command. If empty returns all commands help """ return cts.help(command)
[docs] def sequences(self, opts=None): """ StructureChecking.sequences Print canonical and structure sequences in FASTA format Args: opts (str | dict - Options dictionary): * output_fasta (str) - File name to output (FASTA format) """ self._run_method('sequences', opts)
[docs] def models(self, opts=None): """ StructureChecking.models Detect/Select Models. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * select (int) - model(s) to select * superimpose (bool) - superimpose models * build_complex (bool) - Build a complex from selected models """ self._run_method('models', opts)
[docs] def chains(self, opts=None): """ StructureChecking.chains Detect/Select Chains. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * select: * **chain_id_list** - List of chains to retain (comma separated, case sensitive), * **protein** - Select all protein chains, * **na** - Select all NA chains, * **rna** - Select all RNA chains, * **dna** - Select all DNA chains. * rename: * **auto** - Add first possible label staring on A to unlabeled chains * **label** - Use indicated label * renumber: * **auto** - Renumbers all residues from 1 without repeating residue numbers. Chains are preserved but relabelled from A * **str** - Specific renumbering recipe indicated as a list of tasks: [OldChain:]i0[-j0]=[NewChain:]i1. No j0 implies to the end of chain. No chain implies do the transformation in all chains. * rebuild: - Creates chain labels and renumbers residues based on backbone connectivity """ self._run_method('chains', opts)
[docs] def inscodes(self, opts=None): """ StructureChecking.inscodes Detects residues with insertion codes. Args: opts (str | dict - Options dictionary): * renumber (bool): Renumber residues to remove insertion codes. """ self._run_method('inscodes', opts)
[docs] def altloc(self, opts=None): """ StructureChecking.altloc Detect/Select Alternative Locations. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * select: * **occupancy** - select higher occupancy, * **alt_id** - All atoms of the indicated alternative * **list** of res_id:alt_id - Indicate selection per atom """ self._run_method('altloc', opts)
[docs] def metals(self, opts=None): """ StructureChecking.metals Detect/Remove Metals. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * remove: * **all** - Remove all metal atoms, * **atom_type_list**: Remove all Metals of listed types * **residue_list**: Remove indicated residues """ self._run_method('metals', opts)
[docs] def water(self, opts=None): """ StructureChecking.water Detect/Select Remove Water molecules. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * remove: Yes - Remove All Water molecules """ self._run_method('water', opts)
[docs] def hetatm(self, opts=None): """ StructureChecking.hetatm Manages hetero atoms. Not implemented yet. See Ligands """ print("Warning: hetatm function not implemented yet, running ligands instead") self._run_method('ligands', opts)
[docs] def ligands(self, opts=None): """ StructureChecking.ligands Detect/Remove Ligands. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * remove: * **all** - Remove all hetatm, * **res_type_list** - Remove Hetatm of given types, * **residue_list** - Remove indicated residues """ self._run_method('ligands', opts)
[docs] def rem_hydrogen(self, opts=None): """ StructureChecking.add_hydrogen Remove Hydrogen atoms from structure. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * remove (str): Yes - remove all hydrogen atoms """ self._run_method('rem_hydrogen', opts)
[docs] def getss(self, opts=None): """ StructureChecking.getss Detect SS Bonds. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * mark: * **all** - Rename all reported cys residues as CYX, * **residue_list** - Rename indicated residues """ self._run_method('getss', opts)
[docs] def amide(self, opts=None): """ StructureChecking.amide Detect/Fix Amide atoms Assignment. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * fix: * **all** - Fix all residues, * **residue_list** - Fix indicated residues * **auto** - Find the best combination to minimize amide contacts . * no_recheck (bool) - (False) Do not recheck amide residues after modification. """ self._run_method('amide', opts)
[docs] def chiral(self, opts=None): """ StructureChecking.chiral Detect/Fix Improper side chain chirality. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * fix: * **All** - Fix all residues * **residue_list** - Fix indicates residues * no_check_clashes (bool) - (False) Do not check generated clashes """ self._run_method('chiral', opts)
[docs] def chiral_bck(self): """ StructureChecking.chiral_bck Detect/Fix Improper CA chirality. No fix. """ self._run_method('chiral_bck', None)
[docs] def clashes(self): """ StructureChecking.clashes Detect steric clashes in groups: Severe, Apolar, Polar Donors, Polar Acceptors, Ionic Positive, Ionic Negative """ self._run_method('clashes', None)
[docs] def fixside(self, opts=None): """ StructureChecking.fixside Complete side chains (heavy atoms, protein only). Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * fix: * **all** - Fix all residues * **residue_list** - Fix indicated residues * no_check_clashes (bool) - (False) Do not check for generated clashes * rebuild (bool) - (False) Rebuild side chains using Modeller """ self._run_method('fixside', opts)
[docs] def add_hydrogen(self, opts=None): """ StructureChecking.add_hydrogen Add Hydrogen Atoms to the structure. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * add_mode: * **auto** - Add hydrogen atom considering pH 7.0. * **pH** (float) - Set explicit pH value. * **list** (str) - Explicit residue list as [*:]HisXXHid. * no_fix_side (bool) - (False) Do not fix side chains. * keep_h (bool) - (False) Keep original Hydrigen atoms. * add_charges FF (str) - Add charges and atom types for the selected FF. """ self._run_method('add_hydrogen', opts)
[docs] def mutateside(self, mut_list): """ StructureChecking.mutateside Mutate side chain with minimal atom replacement. Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * mut (str) - List of mutations * no_check_clashes (bool) - (False) Do not check for generated clashes * rebuild (bool) - (False) - Optimize new side chains using Modeller """ self._run_method('mutateside', mut_list)
[docs] def backbone(self, opts=None): """ StructureChecking.backbone Analyze/Fix main chain missing atoms and fragments (protein only). Check only with no options. Options accepted as command-line string, or python dictionary. Args: opts (str | dict - Options dictionary): * fix_atoms (str - Fix missing O, OXT backbone atoms): * **all** - Fix all residues * **residue List** - Fix indicated residues * fix_chain (str - Fix backbone main chain): * **all** - All detected breaks * **break list** - Indicated breaks * add_caps (str - Add ACE and NME residues): * **all** - All detected terminals * **residue_list** - Indicated terminals * **breaks** - Add caps to backbone breaks * **terms** - Add caps to true terminals * extra_gap (int) - ('0') Recover addiciontal residues from the model to improve match (experimental) * no_recheck (bool) - (False) Do not recheck backbone after fixing * no_check_clashes (bool) - (False) Do not check for generated clashes """ self._run_method('backbone', opts)
[docs] def cistransbck(self): """ StructureChecking.cistransbck Analyzes cis-trans dihedrals on backbone atoms """ self._run_method('cistransbck', None)
# =======================================================================