update_published_articles_read_detail.py

  1. """
  2. @author: luojunhui
  3. @desc: 更新文章的阅读详情
  4. """
  5. import json
  6. import time
  7. import traceback
  8. import urllib.parse
  9. from datetime import datetime
  10. from typing import Dict, List
  11. from pymysql.cursors import DictCursor
  12. from tqdm import tqdm
  13. from applications import aiditApi
  14. from applications import bot
  15. from applications import create_feishu_columns_sheet
  16. from applications import Functions
  17. from applications import log
  18. from applications import WeixinSpider
  19. from applications.const import updatePublishedMsgTaskConst
  20. from applications.db import DatabaseConnector
  21. from config import denet_config, long_articles_config, piaoquan_crawler_config
  22. ARTICLE_TABLE = "official_articles"
  23. const = updatePublishedMsgTaskConst()
  24. spider = WeixinSpider()
  25. functions = Functions()
  26. empty_dict = {}

def generate_bot_columns():
    """
    Build the Feishu sheet columns used in alert tables.
    :return: list of column definitions
    """
    columns = [
        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
        create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
        create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
                                    display_name="账号接入系统时间"),
        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
    ]
    return columns

class UpdatePublishedArticlesReadDetail(object):
    """
    Update the read details of each day's published articles.
    """

    def __init__(self):
        self.aigc_db_client = None
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None
    def get_account_list(self) -> List[Dict]:
        """
        Fetch the accounts currently in publishing status from the aigc database.
        :return: list of dicts shaped as
            "name": line[0],
            "ghId": line[1],
            "follower_count": line[2],
            "account_init_time": int(line[3] / 1000),
            "account_type": line[4],  # subscription account or service account
            "account_auth": line[5]
        """

        def get_account_status() -> Dict:
            """
            Fetch the experiment status of each account.
            :return: mapping of account_id -> status
            """
            sql = f"""
                SELECT t1.account_id, t2.status
                FROM wx_statistics_group_source_account t1
                JOIN wx_statistics_group_source t2
                ON t1.group_source_name = t2.account_source_name;
            """
            account_status_list = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
            account_status = {account['account_id']: account['status'] for account in account_status_list}
            return account_status

        account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
        account_status_dict = get_account_status()
        account_list = [
            {
                **item,
                # accounts whose group status is "实验" (experiment) are marked as not in use
                'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
            }
            for item in account_list_with_out_using_status
        ]
        return account_list
    def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
        """
        Look up article info by trace_id.
        """
        select_sql = f"""
            SELECT t1.gh_id, t1.account_name, t2.article_title
            FROM long_articles_match_videos t1
            JOIN long_articles_text t2
            ON t1.content_id = t2.content_id
            WHERE t1.trace_id = '{trace_id}';
        """
        article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        if article_info:
            return article_info[0]
        else:
            # return a fresh dict rather than a shared module-level one,
            # so callers cannot mutate each other's result
            return {}
    def init_database(self):
        """
        Initialize the database connections.
        """
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.aigc_db_client = DatabaseConnector(denet_config)
            self.aigc_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="更新文章任务连接数据库失败",
                detail={
                    "error": str(e),
                    "msg": error_msg
                }
            )
            return
    def insert_each_msg(self, account_info: Dict, msg_list: List[Dict]) -> None:
        """
        Write the message data into the database.
        :param account_info:
        :param msg_list:
        :return:
        """
        gh_id = account_info['ghId']
        account_name = account_info['name']
        for info in msg_list:
            baseInfo = info.get("BaseInfo", {})
            appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
            createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
            updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
            Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
            if detail_article_list:
                for article in detail_article_list:
                    title = article.get("Title", None)
                    Digest = article.get("Digest", None)
                    ItemIndex = article.get("ItemIndex", None)
                    ContentUrl = article.get("ContentUrl", None)
                    SourceUrl = article.get("SourceUrl", None)
                    CoverImgUrl = article.get("CoverImgUrl", None)
                    CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
                    CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
                    ItemShowType = article.get("ItemShowType", None)
                    IsOriginal = article.get("IsOriginal", None)
                    ShowDesc = article.get("ShowDesc", None)
                    show_stat = functions.show_desc_to_sta(ShowDesc)
                    ori_content = article.get("ori_content", None)
                    show_view_count = show_stat.get("show_view_count", 0)
                    show_like_count = show_stat.get("show_like_count", 0)
                    show_zs_count = show_stat.get("show_zs_count", 0)
                    show_pay_count = show_stat.get("show_pay_count", 0)
                    # wx_sn is the "sn" query parameter inside the article URL
                    wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl and "&sn=" in ContentUrl else None
                    status = account_info['using_status']
                    info_tuple = (
                        gh_id,
                        account_name,
                        appMsgId,
                        title,
                        Type,
                        createTime,
                        updateTime,
                        Digest,
                        ItemIndex,
                        ContentUrl,
                        SourceUrl,
                        CoverImgUrl,
                        CoverImgUrl_1_1,
                        CoverImgUrl_235_1,
                        ItemShowType,
                        IsOriginal,
                        ShowDesc,
                        ori_content,
                        show_view_count,
                        show_like_count,
                        show_zs_count,
                        show_pay_count,
                        wx_sn,
                        json.dumps(baseInfo, ensure_ascii=False),
                        functions.str_to_md5(title),
                        status
                    )
                    self.insert_each_article(
                        info_tuple=info_tuple,
                        show_view_count=show_view_count,
                        show_like_count=show_like_count,
                        wx_sn=wx_sn
                    )
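
    # Shape of one item in msg_list, as consumed above (structure inferred from
    # the .get() chains in this method; values are illustrative only):
    # {
    #     "BaseInfo": {...},
    #     "AppMsg": {
    #         "BaseInfo": {"AppMsgId": ..., "CreateTime": ..., "UpdateTime": ..., "Type": ...},
    #         "DetailInfo": [
    #             {"Title": ..., "ItemIndex": ..., "ContentUrl": ..., "ShowDesc": ..., ...}
    #         ]
    #     }
    # }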

    def insert_each_article(self, info_tuple, show_view_count, show_like_count, wx_sn):
        """
        Insert a single article; on failure (typically a duplicate wx_sn),
        fall back to refreshing its read/like counters.
        """
        try:
            insert_sql = f"""
                INSERT INTO {ARTICLE_TABLE}
                (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
                VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            self.piaoquan_crawler_db_client.save(query=insert_sql, params=info_tuple)
            log(
                task="updatePublishedMsgDaily",
                function="insert_each_msg",
                message="插入文章数据成功",
                data={
                    "info": info_tuple
                }
            )
        except Exception as e:
            try:
                update_sql = f"""
                    UPDATE {ARTICLE_TABLE}
                    SET show_view_count = %s, show_like_count = %s
                    WHERE wx_sn = %s;
                """
                self.piaoquan_crawler_db_client.save(query=update_sql,
                                                     params=(show_view_count, show_like_count, wx_sn))
                log(
                    task="updatePublishedMsgDaily",
                    function="insert_each_msg",
                    message="更新文章数据成功",
                    data={
                        "wxSn": wx_sn,
                        "likeCount": show_like_count,
                        "viewCount": show_view_count
                    }
                )
            except Exception as e:
                log(
                    task="updatePublishedMsgDaily",
                    function="insert_each_msg",
                    message="更新文章失败, 报错原因是: {}".format(e),
                    status="fail"
                )
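
    # The insert-then-update pattern above relies on the INSERT raising for
    # rows that already exist. A single-statement alternative sketch, assuming
    # wx_sn (or an equivalent key) carries a UNIQUE index -- an assumption this
    # file does not confirm:
    #
    #   INSERT INTO official_articles (..., show_view_count, show_like_count, wx_sn, ...)
    #   VALUES (...)
    #   ON DUPLICATE KEY UPDATE
    #       show_view_count = VALUES(show_view_count),
    #       show_like_count = VALUES(show_like_count);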

    def update_account_by_spider(self, account_info: Dict, cursor=None):
        """
        Update a single account's articles via the spider.
        :param account_info:
        :param cursor:
        :return: None
        """
        gh_id = account_info['ghId']
        latest_update_time = self.get_account_info(gh_id)
        response = spider.update_msg_list(ghId=gh_id, index=cursor)
        if not response:
            log(
                task="updatePublishedMsgDaily",
                function="update_account_by_spider",
                status="fail",
                message="账号更新请求爬虫接口失败",
                data=account_info
            )
            return
        msg_list = response.get("data", {}).get("data", [])
        if msg_list:
            last_article_in_this_msg = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
            last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
            # verify that the crawled feed actually belongs to this account
            # before writing anything
            resdata = spider.get_account_by_url(last_url)
            check_id = resdata['data'].get('data', {}).get('wx_gh')
            if check_id == gh_id:
                self.insert_each_msg(
                    account_info=account_info,
                    msg_list=msg_list
                )
                # pagination is currently disabled:
                # if last_time_stamp_in_this_msg > latest_update_time:
                #     next_cursor = response['data']['next_cursor']
                #     return self.update_account_by_spider(
                #         account_info=account_info,
                #         cursor=next_cursor
                #     )
                log(
                    task="updatePublishedMsgDaily",
                    function="update_each_account",
                    message="账号文章更新成功",
                    data=response
                )
        else:
            log(
                task="updatePublishedMsgDaily",
                function="update_each_account",
                message="账号文章更新失败",
                status="fail",
                data=response
            )
        return

    def update_account_by_aigc(self, account_info: Dict, run_date: str):
        """
        Update a single account's articles via the aigc trace table.
        """
        gh_id = account_info['ghId']
        select_sql = f"""
            SELECT trace_id, wx_sn, published_url, publish_timestamp, root_source_id_list, create_timestamp
            FROM long_articles_published_trace_id
            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP(DATE_SUB('{run_date}', INTERVAL 3 DAY)) AND delete_status = 0;
        """
        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        for article in result:
            trace_id = article['trace_id']
            wx_sn = article['wx_sn']
            published_url = article['published_url']
            publish_timestamp = article['publish_timestamp']
            article_info = spider.get_article_text(content_link=published_url, is_cache=False, is_count=True)
            response_code = article_info['code']
            match response_code:
                case const.ARTICLE_SUCCESS_CODE:
                    response_data = article_info['data']['data']
                    title = response_data['title']
                    article_url = response_data['content_link']
                    show_view_count = response_data['view_count']
                    show_like_count = response_data['like_count']
                    show_zs_count = 0
                    show_pay_count = 0
                    wx_sn = article_url.split("&sn=")[1].split("&")[0] if article_url and "&sn=" in article_url else None
                    app_msg_id = article_url.split("&mid=")[1].split("&")[0] if article_url and "&mid=" in article_url else None
                    status = account_info['using_status']
                    info_tuple = (
                        gh_id,
                        account_info['name'],
                        app_msg_id,
                        title,
                        "9",  # Type
                        article['create_timestamp'],
                        response_data['update_timestamp'],
                        None,
                        response_data['item_index'],
                        response_data['content_link'],
                        None,
                        None,
                        None,
                        None,
                        None,
                        response_data.get("is_original", None),
                        None,
                        None,
                        show_view_count,
                        show_like_count,
                        show_zs_count,
                        show_pay_count,
                        wx_sn,
                        None,
                        functions.str_to_md5(title),
                        status
                    )
                    self.insert_each_article(
                        info_tuple=info_tuple,
                        show_view_count=show_view_count,
                        show_like_count=show_like_count,
                        wx_sn=wx_sn
                    )
                case const.ARTICLE_DELETE_CODE:
                    log(
                        task="updatePublishedMsgDaily",
                        function="update_account_by_aigc",
                        message="文章被删除",
                        data={
                            "ghId": gh_id,
                            "publishedUrl": published_url
                        }
                    )
                case const.ARTICLE_ILLEGAL_CODE:
                    article_detail = self.get_article_info_by_trace_id(trace_id)
                    if article_detail:
                        error_detail = article_info.get("msg")
                        insert_sql = f"""
                            INSERT IGNORE INTO illegal_articles
                            (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
                            VALUES
                            (%s, %s, %s, %s, %s, %s);
                        """
                        affected_rows = self.long_articles_db_client.save(
                            query=insert_sql,
                            params=(
                                # these fields come from the trace-id lookup
                                # (article_detail), not the spider response
                                article_detail['gh_id'],
                                article_detail['account_name'],
                                article_detail['article_title'],
                                wx_sn,
                                functions.timestamp_to_str(publish_timestamp),
                                error_detail
                            )
                        )
                        if affected_rows:
                            bot(
                                title="文章违规告警(new task)",
                                detail={
                                    "account_name": article_detail['account_name'],
                                    "gh_id": article_detail['gh_id'],
                                    "title": article_detail['article_title'],
                                    "wx_sn": wx_sn,
                                    "publish_date": functions.timestamp_to_str(publish_timestamp),
                                    "error_detail": error_detail,
                                },
                                mention=False
                            )
                            aiditApi.delete_articles(
                                gh_id=article_detail['gh_id'],
                                title=article_detail['article_title']
                            )

    def get_account_info(self, gh_id: str) -> int:
        """
        Look up the latest publish timestamp of an account by gh_id.
        :param gh_id:
        :return:
        """
        sql = f"""
            SELECT MAX(publish_timestamp)
            FROM {ARTICLE_TABLE}
            WHERE ghId = '{gh_id}';
        """
        result = self.piaoquan_crawler_db_client.fetch(sql)
        if result and result[0][0]:
            return result[0][0]
        else:
            # new account: the crawl window starts NEW_ACCOUNT_CRAWL_PERIOD
            # (30 days) before the current moment
            return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD

    def check_single_account(self, account_item: Dict) -> bool:
        """
        Check whether an account has been updated.
        :param account_item:
        :return: True / False
        """
        gh_id = account_item['ghId']
        account_type = account_item['account_type']
        today_str = datetime.today().strftime("%Y-%m-%d")
        today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
        today_timestamp = today_date_time.timestamp()
        sql = f"""
            SELECT MAX(updateTime)
            FROM {ARTICLE_TABLE}
            WHERE ghId = '{gh_id}';
        """
        try:
            latest_update_time = self.piaoquan_crawler_db_client.fetch(sql)[0][0]
            # has the account's publication for today been collected?
            # subscription accounts must have an update after 00:00 today,
            # service accounts within the last 7 days
            if account_type in const.SUBSCRIBE_TYPE_SET:
                return int(latest_update_time) > int(today_timestamp)
            else:
                return int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600
        except Exception as e:
            print(e)
            return False

    def process_single_account(self, account_info: Dict, run_date: str):
        """
        Process a single account.
        """
        gh_id = account_info['ghId']
        # check whether the account had an automatic mass-send today
        # without an unlimited-stream publish
        select_sql = f"""
            SELECT push_type
            FROM long_articles_published_trace_id
            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP('{run_date}');
        """
        response = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        UNLIMITED_PUSH = 3
        if response:
            unlimited_push_list = [item for item in response if item['push_type'] == UNLIMITED_PUSH]
            if unlimited_push_list:
                self.update_account_by_spider(account_info=account_info)
            else:
                print("By AIGC", account_info)
                self.update_account_by_aigc(account_info=account_info, run_date=run_date)
        else:
            self.update_account_by_spider(account_info=account_info)
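
    # Routing summary for process_single_account:
    #   published today with an unlimited-stream push (push_type == 3) -> crawl via spider
    #   published today, no unlimited-stream push                      -> read via the aigc trace table
    #   nothing published today                                        -> crawl via spider (catch-up)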

    def update_publish_timestamp(self, article_info: Dict):
        """
        Update the publish timestamp && mini-program info.
        :param article_info:
        :return: the article info on request failure, otherwise None
        """
        url = article_info['ContentUrl']
        wx_sn = article_info['wx_sn']
        try:
            response = spider.get_article_text(url)
            response_code = response['code']
            if response_code == const.ARTICLE_DELETE_CODE:
                publish_timestamp_s = const.DELETE_STATUS
                root_source_id_list = []
            elif response_code == const.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = const.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == const.ARTICLE_SUCCESS_CODE:
                data = response['data']['data']
                publish_timestamp_ms = data['publish_timestamp']
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get('mini_program', [])
                if mini_program:
                    root_source_id_list = [
                        urllib.parse.parse_qs(
                            urllib.parse.unquote(i['path'])
                        )['rootSourceId'][0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = const.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = const.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            print(e, error_msg)
        update_sql = f"""
            UPDATE {ARTICLE_TABLE}
            SET publish_timestamp = %s, root_source_id_list = %s
            WHERE wx_sn = %s;
        """
        self.piaoquan_crawler_db_client.save(
            query=update_sql,
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn
            ))
        if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
            return article_info
        else:
            return None
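
    # Worked example of the rootSourceId extraction above, with a hypothetical
    # mini-program path (real values arrive percent-encoded, hence the unquote):
    #   i['path']  = 'pages/category%3FparentId%3D1%26rootSourceId%3Dabc123'
    #   unquote -> 'pages/category?parentId=1&rootSourceId=abc123'
    #   parse_qs -> {'pages/category?parentId': ['1'], 'rootSourceId': ['abc123']}
    #   ['rootSourceId'][0] -> 'abc123'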

    def update_job(self, biz_date: str = None):
        """
        Run the update task.
        """
        account_list = self.get_account_list()
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')
        # subscription accounts first
        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
        success_count = 0
        fail_count = 0
        for account in tqdm(subscription_accounts):
            try:
                self.process_single_account(account_info=account, run_date=biz_date)
                success_count += 1
                time.sleep(3)
            except Exception as e:
                fail_count += 1
                log(
                    task="updatePublishedMsgDaily",
                    function="update_job",
                    message="单个账号文章更新失败, 报错信息是: {}".format(e),
                    status="fail",
                )
        log(
            task="updatePublishedMsgDaily",
            function="update_job",
            message="订阅号更新完成",
            data={
                "success": success_count,
                "fail": fail_count
            }
        )
        # guard against a zero division when there are no subscription accounts
        total_count = success_count + fail_count
        if total_count and fail_count / total_count > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
            bot(
                title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
                detail={
                    "success": success_count,
                    "fail": fail_count,
                    "failRate": fail_count / total_count
                }
            )
        bot(
            title="更新每日发布文章任务完成通知(new)",
            detail={
                "msg": "订阅号更新完成",
                "finish_time": datetime.today().__str__()
            },
            mention=False
        )
        # service accounts
        server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
        for account in tqdm(server_accounts):
            try:
                self.process_single_account(account_info=account, run_date=biz_date)
                time.sleep(1)
            except Exception as e:
                print(e)
        bot(
            title="更新每日发布文章任务完成通知(new)",
            detail={
                "msg": "服务号更新完成",
                "finish_time": datetime.today().__str__()
            },
            mention=False
        )

    def check_job(self, biz_date: str = None):
        """
        Run the check task: check each account.
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')
        account_list = self.get_account_list()
        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
        fail_list = []
        # check, and rework the account if it failed
        for sub_item in tqdm(subscription_accounts):
            res = self.check_single_account(sub_item)
            if not res:
                self.process_single_account(sub_item, biz_date)
        # check whether the rework succeeded, and alert via bot if it did not
        for sub_item in tqdm(subscription_accounts):
            res = self.check_single_account(sub_item)
            if not res:
                # drop three fields that are not needed in the alert table
                sub_item.pop('account_type', None)
                sub_item.pop('account_auth', None)
                sub_item.pop('account_id', None)
                fail_list.append(sub_item)
        if fail_list:
            try:
                bot(
                    title="更新当天发布文章,存在未更新的账号(new)",
                    detail={
                        "columns": generate_bot_columns(),
                        "rows": fail_list
                    },
                    table=True
                )
            except Exception as e:
                print("Timeout Error: {}".format(e))
        else:
            bot(
                title="更新当天发布文章,所有账号均更新成功(new)",
                mention=False,
                detail={
                    "msg": "校验任务完成",
                    "finish_time": datetime.today().__str__()
                }
            )

    def get_article_detail_job(self):
        """
        Fetch the details of published articles.
        :return:
        """
        select_sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
        """
        article_list = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
        for article in tqdm(article_list):
            try:
                self.update_publish_timestamp(article)
            except Exception as e:
                print(e)
                error_msg = traceback.format_exc()
                print(error_msg)
        # re-check articles still in request-failed (-1) or default (0) status
        select_sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
        """
        process_failed_articles = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
        fail_list = []
        if process_failed_articles:
            for article in tqdm(process_failed_articles):
                try:
                    res = self.update_publish_timestamp(article)
                    if res:
                        # update_publish_timestamp returns the article only on
                        # request failure, so skip the None results
                        fail_list.append(res)
                except Exception as e:
                    print(e)
                    error_msg = traceback.format_exc()
                    print(error_msg)
        # backfill publish_timestamp from sibling articles sharing the same appMsgId
        update_sql = f"""
            UPDATE {ARTICLE_TABLE} oav
            JOIN (
                SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp
                FROM {ARTICLE_TABLE}
                WHERE publish_timestamp > %s
                GROUP BY ghId, appMsgId
            ) vv
            ON oav.appMsgId = vv.appMsgId AND oav.ghId = vv.ghId
            SET oav.publish_timestamp = vv.publish_timestamp
            WHERE oav.publish_timestamp <= %s;
        """
        self.piaoquan_crawler_db_client.save(
            query=update_sql,
            params=(0, 0)
        )
        # if publish_timestamp is still missing, fall back to updateTime
        update_sql_2 = f"""
            UPDATE {ARTICLE_TABLE}
            SET publish_timestamp = updateTime
            WHERE publish_timestamp < %s;
        """
        self.piaoquan_crawler_db_client.save(
            query=update_sql_2,
            params=0
        )
        if fail_list:
            bot(
                title="更新文章任务,请求detail失败",
                detail=fail_list
            )
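
# A minimal driver sketch. Assumption: the original file is invoked by an
# external scheduler, so this entry point is illustrative rather than part of
# the source.
if __name__ == '__main__':
    task = UpdatePublishedArticlesReadDetail()
    task.init_database()
    task.update_job()              # crawl & upsert today's read details
    task.check_job()               # re-check accounts, alert on stragglers
    task.get_article_detail_job()  # backfill publish_timestamp / rootSourceId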