update_mini_program_info_task.py

  1. """
  2. @author: luojunhui
  3. @desc: 更新每日发布文章的小程序裂变信息
  4. """
  5. import json
  6. import time
  7. import traceback
  8. from argparse import ArgumentParser
  9. from datetime import datetime, timedelta
  10. from typing import List, Dict
  11. from tqdm import tqdm
  12. from pymysql.cursors import DictCursor
  13. from applications import bot
  14. from applications import Functions
  15. from applications import log
  16. from applications import WeixinSpider
  17. from applications.db import DatabaseConnector
  18. from applications.exception import SpiderError
  19. from config import long_articles_config, piaoquan_crawler_config
  20. basic_function = Functions()
  21. spider = WeixinSpider()
  22. TASK_NAME = "updateMinigramInfoDaily"
  23. SPIDER_SUCCESS_STATUS = 0
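# SPIDER_SUCCESS_STATUS is the `code` field WeixinSpider returns on a successful fetch
# (checked in update_each_article); TASK_NAME tags every log() / bot() message of this task.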


class UpdateMiniProgramInfoTask(object):
    """
    Update the mini-program fission info for published articles
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connectors
        :return:
        """
        # initialize the two database connections; alert via bot and bail out on failure
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="更新小程序裂变信息任务连接数据库失败",
                detail={
                    "error": e,
                    "msg": error_msg
                }
            )
            return

    def get_published_articles_yesterday(self, run_date: str) -> List[Dict]:
        """
        Fetch the articles published during the day before run_date
        :param run_date: date string, e.g. '2025-01-02'
        :return: list of article rows
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
            BETWEEN DATE_SUB('{run_date}', INTERVAL 1 DAY) AND DATE_SUB('{run_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list

    def update_each_article(self, article_info: Dict) -> None:
        """
        Fetch one article's mini-program cards and insert their fission-tracking rows
        :param article_info: one row returned by get_published_articles_yesterday
        :return:
        """
        url = article_info['ContentUrl']
        wx_sn = article_info['wx_sn'].decode()
        publish_timestamp = article_info['publish_timestamp']
        try:
            article_detail = spider.get_article_text(url)
        except Exception as e:
            raise SpiderError(error=e, spider="detail", url=url)

        response_code = article_detail['code']
        if response_code == SPIDER_SUCCESS_STATUS:
            mini_info = article_detail['data']['data']['mini_program']
            if mini_info:
                log(
                    task=TASK_NAME,
                    function="get_root_source_ids",
                    message="获取文章链接对应的 rootSourceId 成功",
                    data={
                        "ContentUrl": url,
                        "wxSn": wx_sn,
                        "publish_time_stamp": publish_timestamp,
                        "miniInfo": mini_info
                    }
                )
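                # One row is written per mini-program card and per recall date:
                # the publish day plus the two following days (recall_dt).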
                try:
                    dt_object = datetime.fromtimestamp(publish_timestamp)
                    publish_dt = dt_object.strftime('%Y-%m-%d')
                    one_day = timedelta(days=1)
                    two_day = timedelta(days=2)
                    next_day = dt_object + one_day
                    next_next_day = dt_object + two_day
                    recall_dt_list = [dt_object, next_day, next_next_day]
                    recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
                    for dt_str in recall_dt_str_list:
                        for index, item in enumerate(mini_info, 1):
                            image_url = item['image_url']
                            nick_name = item['nike_name']
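                            # 'path' is a URL-encoded mini-program path; %3D and %26 are the
                            # encoded '=' and '&', so these splits pull out rootSourceId and
                            # the video id from "videos?id=...&su=...".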
                            root_source_id = item['path'].split("rootSourceId%3D")[-1]
                            video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
                            kimi_title = item['title']
                            insert_sql = f"""
                                INSERT INTO long_articles_detail_info
                                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
                                values
                                (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                            """
                            self.piaoquan_crawler_db_client.save(
                                query=insert_sql,
                                params=(
                                    wx_sn,
                                    kimi_title,
                                    nick_name,
                                    image_url,
                                    index,
                                    root_source_id,
                                    video_id,
                                    publish_dt,
                                    dt_str
                                )
                            )
                            log(
                                task=TASK_NAME,
                                function="update_article_info",
                                message="插入数据成功, video_id 是: {}".format(video_id)
                            )
                except Exception as e:
                    error_msg = traceback.format_exc()
                    log(
                        task=TASK_NAME,
                        function="update_article_info",
                        status="fail",
                        message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
                    )

    def get_source_id_info(self, root_source_id: str) -> Dict:
        """
        Aggregate first-level and fission stats for one root_source_id
        :param root_source_id:
        :return: {recall_dt: [first_level_count, fission_day0, fission_day1, fission_day2]},
                 all values being distinct machinecode counts
        """
        sql = f"""
            select type, machinecode, create_time, first_level_dt
            from changwen_data_base_v2
            where rootsourceid = '{root_source_id}';
        """
        result_list = self.long_articles_db_client.fetch(sql)
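
        # Rows come back from the default cursor as positional tuples
        # (type, machinecode, create_time, first_level_dt); summarize() below
        # relies on that index-based layout.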
        def summarize(values):
            """
            :param values: rows of (type, machinecode, create_time, first_level_dt)
            :return: {recall_dt: [first_level_count, fission_day0, fission_day1, fission_day2]}
            """
            L = {}
            first_level = {}
            fission_level = {}
            for line in values:
                # count first-level ('首层') visitors per recall date first
                if line[0] == '首层':
                    try:
                        dt = str(line[-1])
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if first_level.get(key_dt):
                            first_level[key_dt].add(line[1])
                        else:
                            first_level[key_dt] = {line[1]}
                    except Exception as e:
                        continue
                else:
                    try:
                        dt = str(line[-1])
                        first_level_dt = datetime.strptime(dt, '%Y%m%d')
                        create_level_dt = line[-2]
                        delta = create_level_dt - first_level_dt
                        days = int(delta.days)
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if fission_level.get(key_dt):
                            fission_level[key_dt].append((line[1], days))
                        else:
                            fission_level[key_dt] = [(line[1], days)]
                    except Exception as e:
                        # rows with a NULL first_level_dt are skipped
                        continue
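
            # Collapse fission visits into distinct machinecode counts per recall date,
            # bucketed by how many days after first_level_dt each visit was created.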
            tt = {}
            for key in fission_level:
                detail_list = fission_level[key]
                temp = {}
                for item in detail_list:
                    mid, days = item
                    if temp.get(days):
                        temp[days].add(mid)
                    else:
                        temp[days] = {mid}
                final = {}
                for sub_key in temp:
                    length = len(temp[sub_key])
                    final[sub_key] = length
                tt[key] = final
            for key in first_level:
                temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0),
                        tt.get(key, {}).get(2, 0)]
                L[key] = temp
            return L

        try:
            response = summarize(result_list)
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="计算source_id信息成功",
                data=response
            )
            return response
        except Exception as e:
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="获取 source_id信息失败, 报错信息是: {}".format(e),
                status="fail"
            )
            return {}