@@ -0,0 +1,315 @@
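+"""
+Recycle mini-program detail data for published WeChat articles: record each
+day's published articles together with their root_source_id list, then refresh
+first-level and fission stats for root_source_ids from recent days.
+"""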
+import json
+import traceback
+from typing import Any
+from datetime import datetime, timedelta
+
+
+from applications.crawler.wechat import get_article_detail
+from applications.utils import extract_root_source_id
+from applications.utils import run_tasks_with_asyncio_task_group
+
+
+class MiniProgramConst:
+    ARTICLE_SUCCESS_CODE = 0
+    # default record status
+    DEFAULT_STATUS = 0
+    # API request failed
+    REQUEST_FAIL_STATUS = -1
+    # article has been deleted
+    DELETE_STATUS = -2
+    # no data returned, reason unknown
+    UNKNOWN_STATUS = -3
+    # article flagged as violating content rules
+    ILLEGAL_STATUS = -4
+
+    TASK_NAME = "recycle_mini_program_detail_daily"
+    ARTICLE_TABLE = "official_articles_v2"
+    DETAIL_TABLE = "long_articles_detail_info"
+
+    # look-back window (days) for recycling published articles
+    ARTICLE_RECYCLE_DAYS = 1
+    # look-back window (days) for updating root_source_id stats
+    ROOT_SOURCE_ID_UPDATE_DAYS = 3
+
+
+class RecycleMiniProgramDetailBase(MiniProgramConst):
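+    """
+    Data-access layer: DB reads and writes shared by the recycle task.
+    """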
+
+    def __init__(self, pool, log_client, trace_id):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    # build mini-program info objects
+    @staticmethod
+    async def create_mini_info(response: list[dict]) -> list[dict]:
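+        # map DB rows of (root_source_id, video_id) onto card payloads with fixed app metadata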
|
|
|
|
+ return [
|
|
|
|
+ {
|
|
|
|
+ "app_id": "wx89e7eb06478361d7",
|
|
|
|
+ "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
|
|
|
|
+ "image_url": "",
|
|
|
|
+ "nike_name": "票圈 l 3亿人喜欢的视频平台",
|
|
|
|
+ "root_source_id": item["root_source_id"],
|
|
|
|
+ "video_id": item["video_id"],
|
|
|
|
+ "service_type": "0",
|
|
|
|
+ "title": "",
|
|
|
|
+ "type": "card",
|
|
|
|
+ }
|
|
|
|
+ for item in response
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ # 获取单个 root_source_id 的首层,裂变详情
|
|
|
|
+ async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
|
|
|
|
+ query = f"""
|
|
|
|
+ SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
|
|
|
|
+ FROM changwen_data_rootsourceid
|
|
|
|
+ WHERE root_source_id = %s AND dt = %s;
|
|
|
|
+ """
|
|
|
|
+ return await self.pool.async_fetch(query=query, params=(root_source_id, dt))
|
|
|
|
+
|
|
|
|
+ # 获取制定天发布的文章
|
|
|
|
+ async def fetch_published_articles(
|
|
|
|
+ self, run_date: str, date_delta: int
|
|
|
|
+ ) -> list[dict]:
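+        # window is [run_date - date_delta days, run_date), capped one second before run_date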
+        query = f"""
+            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
+            FROM {self.ARTICLE_TABLE}
+            WHERE FROM_UNIXTIME(publish_timestamp)
+                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(run_date, date_delta, run_date),
+        )
+
+    # fetch root_source_id info from the database
+    async def fetch_root_source_id_from_db(
+        self, root_source_id_list: list[str]
+    ) -> list[dict[str, str]]:
+        placeholders = ", ".join(["%s"] * len(root_source_id_list))
+        query = f"""
+            SELECT video_id, root_source_id FROM long_articles_root_source_id WHERE root_source_id in ({placeholders});
+        """
+        return await self.pool.async_fetch(
+            query=query, params=tuple(root_source_id_list)
+        )
+
+    # fetch root_source_ids recorded in the last few days
+    async def fetch_root_source_id_in_last_days(
+        self, run_date: str, date_delta: int
+    ) -> list[dict[str, str]]:
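+        # root_source_ids whose articles were published within the date_delta-day window before run_date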
+        query = f"""
+            SELECT recall_dt, root_source_id
+            FROM {self.DETAIL_TABLE}
+            WHERE publish_dt
+                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(run_date, date_delta, run_date),
+        )
+
+    # insert new root_source_id records
+    async def insert_each_root_source_id(self, params_list: list[tuple]) -> None:
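+        # batch upsert; a duplicate key only refreshes video_id, so repeated runs don't duplicate rows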
+        # record root_source_id info
+        query = f"""
+            INSERT INTO {self.DETAIL_TABLE}
+                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+            VALUES
+                (%s, %s, %s, %s, %s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE video_id = VALUES(video_id);
+        """
+        return await self.pool.async_save(
+            query=query, db_name="piaoquan_crawler", params=params_list, batch=True
+        )
+
+    # update first-level && fission stats for a root_source_id
+    async def update_each_root_source_id(self, item: dict) -> None:
+        recall_dt = item["recall_dt"]
+        root_source_id = item["root_source_id"]
+        mini_program_detail = await self.fetch_root_source_id_result(
+            root_source_id, recall_dt
+        )
+        if not mini_program_detail:
+            return
+
+        mini_program_detail = mini_program_detail[0]
+        query = f"""
+            UPDATE {self.DETAIL_TABLE}
+            SET first_level = %s,
+                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
+                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
+                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
+            WHERE root_source_id = %s AND recall_dt = %s;
+        """
+        await self.pool.async_save(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(
+                mini_program_detail["first_uv"],
+                mini_program_detail["split0"],
+                mini_program_detail["split0_head"],
+                mini_program_detail["split0_recommend"],
+                mini_program_detail["split1"],
+                mini_program_detail["split1_head"],
+                mini_program_detail["split1_recommend"],
+                mini_program_detail["split2"],
+                mini_program_detail["split2_head"],
+                mini_program_detail["split2_recommend"],
+                root_source_id,
+                recall_dt,
+            ),
+        )
+
+
+class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
+    # business layer
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    async def handle_single_task(self, article):
+        """
+        record each article into long_articles_detail_info table
+        """
+        url, publish_timestamp, wx_sn = (
+            article["ContentUrl"],
+            article["publish_timestamp"],
+            article["wx_sn"].decode(),
+        )
+        root_source_id_list = (
+            json.loads(article["root_source_id_list"])
+            if article["root_source_id_list"]
+            else []
+        )
+        mini_detail = []
+
+        # build mini-program info from root_source_ids stored with the article
+        if root_source_id_list:
+            root_source_id_detail = await self.fetch_root_source_id_from_db(
+                root_source_id_list
+            )
+            mini_detail = await self.create_mini_info(root_source_id_detail)
+
+        # also query the article detail page; a successful response takes precedence
+        # (this covers articles without a root_source_id_list)
+        try:
+            article_detail = await get_article_detail(article_link=url)
+            response_code = article_detail["code"]
+            if response_code == self.ARTICLE_SUCCESS_CODE:
+                mini_detail = article_detail["data"]["data"].get("mini_program", [])
+
+        except Exception as e:
+            await self.log_client.log(
+                contents={
+                    "task": self.TASK_NAME,
+                    "trace_id": self.trace_id,
+                    "message": "get article detail error",
+                    "data": {
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                }
+            )
+
+        # insert the root_source_id info into the MySQL detail table
+        if not mini_detail:
+            return
+
+        publish_date = datetime.fromtimestamp(publish_timestamp)
+        publish_dt = publish_date.strftime("%Y-%m-%d")
+        # record data for the three recall days T+0, T+1 and T+2
+        recall_dt_list = [
+            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
+        ]
+        params_list = []
+        for dt in recall_dt_list:
+            for video_index, mini_item in enumerate(mini_detail, start=1):
+                image_url = mini_item["image_url"]
+                nickname = mini_item["nike_name"]
+                if mini_item.get("root_source_id") and mini_item.get("video_id"):
+                    root_source_id, video_id = (
+                        mini_item["root_source_id"],
+                        mini_item["video_id"],
+                    )
+                else:
+                    id_info = extract_root_source_id(mini_item["path"])
+                    root_source_id, video_id = (
+                        id_info["root_source_id"],
+                        id_info["video_id"],
+                    )
+
+                kimi_title = mini_item["title"]
+                params_list.append(
+                    (
+                        wx_sn,
+                        kimi_title,
+                        nickname,
+                        image_url,
+                        video_index,
+                        root_source_id,
+                        video_id,
+                        publish_dt,
+                        dt,
+                    )
+                )
+
+        await self.insert_each_root_source_id(params_list)
+
+    # record daily published articles && their root_source_id info
+    async def record_published_articles_job(
+        self,
+        date_delta: int,
+        run_date: str,
+        *,
+        max_concurrency: int = 20,  # concurrency cap
+        fail_fast: bool = False,  # abort on the first error
+    ) -> dict[str, Any]:
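+        # returns the aggregate result dict produced by run_tasks_with_asyncio_task_group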
+        # get tasks
+        task_list = await self.fetch_published_articles(run_date, date_delta)
+        return await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=self.handle_single_task,
+            max_concurrency=max_concurrency,
+            fail_fast=fail_fast,
+            description="持久化文章&&rootSourceId",
+            unit="article",
+        )
+
+    # update first-level && fission stats for root_source_ids
+    async def update_mini_program_detail_job(
+        self,
+        date_delta: int,
+        run_date: str,
+        *,
+        max_concurrency: int = 20,
+        fail_fast: bool = False,
+    ) -> dict[str, Any]:
+        task_list = await self.fetch_root_source_id_in_last_days(run_date, date_delta)
+        return await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=self.update_each_root_source_id,
+            max_concurrency=max_concurrency,
+            fail_fast=fail_fast,
+            description="更新rootSourceId裂变表现",
+            unit="id",
+        )
+
+    # task entry point
+    async def deal(self, params: dict):
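+        """
+        Supported params (all optional):
+            run_date: run date as YYYY-MM-DD, defaults to today
+            record_date_delta: look-back days for recording published articles
+            update_date_delta: look-back days for updating root_source_id stats
+        """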
+        # parse request params
+        run_date = params.get("run_date")
+        if not run_date:
+            run_date = datetime.today().strftime("%Y-%m-%d")
+
+        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
+        update_date_delta = params.get(
+            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
+        )
+
+        # run the two jobs
+        await self.record_published_articles_job(
+            date_delta=record_date_delta, run_date=run_date
+        )
+        await self.update_mini_program_detail_job(
+            date_delta=update_date_delta, run_date=run_date
+        )