updatePublishedMsgDaily.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. """
  2. @author: luojunhui
  3. @description: update daily information into official articles v2
  4. """
  5. import json
  6. import time
  7. import traceback
  8. import urllib.parse
  9. from argparse import ArgumentParser
  10. from datetime import datetime
  11. from typing import Dict, List, Tuple
  12. from pymysql.cursors import DictCursor
  13. from tqdm import tqdm
  14. from applications import aiditApi
  15. from applications import bot
  16. from applications import create_feishu_columns_sheet
  17. from applications import Functions
  18. from applications import log
  19. from applications import WeixinSpider
  20. from applications.const import updatePublishedMsgTaskConst
  21. from applications.db import DatabaseConnector
  22. from config import denet_config, long_articles_config, piaoquan_crawler_config
# Name of the table that stores the crawled official-account articles.
ARTICLE_TABLE = "official_articles_v2"
# Task constants (response codes, status values, thresholds, periods) read throughout this module.
const = updatePublishedMsgTaskConst()
# Shared spider client used for every message-list and article-detail request in this module.
spider = WeixinSpider()
# Shared helper object; this module uses its str_to_md5 and show_desc_to_sta methods.
functions = Functions()
  27. def generate_bot_columns():
  28. """
  29. 生成列
  30. :return:
  31. """
  32. columns = [
  33. create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
  34. create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
  35. create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
  36. create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
  37. display_name="账号接入系统时间"),
  38. create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
  39. ]
  40. return columns
  41. def get_account_status(db_client: DatabaseConnector) -> Dict:
  42. """
  43. 获取账号的实验状态
  44. :return:
  45. """
  46. sql = f"""
  47. SELECT t1.account_id, t2.status
  48. FROM wx_statistics_group_source_account t1
  49. JOIN wx_statistics_group_source t2
  50. ON t1.group_source_name = t2.account_source_name;
  51. """
  52. account_status_list = db_client.fetch(sql, cursor_type=DictCursor)
  53. account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
  54. return account_status_dict
  55. def get_accounts(db_client: DatabaseConnector) -> List[Dict]:
  56. """
  57. 从 aigc 数据库中获取目前处于发布状态的账号
  58. :return:
  59. "name": line[0],
  60. "ghId": line[1],
  61. "follower_count": line[2],
  62. "account_init_time": int(line[3] / 1000),
  63. "account_type": line[4], # 订阅号 or 服务号
  64. "account_auth": line[5]
  65. """
  66. illegal_accounts = [
  67. 'gh_4c058673c07e',
  68. 'gh_de9f9ebc976b',
  69. 'gh_7b4a5f86d68c',
  70. 'gh_f902cea89e48',
  71. 'gh_789a40fe7935',
  72. 'gh_cd041ed721e6',
  73. 'gh_62d7f423f382',
  74. 'gh_043223059726',
  75. 'gh_5bb79339a1f4'
  76. ]
  77. account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
  78. account_status_dict = get_account_status(db_client)
  79. account_list = [
  80. {
  81. **item,
  82. 'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
  83. }
  84. for item in account_list_with_out_using_status
  85. ]
  86. account_list = [account for account in account_list if account['ghId'] not in illegal_accounts]
  87. return account_list
  88. def insert_each_msg(db_client: DatabaseConnector, account_info: Dict, msg_list: List[Dict]) -> None:
  89. """
  90. 把消息数据更新到数据库中
  91. :param account_info:
  92. :param db_client:
  93. :param msg_list:
  94. :return:
  95. """
  96. gh_id = account_info['ghId']
  97. account_name = account_info['name']
  98. for info in msg_list:
  99. baseInfo = info.get("BaseInfo", {})
  100. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  101. createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  102. updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  103. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  104. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  105. if detail_article_list:
  106. for article in detail_article_list:
  107. title = article.get("Title", None)
  108. Digest = article.get("Digest", None)
  109. ItemIndex = article.get("ItemIndex", None)
  110. ContentUrl = article.get("ContentUrl", None)
  111. SourceUrl = article.get("SourceUrl", None)
  112. CoverImgUrl = article.get("CoverImgUrl", None)
  113. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  114. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  115. ItemShowType = article.get("ItemShowType", None)
  116. IsOriginal = article.get("IsOriginal", None)
  117. ShowDesc = article.get("ShowDesc", None)
  118. show_stat = functions.show_desc_to_sta(ShowDesc)
  119. ori_content = article.get("ori_content", None)
  120. show_view_count = show_stat.get("show_view_count", 0)
  121. show_like_count = show_stat.get("show_like_count", 0)
  122. show_zs_count = show_stat.get("show_zs_count", 0)
  123. show_pay_count = show_stat.get("show_pay_count", 0)
  124. wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
  125. status = account_info['using_status']
  126. info_tuple = (
  127. gh_id,
  128. account_name,
  129. appMsgId,
  130. title,
  131. Type,
  132. createTime,
  133. updateTime,
  134. Digest,
  135. ItemIndex,
  136. ContentUrl,
  137. SourceUrl,
  138. CoverImgUrl,
  139. CoverImgUrl_1_1,
  140. CoverImgUrl_235_1,
  141. ItemShowType,
  142. IsOriginal,
  143. ShowDesc,
  144. ori_content,
  145. show_view_count,
  146. show_like_count,
  147. show_zs_count,
  148. show_pay_count,
  149. wx_sn,
  150. json.dumps(baseInfo, ensure_ascii=False),
  151. functions.str_to_md5(title),
  152. status
  153. )
  154. try:
  155. insert_sql = f"""
  156. INSERT INTO {ARTICLE_TABLE}
  157. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
  158. values
  159. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  160. """
  161. db_client.save(query=insert_sql, params=info_tuple)
  162. log(
  163. task="updatePublishedMsgDaily",
  164. function="insert_each_msg",
  165. message="插入文章数据成功",
  166. data={
  167. "info": info_tuple
  168. }
  169. )
  170. except Exception as e:
  171. try:
  172. update_sql = f"""
  173. UPDATE {ARTICLE_TABLE}
  174. SET show_view_count = %s, show_like_count=%s
  175. WHERE wx_sn = %s;
  176. """
  177. db_client.save(query=update_sql,
  178. params=(show_view_count, show_like_count, wx_sn))
  179. log(
  180. task="updatePublishedMsgDaily",
  181. function="insert_each_msg",
  182. message="更新文章数据成功",
  183. data={
  184. "wxSn": wx_sn,
  185. "likeCount": show_like_count,
  186. "viewCount": show_view_count
  187. }
  188. )
  189. except Exception as e:
  190. log(
  191. task="updatePublishedMsgDaily",
  192. function="insert_each_msg",
  193. message="更新文章失败, 报错原因是: {}".format(e),
  194. status="fail"
  195. )
  196. continue
  197. def update_each_account(db_client: DatabaseConnector, account_info: Dict, latest_update_time: int, cursor=None):
  198. """
  199. 更新每一个账号信息
  200. :param account_info:
  201. :param cursor:
  202. :param latest_update_time: 最新更新时间
  203. :param db_client: 数据库连接信息
  204. :return: None
  205. """
  206. gh_id = account_info['ghId']
  207. response = spider.update_msg_list(ghId=gh_id, index=cursor)
  208. msg_list = response.get("data", {}).get("data", [])
  209. if msg_list:
  210. # do
  211. last_article_in_this_msg = msg_list[-1]
  212. last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
  213. # last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
  214. # resdata = spider.get_account_by_url(last_url)
  215. # check_id = resdata['data'].get('data', {}).get('wx_gh')
  216. # if check_id == gh_id:
  217. insert_each_msg(
  218. db_client=db_client,
  219. account_info=account_info,
  220. msg_list=msg_list
  221. )
  222. if last_time_stamp_in_this_msg > latest_update_time:
  223. next_cursor = response['data']['next_cursor']
  224. return update_each_account(
  225. db_client=db_client,
  226. account_info=account_info,
  227. latest_update_time=latest_update_time,
  228. cursor=next_cursor
  229. )
  230. log(
  231. task="updatePublishedMsgDaily",
  232. function="update_each_account",
  233. message="账号文章更新成功",
  234. data=response
  235. )
  236. return None
  237. else:
  238. log(
  239. task="updatePublishedMsgDaily",
  240. function="update_each_account",
  241. message="账号文章更新失败",
  242. status="fail",
  243. data=response
  244. )
  245. return None
  246. def check_account_info(db_client: DatabaseConnector, gh_id: str) -> int:
  247. """
  248. 通过 gh_id查询账号信息的最新发布时间
  249. :param db_client:
  250. :param gh_id:
  251. :return:
  252. """
  253. sql = f"""
  254. SELECT MAX(publish_timestamp)
  255. FROM {ARTICLE_TABLE}
  256. WHERE ghId = '{gh_id}';
  257. """
  258. result = db_client.fetch(sql)
  259. if result:
  260. return result[0][0]
  261. else:
  262. # 新号,抓取周期定位抓取时刻往前推30天
  263. return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
  264. def update_single_account(db_client: DatabaseConnector, account_info: Dict):
  265. """
  266. 更新单个账号
  267. :param db_client:
  268. :param account_info:
  269. :return:
  270. """
  271. gh_id = account_info['ghId']
  272. max_publish_time = check_account_info(db_client, gh_id)
  273. update_each_account(
  274. db_client=db_client,
  275. account_info=account_info,
  276. latest_update_time=max_publish_time
  277. )
  278. def check_single_account(db_client: DatabaseConnector, account_item: Dict) -> bool:
  279. """
  280. 校验每个账号是否更新
  281. :param db_client:
  282. :param account_item:
  283. :return: True / False
  284. """
  285. gh_id = account_item['ghId']
  286. account_type = account_item['account_type']
  287. today_str = datetime.today().strftime("%Y-%m-%d")
  288. today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
  289. today_timestamp = today_date_time.timestamp()
  290. sql = f"""
  291. SELECT max(updateTime)
  292. FROM {ARTICLE_TABLE}
  293. WHERE ghId = '{gh_id}';
  294. """
  295. try:
  296. latest_update_time = db_client.fetch(sql)[0][0]
  297. # 判断该账号当天发布的文章是否被收集
  298. if account_type in const.SUBSCRIBE_TYPE_SET:
  299. if int(latest_update_time) > int(today_timestamp):
  300. return True
  301. else:
  302. return False
  303. else:
  304. if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
  305. return True
  306. else:
  307. return False
  308. except Exception as e:
  309. print(e)
  310. return False
  311. def get_articles(db_client: DatabaseConnector):
  312. """
  313. :return:
  314. """
  315. sql = f"""
  316. SELECT ContentUrl, wx_sn
  317. FROM {ARTICLE_TABLE}
  318. WHERE from_unixtime(publish_timestamp) > '2025-06-07'
  319. and root_source_id_list = '[]';
  320. """
  321. response = db_client.fetch(sql)
  322. return response
  323. def update_publish_timestamp(db_client: DatabaseConnector, row: Tuple):
  324. """
  325. 更新发布时间戳 && minigram 信息
  326. :param db_client:
  327. :param row:
  328. :return:
  329. """
  330. url = row[0]
  331. wx_sn = row[1]
  332. try:
  333. print(url)
  334. response = spider.get_article_text(url)
  335. print(response)
  336. response_code = response['code']
  337. if response_code == const.ARTICLE_DELETE_CODE:
  338. publish_timestamp_s = const.DELETE_STATUS
  339. root_source_id_list = []
  340. elif response_code == const.ARTICLE_ILLEGAL_CODE:
  341. publish_timestamp_s = const.ILLEGAL_STATUS
  342. root_source_id_list = []
  343. elif response_code == const.ARTICLE_SUCCESS_CODE:
  344. data = response['data']['data']
  345. publish_timestamp_ms = data['publish_timestamp']
  346. publish_timestamp_s = int(publish_timestamp_ms / 1000)
  347. mini_program = data.get('mini_program', [])
  348. if mini_program:
  349. root_source_id_list = [
  350. urllib.parse.parse_qs(
  351. urllib.parse.unquote(i['path'])
  352. )['rootSourceId'][0]
  353. for i in mini_program
  354. ]
  355. else:
  356. root_source_id_list = []
  357. else:
  358. publish_timestamp_s = const.UNKNOWN_STATUS
  359. root_source_id_list = []
  360. except Exception as e:
  361. publish_timestamp_s = const.REQUEST_FAIL_STATUS
  362. root_source_id_list = None
  363. error_msg = traceback.format_exc()
  364. print(e, error_msg)
  365. update_sql = f"""
  366. UPDATE {ARTICLE_TABLE}
  367. SET publish_timestamp = %s, root_source_id_list = %s
  368. WHERE wx_sn = %s;
  369. """
  370. db_client.save(
  371. query=update_sql,
  372. params=(
  373. publish_timestamp_s,
  374. json.dumps(root_source_id_list, ensure_ascii=False),
  375. wx_sn
  376. ))
  377. # if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
  378. # return row
  379. # else:
  380. # return None
  381. def get_article_detail_job(db_client: DatabaseConnector):
  382. """
  383. 获取发布文章详情
  384. :return:
  385. """
  386. article_tuple = get_articles(db_client)
  387. for article in tqdm(article_tuple):
  388. try:
  389. update_publish_timestamp(db_client=db_client, row=article)
  390. except Exception as e:
  391. print(e)
  392. error_msg = traceback.format_exc()
  393. print(error_msg)
  394. # # check 一遍存在请求失败-1 && 0 的文章
  395. # process_failed_articles = get_articles(db_client)
  396. # fail_list = []
  397. # if process_failed_articles:
  398. # for article in tqdm(process_failed_articles):
  399. # try:
  400. # update_publish_timestamp(db_client=db_client, row=article)
  401. # # fail_list.append({"wx_sn": res[1], "url": res[0]})
  402. # except Exception as e:
  403. # print(e)
  404. # error_msg = traceback.format_exc()
  405. # print(error_msg)
  406. # # 通过msgId 来修改publish_timestamp
  407. # update_sql = f"""
  408. # UPDATE {ARTICLE_TABLE} oav
  409. # JOIN (
  410. # SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp
  411. # FROM {ARTICLE_TABLE}
  412. # WHERE publish_timestamp > %s
  413. # GROUP BY ghId, appMsgId
  414. # ) vv
  415. # ON oav.appMsgId = vv.appMsgId AND oav.ghId = vv.ghId
  416. # SET oav.publish_timestamp = vv.publish_timestamp
  417. # WHERE oav.publish_timestamp <= %s;
  418. # """
  419. # db_client.save(
  420. # query=update_sql,
  421. # params=(0, 0)
  422. # )
  423. #
  424. # # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
  425. # update_sql_2 = f"""
  426. # UPDATE {ARTICLE_TABLE}
  427. # SET publish_timestamp = updateTime
  428. # WHERE publish_timestamp < %s;
  429. # """
  430. # db_client.save(
  431. # query=update_sql_2,
  432. # params=0
  433. # )
  434. # if fail_list:
  435. # bot(
  436. # title="更新文章任务,请求detail失败",
  437. # detail=fail_list
  438. # )
  439. def whether_title_unsafe(db_client: DatabaseConnector, title: str):
  440. """
  441. 检查文章标题是否已经存在违规记录
  442. :param db_client:
  443. :param title:
  444. :return:
  445. """
  446. title_md5 = functions.str_to_md5(title)
  447. sql = f"""
  448. SELECT title_md5
  449. FROM article_unsafe_title
  450. WHERE title_md5 = '{title_md5}';
  451. """
  452. res = db_client.fetch(sql)
  453. if res:
  454. return True
  455. else:
  456. return False
  457. def update_job(piaoquan_crawler_db_client, aigc_db_client):
  458. """
  459. 更新任务
  460. :return:
  461. """
  462. account_list = get_accounts(db_client=aigc_db_client)
  463. # 订阅号
  464. subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
  465. success_count = 0
  466. fail_count = 0
  467. for sub_item in tqdm(subscription_accounts):
  468. try:
  469. update_single_account(piaoquan_crawler_db_client, sub_item)
  470. success_count += 1
  471. # time.sleep(5)
  472. except Exception as e:
  473. fail_count += 1
  474. log(
  475. task="updatePublishedMsgDaily",
  476. function="update_job",
  477. message="单个账号文章更新失败, 报错信息是: {}".format(e),
  478. status="fail",
  479. data={
  480. "account": sub_item,
  481. "error": str(e),
  482. "traceback": traceback.format_exc()
  483. }
  484. )
  485. log(
  486. task="updatePublishedMsgDaily",
  487. function="update_job",
  488. message="订阅号更新完成",
  489. data={
  490. "success": success_count,
  491. "fail": fail_count
  492. }
  493. )
  494. if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
  495. bot(
  496. title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
  497. detail={
  498. "success": success_count,
  499. "fail": fail_count,
  500. "failRate": fail_count / (success_count + fail_count)
  501. }
  502. )
  503. bot(
  504. title="更新每日发布文章任务完成通知",
  505. detail={
  506. "msg": "订阅号更新完成",
  507. "finish_time": datetime.today().__str__()
  508. },
  509. mention=False
  510. )
  511. # 服务号
  512. server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
  513. for sub_item in tqdm(server_accounts):
  514. try:
  515. update_single_account(piaoquan_crawler_db_client, sub_item)
  516. time.sleep(5)
  517. except Exception as e:
  518. print(e)
  519. bot(
  520. title="更新每日发布文章任务完成通知",
  521. detail={
  522. "msg": "服务号更新完成",
  523. "finish_time": datetime.today().__str__()
  524. },
  525. mention=False
  526. )
  527. def check_job(piaoquan_crawler_db_client, aigc_db_client):
  528. """
  529. 校验任务
  530. :return:
  531. """
  532. account_list = get_accounts(db_client=aigc_db_client)
  533. # 订阅号
  534. subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
  535. fail_list = []
  536. # check and rework if fail
  537. for sub_item in tqdm(subscription_accounts):
  538. res = check_single_account(piaoquan_crawler_db_client, sub_item)
  539. if not res:
  540. try:
  541. update_single_account(piaoquan_crawler_db_client, sub_item)
  542. except Exception as e:
  543. print(e)
  544. print(sub_item)
  545. # fail_list.append(sub_item)
  546. # check whether success and bot if fails
  547. for sub_item in tqdm(subscription_accounts):
  548. res = check_single_account(piaoquan_crawler_db_client, sub_item)
  549. if not res:
  550. # 去掉三个不需要查看的字段
  551. sub_item.pop('account_type', None)
  552. sub_item.pop('account_auth', None)
  553. sub_item.pop('account_id', None)
  554. fail_list.append(sub_item)
  555. if fail_list:
  556. try:
  557. bot(
  558. title="更新当天发布文章,存在未更新的账号",
  559. detail={
  560. "columns": generate_bot_columns(),
  561. "rows": fail_list
  562. },
  563. table=True
  564. )
  565. except Exception as e:
  566. print("Timeout Error: {}".format(e))
  567. else:
  568. bot(
  569. title="更新当天发布文章,所有账号均更新成功",
  570. mention=False,
  571. detail={
  572. "msg": "校验任务完成",
  573. "finish_time": datetime.today().__str__()
  574. }
  575. )
  576. def monitor(piaoquan_crawler_db_client, long_articles_db_client, run_date):
  577. """
  578. 监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
  579. :return:
  580. """
  581. if not run_date:
  582. run_date = datetime.today().strftime("%Y-%m-%d")
  583. monitor_start_timestamp = int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
  584. select_sql = f"""
  585. SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
  586. FROM {ARTICLE_TABLE}
  587. WHERE publish_timestamp >= {monitor_start_timestamp};
  588. """
  589. article_list = piaoquan_crawler_db_client.fetch(select_sql)
  590. for article in tqdm(article_list, desc="monitor article list"):
  591. gh_id = article[0]
  592. account_name = article[1]
  593. title = article[2]
  594. # 判断标题是否存在违规记录
  595. if whether_title_unsafe(long_articles_db_client, title):
  596. continue
  597. url = article[3]
  598. wx_sn = article[4]
  599. publish_date = article[5]
  600. try:
  601. response = spider.get_article_text(url, is_cache=False)
  602. response_code = response['code']
  603. if response_code == const.ARTICLE_ILLEGAL_CODE:
  604. bot(
  605. title="文章违规告警",
  606. detail={
  607. "ghId": gh_id,
  608. "accountName": account_name,
  609. "title": title,
  610. "wx_sn": str(wx_sn),
  611. "publish_date": str(publish_date)
  612. },
  613. mention=False
  614. )
  615. aiditApi.delete_articles(
  616. gh_id=gh_id,
  617. title=title
  618. )
  619. except Exception as e:
  620. error_msg = traceback.format_exc()
  621. log(
  622. task="monitor",
  623. function="monitor",
  624. message="请求文章详情失败",
  625. data={
  626. "ghId": gh_id,
  627. "accountName": account_name,
  628. "title": title,
  629. "wx_sn": str(wx_sn),
  630. "error": str(e),
  631. "msg": error_msg
  632. }
  633. )
  634. def main():
  635. """
  636. main
  637. :return:
  638. """
  639. parser = ArgumentParser()
  640. parser.add_argument(
  641. "--run_task",
  642. help="update: update_job, check: check_job, detail: get_article_detail_job, monitor: monitor")
  643. parser.add_argument(
  644. "--run_date",
  645. help="--run_date %Y-%m-%d",
  646. )
  647. args = parser.parse_args()
  648. # 初始化数据库连接
  649. try:
  650. piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
  651. piaoquan_crawler_db_client.connect()
  652. aigc_db_client = DatabaseConnector(denet_config)
  653. aigc_db_client.connect()
  654. long_articles_db_client = DatabaseConnector(long_articles_config)
  655. except Exception as e:
  656. error_msg = traceback.format_exc()
  657. bot(
  658. title="更新文章任务连接数据库失败",
  659. detail={
  660. "error": e,
  661. "msg": error_msg
  662. }
  663. )
  664. return
  665. if args.run_task:
  666. run_task = args.run_task
  667. match run_task:
  668. case "update":
  669. update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
  670. get_article_detail_job(db_client=piaoquan_crawler_db_client)
  671. case "check":
  672. check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
  673. case "detail":
  674. get_article_detail_job(db_client=piaoquan_crawler_db_client)
  675. case "monitor":
  676. if args.run_date:
  677. run_date = args.run_date
  678. else:
  679. run_date = None
  680. monitor(piaoquan_crawler_db_client=piaoquan_crawler_db_client,
  681. long_articles_db_client=long_articles_db_client, run_date=run_date)
  682. case _:
  683. print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
  684. else:
  685. # update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
  686. # check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
  687. get_article_detail_job(db_client=piaoquan_crawler_db_client)
# Run the command-line entry point only when this file is executed as a script.
if __name__ == '__main__':
    main()