فهرست منبع

赠机判断聚类树是否存在

xueyiming 1 ماه پیش
والد
کامیت
eec7a2a445
3فایلهای تغییر یافته به همراه45 افزوده شده و 3 حذف شده
  1. 37 0
      examples/demand/db_manager.py
  2. 3 3
      examples/demand/pattern_builds/pattern_service.py
  3. 5 0
      examples/demand/web_api.py

+ 37 - 0
examples/demand/db_manager.py

@@ -20,6 +20,22 @@ class DatabaseManager:
         return self.SessionLocal()
 
 
+class DatabaseManager2:
+    """数据库管理类"""
+
+    # mysql+pymysql://<用户名>:<密码>@<主机地址>:<端口>/<数据库名>?charset=utf8mb4
+    def __init__(self):
+        connection_string = (
+            f"mysql+pymysql://content_rw:bC1aH4bA1lB0@rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com:3306/open_aigc_pattern?charset=utf8mb4"
+        )
+        self.engine = create_engine(connection_string, pool_pre_ping=True, pool_recycle=3600)
+        self.SessionLocal = sessionmaker(bind=self.engine, autoflush=False, autocommit=False)
+
+    def get_session(self) -> Session:
+        """获取数据库会话"""
+        return self.SessionLocal()
+
+
 def query_video_ids_by_names(execution_id: int, names: Iterable[str]) -> list[str]:
     """按 execution_id + 名称列表查询去重后的 post_id。"""
     clean_names = [str(n).strip() for n in names if n is not None and str(n).strip()]
@@ -76,3 +92,24 @@ def query_video_ids_by_names(execution_id: int, names: Iterable[str]) -> list[st
 
     return list(video_ids)
 
+
+db2 = DatabaseManager2()
+
+
+def exist_cluster_tree(merge_level2):
+    session = db2.get_session()
+
+    exec_row = session.execute(
+        text("""
+            SELECT id
+            FROM cluster_execution
+            WHERE name LIKE :name AND status = 2
+            ORDER BY create_time DESC
+            LIMIT 1
+        """),
+        {"name": f"{merge_level2}%"},
+    ).mappings().first()
+
+    if not exec_row:
+        return False
+    return True

+ 3 - 3
examples/demand/pattern_builds/pattern_service.py

@@ -315,7 +315,7 @@ def _store_category_tree_snapshot(session, execution_id: int, categories: list,
 
 
 def get_merge_level2_crawler_data(merge_level2) -> dict:
-    """用 merge_level2 匹配 crawler_execution.name,取最新成功记录的 id 作为 execution_id,再查 global_* 三表。"""
+    """用 merge_level2 匹配 cluster_execution.name,取最新成功记录的 id 作为 execution_id,再查 global_* 三表。"""
     session = db.get_session()
 
     empty = {
@@ -336,7 +336,7 @@ def get_merge_level2_crawler_data(merge_level2) -> dict:
         exec_row = session.execute(
             text("""
                 SELECT id
-                FROM crawler_execution
+                FROM cluster_execution
                 WHERE name LIKE :name AND status = 2
                 ORDER BY create_time DESC
                 LIMIT 1
@@ -346,7 +346,7 @@ def get_merge_level2_crawler_data(merge_level2) -> dict:
 
         if not exec_row:
             empty["message"] = (
-                f"未找到 crawler_execution: name={merge_level2!r} 且 status=2(按 create_time 最新)"
+                f"未找到 cluster_execution: name={merge_level2!r} 且 status=2(按 create_time 最新)"
             )
             return empty
 

+ 5 - 0
examples/demand/web_api.py

@@ -14,6 +14,8 @@ from zoneinfo import ZoneInfo
 from fastapi import FastAPI, HTTPException
 from pydantic import BaseModel
 
+from examples.demand.db_manager import exist_cluster_tree
+
 # 添加项目根目录到 Python 路径(与 run.py 保持一致)
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
@@ -77,6 +79,9 @@ async def demand_start_sync(cluster_name: str, platform_type: Literal["piaoquan"
     task_id: Optional[int] = None
     try:
         if platform_type == "piaoquan":
+            exist = exist_cluster_tree(cluster_name)
+            if not exist:
+                raise ValueError("获取聚类树失败")
             execution_id = piaoquan_prepare(cluster_name)
         elif platform_type == "changwen":
             execution_id = changwen_prepare(cluster_name)