Просмотр исходного кода

feat(图数据): 新增 Neo4j 图数据导出方案

- 新增 06_图数据_v2.sql: 简洁版图数据导出(share/click 事件)
- 新增 06_图数据_v2_full.sql: 完整版(含用户/视频详情,CTE 复用)
- 新增 07_用户详情.sql, 08_视频详情.sql: 节点属性补充
- 新增 convert_to_neo4j.py: CSV 转 Neo4j 导入格式(6 个文件)
- fetch_daily.py: 新增 --parallel 参数支持多线程下载
- odps_module.py: 新增并行下载方法,解除 1 万条限制

图模型: User-[CREATED]->Share-[OF]->Video, User-[CLICKED]->Share

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
yangxiaohui 1 месяц назад
Родитель
Commit
9edcb76560

+ 49 - 15
fetch_daily.py

@@ -10,7 +10,9 @@
     python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
     python fetch_daily.py tasks/xxx/query.sql --date 20260105    # 单天
     python fetch_daily.py tasks/xxx/query.sql --force            # 强制重新获取
-    python fetch_daily.py tasks/xxx/query.sql --workers 10       # 设置并发数
+    python fetch_daily.py tasks/xxx/query.sql --workers 10       # 设置天级并发数
+    python fetch_daily.py tasks/xxx/query.sql --parallel 50      # 单天多线程下载(默认50,大数据量推荐)
+    python fetch_daily.py tasks/xxx/query.sql --parallel 0       # 关闭多线程,使用单线程下载
 """
 import argparse
 import sys
@@ -55,27 +57,32 @@ def get_date_range(start_str, end_str):
     return dates
 
 
-def fetch_single_day(dt, sql_template, daily_dir):
+def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0):
     """获取单天数据"""
     global success_count, fail_count
 
     try:
         client = ODPSClient()
         sql = sql_template.replace("${dt}", dt)
-        df = client.execute_sql(sql)
-
         output_file = daily_dir / f"{dt}.csv"
 
-        if df is not None and len(df) > 0:
-            df.to_csv(output_file, index=False)
-            with counter_lock:
-                success_count += 1
-            return (dt, "success", len(df))
-        elif df is not None:
-            df.to_csv(output_file, index=False)
+        # 下载到文件
+        if parallel_threads > 0:
+            # 多线程并行下载(适合大数据量)
+            client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
+        else:
+            # 单线程下载
+            client.execute_sql_result_save_file(sql, str(output_file))
+
+        # 检查结果
+        if output_file.exists():
+            row_count = sum(1 for _ in open(output_file)) - 1  # 减去表头
             with counter_lock:
                 success_count += 1
-            return (dt, "empty", 0)
+            if row_count > 0:
+                return (dt, "success", row_count)
+            else:
+                return (dt, "empty", 0)
         else:
             with counter_lock:
                 fail_count += 1
@@ -97,7 +104,8 @@ def main():
     parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
     parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
     parser.add_argument("--force", action="store_true", help="强制重新获取")
-    parser.add_argument("--workers", type=int, default=5, help="并发数 (默认5)")
+    parser.add_argument("--workers", type=int, default=5, help="天级并发数 (默认5)")
+    parser.add_argument("--parallel", type=int, default=50, help="单天多线程下载 (默认50, 大数据量推荐)")
     args = parser.parse_args()
 
     # 解析 SQL 文件路径
@@ -146,17 +154,43 @@ def main():
     # 读取 SQL 模板
     sql_template = sql_file.read_text(encoding="utf-8")
 
+    # 检测 SQL 中是否包含 ${dt} 变量
+    has_dt_var = "${dt}" in sql_template
+
     # 重置计数器
     success_count = 0
     fail_count = 0
 
+    # 如果 SQL 中没有 ${dt},只需执行一次
+    if not has_dt_var:
+        print("\n检测到 SQL 中不含 ${dt} 变量,只执行一次...")
+        target_dates = ["20000101"]  # 用虚拟日期
+        missing_dates = target_dates
+        output_file = output_dir / f"{sql_file.stem}.csv"
+        output_file.parent.mkdir(parents=True, exist_ok=True)
+
+        try:
+            client = ODPSClient()
+            if args.parallel > 0:
+                client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
+            else:
+                client.execute_sql_result_save_file(sql_template, str(output_file))
+            print(f"数据目录: {output_file}")
+        except Exception as e:
+            print(f"✗ 执行失败: {e}")
+        return
+
     # 并发获取
+    print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
     workers = min(args.workers, len(missing_dates))
-    print(f"\n开始获取 (并发数: {workers})...")
+    if args.parallel > 0:
+        print(f"\n开始获取 (天级并发: {workers}, 单天多线程: {args.parallel})...")
+    else:
+        print(f"\n开始获取 (并发数: {workers})...")
 
     with ThreadPoolExecutor(max_workers=workers) as executor:
         futures = {
-            executor.submit(fetch_single_day, dt, sql_template, daily_dir): dt
+            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel): dt
             for dt in missing_dates
         }
 

+ 148 - 8
lib/odps_module.py

@@ -1,7 +1,20 @@
 #!/usr/bin/env python
 # coding=utf-8
 
-from odps import ODPS
+import os
+import time
+import uuid
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from odps import ODPS, options
+from odps.tunnel import TableTunnel
+from tqdm import tqdm
+import pyarrow as pa
+from pyarrow import csv as pa_csv
+
+# 开启 Instance Tunnel,解除 1 万条限制
+options.tunnel.use_instance_tunnel = True
+options.tunnel.limit_instance_tunnel = False
 
 
 class ODPSClient(object):
@@ -18,14 +31,141 @@ class ODPSClient(object):
             self.endpoint
         )
 
-    def execute_sql(self, sql: str):
-        hints = {
-            'odps.sql.submit.mode': 'script'
-        }
-        with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
+    def execute_sql(self, sql: str, print_logview: bool = True):
+        """执行 SQL 并返回 DataFrame"""
+        hints = {'odps.sql.submit.mode': 'script'}
+        instance = self.odps.execute_sql(sql, hints=hints)
+
+        if print_logview:
+            print(f"LogView: {instance.get_logview_address()}")
+
+        with instance.open_reader(tunnel=True, limit=False) as reader:
             pd_df = reader.to_pandas()
         return pd_df
 
     def execute_sql_result_save_file(self, sql: str, output_file: str):
-        data_df = self.execute_sql(sql)
-        data_df.to_csv(output_file, index=False)
+        """执行 SQL 并保存到文件(Arrow 直接写 CSV,单线程流式下载)"""
+        hints = {'odps.sql.submit.mode': 'script'}
+
+        start_time = time.time()
+        instance = self.odps.execute_sql(sql, hints=hints)
+        sql_time = time.time() - start_time
+
+        print(f"LogView: {instance.get_logview_address()}")
+        print(f"SQL 执行耗时: {sql_time:.1f}s")
+
+        with instance.open_reader(tunnel=True, limit=False, arrow=True) as reader:
+            total = reader.count
+
+            # 边下载边写入,用 pyarrow 直接写 CSV
+            with open(output_file, 'wb') as f:
+                first = True
+                with tqdm(total=total, unit='行', desc='下载中') as pbar:
+                    for batch in reader:
+                        # pyarrow 写 CSV(比 pandas 快很多)
+                        options = pa_csv.WriteOptions(include_header=first)
+                        pa_csv.write_csv(pa.Table.from_batches([batch]), f, write_options=options)
+                        first = False
+                        pbar.update(batch.num_rows)
+
+        total_time = time.time() - start_time
+        print(f"总耗时: {total_time:.1f}s")
+        print(f"完成: {output_file}")
+
+    def execute_sql_result_save_file_parallel(self, sql: str, output_file: str, workers: int = 4):
+        """执行 SQL 并保存到文件(多线程并行下载,速度最快)"""
+        hints = {'odps.sql.submit.mode': 'script'}
+
+        # 生成临时表名
+        tmp_table = f"tmp_download_{uuid.uuid4().hex[:8]}"
+        create_sql = f"CREATE TABLE {tmp_table} LIFECYCLE 1 AS {sql}"
+
+        start_time = time.time()
+
+        # 1. 创建临时表
+        print(f"创建临时表: {tmp_table}")
+        instance = self.odps.execute_sql(create_sql, hints=hints)
+        print(f"LogView: {instance.get_logview_address()}")
+        instance.wait_for_success()
+        sql_time = time.time() - start_time
+        print(f"SQL 执行耗时: {sql_time:.1f}s")
+
+        try:
+            # 2. 获取表信息
+            table = self.odps.get_table(tmp_table)
+            tunnel = TableTunnel(self.odps)
+            download_session = tunnel.create_download_session(table.name)
+            total = download_session.count
+            print(f"总行数: {total}")
+
+            if total == 0:
+                # 空表,直接写入空 CSV
+                with open(output_file, 'w') as f:
+                    columns = [col.name for col in table.table_schema.columns]
+                    f.write(','.join(columns) + '\n')
+                print(f"完成: {output_file} (空表)")
+                return
+
+            # 3. 分段
+            chunk_size = (total + workers - 1) // workers
+            chunks = []
+            for i in range(workers):
+                start = i * chunk_size
+                end = min((i + 1) * chunk_size, total)
+                if start < end:
+                    chunks.append((i, start, end - start))  # (index, start, count)
+
+            print(f"并行下载: {len(chunks)} 个分片, {workers} 线程")
+
+            # 4. 多线程下载到临时文件(放在输出目录)
+            output_dir = os.path.dirname(output_file)
+            tmp_prefix = os.path.join(output_dir, f".tmp_{os.path.basename(output_file)}_")
+            pbar = tqdm(total=total, unit='行', desc='下载中')
+            pbar_lock = threading.Lock()
+            session_id = download_session.id
+            tmp_files = {}
+
+            def download_chunk(chunk_info):
+                idx, start, count = chunk_info
+                tmp_file = f"{tmp_prefix}{idx:04d}"
+                session = tunnel.create_download_session(table.name, download_id=session_id)
+                with session.open_arrow_reader(start, count) as reader:
+                    batches = []
+                    for batch in reader:
+                        batches.append(batch)
+                        with pbar_lock:
+                            pbar.update(batch.num_rows)
+                    if batches:
+                        tbl = pa.Table.from_batches(batches)
+                        pa_csv.write_csv(tbl, tmp_file)
+                return idx, tmp_file
+
+            # 并行下载
+            with ThreadPoolExecutor(max_workers=workers) as executor:
+                futures = [executor.submit(download_chunk, chunk) for chunk in chunks]
+                for future in as_completed(futures):
+                    idx, tmp_file = future.result()
+                    tmp_files[idx] = tmp_file
+
+            pbar.close()
+
+            # 5. 按顺序合并
+            print("合并文件中...")
+            with open(output_file, 'wb') as outf:
+                for idx in range(len(chunks)):
+                    tmp_file = tmp_files.get(idx)
+                    if tmp_file and os.path.exists(tmp_file):
+                        with open(tmp_file, 'rb') as inf:
+                            if idx > 0:
+                                inf.readline()  # 跳过表头
+                            outf.write(inf.read())
+                        os.remove(tmp_file)
+
+        finally:
+            # 6. 删除临时表
+            print(f"删除临时表: {tmp_table}")
+            self.odps.delete_table(tmp_table, if_exists=True)
+
+        total_time = time.time() - start_time
+        print(f"总耗时: {total_time:.1f}s")
+        print(f"完成: {output_file}")

+ 27 - 0
tasks/00_表的洞察/loghubods.user_share_log/06_图数据_v2.sql

@@ -0,0 +1,27 @@
+-- Neo4j 图数据导出 - 简洁版
+-- 节点: User(mid), Video(vid), Share(shareid)
+-- 关系: CREATED(User→Share), OF(Share→Video), CLICKED(User→Share)
+-- 使用 fetch_daily.py 按天获取
+--
+-- 字段说明:
+--   action_type: share=分享事件, click=点击事件
+--   actor_mid: 执行动作的用户(share时是分享者,click时是点击者)
+--   sharer_mid: 创建 shareid 的用户(从 shareid 提取)
+--   注: share事件时 actor_mid = sharer_mid
+
+SELECT
+    topic as action_type,
+    shareid,
+    rootshareid,
+    sharedepth,
+    CASE
+        WHEN topic = 'share' THEN shareobjectid
+        WHEN topic = 'click' THEN clickobjectid
+    END as vid,
+    machinecode as actor_mid,
+    split(shareid, '-')[0] as sharer_mid,
+    clienttimestamp as action_ts
+FROM loghubods.user_share_log
+WHERE dt = '${dt}'
+    AND topic IN ('share', 'click')
+    AND shareid IS NOT NULL

+ 63 - 0
tasks/00_表的洞察/loghubods.user_share_log/06_图数据_v2_full.sql

@@ -0,0 +1,63 @@
+-- Neo4j 图数据导出 - 完整版(含用户和视频详情)
+-- 节点: User(mid), Video(vid), Share(shareid)
+-- 关系: CREATED(User→Share), OF(Share→Video), CLICKED(User→Share)
+-- 使用 fetch_daily.py 按天获取
+--
+-- 字段说明:
+--   action_type: share=分享事件, click=点击事件
+--   actor_mid: 执行动作的用户(share时是分享者,click时是点击者)
+--   sharer_mid: 创建 shareid 的用户(从 shareid 提取)
+
+WITH user_info AS (
+    SELECT uid, mid, nick_name, gender, country, province, city
+    FROM videoods.dim_user
+    LATERAL VIEW explode(split(mids, ',')) tmp AS mid
+),
+share_log AS (
+    SELECT
+        topic as action_type,
+        shareid,
+        rootshareid,
+        sharedepth,
+        CASE
+            WHEN topic = 'share' THEN shareobjectid
+            WHEN topic = 'click' THEN clickobjectid
+        END as vid,
+        machinecode as actor_mid,
+        split(shareid, '-')[0] as sharer_mid,
+        clienttimestamp as action_ts
+    FROM loghubods.user_share_log
+    WHERE dt = '${dt}'
+        AND topic IN ('share', 'click')
+        AND shareid IS NOT NULL
+)
+SELECT
+    t.action_type,
+    t.shareid,
+    t.rootshareid,
+    t.sharedepth,
+    t.vid,
+    t.actor_mid,
+    t.sharer_mid,
+    t.action_ts,
+    -- 动作执行者详情(share时=分享者,click时=点击者)
+    actor.uid as actor_uid,
+    actor.nick_name as actor_nick,
+    actor.gender as actor_gender,
+    actor.country as actor_country,
+    actor.province as actor_province,
+    actor.city as actor_city,
+    -- 分享者详情(click时有意义,share时与actor相同)
+    sharer.uid as sharer_uid,
+    sharer.nick_name as sharer_nick,
+    sharer.gender as sharer_gender,
+    sharer.country as sharer_country,
+    sharer.province as sharer_province,
+    sharer.city as sharer_city,
+    -- 视频详情
+    v.title as video_title,
+    v.cover_img_path as video_cover
+FROM share_log t
+LEFT JOIN user_info actor ON t.actor_mid = actor.mid
+LEFT JOIN user_info sharer ON t.sharer_mid = sharer.mid
+LEFT JOIN videoods.wx_video v ON t.vid = CAST(v.id AS STRING)

+ 16 - 0
tasks/00_表的洞察/loghubods.user_share_log/07_用户详情.sql

@@ -0,0 +1,16 @@
+-- Neo4j User 节点属性补充
+-- 用于导入后按需补充用户详情
+-- 使用 run_sql.py 执行(无日期分区)
+
+SELECT
+    uid,
+    mid,
+    nick_name,
+    gender,
+    country,
+    province,
+    city,
+    gmt_create_timestamp as register_ts
+FROM videoods.dim_user
+LATERAL VIEW explode(split(mids, ',')) t AS mid
+WHERE mids IS NOT NULL AND mids != ''

+ 11 - 0
tasks/00_表的洞察/loghubods.user_share_log/08_视频详情.sql

@@ -0,0 +1,11 @@
+-- Neo4j Video 节点属性补充
+-- 用于导入后按需补充视频详情
+-- 使用 run_sql.py 执行(无日期分区)
+
+SELECT
+    id as vid,
+    title,
+    cover_img_path,
+    gmt_create_timestamp as publish_ts
+FROM videoods.wx_video
+WHERE status = 1

+ 143 - 0
tasks/00_表的洞察/loghubods.user_share_log/convert_to_neo4j.py

@@ -0,0 +1,143 @@
+#!/usr/bin/env python3
+"""
+将图数据 CSV 转换为 Neo4j 导入格式
+
+输入: output/06_图数据_v2/{dt}.csv
+输出: output/neo4j/{dt}/
+    ├── nodes_user.csv      # User 节点
+    ├── nodes_video.csv     # Video 节点
+    ├── nodes_share.csv     # Share 节点
+    ├── rels_created.csv    # CREATED 关系 (User → Share)
+    ├── rels_of.csv         # OF 关系 (Share → Video)
+    └── rels_clicked.csv    # CLICKED 关系 (User → Share)
+
+Neo4j 导入命令:
+    neo4j-admin database import full \
+        --nodes=User=nodes_user.csv \
+        --nodes=Video=nodes_video.csv \
+        --nodes=Share=nodes_share.csv \
+        --relationships=CREATED=rels_created.csv \
+        --relationships=OF=rels_of.csv \
+        --relationships=CLICKED=rels_clicked.csv \
+        neo4j
+"""
+
+import pandas as pd
+from pathlib import Path
+import argparse
+
+
+def convert_to_neo4j(input_csv: Path, output_dir: Path):
+    """将原始 CSV 转换为 Neo4j 导入格式"""
+
+    print(f"读取: {input_csv}")
+    df = pd.read_csv(input_csv)
+    print(f"总记录数: {len(df)}")
+
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # 分离 share 和 click 事件
+    df_share = df[df['action_type'] == 'share'].copy()
+    df_click = df[df['action_type'] == 'click'].copy()
+
+    print(f"  share 事件: {len(df_share)}")
+    print(f"  click 事件: {len(df_click)}")
+
+    # ========== 节点文件 ==========
+
+    # 1. User 节点 (收集所有 mid: actor_mid + sharer_mid)
+    all_mids = set()
+    all_mids.update(df['actor_mid'].dropna().unique())
+    all_mids.update(df['sharer_mid'].dropna().unique())
+    all_users = pd.Series(list(all_mids))
+
+    users_df = pd.DataFrame({
+        'mid:ID(User)': all_users
+    })
+    users_df.to_csv(output_dir / 'nodes_user.csv', index=False)
+    print(f"  User 节点: {len(users_df)}")
+
+    # 2. Video 节点
+    all_vids = df['vid'].dropna().unique()
+    videos_df = pd.DataFrame({
+        'vid:ID(Video)': all_vids
+    })
+    videos_df.to_csv(output_dir / 'nodes_video.csv', index=False)
+    print(f"  Video 节点: {len(videos_df)}")
+
+    # 3. Share 节点 (只从 share 事件创建,包含元数据)
+    shares_df = df_share[['shareid', 'rootshareid', 'sharedepth']].drop_duplicates('shareid')
+    shares_df = shares_df.rename(columns={
+        'shareid': 'shareid:ID(Share)',
+        'rootshareid': 'rootshareid',
+        'sharedepth': 'depth:int'
+    })
+    shares_df.to_csv(output_dir / 'nodes_share.csv', index=False)
+    print(f"  Share 节点: {len(shares_df)}")
+
+    # ========== 关系文件 ==========
+
+    # 1. CREATED 关系 (User → Share)
+    # share 事件: sharer_mid 创建了 shareid
+    created_df = df_share[['sharer_mid', 'shareid', 'action_ts']].dropna(subset=['sharer_mid', 'shareid'])
+    created_df = created_df.drop_duplicates(['sharer_mid', 'shareid'])
+    created_df = created_df.rename(columns={
+        'sharer_mid': ':START_ID(User)',
+        'shareid': ':END_ID(Share)',
+        'action_ts': 'action_ts:long'
+    })
+    created_df.to_csv(output_dir / 'rels_created.csv', index=False)
+    print(f"  CREATED 关系: {len(created_df)}")
+
+    # 2. OF 关系 (Share → Video)
+    # 每个 shareid 对应一个 vid
+    of_df = df_share[['shareid', 'vid']].dropna()
+    of_df = of_df.drop_duplicates('shareid')
+    of_df = of_df.rename(columns={
+        'shareid': ':START_ID(Share)',
+        'vid': ':END_ID(Video)'
+    })
+    of_df.to_csv(output_dir / 'rels_of.csv', index=False)
+    print(f"  OF 关系: {len(of_df)}")
+
+    # 3. CLICKED 关系 (User → Share)
+    # click 事件: actor_mid 点击了 shareid
+    clicked_df = df_click[['actor_mid', 'shareid', 'action_ts']].dropna(subset=['actor_mid', 'shareid'])
+    # 同一个用户可能多次点击同一个 share,保留所有记录还是去重?
+    # 这里先去重,如果需要保留多次点击可以注释下一行
+    clicked_df = clicked_df.drop_duplicates(['actor_mid', 'shareid'])
+    clicked_df = clicked_df.rename(columns={
+        'actor_mid': ':START_ID(User)',
+        'shareid': ':END_ID(Share)',
+        'action_ts': 'action_ts:long'
+    })
+    clicked_df.to_csv(output_dir / 'rels_clicked.csv', index=False)
+    print(f"  CLICKED 关系: {len(clicked_df)}")
+
+    print(f"\n输出目录: {output_dir}")
+
+
+def main():
+    parser = argparse.ArgumentParser(description='将图数据 CSV 转换为 Neo4j 导入格式')
+    parser.add_argument('input_csv', type=Path, help='输入 CSV 文件路径')
+    parser.add_argument('-o', '--output', type=Path, help='输出目录 (默认: output/neo4j/{dt}/)')
+
+    args = parser.parse_args()
+
+    if not args.input_csv.exists():
+        print(f"错误: 文件不存在 {args.input_csv}")
+        return
+
+    # 默认输出目录
+    if args.output:
+        output_dir = args.output
+    else:
+        # 从文件名提取日期
+        dt = args.input_csv.stem  # e.g., "20260120"
+        output_dir = Path('output/neo4j') / dt
+
+    convert_to_neo4j(args.input_csv, output_dir)
+
+
+if __name__ == '__main__':
+    main()