# recycle_mini_program_detail.py
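"""
Daily recycle task for WeChat mini-program detail data.

Records each published article's mini-program cards (one row per
root_source_id and recall date, T+0 through T+2) into
long_articles_detail_info, then backfills first-level and fission
metrics from changwen_data_rootsourceid.
"""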

import json
import traceback
from datetime import datetime, timedelta
from typing import Any

from applications.crawler.wechat import get_article_detail
from applications.utils import extract_root_source_id
from applications.utils import run_tasks_with_asyncio_task_group


class MiniProgramConst:
    ARTICLE_SUCCESS_CODE = 0
    # default record status
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned for an unknown reason
    UNKNOWN_STATUS = -3
    # article violates platform rules
    ILLEGAL_STATUS = -4

    TASK_NAME = "recycle_mini_program_detail_daily"
    ARTICLE_TABLE = "official_articles_v2"
    DETAIL_TABLE = "long_articles_detail_info"

    # article recycle window, in days
    ARTICLE_RECYCLE_DAYS = 1
    # root_source_id update window, in days
    ROOT_SOURCE_ID_UPDATE_DAYS = 3

    # default video id
    DEFAULT_VIDEO_ID = 1
    ZERO_VIDEO_ID = 0


class RecycleMiniProgramDetailBase(MiniProgramConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    # build mini-program info objects from root_source_id rows
    def create_mini_info(self, response: list[dict]) -> list[dict]:
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"]
                if item["video_id"]
                else self.DEFAULT_VIDEO_ID,
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in response
        ]

    # fetch first-level / fission detail for a single root_source_id
    async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
        query = """
            SELECT first_uv, split0, split0_head, split0_recommend,
                   split1, split1_head, split1_recommend,
                   split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = %s AND dt = %s;
        """
        return await self.pool.async_fetch(query=query, params=(root_source_id, dt))

    # fetch articles published within the given day window
    async def fetch_published_articles(
        self, run_date: str, date_delta: int
    ) -> list[dict]:
        query = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {self.ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # fetch root_source_id rows from the database
    async def fetch_root_source_id_from_db(
        self, root_source_id_list: list[str]
    ) -> list[dict[str, str]]:
        placeholders = ", ".join(["%s"] * len(root_source_id_list))
        query = f"""
            SELECT video_id, root_source_id
            FROM long_articles_root_source_id
            WHERE root_source_id IN ({placeholders});
        """
        return await self.pool.async_fetch(
            query=query, params=tuple(root_source_id_list)
        )

    # fetch root_source_ids published within the last `date_delta` days
    async def fetch_root_source_id_in_last_days(
        self, run_date: str, date_delta: int
    ) -> list[dict[str, str]]:
        query = f"""
            SELECT recall_dt, root_source_id
            FROM {self.DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # batch-insert new root_source_id rows; an existing row only gets its video_id refreshed
    async def insert_each_root_source_id(self, params_list: list[tuple]) -> None:
        query = f"""
            INSERT INTO {self.DETAIL_TABLE}
                (wx_sn, mini_title, mini_name, cover_url, video_index,
                 root_source_id, video_id, publish_dt, recall_dt)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE video_id = VALUES(video_id);
        """
        return await self.pool.async_save(
            query=query, db_name="piaoquan_crawler", params=params_list, batch=True
        )

    # update first-level && fission metrics for one root_source_id
    async def update_each_root_source_id(self, item: dict) -> None:
        recall_dt = item["recall_dt"]
        root_source_id = item["root_source_id"]
        mini_program_detail = await self.fetch_root_source_id_result(
            root_source_id, recall_dt
        )
        if not mini_program_detail:
            return

        mini_program_detail = mini_program_detail[0]
        query = f"""
            UPDATE {self.DETAIL_TABLE}
            SET first_level = %s,
                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
            WHERE root_source_id = %s AND recall_dt = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                mini_program_detail["first_uv"],
                mini_program_detail["split0"],
                mini_program_detail["split0_head"],
                mini_program_detail["split0_recommend"],
                mini_program_detail["split1"],
                mini_program_detail["split1_head"],
                mini_program_detail["split1_recommend"],
                mini_program_detail["split2"],
                mini_program_detail["split2_head"],
                mini_program_detail["split2_recommend"],
                root_source_id,
                recall_dt,
            ),
        )


class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
    # business layer on top of the data-access base class
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def handle_single_task(self, article):
        """Record one article's mini-program rows into long_articles_detail_info."""
        url, publish_timestamp, wx_sn = (
            article["ContentUrl"],
            article["publish_timestamp"],
            article["wx_sn"].decode(),
        )
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )
        mini_detail = []
        # fetch mini-program info
        if root_source_id_list:
            root_source_id_detail = await self.fetch_root_source_id_from_db(
                root_source_id_list
            )
            mini_detail = self.create_mini_info(root_source_id_detail)
        else:
            # no root_source_id_list on the article: fall back to the article detail API
            try:
                article_detail = await get_article_detail(article_link=url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    mini_detail = article_detail["data"]["data"].get(
                        "mini_program", []
                    )
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": self.TASK_NAME,
                        "trace_id": self.trace_id,
                        "message": "get article detail error",
                        "data": {
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                        },
                    }
                )

        # insert the root_source_id rows into the MySQL table
        if not mini_detail:
            return

        publish_date = datetime.fromtimestamp(publish_timestamp)
        publish_dt = publish_date.strftime("%Y-%m-%d")
        # record three days of data: T+0, T+1, T+2
        recall_dt_list = [
            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
        ]
        params_list = []
        for dt in recall_dt_list:
            for video_index, mini_item in enumerate(mini_detail, start=1):
                image_url = mini_item["image_url"]
                nickname = mini_item["nike_name"]
                # replace a video_id of 0 with the default id 1
                if mini_item.get("video_id") == self.ZERO_VIDEO_ID:
                    mini_item["video_id"] = self.DEFAULT_VIDEO_ID
                if mini_item.get("root_source_id") and mini_item.get("video_id"):
                    root_source_id, video_id = (
                        mini_item["root_source_id"],
                        mini_item["video_id"],
                    )
                else:
                    # ids missing on the item itself: parse them out of the jump path
                    id_info = extract_root_source_id(mini_item["path"])
                    root_source_id, video_id = (
                        id_info["root_source_id"],
                        id_info["video_id"],
                    )
                kimi_title = mini_item["title"]
                params_list.append(
                    (
                        wx_sn,
                        kimi_title,
                        nickname,
                        image_url,
                        video_index,
                        root_source_id,
                        video_id,
                        publish_dt,
                        dt,
                    )
                )
        await self.insert_each_root_source_id(params_list)

    # record daily published articles && their root_source_id info
    async def record_published_articles_job(
        self,
        date_delta: int,
        run_date: str,
        *,
        max_concurrency: int = 20,  # concurrency cap
        fail_fast: bool = False,  # abort on first error
    ) -> dict[str, Any]:
        # get tasks
        task_list = await self.fetch_published_articles(run_date, date_delta)
        return await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.handle_single_task,
            max_concurrency=max_concurrency,
            fail_fast=fail_fast,
            description="persist articles && rootSourceId",
            unit="article",
        )

    # update first-level && fission metrics for recent root_source_ids
    async def update_mini_program_detail_job(
        self,
        date_delta: int,
        run_date: str,
        *,
        max_concurrency: int = 20,
        fail_fast: bool = False,
    ) -> dict[str, Any]:
        task_list = await self.fetch_root_source_id_in_last_days(run_date, date_delta)
        return await run_tasks_with_asyncio_task_group(
            task_list=task_list,
            handler=self.update_each_root_source_id,
            max_concurrency=max_concurrency,
            fail_fast=fail_fast,
            description="update rootSourceId fission performance",
            unit="id",
        )

    # task entry point
    async def deal(self, params: dict):
        # parse request params
        run_date = params.get("run_date")
        if not run_date:
            run_date = datetime.today().strftime("%Y-%m-%d")
        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
        update_date_delta = params.get(
            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
        )

        # run the two jobs
        await self.record_published_articles_job(
            date_delta=record_date_delta, run_date=run_date
        )
        await self.update_mini_program_detail_job(
            date_delta=update_date_delta, run_date=run_date
        )
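
# Minimal invocation sketch (assumption: `pool` and `log_client` are the
# project's async MySQL pool and log client, constructed elsewhere):
#
#     task = RecycleMiniProgramDetailTask(pool, log_client, trace_id)
#     await task.deal({"run_date": "2025-01-01"})
#
# An empty params dict also works: run_date defaults to today, and the day
# windows fall back to ARTICLE_RECYCLE_DAYS / ROOT_SOURCE_ID_UPDATE_DAYS.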