recycle_daily_publish_articles.py

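"""
Recycle daily published WeChat official-account articles.

Task classes in this module:
- RecycleDailyPublishArticlesTask: pull each bound subscription account's
  latest article list and write it into the recycle pool.
- CheckDailyPublishArticlesTask: verify every account has articles recycled
  for the given date, retry the missing ones, and report via Feishu.
- UpdateRootSourceIdAndUpdateTimeTask: backfill publish_timestamp and
  root_source_id_list for articles whose detail fetch previously failed.
- RecycleFwhDailyPublishArticlesTask: recycle group-send (服务号) articles and
  refresh their read counts.
"""
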
import asyncio
import json
import time
import datetime
import urllib.parse
import traceback

from tqdm.asyncio import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import get_article_list_from_account
from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_article_into_recycle_pool
from applications.utils import str_to_md5


class Const:
    # subscription-account service types
    SUBSCRIBE_TYPE_SET = {0, 1}

    # look-back window for accounts without recycled articles yet: 30 days
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30

    FORBIDDEN_GH_IDS = [
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
        "gh_6cfd1132df94",
        "gh_7f5075624a50",
        "gh_d4dffc34ac39",
    ]

    # article status
    # default status
    DEFAULT_STATUS = 0
    # request to the detail api failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned for an unknown reason
    UNKNOWN_STATUS = -3
    # article violates platform rules
    ILLEGAL_STATUS = -4

    # response codes returned by the wechat crawler
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_SUCCESS_CODE = 0
    ARTICLE_UNKNOWN_CODE = 10000
    ACCOUNT_FORBIDDEN_CODE = 25013
    CRAWL_CRASH_CODE = 20000

    # stats look-back window for group-send articles: 3 days
    STAT_PERIOD = 3 * 24 * 3600

    # task status values
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99


class RecycleDailyPublishArticlesTask(Const):
    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_publish_accounts(self):
        """get all publish accounts"""
        query = """
            select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
                t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
                group_concat(distinct t5.remark) as account_remark
            from
                publish_plan t1
                join publish_plan_account t2 on t1.id = t2.plan_id
                join publish_account t3 on t2.account_id = t3.id
                left join publish_account_wx_type t4 on t3.id = t4.account_id
                left join publish_account_remark t5 on t3.id = t5.publish_account_id
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list = await self.pool.async_fetch(query, db_name="aigc")
        # drop auto-reply accounts
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    async def get_account_status(self):
        """get account experiment status"""
        sql = """
            select t1.account_id, t2.status
            from wx_statistics_group_source_account t1
            join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
        """
        account_status_list = await self.pool.async_fetch(sql, db_name="aigc")
        account_status_dict = {
            account["account_id"]: account["status"] for account in account_status_list
        }
        return account_status_dict

    async def recycle_single_account(self, account):
        """recycle articles of a single account"""
        query = """
            select max(publish_timestamp) as publish_timestamp
            from official_articles_v2
            where ghId = %s;
        """
        response = await self.pool.async_fetch(
            query, params=(account["gh_id"],), db_name="piaoquan_crawler"
        )
        if response:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD

        cursor = None
        while True:
            response = await get_article_list_from_account(
                account_id=account["gh_id"], index=cursor
            )
            response_code = response["code"]
            match response_code:
                case self.ACCOUNT_FORBIDDEN_CODE:
                    await feishu_robot.bot(
                        title="发布账号封禁",
                        detail={
                            "账号名称": account["name"],
                            "账号id": account["gh_id"],
                        },
                    )
                    return
                case self.ARTICLE_SUCCESS_CODE:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return
                    await insert_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )
                    # stop once the last article on this page is no newer than what we already have
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"]["UpdateTime"]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return
                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case self.CRAWL_CRASH_CODE:
                    # log the crash and retry the same page
                    await self.log_client.log(
                        contents={
                            "task": "recycle_daily_publish_articles",
                            "data": {
                                "gh_id": account["gh_id"],
                            },
                            "message": "爬虫挂掉",
                            "status": "fail",
                        }
                    )
                case _:
                    return

    async def get_task_list(self):
        """recycle all publish accounts articles"""
        binding_accounts = await self.get_publish_accounts()
        # filter out forbidden accounts
        binding_accounts = [
            i for i in binding_accounts if i["gh_id"] not in self.FORBIDDEN_GH_IDS
        ]
        account_status = await self.get_account_status()
        account_list = [
            {
                **item,
                "using_status": (
                    0 if account_status.get(item["account_id"]) == "实验" else 1
                ),
            }
            for item in binding_accounts
        ]
        # keep subscription accounts only
        subscription_accounts = [
            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
        ]
        return subscription_accounts

    async def deal(self):
        subscription_accounts = await self.get_task_list()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )


class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
    async def check_account(self, account: dict, date_string: str) -> bool:
        """check account data"""
        query = """
            select accountName, count(1) as publish_count
            from official_articles_v2
            where ghId = %s and from_unixtime(publish_timestamp) > %s;
        """
        response = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(account["gh_id"], date_string),
        )
        if response:
            today_publish_count = response[0]["publish_count"]
            return today_publish_count > 0
        else:
            return False

    async def deal(self):
        task_list = await self.get_task_list()
        for task in tqdm(task_list, desc="check each account step1: "):
            if await self.check_account(task, self.date_string):
                continue
            else:
                await self.recycle_single_account(task)

        # check again
        fail_list = []
        for second_task in tqdm(task_list, desc="check each account step2: "):
            if await self.check_account(second_task, self.date_string):
                continue
            else:
                second_task.pop("account_type", None)
                second_task.pop("account_auth", None)
                second_task.pop("account_id", None)
                second_task.pop("account_remark", None)
                fail_list.append(second_task)

        if fail_list:
            now = datetime.datetime.now()
            if now.hour < 20:
                return
            columns = [
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="name",
                    display_name="公众号名称",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text", sheet_name="gh_id", display_name="gh_id"
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="follower_count",
                    display_name="粉丝数",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="date",
                    sheet_name="account_init_timestamp",
                    display_name="账号接入系统时间",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="using_status",
                    display_name="利用状态",
                ),
            ]
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,存在未更新的账号",
                detail={"columns": columns, "rows": fail_list},
                table=True,
                mention=False,
            )
        else:
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,所有文章更新成功",
                detail={
                    "date_string": self.date_string,
                    "finish_time": str(datetime.datetime.now()),
                },
                mention=False,
            )


class UpdateRootSourceIdAndUpdateTimeTask(Const):
    """update publish_timestamp && root_source_id"""

    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    async def get_article_list(self) -> list[dict]:
        query = """select ContentUrl, wx_sn from official_articles_v2 where publish_timestamp in %s;"""
        article_list = await self.pool.async_fetch(
            query=query, db_name="piaoquan_crawler", params=(tuple([0, -1]),)
        )
        return article_list

    async def check_each_article(self, article: dict):
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        try:
            response = await get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
                            "rootSourceId", [""]
                        )[0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )

        query = """
            update official_articles_v2 set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            article["wx_sn"] = wx_sn
            return article
        else:
            return None

    async def fallback_mechanism(self):
        # backfill publish_timestamp from other records with the same ghId + appMsgId
        update_sql = """
            update official_articles_v2 oav
            join (
                select ghId, appMsgId, max(publish_timestamp) as publish_timestamp
                from official_articles_v2
                where publish_timestamp > %s
                group by ghId, appMsgId
            ) vv
                on oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
            set oav.publish_timestamp = vv.publish_timestamp
            where oav.publish_timestamp <= %s;
        """
        affected_rows_1 = await self.pool.async_save(
            query=update_sql, params=(0, 0), db_name="piaoquan_crawler"
        )

        # if publish_timestamp is still missing, fall back to updateTime
        update_sql_2 = """
            update official_articles_v2
            set publish_timestamp = updateTime
            where publish_timestamp < %s;
        """
        affected_rows_2 = await self.pool.async_save(
            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
        )

        if affected_rows_1 or affected_rows_2:
            await feishu_robot.bot(
                title="执行兜底修改发布时间戳",
                detail={
                    "通过msgId修改": affected_rows_1,
                    "通过create_timestamp修改": affected_rows_2,
                },
                mention=False,
            )

    async def deal(self):
        task_list = await self.get_article_list()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["ContentUrl"],
                                    "wx_sn": task["wx_sn"].decode("utf-8"),
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())

        # process_failed_task_reproduce: retry articles whose detail fetch failed in step1
        fail_tasks = await self.get_article_list()
        fail_list = []
        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
            try:
                res = await self.check_each_article(fail_task)
                if res:
                    fail_list.append(res)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "get_official_article_detail_step2",
                        "data": {
                            "detail": {
                                "url": fail_task["ContentUrl"],
                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
                            },
                            "error_msg": traceback.format_exc(),
                            "error": str(e),
                        },
                        "function": "check_each_article",
                        "status": "fail",
                    }
                )

        if fail_list:
            await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)
        current_hour = datetime.datetime.now().hour
        if current_hour >= 21:
            await self.fallback_mechanism()


class RecycleFwhDailyPublishArticlesTask(Const):
    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    @staticmethod
    async def illegal_article_bot(
        account_name: str,
        gh_id: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
    ):
        await feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                "publish_date": str(publish_date),
            },
            env="server_account_publish_monitor",
        )

    async def save_data_to_database(self, article):
        """save article data to the database"""
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=insert_query, db_name="piaoquan_crawler", params=article
        )

    async def update_article_read_cnt(self, wx_sn, new_read_cnt):
        if new_read_cnt <= 0:
            return 0
        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return await self.pool.async_save(
            query=update_query, db_name="piaoquan_crawler", params=(new_read_cnt, wx_sn)
        )

    async def get_group_server_accounts(self):
        fetch_query = "select gzh_id from article_gzh_developer;"
        fetch_response = await self.pool.async_fetch(
            query=fetch_query, db_name="piaoquan_crawler"
        )
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        return gh_id_list

    async def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        earliest_time = datetime.datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return await self.pool.async_fetch(
            query=fetch_query,
            params=(gh_id, self.SUCCESS_STATUS, earliest_time),
        )

    async def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]
            # get article detail info with spider
            try:
                article_detail_info = await get_article_detail(
                    url, is_count=True, is_cache=False
                )
                response_code = article_detail_info["code"]
                if response_code == self.ARTICLE_ILLEGAL_CODE:
                    await self.illegal_article_bot(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                    )

                await asyncio.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    await self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception as e:
                    # insert failed (e.g. the article is already stored); refresh its read count instead
                    await self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    async def deal(self):
        account_id_list = await self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                await self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            await self.process_each_account_data(publish_articles)
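

# --- usage sketch (not part of the original module) --------------------------
# A minimal illustration, under assumptions, of how these tasks might be
# chained for one day's run: `pool` (async DB pool) and `log_client` are
# provided by the surrounding scheduler, and the entry-point name
# `run_daily_recycle` is hypothetical.
async def run_daily_recycle(pool, log_client, date_string=None):
    """Run recycle, completeness check, timestamp backfill, then 服务号 recycle."""
    date_string = date_string or datetime.date.today().strftime("%Y-%m-%d")
    await RecycleDailyPublishArticlesTask(pool, log_client, date_string).deal()
    await CheckDailyPublishArticlesTask(pool, log_client, date_string).deal()
    await UpdateRootSourceIdAndUpdateTimeTask(pool, log_client).deal()
    await RecycleFwhDailyPublishArticlesTask(pool, log_client).deal()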