import json
import time
import urllib.parse

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2

    STAT_PERIOD = 2 * 24 * 3600

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()

    def get_group_server_accounts(self):
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        # gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list


class FwhGroupPublishRecordManager(FwhDataRecycle):
    def get_published_articles(self):
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = """
            select t1.publish_stage_url
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url):
        update_query = """
            update long_articles_group_send_result
            set url = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]
            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                if article_url:
                    # set record and unlock
                    self.set_article_url(record_id, article_url)
                else:
                    # unlock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # unlock
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )
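
# A note on the lock/unlock calls in deal() above: update_recycle_status() is
# effectively a compare-and-set, because the WHERE clause matches both the row
# id and the status the caller expects to find. The sketch below shows how that
# contract could be used to detect a lost race; it is illustrative only --
# `try_lock_record` is not part of this module, and it assumes save() returns
# the number of affected rows (pymysql's cursor.execute() behavior).
def try_lock_record(manager: FwhGroupPublishRecordManager, record_id: int) -> bool:
    """Return True only if this caller won the compare-and-set lock."""
    affected_rows = manager.update_recycle_status(
        record_id,
        manager.RECYCLE_INIT_STATUS,
        manager.RECYCLE_PROCESSING_STATUS,
    )
    # 0 affected rows means another worker already moved the record out of the
    # INIT state, so this caller must not process it.
    return bool(affected_rows)
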
class SaveFwhDataToDatabase(FwhDataRecycle):
    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """
        update article read cnt
        """
        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """
        save data to db
        """
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime,
             ItemIndex, ContentUrl, show_view_count, wx_sn, title_md5,
             article_group, channel_content_id, root_source_id_list,
             publish_timestamp)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]

            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                time.sleep(1)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # the insert most likely failed on a duplicate record,
                    # so just refresh the read count of the existing row
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"account: {account_id}",
            )
            self.process_each_account_data(publish_articles)
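
# Worked example of the rootSourceId extraction in process_each_account_data()
# above. The sample path is invented for illustration (real values arrive in
# the get_article_detail() payload, URL-encoded inside each mini-program
# card's "path" field):
def _demo_root_source_id_extraction() -> str:
    sample_path = "rootSourceId%3Dtouliu_abc123%26videoId%3D456"
    decoded = urllib.parse.unquote(sample_path)
    # decoded == "rootSourceId=touliu_abc123&videoId=456"
    return urllib.parse.parse_qs(decoded)["rootSourceId"][0]  # "touliu_abc123"
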
class FwhDataExportTemp(FwhDataRecycle):
    def get_publish_articles(self, gh_id):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, ContentUrl, title, ItemIndex,
                   from_unixtime(createTime, '%Y-%m-%d'),
                   sum(show_view_count), group_concat(article_group) as 'group',
                   root_source_id_list
            from official_articles_v2
            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
                and ghId = '{gh_id}' and article_group is not null
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """
        get fission (viral-spread) stats for a batch of root_source_ids
        """
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = """
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv',
                   sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def get_fans_num(self, gh_id, group_id_tuple):
        sql = """
            select count(1) as 'fans_count'
            from article_user_group
            where gzh_id = %s and user_group_id in %s
                and is_delete = 0;
        """
        return self.piaoquan_client.fetch(
            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
        )

    def deal(self):
        import pandas as pd

        gh_id_list = self.get_group_server_accounts()
        L = []
        for gh_id in gh_id_list:
            publish_articles = self.get_publish_articles(gh_id)
            for article in publish_articles:
                try:
                    group_id_tuple = tuple(article["group"].split(","))
                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
                        "fans_count"
                    ]
                    root_source_id_list = article["root_source_id_list"]
                    fission_info = self.get_fission_info(root_source_id_list)
                    article["uv"] = fission_info[0]["uv"]
                    article["first_uv"] = fission_info[0]["first_uv"]
                    article["split_uv"] = fission_info[0]["split_uv"]
                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
                    article["fans_count"] = fans_count
                    L.append(article)
                except Exception as e:
                    print(f"article {article['ContentUrl']} is not available, skip it: {e}")

        df = pd.DataFrame(L)
        df.to_csv("temp2.csv", index=False)
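
# How these stages are scheduled is not shown in this module; the wiring below
# is an assumption, in pipeline order (recycle the published URLs first, then
# crawl their stats into official_articles_v2):
if __name__ == "__main__":
    FwhGroupPublishRecordManager().deal()
    SaveFwhDataToDatabase().deal()
    # FwhDataExportTemp().deal()  # ad-hoc CSV export; run manually when needed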