import json
import traceback
from datetime import datetime, timedelta
from typing import Any

from applications.crawler.wechat import get_article_detail
from applications.utils import extract_root_source_id
from applications.utils import run_tasks_with_asyncio_task_group


class MiniProgramConst:
    ARTICLE_SUCCESS_CODE = 0
    # default record status
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned for an unknown reason
    UNKNOWN_STATUS = -3
    # article flagged as violating platform rules
    ILLEGAL_STATUS = -4

    TASK_NAME = "recycle_mini_program_detail_daily"
    ARTICLE_TABLE = "official_articles_v2"
    DETAIL_TABLE = "long_articles_detail_info"

    # how many days of published articles to refresh
    ARTICLE_RECYCLE_DAYS = 1
    # how many days of root_source_id records to refresh
    ROOT_SOURCE_ID_UPDATE_DAYS = 3

    # default video id
    DEFAULT_VIDEO_ID = 1
    ZERO_VIDEO_ID = 0


class RecycleMiniProgramDetailBase(MiniProgramConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    # build the mini-program info objects attached to each article
    def create_mini_info(self, response: list[dict]) -> list[dict]:
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"] if item["video_id"] else self.DEFAULT_VIDEO_ID,
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in response
        ]
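
    # Illustrative call (assumption: rows from long_articles_root_source_id carry
    # root_source_id and video_id; the values below are made up):
    #   create_mini_info([{"root_source_id": "rsi_abc", "video_id": 0}])
    # yields one "card" dict whose video_id falls back to DEFAULT_VIDEO_ID.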

    # fetch first-level and fission detail for a single root_source_id
    async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
        query = """
            SELECT first_uv, split0, split0_head, split0_recommend,
                   split1, split1_head, split1_recommend,
                   split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = %s AND dt = %s;
        """
        return await self.pool.async_fetch(query=query, params=(root_source_id, dt))

    # fetch articles published within the specified window
    async def fetch_published_articles(
        self, run_date: str, date_delta: int
    ) -> list[dict]:
        query = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {self.ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # fetch root_source_id info from the database
    async def fetch_root_source_id_from_db(
        self, root_source_id_list: list[str]
    ) -> list[dict[str, str]]:
        placeholders = ", ".join(["%s"] * len(root_source_id_list))
        query = f"""
            SELECT video_id, root_source_id
            FROM long_articles_root_source_id
            WHERE root_source_id IN ({placeholders});
        """
        return await self.pool.async_fetch(
            query=query, params=tuple(root_source_id_list)
        )

    # fetch root_source_ids published within the last few days
    async def fetch_root_source_id_in_last_days(
        self, run_date: str, date_delta: int
    ) -> list[dict[str, str]]:
        query = f"""
            SELECT recall_dt, root_source_id
            FROM {self.DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # insert new root_source_id records
    async def insert_each_root_source_id(self, params_list: list[tuple]) -> None:
        query = f"""
            INSERT INTO {self.DETAIL_TABLE}
                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE video_id = VALUES(video_id);
        """
        return await self.pool.async_save(
            query=query, db_name="piaoquan_crawler", params=params_list, batch=True
        )

    # update first-level && fission info for a root_source_id
    async def update_each_root_source_id(self, item: dict) -> None:
        recall_dt = item["recall_dt"]
        root_source_id = item["root_source_id"]
        mini_program_detail = await self.fetch_root_source_id_result(
            root_source_id, recall_dt
        )
        if not mini_program_detail:
            return
        mini_program_detail = mini_program_detail[0]
        query = f"""
            UPDATE {self.DETAIL_TABLE}
            SET first_level = %s,
                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
            WHERE root_source_id = %s AND recall_dt = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                mini_program_detail["first_uv"],
                mini_program_detail["split0"],
                mini_program_detail["split0_head"],
                mini_program_detail["split0_recommend"],
                mini_program_detail["split1"],
                mini_program_detail["split1_head"],
                mini_program_detail["split1_recommend"],
                mini_program_detail["split2"],
                mini_program_detail["split2_head"],
                mini_program_detail["split2_recommend"],
                root_source_id,
                recall_dt,
            ),
        )
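
    # Example item consumed above (shape as returned by
    # fetch_root_source_id_in_last_days; values illustrative only):
    #   {"recall_dt": "2025-01-02", "root_source_id": "rsi_abc"}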


class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
    # business layer
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def handle_single_task(self, article):
        """
        record each article into the long_articles_detail_info table
        """
        url, publish_timestamp, wx_sn = (
            article["ContentUrl"],
            article["publish_timestamp"],
            article["wx_sn"].decode(),
        )
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )
        mini_detail = []
        # fetch mini-program info
        if root_source_id_list:
            root_source_id_detail = await self.fetch_root_source_id_from_db(
                root_source_id_list
            )
            mini_detail = self.create_mini_info(root_source_id_detail)
        else:
            # the article carries no root_source_id_list; fall back to the article page
            try:
                article_detail = await get_article_detail(article_link=url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    mini_detail = article_detail["data"]["data"].get("mini_program", [])
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": self.TASK_NAME,
                        "trace_id": self.trace_id,
                        "message": "get article detail error",
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
        # insert the root_source_id info into the MySQL table
        if not mini_detail:
            return
        publish_date = datetime.fromtimestamp(publish_timestamp)
        publish_dt = publish_date.strftime("%Y-%m-%d")
        # record data for three days: T+0, T+1, T+2
        recall_dt_list = [
            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
        ]
        params_list = []
        for dt in recall_dt_list:
            for video_index, mini_item in enumerate(mini_detail, start=1):
                image_url = mini_item["image_url"]
                nickname = mini_item["nike_name"]
                # replace a video_id of 0 with the default of 1
                if mini_item.get("video_id") == self.ZERO_VIDEO_ID:
                    mini_item["video_id"] = self.DEFAULT_VIDEO_ID
                if mini_item.get("root_source_id") and mini_item.get("video_id"):
                    root_source_id, video_id = (
                        mini_item["root_source_id"],
                        mini_item["video_id"],
                    )
                else:
                    # ids are missing; parse them out of the mini-program path
                    id_info = extract_root_source_id(mini_item["path"])
                    root_source_id, video_id = (
                        id_info["root_source_id"],
                        id_info["video_id"],
                    )
                kimi_title = mini_item["title"]
                params_list.append(
                    (
                        wx_sn,
                        kimi_title,
                        nickname,
                        image_url,
                        video_index,
                        root_source_id,
                        video_id,
                        publish_dt,
                        dt,
                    )
                )
        await self.insert_each_root_source_id(params_list)

    # record daily published articles && root_source_id info
    async def record_published_articles_job(
        self,
        date_delta: int,
        run_date: str,
        *,
        max_concurrency: int = 20,  # concurrency cap
        fail_fast: bool = False,  # abort on the first error
    ) -> dict[str, Any]:
        # get tasks
        task_list = await self.fetch_published_articles(run_date, date_delta)
        return await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.handle_single_task,
            max_concurrency=max_concurrency,
            fail_fast=fail_fast,
            description="persist articles && rootSourceId",
            unit="article",
        )

    # update first-level and fission info for each root_source_id
    async def update_mini_program_detail_job(
        self,
        date_delta: int,
        run_date: str,
        *,
        max_concurrency: int = 20,
        fail_fast: bool = False,
    ) -> dict[str, Any]:
        task_list = await self.fetch_root_source_id_in_last_days(run_date, date_delta)
        return await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.update_each_root_source_id,
            max_concurrency=max_concurrency,
            fail_fast=fail_fast,
            description="update rootSourceId fission performance",
            unit="id",
        )

    # business entry point
    async def deal(self, params: dict):
        # parse request params
        run_date = params.get("run_date")
        if not run_date:
            run_date = datetime.today().strftime("%Y-%m-%d")
        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
        update_date_delta = params.get(
            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
        )
        # run the jobs
        await self.record_published_articles_job(
            date_delta=record_date_delta, run_date=run_date
        )
        await self.update_mini_program_detail_job(
            date_delta=update_date_delta, run_date=run_date
        )
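

# A minimal invocation sketch (assumptions: `pool` provides the async_fetch /
# async_save coroutines and `log_client` an async `log`, matching the calls
# above; how both are constructed is project-specific, and the names here are
# illustrative only):
#
#   import asyncio
#
#   async def main():
#       task = RecycleMiniProgramDetailTask(pool, log_client, trace_id="manual-run")
#       # run_date defaults to today; the deltas default to the class constants
#       await task.deal({"run_date": "2025-01-01"})
#
#   asyncio.run(main())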