updatePublishedMsgDaily.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693
  1. """
  2. @author: luojunhui
  3. @description: update daily information into official articles v2
  4. """
  5. import time
  6. import json
  7. import traceback
  8. import urllib.parse
  9. from tqdm import tqdm
  10. from datetime import datetime
  11. from argparse import ArgumentParser
  12. from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi
  13. from applications.const import updatePublishedMsgTaskConst
  14. ARTICLE_TABLE = "official_articles_v2"
  15. const = updatePublishedMsgTaskConst()
  16. spider = WeixinSpider()
  17. def get_account_using_status():
  18. """
  19. 获取正在 using 的 ghid
  20. :return:
  21. """
  22. sql = "SELECT gh_id FROM long_articles_publishing_accounts WHERE is_using = 1;"
  23. gh_id_tuple = PQMySQL().select(sql)
  24. gh_id_list = [
  25. i[0] for i in gh_id_tuple
  26. ]
  27. return set(gh_id_list)
  28. def get_accounts():
  29. """
  30. 从 aigc 数据库中获取目前处于发布状态的账号
  31. :return:
  32. "name": line[0],
  33. "ghId": line[1],
  34. "follower_count": line[2],
  35. "account_init_time": int(line[3] / 1000),
  36. "account_type": line[4], # 订阅号 or 服务号
  37. "account_auth": line[5]
  38. """
  39. using_account_set = get_account_using_status()
  40. account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
  41. account_list = []
  42. for item in account_list_with_out_using_status:
  43. if item['ghId'] in using_account_set:
  44. item['using_status'] = 1
  45. else:
  46. item['using_status'] = 0
  47. account_list.append(item)
  48. subscription_account = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
  49. server_account = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
  50. return subscription_account, server_account
  51. def insert_each_msg(db_client, account_info, account_name, msg_list):
  52. """
  53. 把消息数据更新到数据库中
  54. :param account_info:
  55. :param db_client:
  56. :param account_name:
  57. :param msg_list:
  58. :return:
  59. """
  60. gh_id = account_info['ghId']
  61. for info in msg_list:
  62. baseInfo = info.get("BaseInfo", {})
  63. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  64. createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  65. updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  66. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  67. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  68. if detail_article_list:
  69. for article in detail_article_list:
  70. title = article.get("Title", None)
  71. Digest = article.get("Digest", None)
  72. ItemIndex = article.get("ItemIndex", None)
  73. ContentUrl = article.get("ContentUrl", None)
  74. SourceUrl = article.get("SourceUrl", None)
  75. CoverImgUrl = article.get("CoverImgUrl", None)
  76. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  77. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  78. ItemShowType = article.get("ItemShowType", None)
  79. IsOriginal = article.get("IsOriginal", None)
  80. ShowDesc = article.get("ShowDesc", None)
  81. show_stat = Functions().show_desc_to_sta(ShowDesc)
  82. ori_content = article.get("ori_content", None)
  83. show_view_count = show_stat.get("show_view_count", 0)
  84. show_like_count = show_stat.get("show_like_count", 0)
  85. show_zs_count = show_stat.get("show_zs_count", 0)
  86. show_pay_count = show_stat.get("show_pay_count", 0)
  87. wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
  88. status = account_info['using_status']
  89. info_tuple = (
  90. gh_id,
  91. account_name,
  92. appMsgId,
  93. title,
  94. Type,
  95. createTime,
  96. updateTime,
  97. Digest,
  98. ItemIndex,
  99. ContentUrl,
  100. SourceUrl,
  101. CoverImgUrl,
  102. CoverImgUrl_1_1,
  103. CoverImgUrl_235_1,
  104. ItemShowType,
  105. IsOriginal,
  106. ShowDesc,
  107. ori_content,
  108. show_view_count,
  109. show_like_count,
  110. show_zs_count,
  111. show_pay_count,
  112. wx_sn,
  113. json.dumps(baseInfo, ensure_ascii=False),
  114. Functions().str_to_md5(title),
  115. status
  116. )
  117. try:
  118. insert_sql = f"""
  119. INSERT INTO {ARTICLE_TABLE}
  120. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
  121. values
  122. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  123. """
  124. db_client.update(sql=insert_sql, params=info_tuple)
  125. log(
  126. task="updatePublishedMsgDaily",
  127. function="insert_each_msg",
  128. message="插入文章数据成功",
  129. data={
  130. "info": info_tuple
  131. }
  132. )
  133. except Exception as e:
  134. try:
  135. update_sql = f"""
  136. UPDATE {ARTICLE_TABLE}
  137. SET show_view_count = %s, show_like_count=%s
  138. WHERE wx_sn = %s;
  139. """
  140. db_client.update(sql=update_sql,
  141. params=(show_view_count, show_like_count, wx_sn))
  142. log(
  143. task="updatePublishedMsgDaily",
  144. function="insert_each_msg",
  145. message="更新文章数据成功",
  146. data={
  147. "wxSn": wx_sn,
  148. "likeCount": show_like_count,
  149. "viewCount": show_view_count
  150. }
  151. )
  152. except Exception as e:
  153. log(
  154. task="updatePublishedMsgDaily",
  155. function="insert_each_msg",
  156. message="更新文章失败, 报错原因是: {}".format(e),
  157. status="fail"
  158. )
  159. continue
  160. def update_each_account(db_client, account_info, account_name, latest_update_time, cursor=None):
  161. """
  162. 更新每一个账号信息
  163. :param account_info:
  164. :param account_name:
  165. :param cursor:
  166. :param latest_update_time: 最新更新时间
  167. :param db_client: 数据库连接信息
  168. :return: None
  169. """
  170. gh_id = account_info['ghId']
  171. response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
  172. msg_list = response.get("data", {}).get("data", {})
  173. if msg_list:
  174. # do
  175. last_article_in_this_msg = msg_list[-1]
  176. last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
  177. last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
  178. resdata = WeixinSpider().get_account_by_url(last_url)
  179. check_id = resdata['data'].get('data', {}).get('wx_gh')
  180. if check_id == gh_id:
  181. insert_each_msg(
  182. db_client=db_client,
  183. account_info=account_info,
  184. account_name=account_name,
  185. msg_list=msg_list
  186. )
  187. if last_time_stamp_in_this_msg > latest_update_time:
  188. next_cursor = response['data']['next_cursor']
  189. return update_each_account(
  190. db_client=db_client,
  191. account_info=account_info,
  192. account_name=account_name,
  193. latest_update_time=latest_update_time,
  194. cursor=next_cursor
  195. )
  196. log(
  197. task="updatePublishedMsgDaily",
  198. function="update_each_account",
  199. message="账号文章更新成功",
  200. data=response
  201. )
  202. else:
  203. log(
  204. task="updatePublishedMsgDaily",
  205. function="update_each_account",
  206. message="账号文章更新失败",
  207. status="fail",
  208. data=response
  209. )
  210. return
  211. def check_account_info(db_client, gh_id, account_name):
  212. """
  213. 通过 gh_id查询视频信息
  214. :param account_name:
  215. :param db_client:
  216. :param gh_id:
  217. :return:
  218. """
  219. sql = f"""
  220. SELECT accountName, updateTime
  221. FROM {ARTICLE_TABLE}
  222. WHERE ghId = '{gh_id}'
  223. ORDER BY updateTime DESC LIMIT 1;
  224. """
  225. result = db_client.select(sql)
  226. if result:
  227. old_account_name, update_time = result[0]
  228. return {
  229. "account_name": old_account_name,
  230. "update_time": update_time,
  231. "account_type": "history"
  232. }
  233. else:
  234. return {
  235. "account_name": account_name,
  236. "update_time": int(time.time()) - 30 * 24 * 60 * 60,
  237. "account_type": "new"
  238. }
  239. def update_single_account(db_client, account_info):
  240. """
  241. :param account_info:
  242. :param db_client:
  243. :return:
  244. """
  245. gh_id = account_info['ghId']
  246. account_name = account_info['name']
  247. account_detail = check_account_info(db_client, gh_id, account_name)
  248. account_name = account_detail['account_name']
  249. update_time = account_detail['update_time']
  250. update_each_account(
  251. db_client=db_client,
  252. account_info=account_info,
  253. account_name=account_name,
  254. latest_update_time=update_time
  255. )
  256. def check_single_account(db_client, account_item):
  257. """
  258. 校验每个账号是否更新
  259. :param db_client:
  260. :param account_item:
  261. :return: True / False
  262. """
  263. gh_id = account_item['ghId']
  264. account_type = account_item['account_type']
  265. today_str = datetime.today().strftime("%Y-%m-%d")
  266. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  267. today_timestamp = today_date_time.timestamp()
  268. sql = f"""
  269. SELECT updateTime
  270. FROM {ARTICLE_TABLE}
  271. WHERE ghId = '{gh_id}'
  272. ORDER BY updateTime
  273. DESC
  274. LIMIT 1;
  275. """
  276. try:
  277. latest_update_time = db_client.select(sql)[0][0]
  278. # 判断该账号当天发布的文章是否被收集
  279. if account_type in const.SUBSCRIBE_TYPE_SET:
  280. if int(latest_update_time) > int(today_timestamp):
  281. return True
  282. else:
  283. return False
  284. else:
  285. if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
  286. return True
  287. else:
  288. return False
  289. except Exception as e:
  290. print("updateTime Error -- {}".format(e))
  291. return False
  292. def update_job():
  293. """
  294. 更新任务
  295. :return:
  296. """
  297. try:
  298. db_client = PQMySQL()
  299. except Exception as e:
  300. error_msg = traceback.format_exc()
  301. bot(
  302. title="更新文章任务连接数据库失败",
  303. detail={
  304. "error": e,
  305. "msg": error_msg
  306. }
  307. )
  308. return
  309. sub_accounts, server_accounts = get_accounts()
  310. s_count = 0
  311. f_count = 0
  312. for sub_item in tqdm(sub_accounts):
  313. try:
  314. update_single_account(db_client, sub_item)
  315. s_count += 1
  316. time.sleep(5)
  317. except Exception as e:
  318. f_count += 1
  319. log(
  320. task="updatePublishedMsgDaily",
  321. function="update_job",
  322. message="单个账号文章更新失败, 报错信息是: {}".format(e),
  323. status="fail",
  324. )
  325. log(
  326. task="updatePublishedMsgDaily",
  327. function="update_job",
  328. message="订阅号更新完成",
  329. data={
  330. "success": s_count,
  331. "fail": f_count
  332. }
  333. )
  334. if f_count / (s_count + f_count) > 0.3:
  335. bot(
  336. title="订阅号超过 30% 的账号更新失败",
  337. detail={
  338. "success": s_count,
  339. "fail": f_count,
  340. "failRate": f_count / (s_count + f_count)
  341. }
  342. )
  343. bot(
  344. title="更新每日发布文章任务完成通知",
  345. detail={
  346. "msg": "订阅号更新完成",
  347. "finish_time": datetime.today().__str__()
  348. },
  349. mention=False
  350. )
  351. for sub_item in tqdm(server_accounts):
  352. try:
  353. update_single_account(db_client, sub_item)
  354. time.sleep(5)
  355. except Exception as e:
  356. print(e)
  357. bot(
  358. title="更新每日发布文章任务完成通知",
  359. detail={
  360. "msg": "服务号更新完成",
  361. "finish_time": datetime.today().__str__()
  362. },
  363. mention=False
  364. )
  365. def check_job():
  366. """
  367. 校验任务
  368. :return:
  369. """
  370. try:
  371. db_client = PQMySQL()
  372. except Exception as e:
  373. error_msg = traceback.format_exc()
  374. bot(
  375. title="校验更新文章任务连接数据库失败",
  376. detail={
  377. "job": "check_job",
  378. "error": e,
  379. "msg": error_msg
  380. }
  381. )
  382. return
  383. sub_accounts, server_accounts = get_accounts()
  384. fail_list = []
  385. # account_list = sub_accounts + server_accounts
  386. account_list = sub_accounts
  387. # check and rework if fail
  388. for sub_item in tqdm(account_list):
  389. res = check_single_account(db_client, sub_item)
  390. if not res:
  391. update_single_account(db_client, sub_item)
  392. # check whether success and bot if fails
  393. for sub_item in tqdm(account_list):
  394. res = check_single_account(db_client, sub_item)
  395. if not res:
  396. sub_item.drop('account_type')
  397. sub_item.drop('account_auth')
  398. init_timestamp = sub_item.pop('account_init_timestamp')
  399. sub_item['account_init_date'] = datetime.fromtimestamp(init_timestamp).strftime('%Y-%m-%d %H:%M:%S')
  400. fail_list.append(sub_item)
  401. if fail_list:
  402. try:
  403. bot(
  404. title="日常报警, 存在账号更新失败",
  405. detail=fail_list
  406. )
  407. except Exception as e:
  408. print("Timeout Error: {}".format(e))
  409. else:
  410. bot(
  411. title="校验完成通知",
  412. mention=False,
  413. detail={
  414. "msg": "校验任务完成",
  415. "finish_time": datetime.today().__str__()
  416. }
  417. )
  418. def get_articles(db_client):
  419. """
  420. :return:
  421. """
  422. sql = f"""
  423. SELECT ContentUrl, wx_sn
  424. FROM {ARTICLE_TABLE}
  425. WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
  426. response = db_client.select(sql)
  427. return response
  428. def update_publish_timestamp(db_client, row):
  429. """
  430. 更新发布时间戳 && minigram 信息
  431. :param db_client:
  432. :param row:
  433. :return:
  434. """
  435. url = row[0]
  436. wx_sn = row[1]
  437. try:
  438. response = WeixinSpider().get_article_text(url)
  439. response_code = response['code']
  440. if response_code == const.ARTICLE_DELETE_CODE:
  441. publish_timestamp_s = const.DELETE_STATUS
  442. root_source_id_list = []
  443. elif response_code == const.ARTICLE_ILLEGAL_CODE:
  444. publish_timestamp_s = const.ILLEGAL_STATUS
  445. root_source_id_list = []
  446. elif response_code == const.ARTICLE_SUCCESS_CODE:
  447. data = response['data']['data']
  448. publish_timestamp_ms = data['publish_timestamp']
  449. publish_timestamp_s = int(publish_timestamp_ms / 1000)
  450. mini_program = data.get('mini_program', [])
  451. if mini_program:
  452. root_source_id_list = [
  453. urllib.parse.parse_qs(
  454. urllib.parse.unquote(i['path'])
  455. )['rootSourceId'][0]
  456. for i in mini_program
  457. ]
  458. else:
  459. root_source_id_list = []
  460. else:
  461. publish_timestamp_s = const.UNKNOWN_STATUS
  462. root_source_id_list = []
  463. except Exception as e:
  464. publish_timestamp_s = const.REQUEST_FAIL_STATUS
  465. root_source_id_list = []
  466. error_msg = traceback.format_exc()
  467. print(e, error_msg)
  468. update_sql = f"""
  469. UPDATE {ARTICLE_TABLE}
  470. SET publish_timestamp = %s, root_source_id_list = %s
  471. WHERE wx_sn = %s;
  472. """
  473. db_client.update(
  474. sql=update_sql,
  475. params=(
  476. publish_timestamp_s,
  477. json.dumps(root_source_id_list, ensure_ascii=False),
  478. wx_sn
  479. ))
  480. if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
  481. return row
  482. else:
  483. return None
  484. def get_article_detail_job():
  485. """
  486. 获取发布文章详情
  487. :return:
  488. """
  489. try:
  490. db_client = PQMySQL()
  491. except Exception as e:
  492. error_msg = traceback.format_exc()
  493. bot(
  494. title="获取文章详情任务连接数据库失败",
  495. detail={
  496. "job": "get_article_detail_job",
  497. "error": e,
  498. "msg": error_msg
  499. }
  500. )
  501. return
  502. article_tuple = get_articles(db_client)
  503. for article in tqdm(article_tuple):
  504. try:
  505. update_publish_timestamp(db_client=db_client, row=article)
  506. except Exception as e:
  507. print(e)
  508. error_msg = traceback.format_exc()
  509. print(error_msg)
  510. # check 一遍存在请求失败-1 && 0 的文章
  511. process_failed_articles = get_articles(db_client)
  512. fail_list = []
  513. if process_failed_articles:
  514. for article in tqdm(process_failed_articles):
  515. try:
  516. res = update_publish_timestamp(db_client=db_client, row=article)
  517. fail_list.append({"wx_sn": res[1], "url": res[0]})
  518. except Exception as e:
  519. print(e)
  520. error_msg = traceback.format_exc()
  521. print(error_msg)
  522. # 通过msgId 来修改publish_timestamp
  523. update_sql = f"""
  524. UPDATE {ARTICLE_TABLE} oav
  525. JOIN (
  526. SELECT appMsgId, MAX(publish_timestamp) AS publish_timestamp
  527. FROM {ARTICLE_TABLE}
  528. WHERE publish_timestamp > %s
  529. GROUP BY appMsgId
  530. ) vv
  531. ON oav.appMsgId = vv.appMsgId
  532. SET oav.publish_timestamp = vv.publish_timestamp
  533. WHERE oav.publish_timestamp <= %s;
  534. """
  535. affected_rows = db_client.update(
  536. sql=update_sql,
  537. params=(0, 0)
  538. )
  539. # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
  540. update_sql_2 = f"""
  541. UPDATE {ARTICLE_TABLE}
  542. SET publish_timestamp = updateTime
  543. WHERE publish_timestamp < %s;
  544. """
  545. db_client.update(
  546. sql=update_sql_2,
  547. params=0
  548. )
  549. if fail_list:
  550. bot(
  551. title="更新文章任务,请求detail失败",
  552. detail=fail_list
  553. )
  554. def monitor():
  555. """
  556. 监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
  557. :return:
  558. """
  559. try:
  560. db_client = PQMySQL()
  561. except Exception as e:
  562. error_msg = traceback.format_exc()
  563. bot(
  564. title="监控任务连接数据库失败",
  565. detail={
  566. "job": "monitor",
  567. "error": str(e),
  568. "msg": error_msg
  569. }
  570. )
  571. return
  572. now_time = int(time.time())
  573. monitor_start_timestamp = now_time - const.MONITOR_PERIOD
  574. select_sql = f"""
  575. SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
  576. FROM {ARTICLE_TABLE}
  577. WHERE publish_timestamp >= {monitor_start_timestamp};
  578. """
  579. article_list = db_client.select(select_sql)
  580. for article in tqdm(article_list, desc="monitor article list"):
  581. gh_id = article[0]
  582. account_name = article[1]
  583. title = article[2]
  584. url = article[3]
  585. wx_sn = article[4]
  586. publish_timestamp = article[5]
  587. try:
  588. response = spider.get_article_text(url, is_cache=False)
  589. response_code = response['code']
  590. if response_code == const.ARTICLE_ILLEGAL_CODE:
  591. bot(
  592. title="文章违规告警",
  593. detail={
  594. "ghId": gh_id,
  595. "accountName": account_name,
  596. "title": title,
  597. "wx_sn": str(wx_sn),
  598. "publish_date": publish_timestamp
  599. },
  600. mention=False
  601. )
  602. aiditApi.delete_articles(
  603. gh_id=gh_id,
  604. title=title
  605. )
  606. except Exception as e:
  607. error_msg = traceback.format_exc()
  608. log(
  609. task="monitor",
  610. function="monitor",
  611. message="请求文章详情失败",
  612. data={
  613. "ghId": gh_id,
  614. "accountName": account_name,
  615. "title": title,
  616. "wx_sn": wx_sn,
  617. "error": e,
  618. "msg": error_msg
  619. }
  620. )
  621. def main():
  622. """
  623. main
  624. :return:
  625. """
  626. update_job()
  627. check_job()
  628. get_article_detail_job()
  629. if __name__ == '__main__':
  630. parser = ArgumentParser()
  631. parser.add_argument("--run_task",
  632. help="update: update_job, check: check_job, detail: get_article_detail_job, monitor: monitor")
  633. args = parser.parse_args()
  634. if args.run_task:
  635. run_task = args.run_task
  636. match run_task:
  637. case "update":
  638. update_job()
  639. case "check":
  640. check_job()
  641. case "detail":
  642. get_article_detail_job()
  643. case "monitor":
  644. monitor()
  645. case _:
  646. print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
  647. else:
  648. main()