# Source code for moldf.read_jcsv

# MolDF
# Author: Ruibin Liu <ruibinliuphd@gmail.com>
# License: MIT
# Code Repository: https://github.com/Ruibin-Liu/MolDF
"""JCSV format reading.

Reads a JCSV file into a dict of ``Pandas DataFrame`` s.
It is not limited to any molecular format.

"""
from __future__ import annotations

import csv
import os
import warnings
from collections import defaultdict

import pandas as pd  # type: ignore


def read_jcsv(
    jcsv_file: str | os.PathLike,
    category_names: list | None = None,
) -> dict[str, pd.DataFrame]:
    """Reads a JCSV file by name.

    Currently no molecular file repository has JCSV files so we can only read
    from a file name/path.

    Two layouts are handled:

    * If the first row is ``#jcsv_meta``, the second row holds the meta column
      names and the following rows (up to the next row starting with ``#``)
      form a meta table. Its ``category`` and ``start_line_index`` columns
      give, for each category, the 0-based line index of that category's
      column-name line. Each selected category is then parsed with
      ``pandas.read_csv`` restricted to its own lines.
    * Otherwise the file is handed to ``_read_jcsv_by_line``.

    Args:
        jcsv_file (required): JCSV file name/path.
        category_names (optional): a list of category names. If ``None``,
            all categories are read. Defaults to **None**.

    Returns:
        a dict of Pandas DataFrames for each category. An empty file, or a
        file that only contains the ``#jcsv_meta`` line, gives an empty dict.

    Raises:
        TypeError: if ``category_names`` is not a list of strings.
        ValueError: if any of the ``category_names`` has double quotes or if
            the number of items in any line does not match the number of
            column names in the same category.
    """
    if category_names is not None:
        if not isinstance(category_names, list):
            raise TypeError(f"{category_names} is not a list")
        for cat in category_names:
            if not isinstance(cat, str):
                raise TypeError(f"{cat} is not a str")
            if '"' in cat:
                raise ValueError(f"{cat} has double quotes.")

    col_data: dict[str, list] = defaultdict(list)
    # UTF-8 matches both ``_read_jcsv_by_line`` and the ``pandas.read_csv``
    # default, so the meta table is decoded like the data it points to.
    with open(jcsv_file, "r", encoding="utf-8") as jf:
        jf_reader = csv.reader(jf, delimiter=",", quotechar='"')

        first_row = next(jf_reader, None)
        if first_row is None:
            # Empty file: nothing to read.
            return {}
        if not first_row or first_row[0] != "#jcsv_meta":
            # No meta table to locate blocks with; scan the file row by row.
            return _read_jcsv_by_line(jcsv_file, category_names=category_names)

        meta_col_names = next(jf_reader, None)
        if meta_col_names is None:
            # Only the ``#jcsv_meta`` marker is present.
            return {}

        for row in jf_reader:
            if not row:
                # Blank line inside the meta table; line indices used below
                # come from the table itself, so skipping it is harmless.
                continue
            if row[0].startswith("#"):
                # First category marker: the meta table is complete.
                break
            if len(row) != len(meta_col_names):
                message = "Meta data has unmatched number"
                message += f" of items in row '{row}' with the column"
                message += f" names: {meta_col_names}"
                raise ValueError(message)
            for col_name, value in zip(meta_col_names, row):
                if col_name == "start_line_index":
                    col_data[col_name].append(int(value))
                else:
                    col_data[col_name].append(value)

    meta = list(zip(col_data["category"], col_data["start_line_index"]))
    results: dict[str, pd.DataFrame] = {}
    for position, (category_name, first_line) in enumerate(meta):
        if position == 0:
            # The first meta entry is never read as a category (presumably it
            # describes the meta table itself).
            continue
        if category_names is not None and category_name not in category_names:
            continue
        # A block ends right before the ``#<name>`` marker of the next
        # category, which sits one line above that category's column names.
        stop_line = None
        if position + 1 < len(meta):
            stop_line = meta[position + 1][1] - 1
        results[category_name] = pd.read_csv(
            jcsv_file,
            sep=",",
            quotechar='"',
            skiprows=_skip_outside_block(first_line, stop_line),
        )
    return results


def _skip_outside_block(first_line: int, stop_line: int | None):
    """Builds a ``skiprows`` callable keeping lines ``first_line..stop_line-1``.

    A predicate does not need the total number of lines in the file, so the
    last line is skipped correctly even when it has no trailing newline.
    ``stop_line`` of ``None`` keeps everything from ``first_line`` onwards.
    """

    def _skip(line_index: int) -> bool:
        if line_index < first_line:
            return True
        return stop_line is not None and line_index >= stop_line

    return _skip
def _read_jcsv_by_line(
    jcsv_file: str | os.PathLike,
    category_names: list | None = None,
) -> dict[str, pd.DataFrame]:
    """Reads JCSV file line by line when the file has no meta data to select blocks.

    A row whose first field starts with ``#`` opens a category named by the
    rest of that field. The next non-blank row holds the column names and the
    rows after it are data, until the next ``#`` row. All values are kept as
    the strings produced by ``csv.reader``. Blank rows are ignored.

    Args:
        jcsv_file (required): JCSV file name/path.
        category_names (optional): a list of category names. If ``None``,
            all categories are read. Defaults to **None**. It is passed by the
            ``read_jcsv`` caller, so it is not sanitized here.

    Returns:
        a dict of Pandas DataFrames for each category.

    Raises:
        ValueError: if the number of items in any line does not match the
            number of column names in the same category.

    Warns:
        RuntimeWarning: for each requested category that was not found.
    """
    read_all = category_names is None
    results: dict[str, pd.DataFrame] = {}
    category_name = ""
    cols_data: dict[str, list] = defaultdict(list)
    col_names: list = []
    # State is tracked with flags rather than row indices so that skipping a
    # blank row can never desynchronise the "next row is column names" check.
    in_selected_block = False
    expect_col_names = False

    with open(jcsv_file, "r", encoding="utf-8") as jf:
        jf_reader = csv.reader(jf, delimiter=",", quotechar='"')
        for row in jf_reader:
            if not row:
                # csv.reader yields [] for a blank line.
                continue
            if row[0].startswith("#"):
                block_name = row[0][1:]
                if read_all or (
                    category_names is not None and block_name in category_names
                ):
                    # Flush the previously collected category before
                    # starting a new one.
                    if category_name:
                        results[category_name] = pd.DataFrame(cols_data)
                    category_name = block_name
                    cols_data = defaultdict(list)
                    in_selected_block = True
                    expect_col_names = True
                else:
                    in_selected_block = False
            elif not in_selected_block:
                # Row belongs to a category that was not requested (or comes
                # before any category marker).
                continue
            elif expect_col_names:
                col_names = row
                expect_col_names = False
            else:
                if len(row) != len(col_names):
                    message = f"Category {category_name} has unmatched number"
                    message += f" of items in row '{row}' with the column"
                    message += f" names: {col_names}"
                    raise ValueError(message)
                for col_name, value in zip(col_names, row):
                    cols_data[col_name].append(value)

    if category_name:
        results[category_name] = pd.DataFrame(cols_data)

    if category_names is not None:
        for requested in category_names:
            if requested not in results:
                warnings.warn(
                    f"Category {requested} not in {jcsv_file}, not read",
                    RuntimeWarning,
                    stacklevel=2,
                )
    return results
def _count_n_lines(file_name: str | os.PathLike) -> int:
    """Gets the number of lines in a file.

    The file is read in 64 KiB binary chunks and newline bytes are counted
    (approach from https://stackoverflow.com/a/68385697/10094189). A final
    line that is not terminated by a newline is counted as a line too, so
    ``b"a\\nb"`` and ``b"a\\nb\\n"`` both give 2. An empty file gives 0.

    Args:
        file_name (required): file name or path.

    Returns:
        the number of lines in the file.
    """
    count = 0
    last_byte = b""
    with open(file_name, "rb") as f:
        read = f.raw.read
        while True:
            chunk = read(2**16)
            if not chunk:
                break
            count += chunk.count(b"\n")
            last_byte = chunk[-1:]
    # Counting newlines alone misses a trailing line without a terminator.
    if last_byte and last_byte != b"\n":
        count += 1
    return count