recycle_daily_publish_articles.py

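"""
Recycle daily-published WeChat official-account articles.

Three async tasks are defined here:
  * RecycleDailyPublishArticlesTask     -- crawl newly published articles of every
    bound subscription account into the recycle pool;
  * CheckDailyPublishArticlesTask       -- verify that each account has articles
    recorded for the day, re-crawl missing ones and report failures via Feishu;
  * UpdateRootSourceIdAndUpdateTimeTask -- backfill publish_timestamp and
    root_source_id for articles whose detail fetch has not succeeded yet.
"""
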
import json
import time
import datetime
import urllib.parse
import traceback

from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import get_article_list_from_account
from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_article_into_recycle_pool


class Const:
    # account_type values that denote subscription accounts
    SUBSCRIBE_TYPE_SET = {0, 1}

    # crawl window for accounts with no recorded article yet: 30 days
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30

    # banned accounts excluded from recycling
    FORBIDDEN_GH_IDS = [
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
    ]

    # article status
    # default status for a record
    DEFAULT_STATUS = 0
    # detail request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned for an unknown reason
    UNKNOWN_STATUS = -3
    # article flagged as violating platform rules
    ILLEGAL_STATUS = -4

    # response codes returned by the article detail crawler
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_SUCCESS_CODE = 0
    ARTICLE_UNKNOWN_CODE = 10000


class RecycleDailyPublishArticlesTask(Const):
    """Crawl newly published articles of each bound account into the recycle pool."""

    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_publish_accounts(self):
        """
        get all publish accounts
        """
        query = """
            select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
                t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
                group_concat(distinct t5.remark) as account_remark
            from
                publish_plan t1
                join publish_plan_account t2 on t1.id = t2.plan_id
                join publish_account t3 on t2.account_id = t3.id
                left join publish_account_wx_type t4 on t3.id = t4.account_id
                left join publish_account_remark t5 on t3.id = t5.publish_account_id
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list, error = await self.pool.async_fetch(query, db_name="aigc")
        # drop auto-reply accounts
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    async def get_account_status(self):
        """get account experiment status"""
        sql = """
            select t1.account_id, t2.status
            from wx_statistics_group_source_account t1
            join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
        """
        account_status_list, error = await self.pool.async_fetch(sql, db_name="aigc")
        account_status_dict = {
            account["account_id"]: account["status"] for account in account_status_list
        }
        return account_status_dict
    async def recycle_single_account(self, account):
        """recycle single account"""
        query = """
            select max(publish_timestamp) as publish_timestamp from official_articles_v2 where ghId = %s;
        """
        response, error = await self.pool.async_fetch(
            query, params=(account["gh_id"],), db_name="piaoquan_crawler"
        )
        if response:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD

        cursor = None
        while True:
            response = get_article_list_from_account(
                account_id=account["gh_id"], index=cursor
            )
            response_code = response["code"]
            match response_code:
                case 25013:
                    await feishu_robot.bot(
                        title="发布账号封禁",
                        detail={
                            "账号名称": account["name"],
                            "账号id": account["gh_id"],
                        },
                    )
                    return
                case 0:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return
                    await insert_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )
                    # check last article
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return
                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case _:
                    return
    async def get_task_list(self):
        """recycle all publish accounts articles"""
        binding_accounts = await self.get_publish_accounts()
        # filter out banned accounts
        binding_accounts = [
            i for i in binding_accounts if i["gh_id"] not in self.FORBIDDEN_GH_IDS
        ]
        account_status = await self.get_account_status()
        account_list = [
            {
                **item,
                "using_status": (
                    0 if account_status.get(item["account_id"]) == "实验" else 1
                ),
            }
            for item in binding_accounts
        ]
        # keep subscription accounts only
        subscription_accounts = [
            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
        ]
        return subscription_accounts
    async def deal(self):
        subscription_accounts = await self.get_task_list()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )


class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
    """Verify that each account has articles recorded for the day; re-crawl if not."""

    async def check_account(self, account: dict, date_string: str) -> bool:
        """check account data"""
        query = """
            select accountName, count(1) as publish_count
            from official_articles_v2 where ghId = %s and from_unixtime(publish_timestamp) > %s;
        """
        response, error = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(account["gh_id"], date_string),
        )
        if error:
            await feishu_robot.bot(
                title="sql错误",
                detail={
                    "task": "CheckDailyPublishArticlesTask",
                    "function": "check_account",
                    "account": account,
                    "date_string": date_string,
                },
                mention=False,
            )
            return False
        else:
            today_publish_count = response[0]["publish_count"]
            return today_publish_count > 0
    async def deal(self):
        task_list = await self.get_task_list()
        for task in tqdm(task_list, desc="check each account step1: "):
            if await self.check_account(task, self.date_string):
                continue
            else:
                await self.recycle_single_account(task)

        # check again
        fail_list = []
        for second_task in tqdm(task_list, desc="check each account step2: "):
            if await self.check_account(second_task, self.date_string):
                continue
            else:
                second_task.pop("account_type", None)
                second_task.pop("account_auth", None)
                second_task.pop("account_id", None)
                second_task.pop("account_remark", None)
                fail_list.append(second_task)

        if fail_list:
            columns = [
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="name",
                    display_name="公众号名称",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text", sheet_name="gh_id", display_name="gh_id"
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="follower_count",
                    display_name="粉丝数",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="date",
                    sheet_name="account_init_timestamp",
                    display_name="账号接入系统时间",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="using_status",
                    display_name="利用状态",
                ),
            ]
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,存在未更新的账号",
                detail={"columns": columns, "rows": fail_list},
                table=True,
                mention=False,
            )
        else:
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,所有文章更新成功",
                detail={
                    "date_string": self.date_string,
                    "finish_time": str(datetime.datetime.now()),
                },
                mention=False,
            )


class UpdateRootSourceIdAndUpdateTimeTask(Const):
    """
    update publish_timestamp && root_source_id
    """

    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    async def get_article_list(self):
        # articles whose publish_timestamp is still default (0) or marked as failed (-1)
        query = """select ContentUrl, wx_sn from official_articles_v2 where publish_timestamp in %s;"""
        article_list, error = await self.pool.async_fetch(
            query=query, db_name="piaoquan_crawler", params=(tuple([0, -1]),)
        )
        return article_list
    async def check_each_article(self, article: dict):
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        try:
            response = get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                            "rootSourceId"
                        ][0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )
        query = """
            update official_articles_v2 set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            return article
        else:
            return None
    async def fallback_mechanism(self):
        # backfill publish_timestamp from other articles sharing the same appMsgId
        update_sql = """
            update official_articles_v2 oav
            join (
                select ghId, appMsgId, max(publish_timestamp) as publish_timestamp
                from official_articles_v2
                where publish_timestamp > %s
                group by ghId, appMsgId
            ) vv
                on oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
            set oav.publish_timestamp = vv.publish_timestamp
            where oav.publish_timestamp <= %s;
        """
        affected_rows_1 = await self.pool.async_save(
            query=update_sql, params=(0, 0), db_name="piaoquan_crawler"
        )

        # if publish_timestamp is still missing, fall back to updateTime
        update_sql_2 = """
            update official_articles_v2
            set publish_timestamp = updateTime
            where publish_timestamp < %s;
        """
        affected_rows_2 = await self.pool.async_save(
            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
        )

        if affected_rows_1 or affected_rows_2:
            await feishu_robot.bot(
                title="执行兜底修改发布时间戳",
                detail={
                    "通过msgId修改": affected_rows_1,
                    "通过update_timestamp修改": affected_rows_2,
                },
                mention=False,
            )
    async def deal(self):
        task_list = await self.get_article_list()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["ContentUrl"],
                                    "wx_sn": task["wx_sn"].decode("utf-8"),
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as log_error:
                    print(log_error)
                    print(traceback.format_exc())

        # reprocess the articles that still failed in step 1
        fail_tasks = await self.get_article_list()
        fail_list = []
        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
            try:
                res = await self.check_each_article(fail_task)
                if res:
                    fail_list.append(res)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "get_official_article_detail_step2",
                        "data": {
                            "detail": {
                                "url": fail_task["ContentUrl"],
                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
                            },
                            "error_msg": traceback.format_exc(),
                            "error": str(e),
                        },
                        "function": "check_each_article",
                        "status": "fail",
                    }
                )

        if fail_list:
            await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)

        # run the timestamp fallback only during the late-evening run
        current_hour = datetime.datetime.now().hour
        if current_hour >= 21:
            await self.fallback_mechanism()
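

# --- Usage sketch (assumed wiring, not part of the original scheduler code) ---
# The async MySQL pool and log client are constructed elsewhere in the project
# and are simply passed in; the helper below only illustrates the expected
# order of a daily run: recycle articles, verify each account, then backfill
# publish_timestamp / root_source_id.
async def run_daily_recycle(pool, log_client, date_string: str) -> None:
    """Hypothetical daily driver for the three tasks defined above."""
    # 1. pull newly published articles of every bound account into the recycle pool
    await RecycleDailyPublishArticlesTask(pool, log_client, date_string).deal()
    # 2. check every subscription account and alert via Feishu if any is missing data
    await CheckDailyPublishArticlesTask(pool, log_client, date_string).deal()
    # 3. backfill root_source_id and publish_timestamp for articles still lacking them
    await UpdateRootSourceIdAndUpdateTimeTask(pool, log_client).deal()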