fwh_data_recycle.py

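"""
Pipelines for recycling WeChat service-account (服务号) group-publish data.

- FwhGroupPublishRecordManager: back-fills article URLs for group-send records from the AIGC system.
- SaveFwhDataToDatabase: crawls article details and persists them to official_articles_v2.
- FwhDataExportTemp: one-off export of publish and fission stats to CSV.
- FwhGroupPublishMonitor: sends a daily Feishu report on publish volume and remaining quota.
"""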
import json
import time
import urllib.parse
from datetime import datetime
from typing import Optional

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from applications.utils import str_to_md5, days_remaining_in_month
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2

    # only recycle stats for articles published within the last 3 days
    STAT_PERIOD = 3 * 24 * 3600

    ARTICLE_ILLEGAL_CODE = 25012

    # service accounts excluded from all pipelines
    NOT_USED_SERVER_ACCOUNT = {
        "gh_84e744b16b3a",
        "gh_5855bed97938",
        "gh_61a72b720de3",
    }

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()

        self.feishu_robot = FeishuBotApi()

    def get_group_server_accounts(self):
        fetch_query = f"""
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [
            i["gzh_id"]
            for i in fetch_response
            if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
        ]
        # gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list

    def get_server_account_name(self, gh_id: str) -> Optional[str]:
        fetch_query = f"""
            select account_name from long_articles_group_send_result where gh_id = %s limit 1;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        account_name = fetch_response[0]["account_name"] if fetch_response else None
        return account_name

    def illegal_article_bot(
        self,
        account_name: str,
        gh_id: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
    ):
        self.feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                "publish_date": str(publish_date),
            },
            env="server_account_publish_monitor",
        )


class FwhGroupPublishRecordManager(FwhDataRecycle):
    """Back-fill article URLs for successfully published group-send records."""

    def get_published_articles(self):
        fetch_query = f"""
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = f"""
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = f"""
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url):
        update_query = f"""
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]
            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                if article_url:
                    # set record and unlock
                    self.set_article_url(record_id, article_url)
                else:
                    # unlock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # unlock
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )


class SaveFwhDataToDatabase(FwhDataRecycle):
    """Crawl published article details and persist them to official_articles_v2."""

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """Update the read count of an existing article."""
        if new_read_cnt <= 0:
            return 0

        update_query = f"""
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """Insert a crawled article record into official_articles_v2."""
        insert_query = f"""
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
            wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = f"""
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_time),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]
            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                response_code = article_detail_info["code"]
                if response_code == self.ARTICLE_ILLEGAL_CODE:
                    self.illegal_article_bot(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                    )

                time.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                # each mini-program item carries a URL-encoded `path` whose query
                # string contains the rootSourceId used for fission attribution
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception as e:
                    # insert failed (e.g. the record already exists):
                    # fall back to refreshing the read count
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)


class FwhDataExportTemp(FwhDataRecycle):
    """One-off export of publish and fission stats for a fixed date range."""

    def get_publish_articles(self, gh_id):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
            and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """Fetch fission (viral-spread) stats for the given root_source_id list."""
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = f"""
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id
            in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def get_fans_num(self, gh_id, group_id_tuple):
        sql = f"""
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
            and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        import pandas as pd

        gh_id_list = self.get_group_server_accounts()
        L = []
        for gh_id in gh_id_list:
            publish_articles = self.get_publish_articles(gh_id)
            for article in publish_articles:
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    root_source_id_list = article["root_source_id_list"]
                    fission_info = self.get_fission_info(root_source_id_list)
                    article["uv"] = fission_info[0]["uv"]
                    article["first_uv"] = fission_info[0]["first_uv"]
                    article["split_uv"] = fission_info[0]["split_uv"]
                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
                    article["fans_count"] = fans_count
                    L.append(article)
                except Exception as e:
                    print(f"article {article['ContentUrl']} is not available, skip it")

        df = pd.DataFrame(L)
        df.to_csv("temp2.csv", index=False)


class FwhGroupPublishMonitor(FwhDataRecycle):
    """Daily Feishu report on group-publish volume and remaining quota per account."""

    def get_sent_fans(self, date_string: str, gh_id: str) -> int:
        """Get the number of fans the account pushed to on the given date."""
        fetch_query = f"""
            select push_id, avg(sent_count) as 'total_sent_fans'
            from long_articles_group_send_result
            where publish_date = %s and gh_id = %s and status = %s
            group by push_id;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query,
            cursor_type=DictCursor,
            params=(date_string, gh_id, self.PUBLISH_SUCCESS_STATUS),
        )
        fans_list = [i["total_sent_fans"] for i in fetch_response]
        return sum(fans_list) if fans_list else 0

    def get_remain_fans(self, gh_id: str):
        """Get the number of fans that can still be pushed to."""
        fetch_query = f"""
            select count(1) as 'remain_fans'
            from article_user_group
            where gzh_id = %s and is_delete = %s and remaining_count > %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, 0, 0)
        )
        response = fetch_response[0]["remain_fans"]
        return response if response else 0

    def get_remain_publish_times(self, gh_id: str):
        """Get the number of remaining publish slots for the account."""
        fetch_query = f"""
            select sum(remaining_count) as 'remain_publish_times'
            from article_user_group
            where gzh_id = %s and is_delete = %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, 0)
        )
        response = fetch_response[0]["remain_publish_times"]
        return response if response else 0

    def get_remain_fans_and_publish_times(self, gh_id: str, date_string: str):
        """Get the fan count and publish times recorded for the account before publishing."""
        fetch_query = f"""
            select fans_before_publish, publish_times_before_publish
            from fwh_daily_publish_detail
            where gh_id = %s and publish_date = %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, date_string)
        )
        return fetch_response[0] if fetch_response else None

    def deal(self, date_string: Optional[str] = None):
        """Send the daily group-publish report to Feishu."""
        # resolve "today" at call time; a datetime.today() default in the signature
        # would be evaluated only once, at import time
        if date_string is None:
            date_string = datetime.today().strftime("%Y-%m-%d")

        now = datetime.now()
        if now.hour > 12:
            return

        gh_id_list = self.get_group_server_accounts()
        # get rest publish days of the current month
        remain_days = days_remaining_in_month()
        # build the Feishu table columns
        columns = [
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="plain_text",
                sheet_name="account_name",
                display_name="公众号名称",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_times",
                display_name="发文前剩余发文次数",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_fans",
                display_name="发文前剩余发文粉丝数",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="remain_days",
                display_name="本月剩余天数",
            ),
        ]

        monitor_table = []
        for gh_id in gh_id_list:
            account_name = self.get_server_account_name(gh_id)

            # number of fans pushed to today
            sent_fans = self.get_sent_fans(date_string, gh_id)

            # # remaining fans who can still be pushed to
            # remain_fans = self.get_remain_fans(gh_id)
            #
            # # remaining publish times
            # remain_publish_times = self.get_remain_publish_times(gh_id)

            detail = self.get_remain_fans_and_publish_times(gh_id, date_string)
            if not detail:
                self.feishu_robot.bot(
                    title=f"{date_string}服务号发文详情",
                    detail=f"{gh_id}--{account_name} 没有发布详情",
                    env="server_account_publish_monitor",
                )
                continue

            remain_fans, remain_publish_times = (
                detail["fans_before_publish"],
                detail["publish_times_before_publish"],
            )

            temp = {
                "account_name": account_name,
                "rest_publish_times": int(remain_publish_times),
                "rest_publish_fans": int(remain_fans),
                "remain_days": int(remain_days),
                "sent_fans": int(sent_fans),
            }
            monitor_table.append(temp)

        # print(monitor_table)
        # print(json.dumps(monitor_table, ensure_ascii=False, indent=4))

        # push the report table via the feishu bot
        self.feishu_robot.bot(
            title=f"{date_string}服务号发文详情",
            detail={"columns": columns, "rows": monitor_table},
            table=True,
            mention=False,
            env="server_account_publish_monitor",
        )
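

# --- Hypothetical usage sketch (not part of the original module) ---
# The real entry point / scheduler for these pipelines is not shown in this file;
# under that assumption, the sketch below only illustrates a plausible call order:
# back-fill URLs first, then crawl and persist stats, then send the monitor report.
if __name__ == "__main__":
    FwhGroupPublishRecordManager().deal()
    SaveFwhDataToDatabase().deal()
    FwhGroupPublishMonitor().deal()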