From 81d00177651b9fe0447198985426ad32b56124b0 Mon Sep 17 00:00:00 2001 From: unknown <david.hoksza@gmail.com> Date: Fri, 8 Nov 2019 17:46:46 +0100 Subject: [PATCH] fdp as a beautification op --- talent/beautification.py | 25 +++ talent/fdp.py | 319 +++++++++++++++++++++++++++++++++------ talent/graphics.py | 26 +++- talent/layout.py | 1 + 4 files changed, 318 insertions(+), 53 deletions(-) diff --git a/talent/beautification.py b/talent/beautification.py index ca9d6d3..25ad627 100644 --- a/talent/beautification.py +++ b/talent/beautification.py @@ -18,6 +18,7 @@ from . import sbml from . import settings from .grid import Grid from .routing import RoutingGraph +from . import fdp from typing import Dict @@ -36,6 +37,7 @@ class BEAUTY_OPERATION_TYPE(Enum): ALIGN_TO_GRID = 3 HANDLE_REACTION_SPECIES_OVERLAPS = 4 DEDUPLICATE = 5 + ONE_DEGREE_SPECIES = 6 # PUSH_AWAY_SIDE_SPECIES = 5 def get_params(self): @@ -51,6 +53,8 @@ class BEAUTY_OPERATION_TYPE(Enum): return HandleReactionSpeciesOverlapsParams() if self == BEAUTY_OPERATION_TYPE.DEDUPLICATE: return DeduplicateParams() + if self == BEAUTY_OPERATION_TYPE.ONE_DEGREE_SPECIES: + return OneDegreeSpeciesParams() class CompactTrailingPathsParams: @@ -63,6 +67,16 @@ class CompactTrailingPathsParams: self.iteration_count = params["iteration_count"] +class OneDegreeSpeciesParams: + def __init__(self, params=None): + if params is None: + self.iteration_count = 100 + elif isinstance(params, OneDegreeSpeciesParams): + self.iteration_count = params.iteration_count + else: + self.iteration_count = params["iteration_count"] + + class InflateParams: def __init__(self, params=None): if params is None: @@ -159,6 +173,8 @@ class BeautificationOp: handle_reaction_species_overlaps(self.__g) elif self.type == BEAUTY_OPERATION_TYPE.DEDUPLICATE: deduplicate(self.__g, DeduplicateParams(self.__params)) + elif self.type == BEAUTY_OPERATION_TYPE.ONE_DEGREE_SPECIES: + one_degreee_species(self.__g, OneDegreeSpeciesParams(self.__params)) else: raise Exception('Unsupported beautification operation: {}'.format(self.type)) @@ -288,6 +304,15 @@ def beautify(g: nx.MultiGraph): # handle_overlapping_segements(g) # handle_reaction_species_overlaps(g, 0.5*gsh, 0.5*gsh, 1) +def one_degreee_species(g: nx.MultiGraph, params: OneDegreeSpeciesParams=OneDegreeSpeciesParams()): + logging.info("begin FDP") + for r_id in gu.get_all_reaction_ids(g): + for s_id, pos in fdp.springs(g, r_id).items(): + lt.set_predicted_position(g.nodes[s_id], pos) + logging.info("end FDP") + + for r_id in gu.get_all_reaction_ids(g): + gu.get_predicted_layout(g, r_id).recompute_layout() def deduplicate(g: nx.MultiGraph, params: DeduplicateParams=DeduplicateParams()): """ diff --git a/talent/fdp.py b/talent/fdp.py index 5b24c44..998e457 100644 --- a/talent/fdp.py +++ b/talent/fdp.py @@ -1,88 +1,309 @@ import networkx as nx import math +from statistics import median import copy import numpy as np +from scipy import stats from random import random -from typing import Dict, List +from typing import Dict, List, NamedTuple +from sortedcontainers import SortedList +from enum import Enum from talent import graph_utils as gu from talent import bioentity as be from talent import graphics as gr +from talent import settings -def davidson_harel(g: nx.MultiGraph, r_id: str) -> Dict[str, np.ndarray]: +class Configuration: + def __init__(self, bbs:Dict[str, gr.Rectangle]=[], fixed: List[str]=[]): + self.bbs: Dict[str, gr.Rectangle] = bbs + self.fixed: List[str] = fixed - #TODO: optimize structures - used tuple instead of rectangle and dict.copy instead of deep copy - threshold = 10 + def __copy__(self): + return Configuration(bbs=copy.deepcopy(self.bbs), fixed=self.fixed) + + def generate_new(self, shift_radius: float): + conf_new = copy.copy(self) + for rect in [v for k,v in conf_new.bbs.items() if k not in self.fixed]: + shift = [(random() * 2 - 1) * shift_radius, (random() * 2 - 1) * shift_radius] + rect.shift(shift) + return conf_new + + +class EnergyEvaluator: + + def __init__(self, g:nx.MultiGraph, r_id: str, conf:Configuration): + + # the energy function should have the following components: + # - repulsive component - forcing the layout + # - edge length component - forcing optimal lengths + # - crossing component - minimize the number of crossings + # - orthogonal component - forcing horizontal and vertical positions for main reactant and products if these are ODS + + self.components = { + "repulsive": { + "weight": 1, + "func": self.__repulsive_energy + }, + "edge_length": { + "weight": 1, + "func": self.__edge_length_energy + } + } + + self.rs: gu.ReactionSpecies = gu.get_reaction_species(g, r_id) + rl: be.ReactionLayout = gu.get_predicted_layout(g, r_id) + self.mr_id = rl.get_main_reactant_id() + self.mp_id = rl.get_main_product_id() + self.mr_pos = rl.get_main_reactant_pos() + self.mp_pos = rl.get_main_product_pos() + self.r_cp = rl.get_reactants_connection_pos() + self.p_cp = rl.get_products_connection_pos() + self.r_c = rl.get_center() + + r_size = gr.utils.size(self.mr_pos - self.mp_pos) + self.max_radius = max(r_size, settings.render.optimal_species_reaction_dist*2) + + for name, comp in self.components.items(): + energies = [comp["func"](conf.generate_new(self.max_radius)) for i in range(100)] + med = median(energies) + comp["weight"] = 1 / med / len(self.components) #each component has such a weight that the overall median energy equals one + + self.e_median = 1 + + + def evaluate_energy(self, conf: Configuration) -> float: + + energy = 0 + + if len(conf.bbs) <= 1: + return energy + + es = [c["func"](conf) for c in self.components.values()] + print(es) + return sum(es) - rs: gu.ReactionSpecies = gu.get_reaction_species(g, r_id) + def __repulsive_energy(self, conf:Configuration): + # TODO: Maybe only consider distances between reactants, products and modifiers + rects = list(conf.bbs.values()) + energy_repulsive = 0 + for i1 in range(len(rects)): + r1 = rects[i1] + for i2 in range(i1 + 1, len(rects)): + r2 = rects[i2] + energy_repulsive += r1.distance_rect(r2) + return self.components["repulsive"]["weight"] / energy_repulsive + + def __edge_length_energy(self, conf:Configuration): + # Edge lengths component + energy_edge_lengths = 0 + for s_ids, pos in (self.rs.reactantIds, self.mr_pos), (self.rs.productIds, self.mp_pos), (self.rs.modifierIds, self.r_c): + for s_id in s_ids: + dist = gr.utils.dist(conf.bbs[s_id].get_center(), pos) + energy_edge_lengths += dist * dist * dist + return self.components["edge_length"]["weight"] * energy_edge_lengths + + +def davidson_harel(g: nx.MultiGraph, r_id: str) -> Configuration: + + #TODO: optimize structures - used tuple instead of rectangle and dict.copy instead of deep copy + cnt_iterations = 500 s_ids = list(g[r_id]) ods_ids = [id for id in g[r_id] if len(g[id]) == 1] - nonods_ids = list(set(s_ids) - set(ods_ids)) if len(ods_ids) <= 1: assert False + bbs = {id: gu.get_predicted_layout(g, id).get_bb().get() for id in s_ids} + rl = gu.get_predicted_layout(g, r_id) + fixed = set(s_ids) - set(ods_ids) + fixed.add(rl.get_main_reactant_id()) + fixed.add(rl.get_main_product_id()) + conf = Configuration(bbs=bbs, fixed=list(fixed)) + + ee = EnergyEvaluator(g, r_id, conf) - rl:be.ReactionLayout = gu.get_predicted_layout(g, r_id) - mr_id = rl.get_main_reactant_id() - mp_id = rl.get_main_product_id() - r_cp = rl.get_reactants_connection_pos() - p_cp = rl.get_reactants_connection_pos() + temperature = ee.e_median + temperature_step = temperature / cnt_iterations + factor = 1 / 2 * math.exp(1) # at the beginning we will have 50% to accept solutions which are median of the sampled solutions - conf: Dict[str, gr.Rectangle] = {id:gu.get_predicted_layout(g, id).get_bb().get() for id in ods_ids} - rects_fixed: List[gr.Rectangle] = [gu.get_predicted_layout(g, id).get_bb().get() for id in nonods_ids] + max_radius = ee.max_radius + radius_step = max_radius / cnt_iterations + radius = max_radius + e = ee.evaluate_energy(conf) + solutions = SortedList(key=lambda x: x["energy"]) + solutions.add({"energy": e, "solution": conf}) - temperature = 100 - conf_new = copy.deepcopy(conf) - e = evaluate_energy(g=g, conf=conf, rects_fixed=rects_fixed, mr_id=mr_id, mp_id=mp_id, rs=rs) - radius = 240 + i = 0 while temperature > 0: - update_configuration(conf=conf_new, shift_radius=radius) - e_new = evaluate_energy(g=g, conf=conf_new, rects_fixed=rects_fixed, mr_id=mr_id, mp_id=mp_id, rs=rs) + i +=1 + conf_new = conf.generate_new(radius) + # radius -= radius_step + e_new = ee.evaluate_energy(conf=conf_new) + solutions.add({"energy": e, "solution": conf}) - if e_new < e or math.exp(-(e-e_new)/temperature) > threshold: + r = random() + # print(r, math.exp((e-e_new)/temperature), e, e_new) + if e_new < e or math.exp((e-e_new)/temperature) > r: + if (e_new < e): + print("better solution in step", i) + else: + print("accepted worse") e = e_new - conf = copy.deepcopy(conf_new) - else: - conf_new = copy.deepcopy(conf) + conf = conf_new + solutions.add({"energy": e, "solution": conf}) + + temperature -= temperature_step + + print(solutions[0]["energy"]) + return solutions[0]["solution"] + + +class ROLE(Enum): + REACTANT = 0 + MODIFIER = 1 + PRODUCT = 2 + + +class Force: + + def __init__(self, g: nx.MultiGraph, r_id: str, s_ids: List[str], bbs: Dict[str, gr.Rectangle]): + self.s_ids = s_ids + self.bbs = bbs + self.rs: gu.ReactionSpecies = gu.get_reaction_species(g, r_id) + rl: be.ReactionLayout = gu.get_predicted_layout(g, r_id) + self.mr_id = rl.get_main_reactant_id() + self.mp_id = rl.get_main_product_id() + self.mr_pos = rl.get_main_reactant_pos() + self.mp_pos = rl.get_main_product_pos() + self.r_cp = rl.get_reactants_connection_pos() + self.p_cp = rl.get_products_connection_pos() + self.r_c = rl.get_center() + + + self.attractors = { + ROLE.REACTANT: [self.r_cp, self.mr_pos], + ROLE.MODIFIER: [self.r_c], + ROLE.PRODUCT: [self.p_cp, self.mp_pos] + } + + self.repulsors = { + ROLE.REACTANT: [self.r_cp], + ROLE.MODIFIER: [], + ROLE.PRODUCT: [self.p_cp] + } + + self.roles:Dict[str, ROLE] = {} + for ids, role in (self.rs.reactantIds, ROLE.REACTANT), (self.rs.modifierIds, ROLE.MODIFIER), (self.rs.productIds, ROLE.PRODUCT): + for id in ids: + self.roles[id] = role + + def repulsive(self) -> Dict[str, np.ndarray]: + + displacement:Dict[str, np.ndarray] = {s_id:np.array([0.0,0.0]) for s_id in self.bbs} + + dist_min = 20 + + # we iterate overl all s_ids, because we need to account for repulsion for nodes which are fixed, + # however, these nodes are not shifted, so we do not note displacement for them + all_ids = sorted(list(self.bbs)) + for i1, s_id1 in enumerate(all_ids): + bb1 = self.bbs[s_id1] + for i2 in range(i1+1, len(all_ids)): + s_id2 = all_ids[i2] + bb2 = self.bbs[s_id2] + delta = bb2.distance_rect(bb1, two_d=True) + if delta[0] == 0 and delta[1] == 0: + delta = np.array([random()*0.1, random()*0.1]) + delta_size = gr.utils.size(delta) + # if s_id2 == self.mr_id or s_id2 == self.mp_id: + # delta_size /=4 + disp = (dist_min / delta_size) * gr.utils.normalize(delta) + if s_id1 in self.s_ids: + displacement[s_id1] += disp + if s_id2 in self.s_ids: + displacement[s_id2] -= disp + + for pos_attractor in self.repulsors[self.roles[s_id1]]: + delta = self.bbs[s_id1].distance_point(pos_attractor, two_d=True) + if delta[0] == 0 and delta[1] == 0: + delta = np.array([random()*0.1, random()*0.1]) + delta_size = gr.utils.size(delta) + displacement[s_id1] -= (dist_min / delta_size) * gr.utils.normalize(delta) + + return displacement + + + def attractive(self) -> Dict[str, np.ndarray]: + + dist_opt = settings.render.optimal_species_reaction_dist / 2 + + displacement: Dict[str, np.ndarray] = {s_id: np.array([0.0, 0.0]) for s_id in self.bbs} + + for s_id in self.s_ids: + for pos_attractor in self.attractors[self.roles[s_id]]: + delta = self.bbs[s_id].distance_point(pos_attractor, two_d=True) + displacement[s_id] += gr.utils.normalize(delta) * (gr.utils.size(delta) - dist_opt) / dist_opt # we deduct dist_opt because we don't wont the nodes to get to the attractors + return displacement + + +def springs(g: nx.MultiGraph, r_id: str) -> Dict[str, np.ndarray]: + + #TODO: optimize structures - used tuple instead of rectangle and dict.copy instead of deep copy + cnt_iterations = 500 + + s_ids = list(g[r_id]) + ods_ids = [id for id in g[r_id] if len(g[id]) == 1] + + if len(ods_ids) == 0: + return {} + + bbs = {id: gu.get_predicted_layout(g, id).get_bb().get() for id in s_ids} + rl = gu.get_predicted_layout(g, r_id) + fixed_ids = set(s_ids) - set(ods_ids) + fixed_ids.add(rl.get_main_reactant_id()) + fixed_ids.add(rl.get_main_product_id()) + + position_ids = sorted(list(set(s_ids) - fixed_ids)) + + rl: be.ReactionLayout = gu.get_predicted_layout(g, r_id) + time = max(gr.utils.size(rl.get_main_reactant_pos() - rl.get_main_product_pos()), 2*settings.render.optimal_species_reaction_dist) + + time_step = time / cnt_iterations + + force = Force(g, r_id, position_ids, bbs) + + + for i in range(cnt_iterations): + + disp: Dict[str, np.ndarray] = {} + disp_repulsive = force.repulsive() + disp_attractive = force.attractive() + for s_id in position_ids: + # print(time, s_id, disp_repulsive[s_id], disp_attractive[s_id]) + disp[s_id] = disp_attractive[s_id] + disp_repulsive[s_id] + + for s_id in position_ids: + # shift = gr.utils.normalize(disp[s_id]) * min(gr.utils.size(disp[s_id]), time) + shift = disp[s_id] + bbs[s_id].shift(shift) + + time -= time_step + + return {id: bb.get_center() for id, bb in bbs.items()} + - temperature -= 1 - return -def update_configuration(conf: Dict[str, gr.Rectangle], shift_radius: float): - for rect in conf.values(): - shift = [random() * shift_radius, random() * shift_radius] - rect.shift(shift) -def evaluate_energy(g: nx.MultiGraph, conf: Dict[str, gr.Rectangle], rects_fixed: List[gr.Rectangle], mr_id: str, mp_id: str, rs: gu.ReactionSpecies) -> float: - # the energy function should have the following components: - # - repulsive component - forcing the layout - # - edge length component - forcing optimal lengths - # - crossing component - minimize the number of crossings - # - orthogonal component - forcing horizontal and vertical positions for main reactant and products if these are ODS - energy = 0 - constants = { - "repulsive": 1 - } - if len(conf) <= 1: - return energy - rects = list(conf.values()) - dist_sum = 0 - for i1 in range(len(rects)): - r1 = rects[i1] - for i2 in range(i1+1, len(rects)): - r2 = rects[i2] - dist_sum += r1.distance(r2) - energy += constants["repulsive"] / dist_sum diff --git a/talent/graphics.py b/talent/graphics.py index 08df882..14f4b68 100644 --- a/talent/graphics.py +++ b/talent/graphics.py @@ -74,23 +74,41 @@ class Rectangle: def get_center(self): return self.get_lt() + np.array([(self.get_rt()[0] - self.get_lt()[0]) / 2, (self.get_lb()[1] - self.get_lt()[1]) / 2]) - def distance(self, r2:'Rectangle'): + def distance_rect(self, r2: 'Rectangle', two_d:bool = False) -> float or np.ndarray: r1 = self if r1.get_x2() < r2.get_x1(): x_diff = r2.get_x1() - r1.get_x2() elif r2.get_x2() < r1.get_x1(): - x_diff = r1.get_x1() - r2.get_x2() + x_diff = r2.get_x2() - r1.get_x1() else: x_diff = 0 if r1.get_y2() < r2.get_y1(): y_diff = r2.get_y1() - r1.get_y2() elif r2.get_y2() < r1.get_y1(): - y_diff = r1.get_y1() - r2.get_y2() + y_diff = r2.get_y2() - r1.get_y1() else: y_diff = 0 - return math.sqrt(x_diff*x_diff + y_diff*y_diff) + return np.array([x_diff, y_diff]) if two_d else math.sqrt(x_diff*x_diff + y_diff*y_diff) + + def distance_point(self, p: np.ndarray, two_d:bool = False) -> float or np.ndarray: + + if self.get_x2() < p[0]: + x_diff = p[0] - self.get_x2() + elif p[0] < self.get_x1(): + x_diff = p[0] - self.get_x1() + else: + x_diff = 0 + + if self.get_y2() < p[1]: + y_diff = p[1] - self.get_y2() + elif p[1] < self.get_y1(): + y_diff = p[1] - self.get_y1() + else: + y_diff = 0 + + return np.array([x_diff, y_diff]) if two_d else math.sqrt(x_diff * x_diff + y_diff * y_diff) def inside(self, other: 'Rectangle') -> bool: diff --git a/talent/layout.py b/talent/layout.py index b8e3885..e673de2 100644 --- a/talent/layout.py +++ b/talent/layout.py @@ -13,6 +13,7 @@ from . import settings from . import bioentity as be from . import graph_utils as gu from . import graphics as gr +from . import fdp from itertools import combinations, product -- GitLab