
add get_off_videos task

luojunhui 21 hours ago
commit cc5df22698

+ 13 - 0
applications/api/__init__.py

@@ -7,5 +7,18 @@ from .async_piaoquan_api import change_video_audit_status
 from .async_piaoquan_api import publish_video_to_piaoquan
 from .async_piaoquan_api import fetch_piaoquan_video_list_detail
 
+# async apollo api
+from .async_apollo_api import AsyncApolloApi
+
+# deepseek api
+from .deep_seek_official_api import fetch_deepseek_completion
+
+# es_api
+from .elastic_search_api import AsyncElasticSearchClient
+
+# aliyun_log
+from .aliyun_log_api import log
+
 feishu_robot = FeishuBotApi()
 feishu_sheet = FeishuSheetApi()
+task_apollo = AsyncApolloApi()

+ 59 - 0
applications/api/aliyun_log_api.py

@@ -0,0 +1,59 @@
+"""
+@author: luojunhui
+"""
+import datetime
+import json
+import time
+
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+
+
+def log(
+        task,
+        function,
+        status="success",
+        message=None,
+        data=None
+):
+    """
+    @:param task 任务
+    @:param
+    :return:
+    """
+    if data is None:
+        data = {}
+    accessKeyId = "LTAIP6x1l3DXfSxm"
+    accessKey = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+    project = "changwen-alg"
+    log_store = "long_articles_job"
+    endpoint = "cn-hangzhou.log.aliyuncs.com"
+    # create the LogClient instance
+    client = LogClient(endpoint, accessKeyId, accessKey)
+    log_group = []
+    log_item = LogItem()
+    contents = [
+        ("task", str(task)),
+        ("function", str(function)),
+        ("message", str(message)),
+        ("status", str(status)),
+        ("data", json.dumps(data, ensure_ascii=False) if data else ""),
+        ("dateTime", str(datetime.datetime.now())),
+        ("timestamp", str(int(time.time())))
+    ]
+
+    log_item.set_contents(contents)
+    log_group.append(log_item)
+    # send the log entry to the log store
+    request = PutLogsRequest(
+        project=project,
+        logstore=log_store,
+        topic="",
+        source="",
+        logitems=log_group,
+        compress=False,
+    )
+    try:
+        client.put_logs(request)
+    except Exception as e:
+        print("failed to write aliyun log")
+        print(e)
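
Usage note: a minimal sketch of calling the new log() helper; the values below are illustrative placeholders.

from applications.api import log

log(
    task="outside_article_monitor",      # example task name
    function="check_each_article",       # example function name
    status="fail",
    message="article request timed out",
    data={"link": "https://mp.weixin.qq.com/s/XXXX"},  # placeholder link
)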

+ 30 - 0
applications/api/async_apollo_api.py

@@ -0,0 +1,30 @@
+import json
+from typing import Optional, Dict
+from applications.utils import AsyncApolloClient
+
+
+class AsyncApolloApi:
+    def __init__(self, app_id="LongArticlesJob", env="pre"):
+        match env:
+            case "pre":
+                config_server_url = "http://preapolloconfig-internal.piaoquantv.com/"
+            case "dev":
+                config_server_url = "https://devapolloconfig-internal.piaoquantv.com/"
+            case "prod":
+                config_server_url = "https://apolloconfig-internal.piaoquantv.com/"
+            case _:
+                raise ValueError("env must be 'pre' or 'dev' or 'prod'")
+
+        self.apollo_connection = AsyncApolloClient(
+            app_id=app_id, config_server_url=config_server_url
+        )
+
+    async def get_config_value(
+        self, key: str, output_type: str = "json"
+    ) -> Optional[Dict]:
+        match output_type:
+            case "json":
+                response = await self.apollo_connection.get_value(key)
+                return json.loads(response)
+            case _:
+                return await self.apollo_connection.get_value(key)
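
Usage note: a sketch of querying the task_apollo instance exported from applications/api/__init__.py; the config key is hypothetical.

import asyncio

from applications.api import task_apollo


async def main():
    # "base_config" is a hypothetical key in the LongArticlesJob namespace
    config = await task_apollo.get_config_value(key="base_config")
    print(config)

    # any output_type other than "json" returns the raw string value
    raw = await task_apollo.get_config_value(key="base_config", output_type="string")
    print(raw)


asyncio.run(main())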

+ 53 - 0
applications/api/deep_seek_official_api.py

@@ -0,0 +1,53 @@
+"""
+@author: luojunhui
+@description: DeepSeek official API
+"""
+
+import json
+
+from typing import Dict, List, Optional
+from openai import OpenAI
+
+from applications.config import deep_seek_official_model
+from applications.config import deep_seek_official_api_key
+
+
+def fetch_deepseek_completion(
+    model: str,
+    prompt: str,
+    output_type: str = "text",
+    tool_calls: bool = False,
+    tools: Optional[List[Dict]] = None,
+) -> Dict | str | None:
+    """call the DeepSeek chat-completions API; returns text, parsed JSON, or None on failure"""
+    messages = [{"role": "user", "content": prompt}]
+    kwargs = {
+        "model": deep_seek_official_model.get(model, "deepseek-chat"),
+        "messages": messages,
+    }
+
+    # add tool calls
+    if tool_calls and tools:
+        kwargs["tools"] = tools
+        kwargs["tool_choice"] = "auto"
+
+    client = OpenAI(
+        api_key=deep_seek_official_api_key, base_url="https://api.deepseek.com"
+    )
+
+    if output_type == "json":
+        kwargs["response_format"] = {"type": "json_object"}
+
+    try:
+        response = client.chat.completions.create(**kwargs)
+        choice = response.choices[0]
+
+        if output_type == "text":
+            return choice.message.content  # return the raw text only
+        elif output_type == "json":
+            return json.loads(choice.message.content)
+        else:
+            raise ValueError(f"Invalid output_type: {output_type}")
+
+    except Exception as e:
+        print(f"[ERROR] fetch_deepseek_completion failed: {e}")
+        return None
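
Usage note: a sketch of the two output modes; the prompts are placeholders, and the model keys map through deep_seek_official_model in deepseek_config.py.

from applications.api import fetch_deepseek_completion

# plain-text completion via the "DeepSeek-V3" -> "deepseek-chat" mapping
text = fetch_deepseek_completion(
    model="DeepSeek-V3",
    prompt="Summarize this article title in one sentence: ...",
)
print(text)

# JSON mode sets response_format to json_object, so the prompt should
# explicitly ask for a JSON object
parsed = fetch_deepseek_completion(
    model="DeepSeek-R1",
    prompt="Return a JSON object with keys 'safe' and 'reason' for ...",
    output_type="json",
)
print(parsed)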

+ 61 - 0
applications/api/elastic_search_api.py

@@ -0,0 +1,61 @@
+import ssl
+
+from elasticsearch import AsyncElasticsearch
+from elasticsearch.helpers import async_bulk
+
+from applications.config import es_index
+
+
+class AsyncElasticSearchClient:
+
+    def __init__(self, index_=es_index):
+        self.password = "nkvvASQuQ0XUGRq5OLvm"
+        self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
+        self.ctx = ssl.create_default_context(cafile="applications/config/es_certs.crt")
+        self.es = AsyncElasticsearch(
+            self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
+        )
+        self.index_name = index_
+
+    async def create_index(self, settings, mappings):
+        exists = await self.es.indices.exists(index=self.index_name)
+        if exists:
+            await self.es.indices.delete(index=self.index_name)
+
+        try:
+            await self.es.indices.create(
+                index=self.index_name, settings=settings, mappings=mappings
+            )
+            print("Index created successfully")
+        except Exception as e:
+            print("fail to create index, reason:", e)
+
+    async def get_max_article_id(self):
+        response = await self.es.search(
+            index=self.index_name,
+            size=1,
+            sort="article_id:desc",
+            _source=["article_id"],
+        )
+        return response["hits"]["hits"][0]["_source"]["article_id"]
+
+    async def search(self, search_keys, size=10):
+        query = {
+            "query": {"match": {"title": search_keys}},
+            "_source": ["article_id", "title"],
+            "size": size,
+        }
+        resp = await self.es.search(index=self.index_name, body=query)
+        return [i["_source"] for i in resp["hits"]["hits"]]
+
+    async def bulk_insert(self, docs):
+        await async_bulk(self.es, docs)
+
+    async def close(self):
+        await self.es.close()
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.es.close()
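
Usage note: a sketch combining the client with the es_settings/es_mappings defined in elastic_search_mappings.py below; note that create_index() drops any existing index of the same name before recreating it. The sample document fields follow es_mappings.

import asyncio

from applications.api import AsyncElasticSearchClient
from applications.config import es_settings, es_mappings


async def main():
    async with AsyncElasticSearchClient() as es_client:
        # recreate the index named by es_index (drops it if it exists)
        await es_client.create_index(settings=es_settings, mappings=es_mappings)

        # bulk insert one illustrative document
        docs = [
            {
                "_index": es_client.index_name,
                "_source": {"article_id": 1, "platform": "gzh", "title": "sample title"},
            }
        ]
        await es_client.bulk_insert(docs)

        print(await es_client.search("sample"))


asyncio.run(main())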

+ 8 - 1
applications/config/__init__.py

@@ -1,8 +1,15 @@
+# mysql config
 from .mysql_config import aigc_db_config
 from .mysql_config import long_video_db_config
 from .mysql_config import long_articles_db_config
 from .mysql_config import piaoquan_crawler_db_config
 
 # aliyun log sdk config
-
 from .aliyun_log_config import aliyun_log_config
+
+# deepseek config
+from .deepseek_config import deep_seek_official_model
+from .deepseek_config import deep_seek_official_api_key
+
+# es config
+from .elastic_search_mappings import es_index, es_mappings, es_settings

+ 7 - 0
applications/config/deepseek_config.py

@@ -0,0 +1,7 @@
+# deepseek official api
+deep_seek_official_api_key = "sk-cfd2df92c8864ab999d66a615ee812c5"
+
+deep_seek_official_model = {
+    "DeepSeek-R1": "deepseek-reasoner",
+    "DeepSeek-V3": "deepseek-chat",
+}

+ 36 - 0
applications/config/elastic_search_mappings.py

@@ -0,0 +1,36 @@
+es_index = "meta_articles_v1"
+
+es_settings = {
+    "number_of_shards": 3,
+    "number_of_replicas": 1,
+    "analysis": {
+        "analyzer": {
+            "ik_smart": {"type": "ik_smart"},
+            "ik_max_word": {"type": "ik_max_word"},
+        }
+    }
+}
+
+es_mappings = {
+    "properties": {
+        "auto_id": {
+            "type": "long",
+            "doc_values": True,
+        },
+        "article_id": {"type": "long"},
+        "platform": {"type": "keyword"},
+        "out_account_id": {"type": "keyword"},
+        "title": {
+            "type": "text",
+            "analyzer": "ik_max_word",
+            "search_analyzer": "ik_smart",
+            "fields": {
+                "keyword": {"type": "keyword", "ignore_above": 256}
+            }
+        },
+        "created_at": {
+            "type": "date",
+            "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
+        }
+    }
+}

+ 31 - 0
applications/config/es_certs.crt

@@ -0,0 +1,31 @@
+-----BEGIN CERTIFICATE-----
+MIIFaTCCA1GgAwIBAgIUWHH9T8PVfiSyvT6S6NrAQ9iSLeEwDQYJKoZIhvcNAQEL
+BQAwPDE6MDgGA1UEAxMxRWxhc3RpY3NlYXJjaCBzZWN1cml0eSBhdXRvLWNvbmZp
+Z3VyYXRpb24gSFRUUCBDQTAeFw0yNTA3MDcwNzIwNTRaFw0yODA3MDYwNzIwNTRa
+MDwxOjA4BgNVBAMTMUVsYXN0aWNzZWFyY2ggc2VjdXJpdHkgYXV0by1jb25maWd1
+cmF0aW9uIEhUVFAgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCb
+Y8E68+7S+hGKQX6vhyOxuCe3QyBHYlsxiSqGhi+WFx953u4SEMqrbqiyg2QquB9/
+ynjKo3Tvhn0OPjuJRytteKn9OZkVhUT1D5P6PFo0j8x1LIJZm551XRCnQUZ8jC0C
+REHy/JoKdT4YSCRIuXVTM5iM66vQ1t5Du4sb70mTygtc2DyXwgE4LkVnrHcwr2BZ
+3/O69WvF7Zd7WP93yEfUsLsAAQStaCYMeYyaY5K8UwIVcFyWKJ9lnDGbR9KmuXb9
+ipWqGw6aAYhmSs5gL+6xJ5dBpgMOqoBTvZpNniLA/phkelq9W2nAhBLFpRGRof8K
+5iKwjAN8gnBXeSVklBoL23QD5zfoVjz+5eaXWO4qP+90jbwf+vEg/duncDRONGtk
+TQd0Vr9NeO3Aye8PZsmmhKAaciaPWYyQO30omUq9kPsSUzZPu4k+CYb8qwVQCHpn
+Za19NkvERQ8hCQks08/ly5qDM+5lBxJQFQjhjtzSDQ/ybbarMmgaBxpCexiksRmP
+CQqVLW6IaLxUGEkIJqXRx8nmKUfK43vTBitOBFt5UcKob6+ikZLrqZ6xLY/jklE8
+Z1wt9I8ZdQ3L3X9EORgmQ+4KIu/JQfBdfAYtLaS6MYWhiZSaKaIhgfXiZQTO9YuW
+KrI5g+d2Yu2BYgIioLKo9LFWK1eTG2gNAGUI/+rqswIDAQABo2MwYTAdBgNVHQ4E
+FgQUab2kAtPlJHLirQvbThvIwJ7hbLwwHwYDVR0jBBgwFoAUab2kAtPlJHLirQvb
+ThvIwJ7hbLwwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYwDQYJKoZI
+hvcNAQELBQADggIBAF+wJ598Krfai5Br6Vq0Z1jj0JsU8Kij4t9D+89QPgI85/Mv
+zwj8xRgxx9RinKYdnzFJWrD9BITG2l3D0zcJhXfYUpq5HLP+c3zMwEMGzTLbgi70
+cpYqkTJ+g/Ah5WRYZRHJIMF6BVK6izCOO0J49eYC6AONNxG2HeeUvEL4cNnxpw8T
+NUe7v0FXe2iPLeE713h99ray0lBgI6J9QZqc/oEM47gHy+ByfWCv6Yw9qLlprppP
+taHz2VWnCAACDLzbDnYhemQDji86yrUTEdCT8at1jAwHSixgkm88nEBgxPHDuq8t
+thmiS6dELvXVUbyeWO7A/7zVde0Kndxe003OuYcX9I2IX7aIpC8sW/yY+alRhklq
+t9vF6g1qvsN69xXfW5yI5G31TYMUw/3ng0aVJfRFaXkEV2SWEZD+4sWoYC/GU7kK
+zlfaF22jTeul5qCKkN1k+i8K2lheEE3ZBC358W0RyvsrDwtXOra3VCpZ7qrez8OA
+/HeY6iISZQ7g0s209KjqOPqVGcI8B0p6KMh00AeWisU6E/wy1LNTxkf2IS9b88n6
+a3rj0TCycwhKOPTPB5pwlfbZNI00tGTFjqqi07SLqO9ZypsVkyR32G16JPJzk8Zw
+kngBZt6y9LtCMRVbyDuIDNq+fjtDjgxMI9bQXtve4bOuq8cZzcMjC6khz/Ja
+-----END CERTIFICATE-----

+ 1 - 0
applications/crawler/wechat/__init__.py

@@ -0,0 +1 @@
+from .gzh_spider import *

+ 129 - 0
applications/crawler/wechat/gzh_spider.py

@@ -0,0 +1,129 @@
+from __future__ import annotations
+
+import re
+import json
+import requests
+from fake_useragent import FakeUserAgent
+from tenacity import retry
+
+from applications.api import log
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+# url from aigc
+base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
+headers = {"Content-Type": "application/json"}
+
+
+@retry(**retry_desc)
+def get_article_detail(
+    article_link: str, is_count: bool = False, is_cache: bool = True
+) -> dict | None:
+    """
+    get official article detail
+    """
+    target_url = f"{base_url}/detail"
+    payload = json.dumps(
+        {
+            "content_link": article_link,
+            "is_count": is_count,
+            "is_ad": False,
+            "is_cache": is_cache
+        }
+    )
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"API请求失败: {e}",
+            data={"link": article_link}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link}
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_article_list_from_account(
+    account_id: str, index=None
+) -> dict | None:
+    """
+    get the article list of an official account
+    """
+    target_url = f"{base_url}/blogger"
+    payload = json.dumps(
+        {
+            "account_id": account_id,
+            "cursor": index
+        }
+    )
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"API请求失败: {e}",
+            data={"gh_id": account_id}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"响应解析失败: {e}",
+            data={"gh_id": account_id}
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_source_account_from_article(article_link) -> dict | None:
+    """
+    get account info from official article
+    :param article_link:
+    :return:
+    """
+    try:
+        response = requests.get(url=article_link, headers={'User-Agent': FakeUserAgent().random}, timeout=120)
+        response.raise_for_status()
+        html_text = response.text
+        regex_nickname = r"hit_nickname:\s*'([^']+)'"
+        regex_username = r"hit_username:\s*'([^']+)'"
+        nickname = re.search(regex_nickname, html_text)
+        username = re.search(regex_username, html_text)
+        # return the extracted account info
+        if nickname and username:
+            return {
+                'name': nickname.group(1),
+                'gh_id': username.group(1)
+            }
+        else:
+            return {}
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"API请求失败: {e}",
+            data={"link": article_link}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link}
+        )
+    return None
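
Usage note: an illustrative call into the spider helpers; the link and gh_id are placeholders, and the response shapes shown follow how gzh_article_monitor.py consumes them.

from applications.crawler.wechat import get_article_detail
from applications.crawler.wechat import get_article_list_from_account

article_link = "https://mp.weixin.qq.com/s/XXXXXXXX"  # placeholder link
gh_id = "gh_0000000000"  # placeholder account id

detail = get_article_detail(article_link, is_count=False, is_cache=True)
if detail and detail.get("code") == 0:
    print(detail["data"]["data"].get("channel_content_id"))

listing = get_article_list_from_account(gh_id)
if listing:
    for msg in listing.get("data", {}).get("data", []):
        print(msg["AppMsg"]["BaseInfo"]["AppMsgId"])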

+ 2 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -1,3 +1,5 @@
 from .kimi_balance import check_kimi_balance
 from .get_off_videos import GetOffVideos
 from .get_off_videos import CheckVideoAuditStatus
+from .gzh_article_monitor import OutsideGzhArticlesMonitor
+from .gzh_article_monitor import OutsideGzhArticlesCollector

+ 226 - 0
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -0,0 +1,226 @@
+import time
+import datetime
+
+from tqdm import tqdm
+
+from applications.api import feishu_robot
+from applications.crawler.wechat import get_article_detail
+from applications.crawler.wechat import get_article_list_from_account
+
+
+class MonitorConst:
+    # article illegal-status flags
+    ILLEGAL_STATUS = 1
+    INIT_STATUS = 0
+
+    # monitoring window (5 days)
+    MONITOR_CYCLE = 5 * 24 * 3600
+
+    # article code
+    ARTICLE_ILLEGAL_CODE = 25012
+    ARTICLE_DELETE_CODE = 25005
+    ARTICLE_SUCCESS_CODE = 0
+    ARTICLE_UNKNOWN_CODE = 10000
+
+    # Task status
+    TASK_SUCCESS_CODE = 2
+    TASK_FAIL_CODE = 99
+
+
+class OutsideGzhArticlesManager(MonitorConst):
+
+    def __init__(self, pool):
+        self.pool = pool
+
+    async def update_article_illegal_status(
+        self, article_id: int, illegal_reason: str
+    ) -> None:
+        query = f"""
+            update outside_gzh_account_monitor
+            set illegal_status = %s, illegal_reason = %s
+            where id = %s and illegal_status = %s
+        """
+        await self.pool.async_save(
+            query=query,
+            params=(self.ILLEGAL_STATUS, illegal_reason, article_id, self.INIT_STATUS),
+        )
+
+    async def whether_published_in_a_week(self, gh_id: str) -> bool:
+        """
+        判断该账号一周内是否有发文,如有,则说无需抓
+        """
+        query = f"""
+            select id, publish_timestamp from outside_gzh_account_monitor
+            where gh_id = %s
+            order by publish_timestamp desc
+            limit %s;
+        """
+        response = await self.pool.async_fetch(query=query, params=(gh_id, 1))
+        if response:
+            publish_timestamp = response[0]["publish_timestamp"]
+            if publish_timestamp is None:
+                return False
+            else:
+                return int(time.time()) - publish_timestamp <= self.MONITOR_CYCLE
+        else:
+            return False
+
+
+class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
+
+    async def fetch_outside_account_list(self):
+        query = f"""
+            select 
+                t2.group_source_name as account_source, 
+                t3.name as account_name,
+                t3.gh_id as gh_id,
+                t3.status as status
+            from wx_statistics_group_source t1
+                join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
+                join publish_account t3 on t3.id = t2.account_id
+            where
+                t1.mode_type = '代运营服务号';
+        """
+        return await self.pool.async_fetch(query=query, db_name="aigc_db_pool")
+
+    async def fetch_each_account(self, account: dict):
+        gh_id = account["gh_id"]
+        # skip the account if it already published within the monitor window
+        if await self.whether_published_in_a_week(gh_id):
+            return
+
+        fetch_response = get_article_list_from_account(gh_id)
+        try:
+            msg_list = fetch_response.get("data", {}).get("data", [])
+            if msg_list:
+                for msg in tqdm(
+                    msg_list, desc=f"insert account {account['account_name']}"
+                ):
+                    await self.save_each_msg_to_db(msg, account)
+
+            else:
+                print(f"crawler failed: {account['account_name']}")
+        except Exception as e:
+            print(
+                f"crawler failed: account_name: {account['account_name']}\n"
+                f"error: {e}\n"
+            )
+
+    async def save_each_msg_to_db(self, msg: dict, account: dict):
+        base_info = msg["AppMsg"]["BaseInfo"]
+        detail_info = msg["AppMsg"]["DetailInfo"]
+        app_msg_id = base_info["AppMsgId"]
+        create_timestamp = base_info["CreateTime"]
+        publish_type = base_info["Type"]
+
+        # insert each article
+        for article in detail_info:
+            link = article["ContentUrl"]
+            article_detail = get_article_detail(link)
+            response_code = article_detail["code"]
+            if response_code == self.ARTICLE_ILLEGAL_CODE:
+                illegal_reason = article_detail.get("msg")
+                # send a feishu alert; the illegal article is not inserted
+                feishu_robot.bot(
+                    title="文章违规告警",
+                    detail={
+                        "账号名称": account["account_name"],
+                        "标题": article["Title"],
+                        "违规理由": illegal_reason,
+                        "发布日期": datetime.datetime.fromtimestamp(create_timestamp).strftime('%Y-%m-%d %H:%M:%S'),
+                        "账号合作商": account["account_source"],
+                    },
+                    env="outside_gzh_monitor",
+                    mention=False
+                )
+
+            elif response_code == self.ARTICLE_SUCCESS_CODE:
+                insert_query = f"""
+                    insert ignore into outside_gzh_account_monitor
+                    (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link, 
+                    channel_content_id, crawler_timestamp, publish_timestamp)
+                    values
+                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                """
+                await self.pool.async_save(
+                    query=insert_query,
+                    params=(
+                        account["account_name"],
+                        account["gh_id"],
+                        account["account_source"],
+                        "服务号",
+                        app_msg_id,
+                        publish_type,
+                        article["ItemIndex"],
+                        article["Title"],
+                        link,
+                        article_detail["data"]["data"]["channel_content_id"],
+                        int(time.time()),
+                        int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
+                    ),
+                )
+            else:
+                continue
+
+    async def deal(self):
+        account_list = await self.fetch_outside_account_list()
+        for account in tqdm(account_list):
+            try:
+                await self.fetch_each_account(account)
+            except Exception as e:
+                print(f"crawler failed: {account['account_name']}, error: {e}")
+
+
+class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
+
+    async def fetch_article_list_to_check(self):
+        publish_timestamp_threshold = int(time.time()) - self.MONITOR_CYCLE
+        fetch_query = f"""
+            select id, account_name, gh_id, account_source, account_type, 
+                title, link, from_unixtime(publish_timestamp) as publish_date
+            from outside_gzh_account_monitor
+            where illegal_status = {self.INIT_STATUS} and publish_timestamp > {publish_timestamp_threshold};
+        """
+        return await self.pool.async_fetch(query=fetch_query)
+
+    async def check_each_article(self, article: dict):
+        """
+        check each article
+        """
+        link = article["link"]
+        article_detail = get_article_detail(link)
+        response_code = article_detail["code"]
+        if response_code == self.ARTICLE_ILLEGAL_CODE:
+            illegal_reason = article_detail.get("msg")
+            feishu_robot.bot(
+                title="文章违规告警",
+                detail={
+                    "账号名称": article["account_name"],
+                    "标题": article["title"],
+                    "违规理由": illegal_reason,
+                    "发布日期": str(article["publish_date"]),
+                    "账号合作商": article["account_source"],
+                },
+                env="outside_gzh_monitor",
+                mention=False
+            )
+            article_id = article["id"]
+            await self.update_article_illegal_status(article_id, illegal_reason)
+        else:
+            return
+
+    async def deal(self):
+        article_list = await self.fetch_article_list_to_check()
+        for article in tqdm(article_list):
+            try:
+                await self.check_each_article(article)
+
+            except Exception as e:
+                print(
+                    f"crawler failed: account_name: {article['account_name']}\n"
+                    f"link: {article['link']}\n"
+                    f"title: {article['title']}\n"
+                    f"error: {e}\n"
+                )
+        return self.TASK_SUCCESS_CODE

+ 16 - 0
applications/tasks/task_scheduler.py

@@ -7,6 +7,8 @@ from applications.utils import task_schedule_response
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
 from applications.tasks.monitor_tasks import CheckVideoAuditStatus
+from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
 
 
 class TaskScheduler:
@@ -151,6 +153,20 @@ class TaskScheduler:
                     },
                 )
 
+            case "outside_article_monitor":
+                async def background_outside_article_monitor():
+                    collect_task = OutsideGzhArticlesCollector(self.db_client)
+                    await collect_task.deal()
+                    monitor_task = OutsideGzhArticlesMonitor(self.db_client)
+                    final_status = await monitor_task.deal()
+                    await self.release_task(task_name, date_string, final_status=final_status)
+
+                asyncio.create_task(background_outside_article_monitor())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "outside_article_monitor started background"}
+                )
+
             case _:
                 await self.log_client.log(
                     contents={

+ 12 - 2
applications/utils/__init__.py

@@ -1,7 +1,17 @@
+# import async apollo client
+from .async_apollo_client import AsyncApolloClient
+
+# import async http client
+from .async_http_client import AsyncHttPClient
+
 from .get_cover import fetch_channel_info
 from .get_cover import fetch_aigc_cover
 from .get_cover import fetch_long_video_cover
-from .async_http_client import AsyncHttPClient
-from applications.utils.response import TaskScheduleResponse
+
+# server response
+from .response import TaskScheduleResponse
+
+# common
+from .common import *
 
 task_schedule_response = TaskScheduleResponse()

+ 131 - 0
applications/utils/async_apollo_client.py

@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+import json
+import logging
+import socket
+import asyncio
+import aiohttp
+
+
+class AsyncApolloClient:
+    def __init__(
+        self,
+        app_id,
+        cluster="default",
+        config_server_url="http://localhost:8080",
+        timeout=35,
+        ip=None,
+    ):
+        self.config_server_url = config_server_url
+        self.appId = app_id
+        self.cluster = cluster
+        self.timeout = timeout
+        self.stopped = False
+        self.ip = ip or self._init_ip()
+
+        self._cache = {}
+        self._notification_map = {"application": -1}
+        self._stop_event = asyncio.Event()
+
+    def _init_ip(self):
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        try:
+            s.connect(("8.8.8.8", 53))
+            return s.getsockname()[0]
+        finally:
+            s.close()
+
+    async def get_value(
+        self,
+        key,
+        default_val=None,
+        namespace="application",
+        auto_fetch_on_cache_miss=False,
+    ):
+        if namespace not in self._notification_map:
+            self._notification_map[namespace] = -1
+            logging.info(f"Add namespace '{namespace}' to local notification map")
+
+        if namespace not in self._cache:
+            self._cache[namespace] = {}
+            logging.info(f"Add namespace '{namespace}' to local cache")
+            await self._long_poll()
+
+        if key in self._cache[namespace]:
+            return self._cache[namespace][key]
+        elif auto_fetch_on_cache_miss:
+            return await self._cached_http_get(key, default_val, namespace)
+        else:
+            return default_val
+
+    async def start(self):
+        if len(self._cache) == 0:
+            await self._long_poll()
+        asyncio.create_task(self._listener())
+
+    async def stop(self):
+        logging.info("Stopping listener...")
+        self._stop_event.set()
+
+    async def _cached_http_get(self, key, default_val, namespace="application"):
+        url = f"{self.config_server_url}/configfiles/json/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url) as r:
+                if r.status == 200:
+                    data = await r.json()
+                    self._cache[namespace] = data
+                    logging.info(f"Updated local cache for namespace {namespace}")
+                else:
+                    data = self._cache.get(namespace, {})
+
+        return data.get(key, default_val)
+
+    async def _uncached_http_get(self, namespace="application"):
+        url = f"{self.config_server_url}/configs/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url) as r:
+                if r.status == 200:
+                    data = await r.json()
+                    self._cache[namespace] = data["configurations"]
+                    logging.info(
+                        f"Updated local cache for namespace {namespace} release key {data['releaseKey']}"
+                    )
+
+    async def _long_poll(self):
+        url = f"{self.config_server_url}/notifications/v2"
+        notifications = [
+            {"namespaceName": k, "notificationId": v}
+            for k, v in self._notification_map.items()
+        ]
+        params = {
+            "appId": self.appId,
+            "cluster": self.cluster,
+            "notifications": json.dumps(notifications, ensure_ascii=False),
+        }
+
+        try:
+            async with aiohttp.ClientSession() as session:
+                async with session.get(url, params=params, timeout=self.timeout) as r:
+                    if r.status == 304:
+                        logging.debug("No change.")
+                        return
+                    if r.status == 200:
+                        data = await r.json()
+                        for entry in data:
+                            ns = entry["namespaceName"]
+                            nid = entry["notificationId"]
+                            logging.info(f"{ns} has changes: notificationId={nid}")
+                            await self._uncached_http_get(ns)
+                            self._notification_map[ns] = nid
+                    else:
+                        logging.warning("Sleep due to unexpected status...")
+                        await asyncio.sleep(self.timeout)
+        except Exception as e:
+            logging.warning(f"Error during long poll: {e}")
+            await asyncio.sleep(self.timeout)
+
+    async def _listener(self):
+        logging.info("Entering listener loop...")
+        while not self._stop_event.is_set():
+            await self._long_poll()
+        logging.info("Listener stopped.")
+        self.stopped = True
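
Usage note: a sketch of driving the raw client directly with its long-poll listener; AsyncApolloApi above wraps get_value without starting the listener loop. The config key is hypothetical.

import asyncio

from applications.utils import AsyncApolloClient


async def main():
    client = AsyncApolloClient(
        app_id="LongArticlesJob",
        config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
    )
    await client.start()  # initial fetch plus background long-poll task

    try:
        value = await client.get_value("some_key", default_val="fallback")
        print(value)
    finally:
        await client.stop()


asyncio.run(main())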

+ 186 - 0
applications/utils/common.py

@@ -0,0 +1,186 @@
+"""
+@author: luojunhui
+"""
+
+import hashlib
+
+from datetime import datetime, timezone, date, timedelta
+from requests import RequestException
+from urllib.parse import urlparse, parse_qs
+from tenacity import (
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+)
+
+
+def str_to_md5(strings):
+    """
+    字符串转化为 md5 值
+    :param strings:
+    :return:
+    """
+    # 将字符串转换为字节
+    original_bytes = strings.encode("utf-8")
+    # 创建一个md5 hash对象
+    md5_hash = hashlib.md5()
+    # 更新hash对象,传入原始字节
+    md5_hash.update(original_bytes)
+    # 获取16进制形式的MD5哈希值
+    md5_value = md5_hash.hexdigest()
+    return md5_value
+
+
+def proxy():
+    """
+    快代理
+    """
+    # 隧道域名:端口号
+    tunnel = "j685.kdltps.com:15818"
+
+    # 用户名密码方式
+    username = "t14070979713487"
+    password = "hqwanfvy"
+    proxies = {
+        "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
+        % {"user": username, "pwd": password, "proxy": tunnel},
+        "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
+        % {"user": username, "pwd": password, "proxy": tunnel},
+    }
+    return proxies
+
+
+def request_retry(retry_times, min_retry_delay, max_retry_delay):
+    """
+    :param retry_times:
+    :param min_retry_delay:
+    :param max_retry_delay:
+    """
+    common_retry = dict(
+        stop=stop_after_attempt(retry_times),
+        wait=wait_exponential(min=min_retry_delay, max=max_retry_delay),
+        retry=retry_if_exception_type((RequestException, TimeoutError)),
+        reraise=True,  # re-raise the exception once retries are exhausted
+    )
+    return common_retry
+
+
+def yield_batch(data, batch_size):
+    """
+    yield data in consecutive batches of batch_size
+    :param data:
+    :param batch_size:
+    :return:
+    """
+    for i in range(0, len(data), batch_size):
+        yield data[i : i + batch_size]
+
+
+def extract_root_source_id(path: str) -> dict:
+    """
+    extract video_id and root_source_id from the share path query
+    :param path:
+    :return:
+    """
+    params = parse_qs(urlparse(path).query)
+    jump_page = params.get("jumpPage", [None])[0]
+    if jump_page:
+        params2 = parse_qs(jump_page)
+        res = {
+            "video_id": params2["pages/user-videos?id"][0],
+            "root_source_id": params2["rootSourceId"][0],
+        }
+        return res
+    else:
+        return {}
+
+
+def show_desc_to_sta(show_desc):
+
+    def decode_show_v(show_v):
+        """
+        convert a display value such as "1.2万" into an integer
+        :param show_v:
+        :return:
+        """
+        foo = show_v.replace("千", "e3").replace("万", "e4").replace("亿", "e8")
+        # the string is now plain scientific notation, so float() replaces eval()
+        return int(float(foo))
+
+    def decode_show_k(show_k):
+        """
+        map a display label to its statistics field name
+        :param show_k:
+        :return:
+        """
+        this_dict = {
+            "阅读": "show_view_count",  # 文章
+            "看过": "show_view_count",  # 图文
+            "观看": "show_view_count",  # 视频
+            "赞": "show_like_count",
+            "付费": "show_pay_count",
+            "赞赏": "show_zs_count",
+        }
+        if show_k not in this_dict:
+            print(f"error from decode_show_k, show_k not found: {show_k}")
+        return this_dict.get(show_k, "show_unknown")
+
+    show_desc = show_desc.replace("+", "")
+    sta = {}
+    for show_kv in show_desc.split("\u2004\u2005"):
+        if not show_kv:
+            continue
+        show_k, show_v = show_kv.split("\u2006")
+        k = decode_show_k(show_k)
+        v = decode_show_v(show_v)
+        sta[k] = v
+    res = {
+        "show_view_count": sta.get("show_view_count", 0),
+        "show_like_count": sta.get("show_like_count", 0),
+        "show_pay_count": sta.get("show_pay_count", 0),
+        "show_zs_count": sta.get("show_zs_count", 0),
+    }
+    return res
+
+
+def generate_gzh_id(url):
+    biz = url.split("biz=")[1].split("&")[0]
+    idx = url.split("&idx=")[1].split("&")[0]
+    sn = url.split("&sn=")[1].split("&")[0]
+    url_bit = "{}-{}-{}".format(biz, idx, sn).encode()
+    md5_hash = hashlib.md5()
+    md5_hash.update(url_bit)
+    md5_value = md5_hash.hexdigest()
+    return md5_value
+
+
+def timestamp_to_str(timestamp, string_format="%Y-%m-%d %H:%M:%S") -> str:
+    """
+    :param string_format:
+    :param timestamp:
+    """
+    dt_object = datetime.fromtimestamp(timestamp, tz=timezone.utc).astimezone()
+    date_string = dt_object.strftime(string_format)
+    return date_string
+
+
+def days_remaining_in_month():
+    # get today's date
+    today = date.today()
+
+    # first day of the next month
+    if today.month == 12:
+        next_month = today.replace(year=today.year + 1, month=1, day=1)
+    else:
+        next_month = today.replace(month=today.month + 1, day=1)
+
+    # last day of this month (first day of next month minus one day)
+    last_day_of_month = next_month - timedelta(days=1)
+
+    # remaining days in the current month
+    remaining_days = (last_day_of_month - today).days
+
+    return remaining_days
+
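
Usage note: a brief sketch of request_retry and yield_batch in use (gzh_spider.py builds the same retry config); the URL is a placeholder, and the imports assume the wildcard re-export in applications/utils/__init__.py exposes both helpers.

import requests
from tenacity import retry

from applications.utils import request_retry, yield_batch

retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)


@retry(**retry_desc)
def fetch_json(url: str) -> dict:
    # retried with exponential backoff on RequestException / TimeoutError
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()


for batch in yield_batch(list(range(10)), batch_size=4):
    print(batch)  # [0, 1, 2, 3], [4, 5, 6, 7], [8, 9]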

+ 6 - 1
requirements.txt

@@ -12,4 +12,9 @@ aliyun-python-sdk-kms
 odps~=3.5.1
 apscheduler~=3.10.4
 tqdm~=4.66.6
-pyapollos~=0.1.5
+pyapollos~=0.1.5
+pyotp~=2.9.0
+elasticsearch~=8.17.2
+openai~=1.97.0
+tenacity~=9.0.0
+fake-useragent~=2.1.0