Просмотр исходного кода

feat: 飞书上传增强(filter/limit/cell上限) + AB效果SQL更新D链指标

- fetch_daily: 新增 filter/limit 参数、富文本表头兼容、5M cell 上限保护、
  简化写入流程(expand+sequential write 替代 reverse insert)
- AB实时效果: 分组改为单码(ab0 而非 ab0,ab1)
- AB天级效果含多跳: 添加 D 链指标(d_1~d_3/total_d/total_bcd/total_bcd1)及 lift
- rosn品类分析: dt 匹配改为 LIKE 支持月份前缀

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
yangxiaohui 3 недели назад
Родитель
Commit
854ce9da36

+ 88 - 57
fetch_daily.py

@@ -161,6 +161,8 @@ def load_feishu_config(sql_file):
         "sheet_id": None,
         "sort": "dt:desc",
         "cols": None,
+        "filter": None,
+        "limit": None,
     }
 
     root_dir = Path(__file__).parent
@@ -259,7 +261,7 @@ def column_index_to_letter(col_idx):
     return result
 
 
-def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc", cols_spec=None):
+def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc", cols_spec=None, filter_spec=None, limit=None):
     """上传 CSV 文件到飞书表格(通过模板行继承样式)
 
     第1行: 表头
@@ -272,6 +274,8 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
         sheet_id: 工作表 ID(None 时自动获取第一个)
         sort_spec: 排序规格,如 "dt:desc,name:asc"
         cols_spec: 列映射规格,如 "dt:日期,name,value:数值"
+        filter_spec: 过滤条件,dict {"字段": "值"} 或 str "字段=值,字段=值"
+        limit: 上传行数上限
     """
     from feishu import Client, LARK_HOST, APP_ID, APP_SECRET, request
 
@@ -299,6 +303,36 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
         if applied:
             print(f"排序: {', '.join(reversed(applied))}")
 
+    # 过滤(排序之后)
+    if filter_spec:
+        # 支持 dict(来自 JSON 配置)或 str(来自命令行 "字段=值,字段!=值")
+        if isinstance(filter_spec, str):
+            filters = []
+            for part in filter_spec.split(","):
+                if "!=" in part:
+                    k, v = part.split("!=", 1)
+                    filters.append((k.strip(), v.strip(), "!="))
+                elif "=" in part:
+                    k, v = part.split("=", 1)
+                    filters.append((k.strip(), v.strip(), "=="))
+        elif isinstance(filter_spec, dict):
+            filters = [(k, v, "==") for k, v in filter_spec.items()]
+
+        before_count = len(data_rows)
+        for field, value, op in filters:
+            if field in header:
+                idx = header.index(field)
+                if op == "!=":
+                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] != str(value)]
+                else:
+                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] == str(value)]
+        print(f"过滤: {filters} → {before_count} → {len(data_rows)} 行")
+
+    # limit(过滤之后)
+    if limit and len(data_rows) > limit:
+        print(f"限制行数: {len(data_rows)} → {limit}")
+        data_rows = data_rows[:limit]
+
     # 列映射(排序之后)
     header, data_rows = apply_cols_mapping(header, data_rows, cols_spec)
 
@@ -350,12 +384,19 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
     if feishu_header and feishu_header[0]:
         feishu_cols = [c for c in feishu_header[0] if c]  # 过滤 None 和空字符串
 
+    # 富文本列转纯文本(飞书表头可能含带链接的 list 结构)
+    def _col_to_str(col):
+        if isinstance(col, list):
+            return "".join(item.get("text", "") for item in col if isinstance(item, dict))
+        return col
+
     if feishu_cols:
-        print(f"飞书表头: {feishu_cols}")
+        feishu_cols_str = [_col_to_str(c) for c in feishu_cols]
+        print(f"飞书表头: {feishu_cols_str}")
         print(f"CSV表头: {header}")
 
         # 校验字段一致性(警告但继续,以飞书表头为准)
-        feishu_set = set(feishu_cols)
+        feishu_set = set(feishu_cols_str)
         csv_set = set(header)
 
         missing_in_csv = feishu_set - csv_set
@@ -366,12 +407,12 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
         if missing_in_feishu:
             print(f"警告: 飞书缺少字段(将忽略): {missing_in_feishu}")
 
-        # 按飞书表头顺序重排数据
+        # 按飞书表头顺序重排数据(用纯文本版本做匹配)
         csv_col_index = {name: i for i, name in enumerate(header)}
         new_converted_rows = []
         for row in converted_rows:
             new_row = []
-            for col_name in feishu_cols:
+            for col_name in feishu_cols_str:
                 if col_name in csv_col_index:
                     new_row.append(row[csv_col_index[col_name]])
                 else:
@@ -398,6 +439,14 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
     num_cols = len(header)
     end_col = column_index_to_letter(num_cols)
 
+    # 飞书单 sheet 上限 5,000,000 cells,预留表头+模板行
+    CELL_LIMIT = 5_000_000
+    max_data_rows = (CELL_LIMIT // num_cols) - 2
+    if total_rows > max_data_rows:
+        print(f"⚠ 飞书 cell 上限 {CELL_LIMIT:,}({num_cols}列 × {max_data_rows}行),截断 {total_rows} → {max_data_rows} 行")
+        converted_rows = converted_rows[:max_data_rows]
+        total_rows = max_data_rows
+
     print(f"上传到飞书: {total_rows} 行数据")
 
     batch_size = 500
@@ -430,56 +479,46 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
 
     # 第2步:扩展表格容量(insert 不会自动扩展)
     # 删除后当前只有2行(表头+模板),需要扩展到 2 + total_rows 行
-    needed_rows = 2 + total_rows
     add_url = f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range"
-    add_payload = {
-        "dimension": {
-            "sheetId": sheet_id,
-            "majorDimension": "ROWS",
-            "length": total_rows  # 添加数据行数
-        }
-    }
-    try:
-        request("POST", add_url, headers, add_payload)
-        print(f"扩展容量: +{total_rows} 行")
-    except Exception as e:
-        print(f"  扩展容量失败: {e}")
-
-    # 第3步:分批插入空行(继承第2行样式)并写入数据
-    print(f"插入并写入 {total_rows} 行...")
-    insert_url = f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/insert_dimension_range"
-
-    # 反向处理批次(从最后一批开始),因为每次都在第3行前插入
-    batches = [converted_rows[i:i + batch_size] for i in range(0, total_rows, batch_size)]
-    processed = 0
-
-    for batch in reversed(batches):
-        batch_count = len(batch)
-
-        # 在第3行前插入空行(继承第2行样式)
-        insert_payload = {
+    expand_batch = 5000
+    remaining = total_rows
+    expanded = 0
+    while remaining > 0:
+        chunk = min(remaining, expand_batch)
+        add_payload = {
             "dimension": {
                 "sheetId": sheet_id,
                 "majorDimension": "ROWS",
-                "startIndex": 2,  # 0-indexed, 第3行位置
-                "endIndex": 2 + batch_count
-            },
-            "inheritStyle": "BEFORE"
+                "length": chunk
+            }
         }
         try:
-            request("POST", insert_url, headers, insert_payload)
+            request("POST", add_url, headers, add_payload)
+            expanded += chunk
+            remaining -= chunk
         except Exception as e:
-            print(f"  插入行失败: {e}")
+            print(f"  扩展容量失败(已扩展{expanded}): {e}")
             break
+    if expanded > 0:
+        print(f"扩展容量: +{expanded} 行")
 
-        # 写入数据到插入的行(第3行开始,飞书单次最多100列,需按列分批)
+    # 第3步:分批写入数据到扩展的空行(不再 insert,避免 expand+insert 双重加行超 cell 上限)
+    print(f"写入 {total_rows} 行...")
+    batches = [converted_rows[i:i + batch_size] for i in range(0, total_rows, batch_size)]
+    processed = 0
+
+    for i, batch in enumerate(batches):
+        batch_count = len(batch)
+        start_row = 3 + i * batch_size  # 从第3行开始,顺序写入
+
+        # 写入数据(飞书单次最多100列,需按列分批)
         col_batch = 100
         value_ranges = []
         for col_start in range(0, num_cols, col_batch):
             col_end = min(col_start + col_batch, num_cols)
             sc = column_index_to_letter(col_start + 1)
             ec = column_index_to_letter(col_end)
-            col_range = f"{sheet_id}!{sc}3:{ec}{2 + batch_count}"
+            col_range = f"{sheet_id}!{sc}{start_row}:{ec}{start_row + batch_count - 1}"
             col_values = [row[col_start:col_end] for row in batch]
             value_ranges.append({"range": col_range, "values": col_values})
         client.batch_update_values(access_token, sheet_token, {
@@ -489,20 +528,6 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
         processed += batch_count
         print(f"  处理: {processed}/{total_rows}")
 
-    # 第4步:删除末尾多余的空行(扩展容量时添加的)
-    # 重新查询实际行数,避免因初始行数不同导致计算偏差
-    sheet_props_after = client.get_sheet_properties(access_token, sheet_token, sheet_id)
-    actual_row_count = sheet_props_after['row_count'] if sheet_props_after else 0
-    # 期望行数:初始保留行数 + 插入的数据行数
-    rows_after_cleanup = 2 if current_rows > 2 else current_rows
-    final_row_count = rows_after_cleanup + total_rows
-    if actual_row_count > final_row_count:
-        print(f"清理多余空行({actual_row_count - final_row_count}行)...")
-        try:
-            client.delete_rows(access_token, sheet_token, sheet_id, final_row_count + 1, actual_row_count)
-        except Exception as e:
-            print(f"  清理失败: {e}")
-
     # 第5步:删除模板行(第2行),仅当初始存在模板行时
     if current_rows >= 2:
         print(f"删除模板行...")
@@ -586,6 +611,8 @@ def main():
     parser.add_argument("--sheet-id", type=str, default=None, help="飞书工作表ID")
     parser.add_argument("--sort", type=str, default=None, help="排序: 字段:asc/desc")
     parser.add_argument("--cols", type=str, default=None, help="列映射: 原名:新名,...")
+    parser.add_argument("--filter", type=str, default=None, help="过滤: 字段=值,字段=值")
+    parser.add_argument("--limit", type=int, default=None, help="上传行数上限")
     parser.add_argument("--config", type=str, default="default", help="ODPS配置: default 或 piaoquan_api")
     args = parser.parse_args()
 
@@ -608,6 +635,10 @@ def main():
         args.sort = feishu_config["sort"]
     if args.cols is None:
         args.cols = feishu_config["cols"]
+    if args.filter is None:
+        args.filter = feishu_config["filter"]
+    if args.limit is None:
+        args.limit = feishu_config["limit"]
 
     # 打印飞书配置
     if args.feishu:
@@ -629,7 +660,7 @@ def main():
             merged_file = merge_csv_files(daily_dir)
             # 如果指定了飞书上传
             if args.feishu and merged_file:
-                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)
+                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)
         else:
             print("没有可合并的数据")
         return
@@ -693,7 +724,7 @@ def main():
             print(f"数据目录: {output_file}")
             # 如果指定了飞书上传
             if args.feishu and output_file.exists():
-                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols)
+                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)
         except Exception as e:
             print(f"✗ 执行失败: {e}")
         return
@@ -733,7 +764,7 @@ def main():
     if args.feishu:
         merged_file = merge_csv_files(daily_dir)
         if merged_file:
-            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)
+            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)
 
 
 if __name__ == "__main__":

+ 5 - 5
tasks/00_AB效果/01_推荐AB实时效果.sql

@@ -10,11 +10,11 @@ WITH t_base AS
             --         WHEN apptype IN ("0") AND abcode IN ("ab2","ab3") THEN "对照组"
             --         ELSE "其他"
             -- END AS abcode
-            ,CASE   WHEN apptype IN ("4") AND abcode IN ("ab0","ab1") THEN "实验组-先验地域降权"
-                    WHEN apptype IN ("4") AND abcode IN ("ab6","ab7") THEN "实验组-str+校准&ros-统计量"
-                    WHEN apptype IN ("4") AND abcode IN ("ab8","ab9") THEN "实验组-str+校准&ros损失函数优化"
-                    WHEN apptype IN ("4") AND abcode IN ("ab4","ab5") THEN "实验组-str+校准&ros天级更新"
-                    WHEN apptype IN ("4") AND abcode IN ("ab2","ab3") THEN "对照组"
+            ,CASE   WHEN apptype IN ("4") AND abcode IN ("ab0") THEN "实验组-先验地域降权"
+                    WHEN apptype IN ("4") AND abcode IN ("ab6") THEN "实验组-str+校准&ros-统计量"
+                    WHEN apptype IN ("4") AND abcode IN ("ab8") THEN "实验组-str+校准&ros损失函数优化"
+                    WHEN apptype IN ("4") AND abcode IN ("ab4") THEN "实验组-str+校准&ros天级更新"
+                    WHEN apptype IN ("4") AND abcode IN ("ab2") THEN "对照组"
                     ELSE "其他"
             END AS abcode
             -- ,CASE   WHEN apptype IN ("4") AND abcode IN ("ab0","ab1") THEN "实验组-先验地域降权"

+ 29 - 2
tasks/00_AB效果/05_推荐AB天级效果_对比对照组_含多跳.sql

@@ -1,5 +1,5 @@
--- 推荐AB天级效果 - 含对照组对比 + 多跳B/C指标
--- 基于新表 dwd_recsys_alg_exposure_base_20260206,新增 b/c_1/c_2/c_3/total_bc per-exposure 率指标
+-- 推荐AB天级效果 - 含对照组对比 + 多跳B/C/D指标
+-- 基于新表 dwd_recsys_alg_exposure_base_20260206,新增 b/c_1~c_3/d_1~d_3/total_bc/total_d/total_bcd per-exposure 率指标
 WITH t_base AS
 (
     SELECT  dt
@@ -31,6 +31,9 @@ WITH t_base AS
             ,COALESCE(CAST(c_1 AS BIGINT), 0) AS c_1
             ,COALESCE(CAST(c_2 AS BIGINT), 0) AS c_2
             ,COALESCE(CAST(c_3 AS BIGINT), 0) AS c_3
+            ,COALESCE(CAST(d_1 AS BIGINT), 0) AS d_1
+            ,COALESCE(CAST(d_2 AS BIGINT), 0) AS d_2
+            ,COALESCE(CAST(d_3 AS BIGINT), 0) AS d_3
     FROM    loghubods.dwd_recsys_alg_exposure_base_20260206
     WHERE   dt = '${dt}'
     AND     apptype IN ("4")
@@ -66,6 +69,12 @@ t_metrics AS (
             ,SUM(c_2) / COUNT(1) AS c_2
             ,SUM(c_3) / COUNT(1) AS c_3
             ,SUM(b + c_1 + c_2 + c_3) / COUNT(1) AS total_bc
+            ,SUM(d_1) / COUNT(1) AS d_1
+            ,SUM(d_2) / COUNT(1) AS d_2
+            ,SUM(d_3) / COUNT(1) AS d_3
+            ,SUM(d_1 + d_2 + d_3) / COUNT(1) AS total_d
+            ,SUM(b + c_1 + c_2 + c_3 + d_1 + d_2 + d_3) / COUNT(1) AS total_bcd
+            ,SUM(b + c_1 + c_2 + c_3 + d_1) / COUNT(1) AS total_bcd1
     FROM    t_base
     WHERE   page IN ("推荐")
     GROUP BY dt
@@ -100,6 +109,12 @@ t_control AS (
             ,c_2 AS ctrl_c_2
             ,c_3 AS ctrl_c_3
             ,total_bc AS ctrl_total_bc
+            ,d_1 AS ctrl_d_1
+            ,d_2 AS ctrl_d_2
+            ,d_3 AS ctrl_d_3
+            ,total_d AS ctrl_total_d
+            ,total_bcd AS ctrl_total_bcd
+            ,total_bcd1 AS ctrl_total_bcd1
     FROM    t_metrics
     WHERE   abcode = "对照组"
 )
@@ -131,6 +146,12 @@ SELECT  m.dt
         ,m.c_2
         ,m.c_3
         ,m.total_bc
+        ,m.d_1
+        ,m.d_2
+        ,m.d_3
+        ,m.total_d
+        ,m.total_bcd
+        ,m.total_bcd1
         -- 相对对照组变化率
         ,(m.exp_per_dau - c.ctrl_exp_per_dau) / c.ctrl_exp_per_dau AS exp_per_dau_lift
         ,(m.str_one - c.ctrl_str_one) / c.ctrl_str_one AS str_one_lift
@@ -154,6 +175,12 @@ SELECT  m.dt
         ,(m.c_2 - c.ctrl_c_2) / c.ctrl_c_2 AS c_2_lift
         ,(m.c_3 - c.ctrl_c_3) / c.ctrl_c_3 AS c_3_lift
         ,(m.total_bc - c.ctrl_total_bc) / c.ctrl_total_bc AS total_bc_lift
+        ,(m.d_1 - c.ctrl_d_1) / c.ctrl_d_1 AS d_1_lift
+        ,(m.d_2 - c.ctrl_d_2) / c.ctrl_d_2 AS d_2_lift
+        ,(m.d_3 - c.ctrl_d_3) / c.ctrl_d_3 AS d_3_lift
+        ,(m.total_d - c.ctrl_total_d) / c.ctrl_total_d AS total_d_lift
+        ,(m.total_bcd - c.ctrl_total_bcd) / c.ctrl_total_bcd AS total_bcd_lift
+        ,(m.total_bcd1 - c.ctrl_total_bcd1) / c.ctrl_total_bcd1 AS total_bcd1_lift
 FROM    t_metrics m
 LEFT JOIN t_control c
 ON      m.dt = c.dt

+ 1 - 1
tasks/承接/rosn分析/02_实验组xTop10一级品类_vs对照组.sql

@@ -10,7 +10,7 @@ WITH t_raw AS
                     ELSE "其他"
             END AS page_type
     FROM    loghubods.dwd_recsys_alg_sample_all_20250212
-    WHERE   dt = '${dt}'
+    WHERE   dt LIKE '${dt}%'
     AND     apptype IN ("0","4")
     AND     abcode IN ("ab0","ab1","ab2","ab3","ab4","ab5","ab6","ab7","ab8","ab9")
     AND     abcode NOT IN ("ab100")