recycle_outside_account_articles.py

import time
import json
import datetime
import traceback
import urllib.parse

from tqdm.asyncio import tqdm

from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
from .recycle_daily_publish_articles import Const
from applications.crawler.wechat import get_article_list_from_account
from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_outside_article_into_recycle_pool
from applications.api import feishu_robot


class RecycleOutsideAccountArticlesTask(Const):
    """Recycle published articles from outside (third-party operated) service accounts."""

    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_outside_accounts(self):
        """Fetch the outside accounts whose published articles should be recycled."""
        query = """
            select
                t2.group_source_name as account_source,
                t3.name as name,
                t3.gh_id as gh_id
            from wx_statistics_group_source t1
            join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
            join publish_account t3 on t3.id = t2.account_id
            where
                t1.mode_type = '代运营服务号' and
                (
                    t2.group_source_name like '%云誉%'
                    or t2.group_source_name like '%微小盟%'
                    or t2.group_source_name like '%阿雅达%'
                    or t2.group_source_name like '%创易%'
                )
                and t3.status = 1 and t3.name != '';
        """
        return await self.pool.async_fetch(query=query, db_name="aigc")

    async def recycle_single_account(self, account):
        """recycle single account"""
        query = """
            select max(update_time) as publish_timestamp
            from outside_account_articles
            where gh_id = %s;
        """
        response = await self.pool.async_fetch(query=query, params=(account["gh_id"],))
        # max() yields NULL when the account has no recycled articles yet,
        # so also fall back to the new-account crawl window in that case
        if response and response[0]["publish_timestamp"]:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD

        cursor = None
        while True:
            response = await get_article_list_from_account(
                account_id=account["gh_id"], index=cursor
            )
            response_code = response["code"]
            match response_code:
                case self.ACCOUNT_FORBIDDEN_CODE:
                    # await feishu_robot.bot(
                    #     title="发布账号封禁",
                    #     detail={
                    #         "账号名称": account["name"],
                    #         "账号id": account["gh_id"],
                    #     },
                    # )
                    return
                case self.ARTICLE_SUCCESS_CODE:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return

                    await insert_outside_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )

                    # check last article
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"]["UpdateTime"]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return

                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case self.CRAWL_CRASH_CODE:
                    await self.log_client.log(
                        contents={
                            "task": "recycle_daily_publish_articles",
                            "data": {
                                "gh_id": account["gh_id"],
                            },
                            "message": "爬虫挂掉",
                            "status": "fail",
                        }
                    )
                    # stop paging this account; otherwise the loop retries the same cursor forever
                    return
                case _:
                    return

    async def deal(self):
        subscription_accounts = await self.get_outside_accounts()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )


class UpdateOutsideRootSourceIdAndUpdateTimeTask(UpdateRootSourceIdAndUpdateTimeTask):
    def __init__(self, pool, log_client):
        super().__init__(pool, log_client)

    async def get_outside_article_list_v2(self) -> list[dict]:
        # publish_timestamp in (0, -1) marks articles whose timestamp has not been resolved yet
        query = """
            select content_url, wx_sn
            from outside_account_articles where publish_timestamp in %s
            order by update_time desc;
        """
        article_list = await self.pool.async_fetch(
            query=query, params=(tuple([0, -1]),)
        )
        return article_list

    async def check_each_article(self, article: dict):
        """Fetch article detail and back-fill publish_timestamp and rootSourceId list."""
        url = article["content_url"]
        wx_sn = article["wx_sn"]
        try:
            response = await get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                # detail API returns milliseconds; store seconds
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
                            "rootSourceId", [""]
                        )[0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )

        query = """
            update outside_account_articles set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            article["wx_sn"] = wx_sn
            return article
        else:
            return None

    async def fallback_mechanism(self):
        # # fix publish_timestamp via msgId
        # update_sql = f"""
        #     update outside_account_articles oaa
        #     join (
        #         select gh_id, app_msg_id, max(publish_timestamp) as publish_timestamp
        #         from outside_account_articles
        #         where publish_timestamp > %s
        #         group by gh_id, app_msg_id
        #     ) vv on oaa.app_msg_id = vv.app_msg_id and oaa.gh_id = vv.gh_id
        #     set oaa.publish_timestamp = vv.publish_timestamp
        #     where oaa.publish_timestamp <= %s;
        # """
        # affected_rows_1 = await self.pool.async_save(query=update_sql, params=(0, 0))

        # if publish_timestamp is still missing, use update_time as publish_timestamp
        update_sql_2 = """
            update outside_account_articles
            set publish_timestamp = update_time
            where publish_timestamp < %s;
        """
        affected_rows_2 = await self.pool.async_save(query=update_sql_2, params=(0,))
        if affected_rows_2:
            await feishu_robot.bot(
                title="执行兜底修改发布时间戳",
                detail={
                    # "通过msgId修改": affected_rows_1,
                    "通过create_timestamp修改": affected_rows_2,
                },
                mention=False,
            )

    async def deal(self):
        task_list = await self.get_outside_article_list_v2()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["content_url"],
                                    "wx_sn": task["wx_sn"],
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())

        # # process_failed_task_reproduce
        # fail_tasks = await self.get_article_list()
        # fail_list = []
        # for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
        #     try:
        #         res = await self.check_each_article(fail_task)
        #         if res:
        #             fail_list.append(res)
        #     except Exception as e:
        #         await self.log_client.log(
        #             contents={
        #                 "task": "get_official_article_detail_step2",
        #                 "data": {
        #                     "detail": {
        #                         "url": fail_task["ContentUrl"],
        #                         "wx_sn": fail_task["wx_sn"].decode("utf-8"),
        #                     },
        #                     "error_msg": traceback.format_exc(),
        #                     "error": str(e),
        #                 },
        #                 "function": "check_each_article",
        #                 "status": "fail",
        #             }
        #         )
        # if fail_list:
        #     await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)
        #
        # current_hour = datetime.datetime.now().hour
        # if current_hour >= 21:
        #     await self.fallback_mechanism()
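

# --- Usage sketch (illustrative addition, not part of the original tasks) ---
# A minimal driver showing how the two tasks above could be chained. It assumes the
# surrounding project supplies an async connection pool exposing async_fetch/async_save
# and a log client exposing an async log() method (the same interfaces used above).
# The function name and the date_string format below are assumptions for illustration.
async def run_outside_recycle_pipeline(pool, log_client):
    """Recycle outside-account articles, then back-fill publish_timestamp / rootSourceId."""
    date_string = datetime.date.today().strftime("%Y-%m-%d")  # assumed format
    await RecycleOutsideAccountArticlesTask(pool, log_client, date_string).deal()
    await UpdateOutsideRootSourceIdAndUpdateTimeTask(pool, log_client).deal()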