recycle_root_source_id_detail.py

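"""
Recycle mini-program (root_source_id) details for articles published the
previous day: fetch yesterday's articles, resolve each mini-program card to a
video_id / root_source_id pair, and write T+0 / T+1 / T+2 recall rows into the
mini-program detail pool.
"""
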
import json
import traceback
from datetime import datetime, timedelta

from tqdm.asyncio import tqdm

from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_into_mini_program_detail_pool
from applications.utils import (
    get_beijing_date,
    handle_spider_exception,
    transform_to_beijing_date,
    extract_root_source_id,
)


class Const:
    ARTICLE_SUCCESS_CODE = 0
    # default record status
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # nothing returned, reason unknown
    UNKNOWN_STATUS = -3
    # article flagged as violating platform rules
    ILLEGAL_STATUS = -4


class RecycleRootSourceIdDetail(Const):
    def __init__(self, pool, log_client, trace_id, run_date):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id
        self.run_date = run_date
        if not self.run_date:
            self.run_date = get_beijing_date()

    async def get_publish_articles_last_day(self):
        """Fetch all articles published on the previous day."""
        # with run_date = D, the window is [D-1 00:00:00, D-1 23:59:59]
        query = """
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        article_list = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(self.run_date, self.run_date),
        )
        return article_list

    async def get_mini_program_info_by_root_source_id(self, root_source_id_list):
        """Map each root_source_id to its video_id."""
        query = """
            SELECT video_id, root_source_id
            FROM long_articles_root_source_id
            WHERE root_source_id IN %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(tuple(root_source_id_list),)
        )

    async def get_article_mini_program_detail(self, url, root_source_id_list):
        """Get the mini-program card list for an article.

        When no root_source_id_list is recorded, fall back to crawling the
        article page; otherwise build the card list from the database lookup.
        """
        if not root_source_id_list:
            try:
                article_detail = await get_article_detail(url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    mini_info = article_detail["data"]["data"]["mini_program"]
                    return mini_info
                else:
                    return []
            except Exception as e:
                await handle_spider_exception(
                    log_client=self.log_client,
                    error=e,
                    traceback=traceback.format_exc(),
                    trace_id=self.trace_id,
                    task_name=self.__class__.__name__,
                )
                return []
        else:
            mini_program_info = await self.get_mini_program_info_by_root_source_id(
                root_source_id_list
            )
            if mini_program_info:
                return [
                    {
                        "app_id": "wx89e7eb06478361d7",
                        "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                        "image_url": "",
                        "nike_name": "票圈 l 3亿人喜欢的视频平台",
                        "root_source_id": item["root_source_id"],
                        "video_id": item["video_id"],
                        "service_type": "0",
                        "title": "",
                        "type": "card",
                    }
                    for item in mini_program_info
                ]
            else:
                return []

    async def record_single_article(self, article):
        """Write mini-program detail rows for one article.

        Returns an empty dict on success (or when there is nothing to record)
        and the original article on failure.
        """
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        publish_timestamp = article["publish_timestamp"]
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )
        # get article mini program info
        article_mini_program_detail = await self.get_article_mini_program_detail(
            url, root_source_id_list
        )
        if not article_mini_program_detail:
            return {}
        else:
            try:
                publish_date = transform_to_beijing_date(publish_timestamp)
                # generate T+0, T+1, T+2 date strings
                recall_dt_str_list = [
                    (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
                    for i in range(3)
                ]
                for date_str in recall_dt_str_list:
                    for video_index, mini_item in enumerate(
                        article_mini_program_detail, 1
                    ):
                        image_url = mini_item["image_url"]
                        nick_name = mini_item["nike_name"]
                        # extract video_id and root_source_id
                        if mini_item.get("root_source_id") and mini_item.get(
                            "video_id"
                        ):
                            root_source_id = mini_item["root_source_id"]
                            video_id = mini_item["video_id"]
                        else:
                            id_info = extract_root_source_id(mini_item["path"])
                            root_source_id = id_info["root_source_id"]
                            video_id = id_info["video_id"]
                        kimi_title = mini_item["title"]
                        await insert_into_mini_program_detail_pool(
                            self.pool,
                            raw={
                                "wx_sn": wx_sn,
                                "mini_title": kimi_title,
                                "root_source_id": root_source_id,
                                "video_id": video_id,
                                "mini_name": nick_name,
                                "cover_url": image_url,
                                "publish_dt": publish_date.strftime("%Y-%m-%d"),
                                "recall_dt": date_str,
                                "video_index": video_index,
                            },
                        )
                return {}
            except Exception as e:
                # log the failure and hand the article back to the caller
                await handle_spider_exception(
                    log_client=self.log_client,
                    error=e,
                    traceback=traceback.format_exc(),
                    trace_id=self.trace_id,
                    task_name=self.__class__.__name__,
                )
                return article

    async def deal(self):
        """Entry point for the recycle task."""
        # step 1: record articles into the detail table
        publish_articles_list = await self.get_publish_articles_last_day()
        for article in tqdm(publish_articles_list, desc="更新文章"):
            await self.record_single_article(article)
        # step 2: update root_source_id detail info
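

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module): assumes
# an async entry point that already holds a DB pool exposing `async_fetch` and
# a log client compatible with `handle_spider_exception`; the factory names
# below are placeholders.
# ---------------------------------------------------------------------------
# import asyncio
#
# async def main():
#     pool = await create_db_pool()        # hypothetical pool factory
#     log_client = build_log_client()      # hypothetical log client factory
#     task = RecycleRootSourceIdDetail(
#         pool=pool,
#         log_client=log_client,
#         trace_id="manual-run",
#         run_date=None,                   # None -> defaults to Beijing date
#     )
#     await task.deal()
#
# if __name__ == "__main__":
#     asyncio.run(main())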