
add monitor task

luojunhui 3 days ago
parent
commit
790227550e

+ 3 - 0
applications/api/__init__.py

@@ -19,6 +19,9 @@ from .elastic_search_api import AsyncElasticSearchClient
 # aliyun_log
 from .aliyun_log_api import log
 
+# aigc system api
+from .async_aigc_system_api import delete_illegal_gzh_articles
+
 feishu_robot = FeishuBotApi()
 feishu_sheet = FeishuSheetApi()
 task_apollo = AsyncApolloApi()

+ 3 - 8
applications/api/aliyun_log_api.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+
 import datetime
 import json
 import time
@@ -8,13 +9,7 @@ import time
 from aliyun.log import LogClient, PutLogsRequest, LogItem
 
 
-def log(
-        task,
-        function,
-        status="success",
-        message=None,
-        data=None
-):
+def log(task, function, status="success", message=None, data=None):
     """
     @:param task the task name
     @:param
@@ -38,7 +33,7 @@ def log(
         (f"status", str(status)),
         (f"data", json.dumps(data, ensure_ascii=False) if data else ""),
         ("dateTime", datetime.datetime.now().__str__()),
-        ("timestamp", str(int(time.time())))
+        ("timestamp", str(int(time.time()))),
     ]
 
     log_item.set_contents(contents)

+ 19 - 0
applications/api/async_aigc_system_api.py

@@ -0,0 +1,19 @@
+from applications.utils import AsyncHttPClient
+
+
+async def delete_illegal_gzh_articles(gh_id: str, title: str):
+    """
+    Delete illegal gzh articles
+    :param gh_id: gzh id
+    :param title: article title
+    """
+    url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete"
+    payload = {
+        "title": title,
+        "ghId": gh_id,
+    }
+    headers = {"Content-Type": "application/json;charset=UTF-8"}
+    async with AsyncHttPClient(timeout=600) as client:
+        res = await client.post(url=url, headers=headers, json=payload)
+
+    return res
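A minimal usage sketch for the new helper (the gh_id and title values are hypothetical; assumes an asyncio entry point):

    import asyncio

    from applications.api import delete_illegal_gzh_articles

    async def main():
        # hypothetical identifiers, for illustration only
        res = await delete_illegal_gzh_articles(
            gh_id="gh_0123456789ab", title="some article title"
        )
        print(res)

    asyncio.run(main())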

+ 1 - 1
applications/config/__init__.py

@@ -12,4 +12,4 @@ from .deepseek_config import deep_seek_official_model
 from .deepseek_config import deep_seek_official_api_key
 
 # es config
-from .elastic_search_mappings import es_index, es_mappings, es_settings
+from .elastic_search_mappings import es_index, es_mappings, es_settings

+ 3 - 8
applications/config/elastic_search_mappings.py

@@ -8,7 +8,7 @@ es_settings = {
             "ik_smart": {"type": "ik_smart"},
             "ik_max_word": {"type": "ik_max_word"},
         }
-    }
+    },
 }
 
 es_mappings = {
@@ -24,13 +24,8 @@ es_mappings = {
             "type": "text",
             "analyzer": "ik_max_word",
             "search_analyzer": "ik_smart",
-            "fields": {
-                "keyword": {"type": "keyword", "ignore_above": 256}
-            }
+            "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
         },
-        "created_at": {
-            "type": "date",
-            "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
-        }
+        "created_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"},
     }
 }
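For reference, a sketch of applying these settings and mappings at index-creation time, assuming the official elasticsearch-py 8.x async client (the repo's own AsyncElasticSearchClient wrapper may differ). Note that the ik_smart/ik_max_word analyzers require the IK analysis plugin on the cluster:

    import asyncio

    from elasticsearch import AsyncElasticsearch

    from applications.config import es_index, es_mappings, es_settings

    async def create_index():
        es = AsyncElasticsearch("http://localhost:9200")  # assumed address
        try:
            # create the index only if it does not already exist
            if not await es.indices.exists(index=es_index):
                await es.indices.create(
                    index=es_index, settings=es_settings, mappings=es_mappings
                )
        finally:
            await es.close()

    asyncio.run(create_index())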

+ 1 - 1
applications/crawler/wechat/__init__.py

@@ -1 +1 @@
-from .gzh_spider import *
+from .gzh_spider import *

+ 17 - 23
applications/crawler/wechat/gzh_spider.py

@@ -18,7 +18,7 @@ headers = {"Content-Type": "application/json"}
 
 @retry(**retry_desc)
 def get_article_detail(
-    article_link: str, is_count: bool=False, is_cache: bool=True
+    article_link: str, is_count: bool = False, is_cache: bool = True
 ) -> dict | None:
     """
     get official article detail
@@ -29,7 +29,7 @@ def get_article_detail(
             "content_link": article_link,
             "is_count": is_count,
             "is_ad": False,
-            "is_cache": is_cache
+            "is_cache": is_cache,
         }
     )
     try:
@@ -43,29 +43,22 @@ def get_article_detail(
             task="get_official_article_detail",
             function="get_official_article_detail",
             message=f"API请求失败: {e}",
-            data={"link": article_link}
+            data={"link": article_link},
         )
     except json.JSONDecodeError as e:
         log(
             task="get_official_article_detail",
             function="get_official_article_detail",
             message=f"响应解析失败: {e}",
-            data={"link": article_link}
+            data={"link": article_link},
         )
     return None
 
 
 @retry(**retry_desc)
-def get_article_list_from_account(
-        account_id: str, index=None
-) -> dict | None:
+def get_article_list_from_account(account_id: str, index=None) -> dict | None:
     target_url = f"{base_url}/blogger"
-    payload = json.dumps(
-        {
-            "account_id": account_id,
-            "cursor": index
-        }
-    )
+    payload = json.dumps({"account_id": account_id, "cursor": index})
     try:
         response = requests.post(
             url=target_url, headers=headers, data=payload, timeout=120
@@ -77,14 +70,14 @@ def get_article_list_from_account(
             task="get_official_account_article_list",
             function="get_official_account_article_list",
             message=f"API请求失败: {e}",
-            data={"gh_id": account_id}
+            data={"gh_id": account_id},
         )
     except json.JSONDecodeError as e:
         log(
             task="get_official_account_article_list",
             function="get_official_account_article_list",
             message=f"响应解析失败: {e}",
-            data={"gh_id": account_id}
+            data={"gh_id": account_id},
         )
     return None
 
@@ -97,7 +90,11 @@ def get_source_account_from_article(article_link) -> dict | None:
     :return:
     """
     try:
-        response = requests.get(url=article_link, headers={'User-Agent': FakeUserAgent().random}, timeout=120)
+        response = requests.get(
+            url=article_link,
+            headers={"User-Agent": FakeUserAgent().random},
+            timeout=120,
+        )
         response.raise_for_status()
         html_text = response.text
         regex_nickname = r"hit_nickname:\s*'([^']+)'"
@@ -106,10 +103,7 @@ def get_source_account_from_article(article_link) -> dict | None:
         username = re.search(regex_username, html_text)
         # output the extracted results
         if nickname and username:
-            return {
-                'name': nickname.group(1),
-                'gh_id': username.group(1)
-            }
+            return {"name": nickname.group(1), "gh_id": username.group(1)}
         else:
             return {}
     except requests.exceptions.RequestException as e:
@@ -117,13 +111,13 @@ def get_source_account_from_article(article_link) -> dict | None:
             task="get_source_account_from_article",
             function="get_source_account_from_article",
             message=f"API请求失败: {e}",
-            data={"link": article_link}
+            data={"link": article_link},
         )
     except json.JSONDecodeError as e:
         log(
             task="get_source_account_from_article",
             function="get_source_account_from_article",
             message=f"响应解析失败: {e}",
-            data={"link": article_link}
+            data={"link": article_link},
         )
-    return None
+    return None
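The helpers in this file are synchronous, requests-based clients wrapped in @retry. A usage sketch (the article link is hypothetical, for illustration only):

    from applications.crawler.wechat import get_article_detail

    # hypothetical link, for illustration only
    detail = get_article_detail("https://mp.weixin.qq.com/s/XXXX", is_cache=False)
    if detail:
        print(detail["code"], detail.get("msg"))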

+ 3 - 3
applications/database/mysql_pools.py

@@ -14,10 +14,10 @@ class DatabaseManager:
     async def init_pools(self):
         # load database configs from the config module; they can also be defined inline here
         self.databases = {
-            "aigc_db_pool": aigc_db_config,
-            "long_video_db_pool": long_video_db_config,
+            "aigc": aigc_db_config,
+            "long_video": long_video_db_config,
             "long_articles": long_articles_db_config,
-            "piaoquan_crawler_db": piaoquan_crawler_db_config,
+            "piaoquan_crawler": piaoquan_crawler_db_config,
         }
 
         for db_name, config in self.databases.items():
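Every db_name passed to the pool must switch to the shortened keys in the same commit (as the get_cover changes below do); e.g., a sketch of a call site:

    # before
    rows, err = await pool.async_fetch(query=query, db_name="aigc_db_pool")
    # after
    rows, err = await pool.async_fetch(query=query, db_name="aigc")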

+ 13 - 17
applications/service/get_cover.py

@@ -14,12 +14,12 @@ class GetCoverService(Response):
 
     async def montage_cover(self, oss_key, pool_name):
         match pool_name:
-            case "aigc_db_pool":
+            case "aigc":
                 if oss_key.startswith("http"):
                     return oss_key + self.suffix
 
                 return self.aigc_prefix + oss_key + self.suffix
-            case "long_video_db_pool":
+            case "long_video":
                 if oss_key.startswith("http"):
                     return oss_key + self.suffix
                 return self.pq_prefix + oss_key + self.suffix
@@ -29,7 +29,7 @@ class GetCoverService(Response):
     async def fetch_cover_info(self, pool_name, channel_content_id: str):
 
         match pool_name:
-            case "aigc_db_pool":
+            case "aigc":
                 fetch_response, sql_error = await fetch_aigc_cover(
                     self.pool, channel_content_id
                 )
@@ -41,20 +41,18 @@ class GetCoverService(Response):
                 if fetch_response:
                     image_oss = fetch_response[0]["oss_object_key"]
                     if image_oss:
-                        cover = await self.montage_cover(image_oss, "aigc_db_pool")
+                        cover = await self.montage_cover(image_oss, "aigc")
                     else:
                         image_url = fetch_response[0]["image_url"]
                         if not image_url:
                             return await self.fetch_cover_info(
-                                "long_video_db_pool", channel_content_id
+                                "long_video", channel_content_id
                             )
                         else:
-                            cover = await self.montage_cover(image_url, "aigc_db_pool")
+                            cover = await self.montage_cover(image_url, "aigc")
                 else:
-                    return await self.fetch_cover_info(
-                        "long_video_db_pool", channel_content_id
-                    )
-            case "long_video_db_pool":
+                    return await self.fetch_cover_info("long_video", channel_content_id)
+            case "long_video":
                 fetch_response, sql_error = await fetch_long_video_cover(
                     self.pool, channel_content_id
                 )
@@ -65,7 +63,7 @@ class GetCoverService(Response):
                     )
                 if fetch_response:
                     image_oss = fetch_response[1]["image_path"]
-                    cover = await self.montage_cover(image_oss, "long_video_db_pool")
+                    cover = await self.montage_cover(image_oss, "long_video")
                 else:
                     return self.error_response(
                         error_code="402",
@@ -81,7 +79,7 @@ class GetCoverService(Response):
 
     async def get_cover(self, content_id: str, video_index: int, seed_video_id: str):
         if video_index == 2:
-            return await self.fetch_cover_info("long_video_db_pool", seed_video_id)
+            return await self.fetch_cover_info("long_video", seed_video_id)
 
         channel_info, sql_error = await fetch_channel_info(self.pool, content_id)
         if sql_error:
@@ -99,13 +97,11 @@ class GetCoverService(Response):
         channel_type = channel_info[0]["channel"]
         match channel_type:
             case 5:
-                return await self.fetch_cover_info("aigc_db_pool", channel_content_id)
+                return await self.fetch_cover_info("aigc", channel_content_id)
             case 6:
-                return await self.fetch_cover_info("aigc_db_pool", channel_content_id)
+                return await self.fetch_cover_info("aigc", channel_content_id)
             case 10:
-                return await self.fetch_cover_info(
-                    "long_video_db_pool", channel_content_id
-                )
+                return await self.fetch_cover_info("long_video", channel_content_id)
             case _:
                 return self.error_response(
                     error_code="403",

+ 91 - 6
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -1,9 +1,11 @@
 import time
 import datetime
+from typing import Optional, List
 
 from tqdm import tqdm
 
 from applications.api import feishu_robot
+from applications.api import delete_illegal_gzh_articles
 from applications.crawler.wechat import get_article_detail
 from applications.crawler.wechat import get_article_list_from_account
 
@@ -14,7 +16,7 @@ class MonitorConst:
     INIT_STATUS = 0
 
     # monitoring cycle
-    MONITOR_CYCLE = 5 * 24 * 3600
+    MONITOR_CYCLE = 3 * 24 * 3600
 
     # article code
     ARTICLE_ILLEGAL_CODE = 25012
@@ -81,7 +83,7 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
             where
                 t1.mode_type = '代运营服务号';
         """
-        response, error = await self.pool.async_fetch(query=query, db_name="aigc_db_pool")
+        response, error = await self.pool.async_fetch(query=query, db_name="aigc")
         return response
 
     async def fetch_each_account(self, account: dict):
@@ -128,11 +130,13 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
                         "账号名称": article["account_name"],
                         "标题": article["title"],
                         "违规理由": illegal_reason,
-                        "发布日期": datetime.datetime.fromtimestamp(create_timestamp).strftime('%Y-%m-%d %H:%M:%S'),
+                        "发布日期": datetime.datetime.fromtimestamp(
+                            create_timestamp
+                        ).strftime("%Y-%m-%d %H:%M:%S"),
                         "账号合作商": article["account_source"],
                     },
                     env="outside_gzh_monitor",
-                    mention=False
+                    mention=False,
                 )
 
             elif response_code == self.ARTICLE_SUCCESS_CODE:
@@ -205,7 +209,7 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
                     "账号合作商": article["account_source"],
                 },
                 env="outside_gzh_monitor",
-                mention=False
+                mention=False,
             )
             article_id = article["id"]
             await self.update_article_illegal_status(article_id, illegal_reason)
@@ -225,4 +229,85 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
                     f"title: {article['title']}\n"
                     f"error: {e}\n"
                 )
-        return self.TASK_SUCCESS_CODE
+        return self.TASK_SUCCESS_CODE
+
+
+class InnerGzhArticlesMonitor(MonitorConst):
+    def __init__(self, pool):
+        self.pool = pool
+
+    async def fetch_article_list_to_check(
+        self, run_date: Optional[str] = None
+    ) -> Optional[List]:
+        """
+        :param run_date: execution date in "%Y-%m-%d" format; defaults to today when None
+        """
+        if not run_date:
+            run_date = datetime.datetime.today().strftime("%Y-%m-%d")
+
+        run_timestamp = int(
+            datetime.datetime.strptime(run_date, "%Y-%m-%d").timestamp()
+        )
+        start_timestamp = run_timestamp - self.MONITOR_CYCLE
+        query = f"""
+            select ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) as publish_timestamp
+            from official_articles_v2
+            where publish_timestamp >= {start_timestamp}
+            order by publish_timestamp desc;
+        """
+        response, error = await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler"
+        )
+        if error:
+            await feishu_robot.bot(
+                title="站内微信公众号发文监测任务异常",
+                detail={"error": error, "message": "查询数据库异常"},
+            )
+            return None
+        else:
+            return response
+
+    async def check_each_article(self, article: dict):
+        gh_id, account_name, title, url, wx_sn, publish_date = article
+        try:
+            response = get_article_detail(url, is_cache=False)
+            response_code = response["code"]
+            if response_code == self.ARTICLE_ILLEGAL_CODE:
+                error_detail = article.get("msg")
+                query = f"""
+                    insert ignore into illegal_articles
+                        (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+                    values 
+                        (%s, %s, %s, %s, %s, %s); 
+                """
+                affected_row = await self.pool.async_save(
+                    query=query,
+                    params=(
+                        gh_id,
+                        account_name,
+                        title,
+                        wx_sn,
+                        publish_date,
+                        error_detail,
+                    ),
+                )
+                if affected_row:
+                    await feishu_robot.bot(
+                        title="文章违规告警",
+                        detail={
+                            "account_name": account_name,
+                            "gh_id": gh_id,
+                            "title": title,
+                            "wx_sn": wx_sn.decode("utf-8"),
+                            "publish_date": str(publish_date),
+                            "error_detail": error_detail,
+                        },
+                        mention=False,
+                    )
+                    await delete_illegal_gzh_articles(gh_id, title)
+
+        except Exception as e:
+            print(f"crawler failed: {article['account_name']}, error: {e}")
+
+    async def deal(self):
+        article_list = await self.fetch_article_list_to_check()
+        if not article_list:
+            return self.TASK_SUCCESS_CODE
+        for article in tqdm(article_list):
+            await self.check_each_article(article)
+        # mirror the completion code returned by the other monitors
+        return self.TASK_SUCCESS_CODE
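A sketch of running the new monitor standalone (assumes DatabaseManager takes no constructor arguments and exposes the async_fetch/async_save interface used above):

    import asyncio

    from applications.database.mysql_pools import DatabaseManager
    from applications.tasks.monitor_tasks.gzh_article_monitor import (
        InnerGzhArticlesMonitor,
    )

    async def main():
        pool = DatabaseManager()
        await pool.init_pools()
        await InnerGzhArticlesMonitor(pool).deal()

    asyncio.run(main())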

+ 13 - 3
applications/tasks/task_scheduler.py

@@ -109,7 +109,11 @@ class TaskScheduler:
                         "data": response,
                     }
                 )
-                await self.release_task(task_name=task_name, date_string=date_string, final_status=response['code'])
+                await self.release_task(
+                    task_name=task_name,
+                    date_string=date_string,
+                    final_status=response["code"],
+                )
                 return await task_schedule_response.success_response(
                     task_name=task_name, data=response
                 )
@@ -155,17 +159,23 @@ class TaskScheduler:
                 )
 
             case "outside_article_monitor":
+
                 async def background_outside_article_monitor():
                     collect_task = OutsideGzhArticlesCollector(self.db_client)
                     await collect_task.deal()
                     monitor_task = OutsideGzhArticlesMonitor(self.db_client)
                     final_status = await monitor_task.deal()
-                    await self.release_task(task_name, date_string, final_status=final_status)
+                    await self.release_task(
+                        task_name, date_string, final_status=final_status
+                    )
 
                 asyncio.create_task(background_outside_article_monitor())
                 return await task_schedule_response.success_response(
                     task_name=task_name,
-                    data={"code": 0, "message": "outside_article_monitor started background"}
+                    data={
+                        "code": 0,
+                        "message": "outside_article_monitor started background",
+                    },
                 )
 
             case _:
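Note that this commit does not register InnerGzhArticlesMonitor with the scheduler; a case following the outside_article_monitor pattern above might look like this sketch (not part of this commit):

    case "inner_article_monitor":
        task = InnerGzhArticlesMonitor(self.db_client)
        final_status = await task.deal()
        await self.release_task(
            task_name, date_string, final_status=final_status
        )
        return await task_schedule_response.success_response(
            task_name=task_name, data={"code": final_status}
        )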

+ 0 - 1
applications/utils/common.py

@@ -183,4 +183,3 @@ def days_remaining_in_month():
     remaining_days = (last_day_of_month - today).days
 
     return remaining_days
-

+ 3 - 3
applications/utils/get_cover.py

@@ -10,7 +10,7 @@ async def fetch_channel_info(pools, content_id):
         where plan_exe_id = '{content_id}';
     """
     fetch_response = await pools.async_fetch(
-        query=fetch_query, db_name="aigc_db_pool", cursor_type=DictCursor
+        query=fetch_query, db_name="aigc", cursor_type=DictCursor
     )
     return fetch_response
 
@@ -25,7 +25,7 @@ async def fetch_aigc_cover(pools, channel_content_id):
         where channel_content_id = '{channel_content_id}' and image_type = 2;
     """
     fetch_response = await pools.async_fetch(
-        query=fetch_query, db_name="aigc_db_pool", cursor_type=DictCursor
+        query=fetch_query, db_name="aigc", cursor_type=DictCursor
     )
 
     return fetch_response
@@ -41,6 +41,6 @@ async def fetch_long_video_cover(pools, channel_content_id):
         where video_id = '{channel_content_id}';
     """
     fetch_response = await pools.async_fetch(
-        query=fetch_query, db_name="long_video_db_pool", cursor_type=DictCursor
+        query=fetch_query, db_name="long_video", cursor_type=DictCursor
     )
     return fetch_response