Sfoglia il codice sorgente

outside article monitor

luojunhui 5 mesi fa
parent
commit
38c0be76b5

+ 44 - 20
applications/api/feishu_api.py

@@ -3,6 +3,15 @@ import requests
 
 
 class Feishu:
+    # 外部服务号投流监测机器人
+    outside_gzh_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/0899d43d-9f65-48ce-a419-f83ac935bf59"
+
+    # 长文 daily 报警机器人
+    long_articles_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
+
+    # 测试环境报警机器人
+    long_articles_bot_dev = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+
     def __init__(self):
         self.token = None
         self.headers = {"Content-Type": "application/json"}
@@ -153,12 +162,39 @@ class FeishuBotApi(Feishu):
         }
         return table_base
 
+    def create_feishu_bot_obj(self, title, mention, detail):
+        """
+        create feishu bot object
+        """
+        return {
+            "elements": [
+                {
+                    "tag": "div",
+                    "text": self.mention_all if mention else self.not_mention,
+                },
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": json.dumps(detail, ensure_ascii=False, indent=4),
+                        "tag": "lark_md",
+                    },
+                },
+            ],
+            "header": {"title": {"content": title, "tag": "plain_text"}},
+        }
+
     # bot
     def bot(self, title, detail, mention=True, table=False, env="prod"):
-        if env == "prod":
-            url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
-        else:
-            url = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+        match env:
+            case "dev":
+                url = self.long_articles_bot_dev
+            case "prod":
+                url = self.long_articles_bot
+            case "outside_gzh_monitor":
+                url = self.outside_gzh_monitor_bot
+            case _:
+                url = self.long_articles_bot_dev
+
         headers = {"Content-Type": "application/json"}
         if table:
             card = self.create_feishu_table(
@@ -168,22 +204,10 @@ class FeishuBotApi(Feishu):
                 mention=mention,
             )
         else:
-            card = {
-                "elements": [
-                    {
-                        "tag": "div",
-                        "text": self.mention_all if mention else self.not_mention,
-                    },
-                    {
-                        "tag": "div",
-                        "text": {
-                            "content": json.dumps(detail, ensure_ascii=False, indent=4),
-                            "tag": "lark_md",
-                        },
-                    },
-                ],
-                "header": {"title": {"content": title, "tag": "plain_text"}},
-            }
+            card = self.create_feishu_bot_obj(
+                title=title, mention=mention, detail=detail
+            )
+
         payload = {"msg_type": "interactive", "card": card}
         res = requests.request(
             "POST", url=url, headers=headers, data=json.dumps(payload), timeout=10

+ 19 - 4
outside_server_accounts_monitor.py

@@ -1,9 +1,24 @@
+from argparse import ArgumentParser
+
 from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesCollector
 from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesMonitor
 
 
 if __name__ == "__main__":
-    collector = OutsideGzhArticlesCollector()
-    collector.deal()
-    monitor = OutsideGzhArticlesMonitor()
-    monitor.deal()
+    parser = ArgumentParser()
+    parser.add_argument("--task", help="input monitor or collector")
+    args = parser.parse_args()
+    if args.task:
+        task = args.task
+    else:
+        task = "monitor"
+
+    match task:
+        case "monitor":
+            monitor = OutsideGzhArticlesMonitor()
+            monitor.deal()
+        case "collector":
+            collector = OutsideGzhArticlesCollector()
+            collector.deal()
+        case _:
+            print("task is not support")

+ 27 - 0
sh/outside_account_monitor.sh

@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/outside_account_monitor_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 outside_server_accounts_monitor.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - outside_server_accounts_monitor.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart outside_server_accounts_monitor.py"
+
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 outside_server_accounts_monitor.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted outside_server_accounts_monitor.py"
+fi

+ 48 - 12
tasks/monitor_tasks/outside_gzh_articles_monitor.py

@@ -19,7 +19,7 @@ class OutsideGzhArticlesManager:
         self.denet_client.connect()
         self.feishu_bot_api = FeishuBotApi()
 
-    def update_article_illegal_status(self, article_id, illegal_reason):
+    def update_article_illegal_status(self, article_id: int, illegal_reason: str) -> None:
         update_query = f"""
             update outside_gzh_account_monitor
             set illegal_status = %s, illegal_reason = %s
@@ -30,6 +30,26 @@ class OutsideGzhArticlesManager:
             params=(1, illegal_reason, article_id, 0)
         )
 
+    def whether_published_in_a_week(self, gh_id: str) -> bool:
+        """
+        判断该账号一周内是否有发文,如有,则说无需抓
+        """
+        fetch_query = f"""
+            select id, publish_timestamp from outside_gzh_account_monitor
+            where gh_id = '{gh_id}'
+            order by publish_timestamp desc
+            limit 1;
+        """
+        fetch_response = self.long_articles_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        if fetch_response:
+             publish_timestamp = fetch_response[0]['publish_timestamp']
+             if publish_timestamp is None:
+                 return False
+             else:
+                 return int(time.time()) - publish_timestamp <= 5 * 24 * 3600
+        else:
+            return False
+
 
 class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
 
@@ -50,14 +70,24 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
 
     def fetch_each_account(self, account: dict):
         gh_id = account["gh_id"]
+        # 判断该账号本周是否已经发布过
+        if self.whether_published_in_a_week(gh_id):
+            return
+
         fetch_response = get_article_list_from_account(gh_id)
-        msg_list = fetch_response.get("data", {}).get("data", [])
-        if msg_list:
-            for msg in tqdm(msg_list, desc=f"insert account {account['account_name']}"):
-                self.save_each_msg_to_db(msg, account)
+        try:
+            msg_list = fetch_response.get("data", {}).get("data", [])
+            if msg_list:
+                for msg in tqdm(msg_list, desc=f"insert account {account['account_name']}"):
+                    self.save_each_msg_to_db(msg, account)
 
-        else:
-            print(f"crawler failed: {account['account_name']}")
+            else:
+                print(f"crawler failed: {account['account_name']}")
+        except Exception as e:
+            print(
+                f"crawler failed: account_name: {account['account_name']}\n"
+                f"error: {e}\n"
+            )
 
     def save_each_msg_to_db(self, msg: dict, account: dict):
         base_info = msg["AppMsg"]["BaseInfo"]
@@ -83,7 +113,7 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
                         "publish_timestamp": create_timestamp,
                         "account_source": article["account_source"]
                     },
-                    env="dev"
+                    env="outside_gzh_monitor"
                 )
 
             elif response_code == 0:
@@ -120,7 +150,9 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
             try:
                 self.fetch_each_account(account)
             except Exception as e:
-                print(e)
+               print(
+                   f"crawler failed: {account['account_name']}, error: {e}"
+               )
 
 
 class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
@@ -155,7 +187,7 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
                     "publish_date": article["publish_date"],
                     "account_source": article["account_source"]
                 },
-                env="dev"
+                env="outside_gzh_monitor"
             )
             article_id = article["id"]
             self.update_article_illegal_status(article_id, illegal_reason)
@@ -168,5 +200,9 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
             try:
                 self.check_each_article(article)
             except Exception as e:
-                print(e)
-                continue
+                print(
+                    f"crawler failed: account_name: {article['account_name']}\n"
+                    f"link: {article['link']}\n"
+                    f"title: {article['title']}\n"
+                    f"error: {e}\n"
+                )