update_published_articles_minigram_detail.py

  1. """
  2. @author: luojunhui
  3. """
  4. import traceback
  5. from datetime import datetime, timedelta
  6. from typing import List, Dict
  7. from tqdm import tqdm
  8. from urllib.parse import urlparse, parse_qs
  9. from pymysql.cursors import DictCursor
  10. from applications import bot
  11. from applications import log
  12. from applications import Functions
  13. from applications import WeixinSpider
  14. from applications.db import DatabaseConnector
  15. from applications.const import UpdateMiniProgramDetailConst
  16. from applications.exception import SpiderError
  17. from config import long_articles_config, piaoquan_crawler_config
  18. const = UpdateMiniProgramDetailConst()
  19. spider = WeixinSpider()
  20. functions = Functions()
  21. TASK_NAME = "updateMinigramInfoDaily"
  22. ARTICLE_TABLE = "official_articles_v2"
  23. DETAIL_TABLE = "long_articles_detail_info_dev"
  24. EMPTY_LIST = []


def get_root_source_id_list(mini_program: List[Dict]) -> List[str]:
    """
    Extract the root_source_id of each mini-program item, if present.
    :return:
    """
    root_source_id_list = []
    for item in mini_program:
        path = item['path']
        # parse the query string of the main URL
        params = parse_qs(urlparse(path).query)
        # extract the 'jumpPage' parameter and parse its own query string
        jump_page = params.get('jumpPage', [None])[0]
        if jump_page:
            params2 = parse_qs(jump_page)
            # extract the 'rootSourceId' parameter
            root_source_id = params2.get('rootSourceId', [None])[0]
            if root_source_id:
                root_source_id_list.append(root_source_id)
    return root_source_id_list
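

# A minimal, uncalled sketch of what get_root_source_id_list() expects and returns.
# The sample 'path' value below is hypothetical; real paths come from the
# mini_program entries returned by the spider.
def _example_get_root_source_id_list() -> None:
    sample_mini_program = [
        {
            # URL-encoded jumpPage query: pages/videos?id=9876&su=xyz&rootSourceId=longvideo_demo
            "path": "pages/category?jumpPage=pages%2Fvideos%3Fid%3D9876%26su%3Dxyz%26rootSourceId%3Dlongvideo_demo"
        }
    ]
    # expected output: ['longvideo_demo']
    print(get_root_source_id_list(sample_mini_program))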


def get_article_mini_program_info(content_url: str) -> List[Dict]:
    """
    Fetch the mini-program info embedded in an article.
    :return:
    """
    try:
        article_detail = spider.get_article_text(content_url)
    except Exception as e:
        raise SpiderError(error=e, spider="detail", url=content_url)

    response_code = article_detail['code']
    if response_code == const.ARTICLE_SUCCESS_CODE:
        mini_info = article_detail['data']['data']['mini_program']
        return mini_info
    else:
        return EMPTY_LIST
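
# For reference, the response shape assumed by get_article_mini_program_info()
# (inferred from the keys accessed above and in record_each_article; not a
# documented contract of WeixinSpider) is roughly:
#   {
#       "code": <int>,                      # compared against const.ARTICLE_SUCCESS_CODE
#       "data": {
#           "data": {
#               "mini_program": [
#                   {"image_url": ..., "nike_name": ..., "title": ..., "path": ...},
#                   ...
#               ]
#           }
#       }
#   }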


class UpdatePublishedArticlesMinigramDetail(object):
    """
    Update mini-program detail data for published articles.
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connector
        :return:
        """
        # initialize database connections
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="更新小程序裂变信息任务连接数据库失败",
                detail={
                    "error": e,
                    "msg": error_msg
                }
            )
            return

    def check_articles(self) -> List[Dict]:
        """
        Check for articles whose publish_timestamp has not been updated yet.
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response

    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the first_uv / split metrics of a root_source_id on a given dt.
        :param dt:
        :param root_source_id:
        :return:
        """
        select_sql = f"""
            SELECT first_uv, split0, split1, split2
            FROM changwen_data_rootsourceid
            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
        """
        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        if result:
            return result[0]
        else:
            return {}

    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Fetch articles published between 00:00:00 and 23:59:59 on the day before biz_date.
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
            FROM {ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list
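
    # Worked example for the window above: with biz_date = '2024-05-02' the BETWEEN
    # clause spans DATE_SUB('2024-05-02', INTERVAL 1 DAY) = '2024-05-01 00:00:00'
    # through DATE_SUB('2024-05-02', INTERVAL 1 SECOND) = '2024-05-01 23:59:59',
    # i.e. exactly the calendar day before biz_date.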

    def insert_each_root_source_id(
            self,
            wx_sn,
            mini_title,
            mini_name,
            cover_url,
            video_index,
            root_source_id,
            video_id,
            publish_dt,
            recall_dt
    ) -> int:
        """
        :param recall_dt: recall date (T+0 / T+1 / T+2)
        :param publish_dt: article publish date
        :param video_id: video id
        :param video_index: position of the video inside the article
        :param cover_url: video cover image url
        :param mini_name: mini-program name
        :param mini_title: mini-program title
        :param wx_sn:
        :param root_source_id:
        :return:
        """
        insert_sql = f"""
            INSERT INTO {DETAIL_TABLE}
                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt
            )
        )
        return affected_rows

    def record_each_article(self, article_info: Dict) -> Dict:
        """
        Record the root_source_id rows for a single article.
        Row count: article_count * mini_program_count * days_count
        :param article_info:
        :return: empty dict on success, the original article_info on failure
        """
        url = article_info['ContentUrl']
        publish_timestamp = article_info['publish_timestamp']
        wx_sn = article_info['wx_sn'].decode()

        article_mini_program_detail = get_article_mini_program_info(url)
        if article_mini_program_detail:
            log(
                task=TASK_NAME,
                function="record_each_article",
                message="获取文章链接对应的 rootSourceId 成功",
                data={
                    "ContentUrl": url,
                    "wxSn": wx_sn,
                    "publish_timestamp": publish_timestamp,
                    "miniInfo": article_mini_program_detail
                }
            )
            try:
                publish_date = datetime.fromtimestamp(publish_timestamp)
                # generate T+0, T+1, T+2 date strings
                recall_dt_str_list = [
                    (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
                    for i in range(3)
                ]
                for date_str in recall_dt_str_list:
                    for video_index, mini_item in enumerate(article_mini_program_detail, 1):
                        image_url = mini_item['image_url']
                        nick_name = mini_item['nike_name']
                        root_source_id = mini_item['path'].split("rootSourceId%3D")[-1]
                        video_id = mini_item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
                        kimi_title = mini_item['title']
                        self.insert_each_root_source_id(
                            wx_sn=wx_sn,
                            mini_title=kimi_title,
                            mini_name=nick_name,
                            cover_url=image_url,
                            video_index=video_index,
                            root_source_id=root_source_id,
                            video_id=video_id,
                            publish_dt=publish_date.strftime('%Y-%m-%d'),
                            recall_dt=date_str
                        )
                return {}
            except Exception as e:
                error_msg = traceback.format_exc()
                log(
                    task=TASK_NAME,
                    function="record_each_article",
                    status="fail",
                    message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
                )
                return article_info
        else:
            return article_info

        # # check whether root_source_id_list is null
        # if root_source_id_list:
        #     root_source_id_list = json.loads(article_info['root_source_id_list'])
        #
        #     # check whether root_source_id is empty, if not, try to get
        #     if not root_source_id_list:
        #         root_source_id_response = check_root_source_id_list(url)
        #         if root_source_id_response:
        #             root_source_id_list = get_root_source_id_list(root_source_id_response)
        #         else:
        #             return
        #
        #     for root_source_id in root_source_id_list:
        #         self.insert_each_root_source_id(root_source_id, article_info)
        #
        # else:
        #     print('todo: root_source_id_list is None')
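
    # Illustration of the path-splitting used in record_each_article() above, with a
    # hypothetical mini-program path (real paths come from the spider response):
    #   path = "pages/category?jumpPage=pages%2Fvideos%3Fid%3D9876%26su%3Dxyz%26rootSourceId%3Dlongvideo_demo"
    #   path.split("rootSourceId%3D")[-1]                     -> "longvideo_demo"
    #   path.split("videos%3Fid%3D")[1].split("%26su%3D")[0]  -> "9876"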

    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
        """
        Fetch root_source_id rows whose publish_dt falls within the three days before biz_date.
        :param biz_date:
        :return:
        """
        sql = f"""
            SELECT recall_dt, root_source_id
            FROM {DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB('{biz_date}', INTERVAL 3 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list
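
    # The three-day window mirrors the T+0 / T+1 / T+2 recall_dt rows written by
    # record_each_article(): an article published up to three days before biz_date
    # still has recall_dt values that may receive fresh metrics. For example, with
    # biz_date = '2024-05-04' the window covers publish_dt from '2024-05-01 00:00:00'
    # to '2024-05-03 23:59:59'.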

    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
        """
        Pull the metrics of one root_source_id / recall_dt pair and write them back
        to the detail table.
        :param recall_dt:
        :param root_source_id:
        :return:
        """
        mini_program_detail = self.get_root_source_id_result(root_source_id=root_source_id, dt=recall_dt)
        if mini_program_detail:
            # do update job; DETAIL_TABLE lives in the piaoquan_crawler database
            # (it is inserted into and selected from via piaoquan_crawler_db_client),
            # so the update goes through the same client
            update_sql = f"""
                UPDATE {DETAIL_TABLE}
                SET first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
                WHERE root_source_id = %s and recall_dt = %s;
            """
            self.piaoquan_crawler_db_client.save(
                query=update_sql,
                params=(
                    mini_program_detail['first_uv'],
                    mini_program_detail['split0'],
                    mini_program_detail['split1'],
                    mini_program_detail['split2'],
                    root_source_id,
                    recall_dt
                )
            )
        else:
            return
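
    # Column mapping implied by the UPDATE above, from changwen_data_rootsourceid
    # to the detail table:
    #   first_uv -> first_level
    #   split0   -> fission_0
    #   split1   -> fission_1
    #   split2   -> fission_2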

    def update_published_articles_job(self, biz_date=None):
        """
        Store article info into the fission detail table.
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        published_article_list = self.get_articles_published_yesterday(biz_date)
        failed_article_list = []
        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
            failed_article = self.record_each_article(article_info)
            if failed_article:
                failed_article_list.append(failed_article)

        # retry failed articles once
        second_try_fail_article_list = []
        if failed_article_list:
            for failed_article in failed_article_list:
                second_fail = self.record_each_article(failed_article)
                if second_fail:
                    second_try_fail_article_list.append(second_fail)

        bot(
            title="更新文章任务完成",
            detail={
                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },
            mention=False
        )
        if second_try_fail_article_list:
            bot(
                title="更新文章任务存在文章抓取失败",
                detail=[
                    {
                        "account": line['accountName'],
                        "title": line['title'],
                        "url": line['ContentUrl']
                    }
                    for line in second_try_fail_article_list
                ]
            )

    def update_mini_program_detail_job(self, biz_date=None):
        """
        Update the fission metrics.
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        # get the root_source_id rows of the last three days
        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
        log(
            task=TASK_NAME,
            function="update_minigram_detail",
            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(root_source_id_obj_list))
        )
        fail_count = 0
        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
            try:
                self.update_each_root_source_id(
                    root_source_id=item['root_source_id'],
                    recall_dt=item['recall_dt']
                )
            except Exception as e:
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="更新单条数据失败, 报错信息是 {}".format(e),
                    data={"error_msg": traceback.format_exc()}
                )
                fail_count += 1

        if fail_count:
            bot(
                title="{} fail because of lam db error".format(TASK_NAME),
                detail={
                    "fail_count": fail_count
                }
            )
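

# A minimal driver sketch. It is an assumption that the two jobs are meant to be run
# back to back like this; the production entry point (scheduler / cron wrapper) may
# live elsewhere and is not shown here.
if __name__ == "__main__":
    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    # write yesterday's articles and their root_source_id rows into the detail table
    task.update_published_articles_job()
    # then refresh the fission metrics for the last three days of root_source_id rows
    task.update_mini_program_detail_job()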