fwh_data_recycle.py

import json
import time
import urllib.parse

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config

class FwhDataRecycle:
    """Base class: holds connections to the denet, long_articles and piaoquan databases."""

    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2

    STAT_PERIOD = 2 * 24 * 3600

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()

    def get_group_server_accounts(self):
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        # gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list

class FwhGroupPublishRecordManager(FwhDataRecycle):
    """Backfill the article url for each successfully published group-send record."""

    def get_published_articles(self):
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = """
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url):
        update_query = """
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]
            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                if article_url:
                    # set record and unlock
                    self.set_article_url(record_id, article_url)
                else:
                    # unlock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # unlock
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )

class SaveFwhDataToDatabase(FwhDataRecycle):
    """Crawl detail info for recycled articles and persist it to official_articles_v2."""

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """
        update article read cnt
        """
        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """
        save data to db
        """
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
            wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                time.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # insert failed (most likely the row already exists): refresh its read count instead
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)

class FwhDataExportTemp(FwhDataRecycle):
    """Temporary export: join publish stats with fission and fan counts, dump to csv."""

    def get_publish_articles(self, gh_id):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
            and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """
        get fission info
        """
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = """
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def get_fans_num(self, gh_id, group_id_tuple):
        sql = """
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
            and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        import pandas as pd

        gh_id_list = self.get_group_server_accounts()
        L = []
        for gh_id in gh_id_list:
            publish_articles = self.get_publish_articles(gh_id)
            for article in publish_articles:
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    root_source_id_list = article["root_source_id_list"]
                    fission_info = self.get_fission_info(root_source_id_list)
                    article["uv"] = fission_info[0]["uv"]
                    article["first_uv"] = fission_info[0]["first_uv"]
                    article["split_uv"] = fission_info[0]["split_uv"]
                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
                    article["fans_count"] = fans_count
                    L.append(article)
                except Exception:
                    print(f"article {article['ContentUrl']} is not available, skip it")

        df = pd.DataFrame(L)
        df.to_csv("temp2.csv", index=False)
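
# Hypothetical entry point (not part of the original file): a minimal sketch of how the
# three pipelines might be chained, assuming they are meant to run in this order
# (backfill urls first, then crawl and save stats, then export the temporary csv).
if __name__ == "__main__":
    FwhGroupPublishRecordManager().deal()
    SaveFwhDataToDatabase().deal()
    FwhDataExportTemp().deal()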