# MolDF
# Author: Ruibin Liu <ruibinliuphd@gmail.com>
# License: MIT
# Code Repository: https://github.com/Ruibin-Liu/MolDF
"""PDB format reading.
Reads a PDB file, including Chimera compatible ones, into a dict of
``Pandas DataFrame`` s.
Atom coordinates and sequences are currently supported by the following category names:
``_atom_site``: ``ATOM``, ``HETATM``, ``TER``, ``NUMMDL``, ``MODEL``, and
``ENDMDL`` lines.
``_seq_res``: ``SEQRES`` lines.
"""
from __future__ import annotations
import io
import os
import urllib.request
import warnings
from collections import defaultdict
from pathlib import Path
import numpy as np # type: ignore
import pandas as pd # type: ignore
from .constants import AMINO_ACIDS
IMPLEMENTED_PDB_CATS = ["_atom_site", "_seq_res", "_chem_comp"]
"""PDB categories that are currently implemented."""
ATOM_SITE = ("ATOM", "HETATM", "TER")
"""``_atom_site`` primary lines."""
NMR_MDL = ("NUMMDL", "MODEL", "ENDMDL")
"""``_atom_site`` additional lines."""
CHEM_COMP = ("HET ", "HETNAM", "FORMUL", "HETSYN")
"""``_chem_comp`` lines."""
AF2_MODEL = 4
"""For AlphaFold structures, the version to use."""
[docs]
def read_pdb(
pdb_file: str | os.PathLike[str] | None = None,
pdb_id: str | None = None,
category_names: list | None = None,
save_pdb_file: bool = True,
pdb_file_dir: str | os.PathLike | None = None,
allow_chimera: bool = True,
need_ter_lines: bool = True,
) -> dict[str, pd.DataFrame]:
"""Reads a ``.pdb`` file's categories into a ``dict`` of ``Pandas DataFrame`` s.
Args:
pdb_id (optional): PDB/Uniprot ID. Required if ``pdb_file`` is ``None``.
Defaults to **None**.
pdb_file (optional): file name for a PDB file. Used over `pdb_id`.
Defaults to **None**.
category_names (optional): a list of categories similar to the mmCIF format.
If ``None``, ``_atom_site`` is used.
To be consistent with the PDBx format, the following category names are
used to refer to block(s) in a PDB file and only they are supported:
1. ``_atom_site``: ``ATOM``, ``HETATM``, and ``TER`` lines and possible
``NUMMDL``, ``MODEL``, and ``ENDMDL`` lines.
2. ``_seq_res``: ``SEQRES`` lines.
3. ``_chem_comp``: all lines in ``CHEM_COMP`` above. However, only
compounds included in the ``HET `` lines are extracted which
usually means the water molecule is not included.
Defaults to **None**.
save_pdb_file (optional): whether to save the fetched PDB file from RCSB
to ``pdb_file_dir``. Defaults to **True**.
pdb_file_dir (optional): directory to save fetched PDB files. If ``None`` but
``save_pdb_file`` is ``True``, './PDB_files' is used.
Defaults to **None**.
allow_chimera (optional): whether to allow Chimera-formatted PDB files.
Defaults to **True**.
need_ter_lines (optional): whether to read the ``TER`` lines into the
``DataFrame``. Defaults to **True**.
Returns:
A dict of ``Pandas DataFrame`` s corresponding to required categories.
Raises:
ValueError: if none of ``pdb_id`` or ``pdbx_file`` is provided, or if ``pdb_id``
is given but cannot the PDB file cannot be downloaded from RCSB,
NotImplementedError: if `category_names` not a subset of allowed names.
FileNotFoundError: if ``pdbx_file`` cannot be found.
"""
data: dict[str, pd.DataFrame] = {}
if pdb_id is None and pdb_file is None:
raise ValueError("At least one of pdb_id and pdb_file has to be given.")
elif pdb_file is None:
pdb_id = str(pdb_id).upper()
if pdb_file_dir is None:
pdb_file_dir = "./PDB_files"
pdb_file_dir = Path(pdb_file_dir)
file_path_1 = Path(pdb_file_dir, f"{pdb_id}.pdb")
file_path_2 = Path(pdb_file_dir, f"{pdb_id.lower()}.pdb")
if os.path.exists(file_path_1):
pdb_file_handle: io.TextIOWrapper | io.StringIO = open(
file_path_1, "r", encoding="utf-8"
)
elif os.path.exists(file_path_2):
pdb_file_handle = open(file_path_2, "r", encoding="utf-8")
else:
if len(pdb_id) == 4:
pdb_file_url = f"https://files.rcsb.org/view/{pdb_id}.pdb"
else:
pdb_file_url = f"https://alphafold.ebi.ac.uk/files/AF-{pdb_id}-F1-model_v{AF2_MODEL}.pdb"
try:
with urllib.request.urlopen(pdb_file_url) as response:
raw_data = response.read()
text = raw_data.decode("utf-8")
pdb_file_handle = io.StringIO(text)
if save_pdb_file:
if not pdb_file_dir.exists():
pdb_file_dir.mkdir(parents=True, exist_ok=True)
file_path = Path(pdb_file_dir, f"{pdb_id}.pdb")
with open(file_path, "w", encoding="utf-8") as p_file:
p_file.write(text)
except urllib.error.HTTPError as http_error:
raise ValueError(
f"Cannot download PDB file from url {pdb_file_url}."
) from http_error
else:
pdb_file = Path(pdb_file)
if not pdb_file.exists():
raise FileNotFoundError(f"File {pdb_file} not found.")
pdb_file_handle = open(pdb_file, "r", encoding="utf-8")
if category_names is None:
category_names = ["_atom_site"]
for category_name in category_names:
if category_name not in IMPLEMENTED_PDB_CATS:
implemented = ", ".join(IMPLEMENTED_PDB_CATS)
raise NotImplementedError(
f"Only {implemented} are implemented for the PDB format."
)
data[category_name] = pd.DataFrame()
num_nmr_models = 0
n_model_lines = 0
nmr_model = -1 # -1 means there is no NMR model
n_endmdl_lines = 0
# atom_site_rows = []
column_name_types = [
("record_name", "U6"),
("atom_number", "i4"),
("atom_name", "U4"),
("alt_loc", "U1"),
("residue_name", "U4"),
("chain_id", "U1"),
("residue_number", "i2"),
("insertion", "U1"),
("x_coord", "f4"),
("y_coord", "f4"),
("z_coord", "f4"),
("occupancy", "f4"),
("b_factor", "f4"),
("segment_id", "U4"),
("element_symbol", "U2"),
("charge", "U2"),
("nmr_model", "i2"),
]
if not need_ter_lines:
all_atom_site_reads = ("ATOM", "HETATM") + NMR_MDL
else:
all_atom_site_reads = ATOM_SITE + NMR_MDL # type: ignore
if pdb_file is not None: # This check is just for mypy
file_stat = os.stat(pdb_file)
total_lines = int(file_stat.st_size / 81) + 20
# Will still possibly errors if there are many chains for pdbfixer files.
else:
total_lines = 100000
n_record = 0
n_lines_till_atom_lines = 0
with pdb_file_handle as lines:
for line in lines:
if line.startswith(all_atom_site_reads) and "_atom_site" in category_names:
if (not n_record) and line.startswith(("ATOM", "HETATM", "MODEL")):
array = np.zeros(
total_lines - n_lines_till_atom_lines, column_name_types
)
if line.startswith(("ATOM", "HETATM")):
array[n_record] = _split_atom_line(
line,
nmr_model=nmr_model,
allow_chimera=allow_chimera,
is_ter_line=False,
)
n_record += 1
elif need_ter_lines and line.startswith("TER"):
array[n_record] = _split_atom_line(
line,
nmr_model=nmr_model,
allow_chimera=allow_chimera,
is_ter_line=True,
)
n_record += 1
elif line.startswith("MODEL"):
n_model_lines += 1
nmr_model = int(line[6:].strip())
elif line.startswith("ENDMDL"):
n_endmdl_lines += 1
if num_nmr_models and n_endmdl_lines == num_nmr_models:
break
else: # line.startswith("NUMMDL")
num_nmr_models = int(line[6:].strip())
elif line.startswith("SEQRES") and "_seq_res" in category_names:
chain_residues: dict[str, list[str]] = defaultdict(list)
chain_lengths: dict[str, int] = defaultdict(int)
while line.startswith("SEQRES"):
items: list[str] = line.strip().split()
chain_id: str = items[2]
chain_length: int = int(items[3])
chain_lengths[chain_id] = chain_length
chain_residues[chain_id].extend(items[4:])
if not n_record and "_atom_site" in category_names:
n_lines_till_atom_lines += 1
line = pdb_file_handle.readline()
elif line.startswith(CHEM_COMP) and "_chem_comp" in category_names:
# See: https://www.wwpdb.org/documentation/file-format-content/format33/sect4.html
comps: list[dict] = []
while line.startswith(CHEM_COMP):
if line.startswith("HET "):
comp: dict[str, str] = {}
het_id = line[7:10].strip()
chain_id = line[12]
seq_num = line[13:17].strip()
i_code = line[17]
num_het_atoms = line[20:25].strip()
text = line[30:70].strip()
comp["het_id"] = het_id
comp["chain_id"] = chain_id
comp["seq_num"] = seq_num
comp["i_code"] = i_code
comp["num_het_atoms"] = num_het_atoms
comp["text"] = text
comps.append(comp)
elif line.startswith("HETNAM"):
continuation = line[8:10].strip()
het_id = line[11:14].strip()
text = line[15:70].strip()
for i, comp in enumerate(comps):
if comp["het_id"] == het_id:
if not continuation:
comps[i]["name"] = text
else:
comps[i]["name"] += text
elif line.startswith("HETSYN"):
continuation = line[8:10].strip()
het_id = line[11:14].strip()
het_synonyms = line[15:70].strip()
for i, comp in enumerate(comps):
if comp["het_id"] == het_id:
if not continuation:
comps[i]["synonyms"] = het_synonyms
else:
comps[i]["synonyms"] += het_synonyms
elif line.startswith("FORMUL"):
comp_num = int(line[8:10].strip())
het_id = line[12:15].strip()
continuation = line[16:18].strip()
asterisk = line[17]
text = line[19:70].strip()
for i, comp in enumerate(comps):
if comp["het_id"] == het_id:
if not continuation:
comps[i]["comp_num"] = comp_num
comps[i]["comp_formula"] = text
if asterisk == "*":
comps[i]["is_water"] = True
comps[i]["comp_formula"] = "H2 O"
else:
comps[i]["is_water"] = False
else:
comps[i]["comp_formula"] += text
line = pdb_file_handle.readline()
if not n_record and "_atom_site" in category_names:
n_lines_till_atom_lines += 1
if (
"_atom_site" in category_names
and num_nmr_models != n_model_lines
and num_nmr_models != 0
):
warnings.warn(
f"The NUMMDL says {num_nmr_models} NMR models, but {n_model_lines} found.",
RuntimeWarning,
stacklevel=2,
)
if "_atom_site" in category_names:
df_atom_site = pd.DataFrame.from_records(array[:n_record])
data["_atom_site"] = df_atom_site
if not n_model_lines:
data["_atom_site"].drop(columns=["nmr_model"], inplace=True)
if "_seq_res" in category_names:
chain_sequences: dict[str, str] = {}
for chain_id, residues in chain_residues.items():
chain_sequences[chain_id] = "".join([AMINO_ACIDS[i] for i in residues])
if len(residues) != chain_lengths[chain_id]:
warnings.warn(
f"Cols 14-17 value {chain_lengths[chain_id]} contrasts real length {len(residues)}.", # noqa
RuntimeWarning,
stacklevel=2,
)
chain_ids = chain_sequences.keys()
chain_num_residues = [len(chain_sequences[chain_id]) for chain_id in chain_ids]
chain_seqs = [chain_sequences[chain_id] for chain_id in chain_ids]
data["_seq_res"] = pd.DataFrame(
{
"chain_id": chain_ids,
"chain_sequence": chain_seqs,
"chain_length": chain_num_residues,
}
)
if "_chem_comp" in category_names:
data["_chem_comp"] = pd.DataFrame(comps)
return data
[docs]
def _split_atom_line(
line: str,
nmr_model: int = -1,
allow_chimera: bool = True,
is_ter_line: bool = False,
) -> tuple:
"""Internal function to parse a single line belonging to ``ATOM``, ``HETATM``,
or ``TER`` lines.
Args:
line (required): A ``ATOM``, ``HETATM``, or ``TER`` line.
nmr_model (optional): the NMR model number for the line; ``-1`` means
not an NMR model. Defaults to **-1**.
allow_chimera (optional): try to parse as a Chimera-formatted PDB file.
Defaults to **True**
is_ter_line (optional): whether the line starts with ``TER``.
Defaults to **False**.
Returns:
parsed values as a tuple.
"""
if not (is_ter_line or allow_chimera):
return (
line[0:6],
line[6:11],
line[12:16],
line[16],
line[17:20],
line[21],
line[22:26],
line[26],
line[30:38],
line[38:46],
line[46:54],
line[54:60],
line[60:66],
line[72:76],
line[76:78],
line[78:80],
nmr_model,
)
elif is_ter_line:
if allow_chimera:
record_name = line[0:5] + " "
residue_name = line[17:21]
else:
record_name = line[0:6]
residue_name = line[17:20]
return (
record_name,
line[6:11],
line[12:16],
line[16],
residue_name,
line[21],
line[22:26],
line[26],
None,
None,
None,
None,
None,
line[72:76],
line[76:78],
line[78:80],
nmr_model,
)
else:
if line[0:5] == "HETAT":
record_name = "HETATM"
atom_number = line[6:11]
else:
record_name = line[0:5] + " "
atom_number = line[5:11]
return (
record_name,
atom_number,
line[12:16],
line[16],
line[17:21],
line[21],
line[22:26],
line[26],
line[30:38],
line[38:46],
line[46:54],
line[54:60],
line[60:66],
line[72:76],
line[76:78],
line[78:80],
nmr_model,
)