Ver código fonte

初始化 ODPS 数据分析工具

- 封装 run_sql.py 入口脚本,支持 SQL 文件执行和变量替换
- 支持 --start/--end 日期分区参数,默认最近 7 天
- 按任务目录组织:tasks/任务名/query.sql + output/
- 添加 CLAUDE.md 项目约定和 README.md 使用说明

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
yangxiaohui 2 meses atrás
commit
38e7c59c69
7 arquivos alterados com 528 adições e 0 exclusões
  1. 3 0
      .gitignore
  2. 34 0
      CLAUDE.md
  3. 41 0
      README.md
  4. 1 0
      lib/__init__.py
  5. 31 0
      lib/odps_module.py
  6. 151 0
      run_sql.py
  7. 267 0
      tasks/渠道再分享回流/query.sql

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+__pycache__/
+*.pyc
+output/

+ 34 - 0
CLAUDE.md

@@ -0,0 +1,34 @@
+# 项目约定
+
+## 目录结构
+
+```
+data_analysis/
+├── lib/                    # 核心库
+│   └── odps_module.py      # ODPS 客户端
+├── tasks/                  # 分析任务(按任务名建目录)
+│   └── 任务名/
+│       ├── query.sql       # SQL 文件
+│       └── output/         # 结果目录
+│           └── start_end.csv
+└── run_sql.py              # 入口脚本
+```
+
+## 命名规范
+
+- 任务目录:中文表意,如 `渠道再分享回流/`
+- SQL 文件:统一用 `query.sql`
+- 输出目录:统一用 `output/`
+- 结果文件:`{start}_{end}.csv`,如 `20251229_20260104.csv`
+
+## SQL 变量
+
+- `${start}` - dt 分区起始日期
+- `${end}` - dt 分区结束日期
+- 默认值:最近 7 天(T-7 ~ T-1)
+
+## 新建任务
+
+1. 创建目录 `tasks/任务名/`
+2. 编写 `query.sql`,日期用 `${start}` 和 `${end}`
+3. 运行 `python run_sql.py tasks/任务名/query.sql`

+ 41 - 0
README.md

@@ -0,0 +1,41 @@
+# ODPS 数据分析工具
+
+从阿里云 ODPS (MaxCompute) 执行 SQL 并导出结果到 CSV。
+
+## 安装依赖
+
+```bash
+pip install pyodps
+```
+
+## 使用方法
+
+```bash
+# 基本用法(默认最近 7 天)
+python run_sql.py tasks/渠道再分享回流/query.sql
+
+# 指定日期范围
+python run_sql.py tasks/渠道再分享回流/query.sql --start 20251222 --end 20260103
+
+# 预览 SQL(不执行)
+python run_sql.py tasks/渠道再分享回流/query.sql --dry-run
+
+# 额外变量
+python run_sql.py tasks/xxx/query.sql --vars apptype=36
+```
+
+## 目录结构
+
+```
+tasks/
+└── 渠道再分享回流/          # 任务目录(中文表意)
+    ├── query.sql           # SQL 文件
+    └── output/             # 结果自动保存到这里
+        └── 20251229_20260104.csv
+```
+
+## 新建分析任务
+
+1. 创建目录:`mkdir -p tasks/新任务名`
+2. 编写 SQL,日期分区用 `${start}` 和 `${end}` 占位
+3. 运行查询

+ 1 - 0
lib/__init__.py

@@ -0,0 +1 @@
+# lib 模块

+ 31 - 0
lib/odps_module.py

@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+from odps import ODPS
+
+
+class ODPSClient(object):
+    def __init__(self, project="loghubods"):
+        self.accessId = "LTAIWYUujJAm7CbH"
+        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+        self.endpoint = "http://service.odps.aliyun.com/api"
+        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
+
+        self.odps = ODPS(
+            self.accessId,
+            self.accessSecret,
+            project,
+            self.endpoint
+        )
+
+    def execute_sql(self, sql: str):
+        hints = {
+            'odps.sql.submit.mode': 'script'
+        }
+        with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
+            pd_df = reader.to_pandas()
+        return pd_df
+
+    def execute_sql_result_save_file(self, sql: str, output_file: str):
+        data_df = self.execute_sql(sql)
+        data_df.to_csv(output_file, index=False)

+ 151 - 0
run_sql.py

@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+# coding=utf-8
+"""
+SQL 执行工具 - 输入 SQL 文件,输出查询结果到同目录下的 CSV
+
+使用示例:
+    python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql
+    python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --start 20251222 --end 20260103
+"""
+import argparse
+from datetime import datetime, timedelta
+from pathlib import Path
+
+from lib.odps_module import ODPSClient
+
+
+def get_default_dates():
+    """获取默认日期范围:最近 7 天(start=7天前, end=昨天)"""
+    today = datetime.now()
+    end_date = today - timedelta(days=1)
+    start_date = today - timedelta(days=7)
+    return start_date.strftime('%Y%m%d'), end_date.strftime('%Y%m%d')
+
+
+def parse_variables(var_list: list) -> dict:
+    """解析变量列表为字典"""
+    if not var_list:
+        return {}
+    variables = {}
+    for item in var_list:
+        if '=' in item:
+            key, value = item.split('=', 1)
+            variables[key.strip()] = value.strip()
+    return variables
+
+
+def replace_variables(sql: str, variables: dict) -> str:
+    """替换 SQL 中的 ${variable} 占位符"""
+    for key, value in variables.items():
+        sql = sql.replace(f'${{{key}}}', value)
+    return sql
+
+
+def run_sql(sql_file: str, output_file: str = None, variables: dict = None,
+            start: str = None, end: str = None, dry_run: bool = False):
+    """
+    执行 SQL 文件并保存结果
+
+    Args:
+        sql_file: SQL 文件路径
+        output_file: 输出文件路径(默认与 SQL 同目录同名)
+        variables: 变量替换字典
+        start: dt 分区起始日期
+        end: dt 分区结束日期
+        dry_run: 仅打印 SQL,不执行
+    """
+    sql_path = Path(sql_file)
+
+    # 合并 start/end 到 variables
+    if variables is None:
+        variables = {}
+    if start:
+        variables['start'] = start
+    if end:
+        variables['end'] = end
+
+    # 输出目录:SQL 同目录下的 output/;文件名:日期.csv
+    if output_file is None:
+        output_dir = sql_path.parent / "output"
+        output_dir.mkdir(exist_ok=True)
+        if start and end:
+            output_file = output_dir / f"{start}_{end}.csv"
+        elif start:
+            output_file = output_dir / f"{start}.csv"
+        else:
+            output_file = output_dir / "result.csv"
+    else:
+        output_file = Path(output_file)
+
+    # 读取 SQL
+    with open(sql_path, 'r', encoding='utf-8') as f:
+        sql = f.read()
+
+    # 变量替换
+    if variables:
+        sql = replace_variables(sql, variables)
+
+    # Dry run 模式
+    if dry_run:
+        print("=" * 50)
+        print("SQL 预览 (dry-run 模式)")
+        print("=" * 50)
+        print(sql)
+        print("=" * 50)
+        print(f"输出文件: {output_file}")
+        return
+
+    # 执行 SQL
+    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始执行: {sql_path.name}")
+
+    odps_client = ODPSClient()
+    odps_client.execute_sql_result_save_file(sql, str(output_file))
+
+    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 完成,结果保存至: {output_file}")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='执行 SQL 文件并输出结果',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例:
+  python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql
+  python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --start 20251222 --end 20260103
+  python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --dry-run
+        """
+    )
+    parser.add_argument('sql_file', type=str, help='SQL 文件路径')
+    parser.add_argument('--start', type=str, help='dt 分区起始日期,替换 ${start}')
+    parser.add_argument('--end', type=str, help='dt 分区结束日期,替换 ${end}')
+    parser.add_argument('-o', '--output', type=str, help='自定义输出路径')
+    parser.add_argument('--vars', nargs='*', metavar='KEY=VALUE', help='额外变量,如: apptype=36')
+    parser.add_argument('--dry-run', action='store_true', help='仅打印 SQL,不执行')
+
+    args = parser.parse_args()
+
+    # 解析变量
+    variables = parse_variables(args.vars)
+
+    # 默认日期
+    start = args.start
+    end = args.end
+    if start is None or end is None:
+        default_start, default_end = get_default_dates()
+        start = start or default_start
+        end = end or default_end
+        print(f"使用默认日期范围: {start} ~ {end}")
+
+    # 执行
+    run_sql(
+        sql_file=args.sql_file,
+        output_file=args.output,
+        variables=variables,
+        start=start,
+        end=end,
+        dry_run=args.dry_run
+    )
+
+
+if __name__ == "__main__":
+    main()

+ 267 - 0
tasks/渠道再分享回流/query.sql

@@ -0,0 +1,267 @@
+-- ********************************************************************--
+-- author:杨孝辉
+-- create time:2026-01-04 14:46:38
+-- ********************************************************************--
+-- ********************************************************************--
+-- author:杨孝辉
+-- create time:2026-01-04 14:45:54
+-- ********************************************************************--
+SELECT  dt
+        ,channel --   hotsencetype,
+        --   合作方名,
+        --   公众号名,
+        --   videoid,
+        --   title,
+        --   推荐状态,
+        -- ,merge一级品类
+        -- ,merge二级品类
+        ,COUNT(DISTINCT mid) AS 点击uv
+        ,(SUM(CASE    WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END) + SUM(CASE    WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv ELSE 0 END)) / (
+                    COUNT(DISTINCT mid) + 10
+        ) AS 再分享回流率
+        ,(SUM(CASE    WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END) + SUM(CASE    WHEN 是否原视频 = '是' THEN 再分享单聊回流uv END)) / (
+                    COUNT(DISTINCT mid) + 10
+        ) AS 原视频再分享回流率
+        ,COUNT(DISTINCT CASE    WHEN 是否进入推荐 = '1' THEN mid END) AS 进入推荐人数
+        ,COUNT(DISTINCT CASE    WHEN 是否进入推荐 = '1' THEN mid END) / COUNT(DISTINCT mid) AS 进入推荐率
+        ,(SUM(CASE    WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END) + SUM(CASE    WHEN 是否原视频 = '否' THEN 再分享单聊回流uv END)) / (
+                    COUNT(DISTINCT mid) + 10
+        ) AS 推荐再分享回流率
+        ,((SUM(CASE    WHEN 是否原视频 = '是' THEN 再分享群聊回流uv END) + SUM(CASE    WHEN 是否原视频 = '是' THEN 再分享单聊回流uv END)) / (
+                    COUNT(DISTINCT mid) + 10
+        )) / (
+                    (
+                                SUM(CASE    WHEN 是否原视频 = '否' THEN 再分享群聊回流uv END) + SUM(CASE    WHEN 是否原视频 = '否' THEN 再分享单聊回流uv END)
+                    ) / (
+                                COUNT(DISTINCT mid) + 10
+                    )
+        ) AS 原视频质量
+        ,SUM(CASE    WHEN 再分享群聊回流uv > 0 THEN 再分享群聊回流uv ELSE 0 END) + SUM(CASE    WHEN 再分享单聊回流uv > 0 THEN 再分享单聊回流uv ELSE 0 END) AS 再分享回流uv --   COUNT(DISTINCT wx_sn) AS 阅读文章数,
+        --   COUNT(DISTINCT videoid) AS 点击视频数,
+        --   COUNT(DISTINCT 再分享videoid) AS 再分享视频数,
+        --   COUNT(DISTINCT opengid)-1 AS 点击群数量,
+        --   COUNT(DISTINCT mid) / COUNT(DISTINCT shareid) AS 单分享卡片访问uv,
+        --   1000 *(
+        --     COUNT(
+        --       DISTINCT CASE
+        --         WHEN 群类型 = '群聊' THEN mid
+        --       END
+        --     ) + 1
+        --   ) /(
+        --     count(
+        --       DISTINCT CASE
+        --         WHEN 群类型 = '群聊' THEN opengid
+        --       end
+        --     ) * 30 + 500
+        --   ) *(
+        --     sum(
+        --       CASE
+        --         WHEN 再分享群聊回流uv > 0 then 再分享群聊回流uv
+        --         else 0
+        --       end
+        --     ) + sum(
+        --       CASE
+        --         WHEN 再分享单聊回流uv > 0 then 再分享单聊回流uv
+        --         else 0
+        --       end
+        --     ) + 1
+        --   ) /(COUNT(DISTINCT mid) + 500) as 群pushscore_ctr_ROR,
+        -- (
+        --     COUNT(
+        --       DISTINCT CASE
+        --         WHEN 群类型 = '群聊' THEN mid
+        --       END
+        --     ) + 1
+        --   ) /(
+        --     count(
+        --       DISTINCT CASE
+        --         WHEN 群类型 = '群聊' THEN opengid
+        --       end
+        --     ) * 30 + 100
+        --   ) AS 群ctr,
+        --   COUNT(DISTINCT 再分享shareid) / COUNT(DISTINCT mid) AS 再分享率,
+        -- (
+        --     sum(
+        --       CASE
+        --         WHEN 再分享群聊回流uv > 0 then 再分享群聊回流uv
+        --         else 0
+        --       end
+        --     ) + sum(
+        --       CASE
+        --         WHEN 再分享单聊回流uv > 0 then 再分享单聊回流uv
+        --         else 0
+        --       end
+        --     )
+        --   ) / COUNT(DISTINCT 再分享shareid) AS 再分享ros,
+        -- (
+        --     sum(
+        --       CASE
+        --         WHEN 再分享群聊回流uv > 0 then 再分享群聊回流uv
+        --         else 0
+        --       end
+        --     )
+        --   ) /(
+        --     sum(
+        --       CASE
+        --         WHEN 再分享群聊回流uv > 0 then 再分享群聊回流uv
+        --         else 0
+        --       end
+        --     ) + sum(
+        --       CASE
+        --         WHEN 再分享单聊回流uv > 0 then 再分享单聊回流uv
+        --         else 0
+        --       end
+        --     )
+        --   ) AS 再分享群聊占比,
+        --   COUNT(
+        --     DISTINCT CASE
+        --       WHEN 是否原视频 = '是' THEN 再分享shareid
+        --     END
+        --   ) / COUNT(DISTINCT mid) AS 原视频再分享率,
+        -- (
+        --     sum(
+        --       CASE
+        --         WHEN 是否原视频 = '是' THEN 再分享群聊回流uv
+        --       END
+        --     ) + sum(
+        --       CASE
+        --         WHEN 是否原视频 = '是' THEN 再分享单聊回流uv
+        --       END
+        --     )
+        --   ) / COUNT(
+        --     DISTINCT CASE
+        --       WHEN 是否原视频 = '是' THEN 再分享shareid
+        --     END
+        --   ) AS 原视频再分享ros,
+        -- (
+        --     sum(
+        --       CASE
+        --         WHEN 是否原视频 = '是' THEN 再分享群聊回流uv
+        --       END
+        --     )
+        --   ) /(
+        --     sum(
+        --       CASE
+        --         WHEN 是否原视频 = '是' THEN 再分享单聊回流uv
+        --       END
+        --     ) + sum(
+        --       CASE
+        --         WHEN 是否原视频 = '是' THEN 再分享群聊回流uv
+        --       END
+        --     )
+        --   ) AS 原视频再分享群聊占比,
+        --   COUNT(
+        --     DISTINCT CASE
+        --       WHEN 是否原视频 = '否' THEN 再分享shareid
+        --     END
+        --   ) / COUNT(DISTINCT mid) AS 推荐再分享率,
+        -- (
+        --     sum(
+        --       CASE
+        --         WHEN 是否原视频 = '否' THEN 再分享群聊回流uv
+        --       END
+        --     ) + sum(
+        --       CASE
+        --         WHEN 是否原视频 = '否' THEN 再分享单聊回流uv
+        --       END
+        --     )
+        --   ) / COUNT(
+        --     DISTINCT CASE
+        --       WHEN 是否原视频 = '是' THEN 再分享shareid
+        --     END
+        --   ) AS 推荐再分享ros,
+        -- (
+        --     sum(
+        --       CASE
+        --         WHEN 是否原视频 = '否' THEN 再分享群聊回流uv
+        --       END
+        --     )
+        --   ) /(
+        --     sum(
+        --       CASE
+        --         WHEN 是否原视频 = '否' THEN 再分享单聊回流uv
+        --       END
+        --     ) + sum(
+        --       CASE
+        --         WHEN 是否原视频 = '否' THEN 再分享群聊回流uv
+        --       END
+        --     )
+        --   ) AS 推荐再分享群聊占比,
+        --   COUNT(DISTINCT 再分享shareid) AS 再分享次数,
+        --   sum(
+        --     CASE
+        --       WHEN 再分享群聊回流uv > 0 then 再分享群聊回流uv
+        --     end
+        --   ) AS 再分享群聊回流,
+        --   sum(
+        --     CASE
+        --       WHEN 再分享单聊回流uv > 0 then 再分享单聊回流uv
+        --     end
+        --   ) AS 再分享单聊回流,
+        --   COUNT(
+        --     DISTINCT CASE
+        --       WHEN 再分享点击场景 = '1008' THEN 再分享点击id
+        --     END
+        --   ) as 再分享回流群数,
+        --   COUNT(
+        --     DISTINCT CASE
+        --       WHEN 是否原视频 = '是' THEN 再分享shareid
+        --     END
+        --   ) AS 再分享原视频次数,
+        --   sum(
+        --     CASE
+        --       WHEN 是否原视频 = '是' THEN 再分享群聊回流uv
+        --     END
+        --   ) AS 再分享原视频群聊回流,
+        --   sum(
+        --     CASE
+        --       WHEN 是否原视频 = '是' THEN 再分享单聊回流uv
+        --     END
+        --   ) AS 再分享原视频单聊回流,
+        --   COUNT(
+        --     DISTINCT CASE
+        --       WHEN 是否原视频 = '否' THEN 再分享shareid
+        --     END
+        --   ) AS 再分享推荐视频次数,
+        --   sum(
+        --     CASE
+        --       WHEN 是否原视频 = '否' THEN 再分享群聊回流uv
+        --     END
+        --   ) AS 再分享推荐群聊回流,
+        --   sum(
+        --     CASE
+        --       WHEN 是否原视频 = '否' THEN 再分享单聊回流uv
+        --     END
+        --   ) AS 再分享推荐单聊回流,
+        -- ,COUNT(DISTINCT 再分享merge二级品类) AS 再分享二级品类数 --   COUNT(DISTINCT shareid) AS 来源分享卡片数,
+        --   COUNT(DISTINCT 分享者mid) AS 来源分享uv,
+        --   COUNT(DISTINCT click_province) AS 点击地域省数,
+        --   COUNT(DISTINCT click_city) AS 点击地域市数,
+        --   sum(群历史人数) AS 群历史总人数,
+        --   avg(群当日活跃人数) AS 平均群当日活跃人数,
+        --   avg(群历史人数) AS 平均群历史人数,
+        --   avg(opengid_cnt) AS 平均人历史所在群数,
+        --   COUNT(DISTINCT mid) / COUNT(DISTINCT videoid) AS 单视频访问uv,
+        --   COUNT(DISTINCT mid) / COUNT(DISTINCT opengid) AS 单群访问uv,
+        --   COUNT(DISTINCT mid) / COUNT(DISTINCT 分享者mid) AS 单分享者访问uv,
+        --   COUNT(DISTINCT rootsourceid) AS 点击rootsourceid数,
+        --   COUNT(DISTINCT channel) AS 点击channel数
+FROM    loghubods.opengid_base_data
+WHERE   dt >= ${start}
+AND     dt <= ${end}
+AND     usersharedepth = 0
+AND     videoid IS NOT NULL 
+GROUP BY dt
+         ,channel
+         --   hotsencetype,
+         --   合作方名,
+         --   公众号名,
+         --   videoid,
+         --   title,
+         --   推荐状态,
+        --  merge一级品类
+        --  ,merge二级品类
+HAVING  点击uv > 10000
+ORDER BY dt DESC, 点击uv DESC
+LIMIT   5000
+;