fwh_data_recycle.py
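"""
Recycle pipeline for fwh group-send publish data.

- FwhGroupPublishRecordManager: for successful group-send records, look up the
  published article URL in the AIGC system and write it back, using
  recycle_status as a lock.
- SaveFwhDataToDatabase: crawl detail stats for recycled articles and store
  them in official_articles_v2, falling back to a read-count update when the
  insert fails.
- FwhDataExportTemp: one-off export of per-article fission and fan stats to CSV.
"""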

import json
import time
import urllib.parse

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    """Shared status constants and database connections for the fwh recycle jobs."""

    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2

    # only records created within the last two days are crawled for stats
    STAT_PERIOD = 2 * 24 * 3600

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()

    def get_group_server_accounts(self):
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        # gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list


class FwhGroupPublishRecordManager(FwhDataRecycle):
    """Backfill article URLs for successful group-send publish records."""

    def get_published_articles(self):
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = """
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url):
        update_query = """
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]

            # lock the record while its callback URL is looked up
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                if article_url:
                    # record the url and mark the row recycled (unlock)
                    self.set_article_url(record_id, article_url)
                else:
                    # no url yet, release the lock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # no callback info yet, release the lock
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )


class SaveFwhDataToDatabase(FwhDataRecycle):
    """Crawl detail stats for recycled articles and store them in official_articles_v2."""

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """Update the read count of an existing article."""
        if new_read_cnt <= 0:
            return 0

        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """Insert one crawled article row."""
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]

            # fetch article detail info with the spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                time.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                # each mini-program path carries a rootSourceId query parameter
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)

                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # insert failed (e.g. the row already exists): refresh the read count instead
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)


class FwhDataExportTemp(FwhDataRecycle):
    """Temporary export of per-article fission and fan stats to a local CSV."""

    def get_publish_articles(self, gh_id):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
                and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """Fetch fission (裂变) stats for the given root_source_id list."""
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = """
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def get_fans_num(self, gh_id, group_id_tuple):
        sql = """
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
                and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        import pandas as pd

        gh_id_list = self.get_group_server_accounts()
        L = []
        for gh_id in gh_id_list:
            publish_articles = self.get_publish_articles(gh_id)
            for article in publish_articles:
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    root_source_id_list = article["root_source_id_list"]
                    fission_info = self.get_fission_info(root_source_id_list)
                    article["uv"] = fission_info[0]["uv"]
                    article["first_uv"] = fission_info[0]["first_uv"]
                    article["split_uv"] = fission_info[0]["split_uv"]
                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
                    article["fans_count"] = fans_count
                    L.append(article)
                except Exception as e:
                    print(f"article {article['ContentUrl']} is not available, skip it")
        df = pd.DataFrame(L)
        df.to_csv("temp2.csv", index=False)
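

if __name__ == "__main__":
    # Illustrative entry point (assumed; the module may instead be driven by an
    # external scheduler): backfill article URLs first, then crawl their stats.
    FwhGroupPublishRecordManager().deal()
    SaveFwhDataToDatabase().deal()
    # FwhDataExportTemp().deal()  # one-off export to temp2.csv, run manually if needed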