fwh_data_recycle.py

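"""
Recycle WeChat service-account (服务号) group-send publish data.

Summary of the classes below:
- FwhGroupPublishRecordManager: pulls published article URLs back from the AIGC
  system, sets them on long_articles_group_send_result, and monitors accounts
  that have not published.
- SaveFwhDataToDatabase: crawls detail data (read counts, mini-program root
  source ids) for recycled articles and writes it to official_articles_v2.
- FwhDataExportTemp: one-off export of publish, fission, and fan statistics to CSV.
"""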
import json
import time
import urllib.parse
from datetime import datetime
from typing import Optional

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.api import FeishuBotApi
from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config

class FwhDataRecycle:
    # recycle_status lifecycle of a group-send record
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    # status of a successfully published record
    PUBLISH_SUCCESS_STATUS = 2

    # stats are only crawled for articles sent within the last two days
    STAT_PERIOD = 2 * 24 * 3600

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()

        self.feishu_robot = FeishuBotApi()

    def get_group_server_accounts(self):
        """fetch the gh_id of every group-send service account"""
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        # gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list

    def get_server_account_name(self, gh_id: str) -> Optional[str]:
        """look up the account name for a given gh_id"""
        fetch_query = """
            select account_name from long_articles_group_send_result where gh_id = %s limit 1;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        account_name = fetch_response[0]["account_name"] if fetch_response else None
        return account_name

class FwhGroupPublishRecordManager(FwhDataRecycle):

    def get_published_articles(self):
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = """
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        return article_info[0] if article_info else None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url):
        update_query = """
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]
            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                if article_url:
                    # set record and unlock
                    self.set_article_url(record_id, article_url)
                else:
                    # unlock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # unlock
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )

    def monitor(self, date_string: Optional[str] = None):
        """
        monitor the publish record
        """
        # resolve the default date at call time, not at import time
        if date_string is None:
            date_string = datetime.today().strftime("%Y-%m-%d")

        now = datetime.now()
        if now.hour < 12:
            account_list = self.get_group_server_accounts()
            do_not_publish_account = []
            sql = """
                select account_name as '账号名称', gh_id, count(distinct user_group_id) as '发文组数'
                from long_articles_group_send_result
                where publish_date = %s
                group by account_name, gh_id;
            """
            publish_records = self.long_articles_client.fetch(
                query=sql, cursor_type=DictCursor, params=(date_string,)
            )
            # daily publish summary per account
            self.feishu_robot.bot(
                title=f"{date_string}服务号发文记录",
                mention=False,
                detail=publish_records,
                env="server_account_publish_monitor",
            )
            publish_account_id_set = {i["gh_id"] for i in publish_records}
            for account_id in account_list:
                if account_id not in publish_account_id_set:
                    account_name = self.get_server_account_name(account_id)
                    do_not_publish_account.append(
                        {
                            "account_name": account_name,
                            "gh_id": account_id,
                        }
                    )

            # alert for accounts that have no publish record for the day
            if do_not_publish_account:
                self.feishu_robot.bot(
                    title=f"{date_string}发现服务号存在未发文情况",
                    detail=do_not_publish_account,
                    env="server_account_publish_monitor",
                )

class SaveFwhDataToDatabase(FwhDataRecycle):

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """
        update article read cnt
        """
        if new_read_cnt <= 0:
            return 0

        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """
        save data to db
        """
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]

            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                time.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                # extract the rootSourceId from each mini-program path
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # insert failed (most likely the article already exists),
                    # so only refresh its read count
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)

class FwhDataExportTemp(FwhDataRecycle):

    def get_publish_articles(self, gh_id):
        # note: the date range is hard-coded and gh_id is interpolated directly into the SQL
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
                and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """
        get fission info for a set of root source ids
        """
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = """
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def get_fans_num(self, gh_id, group_id_tuple):
        sql = """
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
                and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        import pandas as pd

        gh_id_list = self.get_group_server_accounts()
        L = []
        for gh_id in gh_id_list:
            publish_articles = self.get_publish_articles(gh_id)
            for article in publish_articles:
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    root_source_id_list = article["root_source_id_list"]
                    fission_info = self.get_fission_info(root_source_id_list)
                    article["uv"] = fission_info[0]["uv"]
                    article["first_uv"] = fission_info[0]["first_uv"]
                    article["split_uv"] = fission_info[0]["split_uv"]
                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
                    article["fans_count"] = fans_count
                    L.append(article)
                except Exception as e:
                    print(f"article {article['ContentUrl']} is not available, skip it")
                    print(e)

        df = pd.DataFrame(L)
        df.to_csv("temp2.csv", index=False)
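

# Minimal entry-point sketch (an assumption -- the original file does not show how
# these jobs are invoked or scheduled): recycle the publish URLs first, then crawl
# and store the article statistics.
if __name__ == "__main__":
    publish_record_manager = FwhGroupPublishRecordManager()
    publish_record_manager.deal()
    publish_record_manager.monitor()

    SaveFwhDataToDatabase().deal()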