recycle_fwh_group_publish_articles.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. import asyncio
  2. import json
  3. import time
  4. import urllib.parse
  5. from datetime import datetime
  6. from typing import Optional, List, Dict
  7. from tqdm import tqdm
  8. from applications.api import feishu_robot
  9. from applications.utils import str_to_md5, days_remaining_in_month
  10. from applications.crawler.wechat import get_article_detail
  11. class RecycleFwhGroupPublishArticlesConst:
  12. # 状态Code
  13. INIT_STATUS = 0
  14. PROCESSING_STATUS = 1
  15. SUCCESS_STATUS = 2
  16. FAILED_STATUS = 99
  17. # 群发成功状态
  18. PUBLISH_SUCCESS_STATUS = 2
  19. # 阅读量更新区间
  20. STAT_PERIOD = 3 * 24 * 3600
  21. # 文章违规状态
  22. ARTICLE_ILLEGAL_CODE = 25012
  23. # 未使用的账号
  24. NOT_USED_SERVER_ACCOUNT = {"gh_84e744b16b3a", "gh_5855bed97938", "gh_61a72b720de3"}
  25. class RecycleFwhGroupPublishArticlesBase(RecycleFwhGroupPublishArticlesConst):
  26. def __init__(self, pool, log_client):
  27. self.pool = pool
  28. self.log_client = log_client
  29. async def get_server_group_publish_accounts(self) -> List[str]:
  30. query = "select gzh_id from article_gzh_developer;"
  31. fetch_response = await self.pool.async_fetch(
  32. query=query, db_name="piaoquan_crawler"
  33. )
  34. return [
  35. i["gzh_id"]
  36. for i in fetch_response
  37. if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
  38. ]
  39. async def get_account_name(self, gh_id: str) -> Optional[str]:
  40. query = """select account_name from long_articles_group_send_result where gh_id = %s limit 1;"""
  41. fetch_response = await self.pool.async_fetch(query=query, params=(gh_id,))
  42. if fetch_response:
  43. return fetch_response[0]["account_name"]
  44. return None
  45. # 违规文章报警
  46. async def alert_illegal_article(
  47. self,
  48. gh_id: str,
  49. account_name: str,
  50. group_id: str,
  51. illegal_msg: str,
  52. publish_date: str,
  53. ):
  54. await feishu_robot.bot(
  55. title="服务号文章违规告警,请前往微信公众平台处理",
  56. detail={
  57. "account_name": account_name,
  58. "gh_id": gh_id,
  59. "group_id": group_id,
  60. "illegal_msg": illegal_msg,
  61. "publish_date": str(publish_date),
  62. },
  63. env="server_account_publish_monitor",
  64. )
  65. class RecordFwhGroupPublishArticles(RecycleFwhGroupPublishArticlesBase):
  66. # 获取服务号发文细节
  67. async def get_group_published_articles(self):
  68. query = """
  69. SELECT
  70. id, publish_content_id, gh_id, user_group_id
  71. FROM long_articles_group_send_result
  72. WHERE status = %s AND recycle_status = %s;
  73. """
  74. return await self.pool.async_fetch(
  75. query=query, params=(self.PUBLISH_SUCCESS_STATUS, self.INIT_STATUS)
  76. )
  77. # 通过回调结果获取分组群发文章的回调信息
  78. async def get_article_call_back_from_aigc(
  79. self, publish_content_id: str, user_group_id: str
  80. ) -> Optional[Dict]:
  81. query = """
  82. SELECT t1.publish_stage_url
  83. FROM publish_content_stage_url t1
  84. LEFT JOIN publish_content t2 ON t1.publish_content_id = t2.id
  85. WHERE t1.publish_content_id = %s AND t1.user_group_id = %s;
  86. """
  87. fetch_response = await self.pool.async_fetch(
  88. query=query, db_name="aigc", params=(publish_content_id, user_group_id)
  89. )
  90. if fetch_response:
  91. return fetch_response[0]
  92. return None
  93. # 更新文章回收状态
  94. async def update_recycle_status(
  95. self, record_id: int, ori_status: int, new_status: int
  96. ) -> int:
  97. query = """
  98. UPDATE long_articles_group_send_result
  99. SET recycle_status = %s
  100. WHERE id = %s AND recycle_status = %s;
  101. """
  102. return await self.pool.async_save(
  103. query=query, params=(new_status, record_id, ori_status)
  104. )
  105. # 为文章写入链接
  106. async def set_article_url(self, record_id: int, url: str) -> int:
  107. query = """
  108. UPDATE long_articles_group_send_result
  109. SET url = %s, recycle_status = %s
  110. WHERE id = %s and recycle_status = %s;
  111. """
  112. return await self.pool.async_save(
  113. query=query, params=(url, self.SUCCESS_STATUS, record_id, self.INIT_STATUS)
  114. )
  115. async def deal(self):
  116. group_published_articles = await self.get_group_published_articles()
  117. for record in tqdm(group_published_articles):
  118. record_id = record["id"]
  119. publish_content_id = record["publish_content_id"]
  120. user_group_id = record["user_group_id"]
  121. # lock task
  122. acquire_lock = await self.update_recycle_status(
  123. record_id, self.INIT_STATUS, self.PROCESSING_STATUS
  124. )
  125. if not acquire_lock:
  126. continue
  127. # get article link from aigc
  128. call_back_response = await self.get_article_call_back_from_aigc(
  129. publish_content_id, user_group_id
  130. )
  131. if call_back_response:
  132. article_link = call_back_response["publish_stage_url"]
  133. if article_link:
  134. await self.set_article_url(record_id, article_link)
  135. else:
  136. # unlock
  137. await self.update_recycle_status(
  138. record_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  139. )
  140. else:
  141. # unlock
  142. await self.update_recycle_status(
  143. record_id, self.PROCESSING_STATUS, self.FAILED_STATUS
  144. )
  145. class SaveFwhDataToDatabase(RecycleFwhGroupPublishArticlesBase):
  146. # 更新阅读量
  147. async def update_read_count(self, wx_sn, read_count):
  148. query = """
  149. UPDATE official_articles_v2
  150. SET show_view_count = %s
  151. WHERE wx_sn = %s;
  152. """
  153. return await self.pool.async_save(
  154. query=query, db_name="piaoquan_crawler", params=(read_count, wx_sn)
  155. )
  156. # 保存文章数据到数据库
  157. async def save_data_to_database(self, article):
  158. query = f"""
  159. insert into official_articles_v2
  160. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
  161. wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
  162. values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  163. """
  164. return await self.pool.async_save(
  165. query=query, params=article, db_name="piaoquan_crawler"
  166. )
  167. # 获取需要更新的分组群发文章
  168. async def get_stat_published_articles(self, gh_id):
  169. earliest_timestamp = int(time.time()) - self.STAT_PERIOD
  170. fetch_query = f"""
  171. select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
  172. from long_articles_group_send_result
  173. where gh_id = %s and recycle_status = %s and create_time > %s;
  174. """
  175. earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
  176. "%Y-%m-%d %H:%M:%S"
  177. )
  178. return await self.pool.async_fetch(
  179. query=fetch_query, params=(gh_id, self.SUCCESS_STATUS, earliest_time)
  180. )
  181. # 处理某个账号
  182. async def process_each_fwh_account(self, gh_id):
  183. stat_articles = await self.get_stat_published_articles(gh_id)
  184. if not stat_articles:
  185. return
  186. for article in stat_articles:
  187. account_name = article["account_name"]
  188. user_group_id = article["user_group_id"]
  189. url = article["url"]
  190. publish_date = article["publish_date"]
  191. # get article detail info with spider
  192. try:
  193. article_detail_info = await get_article_detail(
  194. article_link=url, is_count=True, is_cache=False
  195. )
  196. response_code = article_detail_info["code"]
  197. if response_code == self.ARTICLE_ILLEGAL_CODE:
  198. await self.alert_illegal_article(
  199. account_name=account_name,
  200. gh_id=gh_id,
  201. group_id=user_group_id,
  202. illegal_msg=article_detail_info["msg"],
  203. publish_date=publish_date,
  204. )
  205. await asyncio.sleep(1)
  206. content_url = article_detail_info["data"]["data"]["content_link"]
  207. app_msg_id = content_url.split("mid=")[-1].split("&")[0]
  208. wx_sn = content_url.split("sn=")[-1]
  209. publish_timestamp = int(
  210. article_detail_info["data"]["data"]["publish_timestamp"] / 1000
  211. )
  212. create_time = publish_timestamp
  213. update_time = publish_timestamp
  214. item_index = article_detail_info["data"]["data"]["item_index"]
  215. show_view_count = article_detail_info["data"]["data"]["view_count"]
  216. title = article_detail_info["data"]["data"]["title"]
  217. title_md5 = str_to_md5(title)
  218. channel_content_id = article_detail_info["data"]["data"][
  219. "channel_content_id"
  220. ]
  221. mini_program_info = article_detail_info["data"]["data"]["mini_program"]
  222. root_source_id_list = [
  223. urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
  224. "rootSourceId"
  225. ][0]
  226. for i in mini_program_info["item_list"]
  227. ]
  228. root_source_id_list = json.dumps(root_source_id_list)
  229. try:
  230. await self.save_data_to_database(
  231. article=(
  232. gh_id,
  233. account_name,
  234. app_msg_id,
  235. title,
  236. "9",
  237. create_time,
  238. update_time,
  239. item_index,
  240. url,
  241. show_view_count,
  242. wx_sn,
  243. title_md5,
  244. user_group_id,
  245. channel_content_id,
  246. root_source_id_list,
  247. publish_timestamp,
  248. )
  249. )
  250. except Exception as e:
  251. await self.update_read_count(
  252. wx_sn=wx_sn, read_count=show_view_count
  253. )
  254. except Exception as e:
  255. print(f"article {url} is not available, skip it")
  256. print(e)
  257. # deal
  258. async def deal(self):
  259. account_id_list = await self.get_server_group_publish_accounts()
  260. if not account_id_list:
  261. return
  262. for account_id in account_id_list:
  263. await self.process_each_fwh_account(gh_id=account_id)
  264. class FwhGroupPublishMonitor(RecycleFwhGroupPublishArticlesBase):
  265. # 获取指定日期,该账号下所有群发任务的平均粉丝数
  266. async def get_sent_fans(self, date_string: str, gh_id: str) -> int:
  267. query = """
  268. select push_id, avg(sent_count) as 'total_sent_fans'
  269. from long_articles_group_send_result
  270. where publish_date = %s and gh_id = %s and status = %s
  271. group by push_id;
  272. """
  273. fetch_response = await self.pool.async_fetch(
  274. query=query,
  275. params=(date_string, gh_id, self.PUBLISH_SUCCESS_STATUS),
  276. )
  277. fans_list = [i["total_sent_fans"] for i in fetch_response]
  278. return sum(fans_list) if fans_list else 0
  279. # 获取指定日期,该账号下所有群发任务的剩余粉丝数
  280. async def get_remain_fans(self, gh_id: str):
  281. query = """
  282. select count(1) as 'remain_fans'
  283. from article_user_group
  284. where gzh_id = %s and is_delete = %s and remaining_count > %s;
  285. """
  286. fetch_response = await self.pool.async_fetch(query=query, params=(gh_id, 0, 0))
  287. response = fetch_response[0]["remain_fans"]
  288. return response if response else 0
  289. async def get_remain_publish_times(self, gh_id: str):
  290. """
  291. 获取剩余可发布次数
  292. """
  293. fetch_query = """
  294. select sum(remaining_count) as 'remain_publish_times'
  295. from article_user_group
  296. where gzh_id = %s and is_delete = %s;
  297. """
  298. fetch_response = await self.pool.async_fetch(
  299. query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, 0)
  300. )
  301. response = fetch_response[0]["remain_publish_times"]
  302. return response if response else 0
  303. async def get_remain_fans_and_publish_times(self, gh_id: str, date_string: str):
  304. """
  305. 获取发布前,该账号剩余的发布次数和粉丝数
  306. """
  307. fetch_query = """
  308. select fans_before_publish, publish_times_before_publish
  309. from fwh_daily_publish_detail
  310. where gh_id = %s and publish_date = %s;
  311. """
  312. fetch_response = await self.pool.async_fetch(
  313. query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, date_string)
  314. )
  315. return fetch_response[0] if fetch_response else None
  316. async def deal(self, date_string: str = datetime.today().strftime("%Y-%m-%d")):
  317. """
  318. monitor the publish record
  319. """
  320. now = datetime.now()
  321. if now.hour > 12:
  322. return
  323. gh_id_list = await self.get_server_group_publish_accounts()
  324. if not gh_id_list:
  325. return
  326. # get rest publish days
  327. remain_days = days_remaining_in_month()
  328. # get table columns
  329. columns = [
  330. feishu_robot.create_feishu_columns_sheet(
  331. sheet_type="plain_text",
  332. sheet_name="account_name",
  333. display_name="公众号名称",
  334. ),
  335. feishu_robot.create_feishu_columns_sheet(
  336. sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
  337. ),
  338. feishu_robot.create_feishu_columns_sheet(
  339. sheet_type="number",
  340. sheet_name="rest_publish_times",
  341. display_name="发文前剩余发文次数",
  342. ),
  343. feishu_robot.create_feishu_columns_sheet(
  344. sheet_type="number",
  345. sheet_name="rest_publish_fans",
  346. display_name="发文前剩余发文粉丝数",
  347. ),
  348. feishu_robot.create_feishu_columns_sheet(
  349. sheet_type="number",
  350. sheet_name="remain_days",
  351. display_name="本月剩余天数",
  352. ),
  353. ]
  354. monitor_table = []
  355. for gh_id in gh_id_list:
  356. account_name = await self.get_account_name(gh_id)
  357. sent_fans = await self.get_sent_fans(date_string, gh_id)
  358. detail = await self.get_remain_fans_and_publish_times(gh_id, date_string)
  359. if not detail:
  360. await feishu_robot.bot(
  361. title=f"{date_string}服务号发文详情",
  362. detail=f"{gh_id}--{account_name} 没有发布详情",
  363. env="server_account_publish_monitor",
  364. )
  365. continue
  366. remain_fans, remain_publish_times = (
  367. detail["fans_before_publish"],
  368. detail["publish_times_before_publish"],
  369. )
  370. temp = {
  371. "account_name": account_name,
  372. "rest_publish_times": int(remain_publish_times),
  373. "rest_publish_fans": int(remain_fans),
  374. "remain_days": int(remain_days),
  375. "sent_fans": int(sent_fans),
  376. }
  377. monitor_table.append(temp)
  378. await feishu_robot.bot(
  379. title=f"{date_string}服务号发文详情",
  380. detail={"columns": columns, "rows": monitor_table},
  381. table=True,
  382. mention=False,
  383. env="server_account_publish_monitor",
  384. )