# recycle_mini_program_detail.py

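"""Daily recycle task: records the mini-program cards attached to each published
article into `long_articles_detail_info`, then refreshes the first-level and
fission metrics for every recent root_source_id."""
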
import json
from datetime import datetime, timedelta

from tqdm.asyncio import tqdm

from applications.crawler.wechat import get_article_detail
from applications.utils import extract_root_source_id


class MiniProgramConst:
    ARTICLE_SUCCESS_CODE = 0

    # default record status
    DEFAULT_STATUS = 0
    # request to the detail interface failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned for an unknown reason
    UNKNOWN_STATUS = -3
    # article flagged as violating platform rules
    ILLEGAL_STATUS = -4

    TASK_NAME = "recycle_mini_program_detail_daily"
    ARTICLE_TABLE = "official_articles_v2"
    DETAIL_TABLE = "long_articles_detail_info"

    # window (in days) for recycling published articles
    ARTICLE_RECYCLE_DAYS = 1
    # window (in days) for updating root_source_id metrics
    ROOT_SOURCE_ID_UPDATE_DAYS = 3


class RecycleMiniProgramDetailBase(MiniProgramConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    # build mini-program card objects from root_source_id rows
    @staticmethod
    async def create_mini_info(response: list[dict]) -> list[dict]:
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"],
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in response
        ]
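
    # A quick illustration of the mapping above (values are hypothetical):
    # create_mini_info([{"root_source_id": "rs_abc", "video_id": 123}]) yields one
    # card dict whose root_source_id/video_id come from the row, with the fixed
    # app_id, avatar, nike_name, service_type and type values shown above.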

    # fetch first-level && fission detail for a single root_source_id on a given dt
    async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
        query = """
            SELECT first_uv, split0, split0_head, split0_recommend,
                   split1, split1_head, split1_recommend,
                   split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = %s AND dt = %s;
        """
        return await self.pool.async_fetch(query=query, params=(root_source_id, dt))

    # fetch articles published within the specified day window
    async def fetch_published_articles(
        self, run_date: str, date_delta: int
    ) -> list[dict]:
        query = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {self.ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # look up video_id for each root_source_id from the database
    async def fetch_root_source_id_from_db(
        self, root_source_id_list: list[str]
    ) -> list[dict[str, str]]:
        query = """
            SELECT video_id, root_source_id
            FROM long_articles_root_source_id
            WHERE root_source_id IN %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(tuple(root_source_id_list),)
        )

    # fetch root_source_id records published within the last few days
    async def fetch_root_source_id_in_last_days(
        self, run_date: str, date_delta: int
    ) -> list[dict[str, str]]:
        query = f"""
            SELECT recall_dt, root_source_id
            FROM {self.DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # check whether a (wx_sn, recall_dt, video_id) record already exists
    async def whether_article_record_exist(self, wx_sn, recall_dt, video_id):
        query = f"""
            SELECT * FROM {self.DETAIL_TABLE}
            WHERE wx_sn = %s AND recall_dt = %s AND video_id = %s;
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(wx_sn, recall_dt, video_id),
        )

    # insert a new root_source_id record
    async def insert_each_root_source_id(self, params: tuple) -> None:
        query = f"""
            INSERT IGNORE INTO {self.DETAIL_TABLE}
                (wx_sn, mini_title, mini_name, cover_url, video_index,
                 root_source_id, video_id, publish_dt, recall_dt)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query, db_name="piaoquan_crawler", params=params
        )

    # update first-level && fission metrics for a root_source_id
    async def update_each_root_source_id(self, item: dict) -> None:
        recall_dt = item["recall_dt"]
        root_source_id = item["root_source_id"]
        mini_program_detail = await self.fetch_root_source_id_result(
            root_source_id, recall_dt
        )
        if not mini_program_detail:
            return

        mini_program_detail = mini_program_detail[0]
        query = f"""
            UPDATE {self.DETAIL_TABLE}
            SET first_level = %s,
                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
            WHERE root_source_id = %s AND recall_dt = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                mini_program_detail["first_uv"],
                mini_program_detail["split0"],
                mini_program_detail["split0_head"],
                mini_program_detail["split0_recommend"],
                mini_program_detail["split1"],
                mini_program_detail["split1_head"],
                mini_program_detail["split1_recommend"],
                mini_program_detail["split2"],
                mini_program_detail["split2_head"],
                mini_program_detail["split2_recommend"],
                root_source_id,
                recall_dt,
            ),
        )


class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
    """Business layer."""

    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def handle_single_task(self, article):
        """Record each article into the long_articles_detail_info table."""
        url, publish_timestamp, wx_sn = (
            article["ContentUrl"],
            article["publish_timestamp"],
            article["wx_sn"].decode(),
        )
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )

        mini_detail = []
        # fetch mini-program info from the database when root_source_ids are known
        if root_source_id_list:
            root_source_id_detail = await self.fetch_root_source_id_from_db(
                root_source_id_list
            )
            mini_detail = await self.create_mini_info(root_source_id_detail)
        else:
            # no root_source_id_list on the article; fall back to the detail spider
            try:
                article_detail = await get_article_detail(article_link=url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    mini_detail = article_detail["data"]["data"].get(
                        "mini_program", []
                    )
            except Exception as e:
                raise Exception("Weixin Detail Spider Error") from e

        # write the root_source_id info into the MySQL table
        if not mini_detail:
            return

        publish_date = datetime.fromtimestamp(publish_timestamp)
        publish_dt = publish_date.strftime("%Y-%m-%d")
        # record recall dates for T+0, T+1, T+2: e.g. a 2025-01-01 publish date
        # yields ["2025-01-01", "2025-01-02", "2025-01-03"]
        recall_dt_list = [
            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
        ]
        for dt in recall_dt_list:
            for video_index, mini_item in enumerate(mini_detail, start=1):
                image_url = mini_item["image_url"]
                nickname = mini_item["nike_name"]
                if mini_item.get("root_source_id") and mini_item.get("video_id"):
                    root_source_id, video_id = (
                        mini_item["root_source_id"],
                        mini_item["video_id"],
                    )
                else:
                    id_info = extract_root_source_id(mini_item["path"])
                    root_source_id, video_id = (
                        id_info["root_source_id"],
                        id_info["video_id"],
                    )
                # the existence check is a coroutine and must be awaited;
                # an un-awaited coroutine object is always truthy and would
                # skip every record
                if await self.whether_article_record_exist(
                    wx_sn=wx_sn, video_id=video_id, recall_dt=dt
                ):
                    continue

                kimi_title = mini_item["title"]
                await self.insert_each_root_source_id(
                    params=(
                        wx_sn,
                        kimi_title,
                        nickname,
                        image_url,
                        video_index,
                        root_source_id,
                        video_id,
                        publish_dt,
                        dt,
                    )
                )

    # record daily published articles && their root_source_id info
    async def record_published_articles_job(self, date_delta: int, run_date: str):
        tasks = await self.fetch_published_articles(run_date, date_delta)
        for task in tqdm(tasks):
            await self.handle_single_task(task)

    # update first-level && fission info for recent root_source_ids
    async def update_mini_program_detail_job(self, date_delta: int, run_date: str):
        mini_info_list = await self.fetch_root_source_id_in_last_days(
            run_date, date_delta
        )
        fail_cnt = 0
        for mini_item in tqdm(
            mini_info_list, total=len(mini_info_list), desc="update_each_root_source_id"
        ):
            try:
                await self.update_each_root_source_id(mini_item)
            except Exception as e:
                fail_cnt += 1
                print(e)
        if fail_cnt:
            # TODO: notify via bot
            raise Exception(f"Fail Count {fail_cnt}")

    # business entry point
    async def deal(self, params: dict):
        # parse request params
        run_date = params.get("run_date")
        if not run_date:
            run_date = datetime.today().strftime("%Y-%m-%d")
        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
        update_date_delta = params.get(
            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
        )
        # run the jobs
        await self.record_published_articles_job(
            date_delta=record_date_delta, run_date=run_date
        )
        await self.update_mini_program_detail_job(
            date_delta=update_date_delta, run_date=run_date
        )
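

# A minimal usage sketch (assumptions: `make_pool` and `make_log_client` are
# hypothetical factories for whatever connection pool and log client the
# application wires in; the pool must expose the `async_fetch` / `async_save`
# coroutines used above).
#
# import asyncio
#
# async def main():
#     pool, log_client = make_pool(), make_log_client()
#     task = RecycleMiniProgramDetailTask(pool, log_client, trace_id="local-debug")
#     # run_date defaults to today; the deltas default to the class constants
#     await task.deal(params={"run_date": "2025-01-01"})
#
# asyncio.run(main())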