# recycle_mini_program_detail.py

import json
import traceback
from typing import Any
from datetime import datetime, timedelta

from applications.crawler.wechat import get_article_detail
from applications.utils import extract_root_source_id
from applications.utils import run_tasks_with_asyncio_task_group


class MiniProgramConst:
    ARTICLE_SUCCESS_CODE = 0
    # default record status
    DEFAULT_STATUS = 0
    # the detail-API request failed
    REQUEST_FAIL_STATUS = -1
    # the article has been deleted
    DELETE_STATUS = -2
    # no information returned, reason unknown
    UNKNOWN_STATUS = -3
    # the article was flagged as violating platform rules
    ILLEGAL_STATUS = -4

    TASK_NAME = "recycle_mini_program_detail_daily"
    ARTICLE_TABLE = "official_articles_v2"
    DETAIL_TABLE = "long_articles_detail_info"

    # how many days of published articles to recycle
    ARTICLE_RECYCLE_DAYS = 1
    # how many days of root_source_ids to update
    ROOT_SOURCE_ID_UPDATE_DAYS = 3


class RecycleMiniProgramDetailBase(MiniProgramConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    # build mini-program card objects from root_source_id rows
    @staticmethod
    async def create_mini_info(response: list[dict]) -> list[dict]:
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"],
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in response
        ]
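
    # Illustrative example (the row values here are assumptions, shaped like the
    # rows returned by fetch_root_source_id_from_db below):
    #   await create_mini_info([{"root_source_id": "touliu_abc123", "video_id": 456}])
    # returns one card dict whose root_source_id/video_id come from the row and
    # whose remaining fields are the fixed defaults above.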

    # fetch first-level && fission detail for a single root_source_id on a given day
    async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
        query = """
            SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = %s AND dt = %s;
        """
        return await self.pool.async_fetch(query=query, params=(root_source_id, dt))

    # fetch articles published within the given day window
    async def fetch_published_articles(
        self, run_date: str, date_delta: int
    ) -> list[dict]:
        query = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {self.ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )
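
    # For example (dates illustrative, not from the source): run_date "2024-05-02"
    # with date_delta 1 selects articles whose publish time falls in
    # ["2024-05-01 00:00:00", "2024-05-01 23:59:59"].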

    # look up video_ids for a batch of root_source_ids
    async def fetch_root_source_id_from_db(
        self, root_source_id_list: list[str]
    ) -> list[dict[str, str]]:
        placeholders = ", ".join(["%s"] * len(root_source_id_list))
        query = f"""
            SELECT video_id, root_source_id FROM long_articles_root_source_id WHERE root_source_id in ({placeholders});
        """
        return await self.pool.async_fetch(
            query=query, params=tuple(root_source_id_list)
        )

    # fetch root_source_ids recorded over the last N days
    async def fetch_root_source_id_in_last_days(
        self, run_date: str, date_delta: int
    ) -> list[dict[str, str]]:
        query = f"""
            SELECT recall_dt, root_source_id
            FROM {self.DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # record new root_source_id rows (idempotent on the table's unique key)
    async def insert_each_root_source_id(self, params_list: list[tuple]) -> None:
        query = f"""
            INSERT INTO {self.DETAIL_TABLE}
                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE video_id = VALUES(video_id);
        """
        return await self.pool.async_save(
            query=query, db_name="piaoquan_crawler", params=params_list, batch=True
        )
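
    # Each element of `params_list` matches the column list above, i.e.
    #   (wx_sn, mini_title, mini_name, cover_url, video_index,
    #    root_source_id, video_id, publish_dt, recall_dt)
    # as built in handle_single_task below.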

    # update first-level && fission stats for one (root_source_id, recall_dt) pair;
    # source columns map onto destination columns as
    #   first_uv -> first_level, split{n} -> fission_{n},
    #   split{n}_head -> fission_{n}_head, split{n}_recommend -> fission_{n}_recommend
    async def update_each_root_source_id(self, item: dict) -> None:
        recall_dt = item["recall_dt"]
        root_source_id = item["root_source_id"]
        mini_program_detail = await self.fetch_root_source_id_result(
            root_source_id, recall_dt
        )
        if not mini_program_detail:
            return
        mini_program_detail = mini_program_detail[0]
        query = f"""
            UPDATE {self.DETAIL_TABLE}
            SET first_level = %s,
                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
            WHERE root_source_id = %s and recall_dt = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                mini_program_detail["first_uv"],
                mini_program_detail["split0"],
                mini_program_detail["split0_head"],
                mini_program_detail["split0_recommend"],
                mini_program_detail["split1"],
                mini_program_detail["split1_head"],
                mini_program_detail["split1_recommend"],
                mini_program_detail["split2"],
                mini_program_detail["split2_head"],
                mini_program_detail["split2_recommend"],
                root_source_id,
                recall_dt,
            ),
        )


class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
    # business layer
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def handle_single_task(self, article):
        """
        record each article into the long_articles_detail_info table
        """
        url, publish_timestamp, wx_sn = (
            article["ContentUrl"],
            article["publish_timestamp"],
            article["wx_sn"].decode(),
        )
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )
        mini_detail = []
        # build mini-program info from root_source_ids already stored in the DB
        if root_source_id_list:
            root_source_id_detail = await self.fetch_root_source_id_from_db(
                root_source_id_list
            )
            mini_detail = await self.create_mini_info(root_source_id_detail)
        # fall back to the article-detail API when the article carries no usable
        # root_source_id_list
        if not mini_detail:
            try:
                article_detail = await get_article_detail(article_link=url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    mini_detail = article_detail["data"]["data"].get(
                        "mini_program", []
                    )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": self.TASK_NAME,
                        "trace_id": self.trace_id,
                        "message": "get article detail error",
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )
        # insert the root_source_id info into the MySQL table
        if not mini_detail:
            return
        publish_date = datetime.fromtimestamp(publish_timestamp)
        publish_dt = publish_date.strftime("%Y-%m-%d")
        # record three days of data: T+0, T+1, T+2
        recall_dt_list = [
            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
        ]
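        # e.g. a publish date of 2024-05-01 (illustrative) yields recall dates
        # ["2024-05-01", "2024-05-02", "2024-05-03"]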
        params_list = []
        for dt in recall_dt_list:
            for video_index, mini_item in enumerate(mini_detail, start=1):
                image_url = mini_item["image_url"]
                nickname = mini_item["nike_name"]
                if mini_item.get("root_source_id") and mini_item.get("video_id"):
                    root_source_id, video_id = (
                        mini_item["root_source_id"],
                        mini_item["video_id"],
                    )
                else:
                    # ids not present directly; parse them out of the mini-program path
                    id_info = extract_root_source_id(mini_item["path"])
                    root_source_id, video_id = (
                        id_info["root_source_id"],
                        id_info["video_id"],
                    )
                kimi_title = mini_item["title"]
                params_list.append(
                    (
                        wx_sn,
                        kimi_title,
                        nickname,
                        image_url,
                        video_index,
                        root_source_id,
                        video_id,
                        publish_dt,
                        dt,
                    )
                )
        await self.insert_each_root_source_id(params_list)

    # record daily published articles && their root_source_id info
    async def record_published_articles_job(
        self,
        date_delta: int,
        run_date: str,
        *,
        max_concurrency: int = 20,  # concurrency cap
        fail_fast: bool = False,  # whether to abort on the first error
    ) -> dict[str, Any]:
        # get tasks
        task_list = await self.fetch_published_articles(run_date, date_delta)
        return await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.handle_single_task,
            max_concurrency=max_concurrency,
            fail_fast=fail_fast,
            description="persist articles && rootSourceIds",
            unit="article",
        )

    # update first-level && fission info for recent root_source_ids
    async def update_mini_program_detail_job(
        self,
        date_delta: int,
        run_date: str,
        *,
        max_concurrency: int = 20,
        fail_fast: bool = False,
    ) -> dict[str, Any]:
        task_list = await self.fetch_root_source_id_in_last_days(run_date, date_delta)
        return await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.update_each_root_source_id,
            max_concurrency=max_concurrency,
            fail_fast=fail_fast,
            description="update rootSourceId fission performance",
            unit="id",
        )

    # business entry point
    async def deal(self, params: dict):
        # parse request params
        run_date = params.get("run_date")
        if not run_date:
            run_date = datetime.today().strftime("%Y-%m-%d")
        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
        update_date_delta = params.get(
            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
        )
        # run the jobs in order: record first, then update
        await self.record_published_articles_job(
            date_delta=record_date_delta, run_date=run_date
        )
        await self.update_mini_program_detail_job(
            date_delta=update_date_delta, run_date=run_date
        )
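

# Minimal usage sketch (illustrative only; `create_pool` and `LogClient` are
# hypothetical stand-ins for however this project actually builds the injected
# `pool` and `log_client` dependencies):
#
#     async def main():
#         pool = await create_pool()  # hypothetical DB pool factory
#         log_client = LogClient()    # hypothetical structured log client
#         task = RecycleMiniProgramDetailTask(pool, log_client, trace_id="manual-run")
#         # omit "run_date" to default to today; the date deltas default to
#         # ARTICLE_RECYCLE_DAYS and ROOT_SOURCE_ID_UPDATE_DAYS
#         await task.deal({"run_date": "2024-05-01"})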