Ver código fonte

Merge branch '2025-06-09-add-server-accounts' of luojunhui/LongArticlesJob into master

luojunhui 4 meses atrás
pai
commit
286d9f3a71
3 arquivos alterados com 353 adições e 0 exclusões
  1. 12 0
      fwh_data_manager.py
  2. 26 0
      sh/run_fwh_data_manager.sh
  3. 315 0
      tasks/data_tasks/fwh_data_recycle.py

+ 12 - 0
fwh_data_manager.py

@@ -0,0 +1,12 @@
+from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
+from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
+
+
+if __name__ == '__main__':
+    # 1. 从 aigc 获取数据
+    fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
+    fwh_group_publish_record_manager.deal()
+
+    # 2. 保存数据到数据库
+    save_fwh_data_to_database = SaveFwhDataToDatabase()
+    save_fwh_data_to_database.deal()

+ 26 - 0
sh/run_fwh_data_manager.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/fwh_data_manage_task_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 fwh_data_manager.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - fwh_data_manager.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart fwh_data_manager.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 fwh_data_manager.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted fwh_data_manager.py"
+fi

+ 315 - 0
tasks/data_tasks/fwh_data_recycle.py

@@ -0,0 +1,315 @@
+import json
+import time
+import urllib.parse
+from tqdm import tqdm
+
+from pymysql.cursors import DictCursor
+
+from applications.db import DatabaseConnector
+from applications.utils import str_to_md5
+from cold_start.crawler.wechat import get_article_detail
+from config import denet_config, long_articles_config, piaoquan_crawler_config
+
+
+class FwhDataRecycle:
+    RECYCLE_INIT_STATUS = 0
+    RECYCLE_PROCESSING_STATUS = 1
+    RECYCLE_SUCCESS_STATUS = 2
+    RECYCLE_FAILED_STATUS = 99
+
+    PUBLISH_SUCCESS_STATUS = 2
+
+    STAT_PERIOD = 2 * 24 * 3600
+
+    def __init__(self):
+        self.denet_client = DatabaseConnector(denet_config)
+        self.denet_client.connect()
+
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.long_articles_client.connect()
+
+        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
+        self.piaoquan_client.connect()
+
+    def get_group_server_accounts(self):
+        fetch_query = f"""
+            select gzh_id from article_gzh_developer;
+        """
+        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
+        gh_id_list = [i["gzh_id"] for i in fetch_response]
+        # gh_id_list = ['gh_5e543853d8f0']
+        return gh_id_list
+
+
+class FwhGroupPublishRecordManager(FwhDataRecycle):
+
+    def get_published_articles(self):
+        fetch_query = f"""
+            select id, publish_content_id, gh_id, user_group_id
+            from long_articles_group_send_result
+            where status = %s and recycle_status = %s;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            query=fetch_query,
+            cursor_type=DictCursor,
+            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
+        )
+        return fetch_response
+
+    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
+        sql = f"""
+            select t1.publish_stage_url
+            from publish_content_stage_url t1
+            left join publish_content t2 on t1.publish_content_id = t2.id
+            where t1.publish_content_id = %s and t1.user_group_id = %s;
+        """
+        article_info = self.denet_client.fetch(
+            sql,
+            cursor_type=DictCursor,
+            params=(publish_content_id, user_group_id),
+        )
+        if article_info:
+            return article_info[0]
+        else:
+            return None
+
+    def update_recycle_status(self, record_id, ori_status, new_status):
+        update_query = f"""
+            update long_articles_group_send_result
+            set recycle_status = %s
+            where id = %s and recycle_status = %s;
+        """
+        return self.long_articles_client.save(
+            update_query, (new_status, record_id, ori_status)
+        )
+
+    def set_article_url(self, record_id, article_url):
+        update_query = f"""
+            update long_articles_group_send_result
+            set url = %s, recycle_status = %s
+            where id = %s and recycle_status = %s;
+        """
+        return self.long_articles_client.save(
+            query=update_query,
+            params=(
+                article_url,
+                self.RECYCLE_SUCCESS_STATUS,
+                record_id,
+                self.RECYCLE_PROCESSING_STATUS,
+            ),
+        )
+
+    def deal(self):
+        publish_records = self.get_published_articles()
+        for publish_record in tqdm(publish_records):
+            publish_content_id = publish_record["publish_content_id"]
+            record_id = publish_record["id"]
+            group_id = publish_record["user_group_id"]
+            # lock
+            self.update_recycle_status(
+                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
+            )
+
+            publish_call_back_info = self.get_article_url_from_aigc_system(
+                publish_content_id, group_id
+            )
+            if publish_call_back_info:
+                article_url = publish_call_back_info["publish_stage_url"]
+                if article_url:
+                    # set record and unlock
+                    self.set_article_url(record_id, article_url)
+                else:
+                    # unlock
+                    self.update_recycle_status(
+                        record_id,
+                        self.RECYCLE_PROCESSING_STATUS,
+                        self.RECYCLE_INIT_STATUS,
+                    )
+            else:
+                # unlock
+                self.update_recycle_status(
+                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
+                )
+
+
+class SaveFwhDataToDatabase(FwhDataRecycle):
+
+    def update_article_read_cnt(self, wx_sn, new_read_cnt):
+        """
+        update article read cnt
+        """
+        if new_read_cnt <= 0:
+            return 0
+
+        update_query = f"""
+            update official_articles_v2
+            set show_view_count = %s
+            where wx_sn = %s;
+        """
+        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))
+
+    def save_data_to_database(self, article):
+        """
+        save data to db
+        """
+        insert_query = f"""
+            insert into official_articles_v2
+            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count, 
+             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp) 
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);      
+        """
+        return self.piaoquan_client.save(insert_query, article)
+
+    def get_stat_published_articles(self, gh_id):
+        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
+        fetch_query = f"""
+            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
+            from long_articles_group_send_result
+            where gh_id = %s and recycle_status = %s and create_time > %s;
+        """
+        return self.long_articles_client.fetch(
+            fetch_query,
+            DictCursor,
+            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
+        )
+
+    def process_each_account_data(self, account_published_article_list):
+        if not account_published_article_list:
+            return
+
+        for article in account_published_article_list:
+            account_name = article["account_name"]
+            gh_id = article["gh_id"]
+            user_group_id = article["user_group_id"]
+            url = article["url"]
+            # get article detail info with spider
+
+            try:
+                article_detail_info = get_article_detail(
+                    url, is_count=True, is_cache=False
+                )
+                time.sleep(1)
+                content_url = article_detail_info["data"]["data"]["content_link"]
+                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
+                wx_sn = content_url.split("sn=")[-1]
+                publish_timestamp = int(
+                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
+                )
+                create_time = publish_timestamp
+                update_time = publish_timestamp
+                item_index = article_detail_info["data"]["data"]["item_index"]
+                show_view_count = article_detail_info["data"]["data"]["view_count"]
+                title = article_detail_info["data"]["data"]["title"]
+                title_md5 = str_to_md5(title)
+                channel_content_id = article_detail_info["data"]["data"][
+                    "channel_content_id"
+                ]
+                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
+                root_source_id_list = [
+                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
+                        "rootSourceId"
+                    ][0]
+                    for i in mini_program_info
+                ]
+                root_source_id_list = json.dumps(root_source_id_list)
+                try:
+                    self.save_data_to_database(
+                        article=(
+                            gh_id,
+                            account_name,
+                            app_msg_id,
+                            title,
+                            "9",
+                            create_time,
+                            update_time,
+                            item_index,
+                            content_url,
+                            show_view_count,
+                            wx_sn,
+                            title_md5,
+                            user_group_id,
+                            channel_content_id,
+                            root_source_id_list,
+                            publish_timestamp,
+                        )
+                    )
+                except Exception as e:
+                    self.update_article_read_cnt(wx_sn, show_view_count)
+            except Exception as e:
+                print(f"article {url} is not available, skip it")
+                print(e)
+
+    def deal(self):
+        account_id_list = self.get_group_server_accounts()
+        for account_id in account_id_list:
+            publish_articles = tqdm(
+                self.get_stat_published_articles(account_id),
+                desc=f"<crawling> {account_id}",
+            )
+            self.process_each_account_data(publish_articles)
+
+
+class FwhDataExportTemp(FwhDataRecycle):
+
+    def get_publish_articles(self, gh_id):
+        sql = f"""
+            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
+            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
+            from official_articles_v2
+            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
+            and ghId = '{gh_id}' and article_group is not null
+            group by accountName, title, ItemIndex;
+        """
+        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)
+
+    def get_fission_info(self, root_source_id_list):
+        """
+        获取裂变信息
+        """
+        root_source_id_tuple = tuple(json.loads(root_source_id_list))
+        query = f"""
+            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
+            from changwen_data_rootsourceid 
+            where root_source_id
+            in %s;
+        """
+        return self.long_articles_client.fetch(
+            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
+        )
+
+    def get_fans_num(self, gh_id, group_id_tuple):
+        sql = f"""
+            select count(1) as 'fans_count'
+            from article_user_group
+            where gzh_id = %s and user_group_id in %s
+            and is_delete = 0;
+        """
+        return self.piaoquan_client.fetch(
+            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
+        )
+
+    def deal(self):
+        import pandas as pd
+
+        gh_id_list = self.get_group_server_accounts()
+        L = []
+        for gh_id in gh_id_list:
+            publish_articles = self.get_publish_articles(gh_id)
+            for article in publish_articles:
+                try:
+                    group_id_tuple = tuple(article["group"].split(","))
+                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
+                        "fans_count"
+                    ]
+                    root_source_id_list = article["root_source_id_list"]
+                    fission_info = self.get_fission_info(root_source_id_list)
+                    article["uv"] = fission_info[0]["uv"]
+                    article["first_uv"] = fission_info[0]["first_uv"]
+                    article["split_uv"] = fission_info[0]["split_uv"]
+                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
+                    article["fans_count"] = fans_count
+                    L.append(article)
+                except Exception as e:
+                    print(f"article {article['ContentUrl']} is not available, skip it")
+        df = pd.DataFrame(L)
+        df.to_csv("temp2.csv", index=False)