update_mini_program_info_task.py 9.6 KB

  1. """
  2. @author: luojunhui
  3. @desc: 更新每日发布文章的小程序裂变信息
  4. """
  5. import json
  6. import time
  7. import traceback
  8. from argparse import ArgumentParser
  9. from collections import defaultdict
  10. from datetime import datetime, timedelta
  11. from typing import List, Dict
  12. from tqdm import tqdm
  13. from pymysql.cursors import DictCursor
  14. from applications import bot
  15. from applications import Functions
  16. from applications import log
  17. from applications import WeixinSpider
  18. from applications.db import DatabaseConnector
  19. from applications.exception import SpiderError
  20. from config import long_articles_config, piaoquan_crawler_config
  21. basic_function = Functions()
  22. spider = WeixinSpider()
  23. TASK_NAME = "updateMinigramInfoDaily"
  24. SPIDER_SUCCESS_STATUS = 0
  25. class UpdateMiniProgramInfoTask(object):
  26. """
  27. 更新小程序裂变信息
  28. """
  29. def __init__(self):
  30. self.piaoquan_crawler_db_client = None
  31. self.long_articles_db_client = None
  32. def init_database(self) -> None:
  33. """
  34. init database connector
  35. :return:
  36. """
  37. # 初始化数据库连接
  38. try:
  39. self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
  40. self.piaoquan_crawler_db_client.connect()
  41. self.long_articles_db_client = DatabaseConnector(long_articles_config)
  42. self.long_articles_db_client.connect()
  43. except Exception as e:
  44. error_msg = traceback.format_exc()
  45. bot(
  46. title="更新小程序裂变信息任务连接数据库失败",
  47. detail={
  48. "error": e,
  49. "msg": error_msg
  50. }
  51. )
  52. return
    def get_published_articles_yesterday(self, run_date: str) -> List[Dict]:
        """
        fetch articles published during the calendar day before run_date
        :param run_date:
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB('{run_date}', INTERVAL 1 DAY) AND DATE_SUB('{run_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list
    def update_each_article(self, article_info: Dict) -> None:
        """
        crawl one article's detail page and persist its mini-program info
        :param article_info:
        :return:
        """
        url = article_info['ContentUrl']
        wx_sn = article_info['wx_sn'].decode()
        publish_timestamp = article_info['publish_timestamp']
        try:
            article_detail = spider.get_article_text(url)
        except Exception as e:
            raise SpiderError(error=e, spider="detail", url=url)

        response_code = article_detail['code']
        if response_code == SPIDER_SUCCESS_STATUS:
            mini_info = article_detail['data']['data']['mini_program']
            if mini_info:
                log(
                    task=TASK_NAME,
                    function="get_root_source_ids",
                    message="fetched the rootSourceId for the article url successfully",
                    data={
                        "ContentUrl": url,
                        "wxSn": wx_sn,
                        "publish_time_stamp": publish_timestamp,
                        "miniInfo": mini_info
                    }
                )
                try:
                    dt_object = datetime.fromtimestamp(publish_timestamp)
                    publish_dt = dt_object.strftime('%Y-%m-%d')
                    one_day = timedelta(days=1)
                    two_day = timedelta(days=2)
                    next_day = dt_object + one_day
                    next_next_day = dt_object + two_day
                    recall_dt_list = [dt_object, next_day, next_next_day]
                    recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
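                    # Each mini-program item is written once per recall date
                    # (publish day, day +1, day +2), giving every video a
                    # three-day observation window in long_articles_detail_info.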
                    for dt_str in recall_dt_str_list:
                        for index, item in enumerate(mini_info, 1):
                            image_url = item['image_url']
                            nick_name = item['nike_name']  # key is spelled 'nike_name' in the crawler payload
                            root_source_id = item['path'].split("rootSourceId%3D")[-1]
                            video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
                            kimi_title = item['title']
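                            # Illustrative parse (hypothetical path, not from the source):
                            #   path = '...videos%3Fid%3D1234567%26su%3Dabc%26rootSourceId%3Dlongvideo_xyz'
                            #   video_id       -> '1234567'
                            #   root_source_id -> 'longvideo_xyz'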
                            insert_sql = """
                                INSERT INTO long_articles_detail_info
                                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
                                VALUES
                                (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                            """
                            self.piaoquan_crawler_db_client.save(
                                query=insert_sql,
                                params=(
                                    wx_sn,
                                    kimi_title,
                                    nick_name,
                                    image_url,
                                    index,
                                    root_source_id,
                                    video_id,
                                    publish_dt,
                                    dt_str
                                )
                            )
                            log(
                                task=TASK_NAME,
                                function="update_article_info",
                                message="inserted mini-program row, video_id: {}".format(video_id)
                            )
                except Exception as e:
                    error_msg = traceback.format_exc()
                    log(
                        task=TASK_NAME,
                        function="update_article_info",
                        status="fail",
                        message="insert failed, reason: {}--{}".format(e, error_msg)
                    )
    def get_source_id_info(self, root_source_id: str) -> Dict:
        """
        aggregate first-level and fission stats for one root_source_id
        :param root_source_id:
        :return:
        """
        sql = f"""
            SELECT type, machinecode, create_time, first_level_dt
            FROM changwen_data_base_v2
            WHERE rootsourceid = '{root_source_id}';
        """
        result_list = self.long_articles_db_client.fetch(sql)
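        # Each row is a tuple (type, machinecode, create_time, first_level_dt);
        # type == '首层' marks a first-level view, any other value a fission view.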
        def summarize(values):
            """
            aggregate raw rows into per-day first-level and fission counts
            :param values:
            :return: {recall_dt: [first_level_uv, fission_uv_day0, fission_uv_day1, fission_uv_day2]}
            """
            daily_stats = {}
            first_level = {}
            fission_level = {}
            for line in values:
                # count first-level views first
                if line[0] == '首层':
                    try:
                        dt = str(line[-1])
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if first_level.get(key_dt):
                            first_level[key_dt].add(line[1])
                        else:
                            first_level[key_dt] = {line[1]}
                    except Exception:
                        continue
                else:
                    try:
                        dt = str(line[-1])
                        first_level_dt = datetime.strptime(dt, '%Y%m%d')
                        create_level_dt = line[-2]
                        delta = create_level_dt - first_level_dt
                        days = int(delta.days)
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if fission_level.get(key_dt):
                            fission_level[key_dt].append((line[1], days))
                        else:
                            fission_level[key_dt] = [(line[1], days)]
                    except Exception:
                        # first_level_dt is NULL for some rows; skip them
                        continue

            # count distinct machinecodes per (date, day offset)
            fission_counts = {}
            for key in fission_level:
                detail_list = fission_level[key]
                temp = {}
                for item in detail_list:
                    mid, days = item
                    if temp.get(days):
                        temp[days].add(mid)
                    else:
                        temp[days] = {mid}
                final = {}
                for sub_key in temp:
                    final[sub_key] = len(temp[sub_key])
                fission_counts[key] = final

            for key in first_level:
                daily_stats[key] = [
                    len(first_level[key]),
                    fission_counts.get(key, {}).get(0, 0),
                    fission_counts.get(key, {}).get(1, 0),
                    fission_counts.get(key, {}).get(2, 0)
                ]
            return daily_stats
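        # Illustrative (hypothetical rows): two '首层' views plus one fission view
        # created on its own first_level_dt, all dated 20240714, would yield
        #   {'2024-07-14': [2, 1, 0, 0]}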
        def summarize_v2(value):
            """
            defaultdict-based rewrite of summarize(); appears unfinished and is
            not called in this function
            :param value:
            :return:
            """
            first_level = defaultdict(set)
            fission_level = defaultdict(list)
            for record in value:
                type_, machinecode, create_time, first_level_dt = record
                # TODO: port the aggregation logic over from summarize()

        try:
            response = summarize(result_list)
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="computed source_id info successfully",
                data=response
            )
            return response
        except Exception as e:
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="failed to compute source_id info, error: {}".format(e),
                status="fail"
            )
            return {}
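
# Usage sketch (illustrative, not from this listing; the run date is hypothetical):
#     task = UpdateMiniProgramInfoTask()
#     task.init_database()
#     for article in tqdm(task.get_published_articles_yesterday('2024-07-15')):
#         task.update_each_article(article)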