فهرست منبع

feat: 取每个品类的top依次执行

jihuaqiang 1 ماه پیش
والد
کامیت
8f6f588ba6
1 فایلهای تغییر یافته به همراه 52 افزوده شده و 21 حذف شده
  1. 52 21
      examples/content_finder/db/schedule.py

+ 52 - 21
examples/content_finder/db/schedule.py

@@ -212,37 +212,68 @@ def get_one_today_unprocessed_demand(*, dt: int) -> Optional[Dict[str, Any]]:
         } 或 None
     """
     sql = """
-    SELECT x.demand_content_id,
-           x.query,
-           x.suggestion,
-           x.score,
-           x.merge_leve2,
-           x.rn AS category_rank
-    FROM (
-        SELECT dc.id AS demand_content_id,
-               dc.name AS query,
-               dc.suggestion AS suggestion,
-               dc.score AS score,
-               dc.merge_leve2 AS merge_leve2,
-               ROW_NUMBER() OVER (
-                   PARTITION BY COALESCE(dc.merge_leve2, '')
-                   ORDER BY dc.score DESC, dc.id DESC
-               ) AS rn
+    WITH processed AS (
+        SELECT
+            COALESCE(dc.merge_leve2, '') AS cat,
+            COUNT(*) AS processed_cnt
+        FROM demand_find_task t
+        INNER JOIN demand_content dc ON dc.id = t.demand_content_id
+        WHERE dc.dt = %s
+        GROUP BY COALESCE(dc.merge_leve2, '')
+    ),
+    ranked AS (
+        SELECT
+            dc.id AS demand_content_id,
+            dc.name AS query,
+            dc.suggestion AS suggestion,
+            dc.score AS score,
+            (dc.score + 0) AS score_num,
+            dc.merge_leve2 AS merge_leve2,
+            ROW_NUMBER() OVER (
+                PARTITION BY COALESCE(dc.merge_leve2, '')
+                ORDER BY (dc.score + 0) DESC, dc.id DESC
+            ) AS rn
         FROM demand_content dc
         WHERE dc.dt = %s
           AND NOT EXISTS (
             SELECT 1 FROM demand_find_task t
             WHERE t.demand_content_id = dc.id
           )
-    ) x
-    ORDER BY x.rn ASC, x.score DESC, x.demand_content_id DESC
+    ),
+    candidates AS (
+        SELECT
+            r.demand_content_id,
+            r.query,
+            r.suggestion,
+            r.score,
+            r.merge_leve2,
+            r.rn AS category_rank
+        FROM ranked r
+        LEFT JOIN processed p
+               ON p.cat = COALESCE(r.merge_leve2, '')
+        WHERE r.rn = COALESCE(p.processed_cnt, 0) + 1
+    ),
+    next_layer AS (
+        SELECT MIN(category_rank) AS target_rank
+        FROM candidates
+    )
+    SELECT
+        c.demand_content_id,
+        c.query,
+        c.suggestion,
+        c.score,
+        c.merge_leve2,
+        c.category_rank
+    FROM candidates c
+    INNER JOIN next_layer nl ON nl.target_rank = c.category_rank
+    ORDER BY (c.score + 0) DESC, c.demand_content_id DESC
     LIMIT 1
     """
     conn = None
     try:
         conn = get_connection()
         with conn.cursor() as cur:
-            cur.execute(sql, (int(dt),))
+            cur.execute(sql, (int(dt), int(dt)))
             row = cur.fetchone()
             return dict(row) if row else None
     except Exception as e:
@@ -276,7 +307,7 @@ def get_daily_unprocessed_pool(
             dc.score AS score,
             ROW_NUMBER() OVER (
                 PARTITION BY COALESCE(dc.merge_leve2, '')
-                ORDER BY dc.score DESC, dc.id DESC
+                ORDER BY (dc.score + 0) DESC, dc.id DESC
             ) AS rn
         FROM demand_content dc
         WHERE NOT EXISTS (
@@ -285,7 +316,7 @@ def get_daily_unprocessed_pool(
         )
     ) x
     WHERE x.rn <= %s
-    ORDER BY x.score DESC, x.demand_content_id DESC
+    ORDER BY (x.score + 0) DESC, x.demand_content_id DESC
     LIMIT %s
     """
     conn = None