Merge branch '2025-05-09-monitor-outside-accounts' of luojunhui/LongArticlesJob into master

luojunhui 2 days ago
parent
commit
291d720907

+ 44 - 20
applications/api/feishu_api.py

@@ -3,6 +3,15 @@ import requests
 
 
 class Feishu:
+    # Alert bot for outside service-account paid-traffic monitoring
+    outside_gzh_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/0899d43d-9f65-48ce-a419-f83ac935bf59"
+
+    # Daily alert bot for long articles
+    long_articles_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
+
+    # Alert bot for the test environment
+    long_articles_bot_dev = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+
     def __init__(self):
         self.token = None
         self.headers = {"Content-Type": "application/json"}
@@ -153,12 +162,39 @@ class FeishuBotApi(Feishu):
         }
         return table_base
 
+    def create_feishu_bot_obj(self, title, mention, detail):
+        """
+        create feishu bot object
+        """
+        return {
+            "elements": [
+                {
+                    "tag": "div",
+                    "text": self.mention_all if mention else self.not_mention,
+                },
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": json.dumps(detail, ensure_ascii=False, indent=4),
+                        "tag": "lark_md",
+                    },
+                },
+            ],
+            "header": {"title": {"content": title, "tag": "plain_text"}},
+        }
+
     # bot
     def bot(self, title, detail, mention=True, table=False, env="prod"):
-        if env == "prod":
-            url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
-        else:
-            url = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+        match env:
+            case "dev":
+                url = self.long_articles_bot_dev
+            case "prod":
+                url = self.long_articles_bot
+            case "outside_gzh_monitor":
+                url = self.outside_gzh_monitor_bot
+            case _:
+                url = self.long_articles_bot_dev
+
         headers = {"Content-Type": "application/json"}
         if table:
             card = self.create_feishu_table(
@@ -168,22 +204,10 @@ class FeishuBotApi(Feishu):
                 mention=mention,
             )
         else:
-            card = {
-                "elements": [
-                    {
-                        "tag": "div",
-                        "text": self.mention_all if mention else self.not_mention,
-                    },
-                    {
-                        "tag": "div",
-                        "text": {
-                            "content": json.dumps(detail, ensure_ascii=False, indent=4),
-                            "tag": "lark_md",
-                        },
-                    },
-                ],
-                "header": {"title": {"content": title, "tag": "plain_text"}},
-            }
+            card = self.create_feishu_bot_obj(
+                title=title, mention=mention, detail=detail
+            )
+
         payload = {"msg_type": "interactive", "card": card}
         res = requests.request(
             "POST", url=url, headers=headers, data=json.dumps(payload), timeout=10

+ 1 - 1
cold_start/crawler/wechat/official_accounts_api.py

@@ -57,7 +57,7 @@ def get_article_detail(
 
 @retry(**retry_desc)
 def get_article_list_from_account(
-        account_id: str, index
+        account_id: str, index=None
 ) -> dict | None:
     target_url = f"{base_url}/blogger"
     payload = json.dumps(
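
With the new default, the first page can now be fetched without an explicit cursor. A minimal sketch (the gh_id is a placeholder):

    from cold_start.crawler.wechat import get_article_list_from_account

    # index now defaults to None, so the first page needs only the account id,
    # matching the call in OutsideGzhArticlesCollector.fetch_each_account.
    response = get_article_list_from_account("gh_example_id")
    msg_list = (response or {}).get("data", {}).get("data", [])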

+ 31 - 0
outside_server_accounts_monitor.py

@@ -0,0 +1,31 @@
+from argparse import ArgumentParser
+
+from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesCollector
+from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesMonitor
+
+
+if __name__ == "__main__":
+    parser = ArgumentParser()
+    parser.add_argument("--task", help="task to run: monitor or collector")
+    args = parser.parse_args()
+    if args.task:
+        task = args.task
+        match task:
+            case "monitor":
+                monitor = OutsideGzhArticlesMonitor()
+                monitor.deal()
+            case "collector":
+                collector = OutsideGzhArticlesCollector()
+                collector.deal()
+            case _:
+                print("task is not supported")
+    else:
+        # first collect data
+        collector = OutsideGzhArticlesCollector()
+        collector.deal()
+
+        # then monitor each article
+        monitor = OutsideGzhArticlesMonitor()
+        monitor.deal()
+
+

+ 27 - 0
sh/outside_account_monitor.sh

@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# Current date, formatted as YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# Log file path, including the date
+LOG_FILE="/root/luojunhui/logs/outside_account_monitor_log_$CURRENT_DATE.txt"
+
+# Redirect all output of this script to the dated log file
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 outside_server_accounts_monitor.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - outside_server_accounts_monitor.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart outside_server_accounts_monitor.py"
+
+    # Switch to the project directory
+    cd /root/luojunhui/LongArticlesJob
+
+    # Activate the Conda environment
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # Run the Python script in the background and append its output to the log
+    nohup python3 outside_server_accounts_monitor.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted outside_server_accounts_monitor.py"
+fi

+ 230 - 0
tasks/monitor_tasks/outside_gzh_articles_monitor.py

@@ -0,0 +1,230 @@
+import time
+import datetime
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications.api import FeishuBotApi
+from applications.db import DatabaseConnector
+from cold_start.crawler.wechat import get_article_detail
+from cold_start.crawler.wechat import get_article_list_from_account
+from config import long_articles_config, denet_config
+
+
+class Const:
+    # Article illegal-content status flags
+    ILLEGAL_STATUS = 1
+    INIT_STATUS = 0
+
+    # Monitoring window: 5 days, in seconds
+    MONITOR_CYCLE = 5 * 60 * 60 * 24
+
+    # Article-detail API response codes
+    ILLEGAL_CODE = 25012
+    SUCCESS_CODE = 0
+
+
+class OutsideGzhArticlesManager(Const):
+
+    def __init__(self):
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.long_articles_client.connect()
+        self.denet_client = DatabaseConnector(denet_config)
+        self.denet_client.connect()
+        self.feishu_bot_api = FeishuBotApi()
+
+    def update_article_illegal_status(
+        self, article_id: int, illegal_reason: str
+    ) -> None:
+        update_query = """
+            update outside_gzh_account_monitor
+            set illegal_status = %s, illegal_reason = %s
+            where id = %s and illegal_status = %s
+        """
+        self.long_articles_client.save(
+            query=update_query,
+            params=(self.ILLEGAL_STATUS, illegal_reason, article_id, self.INIT_STATUS),
+        )
+
+    def whether_published_in_a_week(self, gh_id: str) -> bool:
+        """
+        Check whether the account has published within the monitoring window;
+        if so, there is no need to crawl it again
+        """
+        fetch_query = f"""
+            select id, publish_timestamp from outside_gzh_account_monitor
+            where gh_id = '{gh_id}'
+            order by publish_timestamp desc
+            limit 1;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        if fetch_response:
+            publish_timestamp = fetch_response[0]["publish_timestamp"]
+            if publish_timestamp is None:
+                return False
+            else:
+                return int(time.time()) - publish_timestamp <= self.MONITOR_CYCLE
+        else:
+            return False
+
+
+class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
+
+    def fetch_outside_account_list(self):
+        fetch_query = """
+            select 
+                t2.group_source_name as account_source, 
+                t3.name as account_name,
+                t3.gh_id as gh_id,
+                t3.status as status
+            from wx_statistics_group_source t1
+                join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
+                join publish_account t3 on t3.id = t2.account_id
+            where
+                t1.mode_type = '代运营服务号';
+        """
+        return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)
+
+    def fetch_each_account(self, account: dict):
+        gh_id = account["gh_id"]
+        # Skip the account if it already published within the monitoring window
+        if self.whether_published_in_a_week(gh_id):
+            return
+
+        fetch_response = get_article_list_from_account(gh_id)
+        try:
+            msg_list = fetch_response.get("data", {}).get("data", [])
+            if msg_list:
+                for msg in tqdm(
+                    msg_list, desc=f"insert account {account['account_name']}"
+                ):
+                    self.save_each_msg_to_db(msg, account)
+
+            else:
+                print(f"crawler failed: {account['account_name']}")
+        except Exception as e:
+            print(
+                f"crawler failed: account_name: {account['account_name']}\n"
+                f"error: {e}\n"
+            )
+
+    def save_each_msg_to_db(self, msg: dict, account: dict):
+        base_info = msg["AppMsg"]["BaseInfo"]
+        detail_info = msg["AppMsg"]["DetailInfo"]
+        app_msg_id = base_info["AppMsgId"]
+        create_timestamp = base_info["CreateTime"]
+        publish_type = base_info["Type"]
+
+        # insert each article
+        for article in detail_info:
+            link = article["ContentUrl"]
+            article_detail = get_article_detail(link)
+            response_code = article_detail["code"]
+            if response_code == self.ILLEGAL_CODE:
+                illegal_reason = article_detail.get("msg")
+                # send a Feishu alert and skip inserting this article
+                self.feishu_bot_api.bot(
+                    title="文章违规告警",
+                    detail={
+                        "账号名称": account["account_name"],
+                        "标题": article["Title"],
+                        "违规理由": illegal_reason,
+                        "发布日期": datetime.datetime.fromtimestamp(create_timestamp).strftime('%Y-%m-%d %H:%M:%S'),
+                        "账号合作商": account["account_source"],
+                    },
+                    env="outside_gzh_monitor",
+                    mention=False
+                )
+
+            elif response_code == self.SUCCESS_CODE:
+                insert_query = """
+                    insert ignore into outside_gzh_account_monitor
+                    (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link, 
+                    channel_content_id, crawler_timestamp, publish_timestamp)
+                    values
+                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                """
+                self.long_articles_client.save(
+                    query=insert_query,
+                    params=(
+                        account["account_name"],
+                        account["gh_id"],
+                        account["account_source"],
+                        "服务号",
+                        app_msg_id,
+                        publish_type,
+                        article["ItemIndex"],
+                        article["Title"],
+                        link,
+                        article_detail["data"]["data"]["channel_content_id"],
+                        int(time.time()),
+                        int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
+                    ),
+                )
+            else:
+                continue
+
+    def deal(self):
+        account_list = self.fetch_outside_account_list()
+        for account in tqdm(account_list):
+            try:
+                self.fetch_each_account(account)
+            except Exception as e:
+                print(f"crawler failed: {account['account_name']}, error: {e}")
+
+
+class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
+
+    def fetch_article_list_to_check(self):
+        publish_timestamp_threshold = int(time.time()) - self.MONITOR_CYCLE
+        fetch_query = f"""
+            select id, account_name, gh_id, account_source, account_type, 
+                title, link, from_unixtime(publish_timestamp) as publish_date
+            from outside_gzh_account_monitor
+            where illegal_status = {self.INIT_STATUS} and publish_timestamp > {publish_timestamp_threshold};
+        """
+        return self.long_articles_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+
+    def check_each_article(self, article: dict):
+        """
+        check each article
+        """
+        link = article["link"]
+        article_detail = get_article_detail(link)
+        response_code = article_detail["code"]
+        if response_code == self.ILLEGAL_CODE:
+            illegal_reason = article_detail.get("msg")
+            self.feishu_bot_api.bot(
+                title="文章违规告警",
+                detail={
+                    "账号名称": article["account_name"],
+                    "标题": article["title"],
+                    "违规理由": illegal_reason,
+                    "发布日期": str(article["publish_date"]),
+                    "账号合作商": article["account_source"],
+                },
+                env="outside_gzh_monitor",
+                mention=False
+            )
+            article_id = article["id"]
+            self.update_article_illegal_status(article_id, illegal_reason)
+
+    def deal(self):
+        article_list = self.fetch_article_list_to_check()
+        for article in tqdm(article_list):
+            try:
+                self.check_each_article(article)
+
+            except Exception as e:
+                print(
+                    f"crawler failed: account_name: {article['account_name']}\n"
+                    f"link: {article['link']}\n"
+                    f"title: {article['title']}\n"
+                    f"error: {e}\n"
+                )
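
One naming caveat in this module: whether_published_in_a_week actually compares against MONITOR_CYCLE = 5 * 60 * 60 * 24 = 432,000 seconds, i.e. five days rather than seven. A standalone sketch of the window check, assuming timestamps in whole seconds:

    import time

    MONITOR_CYCLE = 5 * 60 * 60 * 24  # 432000 s = 5 days, as in Const

    def within_monitor_cycle(publish_timestamp, now=None):
        # Mirrors OutsideGzhArticlesManager.whether_published_in_a_week:
        # a missing timestamp means the account should be crawled again.
        if publish_timestamp is None:
            return False
        now = int(time.time()) if now is None else now
        return now - publish_timestamp <= MONITOR_CYCLE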

+ 0 - 6
temp_task.py

@@ -1,6 +0,0 @@
-from tasks.ai_tasks.category_generation_task import ArticlePoolCategoryGenerationTask
-
-
-if __name__ == '__main__':
-    article_pool_category_task = ArticlePoolCategoryGenerationTask()
-    article_pool_category_task.deal()