@@ -0,0 +1,205 @@
+import json
+import time
+import urllib.parse
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications.db import DatabaseConnector
+from applications.utils import str_to_md5
+from cold_start.crawler.wechat import get_article_detail
+from config import denet_config, long_articles_config, piaoquan_crawler_config
+
+
+class FwhDataRecycle:
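+    """
+    Shared base class: holds the denet / long_articles / piaoquan database
+    connections and the recycle-status constants used by the tasks below.
+    """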
+    RECYCLE_INIT_STATUS = 0
+    RECYCLE_PROCESSING_STATUS = 1
+    RECYCLE_SUCCESS_STATUS = 2
+    RECYCLE_FAILED_STATUS = 99
+
+    PUBLISH_SUCCESS_STATUS = 2
+
+    # only articles published within the last 3 days are processed for stats
+    STAT_PERIOD = 3 * 24 * 3600
+
+    def __init__(self):
+        self.denet_client = DatabaseConnector(denet_config)
+        self.denet_client.connect()
+
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.long_articles_client.connect()
+
+        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
+        self.piaoquan_client.connect()
+
+
+class FwhGroupPublishRecordManager(FwhDataRecycle):
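+    """
+    Backfills group-send publish records: locks rows that were published
+    successfully but not yet recycled, looks up the article url and publish
+    timestamp in the AIGC system, and writes them back to the record.
+    """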
+
+    def get_published_articles(self):
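+        """Fetch successfully published group-send records that have not been recycled yet."""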
+        fetch_query = """
+            select id, publish_content_id, gh_id, user_group_id
+            from long_articles_group_send_result
+            where status = %s and recycle_status = %s;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            query=fetch_query,
+            cursor_type=DictCursor,
+            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
+        )
+        return fetch_response
+
+    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
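+        """Return the stage url and publish timestamp recorded in the AIGC system for the given publish content and user group, or None if absent."""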
+        sql = """
+            select t1.publish_stage_url, t2.publish_timestamp
+            from publish_content_stage_url t1
+            left join publish_content t2 on t1.publish_content_id = t2.id
+            where t1.publish_content_id = %s and t1.user_group_id = %s;
+        """
+        article_info = self.denet_client.fetch(
+            sql,
+            cursor_type=DictCursor,
+            params=(publish_content_id, user_group_id),
+        )
+        if article_info:
+            return article_info[0]
+        else:
+            return None
+
+    def update_recycle_status(self, record_id, ori_status, new_status):
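+        """Compare-and-swap the recycle status of a record; doubles as a lightweight lock."""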
+        update_query = """
+            update long_articles_group_send_result
+            set recycle_status = %s
+            where id = %s and recycle_status = %s;
+        """
+        return self.long_articles_client.save(
+            update_query, (new_status, record_id, ori_status)
+        )
+
+    def set_article_url(self, record_id, article_url, publish_timestamp):
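+        """Write the recycled url and publish timestamp back and mark the record as successfully recycled."""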
+        update_query = """
+            update long_articles_group_send_result
+            set url = %s, publish_timestamp = %s, recycle_status = %s
+            where id = %s and recycle_status = %s;
+        """
+        return self.long_articles_client.save(
+            query=update_query,
+            params=(
+                article_url,
+                publish_timestamp,
+                self.RECYCLE_SUCCESS_STATUS,
+                record_id,
+                self.RECYCLE_PROCESSING_STATUS,
+            ),
+        )
+
+    def deal(self):
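+        """Lock each pending record, fetch its AIGC callback info, then either save the url or release the lock."""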
+        publish_records = self.get_published_articles()
+        for publish_record in tqdm(publish_records):
+            publish_content_id = publish_record["publish_content_id"]
+            record_id = publish_record["id"]
+            group_id = publish_record["user_group_id"]
+            # lock the record so it is not processed twice
+            self.update_recycle_status(record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS)
+
+            publish_call_back_info = self.get_article_url_from_aigc_system(publish_content_id, group_id)
+            if publish_call_back_info:
+                article_url = publish_call_back_info["publish_stage_url"]
+                publish_timestamp = int(publish_call_back_info["publish_timestamp"] / 1000)
+                if article_url and publish_timestamp:
+                    # save the recycled info and unlock
+                    self.set_article_url(record_id, article_url, publish_timestamp)
+                else:
+                    # callback info is incomplete: unlock so the record can be retried
+                    self.update_recycle_status(record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS)
+            else:
+                # no callback info yet: unlock so the record can be retried
+                self.update_recycle_status(record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS)
+
+
+class SaveFwhDataToDatabase(FwhDataRecycle):
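+    """
+    Crawls detail data for recently published group-send articles via the
+    WeChat spider and inserts it into official_articles_v2; when the insert
+    fails (e.g. the article already exists) only the read count is refreshed.
+    """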
+
+    def update_article_read_cnt(self, wx_sn, new_read_cnt):
+        """Update the read count of an existing article, keyed by wx_sn."""
+        update_query = """
+            update official_articles_v2
+            set show_view_count = %s
+            where wx_sn = %s;
+        """
+        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))
+
+    def save_data_to_database(self, article):
+        """Insert one published article tuple into official_articles_v2."""
+        insert_query = """
+            insert into official_articles_v2
+            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
+            wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        return self.piaoquan_client.save(insert_query, article)
+
+    def get_group_server_accounts(self):
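+        """Return the gh_id list of all accounts registered in article_gzh_developer."""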
+        fetch_query = """
+            select gzh_id from article_gzh_developer;
+        """
+        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
+        gh_id_list = [i['gzh_id'] for i in fetch_response]
+        return gh_id_list
+
+    def get_stat_published_articles(self, gh_id):
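+        """Fetch recycled publish records of one account from the last STAT_PERIOD seconds."""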
+        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
+        fetch_query = """
+            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
+            from long_articles_group_send_result
+            where gh_id = %s and recycle_status = %s and publish_timestamp > %s;
+        """
+        return self.long_articles_client.fetch(
+            fetch_query,
+            cursor_type=DictCursor,
+            params=(gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
+        )
+
+    def process_each_account_data(self, account_published_article_list):
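+        """Crawl detail info for each published article of one account and persist it."""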
+        if not account_published_article_list:
+            return
+
+        for article in account_published_article_list:
+            account_name = article['account_name']
+            gh_id = article['gh_id']
+            user_group_id = article['user_group_id']
+            url = article['url']
+            publish_timestamp = article['publish_timestamp']
+
+            # get article detail info with spider
+            try:
+                article_detail_info = get_article_detail(url, is_count=True, is_cache=False)
+                time.sleep(3)
+                content_url = article_detail_info["data"]["data"]["content_link"]
+                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
+                # wx_sn is taken as everything after "sn=", assuming it is the last query parameter
+                wx_sn = content_url.split("sn=")[-1]
+                create_time = publish_timestamp
+                update_time = publish_timestamp
+                item_index = article_detail_info["data"]["data"]["item_index"]
+                show_view_count = article_detail_info["data"]["data"]["view_count"]
+                title = article_detail_info["data"]["data"]["title"]
+                title_md5 = str_to_md5(title)
+                channel_content_id = article_detail_info["data"]["data"]["channel_content_id"]
+                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
+                root_source_id_list = [
+                    urllib.parse.parse_qs(
+                        urllib.parse.unquote(i['path'])
+                    )['rootSourceId'][0]
+                    for i in mini_program_info
+                ]
+                root_source_id_list = json.dumps(root_source_id_list)
+                try:
+                    self.save_data_to_database(
+                        article=(
+                            gh_id, account_name, app_msg_id, title, '9',
+                            create_time, update_time, item_index, content_url,
+                            show_view_count, wx_sn, title_md5, user_group_id,
+                            channel_content_id, root_source_id_list, publish_timestamp,
+                        )
+                    )
+                except Exception:
+                    # insert failed, most likely because the article already exists;
+                    # fall back to refreshing its read count
+                    self.update_article_read_cnt(wx_sn, show_view_count)
+            except Exception as e:
+                print(f"process article failed, url={url}, error: {e}")
+
+    def deal(self):
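+        """Iterate over all group-publish accounts and persist their recently published articles."""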
+        account_id_list = self.get_group_server_accounts()
+        for account_id in account_id_list:
+            publish_articles = tqdm(
+                self.get_stat_published_articles(account_id),
+                desc=f"<crawling> {account_id}",
+            )
+            self.process_each_account_data(publish_articles)
+
+
+if __name__ == '__main__':
+    SaveFwhDataToDatabase().deal()
+