Преглед изворни кода

feat: 新增三个分析任务

- 素材字段分析: 按天分析各渠道素材字段填充率
- 渠道用户量统计: 按用户数统计各渠道及素材覆盖
- 公众号投流素材缺失排查: 排查文章标题缺失原因

统一使用 ${start}/${end} 变量

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
yangxiaohui пре 2 месеци
родитељ
комит
aeb7bcd05c

+ 116 - 0
tasks/公众号投流素材缺失排查/analyze.py

@@ -0,0 +1,116 @@
#!/usr/bin/env python
# coding=utf-8
"""
公众号投流-稳定 素材缺失排查

Investigates why only part of the records carry an article title (文章标题).

Reads the newest non-report CSV exported by query.sql from ./output and
writes a plain-text summary (<csv-stem>_分析.txt) back into ./output.
"""
import sys
from pathlib import Path

import pandas as pd

task_dir = Path(__file__).parent
output_dir = task_dir / "output"

# Report buffer: log() mirrors every line to stdout and into this list,
# which is dumped to the *_分析.txt file at the end of main().
lines = []


def log(text=""):
    """Print *text* and append it to the report buffer."""
    print(text)
    lines.append(text)


def pct(part, whole):
    """Format part/whole as e.g. '12.3%'; return 'n/a' when whole is 0.

    The original inline f-strings crashed with ZeroDivisionError on an
    empty export; this keeps the same output for non-empty data.
    """
    return f"{part / whole:.1%}" if whole else "n/a"


def main():
    """Run the analysis end to end: load CSV, log sections, save report."""
    # Ignore previously generated report files ("*_分析*").
    csv_files = [f for f in output_dir.glob("*.csv") if '_分析' not in f.name]
    if not csv_files:
        print("没有找到数据文件,请先运行 query.sql")
        sys.exit(1)  # was builtin exit(); sys.exit is the supported API in scripts

    # Pick the most recently modified export.
    latest_file = max(csv_files, key=lambda x: x.stat().st_mtime)
    df = pd.read_csv(latest_file)

    log(f"分析文件: {latest_file.name}")
    log()

    # ---- 1. Overall split: records with vs. without an article title ----
    has_title = df[df['文章标题状态'] == '有文章标题']
    no_title = df[df['文章标题状态'] == '无文章标题']

    total_has = has_title['记录数'].sum()
    total_no = no_title['记录数'].sum()
    total = total_has + total_no

    log("=" * 70)
    log("一、整体情况")
    log("=" * 70)
    log()
    log(f"总记录数: {int(total):,}")
    log(f"有文章标题: {int(total_has):,} ({pct(total_has, total)})")
    log(f"无文章标题: {int(total_no):,} ({pct(total_no, total)})")
    log()

    # ---- 2. Field coverage inside the no-title records ----
    log("=" * 70)
    log("二、无文章标题记录的特征")
    log("=" * 70)
    log()

    no_title_wx_sn = no_title['有wx_sn'].sum()
    no_title_contenturl = no_title['有contenturl'].sum()
    no_title_rootsourceid = no_title['有rootsourceid'].sum()

    log("无文章标题记录中:")  # was an f-string with no placeholders
    log(f"  有 wx_sn: {int(no_title_wx_sn):,} ({pct(no_title_wx_sn, total_no)})")
    log(f"  有 contenturl: {int(no_title_contenturl):,} ({pct(no_title_contenturl, total_no)})")
    log(f"  有 rootsourceid: {int(no_title_rootsourceid):,} ({pct(no_title_rootsourceid, total_no)})")
    log()
    # NOTE(review): this conclusion line is hard-coded, not derived from the
    # numbers above — confirm it still holds when the query is re-run.
    log("结论:无文章标题的记录有 rootsourceid,但缺少 wx_sn 和 contenturl")
    log()

    # ---- 3. Per-official-account comparison ----
    log("=" * 70)
    log("三、同一公众号的有/无文章标题对比")
    log("=" * 70)
    log()

    # Aggregate record counts per account on each side of the split.
    gzh_has = has_title.groupby('公众号名')['记录数'].sum().reset_index()
    gzh_has.columns = ['公众号名', '有标题数']
    gzh_no = no_title.groupby('公众号名')['记录数'].sum().reset_index()
    gzh_no.columns = ['公众号名', '无标题数']

    # Outer merge keeps accounts present on only one side; fill gaps with 0.
    gzh_compare = gzh_no.merge(gzh_has, on='公众号名', how='outer').fillna(0)
    gzh_compare['总数'] = gzh_compare['有标题数'] + gzh_compare['无标题数']
    gzh_compare['缺失率'] = gzh_compare['无标题数'] / gzh_compare['总数']
    gzh_compare = gzh_compare.sort_values('总数', ascending=False)

    header = f"{'公众号':<20} {'总数':>10} {'有标题':>10} {'无标题':>10} {'缺失率':>8}"
    log(header)
    log("-" * 70)

    # Top 20 accounts by total traffic.
    for _, row in gzh_compare.head(20).iterrows():
        gzh = row['公众号名'] if pd.notna(row['公众号名']) else '(空)'
        log(f"{gzh:<20} {int(row['总数']):>10,} {int(row['有标题数']):>10,} {int(row['无标题数']):>10,} {row['缺失率']:>8.0%}")

    log()

    # ---- 4. Fixed conclusions / follow-ups ----
    log("=" * 70)
    log("四、结论")
    log("=" * 70)
    log()
    log("1. 同一公众号的流量分为两种:")
    log("   - 有完整长文信息(wx_sn, contenturl, 文章标题)")
    log("   - 只有 rootsourceid,无长文信息")
    log()
    log("2. 缺失原因推测:")
    log("   - 可能是不同的投放方式(直接投放 vs 长文投放)")
    log("   - 或数据采集链路不同导致字段缺失")
    log()
    log("3. 建议:")
    log("   - 排查 rootsourceid 的生成规则")
    log("   - 确认是否存在非长文的投放方式")
    log()

    # Persist the report (with a trailing newline, which the original omitted).
    # The final "结果已保存" line is printed only — it is logged after the write.
    result_file = output_dir / f"{latest_file.stem}_分析.txt"
    result_file.write_text("\n".join(lines) + "\n", encoding='utf-8')
    log(f"结果已保存: {result_file}")


if __name__ == "__main__":
    main()

+ 20 - 0
tasks/公众号投流素材缺失排查/query.sql

@@ -0,0 +1,20 @@
-- 公众号投流-稳定 素材缺失排查 (missing-material investigation)
-- Why do only ~33% of the records carry an article title (文章标题)?
-- Buckets rows by title presence and by official account (公众号), counting
-- how many rows in each bucket also carry wx_sn / contenturl / rootsourceid /
-- 分享标题.  Consumed by analyze.py in the same task directory.
-- NOTE(review): ${start}/${end} are substituted by the task runner and are
-- injected unquoted — assumes dt partition values are numeric; confirm.

SELECT  CASE WHEN 文章标题 IS NOT NULL AND 文章标题 != '' THEN '有文章标题' ELSE '无文章标题' END AS 文章标题状态
        ,公众号名
        ,COUNT(*) AS 记录数
        -- Presence = non-NULL and non-empty string.
        ,SUM(CASE WHEN wx_sn IS NOT NULL AND wx_sn != '' THEN 1 ELSE 0 END) AS 有wx_sn
        ,SUM(CASE WHEN contenturl IS NOT NULL AND contenturl != '' THEN 1 ELSE 0 END) AS 有contenturl
        ,SUM(CASE WHEN rootsourceid IS NOT NULL AND rootsourceid != '' THEN 1 ELSE 0 END) AS 有rootsourceid
        ,SUM(CASE WHEN 分享标题 IS NOT NULL AND 分享标题 != '' THEN 1 ELSE 0 END) AS 有分享标题
FROM    loghubods.opengid_base_data
WHERE   dt >= ${start}
AND     dt <= ${end}
AND     channel = '公众号投流-稳定'
AND     usersharedepth = 0
AND     videoid IS NOT NULL
-- The CASE expression is repeated here because grouping by a SELECT alias
-- is not portable across SQL dialects.
GROUP BY CASE WHEN 文章标题 IS NOT NULL AND 文章标题 != '' THEN '有文章标题' ELSE '无文章标题' END
         ,公众号名
ORDER BY 文章标题状态, 记录数 DESC
;

+ 67 - 0
tasks/渠道用户量统计/analyze.py

@@ -0,0 +1,67 @@
#!/usr/bin/env python
# coding=utf-8
"""
渠道用户量统计

Per-channel user counts (UV) and material-field coverage.

Reads the newest non-report CSV exported by query.sql from ./output and
writes a plain-text summary (<csv-stem>_分析.txt) back into ./output.
"""
import sys
from pathlib import Path

import pandas as pd

task_dir = Path(__file__).parent
output_dir = task_dir / "output"

# Report buffer: log() mirrors every line to stdout and into this list,
# which is dumped to the *_分析.txt file at the end of main().
lines = []


def log(text=""):
    """Print *text* and append it to the report buffer."""
    print(text)
    lines.append(text)


def per_user(count, uv):
    """Return count / uv, or 0 when uv is 0.

    The original guarded uv == 0 in the per-row loop but not in the summary
    line, which crashed with ZeroDivisionError on an empty export.
    """
    return count / uv if uv > 0 else 0


def main():
    """Run the analysis end to end: load CSV, log the table, save report."""
    # Ignore previously generated report files ("*_分析*").
    csv_files = [f for f in output_dir.glob("*.csv") if '_分析' not in f.name]
    if not csv_files:
        print("没有找到数据文件,请先运行 query.sql")
        sys.exit(1)  # was builtin exit(); sys.exit is the supported API in scripts

    # Pick the most recently modified export.
    latest_file = max(csv_files, key=lambda x: x.stat().st_mtime)
    df = pd.read_csv(latest_file)

    log(f"分析文件: {latest_file.name}")
    log()

    log("=" * 80)
    log("各渠道用户量及素材覆盖")
    log("=" * 80)
    log()

    header = f"{'渠道':<25} {'用户数':>10} {'人均点击':>8} {'文章标题':>10} {'卡片标题':>10}"
    log(header)
    log("-" * 80)

    # One row per channel, as ordered by the query (用户数 DESC).
    for _, row in df.iterrows():
        ch = row['channel']
        uv = int(row['用户数'])
        pv = int(row['点击数'])
        avg = per_user(pv, uv)
        art = per_user(row['有文章标题用户'], uv)
        card = per_user(row['有卡片标题用户'], uv)
        log(f"{ch:<25} {uv:>10,} {avg:>8.1f} {art:>10.0%} {card:>10.0%}")

    log()

    # NOTE(review): 用户数 is summed per channel, so a user active in several
    # channels is counted once per channel — confirm that is acceptable here.
    total_uv = df['用户数'].sum()
    total_pv = df['点击数'].sum()
    log("=" * 80)
    log("汇总")
    log("=" * 80)
    log(f"总用户数: {int(total_uv):,}")
    log(f"总点击数: {int(total_pv):,}")
    log(f"人均点击: {per_user(total_pv, total_uv):.1f}")  # was unguarded total_pv/total_uv
    log()

    # Persist the report (with a trailing newline, which the original omitted).
    result_file = output_dir / f"{latest_file.stem}_分析.txt"
    result_file.write_text("\n".join(lines) + "\n", encoding='utf-8')
    log(f"结果已保存: {result_file}")


if __name__ == "__main__":
    main()

+ 17 - 0
tasks/渠道用户量统计/query.sql

@@ -0,0 +1,17 @@
-- 渠道用户量统计 (per-channel user counts)
-- Counts distinct users (UV) and clicks per channel, plus how many of those
-- users have each material field populated at least once
-- (article title / card title / card cover).  Consumed by analyze.py.
-- NOTE(review): ${start}/${end} are substituted by the task runner and are
-- injected unquoted — assumes dt partition values are numeric; confirm.

SELECT  channel
        ,COUNT(DISTINCT mid) AS 用户数
        ,COUNT(*) AS 点击数
        -- A user is "covered" if at least one of their rows has the field
        -- non-NULL and non-empty (CASE without ELSE yields NULL, which
        -- COUNT DISTINCT ignores).
        ,COUNT(DISTINCT CASE WHEN 文章标题 IS NOT NULL AND 文章标题 != '' THEN mid END) AS 有文章标题用户
        ,COUNT(DISTINCT CASE WHEN 分享标题 IS NOT NULL AND 分享标题 != '' THEN mid END) AS 有卡片标题用户
        ,COUNT(DISTINCT CASE WHEN 分享封面 IS NOT NULL AND 分享封面 != '' THEN mid END) AS 有卡片封面用户
FROM    loghubods.opengid_base_data
WHERE   dt >= ${start}
AND     dt <= ${end}
AND     usersharedepth = 0
AND     videoid IS NOT NULL
GROUP BY channel
ORDER BY 用户数 DESC
;

+ 150 - 0
tasks/素材字段分析/analyze.py

@@ -0,0 +1,150 @@
#!/usr/bin/env python
# coding=utf-8
"""
素材字段分析

Per-day, per-channel fill rates of the material fields
(文章标题 / 卡片标题 / 卡片封面).

Reads the newest non-report CSV exported by query.sql from ./output and
writes a plain-text summary (<csv-stem>_分析.txt) back into ./output.
"""
import sys
from pathlib import Path

import pandas as pd

task_dir = Path(__file__).parent
output_dir = task_dir / "output"

# Report buffer: log() mirrors every line to stdout and into this list,
# which is dumped to the *_分析.txt file at the end of main().
lines = []


def log(text=""):
    """Print *text* and append it to the report buffer."""
    print(text)
    lines.append(text)


def log_channel_daily_rates(df, count_col, title):
    """Log a per-channel × per-day fill-rate table for *count_col*.

    Channels whose overall fill rate for *count_col* is below 10% are
    omitted.  Cells with no data for a given day are rendered as '--'.
    Sections 二 and 三 of the original script were byte-for-byte duplicates
    of this logic with only the column and title changed.
    """
    log("=" * 70)
    log(title)
    log("=" * 70)
    log()

    totals = df.groupby('channel').agg({
        '记录数': 'sum',
        count_col: 'sum'
    })
    totals['填充率'] = totals[count_col] / totals['记录数']
    channels = totals[totals['填充率'] >= 0.1].index.tolist()

    if channels:
        dates = sorted(df['dt'].unique())
        # Column headers show only the last 4 chars of the date (MMDD).
        header = f"{'渠道':<25}" + "".join([f"{str(d)[-4:]:>8}" for d in dates])
        log(header)
        log("-" * 70)

        for ch in channels:
            ch_data = df[df['channel'] == ch].set_index('dt')
            row_str = f"{ch:<25}"
            for d in dates:
                if d in ch_data.index:
                    r = ch_data.loc[d]
                    rate = r[count_col] / r['记录数'] if r['记录数'] > 0 else 0
                    row_str += f"{rate:>8.0%}"
                else:
                    row_str += f"{'--':>8}"
            log(row_str)
        log()


def main():
    """Run the analysis end to end: load CSV, log sections, save report."""
    # Ignore previously generated report files ("*_分析*").
    csv_files = [f for f in output_dir.glob("*.csv") if '_分析' not in f.name]
    if not csv_files:
        print("没有找到数据文件,请先运行 query.sql")
        sys.exit(1)  # was builtin exit(); sys.exit is the supported API in scripts

    # Pick the most recently modified export.
    latest_file = max(csv_files, key=lambda x: x.stat().st_mtime)
    df = pd.read_csv(latest_file)

    log(f"分析文件: {latest_file.name}")
    log(f"日期范围: {df['dt'].min()} ~ {df['dt'].max()}")
    log()

    # ---- 1. Daily fill rates across all channels ----
    log("=" * 70)
    log("一、每日素材字段填充率")
    log("=" * 70)
    log()

    daily = df.groupby('dt').agg({
        '记录数': 'sum',
        '有文章标题': 'sum',
        '有卡片标题': 'sum',
        '有卡片封面': 'sum'
    }).reset_index()

    header = f"{'日期':<12} {'记录数':>12} {'文章标题':>10} {'卡片标题':>10} {'卡片封面':>10}"
    log(header)
    log("-" * 70)

    for _, row in daily.iterrows():
        n = row['记录数']
        art = row['有文章标题'] / n if n > 0 else 0
        card = row['有卡片标题'] / n if n > 0 else 0
        cover = row['有卡片封面'] / n if n > 0 else 0
        log(f"{row['dt']:<12} {int(n):>12,} {art:>10.1%} {card:>10.1%} {cover:>10.1%}")

    log()

    # ---- 2 & 3. Per-channel daily rates (shared helper) ----
    log_channel_daily_rates(df, '有文章标题', "二、各渠道每日文章标题填充率")
    log_channel_daily_rates(df, '有卡片标题', "三、各渠道每日卡片标题填充率")

    # ---- 4. Overall totals ----
    log("=" * 70)
    log("四、整体汇总")
    log("=" * 70)
    log()

    total = df['记录数'].sum()
    log(f"总记录数: {int(total):,}")
    log(f"有文章标题: {int(df['有文章标题'].sum()):,} ({df['有文章标题'].sum()/total:.1%})")
    log(f"有卡片标题: {int(df['有卡片标题'].sum()):,} ({df['有卡片标题'].sum()/total:.1%})")
    log(f"有卡片封面: {int(df['有卡片封面'].sum()):,} ({df['有卡片封面'].sum()/total:.1%})")
    log()

    # Persist the report (with a trailing newline, which the original omitted).
    result_file = output_dir / f"{latest_file.stem}_分析.txt"
    result_file.write_text("\n".join(lines) + "\n", encoding='utf-8')
    log(f"结果已保存: {result_file}")


if __name__ == "__main__":
    main()

+ 18 - 0
tasks/素材字段分析/query.sql

@@ -0,0 +1,18 @@
-- 素材字段分析 (material-field fill analysis)
-- Per-day, per-channel record counts and fill counts for the material
-- fields: 文章标题 (article title), 分享标题 (card title), 分享封面 (card
-- cover).  Consumed by analyze.py in the same task directory.
-- NOTE(review): ${start}/${end} are substituted by the task runner and are
-- injected unquoted — assumes dt partition values are numeric; confirm.

SELECT  dt
        ,channel
        ,COUNT(*) AS 记录数
        -- Filled = non-NULL and non-empty string.
        ,SUM(CASE WHEN 文章标题 IS NOT NULL AND 文章标题 != '' THEN 1 ELSE 0 END) AS 有文章标题
        ,SUM(CASE WHEN 分享标题 IS NOT NULL AND 分享标题 != '' THEN 1 ELSE 0 END) AS 有卡片标题
        ,SUM(CASE WHEN 分享封面 IS NOT NULL AND 分享封面 != '' THEN 1 ELSE 0 END) AS 有卡片封面
FROM    loghubods.opengid_base_data
WHERE   dt >= ${start}
AND     dt <= ${end}
AND     usersharedepth = 0
AND     videoid IS NOT NULL
GROUP BY dt, channel
ORDER BY dt, 记录数 DESC
;