Sfoglia il codice sorgente

外部发文监测

luojunhui 5 mesi fa
parent
commit
624d0b1e6b

+ 1 - 1
cold_start/crawler/wechat/official_accounts_api.py

@@ -57,7 +57,7 @@ def get_article_detail(
 
 @retry(**retry_desc)
 def get_article_list_from_account(
-        account_id: str, index
+        account_id: str, index=None
 ) -> dict | None:
     target_url = f"{base_url}/blogger"
     payload = json.dumps(

+ 157 - 0
tasks/monitor_tasks/outside_gzh_articles_monitor.py

@@ -0,0 +1,157 @@
+import json
+import time
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications.api import FeishuBotApi
+from applications.db import DatabaseConnector
+from cold_start.crawler.wechat import get_article_detail
+from cold_start.crawler.wechat import get_article_list_from_account
+from config import long_articles_config, denet_config
+
+
+class OutsideGzhArticlesManager:
+
+    def __init__(self):
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.long_articles_client.connect()
+        self.denet_client = DatabaseConnector(denet_config)
+        self.denet_client.connect()
+        self.feishu_bot_api = FeishuBotApi()
+
+    def process_illegal_article(
+        self, account_name, title, reason, publish_timestamp, account_source
+    ):
+        self.feishu_bot_api.bot(
+            title="文章违规告警",
+            detail={
+                "account_name": account_name,
+                "title": title,
+                "reason": reason,
+                "publish_timestamp": publish_timestamp,
+                "account_source": account_source,
+            },
+            env="dev"
+        )
+        return
+
+
+class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
+
+    def fetch_outside_account_list(self):
+        fetch_query = f"""
+            select 
+                t2.group_source_name as account_source, 
+                t3.name as account_name,
+                t3.gh_id as gh_id,
+                t3.status as status
+            from wx_statistics_group_source t1
+                join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
+                join publish_account t3 on t3.id = t2.account_id
+            where
+                t1.mode_type = '代运营服务号';
+        """
+        return self.denet_client.fetch(query=fetch_query, cursor_type=DictCursor)
+
+    def fetch_each_account(self, account: dict):
+        gh_id = account["gh_id"]
+        fetch_response = get_article_list_from_account(gh_id)
+        msg_list = fetch_response.get("data", {}).get("data", [])
+        if msg_list:
+            for msg in msg_list[:1]:
+                self.save_each_msg_to_db(msg, account)
+
+        else:
+            print(f"crawler failed: {account['account_name']}")
+
+    def save_each_msg_to_db(self, msg: dict, account: dict):
+        base_info = msg["AppMsg"]["BaseInfo"]
+        detail_info = msg["AppMsg"]["DetailInfo"]
+        app_msg_id = base_info["AppMsgId"]
+        create_timestamp = base_info["CreateTime"]
+        update_timestamp = base_info["UpdateTime"]
+        publish_type = base_info["Type"]
+
+        # insert each article
+        for article in detail_info[:1]:
+            link = article["ContentUrl"]
+            article_detail = get_article_detail(link)
+            response_code = article_detail["code"]
+            if response_code == 25012:
+                illegal_reason = article_detail.get("msg")
+                # bot and return
+            elif response_code == 0:
+                insert_query = f"""
+                    insert into outside_gzh_account_monitor
+                    (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link, 
+                    channel_content_id, crawler_timestamp, publish_timestamp)
+                    values
+                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                """
+                self.long_articles_client.save(
+                    query=insert_query,
+                    params=(
+                        account["account_name"],
+                        account["gh_id"],
+                        account["account_source"],
+                        "服务号",
+                        app_msg_id,
+                        publish_type,
+                        article["ItemIndex"],
+                        article["Title"],
+                        link,
+                        article_detail["data"]["data"]["channel_content_id"],
+                        create_timestamp,
+                        update_timestamp,
+                        int(time.time()),
+                        int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
+                    ),
+                )
+            else:
+                continue
+
+
+class OutsideGzhArticlesMonitor(OutsideGzhArticlesCollector):
+
+    def fetch_article_list_to_check(self):
+        publish_timestamp_threshold = int(time.time()) - 7 * 24 * 3600
+        fetch_query = f"""
+            select account_name, gh_id, account_source, account_type, title, link
+            from outside_gzh_account_monitor
+            where illegal_status = 0 and publish_timestamp > {publish_timestamp_threshold};
+        """
+        return self.long_articles_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+
+    def check_each_article(self, article: dict):
+        """
+        check each article
+        """
+        link = article["link"]
+        article_detail = get_article_detail(link)
+        response_code = article_detail["code"]
+        if response_code == 25012:
+            illegal_reason = article_detail.get("msg")
+            self.process_illegal_article(
+                account_name=article["account_name"],
+                title=article["title"],
+                reason=illegal_reason,
+                publish_timestamp=article["publish_timestamp"],
+                account_source=article["account_source"],
+            )
+        else:
+            return
+
+    def deal(self):
+        article_list = self.fetch_article_list_to_check()
+        for article in tqdm(article_list):
+            self.check_each_article(article)
+
+
+if __name__ == "__main__":
+    collector = OutsideGzhArticlesCollector()
+    accounts = collector.fetch_outside_account_list()
+    for account_ in tqdm(accounts[1:2]):
+        collector.fetch_each_account(account_)

+ 0 - 6
temp_task.py

@@ -1,6 +0,0 @@
-from tasks.ai_tasks.category_generation_task import ArticlePoolCategoryGenerationTask
-
-
-if __name__ == '__main__':
-    article_pool_category_task = ArticlePoolCategoryGenerationTask()
-    article_pool_category_task.deal()