# fwh_data_recycle.py

import json
import time
import urllib.parse
from datetime import datetime
from typing import Optional

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from applications.utils import str_to_md5, days_remaining_in_month
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2
    STAT_PERIOD = 3 * 24 * 3600
    ARTICLE_ILLEGAL_CODE = 25012

    NOT_USED_SERVER_ACCOUNT = {
        "gh_84e744b16b3a",
        "gh_5855bed97938",
        "gh_61a72b720de3",
    }
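
    # Recycle status lifecycle, inferred from how the constants are used below:
    #   INIT (0) -> PROCESSING (1, row "locked") -> SUCCESS (2, url recorded);
    #   FAILED (99) appears reserved for records that can no longer be recycled.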

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()
        self.feishu_robot = FeishuBotApi()

    def get_group_server_accounts(self):
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [
            i["gzh_id"]
            for i in fetch_response
            if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
        ]
        # gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list

    def get_server_account_name(self, gh_id: str) -> Optional[str]:
        fetch_query = """
            select account_name from long_articles_group_send_result where gh_id = %s limit 1;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        account_name = fetch_response[0]["account_name"] if fetch_response else None
        return account_name

    def illegal_article_bot(
        self,
        account_name: str,
        gh_id: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
        article_title: str,
    ):
        self.feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                "publish_date": str(publish_date),
                "article_title": article_title,
            },
            env="server_account_publish_monitor",
        )


class FwhGroupPublishRecordManager(FwhDataRecycle):
    def get_published_articles(self):
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = """
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )
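
    # update_recycle_status only flips recycle_status when the current value
    # still matches ori_status, so deal() below can use it as a lightweight
    # optimistic lock (INIT -> PROCESSING, then SUCCESS or back to INIT).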

    def set_article_url(self, record_id, article_url):
        update_query = """
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]
            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )
            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                if article_url:
                    # set record and unlock
                    self.set_article_url(record_id, article_url)
                else:
                    # unlock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # unlock
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )


class SaveFwhDataToDatabase(FwhDataRecycle):
    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """
        update article read cnt
        """
        if new_read_cnt <= 0:
            return 0

        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """
        save data to db
        """
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)
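
    # The tuple passed as `article` must follow the 16-column order in the
    # insert above (ghId ... publish_timestamp); process_each_account_data
    # builds it in exactly that order.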

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp, content_id
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_time),
        )
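
    # STAT_PERIOD is 3 days, so only records group-sent within the last three
    # days are pulled back for read-count updates.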

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]
            content_id = article["content_id"]
            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                response_code = article_detail_info["code"]
                if response_code == self.ARTICLE_ILLEGAL_CODE:
                    query = """
                        SELECT article_title FROM long_articles_text WHERE content_id = %s;
                    """
                    article_title = self.long_articles_client.fetch(
                        query=query,
                        cursor_type=DictCursor,
                        params=(content_id,),
                    )
                    if article_title:
                        article_title = article_title[0]["article_title"]
                    else:
                        article_title = content_id
                    self.illegal_article_bot(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                        article_title=article_title,
                    )
                    # an illegal article has no usable detail data, skip it
                    continue

                time.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
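                # e.g. a mini-program "path" such as
                #   pages/category?rootSourceId=touliu_abc123&videoId=456
                # (url-encoded in the raw payload) would yield "touliu_abc123";
                # the concrete path format here is only an illustrative assumption.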
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # insert failed (most likely the article already exists),
                    # so only refresh its read count
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)


class FwhDataExportTemp(FwhDataRecycle):
    def get_publish_articles(self, gh_id):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
                and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)
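
    # The date window ('2025-06-09' .. '2025-06-13') and the ghId are baked into
    # the SQL above, so this class reads as a one-off export helper (hence "Temp").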

    def get_fission_info(self, root_source_id_list):
        """
        fetch fission (split) statistics for the given root_source_ids
        """
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = """
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )
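
    # pymysql expands a tuple bound to "in %s" into a parenthesised value list
    # (e.g. in ('id_a', 'id_b')), so all root_source_ids are matched in one query.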

    def get_fans_num(self, gh_id, group_id_tuple):
        sql = """
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
                and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        import pandas as pd

        gh_id_list = self.get_group_server_accounts()
        L = []
        for gh_id in gh_id_list:
            publish_articles = self.get_publish_articles(gh_id)
            for article in publish_articles:
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    root_source_id_list = article["root_source_id_list"]
                    fission_info = self.get_fission_info(root_source_id_list)
                    article["uv"] = fission_info[0]["uv"]
                    article["first_uv"] = fission_info[0]["first_uv"]
                    article["split_uv"] = fission_info[0]["split_uv"]
                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
                    article["fans_count"] = fans_count
                    L.append(article)
                except Exception:
                    print(f"article {article['ContentUrl']} is not available, skip it")

        df = pd.DataFrame(L)
        df.to_csv("temp2.csv", index=False)


class FwhGroupPublishMonitor(FwhDataRecycle):
    def get_sent_fans(self, date_string: str, gh_id: str) -> int:
        """
        get the number of fans sent on the specified date
        """
        fetch_query = """
            select push_id, avg(sent_count) as 'total_sent_fans'
            from long_articles_group_send_result
            where publish_date = %s and gh_id = %s and status = %s
            group by push_id;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query,
            cursor_type=DictCursor,
            params=(date_string, gh_id, self.PUBLISH_SUCCESS_STATUS),
        )
        fans_list = [i["total_sent_fans"] for i in fetch_response]
        return sum(fans_list) if fans_list else 0
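
    # sent_count is averaged per push_id and the per-push averages are summed,
    # presumably because every group row of a push repeats the push-level count.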

    def get_remain_fans(self, gh_id: str):
        """
        get the number of fans that can still be pushed to
        """
        fetch_query = """
            select count(1) as 'remain_fans'
            from article_user_group
            where gzh_id = %s and is_delete = %s and remaining_count > %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, 0, 0)
        )
        response = fetch_response[0]["remain_fans"]
        return response if response else 0

    def get_remain_publish_times(self, gh_id: str):
        """
        get the remaining number of publish chances for the account
        """
        fetch_query = """
            select sum(remaining_count) as 'remain_publish_times'
            from article_user_group
            where gzh_id = %s and is_delete = %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, 0)
        )
        response = fetch_response[0]["remain_publish_times"]
        return response if response else 0

    def get_remain_fans_and_publish_times(self, gh_id: str, date_string: str):
        """
        get the account's remaining publish times and fan count before publishing
        """
        fetch_query = """
            select fans_before_publish, publish_times_before_publish
            from fwh_daily_publish_detail
            where gh_id = %s and publish_date = %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, date_string)
        )
        return fetch_response[0] if fetch_response else None

    def deal(self, date_string: Optional[str] = None):
        """
        monitor the publish record
        """
        # default to today's date, resolved at call time
        if date_string is None:
            date_string = datetime.today().strftime("%Y-%m-%d")

        # only send the report in the morning run (before 13:00)
        now = datetime.now()
        if now.hour > 12:
            return

        gh_id_list = self.get_group_server_accounts()

        # remaining publish days in the current month
        remain_days = days_remaining_in_month()

        # feishu table columns
        columns = [
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="plain_text",
                sheet_name="account_name",
                display_name="公众号名称",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_times",
                display_name="发文前剩余发文次数",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_fans",
                display_name="发文前剩余发文粉丝数",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="remain_days",
                display_name="本月剩余天数",
            ),
        ]
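
        # Each column's sheet_name must match a key of the row dicts appended to
        # monitor_table below; display_name is presumably what the Feishu card renders.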
        monitor_table = []
        for gh_id in gh_id_list:
            account_name = self.get_server_account_name(gh_id)

            # number of fans pushed to today
            sent_fans = self.get_sent_fans(date_string, gh_id)

            # # remaining fans that can still be pushed to
            # remain_fans = self.get_remain_fans(gh_id)
            #
            # # remaining publish times
            # remain_publish_times = self.get_remain_publish_times(gh_id)

            detail = self.get_remain_fans_and_publish_times(gh_id, date_string)
            if not detail:
                self.feishu_robot.bot(
                    title=f"{date_string}服务号发文详情",
                    detail=f"{gh_id}--{account_name} 没有发布详情",
                    env="server_account_publish_monitor",
                )
                continue

            remain_fans, remain_publish_times = (
                detail["fans_before_publish"],
                detail["publish_times_before_publish"],
            )
            temp = {
                "account_name": account_name,
                "rest_publish_times": int(remain_publish_times),
                "rest_publish_fans": int(remain_fans),
                "remain_days": int(remain_days),
                "sent_fans": int(sent_fans),
            }
            monitor_table.append(temp)

        # print(monitor_table)
        # print(json.dumps(monitor_table, ensure_ascii=False, indent=4))

        # feishu bot
        self.feishu_robot.bot(
            title=f"{date_string}服务号发文详情",
            detail={"columns": columns, "rows": monitor_table},
            table=True,
            mention=False,
            env="server_account_publish_monitor",
        )
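

if __name__ == "__main__":
    # Minimal usage sketch (an assumption; the original project presumably
    # invokes these jobs from a scheduler rather than from this module):
    #   1. pull back article urls for group-send records
    #   2. crawl article detail data and persist it
    #   3. push the daily publish monitor table to Feishu
    FwhGroupPublishRecordManager().deal()
    SaveFwhDataToDatabase().deal()
    FwhGroupPublishMonitor().deal()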