# fwh_data_recycle.py

import json
import time
import urllib.parse

from datetime import datetime
from typing import Optional

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from applications.utils import str_to_md5, days_remaining_in_month
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2

    STAT_PERIOD = 3 * 24 * 3600

    ARTICLE_ILLEGAL_CODE = 25012
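
    # recycle_status lifecycle: INIT (0) -> PROCESSING (1, used as a lock while the
    # published URL is fetched back) -> SUCCESS (2); FAILED (99) presumably marks
    # records that could not be recycled.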

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()

        self.feishu_robot = FeishuBotApi()

    def get_group_server_accounts(self):
        fetch_query = f"""
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        # gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list

    def get_server_account_name(self, gh_id: str) -> Optional[str]:
        fetch_query = f"""
            select account_name from long_articles_group_send_result where gh_id = %s limit 1;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        account_name = fetch_response[0]["account_name"] if fetch_response else None
        return account_name

    def illegal_article_bot(
        self,
        account_name: str,
        gh_id: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
    ):
        self.feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                "publish_date": str(publish_date),
            },
            env="server_account_publish_monitor",
        )
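

# Recycles group-send publish records: pulls the published article URL back from
# the AIGC (denet) system and stores it on long_articles_group_send_result, using
# recycle_status as an optimistic lock.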
class FwhGroupPublishRecordManager(FwhDataRecycle):
    def get_published_articles(self):
        fetch_query = f"""
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = f"""
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = f"""
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url):
        update_query = f"""
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]
            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                if article_url:
                    # set record and unlock
                    self.set_article_url(record_id, article_url)
                else:
                    # unlock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # unlock
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )
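

# Crawls detail data for recently published group-send articles and writes it into
# official_articles_v2; when the insert fails (e.g. the row already exists), only
# the read count is refreshed.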
class SaveFwhDataToDatabase(FwhDataRecycle):
    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """
        update article read cnt
        """
        if new_read_cnt <= 0:
            return 0

        update_query = f"""
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """
        save data to db
        """
        insert_query = f"""
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = f"""
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_time),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]
            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                response_code = article_detail_info["code"]
                if response_code == self.ARTICLE_ILLEGAL_CODE:
                    self.illegal_article_bot(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                    )
                    # an illegal article has no detail payload, skip it
                    continue

                time.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                # each mini-program path carries a rootSourceId query parameter
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)

                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # insert failed (most likely a duplicate), refresh the read count instead
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)
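

# One-off export helper: joins published article stats with fission data and fan
# counts for a fixed date window and dumps the result to a local csv file.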
class FwhDataExportTemp(FwhDataRecycle):
    def get_publish_articles(self, gh_id):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
                and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """
        get fission (viral spread) stats for the given root_source_id list
        """
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = f"""
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def get_fans_num(self, gh_id, group_id_tuple):
        sql = f"""
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
                and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        import pandas as pd

        gh_id_list = self.get_group_server_accounts()
        L = []
        for gh_id in gh_id_list:
            publish_articles = self.get_publish_articles(gh_id)
            for article in publish_articles:
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    root_source_id_list = article["root_source_id_list"]
                    fission_info = self.get_fission_info(root_source_id_list)
                    article["uv"] = fission_info[0]["uv"]
                    article["first_uv"] = fission_info[0]["first_uv"]
                    article["split_uv"] = fission_info[0]["split_uv"]
                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
                    article["fans_count"] = fans_count
                    L.append(article)
                except Exception as e:
                    print(f"article {article['ContentUrl']} is not available, skip it")

        df = pd.DataFrame(L)
        df.to_csv("temp2.csv", index=False)
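

# Daily monitor: for every server account, reports how many fans were published to
# today, how many publish chances and fans remain, and how many days are left in
# the month, then pushes the table to Feishu.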
class FwhGroupPublishMonitor(FwhDataRecycle):
    def get_sent_fans(self, date_string: str, gh_id: str) -> int:
        """
        get the number of fans sent on the specified date
        """
        fetch_query = f"""
            select sum(sent_count) as 'total_sent_fans'
            from long_articles_group_send_result
            where publish_date = %s
                and gh_id = %s
                and status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query,
            cursor_type=DictCursor,
            params=(date_string, gh_id, self.PUBLISH_SUCCESS_STATUS),
        )
        fans_count = fetch_response[0]["total_sent_fans"]
        return fans_count if fans_count else 0

    def get_remain_fans(self, gh_id: str):
        """
        get the number of remaining fans
        """
        fetch_query = f"""
            select count(1) as 'remain_fans'
            from article_user_group
            where gzh_id = %s and is_delete = %s and remaining_count > %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, 0, 0)
        )
        response = fetch_response[0]["remain_fans"]
        return response if response else 0

    def get_remain_publish_times(self, gh_id: str):
        """
        get the number of remaining publish times
        """
        fetch_query = f"""
            select sum(remaining_count) as 'remain_publish_times'
            from article_user_group
            where gzh_id = %s and is_delete = %s;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id, 0)
        )
        response = fetch_response[0]["remain_publish_times"]
        return response if response else 0

    def deal(self, date_string: Optional[str] = None):
        """
        monitor the publish record
        """
        # default to today's date, evaluated at call time rather than at import time
        if date_string is None:
            date_string = datetime.today().strftime("%Y-%m-%d")

        gh_id_list = self.get_group_server_accounts()

        # get remaining publish days in this month
        remain_days = days_remaining_in_month()

        # build feishu table columns
        columns = [
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="plain_text",
                sheet_name="account_name",
                display_name="公众号名称",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_times",
                display_name="发文前剩余发文次数",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_fans",
                display_name="发文前剩余发文粉丝数",
            ),
            self.feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="remain_days",
                display_name="本月剩余天数",
            ),
        ]

        monitor_table = []
        for gh_id in gh_id_list:
            account_name = self.get_server_account_name(gh_id)
            # number of fans published to today
            sent_fans = self.get_sent_fans(date_string, gh_id)
            # number of fans still available for publishing
            remain_fans = self.get_remain_fans(gh_id)
            # number of publish times still available
            remain_publish_times = self.get_remain_publish_times(gh_id)
            temp = {
                "account_name": account_name,
                "rest_publish_times": int(remain_publish_times + sent_fans),
                "rest_publish_fans": int(remain_fans),
                "remain_days": int(remain_days),
                "sent_fans": int(sent_fans),
            }
            monitor_table.append(temp)

        # push the monitoring table through the feishu bot
        self.feishu_robot.bot(
            title=f"{date_string}服务号发文详情",
            detail={"columns": columns, "rows": monitor_table},
            table=True,
            mention=False,
            env="server_account_publish_monitor",
        )