  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import traceback
  6. from datetime import datetime, timedelta
  7. from typing import List, Dict
  8. from tqdm import tqdm
  9. from urllib.parse import urlparse, parse_qs
  10. from pymysql.cursors import DictCursor
  11. from applications import bot
  12. from applications import log
  13. from applications import Functions
  14. from applications import WeixinSpider
  15. from applications.db import DatabaseConnector
  16. from applications.const import UpdateMiniProgramDetailConst
  17. from applications.exception import SpiderError
  18. from config import long_articles_config, piaoquan_crawler_config
  19. const = UpdateMiniProgramDetailConst()
  20. spider = WeixinSpider()
  21. functions = Functions()
  22. TASK_NAME = "updateMinigramInfoDaily"
  23. ARTICLE_TABLE = "official_articles_v2"
  24. DETAIL_TABLE = "long_articles_detail_info"
  25. EMPTY_LIST = []
  26. EMPTY_DICT = {}


def extract_path(path: str) -> Dict:
    """
    Extract the video_id and root_source_id carried in a mini-program path.
    :param path:
    :return:
    """
    params = parse_qs(urlparse(path).query)
    jump_page = params.get('jumpPage', [None])[0]
    if jump_page:
        params2 = parse_qs(jump_page)
        res = {
            "video_id": params2['pages/user-videos?id'][0],
            "root_source_id": params2['rootSourceId'][0],
        }
        return res
    else:
        return EMPTY_DICT
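

# Illustrative note, not from the original source: extract_path assumes the path's query string
# carries a URL-encoded "jumpPage" parameter. For a hypothetical value such as
#   "pages/category?jumpPage=pages%2Fuser-videos%3Fid%3D123%26rootSourceId%3Dabc"
# parse_qs decodes jumpPage to "pages/user-videos?id=123&rootSourceId=abc", so the function
# would return {"video_id": "123", "root_source_id": "abc"}; paths without jumpPage yield {}.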


class UpdatePublishedArticlesMinigramDetail(object):
    """
    Update mini-program detail data for published articles.
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connector
        :return:
        """
        # initialize the database connections
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="更新小程序裂变信息任务连接数据库失败",
                detail={
                    "error": e,
                    "msg": error_msg
                }
            )
            return
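
    # Note (added for clarity): piaoquan_crawler_db_client is used for the official_articles_v2
    # and long_articles_detail_info tables, while long_articles_db_client is used for
    # changwen_data_rootsourceid and long_articles_root_source_id.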

    def check_articles(self) -> List[Dict]:
        """
        Check whether any articles have not yet had their publish time updated.
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
        """
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response
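
    # Note (added for clarity): the Python tuple interpolated above is rendered by the f-string
    # as e.g. "IN (0, -1)", which happens to be valid SQL; the exact placeholder values come
    # from UpdateMiniProgramDetailConst and are not restated here.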

    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the stats row for a root_source_id on a given date.
        :param dt:
        :param root_source_id:
        :return:
        """
        select_sql = f"""
            SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
        """
        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        if result:
            return result[0]
        else:
            return EMPTY_DICT
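
    # Note (added for clarity): returns a single stats row (first_uv plus split0..split2 and
    # their _head / _recommend variants) for the given root_source_id on date dt, or {} when
    # no row exists yet.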

    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Get articles whose publish time falls between 00:00:00 and 23:59:59 of the day before biz_date.
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
            BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list
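
    # Illustrative example (assuming biz_date uses the '%Y-%m-%d' format): with
    # biz_date = '2024-05-02' the BETWEEN clause spans '2024-05-01 00:00:00' through
    # '2024-05-01 23:59:59', i.e. exactly the calendar day before biz_date.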

    def insert_each_root_source_id(self, wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) -> int:
        """
        :param recall_dt: recall date
        :param publish_dt: article publish date
        :param video_id: video id
        :param video_index: position of the video within the article
        :param cover_url: video cover image url
        :param mini_name: mini-program name
        :param mini_title: mini-program title
        :param wx_sn:
        :param root_source_id:
        :return:
        """
        insert_sql = f"""
            INSERT IGNORE INTO {DETAIL_TABLE}
            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt
            )
        )
        return affected_rows
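
    # Assumption (not verified against the schema): INSERT IGNORE is relied on to deduplicate
    # re-runs, which only works if the table defines a unique key covering at least
    # (wx_sn, root_source_id, recall_dt); otherwise duplicate detail rows would accumulate.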

    def record_each_article(self, article_info: Dict) -> Dict:
        """
        Record the root_source_id details of a single article.
        Row count: article_count * mini_program_count * days_count
        :param article_info:
        :return:
        """
        url = article_info['ContentUrl']
        publish_timestamp = article_info['publish_timestamp']
        wx_sn = article_info['wx_sn'].decode()
        root_source_id_list = json.loads(article_info['root_source_id_list']) if article_info['root_source_id_list'] else EMPTY_LIST

        article_mini_program_detail = self.get_article_mini_program_info(url, root_source_id_list)
        if article_mini_program_detail:
            log(
                task=TASK_NAME,
                function="record_each_article",
                message="获取文章链接对应的 rootSourceId 成功",
                data={
                    "ContentUrl": url,
                    "wxSn": wx_sn,
                    "publish_timestamp": publish_timestamp,
                    "miniInfo": article_mini_program_detail
                }
            )
            try:
                publish_date = datetime.fromtimestamp(publish_timestamp)
                # generate T+0, T+1, T+2 date strings
                recall_dt_str_list = [
                    (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
                    for i in range(3)
                ]
                for date_str in recall_dt_str_list:
                    for video_index, mini_item in enumerate(article_mini_program_detail, 1):
                        image_url = mini_item['image_url']
                        nick_name = mini_item['nike_name']
                        # extract video_id and root_source_id
                        if mini_item.get("root_source_id") and mini_item.get("video_id"):
                            root_source_id = mini_item['root_source_id']
                            video_id = mini_item['video_id']
                        else:
                            id_info = extract_path(mini_item['path'])
                            root_source_id = id_info['root_source_id']
                            video_id = id_info['video_id']
                        kimi_title = mini_item['title']
                        self.insert_each_root_source_id(
                            wx_sn=wx_sn,
                            mini_title=kimi_title,
                            mini_name=nick_name,
                            cover_url=image_url,
                            video_index=video_index,
                            root_source_id=root_source_id,
                            video_id=video_id,
                            publish_dt=publish_date.strftime('%Y-%m-%d'),
                            recall_dt=date_str
                        )
                return EMPTY_DICT
            except Exception as e:
                error_msg = traceback.format_exc()
                log(
                    task=TASK_NAME,
                    function="record_each_article",
                    status="fail",
                    message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
                )
                return article_info
        else:
            return EMPTY_DICT
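
    # Worked count (follows from the loops above): one article with N mini-program cards
    # produces N * 3 rows in the detail table, one per card for each of the T+0, T+1 and T+2
    # recall dates derived from the publish date.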

    def get_article_mini_program_info(self, content_url: str, root_source_id_list: list) -> List[Dict]:
        """
        Get the mini-program info for an article.
        :return:
        """
        if root_source_id_list:
            # root_source_id has already been resolved; read the video ids from the database
            fetch_sql = f"""
                select video_id, root_source_id from long_articles_root_source_id where root_source_id in %s;
            """
            fetch_response = self.long_articles_db_client.fetch(
                query=fetch_sql,
                params=(tuple(root_source_id_list),),
                cursor_type=DictCursor
            )
            mini_info = []
            if fetch_response:
                # build mini_info in the expected card format
                for item in fetch_response:
                    mini_info.append(
                        {
                            "app_id": "wx89e7eb06478361d7",
                            "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                            "image_url": "",
                            "nike_name": "票圈 l 3亿人喜欢的视频平台",
                            "root_source_id": item['root_source_id'],
                            "video_id": item['video_id'],
                            "service_type": "0",
                            "title": "",
                            "type": "card"
                        }
                    )
                return mini_info

        try:
            article_detail = spider.get_article_text(content_url)
        except Exception as e:
            raise SpiderError(error=e, spider="detail", url=content_url)

        response_code = article_detail['code']
        if response_code == const.ARTICLE_SUCCESS_CODE:
            mini_info = article_detail['data']['data']['mini_program']
            return mini_info
        else:
            return EMPTY_LIST
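
    # Note (added for clarity): two lookup paths are used here. When root_source_id_list is
    # already stored on the article, the ids are resolved locally from
    # long_articles_root_source_id and wrapped in card-shaped dicts with fixed app_id / avatar /
    # nike_name placeholders; otherwise the article page is fetched via WeixinSpider and the
    # mini_program list is taken from the response payload.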

    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
        """
        Get root_source_id rows whose publish_dt falls within the three days before biz_date.
        :param biz_date:
        :return:
        """
        sql = f"""
            SELECT recall_dt, root_source_id
            FROM {DETAIL_TABLE}
            WHERE publish_dt
            BETWEEN DATE_SUB('{biz_date}', INTERVAL 3 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list
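
    # Note (added for clarity): every (recall_dt, root_source_id) row for articles published in
    # the last three days is re-selected on each run, so recall rows keep being refreshed daily
    # until the article ages out of the three-day window.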

    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
        """
        :param recall_dt:
        :param root_source_id:
        :return:
        """
        mini_program_detail = self.get_root_source_id_result(root_source_id=root_source_id, dt=recall_dt)
        if mini_program_detail:
            # do update job
            update_sql = f"""
                UPDATE {DETAIL_TABLE}
                SET first_level = %s,
                    fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                    fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                    fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
                WHERE root_source_id = %s and recall_dt = %s;
            """
            self.piaoquan_crawler_db_client.save(
                query=update_sql,
                params=(
                    mini_program_detail['first_uv'],
                    mini_program_detail['split0'],
                    mini_program_detail['split0_head'],
                    mini_program_detail['split0_recommend'],
                    mini_program_detail['split1'],
                    mini_program_detail['split1_head'],
                    mini_program_detail['split1_recommend'],
                    mini_program_detail['split2'],
                    mini_program_detail['split2_head'],
                    mini_program_detail['split2_recommend'],
                    root_source_id,
                    recall_dt
                )
            )
        else:
            return
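
    # Note (added for clarity): the update above maps first_uv -> first_level and
    # split{0,1,2}[_head|_recommend] -> fission_{0,1,2}[_head|_recommend]; rows whose stats are
    # not yet present in changwen_data_rootsourceid are simply skipped.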

    def update_published_articles_job(self, biz_date=None):
        """
        Save article info into the fission detail table.
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        published_article_list = self.get_articles_published_yesterday(biz_date)
        failed_article_list = []
        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
            failed_article = self.record_each_article(article_info)
            if failed_article:
                failed_article_list.append(failed_article)

        # retry the failed articles once
        second_try_fail_article_list = []
        if failed_article_list:
            for failed_article in failed_article_list:
                second_fail = self.record_each_article(failed_article)
                if second_fail:
                    second_try_fail_article_list.append(second_fail)

        bot(
            title="更新文章任务完成",
            detail={
                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },
            mention=False
        )
        if second_try_fail_article_list:
            bot(
                title="更新文章任务存在文章抓取失败",
                detail=[
                    {
                        "account": line['accountName'],
                        "title": line['title'],
                        "url": line['ContentUrl']
                    }
                    for line in second_try_fail_article_list
                ]
            )

    def update_mini_program_detail_job(self, biz_date=None):
        """
        update mini program detail info
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        # get the root_source_id list for the last three days
        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
        log(
            task=TASK_NAME,
            function="update_minigram_detail",
            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(root_source_id_obj_list))
        )
        fail_count = 0
        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
            try:
                self.update_each_root_source_id(
                    root_source_id=item['root_source_id'],
                    recall_dt=item['recall_dt']
                )
            except Exception as e:
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="更新单条数据失败, 报错信息是 {}".format(e),
                    data={"error_msg": traceback.format_exc()}
                )
                fail_count += 1

        if fail_count:
            bot(
                title="{} fail because of lam db error".format(TASK_NAME),
                detail={
                    "fail_count": fail_count
                }
            )
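

# Illustrative usage sketch (assumption: the real entry point lives elsewhere, e.g. a scheduler
# script; this block only shows the intended call order of the two jobs).
if __name__ == '__main__':
    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    # record yesterday's articles first, then refresh stats for the last three days
    task.update_published_articles_job()
    task.update_mini_program_detail_job()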