update_published_articles_minigram_detail.py
"""
@author: luojunhui
"""
import json
import time
import traceback
from argparse import ArgumentParser
from typing import List, Dict

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications import bot
from applications import log
from applications import Functions
from applications import WeixinSpider
from applications.db import DatabaseConnector
from applications.const import UpdateMiniProgramDetailConst
from applications.exception import SpiderError
from config import long_articles_config, piaoquan_crawler_config

const = UpdateMiniProgramDetailConst()
spider = WeixinSpider()
functions = Functions()

# database configuration
ARTICLE_TABLE = "official_articles_v2"


def check_root_source_id_list(content_url: str):
    """
    Check whether the article detail page carries mini-program (root_source_id) info
    :return: the mini-program info on success, otherwise None
    """
    try:
        article_detail = spider.get_article_text(content_url)
    except Exception as e:
        raise SpiderError(error=e, spider="detail", url=content_url)

    response_code = article_detail['code']
    if response_code == const.ARTICLE_SUCCESS_CODE:
        mini_info = article_detail['data']['data']['mini_program']
        return mini_info
    else:
        return None


class UpdatePublishedArticlesMinigramDetail(object):
    """
    Update mini-program data for published articles
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connector
        :return:
        """
        # initialize database connections
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="更新小程序裂变信息任务连接数据库失败",
                detail={
                    "error": e,
                    "msg": error_msg
                }
            )
            return

    def check_articles(self) -> List[Dict]:
        """
        Find articles whose publish_timestamp has not been resolved yet
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response

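    # Note: publish_timestamp is assumed to hold sentinel status codes
    # (const.DEFAULT_STATUS / const.REQUEST_FAIL_STATUS) until the real unix
    # timestamp is backfilled; the IN (...) filter above matches exactly those rows.
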
    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the mini-program stats row for a given root_source_id and date
        :param dt:
        :param root_source_id:
        :return:
        """
        select_sql = f"""
            SELECT first_uv, split0, split1, split2
            FROM changwen_data_rootsourceid
            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
        """
        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        if result:
            return result[0]
        else:
            return {}

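    # Illustrative return shape of get_root_source_id_result(), derived from the
    # selected columns (values are made up):
    #     {"first_uv": 1024, "split0": 300, "split1": 200, "split2": 100}
    # An empty dict means no stats row exists yet for that (root_source_id, dt).
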
    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Get articles published on the day before biz_date
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
            BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list

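    # Example of the window selected above, assuming biz_date is a 'YYYY-MM-DD'
    # string: for biz_date = '2024-05-02' the BETWEEN clause keeps articles whose
    # publish time lies in ['2024-05-01 00:00:00', '2024-05-01 23:59:59'],
    # i.e. everything published on the previous calendar day.
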
    def insert_each_root_source_id(self, root_source_id: str, article_info: Dict) -> int:
        """
        Insert one detail row for an (article, root_source_id) pair
        :param root_source_id:
        :param article_info:
        :return: number of affected rows
        """
        insert_sql = f"""
            INSERT INTO long_articles_detail_info
            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                article_info['wx_sn'],
                article_info['title'],
                article_info['mini_name'],
                article_info['cover_url'],
                article_info['video_index'],
                root_source_id,
                article_info['video_id'],
                article_info['publish_dt'],
                # ninth value for the recall_dt column; the 'recall_dt' key is assumed
                article_info['recall_dt']
            )
        )
        return affected_rows

    def record_each_article(self, article_info: Dict) -> None:
        """
        Record the root_source_id rows for one article.
        Row volume: article_count * mini_program_count * days_count
        :param article_info:
        :return:
        """
        url = article_info['ContentUrl']
        root_source_id_list = json.loads(article_info['root_source_id_list'])
        if not root_source_id_list:
            # no stored ids: fall back to re-crawling the article detail page
            root_source_id_response = check_root_source_id_list(url)
            if root_source_id_response:
                # mini-program info exists, but its root_source_id values still
                # need to be extracted before they can be recorded
                root_source_id_list = []
            else:
                return

        for root_source_id in root_source_id_list:
            self.insert_each_root_source_id(root_source_id, article_info)

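    # Minimal driver sketch (hypothetical; the actual entry point lives later in
    # this file), assuming biz_date is passed as a 'YYYY-MM-DD' string:
    #
    #     task = UpdatePublishedArticlesMinigramDetail()
    #     task.init_database()
    #     for article in tqdm(task.get_articles_published_yesterday('2024-05-02')):
    #         task.record_each_article(article)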