recycle_root_source_id_detail.py

import json
import traceback
from datetime import datetime, timedelta

from tqdm.asyncio import tqdm

from applications.crawler.wechat import get_article_detail
from applications.utils import (
    get_beijing_date,
    handle_spider_exception,
    transform_to_beijing_date,
    extract_root_source_id,
)


class Const:
    ARTICLE_SUCCESS_CODE = 0
    # default status for a record
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # the article has been deleted
    DELETE_STATUS = -2
    # no data returned for unknown reasons
    UNKNOWN_STATUS = -3
    # the article violates platform rules
    ILLEGAL_STATUS = -4


class RecycleRootSourceIdDetail(Const):
    def __init__(self, pool, log_client, trace_id, run_date):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id
        self.run_date = run_date
        if not self.run_date:
            self.run_date = get_beijing_date()

    async def get_publish_articles_last_day(self):
        """fetch all articles published on the previous day"""
        query = """
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        article_list = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(self.run_date, self.run_date),
        )
        return article_list
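
    # Example (illustrative): with run_date = "2025-01-02", the BETWEEN clause
    # keeps articles whose publish_timestamp falls between
    # 2025-01-01 00:00:00 and 2025-01-01 23:59:59, i.e. the whole previous day.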

    async def get_mini_program_info_by_root_source_id(self, root_source_id_list):
        """fetch video_id for each root_source_id"""
        query = """
            SELECT video_id, root_source_id
            FROM long_articles_root_source_id
            WHERE root_source_id IN %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(tuple(root_source_id_list),)
        )
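
    # Note: callers only reach this lookup with a non-empty root_source_id_list
    # (see get_article_mini_program_detail); an empty tuple would make the
    # IN (...) clause invalid SQL.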

    async def get_article_mini_program_detail(self, url, root_source_id_list):
        """get mini-program card info, from the DB when root_source_ids are known,
        otherwise by crawling the article detail"""
        if not root_source_id_list:
            try:
                article_detail = await get_article_detail(url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    mini_info = article_detail["data"]["data"]["mini_program"]
                    return mini_info
                else:
                    return []
            except Exception as e:
                await handle_spider_exception(
                    log_client=self.log_client,
                    error=e,
                    traceback=traceback.format_exc(),
                    trace_id=self.trace_id,
                    task_name=self.__class__.__name__,
                )
                return []
        else:
            mini_program_info = await self.get_mini_program_info_by_root_source_id(
                root_source_id_list
            )
            if mini_program_info:
                return [
                    {
                        "app_id": "wx89e7eb06478361d7",
                        "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                        "image_url": "",
                        "nike_name": "票圈 l 3亿人喜欢的视频平台",
                        "root_source_id": item["root_source_id"],
                        "video_id": item["video_id"],
                        "service_type": "0",
                        "title": "",
                        "type": "card",
                    }
                    for item in mini_program_info
                ]
            else:
                return []
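
    # Note: both branches return the same card-shaped dicts; the database branch
    # fills in the keys (root_source_id, video_id, nike_name, image_url, title, ...)
    # that the crawled mini_program payload is expected to carry, so
    # record_single_article can consume either source uniformly.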

    async def record_single_article(self, article):
        """parse one article's mini-program cards per recall date"""
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        publish_timestamp = article["publish_timestamp"]
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )
        # get article mini program info
        article_mini_program_detail = await self.get_article_mini_program_detail(
            url, root_source_id_list
        )
        if not article_mini_program_detail:
            return {}
        else:
            try:
                publish_date = transform_to_beijing_date(publish_timestamp)
                # generate T+0, T+1, T+2 date strings
                recall_dt_str_list = [
                    (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
                    for i in range(3)
                ]
                for date_str in recall_dt_str_list:
                    for video_index, mini_item in enumerate(
                        article_mini_program_detail, 1
                    ):
                        image_url = mini_item["image_url"]
                        nick_name = mini_item["nike_name"]
                        # extract video_id and root_source_id
                        if mini_item.get("root_source_id") and mini_item.get(
                            "video_id"
                        ):
                            root_source_id = mini_item["root_source_id"]
                            video_id = mini_item["video_id"]
                        else:
                            id_info = extract_root_source_id(mini_item["path"])
                            root_source_id = id_info["root_source_id"]
                            video_id = id_info["video_id"]
                        kimi_title = mini_item["title"]
                        # self.insert_each_root_source_id(
                        #     wx_sn=wx_sn,
                        #     mini_title=kimi_title,
                        #     mini_name=nick_name,
                        #     cover_url=image_url,
                        #     video_index=video_index,
                        #     root_source_id=root_source_id,
                        #     video_id=video_id,
                        #     publish_dt=publish_date.strftime("%Y-%m-%d"),
                        #     recall_dt=date_str,
                        # )
                return {}
            except Exception as e:
                print(e)
                error_msg = traceback.format_exc()
                return article
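
    # Example (illustrative): an article published on 2025-01-01 produces
    # recall_dt values for 2025-01-01 (T+0), 2025-01-02 (T+1) and 2025-01-03 (T+2);
    # when a card lacks explicit ids, extract_root_source_id is expected to return
    # a mapping with root_source_id and video_id parsed from the card's "path".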

    async def deal(self):
        """record yesterday's articles, then update root_source_id detail info"""
        # step 1, record articles to the detail table
        publish_articles_list = await self.get_publish_articles_last_day()
        for article in tqdm(publish_articles_list, desc="更新文章"):
            await self.record_single_article(article)
        # step 2, update root_source_id detail info