update_published_articles_minigram_detail.py

  1. """
  2. @author: luojunhui
  3. """
  4. import traceback
  5. from datetime import datetime, timedelta
  6. from typing import List, Dict
  7. from tqdm import tqdm
  8. from urllib.parse import urlparse, parse_qs
  9. from pymysql.cursors import DictCursor
  10. from applications import bot
  11. from applications import log
  12. from applications import Functions
  13. from applications import WeixinSpider
  14. from applications.db import DatabaseConnector
  15. from applications.const import UpdateMiniProgramDetailConst
  16. from applications.exception import SpiderError
  17. from config import long_articles_config, piaoquan_crawler_config
  18. const = UpdateMiniProgramDetailConst()
  19. spider = WeixinSpider()
  20. functions = Functions()
  21. TASK_NAME = "updateMinigramInfoDaily"
  22. ARTICLE_TABLE = "official_articles_v2"
  23. DETAIL_TABLE = "long_articles_detail_info"
  24. EMPTY_LIST = []
  25. EMPTY_DICT = {}


def extract_path(path: str) -> Dict:
    """
    Extract the video_id and root_source_id from a mini-program path.
    :param path:
    :return:
    """
    params = parse_qs(urlparse(path).query)
    jump_page = params.get('jumpPage', [None])[0]
    if jump_page:
        # jump_page looks like "pages/user-videos?id=<video_id>&rootSourceId=<id>";
        # "?" is not a query-string separator, so parse_qs keeps
        # "pages/user-videos?id" as the first key.
        params2 = parse_qs(jump_page)
        res = {
            "video_id": params2['pages/user-videos?id'][0],
            "root_source_id": params2['rootSourceId'][0],
        }
        return res
    else:
        return EMPTY_DICT
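
# A quick sanity-check sketch for extract_path; the path below is made up, but it
# has the same shape this parser expects from the mini_program "path" field:
#   extract_path("plugin://app?jumpPage=pages%2Fuser-videos%3Fid%3D123%26rootSourceId%3Dabc")
#   -> {"video_id": "123", "root_source_id": "abc"}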


def get_article_mini_program_info(content_url: str) -> List[Dict]:
    """
    Fetch the mini-program info embedded in an article.
    :param content_url:
    :return:
    """
    try:
        article_detail = spider.get_article_text(content_url)
    except Exception as e:
        raise SpiderError(error=e, spider="detail", url=content_url)

    response_code = article_detail['code']
    if response_code == const.ARTICLE_SUCCESS_CODE:
        mini_info = article_detail['data']['data']['mini_program']
        return mini_info
    else:
        return EMPTY_LIST


class UpdatePublishedArticlesMinigramDetail(object):
    """
    Update mini-program detail data for published articles.
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connectors
        :return:
        """
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="mini-program fission info update task failed to connect to database",
                detail={
                    "error": str(e),
                    "msg": error_msg
                }
            )
            return

    def check_articles(self) -> List[Dict]:
        """
        Check for articles whose publish_timestamp has not been resolved yet.
        :return:
        """
        # the tuple formats as e.g. "(0, -1)" in the final SQL
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
        """
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response

    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the stats row for a root_source_id on a given date.
        :param dt:
        :param root_source_id:
        :return:
        """
        select_sql = f"""
            SELECT first_uv, split0, split0_head, split0_recommend,
                   split1, split1_head, split1_recommend,
                   split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
        """
        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        if result:
            return result[0]
        else:
            return EMPTY_DICT

    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Fetch articles published between 00:00:00 and 23:59:59 on the day before biz_date.
        :param biz_date:
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
            FROM {ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list

    def insert_each_root_source_id(self, wx_sn, mini_title, mini_name, cover_url, video_index,
                                   root_source_id, video_id, publish_dt, recall_dt) -> int:
        """
        :param recall_dt: recall date
        :param publish_dt: article publish date
        :param video_id: video id
        :param video_index: video position within the article
        :param cover_url: video cover image url
        :param mini_name: mini-program name
        :param mini_title: mini-program title
        :param wx_sn:
        :param root_source_id:
        :return: number of affected rows
        """
        insert_sql = f"""
            INSERT IGNORE INTO {DETAIL_TABLE}
            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                wx_sn, mini_title, mini_name, cover_url, video_index,
                root_source_id, video_id, publish_dt, recall_dt
            )
        )
        return affected_rows
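
    # Note (assumption): INSERT IGNORE only deduplicates retries if DETAIL_TABLE has
    # a unique key covering something like (wx_sn, root_source_id, recall_dt);
    # without one, re-running the job would insert duplicate rows.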

    def record_each_article(self, article_info: Dict) -> Dict:
        """
        Record each mini-program root_source_id for an article.
        Row count: article_count * mini_program_count * days_count
        :param article_info:
        :return: article_info on failure, EMPTY_DICT on success
        """
        url = article_info['ContentUrl']
        publish_timestamp = article_info['publish_timestamp']
        wx_sn = article_info['wx_sn'].decode()

        article_mini_program_detail = get_article_mini_program_info(url)
        if article_mini_program_detail:
            log(
                task=TASK_NAME,
                function="record_each_article",
                message="fetched rootSourceId for article link successfully",
                data={
                    "ContentUrl": url,
                    "wxSn": wx_sn,
                    "publish_timestamp": publish_timestamp,
                    "miniInfo": article_mini_program_detail
                }
            )
            try:
                publish_date = datetime.fromtimestamp(publish_timestamp)
                # generate T+0, T+1, T+2 date strings
                recall_dt_str_list = [
                    (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
                    for i in range(3)
                ]
                for date_str in recall_dt_str_list:
                    for video_index, mini_item in enumerate(article_mini_program_detail, 1):
                        image_url = mini_item['image_url']
                        nick_name = mini_item['nike_name']  # 'nike_name' is the field name as returned by the spider
                        # extract video_id and root_source_id from the mini-program path
                        id_info = extract_path(mini_item['path'])
                        root_source_id = id_info['root_source_id']
                        video_id = id_info['video_id']
                        kimi_title = mini_item['title']
                        self.insert_each_root_source_id(
                            wx_sn=wx_sn,
                            mini_title=kimi_title,
                            mini_name=nick_name,
                            cover_url=image_url,
                            video_index=video_index,
                            root_source_id=root_source_id,
                            video_id=video_id,
                            publish_dt=publish_date.strftime('%Y-%m-%d'),
                            recall_dt=date_str
                        )
                return EMPTY_DICT
            except Exception as e:
                error_msg = traceback.format_exc()
                log(
                    task=TASK_NAME,
                    function="record_each_article",
                    status="fail",
                    message="insert failed: {} -- {}".format(e, error_msg)
                )
                return article_info
        else:
            return EMPTY_DICT

    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
        """
        Fetch root_source_id rows whose publish_dt falls within the three days before biz_date.
        :param biz_date:
        :return:
        """
        sql = f"""
            SELECT recall_dt, root_source_id
            FROM {DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB('{biz_date}', INTERVAL 3 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list

    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
        """
        :param recall_dt:
        :param root_source_id:
        :return:
        """
        mini_program_detail = self.get_root_source_id_result(root_source_id=root_source_id, dt=recall_dt)
        if mini_program_detail:
            # do the update
            update_sql = f"""
                UPDATE {DETAIL_TABLE}
                SET first_level = %s,
                    fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                    fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                    fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
                WHERE root_source_id = %s AND recall_dt = %s;
            """
            self.piaoquan_crawler_db_client.save(
                query=update_sql,
                params=(
                    mini_program_detail['first_uv'],
                    mini_program_detail['split0'],
                    mini_program_detail['split0_head'],
                    mini_program_detail['split0_recommend'],
                    mini_program_detail['split1'],
                    mini_program_detail['split1_head'],
                    mini_program_detail['split1_recommend'],
                    mini_program_detail['split2'],
                    mini_program_detail['split2_head'],
                    mini_program_detail['split2_recommend'],
                    root_source_id,
                    recall_dt
                )
            )

    def update_published_articles_job(self, biz_date=None):
        """
        Store article mini-program info into the fission detail table.
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        published_article_list = self.get_articles_published_yesterday(biz_date)
        failed_article_list = []
        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
            failed_article = self.record_each_article(article_info)
            if failed_article:
                failed_article_list.append(failed_article)

        # retry failed articles once
        second_try_fail_article_list = []
        if failed_article_list:
            for failed_article in failed_article_list:
                second_fail = self.record_each_article(failed_article)
                if second_fail:
                    second_try_fail_article_list.append(second_fail)

        bot(
            title="article update task finished",
            detail={
                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },
            mention=False
        )
        if second_try_fail_article_list:
            bot(
                title="article update task: some articles failed to crawl",
                detail=[
                    {
                        "account": line['accountName'],
                        "title": line['title'],
                        "url": line['ContentUrl']
                    }
                    for line in second_try_fail_article_list
                ]
            )

    def update_mini_program_detail_job(self, biz_date=None):
        """
        Update mini-program detail info.
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        # get the root_source_id list for the past three days
        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
        log(
            task=TASK_NAME,
            function="update_minigram_detail",
            message="fetched rootSourceId rows for the past three days: {} records in total".format(
                len(root_source_id_obj_list)
            )
        )
        fail_count = 0
        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
            try:
                self.update_each_root_source_id(
                    root_source_id=item['root_source_id'],
                    recall_dt=item['recall_dt']
                )
            except Exception as e:
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="failed to update a single row: {}".format(e),
                    data={"error_msg": traceback.format_exc()}
                )
                fail_count += 1

        if fail_count:
            bot(
                title="{} fail because of lam db error".format(TASK_NAME),
                detail={
                    "fail_count": fail_count
                }
            )
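

# A minimal entry-point sketch (assumed usage; the actual scheduler wiring for this
# repo may differ): run the crawl/insert job first so detail rows exist, then the
# stats update job.
if __name__ == '__main__':
    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    task.update_published_articles_job()
    task.update_mini_program_detail_job()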