|
@@ -111,36 +111,130 @@ def build_derivation_result(
|
|
|
"""
|
|
"""
|
|
|
生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。
|
|
生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。
|
|
|
选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。
|
|
选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。
|
|
|
|
|
+ 若之前推导成功的选题点 is_fully_derived=false,本轮变为 is_fully_derived=true,则算本次新推导成功的选题点,
|
|
|
|
|
+ 且 matched_score、is_fully_derived 在本轮后更新为该轮评估值。
|
|
|
|
|
+ 推导成功的选题点:使用当前已更新的 best (matched_score, is_fully_derived)。
|
|
|
|
|
+ 本次新推导成功的选题点:用当轮评估的 matched_score、is_fully_derived。
|
|
|
|
|
+ 未推导成功的选题点:不包含 matched_score、is_fully_derived。
|
|
|
"""
|
|
"""
|
|
|
all_keys = {_topic_point_key(t) for t in topic_points}
|
|
all_keys = {_topic_point_key(t) for t in topic_points}
|
|
|
topic_by_key = {_topic_point_key(t): t for t in topic_points}
|
|
topic_by_key = {_topic_point_key(t): t for t in topic_points}
|
|
|
|
|
|
|
|
|
|
+ # 分轮次收集 (round_num, name) -> (matched_score, is_fully_derived),同一轮同名取首次出现
|
|
|
|
|
+ score_by_round_name: dict[tuple[int, str], tuple[float, bool]] = {}
|
|
|
|
|
+ for round_idx, eval_data in enumerate(evals):
|
|
|
|
|
+ round_num = eval_data.get("round", round_idx + 1)
|
|
|
|
|
+ for er in eval_data.get("eval_results") or []:
|
|
|
|
|
+ if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
|
|
|
|
|
+ continue
|
|
|
|
|
+ mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip()
|
|
|
|
|
+ if not mp:
|
|
|
|
|
+ continue
|
|
|
|
|
+ key = (round_num, mp)
|
|
|
|
|
+ if key in score_by_round_name:
|
|
|
|
|
+ continue
|
|
|
|
|
+ score = er.get("matched_score")
|
|
|
|
|
+ if score is None:
|
|
|
|
|
+ score = 1.0
|
|
|
|
|
+ else:
|
|
|
|
|
+ try:
|
|
|
|
|
+ score = float(score)
|
|
|
|
|
+ except (TypeError, ValueError):
|
|
|
|
|
+ score = 1.0
|
|
|
|
|
+ is_fully = er.get("is_fully_derived", True)
|
|
|
|
|
+ score_by_round_name[key] = (score, bool(is_fully))
|
|
|
|
|
+
|
|
|
result = []
|
|
result = []
|
|
|
derived_names_so_far: set[str] = set()
|
|
derived_names_so_far: set[str] = set()
|
|
|
|
|
+ fully_derived_names_so_far: set[str] = set() # 已出现过 is_fully_derived=true 的选题点
|
|
|
|
|
+ best_score_by_name: dict[str, tuple[float, bool]] = {} # name -> (matched_score, is_fully_derived),遇 is_fully=true 时更新
|
|
|
|
|
|
|
|
for i, (derivation, eval_data) in enumerate(zip(derivations, evals)):
|
|
for i, (derivation, eval_data) in enumerate(zip(derivations, evals)):
|
|
|
round_num = derivation.get("round", i + 1)
|
|
round_num = derivation.get("round", i + 1)
|
|
|
eval_results = eval_data.get("eval_results") or []
|
|
eval_results = eval_data.get("eval_results") or []
|
|
|
matched_post_points = set()
|
|
matched_post_points = set()
|
|
|
for er in eval_results:
|
|
for er in eval_results:
|
|
|
- # 新格式: is_matched;旧格式: match_result == "匹配"
|
|
|
|
|
if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
|
|
if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
|
|
|
continue
|
|
continue
|
|
|
mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or ""
|
|
mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or ""
|
|
|
if mp and str(mp).strip():
|
|
if mp and str(mp).strip():
|
|
|
matched_post_points.add(str(mp).strip())
|
|
matched_post_points.add(str(mp).strip())
|
|
|
|
|
|
|
|
- new_derived_names = matched_post_points - derived_names_so_far
|
|
|
|
|
|
|
+ # 本轮每个匹配名的 (score, is_fully)
|
|
|
|
|
+ this_round_scores: dict[str, tuple[float, bool]] = {}
|
|
|
|
|
+ for name in matched_post_points:
|
|
|
|
|
+ val = score_by_round_name.get((round_num, name))
|
|
|
|
|
+ if val is not None:
|
|
|
|
|
+ this_round_scores[name] = val
|
|
|
|
|
+
|
|
|
|
|
+ # 本次新推导成功:首次匹配 或 之前 is_fully=false 且本轮 is_fully=true
|
|
|
|
|
+ new_derived_names = set()
|
|
|
|
|
+ for name in matched_post_points:
|
|
|
|
|
+ score, is_fully = this_round_scores.get(name, (None, False))
|
|
|
|
|
+ if name not in derived_names_so_far:
|
|
|
|
|
+ new_derived_names.add(name)
|
|
|
|
|
+ elif name not in fully_derived_names_so_far and is_fully:
|
|
|
|
|
+ new_derived_names.add(name)
|
|
|
|
|
+
|
|
|
|
|
+ # 更新推导集合与 best:首次出现或本轮 is_fully=true 时更新 best
|
|
|
derived_names_so_far |= matched_post_points
|
|
derived_names_so_far |= matched_post_points
|
|
|
|
|
+ for name in matched_post_points:
|
|
|
|
|
+ val = this_round_scores.get(name)
|
|
|
|
|
+ if val is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+ score, is_fully = val
|
|
|
|
|
+ if name not in best_score_by_name:
|
|
|
|
|
+ best_score_by_name[name] = (score, is_fully)
|
|
|
|
|
+ elif is_fully:
|
|
|
|
|
+ best_score_by_name[name] = (score, is_fully)
|
|
|
|
|
+ if is_fully:
|
|
|
|
|
+ fully_derived_names_so_far.add(name)
|
|
|
|
|
|
|
|
- # 推导成功的选题点:name 在 derived_names_so_far 中的选题点(每 name 取一条,与 topic_points 顺序一致)
|
|
|
|
|
derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far}
|
|
derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far}
|
|
|
new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names}
|
|
new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names}
|
|
|
not_derived_keys = all_keys - derived_keys
|
|
not_derived_keys = all_keys - derived_keys
|
|
|
|
|
|
|
|
- derived_list = [dict(topic_by_key[k]) for k in sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))]
|
|
|
|
|
- new_list = [dict(topic_by_key[k]) for k in sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))]
|
|
|
|
|
- not_derived_list = [dict(topic_by_key[k]) for k in sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))]
|
|
|
|
|
|
|
+ sort_derived = sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))
|
|
|
|
|
+ sort_new = sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))
|
|
|
|
|
+ sort_not = sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))
|
|
|
|
|
+
|
|
|
|
|
+ def add_score_fields(keys: set, sort_keys: list, round_for_score: int | None) -> list[dict]:
|
|
|
|
|
+ """round_for_score: 用该轮评估的分数;若为 None 则不添加 score 字段。"""
|
|
|
|
|
+ out = []
|
|
|
|
|
+ for k in sort_keys:
|
|
|
|
|
+ if k not in keys:
|
|
|
|
|
+ continue
|
|
|
|
|
+ obj = dict(topic_by_key[k])
|
|
|
|
|
+ if round_for_score is not None:
|
|
|
|
|
+ name = obj.get("name", "")
|
|
|
|
|
+ val = score_by_round_name.get((round_for_score, name))
|
|
|
|
|
+ if val is not None:
|
|
|
|
|
+ obj["matched_score"] = val[0]
|
|
|
|
|
+ obj["is_fully_derived"] = val[1]
|
|
|
|
|
+ else:
|
|
|
|
|
+ obj["matched_score"] = None
|
|
|
|
|
+ obj["is_fully_derived"] = False
|
|
|
|
|
+ out.append(obj)
|
|
|
|
|
+ return out
|
|
|
|
|
+
|
|
|
|
|
+ # 推导成功的选题点:用当前已更新的 best (matched_score, is_fully_derived)
|
|
|
|
|
+ derived_list = []
|
|
|
|
|
+ for k in sort_derived:
|
|
|
|
|
+ if k not in derived_keys:
|
|
|
|
|
+ continue
|
|
|
|
|
+ obj = dict(topic_by_key[k])
|
|
|
|
|
+ name = obj.get("name", "")
|
|
|
|
|
+ val = best_score_by_name.get(name)
|
|
|
|
|
+ if val is not None:
|
|
|
|
|
+ obj["matched_score"] = val[0]
|
|
|
|
|
+ obj["is_fully_derived"] = val[1]
|
|
|
|
|
+ else:
|
|
|
|
|
+ obj["matched_score"] = None
|
|
|
|
|
+ obj["is_fully_derived"] = False
|
|
|
|
|
+ derived_list.append(obj)
|
|
|
|
|
+
|
|
|
|
|
+ new_list = add_score_fields(new_derived_keys, sort_new, round_for_score=round_num)
|
|
|
|
|
+ not_derived_list = [dict(topic_by_key[k]) for k in sort_not] # 不带 matched_score、is_fully_derived
|
|
|
|
|
|
|
|
result.append({
|
|
result.append({
|
|
|
"轮次": round_num,
|
|
"轮次": round_num,
|
|
@@ -182,75 +276,109 @@ def build_visualize_edges(
|
|
|
) -> tuple[list[dict], list[dict]]:
|
|
) -> tuple[list[dict], list[dict]]:
|
|
|
"""
|
|
"""
|
|
|
生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。
|
|
生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。
|
|
|
- 按轮次从小到大处理,保证每个输出节点最多只出现在一条边的 output_nodes 里,且保留的是前面轮次的数据。
|
|
|
|
|
|
|
+ - node_list:同一轮内节点不重复,重复时保留 matched_score 更高的;节点带 matched_score、is_fully_derived。
|
|
|
|
|
+ - edge_list:边带 level(与 output 节点 level 一致);同一轮内 output 节点不重复;若前面轮次该节点匹配分更高则本轮不保留该节点。
|
|
|
|
|
+ 评估数据支持 path_id(对应推导 derivation_results[].id)、item_id(output 中元素从 1 起的序号)、matched_score、is_fully_derived。
|
|
|
"""
|
|
"""
|
|
|
- # 按轮次从小到大排序,确保优先使用前面轮次的输出节点
|
|
|
|
|
derivations = sorted(derivations, key=lambda d: d.get("round", 0))
|
|
derivations = sorted(derivations, key=lambda d: d.get("round", 0))
|
|
|
evals = sorted(evals, key=lambda e: e.get("round", 0))
|
|
evals = sorted(evals, key=lambda e: e.get("round", 0))
|
|
|
|
|
|
|
|
- topic_by_name = {}
|
|
|
|
|
- for t in topic_points:
|
|
|
|
|
- name = t["name"]
|
|
|
|
|
- if name not in topic_by_name:
|
|
|
|
|
- topic_by_name[name] = t
|
|
|
|
|
|
|
+ topic_by_name = {t["name"]: t for t in topic_points}
|
|
|
|
|
|
|
|
- # 不依赖 id,仅用 (round, derivation_output_point) 与推导的 output 节点名匹配关联评估结果
|
|
|
|
|
- # key=(round_num, derivation_output_point) -> (matched_post_point, matched_reason);同轮同节点取首次匹配
|
|
|
|
|
- match_by_round_output: dict[tuple[int, str], tuple[str, str]] = {}
|
|
|
|
|
|
|
+ # 评估匹配:(round_num, path_id, item_id) -> (matched_post_point, matched_reason, matched_score, is_fully_derived)
|
|
|
|
|
+ # path_id = 推导中 derivation_results[].id,item_id = output 中元素从 1 起的序号
|
|
|
|
|
+ match_by_path_item: dict[tuple[int, int, int], tuple[str, str, float, bool]] = {}
|
|
|
|
|
+ match_by_round_output: dict[tuple[int, str], tuple[str, str, float, bool]] = {} # 兼容无 path_id/item_id
|
|
|
for round_idx, eval_data in enumerate(evals):
|
|
for round_idx, eval_data in enumerate(evals):
|
|
|
round_num = eval_data.get("round", round_idx + 1)
|
|
round_num = eval_data.get("round", round_idx + 1)
|
|
|
for er in eval_data.get("eval_results") or []:
|
|
for er in eval_data.get("eval_results") or []:
|
|
|
if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
|
|
if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
|
|
|
continue
|
|
continue
|
|
|
|
|
+ mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip()
|
|
|
|
|
+ if not mp:
|
|
|
|
|
+ continue
|
|
|
out_point = (er.get("derivation_output_point") or "").strip()
|
|
out_point = (er.get("derivation_output_point") or "").strip()
|
|
|
- mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or ""
|
|
|
|
|
- matched_reason = er.get("matched_reason") or er.get("match_reason") or ""
|
|
|
|
|
- if out_point and mp and str(mp).strip():
|
|
|
|
|
- mp = str(mp).strip()
|
|
|
|
|
|
|
+ reason = (er.get("matched_reason") or er.get("match_reason") or "").strip()
|
|
|
|
|
+ score = er.get("matched_score")
|
|
|
|
|
+ if score is None:
|
|
|
|
|
+ score = 1.0
|
|
|
|
|
+ else:
|
|
|
|
|
+ try:
|
|
|
|
|
+ score = float(score)
|
|
|
|
|
+ except (TypeError, ValueError):
|
|
|
|
|
+ score = 1.0
|
|
|
|
|
+ is_fully = er.get("is_fully_derived", True)
|
|
|
|
|
+ val = (mp, reason, score, bool(is_fully))
|
|
|
|
|
+ path_id = er.get("path_id")
|
|
|
|
|
+ item_id = er.get("item_id")
|
|
|
|
|
+ if path_id is not None and item_id is not None:
|
|
|
|
|
+ try:
|
|
|
|
|
+ match_by_path_item[(round_num, int(path_id), int(item_id))] = val
|
|
|
|
|
+ except (TypeError, ValueError):
|
|
|
|
|
+ pass
|
|
|
|
|
+ if out_point:
|
|
|
k = (round_num, out_point)
|
|
k = (round_num, out_point)
|
|
|
if k not in match_by_round_output:
|
|
if k not in match_by_round_output:
|
|
|
- match_by_round_output[k] = (mp, str(matched_reason).strip() if matched_reason else "")
|
|
|
|
|
|
|
+ match_by_round_output[k] = val
|
|
|
|
|
+
|
|
|
|
|
+ # 按 (round_num, mp) 收集节点候选,同轮同节点保留 matched_score 最高的一条
|
|
|
|
|
+ node_candidates: dict[tuple[int, str], dict] = {} # (round_num, mp) -> node_dict (含 score, is_fully_derived)
|
|
|
|
|
+
|
|
|
|
|
+ def get_match(round_num: int, path_id: int | None, item_id: int | None, out_item: str) -> tuple[str, str, float, bool] | None:
|
|
|
|
|
+ if path_id is not None and item_id is not None:
|
|
|
|
|
+ v = match_by_path_item.get((round_num, path_id, item_id))
|
|
|
|
|
+ if v is not None:
|
|
|
|
|
+ return v
|
|
|
|
|
+ return match_by_round_output.get((round_num, out_item))
|
|
|
|
|
|
|
|
- node_list = []
|
|
|
|
|
- seen_nodes = set()
|
|
|
|
|
edge_list = []
|
|
edge_list = []
|
|
|
- level_by_name = {}
|
|
|
|
|
- output_nodes_seen: set[str] = set() # 已在之前边的 output_nodes 中出现过的节点,避免同一输出节点对应多条边
|
|
|
|
|
|
|
+ round_output_seen: set[tuple[int, str]] = set() # (round_num, node_name) 本轮已作为某边的 output
|
|
|
|
|
+ best_score_by_node: dict[str, float] = {} # node_name -> 已出现过的最高 matched_score
|
|
|
|
|
|
|
|
for round_idx, derivation in enumerate(derivations):
|
|
for round_idx, derivation in enumerate(derivations):
|
|
|
round_num = derivation.get("round", round_idx + 1)
|
|
round_num = derivation.get("round", round_idx + 1)
|
|
|
for dr in derivation.get("derivation_results") or []:
|
|
for dr in derivation.get("derivation_results") or []:
|
|
|
output_list = dr.get("output") or []
|
|
output_list = dr.get("output") or []
|
|
|
- matched_outputs = []
|
|
|
|
|
- matched_reasons = []
|
|
|
|
|
- matched_derivation_outputs = []
|
|
|
|
|
- for out_item in output_list:
|
|
|
|
|
- key = (round_num, out_item)
|
|
|
|
|
- pair = match_by_round_output.get(key)
|
|
|
|
|
- if not pair:
|
|
|
|
|
|
|
+ path_id = dr.get("id")
|
|
|
|
|
+ matched: list[tuple[str, str, float, bool, str]] = [] # (mp, reason, score, is_fully, derivation_out)
|
|
|
|
|
+ for i, out_item in enumerate(output_list):
|
|
|
|
|
+ item_id = i + 1
|
|
|
|
|
+ v = get_match(round_num, path_id, item_id, out_item)
|
|
|
|
|
+ if not v:
|
|
|
continue
|
|
continue
|
|
|
- mp, reason = pair
|
|
|
|
|
- matched_outputs.append(mp)
|
|
|
|
|
- matched_reasons.append(reason)
|
|
|
|
|
- matched_derivation_outputs.append(out_item)
|
|
|
|
|
- if mp not in seen_nodes:
|
|
|
|
|
- seen_nodes.add(mp)
|
|
|
|
|
- node = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""}))
|
|
|
|
|
- node["level"] = round_num
|
|
|
|
|
- if "original_word" not in node:
|
|
|
|
|
- node["original_word"] = node.get("name", mp)
|
|
|
|
|
- node["derivation_type"] = dr.get("method", "")
|
|
|
|
|
- level_by_name[mp] = round_num
|
|
|
|
|
- node_list.append(node)
|
|
|
|
|
|
|
+ mp, reason, score, is_fully = v
|
|
|
|
|
+ matched.append((mp, reason, score, is_fully, out_item))
|
|
|
|
|
|
|
|
- if not matched_outputs:
|
|
|
|
|
|
|
+ if not matched:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- # 只保留尚未在之前边的 output_nodes 中出现过的节点,避免同一输出节点对应多条边
|
|
|
|
|
- output_names_this_edge = [x for x in matched_outputs if x not in output_nodes_seen]
|
|
|
|
|
|
|
+ # 同一轮内 output 节点不重复;若前面轮次该节点匹配分更高则本轮不保留
|
|
|
|
|
+ output_names_this_edge = []
|
|
|
|
|
+ for mp, reason, score, is_fully, out_item in matched:
|
|
|
|
|
+ if (round_num, mp) in round_output_seen:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if score <= best_score_by_node.get(mp, -1.0):
|
|
|
|
|
+ continue
|
|
|
|
|
+ output_names_this_edge.append((mp, reason, score, is_fully, out_item))
|
|
|
|
|
+
|
|
|
if not output_names_this_edge:
|
|
if not output_names_this_edge:
|
|
|
continue
|
|
continue
|
|
|
- output_nodes_seen.update(output_names_this_edge)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for mp, _r, score, _f, _o in output_names_this_edge:
|
|
|
|
|
+ round_output_seen.add((round_num, mp))
|
|
|
|
|
+ best_score_by_node[mp] = max(best_score_by_node.get(mp, -1.0), score)
|
|
|
|
|
+
|
|
|
|
|
+ # 节点候选:同轮同节点保留匹配分更高的
|
|
|
|
|
+ for mp, _reason, score, is_fully, _out_item in output_names_this_edge:
|
|
|
|
|
+ key = (round_num, mp)
|
|
|
|
|
+ if key not in node_candidates or node_candidates[key].get("matched_score", 0) < score:
|
|
|
|
|
+ node = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""}))
|
|
|
|
|
+ node["level"] = round_num
|
|
|
|
|
+ node.setdefault("original_word", node.get("name", mp))
|
|
|
|
|
+ node["derivation_type"] = dr.get("method", "")
|
|
|
|
|
+ node["matched_score"] = score
|
|
|
|
|
+ node["is_fully_derived"] = is_fully
|
|
|
|
|
+ node_candidates[key] = node
|
|
|
|
|
|
|
|
input_data = dr.get("input") or {}
|
|
input_data = dr.get("input") or {}
|
|
|
derived_nodes = input_data.get("derived_nodes") or []
|
|
derived_nodes = input_data.get("derived_nodes") or []
|
|
@@ -266,23 +394,26 @@ def build_visualize_edges(
|
|
|
else:
|
|
else:
|
|
|
input_pattern_nodes = []
|
|
input_pattern_nodes = []
|
|
|
|
|
|
|
|
- output_nodes = [{"name": x} for x in output_names_this_edge]
|
|
|
|
|
- # 与 output_names_this_edge 顺序对应的匹配理由、推导输出节点名
|
|
|
|
|
- mp_to_reason = dict(zip(matched_outputs, matched_reasons))
|
|
|
|
|
- mp_to_derivation_out = dict(zip(matched_outputs, matched_derivation_outputs))
|
|
|
|
|
- reason_for_this_edge = [mp_to_reason.get(name, "") for name in output_names_this_edge]
|
|
|
|
|
- derivation_points_this_edge = [mp_to_derivation_out.get(name, "") for name in output_names_this_edge]
|
|
|
|
|
|
|
+ output_nodes = []
|
|
|
|
|
+ reasons_list = []
|
|
|
|
|
+ derivation_points_list = []
|
|
|
|
|
+ for mp, reason, score, is_fully, out_item in output_names_this_edge:
|
|
|
|
|
+ output_nodes.append({"name": mp, "matched_score": score, "is_fully_derived": is_fully})
|
|
|
|
|
+ reasons_list.append(reason)
|
|
|
|
|
+ derivation_points_list.append(out_item)
|
|
|
|
|
+
|
|
|
detail = {
|
|
detail = {
|
|
|
"reason": dr.get("reason", ""),
|
|
"reason": dr.get("reason", ""),
|
|
|
"评估结果": "匹配成功",
|
|
"评估结果": "匹配成功",
|
|
|
}
|
|
}
|
|
|
- if any(reason_for_this_edge):
|
|
|
|
|
- detail["匹配理由"] = reason_for_this_edge
|
|
|
|
|
- detail["待比对的推导选题点"] = derivation_points_this_edge
|
|
|
|
|
|
|
+ if any(reasons_list):
|
|
|
|
|
+ detail["匹配理由"] = reasons_list
|
|
|
|
|
+ detail["待比对的推导选题点"] = derivation_points_list
|
|
|
if dr.get("tools"):
|
|
if dr.get("tools"):
|
|
|
detail["tools"] = dr["tools"]
|
|
detail["tools"] = dr["tools"]
|
|
|
edge_list.append({
|
|
edge_list.append({
|
|
|
"name": dr.get("method", "") or f"推导-{round_num}",
|
|
"name": dr.get("method", "") or f"推导-{round_num}",
|
|
|
|
|
+ "level": round_num,
|
|
|
"input_post_nodes": input_post_nodes,
|
|
"input_post_nodes": input_post_nodes,
|
|
|
"input_tree_nodes": input_tree_nodes,
|
|
"input_tree_nodes": input_tree_nodes,
|
|
|
"input_pattern_nodes": input_pattern_nodes,
|
|
"input_pattern_nodes": input_pattern_nodes,
|
|
@@ -290,12 +421,24 @@ def build_visualize_edges(
|
|
|
"detail": detail,
|
|
"detail": detail,
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
|
|
+ node_list = list(node_candidates.values())
|
|
|
return node_list, edge_list
|
|
return node_list, edge_list
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+def _find_project_root() -> Path:
|
|
|
|
|
+ """从脚本所在目录向上查找包含 .git 的项目根目录。"""
|
|
|
|
|
+ p = Path(__file__).resolve().parent
|
|
|
|
|
+ while p != p.parent:
|
|
|
|
|
+ if (p / ".git").is_dir():
|
|
|
|
|
+ return p
|
|
|
|
|
+ p = p.parent
|
|
|
|
|
+ return Path(__file__).resolve().parent
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None:
|
|
def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None:
|
|
|
"""
|
|
"""
|
|
|
主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。
|
|
主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。
|
|
|
|
|
+ base_dir 默认为脚本所在目录;若其下 output/.../推导日志 不存在,则尝试项目根目录下的 output/...(兼容从项目根运行)。
|
|
|
"""
|
|
"""
|
|
|
if base_dir is None:
|
|
if base_dir is None:
|
|
|
base_dir = Path(__file__).resolve().parent
|
|
base_dir = Path(__file__).resolve().parent
|
|
@@ -303,6 +446,15 @@ def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_d
|
|
|
log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id
|
|
log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id
|
|
|
result_dir = base_dir / "output" / account_name / "整体推导结果"
|
|
result_dir = base_dir / "output" / account_name / "整体推导结果"
|
|
|
visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化"
|
|
visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化"
|
|
|
|
|
+ # 兼容:若推导日志不在 base_dir 下,尝试项目根目录下的 output/
|
|
|
|
|
+ if not log_dir.is_dir():
|
|
|
|
|
+ project_root = _find_project_root()
|
|
|
|
|
+ if project_root != base_dir:
|
|
|
|
|
+ alt_log = project_root / "output" / account_name / "推导日志" / post_id / log_id
|
|
|
|
|
+ if alt_log.is_dir():
|
|
|
|
|
+ log_dir = alt_log
|
|
|
|
|
+ result_dir = project_root / "output" / account_name / "整体推导结果"
|
|
|
|
|
+ visualize_dir = project_root / "output" / account_name / "整体推导路径可视化"
|
|
|
|
|
|
|
|
deconstruct_path = input_dir / f"{post_id}.json"
|
|
deconstruct_path = input_dir / f"{post_id}.json"
|
|
|
topic_points = parse_topic_points_from_deconstruct(deconstruct_path)
|
|
topic_points = parse_topic_points_from_deconstruct(deconstruct_path)
|
|
@@ -340,6 +492,6 @@ def main(account_name, post_id, log_id):
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
account_name="家有大志"
|
|
account_name="家有大志"
|
|
|
- post_id = "69185d49000000000d00f94e"
|
|
|
|
|
- log_id="20260310111409"
|
|
|
|
|
|
|
+ post_id = "68fb6a5c000000000302e5de"
|
|
|
|
|
+ log_id="20260310220945"
|
|
|
main(account_name, post_id, log_id)
|
|
main(account_name, post_id, log_id)
|