From 4c7954a0b9565633e5abfb50e2c69b9530524115 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20R=C3=B6bke?= Date: Wed, 18 Dec 2024 18:45:14 +0100 Subject: [PATCH] Format files with black Format imports and lines with https://marketplace.visualstudio.com/items?itemName=ms-python.black-formatter Add pragmas for new lines. Expected to **decrease coverage** by a few percent because of the better looking lines ;) --- scripts/build_directory_md.py | 27 +- tdvisu/dijkstra.py | 21 +- tdvisu/logging.yml | 30 +- tdvisu/reader.py | 16 +- tdvisu/svgjoin.py | 168 +++++---- tdvisu/utilities.py | 232 +++++++------ tdvisu/version.py | 4 +- tdvisu/visualization.py | 634 +++++++++++++++++++--------------- tdvisu/visualization_data.py | 94 ++--- test/test_reader.py | 102 ++++-- 10 files changed, 741 insertions(+), 587 deletions(-) diff --git a/scripts/build_directory_md.py b/scripts/build_directory_md.py index ea2ceed..562bd4c 100644 --- a/scripts/build_directory_md.py +++ b/scripts/build_directory_md.py @@ -32,22 +32,24 @@ URL_BASE = "https://github.com/VaeterchenFrost/tdvisu/blob/master" -AFFECTED_EXT = ('.py', '.ipynb',) +AFFECTED_EXT = ( + ".py", + ".ipynb", +) -EXCLUDED_FILENAMES = ('__init__.py',) +EXCLUDED_FILENAMES = ("__init__.py",) -def good_file_paths(top_dir: str = '.') -> Iterator[str]: +def good_file_paths(top_dir: str = ".") -> Iterator[str]: """Return relative path to files with extension in AFFECTED_EXT.""" for dir_path, dir_names, filenames in os.walk(top_dir): - dir_names[:] = [d for d in dir_names - if d != 'scripts' and d[0] not in '._'] + dir_names[:] = [d for d in dir_names if d != "scripts" and d[0] not in "._"] for filename in filenames: if filename in EXCLUDED_FILENAMES: continue if os.path.splitext(filename)[1] in AFFECTED_EXT: normalized_path = os.path.normpath(dir_path) - if normalized_path != '.': + if normalized_path != ".": yield os.path.join(normalized_path, filename) else: yield filename @@ -68,19 +70,20 @@ def print_path(old_path: str, new_path: str) -> str: return new_path -def print_directory_md(top_dir: str = '.') -> None: +def print_directory_md(top_dir: str = ".") -> None: """Print the markdown for files with selected extensions recursing top_dir.""" - old_path = '' + old_path = "" for filepath in sorted(good_file_paths(top_dir)): filepath, filename = os.path.split(filepath) if filepath != old_path: old_path = print_path(old_path, filepath) indent = (filepath.count(os.sep) + 1) if filepath else 0 - url = '/'.join((URL_BASE, *[quote(part) - for part in (filepath, filename) if part])) - filename = os.path.splitext(filename.replace('_', ' ').title())[0] + url = "/".join( + (URL_BASE, *[quote(part) for part in (filepath, filename) if part]) + ) + filename = os.path.splitext(filename.replace("_", " ").title())[0] print(f"{md_prefix(indent)} [{filename}]({url})") if __name__ == "__main__": - print_directory_md('.') + print_directory_md(".") diff --git a/tdvisu/dijkstra.py b/tdvisu/dijkstra.py index 338a7f7..8f30497 100644 --- a/tdvisu/dijkstra.py +++ b/tdvisu/dijkstra.py @@ -48,7 +48,7 @@ from itertools import count -def bidirectional_dijkstra(edges, source, target, weight='weight'): +def bidirectional_dijkstra(edges, source, target, weight="weight"): r"""Dijkstra's algorithm for shortest paths using bidirectional search. 
Parameters @@ -127,7 +127,7 @@ def bidirectional_dijkstra(edges, source, target, weight='weight'): push = heappush pop = heappop # Init: [Forward, Backward] - dists = [{}, {}] # dictionary of final distances + dists = [{}, {}] # dictionary of final distances paths = [{source: [source]}, {target: [target]}] # dictionary of paths fringe = [[], []] # heap of (distance, node) for choosing node to expand seen = [{source: 0}, {target: 0}] # dict of distances to seen nodes @@ -164,8 +164,7 @@ def bidirectional_dijkstra(edges, source, target, weight='weight'): vw_length = dists[direction][v] + weight(w, v, d) if w in dists[direction]: if vw_length < dists[direction][w]: - raise ValueError( - "Contradictory paths found: negative weights?") + raise ValueError("Contradictory paths found: negative weights?") elif w not in seen[direction] or vw_length < seen[direction][w]: # relaxing seen[direction][w] = vw_length @@ -229,12 +228,14 @@ def _weight_function(weight, multigraph: bool = False): return lambda u, v, data: data.get(weight, 1) -if __name__ == "__main__": # pragma: no cover +if __name__ == "__main__": # pragma: no cover # Show one example and print to console - EDGES = {2: {1: {}, 3: {}, 4: {}}, - 1: {2: {}}, - 3: {2: {}}, - 4: {2: {}, 5: {}}, - 5: {4: {}}} + EDGES = { + 2: {1: {}, 3: {}, 4: {}}, + 1: {2: {}}, + 3: {2: {}}, + 4: {2: {}, 5: {}}, + 5: {4: {}}, + } RESULT = bidirectional_dijkstra(EDGES, 3, 5) print(RESULT) diff --git a/tdvisu/logging.yml b/tdvisu/logging.yml index 3c23a55..b486892 100644 --- a/tdvisu/logging.yml +++ b/tdvisu/logging.yml @@ -1,33 +1,29 @@ -version: 1 +--- formatters: - simple: - format: "%(asctime)s %(levelname)s %(message)s" - datefmt: "%H:%M:%S" full: - format: "%(asctime)s,%(msecs)d %(levelname)s[%(filename)s:%(lineno)d] %(message)s" - datefmt: "%Y-%m-%d %H:%M:%S" + datefmt: '%Y-%m-%d %H:%M:%S' + format: '%(asctime)s,%(msecs)d %(levelname)s[%(filename)s:%(lineno)d] %(message)s' + simple: + datefmt: '%H:%M:%S' + format: '%(asctime)s %(levelname)s %(message)s' handlers: console: class: logging.StreamHandler - level: WARNING formatter: full + level: WARNING stream: ext://sys.stdout loggers: - visualization.py: - level: NOTSET - - svgjoin.py: + construct_dpdb_visu.py: level: NOTSET - reader.py: level: NOTSET - - construct_dpdb_visu.py: + svgjoin.py: level: NOTSET - utilities.py: level: NOTSET - + visualization.py: + level: NOTSET root: - level: WARNING handlers: [console] + level: WARNING +version: 1 \ No newline at end of file diff --git a/tdvisu/reader.py b/tdvisu/reader.py index bee21ee..c377624 100644 --- a/tdvisu/reader.py +++ b/tdvisu/reader.py @@ -46,11 +46,12 @@ def main() from tdvisu.utilities import add_edge_to -logger = logging.getLogger('reader.py') +logger = logging.getLogger("reader.py") -class Reader(): +class Reader: """Base class for string-readers.""" + @classmethod def from_filename(cls, fname) -> Reader: with open(fname, "r") as file: @@ -150,7 +151,7 @@ def store_problem_vars(self): def body(self, lines) -> None: """Store the content from the given lines in the edges and adjacency_dict.""" - if self.format not in ('col', 'tw'): + if self.format not in ("col", "tw"): logger.error("Not a tw file!") sys.exit(1) @@ -163,7 +164,8 @@ def body(self, lines) -> None: logger.warning( "Expected exactly 2 vertices at line %d, but %d found", lineno, - len(line)) + len(line), + ) vertex1 = int(line[0]) vertex2 = int(line[1]) @@ -171,5 +173,7 @@ def body(self, lines) -> None: if len(self.edges) != self.num_edges: logger.warning( - "Number of edges mismatch 
preamble (%d vs %d)", len( - self.edges), self.num_edges) + "Number of edges mismatch preamble (%d vs %d)", + len(self.edges), + self.num_edges, + ) diff --git a/tdvisu/svgjoin.py b/tdvisu/svgjoin.py index 660246e..ea06ab8 100644 --- a/tdvisu/svgjoin.py +++ b/tdvisu/svgjoin.py @@ -30,7 +30,7 @@ from tdvisu.utilities import gen_arg -LOGGER = logging.getLogger('svg_join.py') +LOGGER = logging.getLogger("svg_join.py") # indices @@ -41,19 +41,20 @@ def test_viewbox(viewbox: List[float]): """Should be of form [0, 0, +x, +y]""" assert len(viewbox) == 4, "viewbox should have exactly 4 values" - assert viewbox[:2] == [0., 0.], "[min-x,min-y] should be zero." + assert viewbox[:2] == [0.0, 0.0], "[min-x,min-y] should be zero." assert viewbox[WIDTH] > 0, "should have positive width" assert viewbox[HEIGHT] > 0, "should have positive height" def append_svg( - first_dict: dict, - snd_dict: dict, - centerpad: float = 0., - v_bottom: float = None, - v_top: float = None, - scale2: float = 1, - ndigits: int = 3) -> dict: + first_dict: dict, + snd_dict: dict, + centerpad: float = 0.0, + v_bottom: float = None, + v_top: float = None, + scale2: float = 1, + ndigits: int = 3, +) -> dict: """Modifies the first of two xml-svg dictionary containing a viewbox to append the second svg to the right of the first image. @@ -89,8 +90,8 @@ def append_svg( """ - first_svg = first_dict['svg'] - second_svg = snd_dict['svg'] + first_svg = first_dict["svg"] + second_svg = snd_dict["svg"] # The value of the viewBox attribute is a list of four numbers: # min-x, min-y, width and height. @@ -100,53 +101,54 @@ def append_svg( # See also # https://developer.mozilla.org/en-US/docs/Web/SVG/Attribute/viewBox - pattern = re.compile(r'\s*,\s*|\s+') - viewbox1: List[float] = list( - map(float, re.split(pattern, first_svg['@viewBox']))) - viewbox2: List[float] = list( - map(float, re.split(pattern, second_svg['@viewBox']))) + pattern = re.compile(r"\s*,\s*|\s+") + viewbox1: List[float] = list(map(float, re.split(pattern, first_svg["@viewBox"]))) + viewbox2: List[float] = list(map(float, re.split(pattern, second_svg["@viewBox"]))) test_viewbox(viewbox1) # viewbox1 validation test_viewbox(viewbox2) # viewbox2 validation trafo_result = f_transform( - viewbox1[HEIGHT], viewbox2[HEIGHT], v_bottom, v_top, scale2) - vertical_snd = trafo_result['vertical_snd'] - combine_height = trafo_result['combine_height'] - scale2 = trafo_result['scale2'] + viewbox1[HEIGHT], viewbox2[HEIGHT], v_bottom, v_top, scale2 + ) + vertical_snd = trafo_result["vertical_snd"] + combine_height = trafo_result["combine_height"] + scale2 = trafo_result["scale2"] LOGGER.info( "Transformed with vertical_snd=%s combine_height=%s scale2=%s", - *(vertical_snd, combine_height, scale2)) + *(vertical_snd, combine_height, scale2), + ) viewbox1[HEIGHT] = round(combine_height - 0.5) h_displacement = float(viewbox1[WIDTH]) + centerpad - viewbox1[WIDTH] = round(max(float(viewbox1[WIDTH]), - h_displacement + scale2 * viewbox2[WIDTH]) - 0.5) + viewbox1[WIDTH] = round( + max(float(viewbox1[WIDTH]), h_displacement + scale2 * viewbox2[WIDTH]) - 0.5 + ) # new viewbox - first_svg['@viewBox'] = ' '.join(map(str, viewbox1)) # new viewbox + first_svg["@viewBox"] = " ".join(map(str, viewbox1)) # new viewbox # update width and height - first_svg['@width'] = f"{viewbox1[WIDTH]}pt" - first_svg['@height'] = f"{viewbox1[HEIGHT]}pt" + first_svg["@width"] = f"{viewbox1[WIDTH]}pt" + first_svg["@height"] = f"{viewbox1[HEIGHT]}pt" # move second image group next to first v_transform = round(max(0, 
vertical_snd), ndigits) transform = f"translate({h_displacement} {v_transform}) scale({scale2}) " # now scales with scale2 - transform += second_svg['g'].get('@transform', '') - second_svg['g']['@transform'] = transform + transform += second_svg["g"].get("@transform", "") + second_svg["g"]["@transform"] = transform if vertical_snd < 0: # move first image, add after other transform - transform = first_svg['g'].get('@transform', '') + transform = first_svg["g"].get("@transform", "") transform += f" translate(0 {round(-vertical_snd,ndigits)})" - first_svg['g']['@transform'] = transform + first_svg["g"]["@transform"] = transform # add group to list of 'g' - if isinstance(first_svg['g'], list): - first_svg['g'].append(second_svg['g']) + if isinstance(first_svg["g"], list): + first_svg["g"].append(second_svg["g"]) else: - first_svg['g'] = [first_svg['g'], second_svg['g']] + first_svg["g"] = [first_svg["g"], second_svg["g"]] return first_dict @@ -165,10 +167,13 @@ def append_svg( """ -def f_transform(h_one_, h_two_, - v_bottom: Union[float, str, None] = None, - v_top: Union[float, str, None] = None, - scale2: float = 1) -> Dict[str, float]: +def f_transform( + h_one_, + h_two_, + v_bottom: Union[float, str, None] = None, + v_top: Union[float, str, None] = None, + scale2: float = 1, +) -> Dict[str, float]: """Calculate vertical position and scaling of second image. The input for v_bottom, v_top is in units from\n @@ -197,14 +202,20 @@ def f_transform(h_one_, h_two_, 'vertical_snd','combine_height','scale2' """ - v_displacement = 0. + v_displacement = 0.0 # cast to float h_one = float(h_one_) h_two = float(h_two_) LOGGER.debug("Calculating with h_one=%f h_two=%f", h_one, h_two) # normalize values - conversion = {'bottom': 1, 'center': 0.5, 'top': 0, 'inf': 0, - -float('inf'): 1, float('inf'): 0} + conversion = { + "bottom": 1, + "center": 0.5, + "top": 0, + "inf": 0, + -float("inf"): 1, + float("inf"): 0, + } v_bottom = conversion.get(v_bottom, v_bottom) if isinstance(v_bottom, str): raise ValueError(f"Encountered {v_bottom=} not in {conversion=}") @@ -227,7 +238,9 @@ def f_transform(h_one_, h_two_, # moving the centerline according to value and scaling LOGGER.info( "The values of 'v_top', 'v_bottom' are both interpreted " - "as %f - interpreting as centerline!", v_top) + "as %f - interpreting as centerline!", + v_top, + ) half = size2 / h_one / 2 v_top = v_top - half v_bottom = v_bottom + half @@ -245,22 +258,25 @@ def f_transform(h_one_, h_two_, # bottom - top combine_height = (max(1, v_bottom) - min(0, v_top)) * h_one - return {'vertical_snd': v_displacement, - 'combine_height': combine_height, - 'scale2': scale2} + return { + "vertical_snd": v_displacement, + "combine_height": combine_height, + "scale2": scale2, + } def svg_join( - base_names: list, - folder: str = '', - num_images: int = 1, - outname: str = 'combined', - suffix: str = '%d.svg', - preserve_aspectratio: str = 'xMinYMin', - padding: Union[int, Iterable[int]] = 0, - scale2: Union[float, Iterable[float]] = 1, - v_top: Union[None, float, str, Iterable[Union[None, float, str]]] = None, - v_bottom: Union[None, float, str, Iterable[Union[None, float, str]]] = None): + base_names: list, + folder: str = "", + num_images: int = 1, + outname: str = "combined", + suffix: str = "%d.svg", + preserve_aspectratio: str = "xMinYMin", + padding: Union[int, Iterable[int]] = 0, + scale2: Union[float, Iterable[float]] = 1, + v_top: Union[None, float, str, Iterable[Union[None, float, str]]] = None, + v_bottom: Union[None, float, str, 
Iterable[Union[None, float, str]]] = None, +): """ Joines different svg-images from tdvisu placed in 'folder' for every timestep in the horizontal order specified in 'in_names'. @@ -309,7 +325,7 @@ def svg_join( LOGGER.warning("svg_join called with one file - nothing to join!") return # use path library for normalizing the path - folder = Path(folder if folder else '') + folder = Path(folder if folder else "") resultname = str(folder / outname) + suffix names = [str(folder / name) + suffix for name in base_names] @@ -326,20 +342,30 @@ def svg_join( with open(names[1] % step) as file: im_2 = benedict.from_xml(file.read()) - result = append_svg(im_1, im_2, centerpad=next(gen_padding), - v_bottom=next(gen_v_bottom), - v_top=next(gen_v_top), scale2=next(gen_scale2)) + result = append_svg( + im_1, + im_2, + centerpad=next(gen_padding), + v_bottom=next(gen_v_bottom), + v_top=next(gen_v_top), + scale2=next(gen_scale2), + ) # rest: for name in names[2:]: with open(name % step) as file: image = benedict.from_xml(file.read()) - result = append_svg(result, image, centerpad=next(gen_padding), - v_bottom=next(gen_v_bottom), - v_top=next(gen_v_top), scale2=next(gen_scale2)) - - result['svg']['@preserveAspectRatio'] = preserve_aspectratio - with open(resultname % step, 'w') as file: + result = append_svg( + result, + image, + centerpad=next(gen_padding), + v_bottom=next(gen_v_bottom), + v_top=next(gen_v_top), + scale2=next(gen_scale2), + ) + + result["svg"]["@preserveAspectRatio"] = preserve_aspectratio + with open(resultname % step, "w") as file: result.to_xml(output=file, pretty=True) if step < 10: @@ -349,9 +375,11 @@ def svg_join( if __name__ == "__main__": # pragma: no cover logging.basicConfig(level=logging.DEBUG) - svg_join(['TDStep', 'graph'], - 'Archive/WheelGraph7', - outname="default_06sc15_rise", - v_bottom=[1, .85, .7, .55, .4], - scale2=1.5, - num_images=5) + svg_join( + ["TDStep", "graph"], + "Archive/WheelGraph7", + outname="default_06sc15_rise", + v_bottom=[1, 0.85, 0.7, 0.55, 0.4], + scale2=1.5, + num_images=5, + ) diff --git a/tdvisu/utilities.py b/tdvisu/utilities.py index ada8225..ba9b6e3 100644 --- a/tdvisu/utilities.py +++ b/tdvisu/utilities.py @@ -25,19 +25,20 @@ import logging import logging.config from collections.abc import Iterable as iter_type -from configparser import ConfigParser, Error as CfgError, ParsingError +from configparser import ConfigParser +from configparser import Error as CfgError +from configparser import ParsingError from itertools import chain from pathlib import Path -from typing import (Any, Generator, Iterable, Iterator, - List, Tuple, TypeVar, Union) - -from tdvisu.version import __date__, __version__ +from typing import Any, Generator, Iterable, Iterator, List, Tuple, TypeVar, Union import yaml -LOGGER = logging.getLogger('utilities.py') +from tdvisu.version import __date__, __version__ -CFG_EXT = ('.ini', '.cfg', '.conf', '.config') +LOGGER = logging.getLogger("utilities.py") + +CFG_EXT = (".ini", ".cfg", ".conf", ".config") LOGLEVEL_EPILOG = """ Logging levels for Python: CRITICAL: 50 @@ -48,43 +49,43 @@ NOTSET: 0 (will traverse the logging hierarchy until a value is found) """ DEFAULT_LOGGING_CFG = { - 'version': 1, - 'formatters': { - 'simple': { - 'format': '%(asctime)s %(levelname)s %(message)s', - 'datefmt': '%H:%M:%S'}}, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - 'level': 'WARNING', - 'formatter': 'simple', - 'stream': 'ext://sys.stdout'}}, - 'loggers': { - 'visualization.py': { - 'level': 'NOTSET', - 'handlers': 
['console'], - 'propagate': False}, - 'svgjoin.py': { - 'level': 'NOTSET', - 'handlers': ['console'], - 'propagate': False}, - 'reader.py': { - 'level': 'NOTSET', - 'handlers': ['console'], - 'propagate': False}, - 'construct_dpdb_visu.py': { - 'level': 'NOTSET', - 'handlers': ['console'], - 'propagate': False}}, - 'root': { - 'level': 'WARNING', - 'handlers': ['console']}} - -_T = TypeVar('_T') + "version": 1, + "formatters": { + "simple": { + "format": "%(asctime)s %(levelname)s %(message)s", + "datefmt": "%H:%M:%S", + } + }, + "handlers": { + "console": { + "class": "logging.StreamHandler", + "level": "WARNING", + "formatter": "simple", + "stream": "ext://sys.stdout", + } + }, + "loggers": { + "visualization.py": { + "level": "NOTSET", + "handlers": ["console"], + "propagate": False, + }, + "svgjoin.py": {"level": "NOTSET", "handlers": ["console"], "propagate": False}, + "reader.py": {"level": "NOTSET", "handlers": ["console"], "propagate": False}, + "construct_dpdb_visu.py": { + "level": "NOTSET", + "handlers": ["console"], + "propagate": False, + }, + }, + "root": {"level": "WARNING", "handlers": ["console"]}, +} + +_T = TypeVar("_T") def flatten(iterable: Iterable[Iterable[_T]]) -> Iterator[_T]: - """ Flatten at first level. + """Flatten at first level. Turn ex=[[1,2],[3,4]] into [1, 2, 3, 4] @@ -94,8 +95,9 @@ def flatten(iterable: Iterable[Iterable[_T]]) -> Iterator[_T]: return chain.from_iterable(iterable) -def read_yml_or_cfg(file: Union[str, Path], prefer_cfg: bool = False, - cfg_ext=CFG_EXT) -> Any: +def read_yml_or_cfg( + file: Union[str, Path], prefer_cfg: bool = False, cfg_ext=CFG_EXT +) -> Any: """ Read the file and return its content as a python object. @@ -115,8 +117,10 @@ def read_yml_or_cfg(file: Union[str, Path], prefer_cfg: bool = False, But maybe just a list or a single object. 
""" - err_str = ("utilities.read_yml_or_cfg encountered '{}' while " - "reading config from '{}' and prefer_cfg={}") + err_str = ( + "utilities.read_yml_or_cfg encountered '{}' while " + "reading config from '{}' and prefer_cfg={}" + ) file = Path(file) if not file.exists(): @@ -153,8 +157,9 @@ def read_yml_or_cfg(file: Union[str, Path], prefer_cfg: bool = False, return dict() -def logging_cfg(filename: str, prefer_cfg: bool = False, - loglevel: Union[None, int, str] = None) -> None: +def logging_cfg( + filename: str, prefer_cfg: bool = False, loglevel: Union[None, int, str] = None +) -> None: """Configure logging for this module""" logging.basicConfig() read_err = "could not read configuration from '%s'" @@ -168,7 +173,7 @@ def logging_cfg(filename: str, prefer_cfg: bool = False, except ValueError: loglevel = loglevel.upper() - if prefer_cfg or file.suffix.lower() in CFG_EXT: # .config + if prefer_cfg or file.suffix.lower() in CFG_EXT: # .config try: logging.config.fileConfig(file, defaults=DEFAULT_LOGGING_CFG) if loglevel is not None: @@ -181,7 +186,7 @@ def logging_cfg(filename: str, prefer_cfg: bool = False, LOGGER.error(read_err, file.resolve(), exc_info=True) except ValueError: LOGGER.error(config_err, file.resolve(), exc_info=True) - try: # dict + try: # dict file_content = read_yml_or_cfg(file, prefer_cfg=prefer_cfg) logging.config.dictConfig(file_content) if loglevel is not None: @@ -196,8 +201,7 @@ def logging_cfg(filename: str, prefer_cfg: bool = False, LOGGER.error(config_err, file.resolve(), exc_info=True) -def convert_to_adj( - edgelist: Iterable[Tuple[int, int]], directed: bool = False) -> dict: +def convert_to_adj(edgelist: Iterable[Tuple[int, int]], directed: bool = False) -> dict: """ Helper function to convert the edgelist into the adj-format from NetworkX. @@ -218,7 +222,7 @@ def convert_to_adj( https://networkx.github.io/documentation/networkx-2.1/_modules/networkx/classes/graph.html """ adj = dict() - for (source, target) in edgelist: + for source, target in edgelist: if source not in adj: adj[source] = {} adj[source][target] = {} @@ -230,11 +234,7 @@ def convert_to_adj( return adj -def add_edge_to( - edges: set, - adjacency_dict: dict, - vertex1: Any, - vertex2: Any) -> None: +def add_edge_to(edges: set, adjacency_dict: dict, vertex1: Any, vertex2: Any) -> None: """ Adding (undirected) edge from 'vertex1' to 'vertex2' to the edges and adjacency-list. @@ -293,17 +293,14 @@ def gen_arg(arg_or_iter: Any) -> Generator: yield item -def base_style( - graph, - node: str, - color: str = 'white', - penwidth: float = 1.0) -> None: +def base_style(graph, node: str, color: str = "white", penwidth: float = 1.0) -> None: """Style the node with default fillcolor and penwidth.""" graph.node(node, fillcolor=color, penwidth=str(penwidth)) -def emphasise_node(graph, node: str, color: str = 'yellow', - penwidth: float = 2.5) -> None: +def emphasise_node( + graph, node: str, color: str = "yellow", penwidth: float = 2.5 +) -> None: """Emphasise node with a different fillcolor (default:'yellow') and penwidth (default:2.5). 
""" @@ -315,22 +312,23 @@ def emphasise_node(graph, node: str, color: str = 'yellow', def style_hide_node(graph, node: str) -> None: """Make the node invisible during drawing.""" - graph.node(node, style='invis') + graph.node(node, style="invis") def style_hide_edge(graph, source: str, target: str) -> None: """Make the edge source->target invisible during drawing.""" - graph.edge(source, target, style='invis') + graph.edge(source, target, style="invis") def bag_node( - head, - tail, - anchor: str = 'anchor', - headcolor: str = 'white', - tableborder: int = 0, - cellborder: int = 0, - cellspacing: int = 0) -> str: + head, + tail, + anchor: str = "anchor", + headcolor: str = "white", + tableborder: int = 0, + cellborder: int = 0, + cellspacing: int = 0, +) -> str: """HTML format with 'head' as the first label, then appending further labels. @@ -356,13 +354,14 @@ def bag_node( def solution_node( - solution_table: Iterable[List[str]], - toplabel: str = '', - bottomlabel: str = '', - transpose: bool = False, - linesmax: int = 1000, - columnsmax: int = 50, - fillstr: str = '...') -> str: + solution_table: Iterable[List[str]], + toplabel: str = "", + bottomlabel: str = "", + transpose: bool = False, + linesmax: int = 1000, + columnsmax: int = 50, + fillstr: str = "...", +) -> str: """Fill the node from the 2D-matrix 'solution_table' COLUMNBASED!. Optionally add a line above and/or below the table for labels. The size of the result can be limited by using linesmax and columnsmax. @@ -398,62 +397,64 @@ def solution_node( | botlabel | |----------| """ - result = '' + result = "" if toplabel: - result += toplabel + '|' + result += toplabel + "|" if len(solution_table) == 0: - result += 'empty' + result += "empty" else: if transpose: solution_table = list(zip(*solution_table)) # limit lines backwards from length of column - vslice = (min(-1, linesmax - len(solution_table[0])) - if linesmax > 0 else -1) + vslice = min(-1, linesmax - len(solution_table[0])) if linesmax > 0 else -1 # limit columns forwards minus one - hslice = (min(len(solution_table), columnsmax) - if columnsmax > 0 else len(solution_table)) - 1 + hslice = ( + min(len(solution_table), columnsmax) + if columnsmax > 0 + else len(solution_table) + ) - 1 - result += '{' # insert table + result += "{" # insert table for column in solution_table[:hslice]: - result += '{' # start column + result += "{" # start column for row in column[:vslice]: - result += str(row) + '|' - if vslice < -1: # add one indicator of shortening - result += fillstr + '|' + result += str(row) + "|" + if vslice < -1: # add one indicator of shortening + result += fillstr + "|" for row in column[-1:]: result += str(row) - result += '}|' # sep. between columns + result += "}|" # sep. between columns # adding one column-skipping indicator if hslice < len(solution_table) - 1: - result += '{' # start column + result += "{" # start column for row in column[:vslice]: - result += fillstr + '|' - if vslice < -1: # add one indicator of shortening - result += fillstr + '|' + result += fillstr + "|" + if vslice < -1: # add one indicator of shortening + result += fillstr + "|" for row in column[-1:]: result += fillstr - result += '}|' # sep. between columns + result += "}|" # sep. 
between columns # last column (usually a summary of the previous cols) for column in solution_table[-1:]: - result += '{' # start column + result += "{" # start column for row in column[:vslice]: - result += str(row) + '|' - if vslice < -1: # add one indicator of shortening - result += fillstr + '|' + result += str(row) + "|" + if vslice < -1: # add one indicator of shortening + result += fillstr + "|" for row in column[-1:]: result += str(row) - result += '}' # sep. between columns - result += '}' # close table + result += "}" # sep. between columns + result += "}" # close table if bottomlabel: - result += '|' + bottomlabel + result += "|" + bottomlabel - return '{' + result + '}' + return "{" + result + "}" -def get_parser(extra_desc: str = '') -> argparse.ArgumentParser: +def get_parser(extra_desc: str = "") -> argparse.ArgumentParser: """ Prepare an argument parser for TDVisu scripts. @@ -475,11 +476,16 @@ def get_parser(extra_desc: str = '') -> argparse.ArgumentParser: This is free software, and you are welcome to redistribute it under certain conditions; see COPYING for more information. """ - + "\n" + extra_desc, + + "\n" + + extra_desc, epilog=LOGLEVEL_EPILOG, - formatter_class=argparse.RawDescriptionHelpFormatter) - - parser.add_argument('--version', action='version', - version='%(prog)s ' + __version__ + ', ' + __date__) - parser.add_argument('--loglevel', help="set the minimal loglevel for root") + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + parser.add_argument( + "--version", + action="version", + version="%(prog)s " + __version__ + ", " + __date__, + ) + parser.add_argument("--loglevel", help="set the minimal loglevel for root") return parser diff --git a/tdvisu/version.py b/tdvisu/version.py index f32f803..e0c58f1 100644 --- a/tdvisu/version.py +++ b/tdvisu/version.py @@ -4,7 +4,7 @@ """ # Base version. 
-__version__ = '1.1.9' +__version__ = "1.1.9" # Year-Month-Day of last version change -__date__ = '2023-07-27' +__date__ = "2023-07-27" diff --git a/tdvisu/visualization.py b/tdvisu/visualization.py index e2409a1..c0f3e1f 100644 --- a/tdvisu/visualization.py +++ b/tdvisu/visualization.py @@ -39,15 +39,27 @@ from graphviz import Digraph, Graph from tdvisu.svgjoin import svg_join -from tdvisu.utilities import (bag_node, base_style, emphasise_node, flatten, - get_parser, logging_cfg, solution_node, - style_hide_edge, style_hide_node) -from tdvisu.visualization_data import (GeneralGraphData, IncidenceGraphData, - SvgJoinData, VisualizationData) - -LOGGER = logging.getLogger('visualization.py') - -StrOrIo = NewType('StrOrIo', Union[str, io.TextIOWrapper]) +from tdvisu.utilities import ( + bag_node, + base_style, + emphasise_node, + flatten, + get_parser, + logging_cfg, + solution_node, + style_hide_edge, + style_hide_node, +) +from tdvisu.visualization_data import ( + GeneralGraphData, + IncidenceGraphData, + SvgJoinData, + VisualizationData, +) + +LOGGER = logging.getLogger("visualization.py") + +StrOrIo = NewType("StrOrIo", Union[str, io.TextIOWrapper]) def read_json(json_data: StrOrIo) -> dict: @@ -97,9 +109,9 @@ def inspect_json(self, infile: StrOrIo) -> VisualizationData: LOGGER.debug("Found keys: %s", visudata.keys()) try: - _incid = visudata.get('incidenceGraph', None) - _general_graph = visudata.get('generalGraph', None) - _svg_join = visudata.get('svgjoin', None) + _incid = visudata.get("incidenceGraph", None) + _general_graph = visudata.get("generalGraph", None) + _svg_join = visudata.get("svgjoin", None) incid_data: List[IncidenceGraphData] = list() if _incid: @@ -108,10 +120,9 @@ def inspect_json(self, infile: StrOrIo) -> VisualizationData: # unwrap as list: for data in _incid: # add object to incid_data - data['edges'] = [[x['id'], x['list']] - for x in data['edges']] + data["edges"] = [[x["id"], x["list"]] for x in data["edges"]] incid_data += [IncidenceGraphData(**data)] - visudata.pop('incidenceGraph', None) + visudata.pop("incidenceGraph", None) general_graph_data: List[GeneralGraphData] = list() if _general_graph: @@ -119,36 +130,39 @@ def inspect_json(self, infile: StrOrIo) -> VisualizationData: _general_graph = [_general_graph] for data in _general_graph: general_graph_data += [GeneralGraphData(**data)] - visudata.pop('generalGraph', None) + visudata.pop("generalGraph", None) svg_join_data: Optional[SvgJoinData] = None if _svg_join: svg_join_data = SvgJoinData(**_svg_join) - if 'svgjoin' in visudata: - visudata.pop('svgjoin') - - self.timeline = visudata['tdTimeline'] - visudata.pop('tdTimeline') - self.tree_dec = visudata['treeDecJson'] - self.bagpre = self.tree_dec['bagpre'] - self.joinpre = self.tree_dec.get('joinpre', 'Join %d~%d') - self.solpre = self.tree_dec.get('solpre', 'sol%d') - self.soljoinpre = self.tree_dec.get('soljoinpre', 'solJoin%d~%d') - visudata.pop('treeDecJson') + if "svgjoin" in visudata: + visudata.pop("svgjoin") + + self.timeline = visudata["tdTimeline"] + visudata.pop("tdTimeline") + self.tree_dec = visudata["treeDecJson"] + self.bagpre = self.tree_dec["bagpre"] + self.joinpre = self.tree_dec.get("joinpre", "Join %d~%d") + self.solpre = self.tree_dec.get("solpre", "sol%d") + self.soljoinpre = self.tree_dec.get("soljoinpre", "solJoin%d~%d") + visudata.pop("treeDecJson") except KeyError as err: raise KeyError(f"Key {err} not found in the input Json.") - return VisualizationData(incidence_graphs=incid_data, - general_graphs=general_graph_data, - 
svg_join=svg_join_data, - **visudata) + return VisualizationData( + incidence_graphs=incid_data, + general_graphs=general_graph_data, + svg_join=svg_join_data, + **visudata, + ) def setup_tree_dec_graph( - self, - rankdir: str = 'BT', - shape: str = 'box', - fillcolor: str = 'white', - style: str = 'rounded,filled', - margin: str = '0.11,0.01') -> None: + self, + rankdir: str = "BT", + shape: str = "box", + fillcolor: str = "white", + style: str = "rounded,filled", + margin: str = "0.11,0.01", + ) -> None: """Create self.tree_dec_digraph strict means not a multigraph - equal edges get merged. @@ -156,74 +170,89 @@ def setup_tree_dec_graph( - normally Bottom-Top or Top-Bottom. """ self.tree_dec_digraph = Digraph( - 'Tree-Decomposition', strict=True, - graph_attr={'rankdir': rankdir}, + "Tree-Decomposition", + strict=True, + graph_attr={"rankdir": rankdir}, node_attr={ - 'shape': shape, - 'fillcolor': fillcolor, - 'style': style, - 'margin': margin}) + "shape": shape, + "fillcolor": fillcolor, + "style": style, + "margin": margin, + }, + ) def basic_tdg(self) -> None: """Create basic bag structure in tree_dec_digraph.""" - for item in self.tree_dec['labeldict']: - bagname = self.bagpre % str(item['id']) - self.tree_dec_digraph.node(bagname, - bag_node(bagname, item['labels'])) - - self.tree_dec_digraph.edges([(self.bagpre % str(first), self.bagpre % str( - second)) for (first, second) in self.tree_dec['edgearray']]) - - def forward_iterate_tdg( - self, - joinpre: str, - solpre: str, - soljoinpre: str) -> None: + for item in self.tree_dec["labeldict"]: + bagname = self.bagpre % str(item["id"]) + self.tree_dec_digraph.node(bagname, bag_node(bagname, item["labels"])) + + self.tree_dec_digraph.edges( + [ + (self.bagpre % str(first), self.bagpre % str(second)) + for (first, second) in self.tree_dec["edgearray"] + ] + ) + + def forward_iterate_tdg(self, joinpre: str, solpre: str, soljoinpre: str) -> None: """Create the final positions of all nodes with solutions.""" - tdg = self.tree_dec_digraph # shorten name + tdg = self.tree_dec_digraph # shorten name - for i, node in enumerate(self.timeline): # Create the positions + for i, node in enumerate(self.timeline): # Create the positions if len(node) > 1: # solution to be displayed id_inv_bags = node[0] if isinstance(id_inv_bags, int): last_sol = solpre % id_inv_bags - tdg.node(last_sol, solution_node( - *(node[1])), shape='record') + tdg.node(last_sol, solution_node(*(node[1])), shape="record") tdg.edge(self.bagpre % id_inv_bags, last_sol) - else: # joined node with 2 bags - suc = self.timeline[i + 1][0] # get the joined bags + else: # joined node with 2 bags + suc = self.timeline[i + 1][0] # get the joined bags - LOGGER.debug('joining %s to %s ', node[0], suc) + LOGGER.debug("joining %s to %s ", node[0], suc) id_inv_bags = tuple(id_inv_bags) last_sol = soljoinpre % id_inv_bags - tdg.node(last_sol, solution_node( - *(node[1])), shape='record') + tdg.node(last_sol, solution_node(*(node[1])), shape="record") tdg.edge(joinpre % id_inv_bags, last_sol) # edges for child in id_inv_bags: # basically "remove" current # TODO check where 2 args are possibly occuring tdg.edge( - self.bagpre % child - if isinstance(child, int) else joinpre % child, - self.bagpre % suc - if isinstance(suc, int) else joinpre % suc, - style='invis', - constraint='false') - tdg.edge(self.bagpre % child if isinstance(child, int) - else joinpre % child, - joinpre % id_inv_bags) - tdg.edge(joinpre % id_inv_bags, self.bagpre % suc - if isinstance(suc, int) else joinpre % suc) - - def 
backwards_iterate_tdg(self, joinpre: str, solpre: str, soljoinpre: str, - view: bool = False) -> None: + ( + self.bagpre % child + if isinstance(child, int) + else joinpre % child + ), + ( + self.bagpre % suc + if isinstance(suc, int) + else joinpre % suc + ), + style="invis", + constraint="false", + ) + tdg.edge( + ( + self.bagpre % child + if isinstance(child, int) + else joinpre % child + ), + joinpre % id_inv_bags, + ) + tdg.edge( + joinpre % id_inv_bags, + self.bagpre % suc if isinstance(suc, int) else joinpre % suc, + ) + + def backwards_iterate_tdg( + self, joinpre: str, solpre: str, soljoinpre: str, view: bool = False + ) -> None: """Cut the single steps back and update emphasis acordingly.""" - tdg = self.tree_dec_digraph # shorten name + tdg = self.tree_dec_digraph # shorten name last_sol = "" _filename = self.outfolder / self.data.td_file for i, node in enumerate(reversed(self.timeline)): @@ -234,11 +263,10 @@ def backwards_iterate_tdg(self, joinpre: str, solpre: str, soljoinpre: str, # Delete previous emphasis prevhead = self.timeline[len(self.timeline) - i][0] bag = ( - self.bagpre % - prevhead if isinstance( - prevhead, - int) else joinpre % - tuple(prevhead)) + self.bagpre % prevhead + if isinstance(prevhead, int) + else joinpre % tuple(prevhead) + ) base_style(tdg, bag) if last_sol: style_hide_node(tdg, last_sol) @@ -256,36 +284,39 @@ def backwards_iterate_tdg(self, joinpre: str, solpre: str, soljoinpre: str, last_sol = soljoinpre % id_inv_bags emphasise_node(tdg, last_sol) - emphasise_node(tdg, - self.bagpre % - id_inv_bags if isinstance( - id_inv_bags, - int) else joinpre % - id_inv_bags) + emphasise_node( + tdg, + ( + self.bagpre % id_inv_bags + if isinstance(id_inv_bags, int) + else joinpre % id_inv_bags + ), + ) tdg.render( - view=view, format='svg', - filename=str(_filename) + str(len(self.timeline) - i) + view=view, + format="svg", + filename=str(_filename) + str(len(self.timeline) - i), ) def tree_dec_timeline(self, view: bool = False) -> None: """Main-method for handling all construction of the timeline.""" self.setup_tree_dec_graph( - rankdir=self.data.orientation, - fillcolor=self.data.bagcolor) + rankdir=self.data.orientation, fillcolor=self.data.bagcolor + ) self.basic_tdg() # Iterate labeldict self.forward_iterate_tdg( - joinpre=self.joinpre, - solpre=self.solpre, - soljoinpre=self.soljoinpre) + joinpre=self.joinpre, solpre=self.solpre, soljoinpre=self.soljoinpre + ) self.backwards_iterate_tdg( view=view, joinpre=self.joinpre, solpre=self.solpre, - soljoinpre=self.soljoinpre) + soljoinpre=self.soljoinpre, + ) # Prepare supporting graph timeline @@ -296,8 +327,13 @@ def tree_dec_timeline(self, view: bool = False) -> None: elif isinstance(step[0], int): _timeline.append( next( - (item.get('items') for item in self.tree_dec['labeldict'] - if item['id'] == step[0]))) + ( + item.get("items") + for item in self.tree_dec["labeldict"] + if item["id"] == step[0] + ) + ) + ) else: # Join operation - no clauses involved in computation _timeline.append(None) @@ -305,15 +341,15 @@ def tree_dec_timeline(self, view: bool = False) -> None: if self.data.incidence_graphs: for incidence_data in self.data.incidence_graphs: self.prepare_incidence(incidence_data, _timeline, view) - LOGGER.info("Created incidence-graph for file='%s'", - incidence_data.inc_file) + LOGGER.info( + "Created incidence-graph for file='%s'", incidence_data.inc_file + ) if self.data.general_graphs: for graph_data in self.data.general_graphs: - self.general_graph(timeline=_timeline, view=view, - 
**asdict(graph_data)) + self.general_graph(timeline=_timeline, view=view, **asdict(graph_data)) LOGGER.info( - "Created general-graph for file='%s'", - graph_data.file_basename) + "Created general-graph for file='%s'", graph_data.file_basename + ) if self.data.svg_join: self.call_svgjoin() @@ -321,18 +357,21 @@ def prepare_incidence(self, incid, _timeline, view): """Prepare incidence construction.""" if incid.infer_primal or incid.infer_dual: # prepare incid edges with abs: - abs_clauses = [[cl[0], list(map(abs, cl[1]))] - for cl in incid.edges] + abs_clauses = [[cl[0], list(map(abs, cl[1]))] for cl in incid.edges] if incid.infer_primal: # vertex for each variable + edge if the variables # occur in the same clause: - primal_edges = set(flatten( # remove duplicates - [itertools.combinations(cl[1], 2) - for cl in abs_clauses])) + primal_edges = set( + flatten( # remove duplicates + [itertools.combinations(cl[1], 2) for cl in abs_clauses] + ) + ) # check if any node is really isolated: - isolated = [cl[1][0] for cl in abs_clauses - if len(cl[1]) == 1 and - not any(cl[1][0] in sl for sl in primal_edges)] + isolated = [ + cl[1][0] + for cl in abs_clauses + if len(cl[1]) == 1 and not any(cl[1][0] in sl for sl in primal_edges) + ] self.general_graph( timeline=_timeline, @@ -340,18 +379,22 @@ def prepare_incidence(self, incid, _timeline, view): extra_nodes=set(isolated), graph_name=incid.primal_file, file_basename=incid.primal_file, - var_name=incid.var_name_two) + var_name=incid.var_name_two, + ) LOGGER.info("Created infered primal-graph") if incid.infer_dual: # Edge, if clauses share the same variable - dual_edges = [(cl[0], other[0]) - for i, cl in enumerate(abs_clauses) - for other in abs_clauses[i + 1:] # no multiples - if any(var in cl[1] for var in other[1])] + dual_edges = [ + (cl[0], other[0]) + for i, cl in enumerate(abs_clauses) + for other in abs_clauses[i + 1 :] # no multiples + if any(var in cl[1] for var in other[1]) + ] # check if any clause is isolated: - isolated = [cl[0] for cl in abs_clauses - if not any(cl[0] in sl for sl in dual_edges)] + isolated = [ + cl[0] for cl in abs_clauses if not any(cl[0] in sl for sl in dual_edges) + ] self.general_graph( timeline=_timeline, @@ -359,41 +402,45 @@ def prepare_incidence(self, incid, _timeline, view): extra_nodes=set(isolated), graph_name=incid.dual_file, file_basename=incid.dual_file, - var_name=incid.var_name_one) + var_name=incid.var_name_one, + ) LOGGER.info("Created infered dual-graph") self.incidence( edges=incid.edges, timeline=_timeline, inc_file=incid.inc_file, - num_vars=self.tree_dec['num_vars'], - colors=self.data.colors, view=view, + num_vars=self.tree_dec["num_vars"], + colors=self.data.colors, + view=view, fontsize=incid.fontsize, penwidth=incid.penwidth, basefill=self.data.bagcolor, var_name_one=incid.var_name_one, var_name_two=incid.var_name_two, - column_distance=incid.column_distance) + column_distance=incid.column_distance, + ) def general_graph( - self, - timeline: Iterable[Optional[List[int]]], - edges: Iterable[Iterable[int]], - extra_nodes: Iterable[int] = tuple(), - view: bool = False, - fontsize: int = 20, - fontcolor: str = 'black', - penwidth: float = 2.2, - first_color: str = 'yellow', - first_style: str = 'filled', - second_color: str = 'green', - second_style: str = 'dotted,filled', - third_color: str = 'red', - graph_name: str = 'graph', - file_basename: str = 'graph', - do_sort_nodes: bool = True, - do_adj_nodes: bool = True, - var_name: str = '') -> None: + self, + timeline: 
Iterable[Optional[List[int]]], + edges: Iterable[Iterable[int]], + extra_nodes: Iterable[int] = tuple(), + view: bool = False, + fontsize: int = 20, + fontcolor: str = "black", + penwidth: float = 2.2, + first_color: str = "yellow", + first_style: str = "filled", + second_color: str = "green", + second_style: str = "dotted,filled", + third_color: str = "red", + graph_name: str = "graph", + file_basename: str = "graph", + do_sort_nodes: bool = True, + do_adj_nodes: bool = True, + var_name: str = "", + ) -> None: """ Creates one graph emphasized for the given timeline. @@ -423,113 +470,117 @@ def general_graph( """ _filename = self.outfolder / file_basename LOGGER.info("Generating general-graph for '%s'", file_basename) - vartag_n: str = var_name + '%d' + vartag_n: str = var_name + "%d" # sfdp http://yifanhu.net/SOFTWARE/SFDP/index.html - default_engine = 'sfdp' + default_engine = "sfdp" graph = Graph( graph_name, strict=True, engine=default_engine, graph_attr={ - 'fontsize': str(fontsize), - 'overlap': 'false', - 'outputorder': 'edgesfirst', - 'K': '2'}, + "fontsize": str(fontsize), + "overlap": "false", + "outputorder": "edgesfirst", + "K": "2", + }, node_attr={ - 'fontcolor': str(fontcolor), - 'penwidth': str(penwidth), - 'style': 'filled', - 'fillcolor': 'white'}) + "fontcolor": str(fontcolor), + "penwidth": str(penwidth), + "style": "filled", + "fillcolor": "white", + }, + ) if do_sort_nodes: bodybaselen = len(graph.body) # 1: layout with circo - graph.engine = 'circo' + graph.engine = "circo" # 2: nodes in edges+extra_nodes make a circle - nodes = sorted([vartag_n % n for n in set( - itertools.chain(flatten(edges), extra_nodes))], - key=lambda x: (len(x), x)) + nodes = sorted( + [ + vartag_n % n + for n in set(itertools.chain(flatten(edges), extra_nodes)) + ], + key=lambda x: (len(x), x), + ) for i, node in enumerate(nodes): graph.edge(str(nodes[i - 1]), str(node)) # 3: reads in bytes! - code_lines = graph.pipe('plain').splitlines() + code_lines = graph.pipe("plain").splitlines() # 4: save the (sorted) positions - assert code_lines[0].startswith(b'graph') - node_positions = [line.split()[1:4] for line in code_lines[1:] - if line.startswith(b'node')] + assert code_lines[0].startswith(b"graph") + node_positions = [ + line.split()[1:4] for line in code_lines[1:] if line.startswith(b"node") + ] # 5: cut layout graph.body = graph.body[:bodybaselen] for line in node_positions: - graph.node(line[0].decode(), - pos='%f,%f!' % (float(line[1]), float(line[2]))) + graph.node( + line[0].decode(), pos="%f,%f!" 
% (float(line[1]), float(line[2])) + ) # 6: Engine uses previous positions - graph.engine = 'neato' + graph.engine = "neato" - for (src, tar) in edges: + for src, tar in edges: graph.edge(vartag_n % src, vartag_n % tar) for nodeid in extra_nodes: graph.node(vartag_n % nodeid) bodybaselen = len(graph.body) - for i, variables in enumerate(timeline, start=1): # all timesteps + for i, variables in enumerate(timeline, start=1): # all timesteps # reset highlighting graph.body = graph.body[:bodybaselen] if variables is None: - graph.render( - view=view, - format='svg', - filename=str(_filename) + str(i)) + graph.render(view=view, format="svg", filename=str(_filename) + str(i)) continue for var in variables: - graph.node( - vartag_n % var, - fillcolor=first_color, - style=first_style) + graph.node(vartag_n % var, fillcolor=first_color, style=first_style) # highlight edges between variables - for (s, t) in edges: + for s, t in edges: if s in variables and t in variables: graph.edge( vartag_n % s, vartag_n % t, color=third_color, - penwidth=str(penwidth)) + penwidth=str(penwidth), + ) if do_adj_nodes: # set.difference accepts list as argument, "-" does not. edges = [set(edge) for edge in edges] adjacent = { - edge.difference(variables).pop() for edge in edges if len( - edge.difference(variables)) == 1} + edge.difference(variables).pop() + for edge in edges + if len(edge.difference(variables)) == 1 + } for var in adjacent: - graph.node(vartag_n % var, - color=second_color, - style=second_style) + graph.node(vartag_n % var, color=second_color, style=second_style) - graph.render(view=view, format='svg', - filename=str(_filename) + str(i)) + graph.render(view=view, format="svg", filename=str(_filename) + str(i)) def incidence( - self, - timeline: Iterable[Optional[List[int]]], - num_vars: int, - colors: List, - edges: List, - inc_file: str = 'IncidenceGraphStep', - view: bool = False, - fontsize: Union[str, int] = 16, - penwidth: float = 2.2, - basefill: str = 'white', - sndshape: str = 'diamond', - neg_tail: str = 'odot', - var_name_one: str = '', - var_name_two: str = '', - column_distance: float = 0.5) -> None: + self, + timeline: Iterable[Optional[List[int]]], + num_vars: int, + colors: List, + edges: List, + inc_file: str = "IncidenceGraphStep", + view: bool = False, + fontsize: Union[str, int] = 16, + penwidth: float = 2.2, + basefill: str = "white", + sndshape: str = "diamond", + neg_tail: str = "odot", + var_name_one: str = "", + var_name_two: str = "", + column_distance: float = 0.5, + ) -> None: """ Creates the incidence graph emphasized for the given timeline. 
@@ -566,125 +617,135 @@ def incidence( """ _filename = self.outfolder / inc_file - clausetag_n = var_name_one + '%d' - vartag_n = var_name_two + '%d' + clausetag_n = var_name_one + "%d" + vartag_n = var_name_two + "%d" g_incid = Graph( inc_file, strict=True, graph_attr={ - 'splines': 'false', - 'ranksep': '0.2', - 'nodesep': str(float(column_distance)), - 'fontsize': str( - int(fontsize)), - 'compound': 'true'}, + "splines": "false", + "ranksep": "0.2", + "nodesep": str(float(column_distance)), + "fontsize": str(int(fontsize)), + "compound": "true", + }, edge_attr={ - 'penwidth': str(float(penwidth)), - 'dir': 'back', - 'arrowtail': 'none'}) - with g_incid.subgraph(name='cluster_clause', - edge_attr={'style': 'invis'}, - node_attr={'style': 'rounded,filled', - 'fillcolor': basefill}) as clauses: - clauses.attr(label='clauses') - clauses.edges([(clausetag_n % (i + 1), clausetag_n % (i + 2)) - for i in range(len(edges) - 1)]) - - g_incid.attr('node', shape=sndshape, - penwidth=str(float(penwidth)), - style='dotted') - with g_incid.subgraph(name='cluster_ivar', - edge_attr={'style': 'invis'}) as ivars: - ivars.attr(label='variables') - ivars.edges([(vartag_n % (i + 1), vartag_n % (i + 2)) - for i in range(num_vars - 1)]) + "penwidth": str(float(penwidth)), + "dir": "back", + "arrowtail": "none", + }, + ) + with g_incid.subgraph( + name="cluster_clause", + edge_attr={"style": "invis"}, + node_attr={"style": "rounded,filled", "fillcolor": basefill}, + ) as clauses: + clauses.attr(label="clauses") + clauses.edges( + [ + (clausetag_n % (i + 1), clausetag_n % (i + 2)) + for i in range(len(edges) - 1) + ] + ) + + g_incid.attr( + "node", shape=sndshape, penwidth=str(float(penwidth)), style="dotted" + ) + with g_incid.subgraph( + name="cluster_ivar", edge_attr={"style": "invis"} + ) as ivars: + ivars.attr(label="variables") + ivars.edges( + [(vartag_n % (i + 1), vartag_n % (i + 2)) for i in range(num_vars - 1)] + ) for i in range(num_vars): - g_incid.node(vartag_n % - (i + 1), vartag_n % - (i + 1), color=colors[(i + 1) % - len(colors)]) + g_incid.node( + vartag_n % (i + 1), + vartag_n % (i + 1), + color=colors[(i + 1) % len(colors)], + ) - g_incid.attr('edge', constraint='false') + g_incid.attr("edge", constraint="false") for clause in edges: for var in clause[1]: if var >= 0: - g_incid.edge(clausetag_n % clause[0], - vartag_n % var, - color=colors[var % len(colors)]) + g_incid.edge( + clausetag_n % clause[0], + vartag_n % var, + color=colors[var % len(colors)], + ) else: - g_incid.edge(clausetag_n % clause[0], - vartag_n % -var, - color=colors[-var % len(colors)], - arrowtail=neg_tail) + g_incid.edge( + clausetag_n % clause[0], + vartag_n % -var, + color=colors[-var % len(colors)], + arrowtail=neg_tail, + ) # make edgelist variable-based (varX, clauseY), ... # var_cl_iter [(1, 1), (4, 1), ... 
- var_cl_iter = tuple(flatten([[(x, y[0]) for x in y[1]] - for y in edges])) + var_cl_iter = tuple(flatten([[(x, y[0]) for x in y[1]] for y in edges])) bodybaselen = len(g_incid.body) - for i, variables in enumerate(timeline, start=1): # all timesteps + for i, variables in enumerate(timeline, start=1): # all timesteps # reset highlighting g_incid.body = g_incid.body[:bodybaselen] if variables is None: - g_incid.render(view=view, format='svg', - filename=str(_filename) + str(i)) + g_incid.render( + view=view, format="svg", filename=str(_filename) + str(i) + ) continue emp_clause = { - var_cl[1] for var_cl in var_cl_iter if abs( - var_cl[0]) in variables} + var_cl[1] for var_cl in var_cl_iter if abs(var_cl[0]) in variables + } - emp_var = {abs(var_cl[0]) - for var_cl in var_cl_iter if var_cl[1] in emp_clause} + emp_var = { + abs(var_cl[0]) for var_cl in var_cl_iter if var_cl[1] in emp_clause + } for var in emp_var: _vartag = vartag_n % abs(var) - _style = 'solid,filled' if var in variables else 'dotted,filled' - g_incid.node( - _vartag, - _vartag, - style=_style, - fillcolor='yellow') + _style = "solid,filled" if var in variables else "dotted,filled" + g_incid.node(_vartag, _vartag, style=_style, fillcolor="yellow") for clause in emp_clause: g_incid.node( - clausetag_n % clause, - clausetag_n % clause, - fillcolor='yellow') + clausetag_n % clause, clausetag_n % clause, fillcolor="yellow" + ) for edge in var_cl_iter: (var, clause) = edge - _style = 'solid' if clause in emp_clause else 'dotted' + _style = "solid" if clause in emp_clause else "dotted" _vartag = vartag_n % abs(var) if var >= 0: - g_incid.edge(clausetag_n % clause, - _vartag, - color=colors[var % len(colors)], - style=_style) - else: # negated variable - g_incid.edge(clausetag_n % clause, - _vartag, - color=colors[-var % len(colors)], - arrowtail='odot', - style=_style) - - g_incid.render( - view=view, - format='svg', - filename=str(_filename) + str(i)) + g_incid.edge( + clausetag_n % clause, + _vartag, + color=colors[var % len(colors)], + style=_style, + ) + else: # negated variable + g_incid.edge( + clausetag_n % clause, + _vartag, + color=colors[-var % len(colors)], + arrowtail="odot", + style=_style, + ) + + g_incid.render(view=view, format="svg", filename=str(_filename) + str(i)) def call_svgjoin(self) -> None: """Analyzes content in data.svg_join for the call to svg_join.""" sj_data = self.data.svg_join if not sj_data.base_names: - LOGGER.warning( - "svg_join data in JsonAPI contains no file-names to join.") + LOGGER.warning("svg_join data in JsonAPI contains no file-names to join.") return if isinstance(sj_data.base_names, str): sj_data.base_names = [sj_data.base_names] @@ -712,31 +773,30 @@ def main(args: List[str]) -> None: ------- None """ - parser = get_parser( - "Visualizing Dynamic Programming on Tree-Decompositions.") + parser = get_parser("Visualizing Dynamic Programming on Tree-Decompositions.") # possible to use stdin for the file. 
- parser.add_argument('infile', nargs='?', - type=argparse.FileType('r', encoding='UTF-8'), - default=sys.stdin, - help="Input file for the visualization " - "must conform with the 'JsonAPI.md'") - parser.add_argument('outfolder', - help="Folder to output the visualization results") + parser.add_argument( + "infile", + nargs="?", + type=argparse.FileType("r", encoding="UTF-8"), + default=sys.stdin, + help="Input file for the visualization " "must conform with the 'JsonAPI.md'", + ) + parser.add_argument("outfolder", help="Folder to output the visualization results") # get cmd-arguments options = parser.parse_args(args) - logging_cfg(filename='logging.yml', loglevel=options.loglevel) + logging_cfg(filename="logging.yml", loglevel=options.loglevel) LOGGER.info("Called with '%s'", options) infile = options.infile outfolder = options.outfolder if not outfolder: - outfolder = 'outfolder' + outfolder = "outfolder" outfolder = Path(outfolder).resolve() - LOGGER.info("Will read from '%s' and write to folder '%s'", - infile.name, outfolder) + LOGGER.info("Will read from '%s' and write to folder '%s'", infile.name, outfolder) visu = Visualization(infile=infile, outfolder=outfolder) visu.tree_dec_timeline() diff --git a/tdvisu/visualization_data.py b/tdvisu/visualization_data.py index 439debe..8f2c9ee 100644 --- a/tdvisu/visualization_data.py +++ b/tdvisu/visualization_data.py @@ -28,11 +28,12 @@ @dataclass class SvgJoinData: """Class for holding different parameters to join the results.""" + base_names: Union[str, Iterable[str]] folder: Optional[str] = None - outname: str = 'combined' - suffix: str = '%d.svg' - preserve_aspectratio: str = 'xMinYMin' + outname: str = "combined" + suffix: str = "%d.svg" + preserve_aspectratio: str = "xMinYMin" num_images: int = 1 padding: Union[int, Iterable[int]] = 0 scale2: Union[float, Iterable[float]] = 1.0 @@ -43,38 +44,40 @@ class SvgJoinData: @dataclass class IncidenceGraphData: """Class holding different parameters for the incidence graph.""" + edges: list - subgraph_name_one: str = 'clauses' - subgraph_name_two: str = 'variables' - var_name_one: str = '' - var_name_two: str = '' + subgraph_name_one: str = "clauses" + subgraph_name_two: str = "variables" + var_name_one: str = "" + var_name_two: str = "" infer_primal: bool = False infer_dual: bool = False - primal_file: str = 'PrimalGraphStep' - inc_file: str = 'IncidenceGraphStep' - dual_file: str = 'DualGraphStep' + primal_file: str = "PrimalGraphStep" + inc_file: str = "IncidenceGraphStep" + dual_file: str = "DualGraphStep" fontsize: int = 16 penwidth: float = 2.2 - second_shape: str = 'diamond' + second_shape: str = "diamond" column_distance: float = 0.5 @dataclass class GeneralGraphData: """Class holding different parameters for the general graph.""" + edges: list extra_nodes: Optional[list] = None - graph_name: str = 'graph' - file_basename: str = 'graph' - var_name: str = '' + graph_name: str = "graph" + file_basename: str = "graph" + var_name: str = "" do_sort_nodes: bool = False do_adj_nodes: bool = False fontsize: int = 20 - first_color: str = 'yellow' - first_style: str = 'filled' - second_color: str = 'green' - second_style: str = 'dotted,filled' - third_color: str = 'red' + first_color: str = "yellow" + first_style: str = "filled" + second_color: str = "green" + second_style: str = "dotted,filled" + third_color: str = "red" def __post_init__(self): if self.extra_nodes is None: @@ -84,46 +87,51 @@ def __post_init__(self): @dataclass class VisualizationData: """Class holding different parameters for 
Visualization.""" + incidence_graphs: Optional[List[IncidenceGraphData]] = None general_graphs: Optional[List[GeneralGraphData]] = None svg_join: Optional[SvgJoinData] = None - td_file: str = 'TDStep' + td_file: str = "TDStep" colors: Optional[list] = None - orientation: str = 'BT' + orientation: str = "BT" linesmax: int = 100 columnsmax: int = 20 - bagcolor: str = 'white' + bagcolor: str = "white" fontsize: int = 20 penwidth: float = 2.2 - fontcolor: str = 'black' + fontcolor: str = "black" emphasis: Optional[dict] = None def __post_init__(self): if self.colors is None: self.colors = [ - '#0073a1', - '#b14923', - '#244320', - '#b1740f', - '#a682ff', - '#004066', - '#0d1321', - '#da1167', - '#604909', - '#0073a1', - '#b14923', - '#244320', - '#b1740f', - '#a682ff'] + "#0073a1", + "#b14923", + "#244320", + "#b1740f", + "#a682ff", + "#004066", + "#0d1321", + "#da1167", + "#604909", + "#0073a1", + "#b14923", + "#244320", + "#b1740f", + "#a682ff", + ] if self.emphasis is None: self.emphasis = dict() # merge input over defaults: - self.emphasis = {**{"firstcolor": 'yellow', - "secondcolor": 'green', - "firststyle": 'filled', - "secondstyle": 'dotted,filled' - }, - **self.emphasis} + self.emphasis = { + **{ + "firstcolor": "yellow", + "secondcolor": "green", + "firststyle": "filled", + "secondstyle": "dotted,filled", + }, + **self.emphasis, + } if __name__ == "__main__": # pragma: no cover diff --git a/test/test_reader.py b/test/test_reader.py index 50bb064..fb7934f 100644 --- a/test/test_reader.py +++ b/test/test_reader.py @@ -21,7 +21,6 @@ """ import logging - from pathlib import Path import pytest @@ -31,7 +30,7 @@ def test_reader_valid_input(): """Create and test the reader on valid input from a file.""" - twfile = Path(__file__).parent / 'grda16.tw' + twfile = Path(__file__).parent / "grda16.tw" # from filename reader = TwReader.from_filename(twfile) _reader_assertions(reader) @@ -51,7 +50,7 @@ def test_reader_valid_input(): def test_reader_commented_body(): """Create and test the reader on valid input from a file with comments in the body.""" - twfile = Path(__file__).parent / 'grda16_comments.tw' + twfile = Path(__file__).parent / "grda16_comments.tw" # from filename reader = TwReader.from_filename(twfile) _reader_assertions(reader) @@ -77,7 +76,7 @@ def test_dimacsreader_has_body(): def test_reader_inval_preamble(caplog): """Test message when a unexpected token in the preamble was encountered.""" - twfile = Path(__file__).parent / 'grda16.tw' + twfile = Path(__file__).parent / "grda16.tw" # from string with open(twfile) as file: content = "invalid preamble\n" + file.read() @@ -86,7 +85,8 @@ def test_reader_inval_preamble(caplog): assert ( "reader.py", logging.WARN, - "Invalid content in preamble at line 0: invalid preamble") in caplog.record_tuples + "Invalid content in preamble at line 0: invalid preamble", + ) in caplog.record_tuples def test_reader_no_type_found(caplog): @@ -94,9 +94,11 @@ def test_reader_no_type_found(caplog): content = "c no preamble\n" with pytest.raises(SystemExit) as pytest_wrapped_e: TwReader.from_string(content) # should raise SystemExit - assert ("reader.py", - logging.ERROR, - "No type found in DIMACS file!") in caplog.record_tuples + assert ( + "reader.py", + logging.ERROR, + "No type found in DIMACS file!", + ) in caplog.record_tuples assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 @@ -106,26 +108,26 @@ def test_dimacs_wrong_type_found(caplog): content = "p wrong 16 31\n1 2\n2 1\n" with pytest.raises(SystemExit) as 
pytest_wrapped_e: TwReader.from_string(content) # should raise SystemExit - assert ("reader.py", - logging.ERROR, - "Not a tw file!") in caplog.record_tuples + assert ("reader.py", logging.ERROR, "Not a tw file!") in caplog.record_tuples assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 def test_dimacs_col_long_line(caplog): """Test message when long line was read in the body.""" - colfile = Path(__file__).parent / 'col_with_long_line.col' + colfile = Path(__file__).parent / "col_with_long_line.col" TwReader.from_filename(colfile) # should raise SystemExit assert ( "reader.py", logging.WARN, - "Expected exactly 2 vertices at line 1, but 3 found") in caplog.record_tuples + "Expected exactly 2 vertices at line 1, but 3 found", + ) in caplog.record_tuples assert ( "reader.py", logging.WARN, - "Expected exactly 2 vertices at line 1, but 3 found") in caplog.record_tuples + "Expected exactly 2 vertices at line 1, but 3 found", + ) in caplog.record_tuples def test_dimacs_fewer_edges(caplog): @@ -135,7 +137,8 @@ def test_dimacs_fewer_edges(caplog): assert ( "reader.py", logging.WARN, - "Number of edges mismatch preamble (2 vs 3)") in caplog.record_tuples + "Number of edges mismatch preamble (2 vs 3)", + ) in caplog.record_tuples assert reader.num_vertices == 3 assert reader.num_edges == 3 assert len(reader.edges) == 2 @@ -146,18 +149,63 @@ def _reader_assertions(reader: TwReader): as well as the number of vertices and number of edges. """ - expected_edges = {(1, 2), (2, 1), (2, 3), (3, 2), (3, 4), (3, 5), - (4, 3), (4, 5), (4, 6), (5, 3), (5, 4), (6, 4), - (6, 7), (6, 15), (7, 6), (7, 8), (7, 14), (8, 7), - (8, 9), (9, 8), (9, 10), (9, 11), (10, 9), - (11, 9), (11, 12), (11, 14), (12, 11), (12, 13), - (12, 14), (13, 12), (14, 7), (14, 11), (14, 12), - (15, 6), (15, 16), (16, 15)} - - expected_adj = {1: {2}, 2: {1, 3}, 3: {2, 4, 5}, 4: {3, 5, 6}, - 5: {3, 4}, 6: {4, 7, 15}, 7: {6, 8, 14}, 8: {7, 9}, - 9: {8, 10, 11}, 10: {9}, 11: {9, 12, 14}, 12: {11, 13, 14}, - 13: {12}, 14: {7, 11, 12}, 15: {6, 16}, 16: {15}} + expected_edges = { + (1, 2), + (2, 1), + (2, 3), + (3, 2), + (3, 4), + (3, 5), + (4, 3), + (4, 5), + (4, 6), + (5, 3), + (5, 4), + (6, 4), + (6, 7), + (6, 15), + (7, 6), + (7, 8), + (7, 14), + (8, 7), + (8, 9), + (9, 8), + (9, 10), + (9, 11), + (10, 9), + (11, 9), + (11, 12), + (11, 14), + (12, 11), + (12, 13), + (12, 14), + (13, 12), + (14, 7), + (14, 11), + (14, 12), + (15, 6), + (15, 16), + (16, 15), + } + + expected_adj = { + 1: {2}, + 2: {1, 3}, + 3: {2, 4, 5}, + 4: {3, 5, 6}, + 5: {3, 4}, + 6: {4, 7, 15}, + 7: {6, 8, 14}, + 8: {7, 9}, + 9: {8, 10, 11}, + 10: {9}, + 11: {9, 12, 14}, + 12: {11, 13, 14}, + 13: {12}, + 14: {7, 11, 12}, + 15: {6, 16}, + 16: {15}, + } assert reader.num_vertices == 16 assert reader.num_edges == 36
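

The formatting recorded above was produced with the VS Code black-formatter extension referenced in the commit message; the same result can be reproduced from the command line. Below is a minimal sketch, not part of the patch itself: it assumes the `black` package is installed (e.g. `pip install black`), and the target paths are taken from the diffstat at the top of this commit.

    # check_formatting.py -- hypothetical helper, not included in this patch.
    # Runs black in check mode over the directories touched by this commit.
    import subprocess
    import sys


    def check_black_formatting(paths=("scripts", "tdvisu", "test")) -> int:
        """Run `black --check --diff` on the given paths and return its exit code."""
        # --check: exit non-zero if any file would be reformatted.
        # --diff: print the changes black would make instead of writing them.
        result = subprocess.run(
            [sys.executable, "-m", "black", "--check", "--diff", *paths],
            check=False,
        )
        return result.returncode


    if __name__ == "__main__":  # pragma: no cover
        sys.exit(check_black_formatting())

Dropping `--check --diff` lets black rewrite the files in place, which is the operation whose output this patch records.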