update_published_articles_minigram_detail.py

  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import traceback
  6. from datetime import datetime, timedelta
  7. from typing import List, Dict
  8. from tqdm import tqdm
  9. from pymysql.cursors import DictCursor
  10. from applications import bot, log
  11. from applications.db import DatabaseConnector
  12. from applications.exception import SpiderError
  13. from applications.utils import extract_root_source_id
  14. from cold_start.crawler.wechat import get_article_detail
  15. from config import long_articles_config, piaoquan_crawler_config
  16. TASK_NAME = "updateMinigramInfoDaily"
  17. ARTICLE_TABLE = "official_articles_v2"
  18. DETAIL_TABLE = "long_articles_detail_info"
  19. EMPTY_LIST = []
  20. EMPTY_DICT = {}


class Const:
    # response code indicating the article detail request succeeded
    ARTICLE_SUCCESS_CODE = 0
    # default status of a record
    DEFAULT_STATUS = 0
    # status: the detail API request failed
    REQUEST_FAIL_STATUS = -1
    # status: the article has been deleted
    DELETE_STATUS = -2
    # status: no info returned for an unknown reason
    UNKNOWN_STATUS = -3
    # status: the article violates platform rules
    ILLEGAL_STATUS = -4


class UpdatePublishedArticlesMinigramDetail(Const):
    """
    Update mini-program detail data for published articles.
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connector
        :return:
        """
        # initialize the database connections
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="更新小程序裂变信息任务连接数据库失败",
                detail={"error": e, "msg": error_msg},
            )
            return

    def check_articles(self) -> List[Dict]:
        """
        Check whether any articles are still missing an updated publish timestamp.
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(self.DEFAULT_STATUS, self.REQUEST_FAIL_STATUS)};
        """
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response

    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the fission statistics of a root_source_id on a given date.
        :param dt: recall date
        :param root_source_id:
        :return:
        """
        select_sql = f"""
            SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
        """
        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        if result:
            return result[0]
        else:
            return EMPTY_DICT

    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Fetch articles published during the day before biz_date (00:00:00 - 23:59:59).
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(
            query=sql, cursor_type=DictCursor
        )
        return article_list

    def insert_each_root_source_id(
        self,
        wx_sn,
        mini_title,
        mini_name,
        cover_url,
        video_index,
        root_source_id,
        video_id,
        publish_dt,
        recall_dt,
    ) -> int:
        """
        :param recall_dt: recall date (T+0 / T+1 / T+2 relative to publish_dt)
        :param publish_dt: article publish date
        :param video_id: video id
        :param video_index: position of the video inside the article
        :param cover_url: video cover image url
        :param mini_name: mini-program name
        :param mini_title: mini-program title
        :param wx_sn:
        :param root_source_id:
        :return: number of affected rows
        """
        insert_sql = f"""
            INSERT IGNORE INTO {DETAIL_TABLE}
            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                wx_sn,
                mini_title,
                mini_name,
                cover_url,
                video_index,
                root_source_id,
                video_id,
                publish_dt,
                recall_dt,
            ),
        )
        return affected_rows

    def record_each_article(self, article_info: Dict) -> Dict:
        """
        Record the root_source_ids of a single article.
        Row count: article_count * mini_program_count * days_count
        :param article_info:
        :return: the article_info on insert failure (for retry), otherwise an empty dict
        """
        url = article_info["ContentUrl"]
        publish_timestamp = article_info["publish_timestamp"]
        wx_sn = article_info["wx_sn"].decode()
        # root_source_id_list is stored as a JSON string; fall back to an empty list
        root_source_id_list = (
            json.loads(article_info["root_source_id_list"])
            if article_info["root_source_id_list"]
            else EMPTY_LIST
        )
        try:
            article_mini_program_detail = self.get_article_mini_program_info(
                url, root_source_id_list
            )
        except Exception as e:
            # spider failure is swallowed here; the article is not queued for retry
            return EMPTY_DICT
        if article_mini_program_detail:
            log(
                task=TASK_NAME,
                function="record_each_article",
                message="获取文章链接对应的 rootSourceId 成功",
                data={
                    "ContentUrl": url,
                    "wxSn": wx_sn,
                    "publish_timestamp": publish_timestamp,
                    "miniInfo": article_mini_program_detail,
                },
            )
            try:
                publish_date = datetime.fromtimestamp(publish_timestamp)
                # generate T+0, T+1, T+2 date strings
                recall_dt_str_list = [
                    (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
                    for i in range(3)
                ]
                for date_str in recall_dt_str_list:
                    for video_index, mini_item in enumerate(
                        article_mini_program_detail, 1
                    ):
                        image_url = mini_item["image_url"]
                        nick_name = mini_item["nike_name"]
                        # extract video_id and root_source_id
                        if mini_item.get("root_source_id") and mini_item.get(
                            "video_id"
                        ):
                            root_source_id = mini_item["root_source_id"]
                            video_id = mini_item["video_id"]
                        else:
                            id_info = extract_root_source_id(mini_item["path"])
                            root_source_id = id_info["root_source_id"]
                            video_id = id_info["video_id"]
                        kimi_title = mini_item["title"]
                        self.insert_each_root_source_id(
                            wx_sn=wx_sn,
                            mini_title=kimi_title,
                            mini_name=nick_name,
                            cover_url=image_url,
                            video_index=video_index,
                            root_source_id=root_source_id,
                            video_id=video_id,
                            publish_dt=publish_date.strftime("%Y-%m-%d"),
                            recall_dt=date_str,
                        )
                return EMPTY_DICT
            except Exception as e:
                error_msg = traceback.format_exc()
                log(
                    task=TASK_NAME,
                    function="record_each_article",
                    status="fail",
                    message="插入数据失败, 失败原因是{}--{}".format(e, error_msg),
                )
                return article_info
        else:
            return EMPTY_DICT

    def get_article_mini_program_info(
        self, content_url: str, root_source_id_list: list
    ) -> List[Dict]:
        """
        Get the mini-program info embedded in an article.
        :return:
        """
        if root_source_id_list:
            # the root_source_ids are already known, so read them from the db instead of crawling
            fetch_sql = """
                SELECT video_id, root_source_id
                FROM long_articles_root_source_id
                WHERE root_source_id IN %s;
            """
            fetch_response = self.long_articles_db_client.fetch(
                query=fetch_sql,
                params=(tuple(root_source_id_list),),
                cursor_type=DictCursor,
            )
            mini_info = []
            if fetch_response:
                # build the mini_info structure expected by the caller
                for item in fetch_response:
                    mini_info.append(
                        {
                            "app_id": "wx89e7eb06478361d7",
                            "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                            "image_url": "",
                            "nike_name": "票圈 l 3亿人喜欢的视频平台",
                            "root_source_id": item["root_source_id"],
                            "video_id": item["video_id"],
                            "service_type": "0",
                            "title": "",
                            "type": "card",
                        }
                    )
            return mini_info
        try:
            article_detail = get_article_detail(content_url)
        except Exception as e:
            raise SpiderError(error=e, spider="detail", url=content_url)
        response_code = article_detail["code"]
        if response_code == self.ARTICLE_SUCCESS_CODE:
            mini_info = article_detail["data"]["data"]["mini_program"]
            return mini_info
        else:
            return EMPTY_LIST

    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
        """
        Fetch root_source_ids whose publish_dt falls within the three days before biz_date.
        :param biz_date:
        :return:
        """
        sql = f"""
            SELECT recall_dt, root_source_id
            FROM {DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB('{biz_date}', INTERVAL 3 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(
            query=sql, cursor_type=DictCursor
        )
        return article_list

    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
        """
        Update the fission statistics of one (root_source_id, recall_dt) pair.
        :param recall_dt:
        :param root_source_id:
        :return:
        """
        mini_program_detail = self.get_root_source_id_result(
            root_source_id=root_source_id, dt=recall_dt
        )
        if mini_program_detail:
            # do update job
            update_sql = f"""
                UPDATE {DETAIL_TABLE}
                SET first_level = %s,
                    fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                    fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                    fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
                WHERE root_source_id = %s AND recall_dt = %s;
            """
            self.piaoquan_crawler_db_client.save(
                query=update_sql,
                params=(
                    mini_program_detail["first_uv"],
                    mini_program_detail["split0"],
                    mini_program_detail["split0_head"],
                    mini_program_detail["split0_recommend"],
                    mini_program_detail["split1"],
                    mini_program_detail["split1_head"],
                    mini_program_detail["split1_recommend"],
                    mini_program_detail["split2"],
                    mini_program_detail["split2_head"],
                    mini_program_detail["split2_recommend"],
                    root_source_id,
                    recall_dt,
                ),
            )
        else:
            return

    def update_published_articles_job(self, biz_date=None):
        """
        Store the mini-program info of published articles into the fission detail table.
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime("%Y-%m-%d")
        published_article_list = self.get_articles_published_yesterday(biz_date)
        failed_article_list = []
        for article_info in tqdm(
            published_article_list, desc="update_published_articles_job"
        ):
            failed_article = self.record_each_article(article_info)
            if failed_article:
                failed_article_list.append(failed_article)
        # retry the articles that failed on the first pass
        second_try_fail_article_list = []
        if failed_article_list:
            for failed_article in failed_article_list:
                second_fail = self.record_each_article(failed_article)
                if second_fail:
                    second_try_fail_article_list.append(second_fail)
        bot(
            title="更新文章任务完成",
            detail={"finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
            mention=False,
        )
        if second_try_fail_article_list:
            bot(
                title="更新文章任务存在文章抓取失败",
                detail=[
                    {
                        "account": line["accountName"],
                        "title": line["title"],
                        "url": line["ContentUrl"],
                    }
                    for line in second_try_fail_article_list
                ],
            )

    def update_mini_program_detail_job(self, biz_date=None):
        """
        update mini program detail info
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime("%Y-%m-%d")
        # get root_source_id_list
        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
        log(
            task=TASK_NAME,
            function="update_minigram_detail",
            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(
                len(root_source_id_obj_list)
            ),
        )
        fail_count = 0
        for item in tqdm(
            root_source_id_obj_list, desc="update_mini_program_detail_job"
        ):
            try:
                self.update_each_root_source_id(
                    root_source_id=item["root_source_id"], recall_dt=item["recall_dt"]
                )
            except Exception as e:
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="更新单条数据失败, 报错信息是 {}".format(e),
                    data={"error_msg": traceback.format_exc()},
                )
                fail_count += 1
        if fail_count:
            bot(
                title="{} fail because of lam db error".format(TASK_NAME),
                detail={"fail_count": fail_count},
            )
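

# A minimal usage sketch, not part of the original module: an assumption about how the two
# jobs might be invoked together (e.g. from a daily scheduler). The class, its methods and the
# default biz_date handling are taken from the code above; only this entry point is new.
if __name__ == "__main__":
    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    # record the mini-program root_source_ids of articles published yesterday
    task.update_published_articles_job()
    # backfill fission statistics for the root_source_ids recorded over the last three days
    task.update_mini_program_detail_job()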