Преглед изворни кода

添加数据探索和分析任务

- opengid数据探索:按渠道取样分析字段非空率
- opengid原始分享数据探索:usersharedepth=0 的数据分析
- 视频维度分析:按视频维度统计点击、再分享、回流等指标

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
yangxiaohui пре 2 месеци
родитељ
комит
17014e4507

+ 157 - 0
tasks/opengid原始分享数据探索/analyze.py

@@ -0,0 +1,157 @@
#!/usr/bin/env python
# coding=utf-8
"""Analyze a sample export of opengid_base_data.

Reads the newest CSV under ./output (produced by query.sql), computes
per-channel field coverage (non-null rates), and mirrors the report to
stdout and to a "<csv stem>_分析.txt" file next to the CSV.
"""
import sys
from pathlib import Path

import pandas as pd

# Locate the most recent query output.
task_dir = Path(__file__).parent
output_dir = task_dir / "output"
csv_files = list(output_dir.glob("*.csv"))
if not csv_files:
    print("没有找到数据文件,请先运行 query.sql")
    # sys.exit instead of the site-provided exit() builtin, which is not
    # guaranteed to exist (e.g. under `python -S`) and is meant for the REPL.
    sys.exit(1)

latest_file = max(csv_files, key=lambda p: p.stat().st_mtime)
df = pd.read_csv(latest_file)

# Report lines, buffered so they can be saved to disk at the end.
lines = []

# Bookkeeping columns that carry no analytical signal.
META_COLS = {'channel', 'rn', 'dt'}

# A field counts as "usable" for a channel when at least this share of
# its rows are non-null.  (The report headings hardcode the matching
# "50%" wording.)
MIN_NON_NULL_RATE = 0.5


def log(text=""):
    """Print *text* and buffer it for the saved report."""
    print(text)
    lines.append(text)


log(f"分析文件: {latest_file.name}")
log()

# --- Basic shape ---------------------------------------------------------
log("=" * 60)
log("基本信息")
log("=" * 60)
log(f"字段数: {len(df.columns)}")
log(f"记录数: {len(df)}")
log(f"渠道数: {df['channel'].nunique()}")
log()

# --- Channel inventory ---------------------------------------------------
log("=" * 60)
log("渠道列表")
log("=" * 60)
channels = df['channel'].unique()
for ch in channels:
    log(f"  {ch}: {len(df[df['channel'] == ch])} 条")
log()

# Per channel: the set of usable fields (non-null rate >= 50%).
channel_fields = {}
for channel in channels:
    ch_df = df[df['channel'] == channel]
    channel_fields[channel] = {
        col for col in ch_df.columns
        if col not in META_COLS
        and ch_df[col].notna().sum() / len(ch_df) >= MIN_NON_NULL_RATE
    }

# Fields usable in every channel.
all_channels_fields = list(channel_fields.values())
common_fields = set.intersection(*all_channels_fields) if all_channels_fields else set()

log("=" * 60)
log(f"公共字段(所有渠道都有,共 {len(common_fields)} 个)")
log("=" * 60)
for field in sorted(common_fields):
    log(f"  {field}")
log()

# --- Fields exclusive to a single channel --------------------------------
log("=" * 60)
log("各渠道独有字段")
log("=" * 60)
for channel in channels:
    # Usable in this channel and above-threshold in no other channel.
    # Subtracting common_fields is redundant with the union when there is
    # more than one channel, but preserves the single-channel result
    # (no field is reported as unique) of the original pairwise scan.
    others = set().union(*[channel_fields[o] for o in channels if o != channel])
    truly_unique = (channel_fields[channel] - common_fields) - others

    if truly_unique:
        log(f"\n【{channel}】独有字段 ({len(truly_unique)} 个):")
        for field in sorted(truly_unique):
            log(f"  {field}")
log()

# --- Full per-channel coverage breakdown ---------------------------------
log("=" * 60)
log("各渠道完整字段列表(非空率 >= 50%)")
log("=" * 60)

for channel in channels:
    ch_df = df[df['channel'] == channel]
    log(f"\n【{channel}】({len(ch_df)} 条)")
    log("-" * 50)

    # (column, non-null rate, non-null count) for every data column.
    field_stats = []
    for col in ch_df.columns:
        if col in META_COLS:
            continue
        non_null = ch_df[col].notna().sum()
        field_stats.append((col, non_null / len(ch_df), non_null))

    # Highest coverage first, then alphabetically.
    field_stats.sort(key=lambda s: (-s[1], s[0]))

    # Columns populated on every row.
    full_fields = [s for s in field_stats if s[1] == 1.0]
    if full_fields:
        log(f"\n  完整字段 ({len(full_fields)} 个):")
        for col, _rate, _n in full_fields:
            log(f"    {col}")

    # Columns populated on some but not all rows.
    partial_fields = [s for s in field_stats if 0 < s[1] < 1.0]
    if partial_fields:
        log(f"\n  部分有值 ({len(partial_fields)} 个):")
        for col, rate, n in partial_fields:
            log(f"    {col}: {rate:.0%} ({n}/{len(ch_df)})")

    # Columns that are entirely null for this channel.
    empty_fields = [s for s in field_stats if s[1] == 0]
    if empty_fields:
        log(f"\n  全空字段 ({len(empty_fields)} 个):")
        for col, _rate, _n in empty_fields:
            log(f"    {col}")

# --- Recommended extraction fields ---------------------------------------
log()
log("=" * 60)
log("各渠道推荐取数字段(非空率 >= 50%)")
log("=" * 60)
for channel in channels:
    fields = sorted(channel_fields[channel])
    log(f"\n【{channel}】({len(fields)} 个字段):")
    log(f"  {', '.join(fields)}")

# Persist the report next to the source CSV.
result_file = output_dir / f"{latest_file.stem}_分析.txt"
result_file.write_text("\n".join(lines), encoding='utf-8')

log()
log(f"分析结果已保存到: {result_file}")

+ 13 - 0
tasks/opengid原始分享数据探索/query.sql

@@ -0,0 +1,13 @@
-- Sample of raw-share rows from opengid_base_data.
-- Takes the 20 most recent rows per channel, restricted to
-- usersharedepth = 0 (the original share, not a re-share).

SELECT  *
FROM    (
            SELECT  *
                    -- Rank rows within each channel, newest click first.
                    ,ROW_NUMBER() OVER (PARTITION BY channel ORDER BY 点击时间 DESC) AS rn
            FROM    loghubods.opengid_base_data
            WHERE   dt = ${start}  -- partition date, injected by the scheduler
            AND     usersharedepth = 0
        ) t
WHERE   rn <= 20  -- keep the 20 newest rows per channel
;

+ 157 - 0
tasks/opengid数据探索/analyze.py

@@ -0,0 +1,157 @@
#!/usr/bin/env python
# coding=utf-8
"""Explore a sampled export of opengid_base_data.

Loads the newest CSV under ./output, reports per-channel field coverage
(non-null rates), and saves the same report as a text file.
"""
import pandas as pd
from pathlib import Path

# Pick up the newest CSV produced by query.sql under ./output.
here = Path(__file__).parent
out_dir = here / "output"
exports = sorted(out_dir.glob("*.csv"), key=lambda p: p.stat().st_mtime)
if not exports:
    print("没有找到数据文件,请先运行 query.sql")
    exit(1)

newest = exports[-1]
df = pd.read_csv(newest)

# Report lines are buffered so they can be written to disk at the end.
buffer = []

# Bookkeeping columns excluded from all field statistics.
SKIP = ('channel', 'rn', 'dt')


def log(text=""):
    """Echo *text* to stdout and append it to the report buffer."""
    print(text)
    buffer.append(text)


def banner(title):
    """Emit a '='-framed section header."""
    log("=" * 60)
    log(title)
    log("=" * 60)


log(f"分析文件: {newest.name}")
log()

# Overall shape of the sample.
banner("基本信息")
log(f"字段数: {len(df.columns)}")
log(f"记录数: {len(df)}")
log(f"渠道数: {df['channel'].nunique()}")
log()

# Row counts per channel, in first-seen order.
banner("渠道列表")
channels = df['channel'].unique()
for name in channels:
    log(f"  {name}: {len(df[df['channel'] == name])} 条")
log()

# A field is "usable" for a channel when >= 50% of its rows are non-null.
channel_fields = {}
for name in channels:
    subset = df[df['channel'] == name]
    channel_fields[name] = {
        column for column in subset.columns
        if column not in SKIP
        and subset[column].notna().sum() / len(subset) >= 0.5
    }

# Fields that every channel shares.
per_channel = list(channel_fields.values())
common_fields = set.intersection(*per_channel) if per_channel else set()

banner(f"公共字段(所有渠道都有,共 {len(common_fields)} 个)")
for field in sorted(common_fields):
    log(f"  {field}")
log()

# Fields that only one channel reaches the threshold on.
banner("各渠道独有字段")
for name in channels:
    cands = channel_fields[name] - common_fields
    truly_unique = {
        field for field in cands
        if all(field not in channel_fields[other]
               for other in channels if other != name)
    }

    if truly_unique:
        log(f"\n【{name}】独有字段 ({len(truly_unique)} 个):")
        for field in sorted(truly_unique):
            log(f"  {field}")
log()

# Detailed coverage breakdown per channel.
banner("各渠道完整字段列表(非空率 >= 50%)")

for name in channels:
    subset = df[df['channel'] == name]
    log(f"\n【{name}】({len(subset)} 条)")
    log("-" * 50)

    # (column, non-null rate, non-null count) for each data column.
    stats = []
    for column in subset.columns:
        if column in SKIP:
            continue
        filled = subset[column].notna().sum()
        stats.append((column, filled / len(subset), filled))

    # Highest non-null rate first, ties broken alphabetically.
    stats.sort(key=lambda item: (-item[1], item[0]))

    complete = [s for s in stats if s[1] == 1.0]
    if complete:
        log(f"\n  完整字段 ({len(complete)} 个):")
        for column, _, _ in complete:
            log(f"    {column}")

    partially = [s for s in stats if 0 < s[1] < 1.0]
    if partially:
        log(f"\n  部分有值 ({len(partially)} 个):")
        for column, rate, filled in partially:
            log(f"    {column}: {rate:.0%} ({filled}/{len(subset)})")

    blank = [s for s in stats if s[1] == 0]
    if blank:
        log(f"\n  全空字段 ({len(blank)} 个):")
        for column, _, _ in blank:
            log(f"    {column}")

# Recommended fields to extract per channel.
log()
banner("各渠道推荐取数字段(非空率 >= 50%)")
for name in channels:
    usable = sorted(channel_fields[name])
    log(f"\n【{name}】({len(usable)} 个字段):")
    log(f"  {', '.join(usable)}")

# Persist the report next to the source CSV.
report = out_dir / f"{newest.stem}_分析.txt"
with open(report, 'w', encoding='utf-8') as fh:
    fh.write("\n".join(buffer))

log()
log(f"分析结果已保存到: {report}")

+ 12 - 0
tasks/opengid数据探索/query.sql

@@ -0,0 +1,12 @@
-- Sample rows from opengid_base_data.
-- Takes the 20 most recent rows per channel.
-- NOTE(review): the original header said "每个渠道取 5 条记录", which
-- contradicted the `rn <= 20` filter below; the comment is corrected to
-- match the code — confirm 20 (not 5) is the intended sample size.

SELECT  *
FROM    (
            SELECT  *
                    -- Rank rows within each channel, newest click first.
                    ,ROW_NUMBER() OVER (PARTITION BY channel ORDER BY 点击时间 DESC) AS rn
            FROM    loghubods.opengid_base_data
            WHERE   dt = ${start}  -- partition date, injected by the scheduler
        ) t
WHERE   rn <= 20  -- keep the 20 newest rows per channel
;

+ 118 - 0
tasks/视频维度分析/query.sql

@@ -0,0 +1,118 @@
-- Video-dimension analysis.
-- Aggregates click, re-share and return-flow metrics per
-- dt / channel / videoid (plus descriptive attributes).
-- The "+ 10" / "+ 100" / "+ 500" terms in denominators are additive
-- smoothing so ratios stay stable for low-traffic videos.
-- 是否原视频 = '是' marks re-shares of the original video; '否' marks
-- re-shares of recommended videos.

SELECT  dt
        ,channel
        ,hotsencetype
        ,合作方名
        ,公众号名
        ,videoid
        ,title
        ,推荐状态
        ,merge一级品类
        ,merge二级品类
        ,COUNT(DISTINCT mid) AS 点击uv
        -- Overall return-flow rate: (group + single-chat returns) / click UV.
        ,(SUM(CASE WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END)
          + SUM(CASE WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv ELSE 0 END)
         ) / (COUNT(DISTINCT mid) + 10) AS 再分享回流率
        ,(SUM(CASE WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END)
          + SUM(CASE WHEN 是否原视频 = '是' THEN 再分享单聊回流uv END)
         ) / (COUNT(DISTINCT mid) + 10) AS 原视频再分享回流率
        ,COUNT(DISTINCT CASE WHEN 是否进入推荐 = '1' THEN mid END) AS 进入推荐人数
        ,COUNT(DISTINCT CASE WHEN 是否进入推荐 = '1' THEN mid END) / COUNT(DISTINCT mid) AS 进入推荐率
        ,(SUM(CASE WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END)
          + SUM(CASE WHEN 是否原视频 = '否' THEN 再分享单聊回流uv END)
         ) / (COUNT(DISTINCT mid) + 10) AS 推荐再分享回流率
        -- Original-video quality: original return-flow rate relative to
        -- recommended return-flow rate.
        ,((SUM(CASE WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END)
           + SUM(CASE WHEN 是否原视频 = '是' THEN 再分享单聊回流uv END)
          ) / (COUNT(DISTINCT mid) + 10)
         ) / ((SUM(CASE WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END)
               + SUM(CASE WHEN 是否原视频 = '否' THEN 再分享单聊回流uv END)
              ) / (COUNT(DISTINCT mid) + 10)
         ) AS 原视频质量
        ,AVG(粉丝数) AS 粉丝量avg
        ,AVG(新增粉丝数) AS 新增粉丝数avg
        ,AVG(阅读量) AS 阅读量avg
        ,100 * COUNT(DISTINCT mid) / (AVG(粉丝数) + 1) AS 阅读打开率
        ,100 * AVG(阅读量) / (AVG(粉丝数) + 1) AS 阅读率
        ,100 * COUNT(DISTINCT mid) / (AVG(阅读量) + 1) AS 打开率
        ,SUM(CASE WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END)
         + SUM(CASE WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv ELSE 0 END) AS 再分享回流uv
        ,COUNT(DISTINCT wx_sn) AS 阅读文章数
        ,COUNT(DISTINCT videoid) AS 点击视频数
        ,COUNT(DISTINCT 再分享videoid) AS 再分享视频数
        -- NOTE(review): the "- 1" presumably discounts a placeholder/empty
        -- opengid value — confirm against the source table.
        ,COUNT(DISTINCT opengid) - 1 AS 点击群数量
        ,COUNT(DISTINCT mid) / COUNT(DISTINCT shareid) AS 单分享卡片访问uv
        -- Composite group push score: smoothed CTR x smoothed return-on-share.
        ,1000 * (COUNT(DISTINCT CASE WHEN 群类型 = '群聊' THEN mid END) + 1)
         / (COUNT(DISTINCT CASE WHEN 群类型 = '群聊' THEN opengid END) * 30 + 500)
         * (SUM(CASE WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END)
            + SUM(CASE WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv ELSE 0 END) + 1
           ) / (COUNT(DISTINCT mid) + 500) AS 群pushscore_ctr_ROR
        ,(COUNT(DISTINCT CASE WHEN 群类型 = '群聊' THEN mid END) + 1)
         / (COUNT(DISTINCT CASE WHEN 群类型 = '群聊' THEN opengid END) * 30 + 100) AS 群ctr
        ,COUNT(DISTINCT 再分享shareid) / COUNT(DISTINCT mid) AS 再分享率
        ,(SUM(CASE WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END)
          + SUM(CASE WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv ELSE 0 END)
         ) / COUNT(DISTINCT 再分享shareid) AS 再分享ros
        ,SUM(CASE WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END)
         / (SUM(CASE WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END)
            + SUM(CASE WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv ELSE 0 END)
           ) AS 再分享群聊占比
        ,COUNT(DISTINCT CASE WHEN 是否原视频 = '是' THEN 再分享shareid END) / COUNT(DISTINCT mid) AS 原视频再分享率
        ,(SUM(CASE WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END)
          + SUM(CASE WHEN 是否原视频 = '是' THEN 再分享单聊回流uv END)
         ) / COUNT(DISTINCT CASE WHEN 是否原视频 = '是' THEN 再分享shareid END) AS 原视频再分享ros
        ,SUM(CASE WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END)
         / (SUM(CASE WHEN 是否原视频 = '是' THEN 再分享单聊回流uv END)
            + SUM(CASE WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END)
           ) AS 原视频再分享群聊占比
        ,COUNT(DISTINCT CASE WHEN 是否原视频 = '否' THEN 再分享shareid END) / COUNT(DISTINCT mid) AS 推荐再分享率
        -- BUG FIX: the denominator previously counted 原视频 ('是')
        -- re-shares while the numerator sums recommended ('否') returns;
        -- it now counts '否' re-shares, mirroring 原视频再分享ros above.
        ,(SUM(CASE WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END)
          + SUM(CASE WHEN 是否原视频 = '否' THEN 再分享单聊回流uv END)
         ) / COUNT(DISTINCT CASE WHEN 是否原视频 = '否' THEN 再分享shareid END) AS 推荐再分享ros
        ,SUM(CASE WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END)
         / (SUM(CASE WHEN 是否原视频 = '否' THEN 再分享单聊回流uv END)
            + SUM(CASE WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END)
           ) AS 推荐再分享群聊占比
        ,COUNT(DISTINCT 再分享shareid) AS 再分享次数
        ,SUM(CASE WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv END) AS 再分享群聊回流
        ,SUM(CASE WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv END) AS 再分享单聊回流
        -- Scene 1008 is assumed to be the group-chat click scene — confirm.
        ,COUNT(DISTINCT CASE WHEN 再分享点击场景 = '1008' THEN 再分享点击id END) AS 再分享回流群数
        ,COUNT(DISTINCT CASE WHEN 是否原视频 = '是' THEN 再分享shareid END) AS 再分享原视频次数
        ,SUM(CASE WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END) AS 再分享原视频群聊回流
        ,SUM(CASE WHEN 是否原视频 = '是' THEN 再分享单聊回流uv END) AS 再分享原视频单聊回流
        ,COUNT(DISTINCT CASE WHEN 是否原视频 = '否' THEN 再分享shareid END) AS 再分享推荐视频次数
        ,SUM(CASE WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END) AS 再分享推荐群聊回流
        ,SUM(CASE WHEN 是否原视频 = '否' THEN 再分享单聊回流uv END) AS 再分享推荐单聊回流
        ,COUNT(DISTINCT 再分享merge二级品类) AS 再分享二级品类数
        ,COUNT(DISTINCT shareid) AS 来源分享卡片数
        ,COUNT(DISTINCT 分享者mid) AS 来源分享uv
        ,COUNT(DISTINCT click_province) AS 点击地域省数
        ,COUNT(DISTINCT click_city) AS 点击地域市数
        ,SUM(群历史人数) AS 群历史总人数
        ,AVG(群当日活跃人数) AS 平均群当日活跃人数
        ,AVG(群历史人数) AS 平均群历史人数
        ,AVG(opengid_cnt) AS 平均人历史所在群数
        ,COUNT(DISTINCT mid) / COUNT(DISTINCT videoid) AS 单视频访问uv
        ,COUNT(DISTINCT mid) / COUNT(DISTINCT opengid) AS 单群访问uv
        ,COUNT(DISTINCT mid) / COUNT(DISTINCT 分享者mid) AS 单分享者访问uv
        ,COUNT(DISTINCT rootsourceid) AS 点击rootsourceid数
        ,COUNT(DISTINCT channel) AS 点击channel数
FROM    loghubods.opengid_base_data
WHERE   dt >= ${start}
AND     dt <= ${end}
AND     usersharedepth = 0       -- original shares only
AND     videoid IS NOT NULL
GROUP BY dt
         ,channel
         ,hotsencetype
         ,合作方名
         ,公众号名
         ,videoid
         ,title
         ,推荐状态
         ,merge一级品类
         ,merge二级品类
ORDER BY 点击uv DESC
LIMIT   5000
;