# Source code for cosmograph.util

"""
Utils to prepare data for cosmos
"""

import json
from typing import Dict, Any
from functools import lru_cache, partial
from itertools import count
import re

from IPython.display import HTML, Javascript, display as ipython_display
from dol import (
    TextFiles,
    Files,
    JsonFiles,
    wrap_kvs,
    filt_iter,
    invertible_maps,
    add_ipython_key_completions,
    Pipe,
)
from dol.sources import AttrContainer

import pandas as pd

# --------------------------------------------------------------------------------------
# type annotations

# Keyword arguments destined for a cosmograph/cosmos call (arbitrary values keyed by name).
CosmoKwargs = Dict[str, Any]

# --------------------------------------------------------------------------------------
# Constants and data access

# Resolve ``importlib.resources.files``: present in the stdlib from 3.9+,
# otherwise fall back to the ``importlib_resources`` backport.
try:
    import importlib.resources

    _files = importlib.resources.files  # only valid in 3.9+
except AttributeError:
    import importlib_resources  # needs pip install

    _files = importlib_resources.files

# Resource roots of the installed ``cosmograph`` package.
files = _files("cosmograph")
data_dir = files / "data"
data_dir_path = str(data_dir)
js_dir = files / "js"
js_dir_path = str(js_dir)

# Dict-like stores over the bundled data directory; ``json_files`` filters to
# ``.json`` entries and decodes them on read.
data_files = Files(data_dir_path)
json_files = filt_iter.suffixes(".json")(JsonFiles(data_dir_path))


# color_names_set = set(json_files['color_names.json'])  # removed because problematic on windows

# --------------------------------------------------------------------------------------
# Pipeline for Cosmo


class Pipeline(Pipe):
    """A function pipeline that can be extended with additional steps."""

    def add(self, *additional_funcs):
        """Return a new ``Pipeline`` with ``additional_funcs`` appended."""
        extended = (*self.funcs, *additional_funcs)
        return Pipeline(*extended)
# --------------------------------------------------------------------------------------
# Extracting the interface from the data

import typing
import re
from typing import Any, Callable

from i2 import Sig, Param, params_to_docstring

# Single source of truth for the cosmograph parameter records.
PARAMS_SSOT_PATH = data_dir / "params_ssot.json"


# TODO: add ssot validation
def _params_ssot(param_names=None):
    """Load the params SSOT records, optionally keeping only ``param_names``."""
    records = json.loads(PARAMS_SSOT_PATH.read_text())
    if param_names is not None:
        records = [record for record in records if record["name"] in param_names]
    return records


def cosmograph_base_signature(param_names=None):
    """Build a keyword-only ``Sig`` from the params SSOT records."""
    records = _params_ssot(param_names)
    for record in records:
        record.pop("description", None)  # delete description if any
        # convert the stored string annotation back into a real type
        record.update(annotation=str_to_annotation(record["annotation"]))
        record["kind"] = Sig.KEYWORD_ONLY
    return Sig([Param(**record) for record in records])
def validate_signature(sig):
    """Validate a signature.

    Intended check: every non-None default's type should be a "subclass" of
    the parameter's annotation type.

    TODO: Implement
    """
    pass
def cosmograph_base_docs(param_names=None, take_name_of_types=True):
    """Get the params information part of a docstring.

    NOTE(review): ``take_name_of_types`` is currently unused — confirm intent.
    """
    return params_to_docstring(_params_ssot(param_names))
def is_parameterized_type(obj):
    """
    Tell whether ``obj`` is a parameterized generic (e.g. ``list[int]``).

    Args:
        obj: The object to check.

    Returns:
        True if it's a parameterized type, False otherwise.
    """
    origin = typing.get_origin(obj)
    return origin is not None
def annotation_to_str(annotation, *, remove_typing_prefix=True):
    """
    Encode a Python type annotation as a string for JSON serialization.

    Part of a Python-to-string codec (see ``str_to_annotation`` for the
    inverse) that lets annotations round-trip through JSON-compatible formats.

    Args:
        annotation: The type annotation to encode.
        remove_typing_prefix: Whether to strip ``typing.`` qualifiers.

    Returns:
        A string representation of the annotation.

    Raises:
        ValueError: If the annotation type is unknown.

    Examples:
        >>> annotation_to_str(int)
        'int'
        >>> annotation_to_str(list[float])
        'list[float]'
        >>> annotation_to_str(typing.Union[int, str])
        'Union[int, str]'
    """
    if isinstance(annotation, str):
        result = annotation
    elif annotation == Sig.empty:
        result = "Any"
    elif is_parameterized_type(annotation):
        result = str(annotation)
    elif isinstance(annotation, type):
        result = annotation.__name__
    else:
        raise ValueError(f"Unknown annotation type: {annotation}")
    if remove_typing_prefix:
        # drop "typing." when it starts the string or follows a non-word char
        result = re.sub(r"(?:(?<=\W)|^)typing\.", "", result)
    return result
def default_is_safe(string: str) -> bool:
    """
    Decide whether a string is safe to evaluate as a type annotation.

    The string is considered safe when:
    - It contains only word characters, whitespace, dots, commas, brackets
      and parentheses.
    - It starts with one of a predefined list of allowed prefixes.

    Args:
        string: The string to check.

    Returns:
        True if the string is safe, False otherwise.
    """
    prefixes = ("bool", "float", "list", "typing", "int", "object", "str")
    if re.match(r"^[\w\s.,\[\]()]+$", string) is None:
        return False
    return string.strip().startswith(prefixes)
def str_to_annotation(string, is_safe: Callable[[str], bool] = default_is_safe):
    """
    Decode a string representation of a Python type annotation back into
    the annotation object.

    Args:
        string: The string representation of the annotation.
        is_safe: Predicate deciding whether ``string`` may be evaluated.

    Returns:
        The decoded Python type annotation.

    Raises:
        ValueError: If ``is_safe`` rejects the string.
    """
    if is_safe(string):
        # eval is gated by ``is_safe``; only vetted annotation strings reach it
        return eval(string)
    raise ValueError(
        f"The string '{string}' is not considered safe for evaluation."
    )
# -------------------------------------------------------------------------------------- # General/Misc utils from functools import lru_cache import re
def move_to_front(df: pd.DataFrame, cols) -> pd.DataFrame:
    """
    Return ``df`` with the columns in ``cols`` moved to the front
    (remaining columns keep their original relative order).
    """
    remaining = [column for column in df.columns if column not in cols]
    return df[cols + remaining]
def ordered_unique(iterable):
    """Yield each item of ``iterable`` once, preserving first-seen order."""
    seen = set()
    seen_add = seen.add  # hoist the bound method for the hot generator loop
    return (x for x in iterable if not (x in seen or seen_add(x)))


class IpythonObjects:
    """A container of objects that can all be displayed in IPython."""

    def __init__(self, *objs):
        self.objs = objs

    def display(self):
        """Display every contained object.

        Bug fix: the original ``return``ed inside the loop, so only the
        first object was ever displayed.
        """
        for obj in self.objs:
            ipython_display(obj)

    # def __repr__(self):
    #     return self.display()


def add_attributes(obj, **attrs):
    """Set each keyword argument as an attribute on ``obj``; return ``obj``."""
    for k, v in attrs.items():
        setattr(obj, k, v)
    return obj


def camel_to_snake_case(name):
    """Convert a camelCase (or otherwise mixed) name to snake_case."""
    # Replace any non-word character with an underscore
    name = re.sub(r"\W+", "_", name)
    # Insert an underscore before any capital letter (except the first one)
    name = re.sub(r"([a-z])([A-Z])", r"\1_\2", name).lower()
    return name


def snake_to_camel_case(name, first_char_trans=str.lower):
    """Convert a snake_case name to camelCase.

    Args:
        name: The snake_case string.
        first_char_trans: Transformation for the first character
            (``str.lower`` gives camelCase, ``str.upper`` gives PascalCase).
    """
    # Split the string into words separated by underscores
    words = name.split("_")
    # Convert each word to title case (i.e., with the first letter capitalized)
    words = [w.capitalize() for w in words]
    # Join the words together into a single string
    result = "".join(words)
    # process the first character (possibly)
    if first_char_trans and result:
        result = first_char_trans(result[0]) + result[1:]
    return result


def _assert_camel_and_snake_sanity(camel_cases, snake_cases):
    """
    Make sure that our camel to snake functions can actually fall back on
    their feet. (So we don't have to keep the mapping around)
    """
    # For now, they don't (need to discuss) so muting this function
    # agree_1 = [
    #     x == y for x, y in zip(
    #         map(camel_to_snake_case, camel_cases),
    #         list(snake_cases)
    #     )
    # ]
    # agree_2 = [
    #     x == y for x, y in zip(
    #         list(camel_cases),
    #         map(snake_to_camel_case, snake_cases),
    #     )
    # ]
    # agree = [x and y for x, y in zip(agree_1, agree_2)]
    # if not all(agree):
    #     i = agree.index(False)
    #     raise AssertionError(
    #         f"{list(camel_cases)[i]} and {list(snake_cases)[i]} do not agree"
    #     )


# _cosmos_config_info = cosmos_config_info()

# --------------------------------------------------------------------------------------
# Old stuff (TODO: Deprecate and remove)


def _postprocess(func, egress):
    """Compose ``func`` with ``egress`` so ``egress`` post-processes outputs."""
    return Pipe(func, egress)


def postprocess(egress):
    """Make a decorator that pipes a function's output through ``egress``."""
    return partial(_postprocess, egress=egress)


display_output = postprocess(ipython_display)
to_html_obj = postprocess(HTML)
to_js_obj = postprocess(Javascript)
@add_ipython_key_completions
@wrap_kvs(key_of_id=lambda x: x[:-3], id_of_key=lambda x: f"{x}.js")
@filt_iter(filt=lambda x: x.endswith(".js"))
class JsFiles(TextFiles):
    """A store of js files, keyed by filename without the ``.js`` suffix."""
_replace_non_alphanumerics_by_underscore = partial(re.compile(r"\W").sub, "_") # Note: js_files_as_attrs is not used in the module, but can be useful when working # in a notebook, or console, where we might want the convenience of tab-completion of # attributes
def js_files_as_attrs(rootdir):
    """
    Make a ``JsFiles`` store whose keys are available as attributes.

    To do so, any non-alphanumeric character of a file name is replaced with
    an underscore — so no two files may collide under that transformation!
    """
    store = JsFiles(rootdir)
    key_for_id = {
        id_: _replace_non_alphanumerics_by_underscore(id_) for id_ in store
    }
    key_for_id, id_for_key = invertible_maps(key_for_id)
    wrapped = wrap_kvs(store, key_of_id=key_for_id.get, id_of_key=id_for_key.get)
    return AttrContainer(**wrapped)
def _nodes_from_links(links):
    """Derive a node list (``{'id': ...}`` dicts) from link source/target ids."""

    def _ids(links):
        for link in links:
            yield link["source"]
            yield link["target"]

    return [{"id": node_id} for node_id in ordered_unique(_ids(links))]


from cosmograph.validation import is_links, is_graph_json, is_nodes