Merge branch 'feature/20250718-create-async-task' of Server/LongArticleTaskServer into master

luojunhui · 2 days ago · commit c1ad366ce6
46 changed files with 3085 additions and 72 deletions
  1. README.md (+17, -4)
  2. app_config.toml (+2, -2)
  3. applications/__init__.py (+0, -0)
  4. applications/api/__init__.py (+27, -0)
  5. applications/api/aliyun_log_api.py (+54, -0)
  6. applications/api/async_aigc_system_api.py (+19, -0)
  7. applications/api/async_apollo_api.py (+30, -0)
  8. applications/api/async_feishu_api.py (+233, -0)
  9. applications/api/async_piaoquan_api.py (+78, -0)
  10. applications/api/deep_seek_official_api.py (+53, -0)
  11. applications/api/elastic_search_api.py (+61, -0)
  12. applications/config/__init__.py (+13, -0)
  13. applications/config/aliyun_log_config.py (+8, -0)
  14. applications/config/deepseek_config.py (+7, -0)
  15. applications/config/elastic_search_mappings.py (+31, -0)
  16. applications/config/es_certs.crt (+31, -0)
  17. applications/config/mysql_config.py (+28, -4)
  18. applications/crawler/wechat/__init__.py (+1, -0)
  19. applications/crawler/wechat/gzh_spider.py (+123, -0)
  20. applications/database/mysql_pools.py (+33, -26)
  21. applications/pipeline/__init__.py (+1, -0)
  22. applications/pipeline/crawler_pipeline.py (+3, -0)
  23. applications/pipeline/data_recycle_pipeline.py (+128, -0)
  24. applications/service/__init__.py (+4, -0)
  25. applications/service/get_cover.py (+14, -18)
  26. applications/service/log_service.py (+48, -0)
  27. applications/service/response.py (+0, -9)
  28. applications/tasks/__init__.py (+0, -0)
  29. applications/tasks/data_recycle_tasks/__init__.py (+3, -0)
  30. applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py (+442, -0)
  31. applications/tasks/llm_tasks/__init__.py (+1, -0)
  32. applications/tasks/llm_tasks/process_title.py (+269, -0)
  33. applications/tasks/monitor_tasks/__init__.py (+6, -0)
  34. applications/tasks/monitor_tasks/get_off_videos.py (+183, -0)
  35. applications/tasks/monitor_tasks/gzh_article_monitor.py (+332, -0)
  36. applications/tasks/monitor_tasks/kimi_balance.py (+34, -0)
  37. applications/tasks/task_scheduler.py (+273, -0)
  38. applications/utils/__init__.py (+14, -0)
  39. applications/utils/async_apollo_client.py (+131, -0)
  40. applications/utils/async_http_client.py (+99, -0)
  41. applications/utils/common.py (+185, -0)
  42. applications/utils/get_cover.py (+3, -3)
  43. applications/utils/response.py (+24, -0)
  44. requirements.txt (+7, -2)
  45. routes/blueprint.py (+15, -1)
  46. task_app.py (+17, -3)

+ 17 - 4
README.md

@@ -13,15 +13,19 @@ docker compose up -d
 
 ### 项目结构
 ```
+.
 ├── Dockerfile
 ├── LICENSE
 ├── README.md
 ├── app_config.toml
 ├── applications
+│   ├── __init__.py
 │   ├── api
-│   │   └── aliyun_log_api.py
+│   │   ├── __init__.py
+│   │   └── async_feishu_api.py
 │   ├── config
 │   │   ├── __init__.py
+│   │   ├── aliyun_log_config.py
 │   │   └── mysql_config.py
 │   ├── database
 │   │   ├── __init__.py
@@ -29,12 +33,21 @@ docker compose up -d
 │   ├── service
 │   │   ├── __init__.py
 │   │   ├── get_cover.py
-│   │   └── response.py
+│   │   └── log_service.py
+│   ├── tasks
+│   │   ├── __init__.py
+│   │   ├── monitor_tasks
+│   │   │   ├── __init__.py
+│   │   │   └── kimi_balance.py
+│   │   └── task_scheduler.py
 │   └── utils
 │       ├── __init__.py
-│       └── get_cover.py
+│       ├── async_http_client.py
+│       ├── get_cover.py
+│       └── response.py
 ├── dev
-│   └── dev.py
+│   ├── dev.py
+│   └── run_task_dev.py
 ├── docker-compose.yaml
 ├── requirements.txt
 ├── routes

+ 2 - 2
app_config.toml

@@ -1,6 +1,6 @@
 reload = true
 bind = "0.0.0.0:6060"
-workers = 4
+workers = 6
 keep_alive_timeout = 120  # maximum seconds to keep a connection alive; adjust as needed
 graceful_timeout = 30    # how long to wait for in-flight work before a restart or stop
-loglevel = "debug"  # log level
+loglevel = "warning"  # log level

+ 0 - 0
applications/__init__.py


+ 27 - 0
applications/api/__init__.py

@@ -0,0 +1,27 @@
+# feishu_api
+from .async_feishu_api import FeishuBotApi
+from .async_feishu_api import FeishuSheetApi
+
+# piaoquan api
+from .async_piaoquan_api import change_video_audit_status
+from .async_piaoquan_api import publish_video_to_piaoquan
+from .async_piaoquan_api import fetch_piaoquan_video_list_detail
+
+# async apollo api
+from .async_apollo_api import AsyncApolloApi
+
+# deepseek api
+from .deep_seek_official_api import fetch_deepseek_completion
+
+# es_api
+from .elastic_search_api import AsyncElasticSearchClient
+
+# aliyun_log
+from .aliyun_log_api import log
+
+# aigc system api
+from .async_aigc_system_api import delete_illegal_gzh_articles
+
+feishu_robot = FeishuBotApi()
+feishu_sheet = FeishuSheetApi()
+task_apollo = AsyncApolloApi()
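
For context, not part of the diff: a minimal sketch of how the module-level singletons exported here can be used from an async task. The Apollo key name is a placeholder.

```python
import asyncio

from applications.api import feishu_robot, task_apollo


async def main():
    # read a JSON value from Apollo (key name is hypothetical)
    config = await task_apollo.get_config_value(key="example_config_key")

    # push an alert card to the dev Feishu bot
    await feishu_robot.bot(
        title="async task demo",
        detail={"config": config},
        mention=False,
        env="dev",
    )


asyncio.run(main())
```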

+ 54 - 0
applications/api/aliyun_log_api.py

@@ -0,0 +1,54 @@
+"""
+@author: luojunhui
+"""
+
+import datetime
+import json
+import time
+
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+
+
+def log(task, function, status="success", message=None, data=None):
+    """
+    :param task: task name
+    :param function: function name
+    :param status: task status, "success" by default
+    :param message: optional message
+    :param data: optional dict payload, serialized to JSON
+    """
+    if data is None:
+        data = {}
+    accessKeyId = "LTAIP6x1l3DXfSxm"
+    accessKey = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+    project = "changwen-alg"
+    log_store = "long_articles_job"
+    endpoint = "cn-hangzhou.log.aliyuncs.com"
+    # create the LogClient instance
+    client = LogClient(endpoint, accessKeyId, accessKey)
+    log_group = []
+    log_item = LogItem()
+    contents = [
+        (f"task", str(task)),
+        (f"function", str(function)),
+        (f"message", str(message)),
+        (f"status", str(status)),
+        (f"data", json.dumps(data, ensure_ascii=False) if data else ""),
+        ("dateTime", datetime.datetime.now().__str__()),
+        ("timestamp", str(int(time.time()))),
+    ]
+
+    log_item.set_contents(contents)
+    log_group.append(log_item)
+    # write the log entry
+    request = PutLogsRequest(
+        project=project,
+        logstore=log_store,
+        topic="",
+        source="",
+        logitems=log_group,
+        compress=False,
+    )
+    try:
+        client.put_logs(request)
+    except Exception as e:
+        print("日志失败")
+        print(e)
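
Not part of the diff: a one-line usage sketch of the synchronous `log` helper (the task and function names are placeholders).

```python
from applications.api import log

# write a single structured record to the Aliyun log store
log(task="demo_task", function="demo_function", status="success", message="task finished", data={"count": 1})
```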

+ 19 - 0
applications/api/async_aigc_system_api.py

@@ -0,0 +1,19 @@
+from applications.utils import AsyncHttPClient
+
+
+async def delete_illegal_gzh_articles(gh_id: str, title: str):
+    """
+    Delete illegal gzh articles
+    :param gh_id: gzh id
+    :param title: article title
+    """
+    url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete"
+    payload = {
+        "title": title,
+        "ghId": gh_id,
+    }
+    headers = {"Content-Type": "application/json;charset=UTF-8"}
+    async with AsyncHttPClient(timeout=600) as client:
+        res = await client.post(url=url, headers=headers, json=payload)
+
+    return res

+ 30 - 0
applications/api/async_apollo_api.py

@@ -0,0 +1,30 @@
+import json
+from typing import Optional, Dict
+from applications.utils import AsyncApolloClient
+
+
+class AsyncApolloApi:
+    def __init__(self, app_id="LongArticlesJob", env="pre"):
+        match env:
+            case "pre":
+                config_server_url = "http://preapolloconfig-internal.piaoquantv.com/"
+            case "dev":
+                config_server_url = "https://devapolloconfig-internal.piaoquantv.com/"
+            case "prod":
+                config_server_url = "https://apolloconfig-internal.piaoquantv.com/"
+            case _:
+                raise ValueError("env must be 'pre' or 'dev' or 'prod'")
+
+        self.apollo_connection = AsyncApolloClient(
+            app_id=app_id, config_server_url=config_server_url
+        )
+
+    async def get_config_value(
+        self, key: str, output_type: str = "json"
+    ) -> Optional[Dict]:
+        match output_type:
+            case "json":
+                response = await self.apollo_connection.get_value(key)
+                return json.loads(response)
+            case _:
+                return await self.apollo_connection.get_value(key)
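
Not part of the diff: a sketch of using the wrapper directly; any `output_type` other than `"json"` returns the raw string. The key names are placeholders.

```python
import asyncio

from applications.api import AsyncApolloApi


async def main():
    apollo = AsyncApolloApi(app_id="LongArticlesJob", env="pre")

    # parsed as JSON (the default)
    json_value = await apollo.get_config_value("example_json_key")

    # returned as the raw string
    raw_value = await apollo.get_config_value("example_text_key", output_type="string")
    print(json_value, raw_value)


asyncio.run(main())
```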

+ 233 - 0
applications/api/async_feishu_api.py

@@ -0,0 +1,233 @@
+import json
+
+import requests
+
+from applications.utils import AsyncHttPClient
+
+
+class Feishu:
+    # monitor bot for grouped mass-send on service accounts
+    server_account_publish_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/380fdecf-402e-4426-85b6-7d9dbd2a9f59"
+
+    # monitor bot for paid-traffic external service accounts
+    outside_gzh_monitor_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/0899d43d-9f65-48ce-a419-f83ac935bf59"
+
+    # daily alert bot for the long-article project
+    long_articles_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
+
+    # alert bot for the test environment
+    long_articles_bot_dev = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+
+    # alert group for long-article tasks
+    long_articles_task_bot = "https://open.feishu.cn/open-apis/bot/v2/hook/223b3d72-f2e8-40e0-9b53-6956e0ae7158"
+
+    def __init__(self):
+        self.token = None
+        self.headers = {"Content-Type": "application/json"}
+        self.mention_all = {
+            "content": "<at id=all></at>\n",
+            "tag": "lark_md",
+        }
+        self.not_mention = {}
+
+    async def fetch_token(self):
+        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
+        post_data = {
+            "app_id": "cli_a51114cf8bf8d00c",
+            "app_secret": "cNoTAqMpsAm7mPBcpCAXFfvOzCNL27fe",
+        }
+        async with AsyncHttPClient(default_headers=self.headers) as client:
+            response = await client.post(url=url, data=post_data)
+
+        tenant_access_token = response["tenant_access_token"]
+        self.token = tenant_access_token
+
+
+class FeishuSheetApi(Feishu):
+
+    async def prepend_value(self, sheet_token, sheet_id, ranges, values):
+        insert_value_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{}/values_prepend".format(
+            sheet_token
+        )
+        headers = {
+            "Authorization": "Bearer " + self.token,
+            "contentType": "application/json; charset=utf-8",
+        }
+        body = {
+            "valueRange": {"range": "{}!{}".format(sheet_id, ranges), "values": values}
+        }
+        async with AsyncHttPClient() as client:
+            response = await client.post(
+                url=insert_value_url, json=body, headers=headers
+            )
+
+        print(response)
+
+    async def insert_value(self, sheet_token, sheet_id, ranges, values):
+        insert_value_url = (
+            "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{}/values".format(
+                sheet_token
+            )
+        )
+        headers = {
+            "Authorization": "Bearer " + self.token,
+            "contentType": "application/json; charset=utf-8",
+        }
+        body = {
+            "valueRange": {"range": "{}!{}".format(sheet_id, ranges), "values": values}
+        }
+        async with AsyncHttPClient() as client:
+            response = await client.put(
+                url=insert_value_url, json=body, headers=headers
+            )
+
+
+class FeishuBotApi(Feishu):
+
+    @classmethod
+    def create_feishu_columns_sheet(
+        cls,
+        sheet_type,
+        sheet_name,
+        display_name,
+        width="auto",
+        vertical_align="top",
+        horizontal_align="left",
+        number_format=None,
+    ):
+        match sheet_type:
+            case "plain_text":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "width": width,
+                    "data_type": "text",
+                    "vertical_align": vertical_align,
+                    "horizontal_align": horizontal_align,
+                }
+
+            case "lark_md":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "lark_md",
+                }
+
+            case "number":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "number",
+                    "format": number_format,
+                    "width": width,
+                }
+
+            case "date":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "date",
+                    "date_format": "YYYY/MM/DD",
+                }
+
+            case "options":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "options",
+                }
+
+            case _:
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "width": width,
+                    "data_type": "text",
+                    "vertical_align": vertical_align,
+                    "horizontal_align": horizontal_align,
+                }
+
+    # card rendered as a table
+    def create_feishu_table(self, title, columns, rows, mention):
+        table_base = {
+            "header": {
+                "template": "blue",
+                "title": {"content": title, "tag": "plain_text"},
+            },
+            "elements": [
+                self.mention_all if mention else self.not_mention,
+                {
+                    "tag": "table",
+                    "page_size": len(rows) + 1,
+                    "row_height": "low",
+                    "header_style": {
+                        "text_align": "left",
+                        "text_size": "normal",
+                        "background_style": "grey",
+                        "text_color": "default",
+                        "bold": True,
+                        "lines": 1,
+                    },
+                    "columns": columns,
+                    "rows": rows,
+                },
+            ],
+        }
+        return table_base
+
+    def create_feishu_bot_obj(self, title, mention, detail):
+        """
+        create feishu bot object
+        """
+        return {
+            "elements": [
+                {
+                    "tag": "div",
+                    "text": self.mention_all if mention else self.not_mention,
+                },
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": json.dumps(detail, ensure_ascii=False, indent=4),
+                        "tag": "lark_md",
+                    },
+                },
+            ],
+            "header": {"title": {"content": title, "tag": "plain_text"}},
+        }
+
+    # bot
+    async def bot(
+        self, title, detail, mention=True, table=False, env="long_articles_task"
+    ):
+        match env:
+            case "dev":
+                url = self.long_articles_bot_dev
+            case "prod":
+                url = self.long_articles_bot
+            case "outside_gzh_monitor":
+                url = self.outside_gzh_monitor_bot
+            case "server_account_publish_monitor":
+                url = self.server_account_publish_monitor_bot
+            case "long_articles_task":
+                url = self.long_articles_task_bot
+            case _:
+                url = self.long_articles_bot_dev
+
+        headers = {"Content-Type": "application/json"}
+        if table:
+            card = self.create_feishu_table(
+                title=title,
+                columns=detail["columns"],
+                rows=detail["rows"],
+                mention=mention,
+            )
+        else:
+            card = self.create_feishu_bot_obj(
+                title=title, mention=mention, detail=detail
+            )
+
+        data = {"msg_type": "interactive", "card": card}
+        async with AsyncHttPClient() as client:
+            res = await client.post(url=url, headers=headers, json=data)
+        return res
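
Not part of the diff: a sketch of the two client classes. The sheet API needs `fetch_token()` before writing, while the bot webhooks only need the card payload; the sheet token, sheet id and range below are placeholders.

```python
import asyncio

from applications.api import feishu_robot, feishu_sheet


async def main():
    # sheet writes require a tenant_access_token first
    await feishu_sheet.fetch_token()
    await feishu_sheet.insert_value(
        sheet_token="shtcnExamplePlaceholder",
        sheet_id="Sheet1",
        ranges="A1:B1",
        values=[["title", "view_count"]],
    )

    # bot webhooks post an interactive card directly
    await feishu_robot.bot(title="ping", detail={"msg": "hello"}, mention=False, env="dev")


asyncio.run(main())
```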

+ 78 - 0
applications/api/async_piaoquan_api.py

@@ -0,0 +1,78 @@
+from typing import Optional, Dict, List
+
+from applications.utils import AsyncHttPClient
+
+
+async def fetch_piaoquan_video_list_detail(video_list: List[int]) -> Optional[Dict]:
+    """async fetch piaoquan video detail"""
+    url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
+    data = {"videoIdList": video_list}
+    header = {
+        "Content-Type": "application/json",
+    }
+    async with AsyncHttPClient() as client:
+        response = await client.post(url, json=data, headers=header)
+
+    return response
+
+
+async def change_video_audit_status(video_id: int, status_code: int = 5) -> Dict:
+    url = "https://admin.piaoquantv.com/manager/video/audit/v2/updateAuditStatus"
+    payload = "videoId={}&auditStatus={}&updateReasonJson=&rejectReasonJson=%5B%7B%22reason%22%3A%22%E9%95%BF%E6%96%87%E8%87%AA%E5%8A%A8%E4%B8%8B%E6%9E%B6%22%2C%22reasonId%22%3A-1%7D%5D&adminUid=206".format(
+        video_id, status_code
+    )
+    headers = {
+        "accept": "application/json",
+        "accept-language": "en,zh;q=0.9,zh-CN;q=0.8",
+        "Content-Type": "application/json",
+        "cookie": "",
+        "origin": "https://admin.piaoquantv.com",
+        "priority": "u=1, i",
+        "sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
+        "sec-ch-ua-mobile": "?0",
+        "sec-ch-ua-platform": '"macOS"',
+        "sec-fetch-dest": "empty",
+        "sec-fetch-mode": "cors",
+        "sec-fetch-site": "same-origin",
+        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+    }
+    async with AsyncHttPClient() as client:
+        response = await client.post(url, data=payload, headers=headers)
+
+    return response
+
+
+async def publish_video_to_piaoquan(oss_path: str, uid: str, title: str) -> Dict:
+    url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
+    headers = {
+        "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+        "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+        "referer": "http://appspeed.piaoquantv.com",
+        "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+        "accept-language": "zh-CN,zh-Hans;q=0.9",
+        "Content-Type": "application/json",
+    }
+    payload = {
+        "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+        "fileExtensions": "MP4",
+        "loginUid": uid,
+        "networkType": "Wi-Fi",
+        "platform": "iOS",
+        "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
+        "sessionId": "362290597725ce1fa870d7be4f46dcc2",
+        "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
+        "title": title,
+        "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+        "uid": uid,
+        "versionCode": "486",
+        "versionName": "3.4.12",
+        "videoFromScene": "1",
+        "videoPath": oss_path,
+        "viewStatus": "1",
+        "appType": 888888,
+        "repeatStatus": 1,
+    }
+    async with AsyncHttPClient() as client:
+        response = await client.post(url, data=payload, headers=headers)
+
+    return response
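
Not part of the diff: a sketch of the batch-detail and audit-status helpers; the video ids are placeholders, and `status_code=5` is the default used by this repo's take-down flow.

```python
import asyncio

from applications.api import change_video_audit_status, fetch_piaoquan_video_list_detail


async def main():
    # batch-query metadata for a list of piaoquan video ids
    detail = await fetch_piaoquan_video_list_detail([12345678, 23456789])
    print(detail)

    # change one video's audit status (default 5, used for auto take-down here)
    await change_video_audit_status(video_id=12345678, status_code=5)


asyncio.run(main())
```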

+ 53 - 0
applications/api/deep_seek_official_api.py

@@ -0,0 +1,53 @@
+"""
+@author: luojunhui
+@description: DeepSeek official API
+"""
+
+import json
+
+from typing import Dict, List, Optional
+from openai import OpenAI
+
+from applications.config import deep_seek_official_model
+from applications.config import deep_seek_official_api_key
+
+
+def fetch_deepseek_completion(
+    model: str,
+    prompt: str,
+    output_type: str = "text",
+    tool_calls: bool = False,
+    tools: List[Dict] = None,
+) -> Optional[Dict]:
+    messages = [{"role": "user", "content": prompt}]
+    kwargs = {
+        "model": deep_seek_official_model.get(model, "deepseek-chat"),
+        "messages": messages,
+    }
+
+    # add tool calls
+    if tool_calls and tools:
+        kwargs["tools"] = tools
+        kwargs["tool_choice"] = "auto"
+
+    client = OpenAI(
+        api_key=deep_seek_official_api_key, base_url="https://api.deepseek.com"
+    )
+
+    if output_type == "json":
+        kwargs["response_format"] = {"type": "json_object"}
+
+    try:
+        response = client.chat.completions.create(**kwargs)
+        choice = response.choices[0]
+
+        if output_type == "text":
+            return choice.message.content  # return the raw text only
+        elif output_type == "json":
+            return json.loads(choice.message.content)
+        else:
+            raise ValueError(f"Invalid output_type: {output_type}")
+
+    except Exception as e:
+        print(f"[ERROR] fetch_deepseek_completion failed: {e}")
+        return None
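
Not part of the diff: a sketch of calling the completion helper (it is synchronous). The prompts are placeholders; with `output_type="json"` the prompt should ask the model to answer in JSON.

```python
from applications.api import fetch_deepseek_completion

# plain-text completion through the DeepSeek-V3 ("deepseek-chat") model
text = fetch_deepseek_completion(model="DeepSeek-V3", prompt="Summarize this service in one sentence.")

# JSON-mode completion; returns a parsed dict, or None on failure
data = fetch_deepseek_completion(
    model="DeepSeek-R1",
    prompt='Reply with the JSON object {"ok": true}.',
    output_type="json",
)
print(text, data)
```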

+ 61 - 0
applications/api/elastic_search_api.py

@@ -0,0 +1,61 @@
+import ssl
+
+from elasticsearch import AsyncElasticsearch
+from elasticsearch.helpers import async_bulk
+
+from applications.config import es_index
+
+
+class AsyncElasticSearchClient:
+
+    def __init__(self, index_=es_index):
+        self.password = "nkvvASQuQ0XUGRq5OLvm"
+        self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
+        self.ctx = ssl.create_default_context(cafile="applications/config/es_certs.crt")
+        self.es = AsyncElasticsearch(
+            self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
+        )
+        self.index_name = index_
+
+    async def create_index(self, settings, mappings):
+        exists = await self.es.indices.exists(index=self.index_name)
+        if exists:
+            await self.es.indices.delete(index=self.index_name)
+
+        try:
+            await self.es.indices.create(
+                index=self.index_name, settings=settings, mappings=mappings
+            )
+            print("Index created successfully")
+        except Exception as e:
+            print("fail to create index, reason:", e)
+
+    async def get_max_article_id(self):
+        response = await self.es.search(
+            index=self.index_name,
+            size=1,
+            sort="article_id:desc",
+            _source=["article_id"],
+        )
+        return response["hits"]["hits"][0]["_source"]["article_id"]
+
+    async def search(self, search_keys, size=10):
+        query = {
+            "query": {"match": {"title": search_keys}},
+            "_source": ["article_id", "title"],
+            "size": size,
+        }
+        resp = await self.es.search(index=self.index_name, body=query)
+        return [i["_source"] for i in resp["hits"]["hits"]]
+
+    async def bulk_insert(self, docs):
+        await async_bulk(self.es, docs)
+
+    async def close(self):
+        await self.es.close()
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.es.close()
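
Not part of the diff: a sketch of index creation and title search with the mappings added in this commit. The search keywords are placeholders; run it from the repo root so the bundled certificate path resolves.

```python
import asyncio

from applications.api import AsyncElasticSearchClient
from applications.config import es_mappings, es_settings


async def main():
    async with AsyncElasticSearchClient() as es_client:
        # (re)create the meta_articles_v1 index with the ik analyzers
        await es_client.create_index(settings=es_settings, mappings=es_mappings)

        # full-text match on article titles, top 5 hits
        hits = await es_client.search("example title keywords", size=5)
        print(hits)


asyncio.run(main())
```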

+ 13 - 0
applications/config/__init__.py

@@ -1,2 +1,15 @@
+# mysql config
 from .mysql_config import aigc_db_config
 from .mysql_config import long_video_db_config
+from .mysql_config import long_articles_db_config
+from .mysql_config import piaoquan_crawler_db_config
+
+# aliyun log sdk config
+from .aliyun_log_config import aliyun_log_config
+
+# deepseek config
+from .deepseek_config import deep_seek_official_model
+from .deepseek_config import deep_seek_official_api_key
+
+# es config
+from .elastic_search_mappings import es_index, es_mappings, es_settings

+ 8 - 0
applications/config/aliyun_log_config.py

@@ -0,0 +1,8 @@
+# Aliyun log SDK configuration
+aliyun_log_config = {
+    "endpoint": "cn-hangzhou.log.aliyuncs.com",
+    "logstore": "long_articles_job",
+    "project": "changwen-alg",
+    "access_key_id": "LTAIP6x1l3DXfSxm",
+    "access_key_secret": "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon",
+}

+ 7 - 0
applications/config/deepseek_config.py

@@ -0,0 +1,7 @@
+# deepseek official api
+deep_seek_official_api_key = "sk-cfd2df92c8864ab999d66a615ee812c5"
+
+deep_seek_official_model = {
+    "DeepSeek-R1": "deepseek-reasoner",
+    "DeepSeek-V3": "deepseek-chat",
+}

+ 31 - 0
applications/config/elastic_search_mappings.py

@@ -0,0 +1,31 @@
+es_index = "meta_articles_v1"
+
+es_settings = {
+    "number_of_shards": 3,
+    "number_of_replicas": 1,
+    "analysis": {
+        "analyzer": {
+            "ik_smart": {"type": "ik_smart"},
+            "ik_max_word": {"type": "ik_max_word"},
+        }
+    },
+}
+
+es_mappings = {
+    "properties": {
+        "auto_id": {
+            "type": "long",
+            "doc_values": True,
+        },
+        "article_id": {"type": "long"},
+        "platform": {"type": "keyword"},
+        "out_account_id": {"type": "keyword"},
+        "title": {
+            "type": "text",
+            "analyzer": "ik_max_word",
+            "search_analyzer": "ik_smart",
+            "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
+        },
+        "created_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"},
+    }
+}

+ 31 - 0
applications/config/es_certs.crt

@@ -0,0 +1,31 @@
+-----BEGIN CERTIFICATE-----
+MIIFaTCCA1GgAwIBAgIUWHH9T8PVfiSyvT6S6NrAQ9iSLeEwDQYJKoZIhvcNAQEL
+BQAwPDE6MDgGA1UEAxMxRWxhc3RpY3NlYXJjaCBzZWN1cml0eSBhdXRvLWNvbmZp
+Z3VyYXRpb24gSFRUUCBDQTAeFw0yNTA3MDcwNzIwNTRaFw0yODA3MDYwNzIwNTRa
+MDwxOjA4BgNVBAMTMUVsYXN0aWNzZWFyY2ggc2VjdXJpdHkgYXV0by1jb25maWd1
+cmF0aW9uIEhUVFAgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCb
+Y8E68+7S+hGKQX6vhyOxuCe3QyBHYlsxiSqGhi+WFx953u4SEMqrbqiyg2QquB9/
+ynjKo3Tvhn0OPjuJRytteKn9OZkVhUT1D5P6PFo0j8x1LIJZm551XRCnQUZ8jC0C
+REHy/JoKdT4YSCRIuXVTM5iM66vQ1t5Du4sb70mTygtc2DyXwgE4LkVnrHcwr2BZ
+3/O69WvF7Zd7WP93yEfUsLsAAQStaCYMeYyaY5K8UwIVcFyWKJ9lnDGbR9KmuXb9
+ipWqGw6aAYhmSs5gL+6xJ5dBpgMOqoBTvZpNniLA/phkelq9W2nAhBLFpRGRof8K
+5iKwjAN8gnBXeSVklBoL23QD5zfoVjz+5eaXWO4qP+90jbwf+vEg/duncDRONGtk
+TQd0Vr9NeO3Aye8PZsmmhKAaciaPWYyQO30omUq9kPsSUzZPu4k+CYb8qwVQCHpn
+Za19NkvERQ8hCQks08/ly5qDM+5lBxJQFQjhjtzSDQ/ybbarMmgaBxpCexiksRmP
+CQqVLW6IaLxUGEkIJqXRx8nmKUfK43vTBitOBFt5UcKob6+ikZLrqZ6xLY/jklE8
+Z1wt9I8ZdQ3L3X9EORgmQ+4KIu/JQfBdfAYtLaS6MYWhiZSaKaIhgfXiZQTO9YuW
+KrI5g+d2Yu2BYgIioLKo9LFWK1eTG2gNAGUI/+rqswIDAQABo2MwYTAdBgNVHQ4E
+FgQUab2kAtPlJHLirQvbThvIwJ7hbLwwHwYDVR0jBBgwFoAUab2kAtPlJHLirQvb
+ThvIwJ7hbLwwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYwDQYJKoZI
+hvcNAQELBQADggIBAF+wJ598Krfai5Br6Vq0Z1jj0JsU8Kij4t9D+89QPgI85/Mv
+zwj8xRgxx9RinKYdnzFJWrD9BITG2l3D0zcJhXfYUpq5HLP+c3zMwEMGzTLbgi70
+cpYqkTJ+g/Ah5WRYZRHJIMF6BVK6izCOO0J49eYC6AONNxG2HeeUvEL4cNnxpw8T
+NUe7v0FXe2iPLeE713h99ray0lBgI6J9QZqc/oEM47gHy+ByfWCv6Yw9qLlprppP
+taHz2VWnCAACDLzbDnYhemQDji86yrUTEdCT8at1jAwHSixgkm88nEBgxPHDuq8t
+thmiS6dELvXVUbyeWO7A/7zVde0Kndxe003OuYcX9I2IX7aIpC8sW/yY+alRhklq
+t9vF6g1qvsN69xXfW5yI5G31TYMUw/3ng0aVJfRFaXkEV2SWEZD+4sWoYC/GU7kK
+zlfaF22jTeul5qCKkN1k+i8K2lheEE3ZBC358W0RyvsrDwtXOra3VCpZ7qrez8OA
+/HeY6iISZQ7g0s209KjqOPqVGcI8B0p6KMh00AeWisU6E/wy1LNTxkf2IS9b88n6
+a3rj0TCycwhKOPTPB5pwlfbZNI00tGTFjqqi07SLqO9ZypsVkyR32G16JPJzk8Zw
+kngBZt6y9LtCMRVbyDuIDNq+fjtDjgxMI9bQXtve4bOuq8cZzcMjC6khz/Ja
+-----END CERTIFICATE-----

+ 28 - 4
applications/config/mysql_config.py

@@ -6,8 +6,8 @@ aigc_db_config = {
     "password": "cyber#crawler_2023",
     "db": "aigc-admin-prod",
     "charset": "utf8mb4",
-    "min_size": 5,
-    "max_size": 20,
+    "minsize": 5,
+    "maxsize": 20,
 }
 
 # long_video_db_config
@@ -18,6 +18,30 @@ long_video_db_config = {
     "password": "wx2016_longvideoP@assword1234",
     "db": "longvideo",
     "charset": "utf8mb4",
-    "min_size": 5,
-    "max_size": 20,
+    "minsize": 5,
+    "maxsize": 20,
+}
+
+# long_articles database connection config
+long_articles_db_config = {
+    "host": "rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com",
+    "port": 3306,
+    "user": "changwen_admin",
+    "password": "changwen@123456",
+    "db": "long_articles",
+    "charset": "utf8mb4",
+    "minsize": 5,
+    "maxsize": 20,
+}
+
+# piaoquan crawler database connection config
+piaoquan_crawler_db_config = {
+    "host": "rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
+    "port": 3306,
+    "user": "crawler",
+    "password": "crawler123456@",
+    "db": "piaoquan-crawler",
+    "charset": "utf8mb4",
+    "minsize": 5,
+    "maxsize": 20,
 }

+ 1 - 0
applications/crawler/wechat/__init__.py

@@ -0,0 +1 @@
+from .gzh_spider import *

+ 123 - 0
applications/crawler/wechat/gzh_spider.py

@@ -0,0 +1,123 @@
+from __future__ import annotations
+
+import re
+import json
+import requests
+from fake_useragent import FakeUserAgent
+from tenacity import retry
+
+from applications.api import log
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+# url from aigc
+base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
+headers = {"Content-Type": "application/json"}
+
+
+@retry(**retry_desc)
+def get_article_detail(
+    article_link: str, is_count: bool = False, is_cache: bool = True
+) -> dict | None:
+    """
+    get official article detail
+    """
+    target_url = f"{base_url}/detail"
+    payload = json.dumps(
+        {
+            "content_link": article_link,
+            "is_count": is_count,
+            "is_ad": False,
+            "is_cache": is_cache,
+        }
+    )
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"API请求失败: {e}",
+            data={"link": article_link},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link},
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_article_list_from_account(account_id: str, index=None) -> dict | None:
+    target_url = f"{base_url}/blogger"
+    payload = json.dumps({"account_id": account_id, "cursor": index})
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"API请求失败: {e}",
+            data={"gh_id": account_id},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"响应解析失败: {e}",
+            data={"gh_id": account_id},
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_source_account_from_article(article_link) -> dict | None:
+    """
+    get account info from official article
+    :param article_link:
+    :return:
+    """
+    try:
+        response = requests.get(
+            url=article_link,
+            headers={"User-Agent": FakeUserAgent().random},
+            timeout=120,
+        )
+        response.raise_for_status()
+        html_text = response.text
+        regex_nickname = r"hit_nickname:\s*'([^']+)'"
+        regex_username = r"hit_username:\s*'([^']+)'"
+        nickname = re.search(regex_nickname, html_text)
+        username = re.search(regex_username, html_text)
+        # return the extracted account info
+        if nickname and username:
+            return {"name": nickname.group(1), "gh_id": username.group(1)}
+        else:
+            return {}
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"API请求失败: {e}",
+            data={"link": article_link},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link},
+        )
+    return None
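
Not part of the diff: a sketch of the spider helpers (they are synchronous and return None when the request or response parsing fails). The article link and gh_id are placeholders.

```python
from applications.crawler.wechat import get_article_detail, get_article_list_from_account

# fetch one article's detail through the crawler service
detail = get_article_detail("https://mp.weixin.qq.com/s/example-article-link")

# first page of an account's article list; pass the returned next_cursor to continue paging
first_page = get_article_list_from_account(account_id="gh_0000000000", index=None)
print(detail, first_page)
```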

+ 33 - 26
applications/database/mysql_pools.py

@@ -1,7 +1,10 @@
-from aiomysql import create_pool, DictCursor
-
+import logging
+from aiomysql import create_pool
+from aiomysql.cursors import DictCursor
 from applications.config import *
 
+logging.basicConfig(level=logging.INFO)
+
 
 class DatabaseManager:
     def __init__(self):
@@ -9,26 +12,12 @@ class DatabaseManager:
         self.pools = {}
 
     async def init_pools(self):
-        # database configs were read from environment variables; they can also be set directly here
+        # database configs come from applications.config; they can also be set directly here
         self.databases = {
-            "aigc_db_pool": {
-                "host": aigc_db_config.get("host", "localhost"),
-                "port": 3306,
-                "user": aigc_db_config.get("user", "root"),
-                "password": aigc_db_config.get("password", ""),
-                "db": aigc_db_config.get("db", "database1"),
-                "minsize": int(aigc_db_config.get("min_size", 1)),
-                "maxsize": int(aigc_db_config.get("max_size", 5)),
-            },
-            "long_video_db_pool": {
-                "host": long_video_db_config.get("host", "localhost"),
-                "port": 3306,
-                "user": long_video_db_config.get("user", "root"),
-                "password": long_video_db_config.get("password", ""),
-                "db": long_video_db_config.get("db", "database1"),
-                "minsize": int(long_video_db_config.get("min_size", 1)),
-                "maxsize": int(long_video_db_config.get("max_size", 5)),
-            },
+            "aigc": aigc_db_config,
+            "long_video": long_video_db_config,
+            "long_articles": long_articles_db_config,
+            "piaoquan_crawler": piaoquan_crawler_db_config,
         }
 
         for db_name, config in self.databases.items():
@@ -45,9 +34,9 @@ class DatabaseManager:
                     autocommit=True,
                 )
                 self.pools[db_name] = pool
-                print(f"✅ Created connection pool for {db_name}")
+                logging.info(f"Created connection pool for {db_name}")
             except Exception as e:
-                print(f"❌ Failed to create pool for {db_name}: {str(e)}")
+                logging.error(f"Failed to create pool for {db_name}: {str(e)}")
                 self.pools[db_name] = None
 
     async def close_pools(self):
@@ -55,9 +44,11 @@ class DatabaseManager:
             if pool:
                 pool.close()
                 await pool.wait_closed()
-                print(f"🔌 Closed connection pool for {name}")
+                logging.info("🔌 Closed connection pool for {name}")
 
-    async def async_fetch(self, db_name, query, cursor_type=DictCursor):
+    async def async_fetch(
+        self, query, db_name="long_articles", params=None, cursor_type=DictCursor
+    ):
         pool = self.pools[db_name]
         if not pool:
             await self.init_pools()
@@ -65,12 +56,28 @@ class DatabaseManager:
         try:
             async with pool.acquire() as conn:
                 async with conn.cursor(cursor_type) as cursor:
-                    await cursor.execute(query)
+                    await cursor.execute(query, params)
                     fetch_response = await cursor.fetchall()
             return fetch_response, None
         except Exception as e:
             return None, str(e)
 
+    async def async_save(self, query, params, db_name="long_articles"):
+        pool = self.pools[db_name]
+        if not pool:
+            await self.init_pools()
+            pool = self.pools[db_name]
+
+        async with pool.acquire() as connection:
+            async with connection.cursor() as cursor:
+                try:
+                    await cursor.execute(query, params)
+                    affected_rows = cursor.rowcount
+                    await connection.commit()
+                    return affected_rows
+                except Exception as e:
+                    await connection.rollback()
+                    raise e
+
     def get_pool(self, db_name):
         return self.pools.get(db_name)
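
Not part of the diff: with the new signatures the SQL comes first and the pool is selected by `db_name` (defaulting to `long_articles`). A minimal sketch with a hypothetical table name:

```python
import asyncio

from applications.database.mysql_pools import DatabaseManager


async def main():
    manager = DatabaseManager()
    await manager.init_pools()

    # parametrised read from the default long_articles pool; returns (rows, error)
    rows, error = await manager.async_fetch(
        query="select id, title from example_table where status = %s limit 10;",
        params=(1,),
    )

    # parametrised write against the piaoquan_crawler pool; returns the affected row count
    affected = await manager.async_save(
        query="update example_table set status = %s where id = %s;",
        params=(2, 1),
        db_name="piaoquan_crawler",
    )
    print(rows, error, affected)

    await manager.close_pools()


asyncio.run(main())
```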
 

+ 1 - 0
applications/pipeline/__init__.py

@@ -0,0 +1 @@
+from .data_recycle_pipeline import insert_article_into_recycle_pool

+ 3 - 0
applications/pipeline/crawler_pipeline.py

@@ -0,0 +1,3 @@
+class CrawlerPipeline:
+
+    pass

+ 128 - 0
applications/pipeline/data_recycle_pipeline.py

@@ -0,0 +1,128 @@
+import json
+from typing import List, Dict
+
+from applications.utils import show_desc_to_sta, str_to_md5
+
+
+async def insert_article_into_recycle_pool(
+    pool, log_client, msg_list: List[Dict], account_info: Dict
+):
+    """insert article into recycle pool"""
+    table_name = "official_articles_v2"
+    for info in msg_list:
+        base_info = info.get("BaseInfo", {})
+        app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
+        create_timestamp = (
+            info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+        )
+        update_timestamp = (
+            info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
+        )
+        publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+        detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+        if detail_article_list:
+            for article in detail_article_list:
+                title = article.get("Title", None)
+                digest = article.get("Digest", None)
+                item_index = article.get("ItemIndex", None)
+                content_url = article.get("ContentUrl", None)
+                source_url = article.get("SourceUrl", None)
+                cover_img_url = article.get("CoverImgUrl", None)
+                cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
+                cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
+                item_show_type = article.get("ItemShowType", None)
+                is_original = article.get("IsOriginal", None)
+                show_desc = article.get("ShowDesc", None)
+                show_stat = show_desc_to_sta(show_desc)
+                ori_content = article.get("ori_content", None)
+                show_view_count = show_stat.get("show_view_count", 0)
+                show_like_count = show_stat.get("show_like_count", 0)
+                show_zs_count = show_stat.get("show_zs_count", 0)
+                show_pay_count = show_stat.get("show_pay_count", 0)
+                wx_sn = (
+                    content_url.split("&sn=")[1].split("&")[0] if content_url else None
+                )
+                status = account_info["using_status"]
+                info_tuple = (
+                    account_info["gh_id"],
+                    account_info["name"],
+                    app_msg_id,
+                    title,
+                    publish_type,
+                    create_timestamp,
+                    update_timestamp,
+                    digest,
+                    item_index,
+                    content_url,
+                    source_url,
+                    cover_img_url,
+                    cover_img_url_1_1,
+                    cover_img_url_235_1,
+                    item_show_type,
+                    is_original,
+                    show_desc,
+                    ori_content,
+                    show_view_count,
+                    show_like_count,
+                    show_zs_count,
+                    show_pay_count,
+                    wx_sn,
+                    json.dumps(base_info, ensure_ascii=False),
+                    str_to_md5(title),
+                    status,
+                )
+                try:
+                    insert_query = f"""
+                            insert into {table_name}
+                            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
+                            values
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            """
+                    await pool.async_save(
+                        query=insert_query,
+                        params=info_tuple,
+                        db_name="piaoquan_crawler",
+                    )
+                    await log_client.log(
+                        contents={
+                            "function": "insert_article_into_recycle_pool",
+                            "status": "success",
+                            "data": info_tuple,
+                        }
+                    )
+                    print("insert_article_into_recycle_pool success")
+
+                except Exception as e:
+                    try:
+                        update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
+                        await pool.async_save(
+                            query=update_sql,
+                            params=(show_view_count, show_like_count, wx_sn),
+                            db_name="piaoquan_crawler",
+                        )
+                        print("update_article_into_recycle_pool success")
+
+                    except Exception as e:
+                        await log_client.log(
+                            contents={
+                                "function": "insert_article_into_recycle_pool",
+                                "status": "fail",
+                                "message": "更新文章失败",
+                                "data": {
+                                    "error": str(e),
+                                    "content_link": content_url,
+                                    "account_name": account_info["name"],
+                                },
+                            }
+                        )
+                        continue
+
+        else:
+            await log_client.log(
+                contents={
+                    "function": "insert_article_into_recycle_pool",
+                    "status": "fail",
+                    "message": "account has no articles",
+                    "data": {"account_name": account_info["name"]},
+                }
+            )

+ 4 - 0
applications/service/__init__.py

@@ -1 +1,5 @@
+# log service
+from .log_service import LogService
+
+# experiments
 from .get_cover import GetCoverService

+ 14 - 18
applications/service/get_cover.py

@@ -1,4 +1,4 @@
-from applications.service.response import Response
+from applications.utils.response import Response
 from applications.utils import fetch_channel_info
 from applications.utils import fetch_aigc_cover
 from applications.utils import fetch_long_video_cover
@@ -14,12 +14,12 @@ class GetCoverService(Response):
 
     async def montage_cover(self, oss_key, pool_name):
         match pool_name:
-            case "aigc_db_pool":
+            case "aigc":
                 if oss_key.startswith("http"):
                     return oss_key + self.suffix
 
                 return self.aigc_prefix + oss_key + self.suffix
-            case "long_video_db_pool":
+            case "long_video":
                 if oss_key.startswith("http"):
                     return oss_key + self.suffix
                 return self.pq_prefix + oss_key + self.suffix
@@ -29,7 +29,7 @@ class GetCoverService(Response):
     async def fetch_cover_info(self, pool_name, channel_content_id: str):
 
         match pool_name:
-            case "aigc_db_pool":
+            case "aigc":
                 fetch_response, sql_error = await fetch_aigc_cover(
                     self.pool, channel_content_id
                 )
@@ -41,20 +41,18 @@ class GetCoverService(Response):
                 if fetch_response:
                     image_oss = fetch_response[0]["oss_object_key"]
                     if image_oss:
-                        cover = await self.montage_cover(image_oss, "aigc_db_pool")
+                        cover = await self.montage_cover(image_oss, "aigc")
                     else:
                         image_url = fetch_response[0]["image_url"]
                         if not image_url:
                             return await self.fetch_cover_info(
-                                "long_video_db_pool", channel_content_id
+                                "long_video", channel_content_id
                             )
                         else:
-                            cover = await self.montage_cover(image_url, "aigc_db_pool")
+                            cover = await self.montage_cover(image_url, "aigc")
                 else:
-                    return await self.fetch_cover_info(
-                        "long_video_db_pool", channel_content_id
-                    )
-            case "long_video_db_pool":
+                    return await self.fetch_cover_info("long_video", channel_content_id)
+            case "long_video":
                 fetch_response, sql_error = await fetch_long_video_cover(
                     self.pool, channel_content_id
                 )
@@ -65,7 +63,7 @@ class GetCoverService(Response):
                     )
                 if fetch_response:
                     image_oss = fetch_response[1]["image_path"]
-                    cover = await self.montage_cover(image_oss, "long_video_db_pool")
+                    cover = await self.montage_cover(image_oss, "long_video")
                 else:
                     return self.error_response(
                         error_code="402",
@@ -81,7 +79,7 @@ class GetCoverService(Response):
 
     async def get_cover(self, content_id: str, video_index: int, seed_video_id: str):
         if video_index == 2:
-            return await self.fetch_cover_info("long_video_db_pool", seed_video_id)
+            return await self.fetch_cover_info("long_video", seed_video_id)
 
         channel_info, sql_error = await fetch_channel_info(self.pool, content_id)
         if sql_error:
@@ -99,13 +97,11 @@ class GetCoverService(Response):
         channel_type = channel_info[0]["channel"]
         match channel_type:
             case 5:
-                return await self.fetch_cover_info("aigc_db_pool", channel_content_id)
+                return await self.fetch_cover_info("aigc", channel_content_id)
             case 6:
-                return await self.fetch_cover_info("aigc_db_pool", channel_content_id)
+                return await self.fetch_cover_info("aigc", channel_content_id)
             case 10:
-                return await self.fetch_cover_info(
-                    "long_video_db_pool", channel_content_id
-                )
+                return await self.fetch_cover_info("long_video", channel_content_id)
             case _:
                 return self.error_response(
                     error_code="403",

+ 48 - 0
applications/service/log_service.py

@@ -0,0 +1,48 @@
+import asyncio
+import traceback
+import time
+import json
+import datetime
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+
+
+class LogService:
+    def __init__(self, endpoint, access_key_id, access_key_secret, project, logstore):
+        self.client = LogClient(endpoint, access_key_id, access_key_secret)
+        self.project = project
+        self.logstore = logstore
+        self.queue = asyncio.Queue()
+        self.running = False
+
+    async def start(self):
+        self.running = True
+        asyncio.create_task(self._worker())
+
+    async def stop(self):
+        self.running = False
+
+    async def log(self, contents: dict):
+        """外部调用日志接口"""
+        await self.queue.put(contents)
+
+    async def _worker(self):
+        while self.running:
+            contents = await self.queue.get()
+            try:
+                await asyncio.to_thread(self._put_log, contents)
+            except Exception as e:
+                print(f"[Log Error] {e}")
+                print(traceback.format_exc())
+
+    def _put_log(self, contents: dict):
+        timestamp = int(time.time())
+        contents["datetime"] = datetime.datetime.now().__str__()
+        safe_items = [
+            (str(k), json.dumps(v) if isinstance(v, (dict, list)) else str(v))
+            for k, v in contents.items()
+        ]
+        log_item = LogItem(timestamp=timestamp, contents=safe_items)
+        req = PutLogsRequest(
+            self.project, self.logstore, topic="", source="", logitems=[log_item]
+        )
+        self.client.put_logs(req)
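
Not part of the diff: a sketch of wiring `LogService` to the `aliyun_log_config` added above. `log()` only enqueues; the background worker does the blocking upload in a thread.

```python
import asyncio

from applications.config import aliyun_log_config
from applications.service import LogService


async def main():
    log_client = LogService(**aliyun_log_config)
    await log_client.start()  # spawns the queue worker

    await log_client.log(
        contents={
            "task": "demo_task",
            "function": "main",
            "status": "success",
            "data": {"key": "value"},
        }
    )

    await asyncio.sleep(1)  # give the worker a moment to flush
    await log_client.stop()


asyncio.run(main())
```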

+ 0 - 9
applications/service/response.py

@@ -1,9 +0,0 @@
-class Response:
-
-    @classmethod
-    def success_response(cls, data):
-        return {"code": 0, "status": "success", "data": data}
-
-    @classmethod
-    def error_response(cls, error_code, error_message):
-        return {"code": error_code, "status": "error", "message": error_message}

+ 0 - 0
applications/tasks/__init__.py


+ 3 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -0,0 +1,3 @@
+from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
+from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
+from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask

+ 442 - 0
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -0,0 +1,442 @@
+import json
+import time
+import datetime
+import urllib.parse
+import traceback
+
+from tqdm import tqdm
+
+from applications.api import feishu_robot
+from applications.crawler.wechat import get_article_list_from_account
+from applications.crawler.wechat import get_article_detail
+from applications.pipeline import insert_article_into_recycle_pool
+
+
+class Const:
+    # subscription-account service types
+    SUBSCRIBE_TYPE_SET = {0, 1}
+
+    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
+
+    FORBIDDEN_GH_IDS = [
+        "gh_4c058673c07e",
+        "gh_de9f9ebc976b",
+        "gh_7b4a5f86d68c",
+        "gh_f902cea89e48",
+        "gh_789a40fe7935",
+        "gh_cd041ed721e6",
+        "gh_62d7f423f382",
+        "gh_043223059726",
+    ]
+
+    # article status codes
+    # default status for a new record
+    DEFAULT_STATUS = 0
+    # the detail request failed
+    REQUEST_FAIL_STATUS = -1
+    # the article has been deleted
+    DELETE_STATUS = -2
+    # no information returned, reason unknown
+    UNKNOWN_STATUS = -3
+    # the article was flagged as violating rules
+    ILLEGAL_STATUS = -4
+
+    ARTICLE_ILLEGAL_CODE = 25012
+    ARTICLE_DELETE_CODE = 25005
+    ARTICLE_SUCCESS_CODE = 0
+    ARTICLE_UNKNOWN_CODE = 10000
+
+
+class RecycleDailyPublishArticlesTask(Const):
+
+    def __init__(self, pool, log_client, date_string):
+        self.pool = pool
+        self.log_client = log_client
+        self.date_string = date_string
+
+    async def get_publish_accounts(self):
+        """
+        get all publish accounts
+        """
+        query = f"""
+            select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
+                t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
+                group_concat(distinct t5.remark) as account_remark
+            from
+                publish_plan t1
+                join publish_plan_account t2 on t1.id = t2.plan_id
+                join publish_account t3 on t2.account_id = t3.id
+                left join publish_account_wx_type t4 on t3.id = t4.account_id
+                left join publish_account_remark t5 on t3.id = t5.publish_account_id
+            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
+            group by t3.id;
+        """
+        account_list, error = await self.pool.async_fetch(query, db_name="aigc")
+        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]
+
+    async def get_account_status(self):
+        """get account experiment status"""
+        sql = f"""  
+            select t1.account_id, t2.status
+            from wx_statistics_group_source_account t1
+            join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
+        """
+        account_status_list, error = await self.pool.async_fetch(sql, db_name="aigc")
+        account_status_dict = {
+            account["account_id"]: account["status"] for account in account_status_list
+        }
+        return account_status_dict
+
+    async def recycle_single_account(self, account):
+        """recycle single account"""
+        query = f"""
+            select max(publish_timestamp) as publish_timestamp from official_articles_v2 where ghId = %s;
+        """
+        response, error = await self.pool.async_fetch(
+            query, params=(account["gh_id"],), db_name="piaoquan_crawler"
+        )
+        if response:
+            max_publish_timestamp = response[0]["publish_timestamp"]
+        else:
+            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
+
+        cursor = None
+        while True:
+            response = get_article_list_from_account(
+                account_id=account["gh_id"], index=cursor
+            )
+            response_code = response["code"]
+            match response_code:
+                case 25013:
+                    await feishu_robot.bot(
+                        title="发布账号封禁",
+                        detail={
+                            "账号名称": account["name"],
+                            "账号id": account["gh_id"],
+                        },
+                    )
+                    return
+                case 0:
+                    msg_list = response.get("data", {}).get("data", [])
+                    if not msg_list:
+                        return
+
+                    await insert_article_into_recycle_pool(
+                        self.pool, self.log_client, msg_list, account
+                    )
+
+                    # check last article
+                    last_article = msg_list[-1]
+                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
+                        "UpdateTime"
+                    ]
+                    if last_publish_timestamp <= max_publish_timestamp:
+                        return
+
+                    cursor = response["data"].get("next_cursor")
+                    if not cursor:
+                        return
+                case _:
+                    return
+
+    async def get_task_list(self):
+        """recycle all publish accounts articles"""
+        binding_accounts = await self.get_publish_accounts()
+        # filter out banned accounts
+        binding_accounts = [
+            i for i in binding_accounts if i["gh_id"] not in self.FORBIDDEN_GH_IDS
+        ]
+
+        account_status = await self.get_account_status()
+        account_list = [
+            {
+                **item,
+                "using_status": (
+                    0 if account_status.get(item["account_id"]) == "实验" else 1
+                ),
+            }
+            for item in binding_accounts
+        ]
+
+        # keep only subscription accounts
+        subscription_accounts = [
+            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
+        ]
+        return subscription_accounts
+
+    async def deal(self):
+        subscription_accounts = await self.get_task_list()
+        for account in tqdm(subscription_accounts, desc="recycle each account"):
+            try:
+                await self.recycle_single_account(account)
+
+            except Exception as e:
+                print(
+                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
+                )
+
+
+class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
+
+    async def check_account(self, account: dict, date_string: str) -> bool:
+        """check account data"""
+        query = f"""
+            select accountName, count(1) as publish_count 
+            from official_articles_v2 where ghId = %s and from_unixtime(createTime) > %s;
+        """
+        response, error = await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(account["gh_id"], date_string),
+        )
+        if error:
+            await feishu_robot.bot(
+                title="sql错误",
+                detail={
+                    "task": "CheckDailyPublishArticlesTask",
+                    "function": "check_account",
+                    "account": account,
+                    "date_string": date_string,
+                },
+                mention=False,
+            )
+            return False
+        else:
+            today_publish_count = response[0]["publish_count"]
+            return today_publish_count > 0
+
+    async def deal(self):
+        task_list = await self.get_task_list()
+        for task in tqdm(task_list, desc="check each account step1: "):
+            if await self.check_account(task, self.date_string):
+                continue
+            else:
+                await self.recycle_single_account(task)
+
+        # check again
+        fail_list = []
+        for second_task in tqdm(task_list, desc="check each account step2: "):
+            if await self.check_account(second_task, self.date_string):
+                continue
+            else:
+                second_task.pop("account_type", None)
+                second_task.pop("account_auth", None)
+                second_task.pop("account_id", None)
+                second_task.pop("account_remark", None)
+                fail_list.append(second_task)
+
+        if fail_list:
+            columns = [
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="plain_text",
+                    sheet_name="name",
+                    display_name="公众号名称",
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="plain_text", sheet_name="gh_id", display_name="gh_id"
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="number",
+                    sheet_name="follower_count",
+                    display_name="粉丝数",
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="date",
+                    sheet_name="account_init_timestamp",
+                    display_name="账号接入系统时间",
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="plain_text",
+                    sheet_name="using_status",
+                    display_name="利用状态",
+                ),
+            ]
+            await feishu_robot.bot(
+                title=f"{self.date_string} 发布文章,存在未更新的账号",
+                detail={"columns": columns, "rows": fail_list},
+                table=True,
+                mention=False,
+            )
+        else:
+            await feishu_robot.bot(
+                title=f"{self.date_string} 发布文章,所有文章更新成功",
+                detail={
+                    "date_string": self.date_string,
+                    "finish_time": datetime.datetime.now().__str__(),
+                },
+                mention=False,
+            )
+
+
+class UpdateRootSourceIdAndUpdateTimeTask(Const):
+    """
+    update publish_timestamp && root_source_id
+    """
+
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    async def get_article_list(self):
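+        # rows whose publish_timestamp is still a placeholder (0 / -1) need to be re-checked via the article detail API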
+        query = f"""select ContentUrl, wx_sn from official_articles_v2 where publish_timestamp in %s;"""
+        article_list, error = await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(tuple([0, -1]),)
+        )
+        return article_list
+
+    async def check_each_article(self, article: dict):
+        url = article["ContentUrl"]
+        wx_sn = article["wx_sn"].decode("utf-8")
+        try:
+            response = get_article_detail(url)
+            response_code = response["code"]
+
+            if response_code == self.ARTICLE_DELETE_CODE:
+                publish_timestamp_s = self.DELETE_STATUS
+                root_source_id_list = []
+            elif response_code == self.ARTICLE_ILLEGAL_CODE:
+                publish_timestamp_s = self.ILLEGAL_STATUS
+                root_source_id_list = []
+            elif response_code == self.ARTICLE_SUCCESS_CODE:
+                data = response["data"]["data"]
+                publish_timestamp_ms = data["publish_timestamp"]
+                publish_timestamp_s = int(publish_timestamp_ms / 1000)
+                mini_program = data.get("mini_program", [])
+                if mini_program:
+                    root_source_id_list = [
+                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
+                            "rootSourceId"
+                        ][0]
+                        for i in mini_program
+                    ]
+                else:
+                    root_source_id_list = []
+            else:
+                publish_timestamp_s = self.UNKNOWN_STATUS
+                root_source_id_list = []
+        except Exception as e:
+            publish_timestamp_s = self.REQUEST_FAIL_STATUS
+            root_source_id_list = None
+            error_msg = traceback.format_exc()
+            await self.log_client.log(
+                contents={
+                    "task": "get_official_article_detail",
+                    "data": {
+                        "url": url,
+                        "wx_sn": wx_sn,
+                        "error_msg": error_msg,
+                        "error": str(e),
+                    },
+                    "function": "check_each_article",
+                    "status": "fail",
+                }
+            )
+        query = f"""
+            update official_articles_v2 set publish_timestamp = %s, root_source_id_list = %s
+            where wx_sn = %s;
+        """
+        await self.pool.async_save(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(
+                publish_timestamp_s,
+                json.dumps(root_source_id_list, ensure_ascii=False),
+                wx_sn,
+            ),
+        )
+        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
+            return article
+        else:
+            return None
+
+    async def fallback_mechanism(self):
+        # 通过msgId 来修改publish_timestamp
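+        # articles under the same ghId + appMsgId are published together, so the max known
+        # publish_timestamp of the group is copied onto sibling rows that still hold a placeholder (<= 0)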
+        update_sql = f"""
+            update official_articles_v2 oav 
+            join (
+                select ghId, appMsgId, max(publish_timestamp) as publish_timestamp 
+                from official_articles_v2
+                where publish_timestamp > %s 
+                group by ghId, appMsgId
+                ) vv 
+                on oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
+            set oav.publish_timestamp = vv.publish_timestamp
+            where oav.publish_timestamp <= %s;
+        """
+        affected_rows_1 = await self.pool.async_save(query=update_sql, params=(0, 0), db_name="piaoquan_crawler")
+
+        # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
+        update_sql_2 = f"""
+            update official_articles_v2
+            set publish_timestamp = updateTime
+            where publish_timestamp < %s;
+        """
+        affected_rows_2 = await self.pool.async_save(
+            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
+        )
+        if affected_rows_1 or affected_rows_2:
+            await feishu_robot.bot(
+                title="执行兜底修改发布时间戳",
+                detail={
+                    "通过msgId修改": affected_rows_1,
+                    "通过update_timestamp修改": affected_rows_2
+                },
+                mention=False
+            )
+
+    async def deal(self):
+        task_list = await self.get_article_list()
+        for task in tqdm(task_list, desc="get article detail step1: "):
+            try:
+                await self.check_each_article(task)
+            except Exception as e:
+                try:
+                    await self.log_client.log(
+                        contents={
+                            "task": "get_official_article_detail_step1",
+                            "data": {
+                                "detail": {
+                                    "url": task["ContentUrl"],
+                                    "wx_sn": task["wx_sn"].decode("utf-8"),
+                                },
+                                "error_msg": traceback.format_exc(),
+                                "error": str(e),
+                            },
+                            "function": "check_each_article",
+                            "status": "fail",
+                        }
+                    )
+                except Exception as e:
+                    print(e)
+                    print(traceback.format_exc())
+
+        # process_failed_task_reproduce
+        fail_tasks = await self.get_article_list()
+        fail_list = []
+        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
+            try:
+                res = await self.check_each_article(fail_task)
+                if res:
+                    fail_list.append(res)
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "get_official_article_detail_step2",
+                        "data": {
+                            "detail": {
+                                "url": fail_task["ContentUrl"],
+                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
+                            },
+                            "error_msg": traceback.format_exc(),
+                            "error": str(e),
+                        },
+                        "function": "check_each_article",
+                        "status": "fail",
+                    }
+                )
+        if fail_list:
+            await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)
+
+            current_hour = datetime.datetime.now().hour
+            if current_hour >= 21:
+                await self.fallback_mechanism()
+
+

+ 1 - 0
applications/tasks/llm_tasks/__init__.py

@@ -0,0 +1 @@
+from .process_title import TitleRewrite

+ 269 - 0
applications/tasks/llm_tasks/process_title.py

@@ -0,0 +1,269 @@
+import time
+import traceback
+
+from tqdm import tqdm
+
+from applications.api import fetch_deepseek_completion
+
+
+class Const:
+    # title rewrite status
+    TITLE_REWRITE_INIT_STATUS = 0
+    TITLE_REWRITE_SUCCESS_STATUS = 1
+    TITLE_REWRITE_FAIL_STATUS = 99
+    TITLE_REWRITE_LOCK_STATUS = 101
+
+    # article status
+    ARTICLE_AUDIT_PASSED_STATUS = 1
+    ARTICLE_POSITIVE_STATUS = 0
+
+    # title useful status
+    TITLE_USEFUL_STATUS = 1
+
+    # prompt version
+    PROMPT_VERSION = "xx_250228"  # 信欣2025-02-28提供
+
+    # block expire time 1h
+    TITLE_REWRITE_LOCK_TIME = 60 * 60
+
+
+class TitleProcess(Const):
+    def __init__(self, pool, aliyun_log):
+        self.pool = pool
+        self.aliyun_log = aliyun_log
+
+    @classmethod
+    def generate_title_rewrite_prompt(cls, ori_title):
+        """
+        生成prompt
+        """
+        prompt = f"""
+        请将以下标题改写成适合公众号中小程序点击和传播的文章标题,文章标题的写作规范如下,请学习后进行文章标题的编写。直接输出最终的文章标题,文章标题撰写规范如下:
+        1. 标题结构:要点前置,信息明确
+            核心信息前置:标题开头直接点出文章的核心内容或亮点,吸引读者注意。例如:
+              “我国存款最安全的五大银行,永远都不会倒闭,你知道是哪五家吗?”
+              “亩产7000斤,被误认成萝卜却曾是‘救命粮’,如今成我国出口名蔬”。
+            简洁明了:标题通常在20字以内,信息集中且易于理解。
+            悬念前置结构:前半句设置反常/冲突场景(如"刑满释放蹬三轮")+后半句用结果反转制造悬念("政府领导登门分配工作")
+            多要素拼接:通过冒号/逗号分隔不同叙事主体(地域+人物冲突+权威评价),如"辽宁女子住高档小区被敲门,法院判决意外"
+
+        2. 情绪表达:激发共鸣,引发好奇
+            情感共鸣:通过情感化的语言触动读者,泪崩/守护/抱头痛哭等情感冲击词,配合家庭伦理场景
+            例如:
+              “老母亲分家产,给亲闺女30万,给养女一筐青菜,养女意外摔倒,看到筐子里的东西,瞬间愣住了”。
+              “儿子卖车卖房给母亲治病,母亲去世后儿媳收拾房间,打开床底柜,儿子突然痛哭”。
+            悬念与好奇心:通过提问或制造悬念,激发读者点击欲望。例如:
+              “你知道是哪五家吗?”
+              “打开床底柜,儿子突然痛哭”。
+            冲突性情绪词:拍桌大骂/气愤不已/眼红不已/算计等强对抗性词汇
+            结果反差刺激:用"风光善终/价值过亿/判决意外"等违反预期的结果
+
+        3. 语言风格:口语化、接地气
+            口语化表达:使用通俗易懂的语言,贴近读者生活。
+            刻意使用"赶都赶不走/各吃各的/我就知道你在家"等市井化用语。
+            例如:
+              “狗屎运?江西男子钓鱼时发现青鱼尸骸,扒开后捡到鸡蛋大小的青鱼石”。
+              “聪明的女人,不会帮婆家3种忙,而蠢女人才一再插手”。
+            接地气的词汇:使用“狗屎运”“蠢女人”等口语化词汇,增强亲切感。
+            身份反差构建:突出人物命运转折(老农→亿万富翁/囚犯→政府帮扶对象)
+            权威背书暗示:"专家气愤/法院判决/网友评价"等第三方视角增强可信度
+
+        4. 标点运用:增强语气,突出重点
+            问号与感叹号:通过问号制造悬念,感叹号强化情感。
+            在关键转折点使用("太气人了!/赔不了!")
+            问号制造互动:如"容嬷嬷是校花?"激发读者验证心理
+            例如:
+              “你知道是哪五家吗?”
+              “太无耻了!湖南,一名厨师被公司派到云南‘出差’被拒……”
+            引号与冒号:用于突出关键词或转折点。
+            破折号递进:用"——"引导关键信息("吃不完最好扔掉——")
+            例如:
+              “被误认成萝卜却曾是‘救命粮’”。
+              “女子归还后,失主拒绝支付报酬,还说:要有格局”。
+
+        5. 热点与话题性:结合社会热点或争议
+            社会热点:结合当前热点事件或争议话题,吸引关注。例如:
+              “上海:男子超市连续购买46枚过期咸鸭蛋,2天分46次交易,向厂家索赔金14万,法院判了!”
+            争议性话题:通过争议性内容引发讨论。例如:
+              “李玉成终于说出实话,公开吐槽马玉琴年纪太大,结婚28年疑似后悔”。
+
+        6. 数字与具体细节:增强可信度与吸引力
+            数字的运用:通过具体数字增强标题的可信度和吸引力。例如:
+              “亩产7000斤”。
+              “22年河南男子跳河救人,体力耗尽留遗言”。
+            细节描述:通过细节让标题更具画面感。例如:
+              “打开床底柜,儿子突然痛哭”。
+              “扒开后捡到鸡蛋大小的青鱼石”。
+
+        7. 价值诉求:传递实用信息或情感价值
+            实用信息:提供对读者有价值的信息。例如:
+              “我国存款最安全的五大银行,永远都不会倒闭”。
+              “72岁老人每天一个蒸苹果,半年后体检,看到指标变化让他乐开了花”。
+            情感价值:通过情感故事或人生哲理打动读者。例如:
+              “父母越老越能暴露家庭最真实的一面:当父母70岁,子女不该抱有这三种期待”。
+
+        8. 名人效应与历史情怀:增强吸引力
+            名人效应:提及名人或历史事件,吸引关注。例如:
+              “难怪王扶林说陈晓旭不够漂亮,看看他选的原黛玉候选人,那才叫美”。
+              “1975年‘下馆子’的老照片,2元能吃些什么,勾起那段最难忘的时光”。
+
+        9.隐藏传播逻辑:通过标题中暗含的、能触发人性弱点(如猎奇、贪婪、同情)或社会痛点的心理机制,通过潜意识刺激读者点击欲望
+           人性弱点触发:贪婪(200万保单)、猎奇(林彪密件)、窥私(家庭算计)
+           生存焦虑关联:医疗(脑瘫儿)、养老(子女不孝)、食品安全(二次加热)
+           身份代入设计:选择"老太太/外甥女/退休母亲"等易引发群体共鸣的角色
+        输入的标题是: '{ori_title}'
+        """
+        return prompt
+
+
+class TitleRewrite(TitleProcess):
+
+    async def roll_back_blocked_tasks(self):
+        """
+        rollback blocked tasks
+        """
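+        # reset tasks stuck in LOCK status for longer than TITLE_REWRITE_LOCK_TIME (1h) back to INIT so they can be retried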
+        query = f"""
+            select id, title_rewrite_status_update_timestamp
+            from publish_single_video_source
+            where title_rewrite_status = {self.TITLE_REWRITE_LOCK_STATUS};
+        """
+        article_list, error = await self.pool.async_fetch(
+            query=query,
+            db_name="long_articles",
+        )
+        if article_list:
+            blocked_id_list = [
+                i["id"]
+                for i in article_list
+                if (int(time.time()) - i["title_rewrite_status_update_timestamp"])
+                > self.TITLE_REWRITE_LOCK_TIME
+            ]
+            if blocked_id_list:
+                update_query = f"""
+                    update publish_single_video_source
+                    set title_rewrite_status = %s
+                    where id in %s and title_rewrite_status = %s;
+                """
+                await self.pool.async_save(
+                    query=update_query,
+                    params=(
+                        self.TITLE_REWRITE_INIT_STATUS,
+                        tuple(blocked_id_list),
+                        self.TITLE_REWRITE_LOCK_STATUS,
+                    ),
+                )
+
+    async def get_articles_batch(self, batch_size=1000):
+        query = f"""
+            select content_trace_id, article_title
+            from publish_single_video_source 
+            where bad_status = {self.ARTICLE_POSITIVE_STATUS} 
+                and audit_status = {self.ARTICLE_AUDIT_PASSED_STATUS} 
+                and title_rewrite_status = {self.TITLE_REWRITE_INIT_STATUS}
+                and platform in ('hksp', 'sph')
+            limit {batch_size};
+        """
+        response, error = await self.pool.async_fetch(query=query, db_name="long_articles")
+        return response
+
+    async def update_title_rewrite_status(
+        self, content_trace_id, ori_status, new_status
+    ):
+        query = f"""
+            update publish_single_video_source
+            set title_rewrite_status = %s, title_rewrite_status_update_timestamp = %s
+            where content_trace_id = %s and title_rewrite_status= %s;
+        """
+        affected_rows = await self.pool.async_save(
+            query=query,
+            params=(new_status, int(time.time()), content_trace_id, ori_status),
+        )
+        return affected_rows
+
+    async def insert_into_rewrite_table(self, content_trace_id, new_title):
+        """
+        insert into rewrite_table
+        """
+        insert_sql = f"""
+            insert into video_title_rewrite
+            (content_trace_id, new_title, status, prompt_version)
+            values (%s, %s, %s, %s);
+        """
+        await self.pool.async_save(
+            query=insert_sql,
+            params=(
+                content_trace_id,
+                new_title,
+                self.TITLE_USEFUL_STATUS,
+                self.PROMPT_VERSION,
+            ),
+        )
+
+    async def rewrite_each_article(self, article):
+        """
+        rewrite each article
+        """
+        content_trace_id = article["content_trace_id"]
+        article_title = article["article_title"]
+
+        # lock each task
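+        # the conditional update (INIT -> LOCK) works as an optimistic lock: zero affected rows means the row was already claimed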
+        affected_rows = await self.update_title_rewrite_status(
+            content_trace_id=content_trace_id,
+            ori_status=self.TITLE_REWRITE_INIT_STATUS,
+            new_status=self.TITLE_REWRITE_LOCK_STATUS,
+        )
+        if not affected_rows:
+            return
+
+        try:
+            prompt = self.generate_title_rewrite_prompt(article_title)
+            new_title = fetch_deepseek_completion(model="default", prompt=prompt)
+
+            # insert into rewrite table
+            await self.insert_into_rewrite_table(
+                content_trace_id=content_trace_id, new_title=new_title
+            )
+
+            # unlock
+            await self.update_title_rewrite_status(
+                content_trace_id=content_trace_id,
+                ori_status=self.TITLE_REWRITE_LOCK_STATUS,
+                new_status=self.TITLE_REWRITE_SUCCESS_STATUS,
+            )
+        except Exception as e:
+            await self.aliyun_log.log(
+                contents={
+                    "task": "title rewrite task",
+                    "function": "rewrite_each_article",
+                    "message": content_trace_id,
+                    "status": "fail",
+                    "data": {
+                        "error_message": str(e),
+                        "error_type": type(e).__name__,
+                        "traceback": traceback.format_exc(),
+                    },
+                }
+            )
+            await self.update_title_rewrite_status(
+                content_trace_id=content_trace_id,
+                ori_status=self.TITLE_REWRITE_LOCK_STATUS,
+                new_status=self.TITLE_REWRITE_FAIL_STATUS,
+            )
+
+    async def deal(self):
+        """title rewrite task deal"""
+        await self.roll_back_blocked_tasks()
+
+        task_list = await self.get_articles_batch()
+        if not task_list:
+            return
+
+        for article in tqdm(task_list, desc="title rewrite task"):
+            await self.rewrite_each_article(article)
+
+
+class VideoPoolCategoryGeneration:
+    pass
+
+
+class ArticlePoolCategoryGeneration:
+    pass

+ 6 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -0,0 +1,6 @@
+from .kimi_balance import check_kimi_balance
+from .get_off_videos import GetOffVideos
+from .get_off_videos import CheckVideoAuditStatus
+from .gzh_article_monitor import OutsideGzhArticlesMonitor
+from .gzh_article_monitor import OutsideGzhArticlesCollector
+from .gzh_article_monitor import InnerGzhArticlesMonitor

+ 183 - 0
applications/tasks/monitor_tasks/get_off_videos.py

@@ -0,0 +1,183 @@
+import time
+import traceback
+from typing import List, Optional
+
+from tqdm import tqdm
+
+from applications.api import change_video_audit_status
+from applications.api import fetch_piaoquan_video_list_detail
+from applications.api import feishu_robot
+
+
+class GetOffVideosConst:
+    EXPIRE_TIME = 3 * 24 * 3600
+    EARLIEST_TIME = 7 * 24 * 3600
+    VIDEO_AVAILABLE_STATUS = 1
+    VIDEO_DISABLE_STATUS = 0
+
+    # VIDEO_AUDIT
+    VIDEO_AUDIT_FAIL_STATUS = 2
+    VIDEO_AUDIT_SUCCESS_STATUS = 5
+
+    # Task code
+    TASK_SUCCESS_STATUS = 2
+    TASK_FAILED_STATUS = 99
+
+    # check status
+    CHECK_INIT_STATUS = 0
+    CHECK_FINISHED_STATUS = 1
+
+    # table
+    table = "get_off_videos"
+
+
+class GetOffVideos(GetOffVideosConst):
+    def __init__(self, db_client, log_client):
+        self.db_client = db_client
+        self.log_client = log_client
+
+    async def get_task_list(
+        self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
+    ):
+        """get videos which need get off"""
+        query = f"""
+            select video_id from {self.table} where video_status = %s and publish_time between %s and %s;
+        """
+        video_list, error = await self.db_client.async_fetch(
+            query,
+            params=(
+                self.VIDEO_AVAILABLE_STATUS,
+                earliest_timestamp_threshold,
+                expire_timestamp_threshold,
+            ),
+        )
+        return video_list
+
+    async def update_video_status(self, video_id):
+        query = f"""
+            update {self.table} set video_status = %s, get_off_time = %s where video_id = %s;
+        """
+        return await self.db_client.async_save(
+            query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)
+        )
+
+    async def update_video_audit_status(self, video_id):
+        """use pq api to update video status"""
+        response = await change_video_audit_status(
+            video_id, self.VIDEO_AUDIT_FAIL_STATUS
+        )
+        await self.update_video_status(video_id)
+        return response
+
+    async def get_off_job(self):
+        """get off videos out of expire time"""
+        expire_timestamp_threshold = int(time.time()) - self.EXPIRE_TIME
+        earliest_timestamp_threshold = int(time.time()) - self.EARLIEST_TIME
+        task_list = await self.get_task_list(
+            earliest_timestamp_threshold, expire_timestamp_threshold
+        )
+        for task in tqdm(task_list):
+            video_id = task["video_id"]
+            try:
+                await self.update_video_audit_status(video_id)
+
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "get_off_videos",
+                        "function": "get_off_job",
+                        "status": "fail",
+                        "message": "get off video fail",
+                        "data": {
+                            "video_id": video_id,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
+
+    async def check(self):
+        earliest_timestamp = int(time.time()) - self.EARLIEST_TIME
+        expire_timestamp = int(time.time()) - self.EXPIRE_TIME
+        task_list = await self.get_task_list(earliest_timestamp, expire_timestamp)
+        if task_list:
+            await feishu_robot.bot(
+                title="自动下架视频失败",
+                detail={
+                    "total_video": len(task_list),
+                    "video_list": [i["video_id"] for i in task_list],
+                },
+                mention=False,
+            )
+            return self.TASK_FAILED_STATUS
+        else:
+            return self.TASK_SUCCESS_STATUS
+
+
+class CheckVideoAuditStatus(GetOffVideosConst):
+    def __init__(self, db_client, log_client):
+        self.db_client = db_client
+        self.log_client = log_client
+
+    async def get_video_list_status(self, video_list: List[int]):
+        response = await fetch_piaoquan_video_list_detail(video_list)
+        video_detail_list = response.get("data", [])
+        if video_detail_list:
+            bad_video_list = [
+                i["id"]
+                for i in video_detail_list
+                if i["auditStatus"] != self.VIDEO_AUDIT_SUCCESS_STATUS
+            ]
+        else:
+            bad_video_list = []
+
+        return bad_video_list
+
+    async def get_unchecked_video_list(self) -> Optional[List[int]]:
+        """find unchecked videos"""
+        query = f"""
+            select video_id from {self.table} where check_status = %s and video_status = %s limit 1000;
+        """
+        video_id_list, error = await self.db_client.async_fetch(
+            query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
+        )
+        if error:
+            print("error", error)
+            return None
+        else:
+            return [i["video_id"] for i in video_id_list]
+
+    async def update_check_status(self, video_list: List[int]):
+        query = f"""update {self.table} set check_status = %s where video_id in %s;"""
+        return await self.db_client.async_save(
+            query, params=(self.CHECK_FINISHED_STATUS, tuple(video_list))
+        )
+
+    async def deal(self):
+        def chunk_iterator(arr, chunk_size):
+            for i in range(0, len(arr), chunk_size):
+                yield arr[i : i + chunk_size]
+
+        video_id_list = await self.get_unchecked_video_list()
+        if not video_id_list:
+            return self.TASK_SUCCESS_STATUS
+
+        # query piaoquan video detail in chunks of 10 ids per request
+        video_chunks = chunk_iterator(video_id_list, 10)
+
+        bad_videos_count = 0
+        fail_list = []
+        for video_chunk in video_chunks:
+            bad_video_id_list = await self.get_video_list_status(video_chunk)
+            if bad_video_id_list:
+                bad_videos_count += len(bad_video_id_list)
+                for bad_video_id in tqdm(bad_video_id_list):
+                    response = await change_video_audit_status(bad_video_id)
+                    if not response:
+                        fail_list.append(bad_video_id)
+
+            await self.update_check_status(video_chunk)
+
+        if fail_list:
+            await feishu_robot.bot(
+                title="校验已发布视频状态出现错误", detail=fail_list, mention=False
+            )
+            return self.TASK_FAILED_STATUS
+        else:
+            return self.TASK_SUCCESS_STATUS

+ 332 - 0
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -0,0 +1,332 @@
+import time
+import datetime
+from typing import Optional, List
+
+from tqdm import tqdm
+
+from applications.api import feishu_robot
+from applications.api import delete_illegal_gzh_articles
+from applications.crawler.wechat import get_article_detail
+from applications.crawler.wechat import get_article_list_from_account
+from applications.utils import str_to_md5
+
+
+class MonitorConst:
+    # 文章违规状态
+    ILLEGAL_STATUS = 1
+    INIT_STATUS = 0
+
+    # 监测周期
+    MONITOR_CYCLE = 3 * 24 * 3600
+
+    # article code
+    ARTICLE_ILLEGAL_CODE = 25012
+    ARTICLE_DELETE_CODE = 25005
+    ARTICLE_SUCCESS_CODE = 0
+    ARTICLE_UNKNOWN_CODE = 10000
+
+    # Task status
+    TASK_SUCCESS_CODE = 2
+    TASK_FAIL_CODE = 99
+
+
+class OutsideGzhArticlesManager(MonitorConst):
+
+    def __init__(self, pool):
+        self.pool = pool
+
+    async def update_article_illegal_status(
+        self, article_id: int, illegal_reason: str
+    ) -> None:
+        query = f"""
+            update outside_gzh_account_monitor
+            set illegal_status = %s, illegal_reason = %s
+            where id = %s and illegal_status = %s
+        """
+        await self.pool.async_save(
+            query=query,
+            params=(self.ILLEGAL_STATUS, illegal_reason, article_id, self.INIT_STATUS),
+        )
+
+    async def whether_published_in_a_week(self, gh_id: str) -> bool:
+        """
+        判断该账号在监测周期内是否有发文,如有则无需重新抓取
+        """
+        query = f"""
+            select id, publish_timestamp from outside_gzh_account_monitor
+            where gh_id = %s
+            order by publish_timestamp desc
+            limit %s;
+        """
+        response, error = await self.pool.async_fetch(query=query, params=(gh_id, 1))
+        if response:
+            publish_timestamp = response[0]["publish_timestamp"]
+            if publish_timestamp is None:
+                return False
+            else:
+                return int(time.time()) - publish_timestamp <= self.MONITOR_CYCLE
+        else:
+            return False
+
+
+class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
+
+    async def fetch_outside_account_list(self):
+        query = f"""
+            select 
+                t2.group_source_name as account_source, 
+                t3.name as account_name,
+                t3.gh_id as gh_id,
+                t3.status as status
+            from wx_statistics_group_source t1
+                join wx_statistics_group_source_account t2 on t2.group_source_name = t1.account_source_name
+                join publish_account t3 on t3.id = t2.account_id
+            where
+                t1.mode_type = '代运营服务号';
+        """
+        response, error = await self.pool.async_fetch(query=query, db_name="aigc")
+        return response
+
+    async def fetch_each_account(self, account: dict):
+        gh_id = account["gh_id"]
+        # 判断该账号本周是否已经发布过
+        if await self.whether_published_in_a_week(gh_id):
+            return
+
+        fetch_response = get_article_list_from_account(gh_id)
+        try:
+            msg_list = fetch_response.get("data", {}).get("data", [])
+            if msg_list:
+                for msg in tqdm(
+                    msg_list, desc=f"insert account {account['account_name']}"
+                ):
+                    await self.save_each_msg_to_db(msg, account)
+
+            else:
+                print(f"crawler failed: {account['account_name']}")
+        except Exception as e:
+            print(
+                f"crawler failed: account_name: {account['account_name']}\n"
+                f"error: {e}\n"
+            )
+
+    async def save_each_msg_to_db(self, msg: dict, account: dict):
+        base_info = msg["AppMsg"]["BaseInfo"]
+        detail_info = msg["AppMsg"]["DetailInfo"]
+        app_msg_id = base_info["AppMsgId"]
+        create_timestamp = base_info["CreateTime"]
+        publish_type = base_info["Type"]
+
+        # insert each article
+        for article in detail_info:
+            link = article["ContentUrl"]
+            article_detail = get_article_detail(link)
+            response_code = article_detail["code"]
+            if response_code == self.ARTICLE_ILLEGAL_CODE:
+                illegal_reason = article_detail.get("msg")
+                # bot and return
+                await feishu_robot.bot(
+                    title="文章违规告警",
+                    detail={
+                        "账号名称": account["account_name"],
+                        "标题": article["Title"],
+                        "违规理由": illegal_reason,
+                        "发布日期": datetime.datetime.fromtimestamp(
+                            create_timestamp
+                        ).strftime("%Y-%m-%d %H:%M:%S"),
+                        "账号合作商": account["account_source"],
+                    },
+                    env="outside_gzh_monitor",
+                    mention=False,
+                )
+
+            elif response_code == self.ARTICLE_SUCCESS_CODE:
+                insert_query = f"""
+                    insert ignore into outside_gzh_account_monitor
+                    (account_name, gh_id, account_source, account_type, app_msg_id, publish_type, position, title, link, 
+                    channel_content_id, crawler_timestamp, publish_timestamp)
+                    values
+                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                """
+                await self.pool.async_save(
+                    query=insert_query,
+                    params=(
+                        account["account_name"],
+                        account["gh_id"],
+                        account["account_source"],
+                        "服务号",
+                        app_msg_id,
+                        publish_type,
+                        article["ItemIndex"],
+                        article["Title"],
+                        link,
+                        article_detail["data"]["data"]["channel_content_id"],
+                        int(time.time()),
+                        int(article_detail["data"]["data"]["publish_timestamp"] / 1000),
+                    ),
+                )
+            else:
+                continue
+
+    async def deal(self):
+        account_list = await self.fetch_outside_account_list()
+        for account in tqdm(account_list):
+            try:
+                await self.fetch_each_account(account)
+            except Exception as e:
+                print(f"crawler failed: {account['account_name']}, error: {e}")
+
+
+class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
+
+    async def fetch_article_list_to_check(self):
+        publish_timestamp_threshold = int(time.time()) - self.MONITOR_CYCLE
+        fetch_query = f"""
+            select id, account_name, gh_id, account_source, account_type, 
+                title, link, from_unixtime(publish_timestamp) as publish_date
+            from outside_gzh_account_monitor
+            where illegal_status = {self.INIT_STATUS} and publish_timestamp > {publish_timestamp_threshold};
+        """
+        response, error = await self.pool.async_fetch(query=fetch_query)
+        return response
+
+    async def check_each_article(self, article: dict):
+        """
+        check each article
+        """
+        link = article["link"]
+        article_detail = get_article_detail(link)
+        response_code = article_detail["code"]
+        if response_code == self.ARTICLE_ILLEGAL_CODE:
+            illegal_reason = article_detail.get("msg")
+            await feishu_robot.bot(
+                title="文章违规告警",
+                detail={
+                    "账号名称": article["account_name"],
+                    "标题": article["title"],
+                    "违规理由": illegal_reason,
+                    "发布日期": str(article["publish_date"]),
+                    "账号合作商": article["account_source"],
+                },
+                env="outside_gzh_monitor",
+                mention=False,
+            )
+            article_id = article["id"]
+            await self.update_article_illegal_status(article_id, illegal_reason)
+        else:
+            return
+
+    async def deal(self):
+        article_list = await self.fetch_article_list_to_check()
+        for article in tqdm(article_list, desc="外部服务号监控"):
+            try:
+                await self.check_each_article(article)
+
+            except Exception as e:
+                print(
+                    f"crawler failed: account_name: {article['account_name']}\n"
+                    f"link: {article['link']}\n"
+                    f"title: {article['title']}\n"
+                    f"error: {e}\n"
+                )
+        return self.TASK_SUCCESS_CODE
+
+
+class InnerGzhArticlesMonitor(MonitorConst):
+    def __init__(self, pool):
+        self.pool = pool
+
+    async def whether_title_unsafe(self, title: str) -> bool:
+        """
+        :param title: gzh article title
+        :return: bool
+        """
+        title_md5 = str_to_md5(title)
+        query = f"""
+            select title_md5 from article_unsafe_title where title_md5 = '{title_md5}';
+        """
+        response, error = await self.pool.async_fetch(query=query)
+        return True if response else False
+
+    async def fetch_article_list_to_check(self, run_date: str = None) -> Optional[List]:
+        """
+        :param run_date: 执行日期,格式为“%Y-%m-%d”, default None
+        """
+        if not run_date:
+            run_date = datetime.datetime.today().strftime("%Y-%m-%d")
+
+        run_timestamp = int(
+            datetime.datetime.strptime(run_date, "%Y-%m-%d").timestamp()
+        )
+        start_timestamp = run_timestamp - self.MONITOR_CYCLE
+        query = f"""
+            select ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) as publish_date
+            from official_articles_v2
+            where publish_timestamp >= {start_timestamp}
+            order by publish_timestamp desc;
+        """
+        response, error = await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler"
+        )
+        if error:
+            await feishu_robot.bot(
+                title="站内微信公众号发文监测任务异常",
+                detail={"error": error, "message": "查询数据库异常"},
+            )
+            return None
+        else:
+            return response
+
+    async def check_each_article(self, article: dict):
+        gh_id = article["ghId"]
+        account_name = article["accountName"]
+        title = article["title"]
+        url = article["ContentUrl"]
+        wx_sn = article["wx_sn"]
+        publish_date = article["publish_date"]
+        try:
+            response = get_article_detail(url, is_cache=False)
+            response_code = response["code"]
+            if response_code == self.ARTICLE_ILLEGAL_CODE:
+                error_detail = response.get("msg")
+                query = f"""
+                    insert ignore into illegal_articles
+                        (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+                    values 
+                        (%s, %s, %s, %s, %s, %s); 
+                """
+                affected_row = await self.pool.async_save(
+                    query=query,
+                    params=(
+                        gh_id,
+                        account_name,
+                        title,
+                        wx_sn,
+                        publish_date,
+                        error_detail,
+                    ),
+                )
+                if affected_row:
+                    if await self.whether_title_unsafe(title):
+                        return
+
+                    await feishu_robot.bot(
+                        title="文章违规告警",
+                        detail={
+                            "account_name": account_name,
+                            "gh_id": gh_id,
+                            "title": title,
+                            "wx_sn": wx_sn.decode("utf-8"),
+                            "publish_date": str(publish_date),
+                            "error_detail": error_detail,
+                        },
+                        mention=False,
+                        env="prod",
+                    )
+                    await delete_illegal_gzh_articles(gh_id, title)
+
+        except Exception as e:
+            print(f"crawler failed: {account_name}, error: {e}")
+
+    async def deal(self):
+        article_list = await self.fetch_article_list_to_check()
+        if article_list is None:
+            return self.TASK_FAIL_CODE
+
+        for article in tqdm(article_list, desc="站内文章监测任务"):
+            await self.check_each_article(article)
+
+        return self.TASK_SUCCESS_CODE

+ 34 - 0
applications/tasks/monitor_tasks/kimi_balance.py

@@ -0,0 +1,34 @@
+import traceback
+from typing import Dict
+from applications.api import feishu_robot
+from applications.utils import AsyncHttPClient
+
+# const
+BALANCE_LIMIT_THRESHOLD = 100.0
+
+
+async def check_kimi_balance() -> Dict:
+    url = "https://api.moonshot.cn/v1/users/me/balance"
+    headers = {
+        "Authorization": "Bearer sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q",
+        "Content-Type": "application/json; charset=utf-8",
+    }
+
+    try:
+        async with AsyncHttPClient() as client:
+            response = await client.get(url, headers=headers)
+
+        balance = response["data"]["available_balance"]
+        if balance < BALANCE_LIMIT_THRESHOLD:
+            await feishu_robot.bot(
+                title="kimi余额小于 {} 块".format(BALANCE_LIMIT_THRESHOLD),
+                detail={"balance": balance},
+            )
+        return {"code": 2, "data": response}
+    except Exception as e:
+        error_stack = traceback.format_exc()
+        await feishu_robot.bot(
+            title="kimi余额接口处理失败,数据结构异常",
+            detail={"error": str(e), "error_msg": error_stack},
+        )
+        return {"code": 99, "data": error_stack}

+ 273 - 0
applications/tasks/task_scheduler.py

@@ -0,0 +1,273 @@
+import asyncio
+import time
+from datetime import datetime
+
+from applications.api import feishu_robot
+from applications.utils import task_schedule_response
+
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
+from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.monitor_tasks import check_kimi_balance
+from applications.tasks.monitor_tasks import GetOffVideos
+from applications.tasks.monitor_tasks import CheckVideoAuditStatus
+from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
+
+
+class TaskScheduler:
+    def __init__(self, data, log_service, db_client):
+        self.data = data
+        self.log_client = log_service
+        self.db_client = db_client
+        self.table = "long_articles_task_manager"
+
+    async def whether_task_processing(self, task_name: str) -> bool:
+        """whether task is processing"""
+        query = f"""
+            select start_timestamp from {self.table} where task_name = %s and task_status = %s;
+        """
+        response, error = await self.db_client.async_fetch(
+            query=query, params=(task_name, 1)
+        )
+        if not response:
+            # no task is processing
+            return False
+        else:
+            start_timestamp = response[0]["start_timestamp"]
+            # todo: every task should have its own expire timestamp; move it into a task config file
+            if int(time.time()) - start_timestamp >= 86400:
+                await feishu_robot.bot(
+                    title=f"{task_name} has been processing for more than one day",
+                    detail={"timestamp": start_timestamp},
+                    env="long_articles_task",
+                )
+            return True
+
+    async def record_task(self, task_name, date_string):
+        """record task"""
+        query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
+        await self.db_client.async_save(
+            query=query, params=(date_string, task_name, int(time.time()))
+        )
+
+    async def lock_task(self, task_name, date_string):
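+        # mark the task row as processing (task_status 0 -> 1); the status filter keeps concurrent calls from double-locking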
+        query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
+        return await self.db_client.async_save(
+            query=query, params=(1, task_name, date_string, 0)
+        )
+
+    async def release_task(self, task_name, date_string, final_status):
+        """
+        任务执行完成之后,将任务状态设置为完成状态/失败状态
+        """
+        query = f"""
+               update {self.table} 
+               set task_status = %s, finish_timestamp = %s
+               where task_name = %s and date_string = %s and task_status = %s;
+           """
+        return await self.db_client.async_save(
+            query=query,
+            params=(final_status, int(time.time()), task_name, date_string, 1),
+        )
+
+    async def deal(self):
+        task_name = self.data.get("task_name")
+        date_string = self.data.get("date_string")
+        if not task_name:
+            await self.log_client.log(
+                contents={
+                    "task": task_name,
+                    "function": "task_scheduler_deal",
+                    "message": "not task name in params",
+                    "status": "fail",
+                    "data": self.data,
+                }
+            )
+            return await task_schedule_response.fail_response(
+                error_code="4002", error_message="task_name must be input"
+            )
+
+        if not date_string:
+            date_string = datetime.today().strftime("%Y-%m-%d")
+
+        # prepare for task
+        if await self.whether_task_processing(task_name):
+            return await task_schedule_response.fail_response(
+                error_code="5001", error_message="task is processing"
+            )
+
+        await self.record_task(task_name=task_name, date_string=date_string)
+
+        await self.lock_task(task_name, date_string)
+
+        match task_name:
+            case "check_kimi_balance":
+                response = await check_kimi_balance()
+                await self.log_client.log(
+                    contents={
+                        "task": task_name,
+                        "function": "task_scheduler_deal",
+                        "message": "check_kimi_balance task execute successfully",
+                        "status": "success",
+                        "data": response,
+                    }
+                )
+                await self.release_task(
+                    task_name=task_name,
+                    date_string=date_string,
+                    final_status=response["code"],
+                )
+                return await task_schedule_response.success_response(
+                    task_name=task_name, data=response
+                )
+
+            case "get_off_videos":
+
+                async def background_get_off_videos():
+                    sub_task = GetOffVideos(self.db_client, self.log_client)
+                    await sub_task.get_off_job()
+                    task_status = await sub_task.check()
+                    await self.release_task(
+                        task_name=task_name,
+                        date_string=date_string,
+                        final_status=task_status,
+                    )
+
+                asyncio.create_task(background_get_off_videos())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "get off_videos started background"},
+                )
+
+            case "check_publish_video_audit_status":
+
+                async def background_check_publish_video_audit_status():
+                    sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
+                    print("start processing task status: ")
+                    task_status = await sub_task.deal()
+                    await self.release_task(
+                        task_name=task_name,
+                        date_string=date_string,
+                        final_status=task_status,
+                    )
+                    print("finish task status: ", task_status)
+
+                asyncio.create_task(background_check_publish_video_audit_status())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={
+                        "code": 0,
+                        "message": "check publish video audit status started",
+                    },
+                )
+
+            case "outside_article_monitor":
+
+                async def background_outside_article_monitor():
+                    collect_task = OutsideGzhArticlesCollector(self.db_client)
+                    await collect_task.deal()
+                    monitor_task = OutsideGzhArticlesMonitor(self.db_client)
+                    final_status = await monitor_task.deal()
+                    await self.release_task(
+                        task_name, date_string, final_status=final_status
+                    )
+
+                asyncio.create_task(background_outside_article_monitor())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={
+                        "code": 0,
+                        "message": "outside_article_monitor started background",
+                    },
+                )
+
+            case "inner_article_monitor":
+
+                async def background_inner_article_monitor():
+                    task = InnerGzhArticlesMonitor(self.db_client)
+                    final_status = await task.deal()
+                    await self.release_task(
+                        task_name, date_string, final_status=final_status
+                    )
+
+                asyncio.create_task(background_inner_article_monitor())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
+                )
+
+            case "title_rewrite":
+
+                async def background_title_rewrite():
+                    sub_task = TitleRewrite(self.db_client, self.log_client)
+                    await sub_task.deal()
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string, final_status=2
+                    )
+
+                asyncio.create_task(background_title_rewrite())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={
+                        "code": 0,
+                        "message": "title_rewrite started background",
+                    },
+                )
+
+            case "daily_publish_articles_recycle":
+
+                async def background_daily_publish_articles_recycle():
+                    sub_task = RecycleDailyPublishArticlesTask(
+                        self.db_client, self.log_client, date_string
+                    )
+                    await sub_task.deal()
+
+                    task = CheckDailyPublishArticlesTask(
+                        self.db_client, self.log_client, date_string
+                    )
+                    await task.deal()
+
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string, final_status=2
+                    )
+
+                asyncio.create_task(background_daily_publish_articles_recycle())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
+                )
+
+            case "update_root_source_id":
+
+                async def background_update_root_source_id():
+                    sub_task = UpdateRootSourceIdAndUpdateTimeTask(
+                        self.db_client, self.log_client
+                    )
+                    await sub_task.deal()
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string, final_status=2
+                    )
+
+                asyncio.create_task(background_update_root_source_id())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
+                )
+
+            case _:
+                await self.log_client.log(
+                    contents={
+                        "task": task_name,
+                        "function": "task_scheduler_deal",
+                        "message": "wrong task name input",
+                        "status": "fail",
+                        "data": self.data,
+                    }
+                )
+                await self.release_task(task_name, date_string, 99)
+                return await task_schedule_response.fail_response(
+                    error_code="4001", error_message="wrong task name input"
+                )

+ 14 - 0
applications/utils/__init__.py

@@ -1,3 +1,17 @@
+# import async apollo client
+from .async_apollo_client import AsyncApolloClient
+
+# import async http client
+from .async_http_client import AsyncHttPClient
+
 from .get_cover import fetch_channel_info
 from .get_cover import fetch_aigc_cover
 from .get_cover import fetch_long_video_cover
+
+# server response
+from .response import TaskScheduleResponse
+
+# common
+from .common import *
+
+task_schedule_response = TaskScheduleResponse()

+ 131 - 0
applications/utils/async_apollo_client.py

@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+import json
+import logging
+import socket
+import asyncio
+import aiohttp
+
+
+class AsyncApolloClient:
+    def __init__(
+        self,
+        app_id,
+        cluster="default",
+        config_server_url="http://localhost:8080",
+        timeout=35,
+        ip=None,
+    ):
+        self.config_server_url = config_server_url
+        self.appId = app_id
+        self.cluster = cluster
+        self.timeout = timeout
+        self.stopped = False
+        self.ip = ip or self._init_ip()
+
+        self._cache = {}
+        self._notification_map = {"application": -1}
+        self._stop_event = asyncio.Event()
+
+    def _init_ip(self):
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        try:
+            s.connect(("8.8.8.8", 53))
+            return s.getsockname()[0]
+        finally:
+            s.close()
+
+    async def get_value(
+        self,
+        key,
+        default_val=None,
+        namespace="application",
+        auto_fetch_on_cache_miss=False,
+    ):
+        if namespace not in self._notification_map:
+            self._notification_map[namespace] = -1
+            logging.info(f"Add namespace '{namespace}' to local notification map")
+
+        if namespace not in self._cache:
+            self._cache[namespace] = {}
+            logging.info(f"Add namespace '{namespace}' to local cache")
+            await self._long_poll()
+
+        if key in self._cache[namespace]:
+            return self._cache[namespace][key]
+        elif auto_fetch_on_cache_miss:
+            return await self._cached_http_get(key, default_val, namespace)
+        else:
+            return default_val
+
+    async def start(self):
+        if len(self._cache) == 0:
+            await self._long_poll()
+        asyncio.create_task(self._listener())
+
+    async def stop(self):
+        logging.info("Stopping listener...")
+        self._stop_event.set()
+
+    async def _cached_http_get(self, key, default_val, namespace="application"):
+        url = f"{self.config_server_url}/configfiles/json/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url) as r:
+                if r.status == 200:
+                    data = await r.json()
+                    self._cache[namespace] = data
+                    logging.info(f"Updated local cache for namespace {namespace}")
+                else:
+                    data = self._cache.get(namespace, {})
+
+        return data.get(key, default_val)
+
+    async def _uncached_http_get(self, namespace="application"):
+        url = f"{self.config_server_url}/configs/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url) as r:
+                if r.status == 200:
+                    data = await r.json()
+                    self._cache[namespace] = data["configurations"]
+                    logging.info(
+                        f"Updated local cache for namespace {namespace} release key {data['releaseKey']}"
+                    )
+
+    async def _long_poll(self):
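+        # Apollo notifications/v2 long polling: 304 = no change within the timeout, 200 returns the namespaces whose config changed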
+        url = f"{self.config_server_url}/notifications/v2"
+        notifications = [
+            {"namespaceName": k, "notificationId": v}
+            for k, v in self._notification_map.items()
+        ]
+        params = {
+            "appId": self.appId,
+            "cluster": self.cluster,
+            "notifications": json.dumps(notifications, ensure_ascii=False),
+        }
+
+        try:
+            async with aiohttp.ClientSession() as session:
+                async with session.get(url, params=params, timeout=self.timeout) as r:
+                    if r.status == 304:
+                        logging.debug("No change.")
+                        return
+                    if r.status == 200:
+                        data = await r.json()
+                        for entry in data:
+                            ns = entry["namespaceName"]
+                            nid = entry["notificationId"]
+                            logging.info(f"{ns} has changes: notificationId={nid}")
+                            await self._uncached_http_get(ns)
+                            self._notification_map[ns] = nid
+                    else:
+                        logging.warning("Sleep due to unexpected status...")
+                        await asyncio.sleep(self.timeout)
+        except Exception as e:
+            logging.warning(f"Error during long poll: {e}")
+            await asyncio.sleep(self.timeout)
+
+    async def _listener(self):
+        logging.info("Entering listener loop...")
+        while not self._stop_event.is_set():
+            await self._long_poll()
+        logging.info("Listener stopped.")
+        self.stopped = True
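
A minimal usage sketch for the long-polling Apollo client above. The class name `AsyncApolloClient` and its constructor parameters are assumptions, not confirmed by this diff; adjust them to the actual export in `applications/utils/async_apollo_client.py`.

```python
import asyncio

# Class name and constructor signature assumed; check async_apollo_client.py for the real export.
from applications.utils.async_apollo_client import AsyncApolloClient


async def main():
    client = AsyncApolloClient(
        app_id="demo-app",                          # assumed parameter name
        config_server_url="http://127.0.0.1:8080",  # assumed Apollo config server address
    )
    await client.start()  # initial fetch, then spawn the background listener
    # get_value signature assumed from the cache logic above
    mysql_host = await client.get_value("mysql_host", default_val="127.0.0.1")
    print(mysql_host)
    await client.stop()   # sets the stop event; the listener exits after its current poll


if __name__ == "__main__":
    asyncio.run(main())
```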

+ 99 - 0
applications/utils/async_http_client.py

@@ -0,0 +1,99 @@
+import aiohttp
+from typing import Optional, Union, Dict, Any
+
+
+class AsyncHttPClient:
+    def __init__(
+        self,
+        timeout: int = 10,
+        max_connections: int = 100,
+        default_headers: Optional[Dict[str, str]] = None,
+    ):
+        """
+        简化版异步 HTTP 客户端
+
+        :param timeout: 请求超时时间(秒)
+        :param max_connections: 连接池最大连接数
+        :param default_headers: 默认请求头
+        """
+        self.timeout = aiohttp.ClientTimeout(total=timeout)
+        self.connector = aiohttp.TCPConnector(limit=max_connections)
+        self.default_headers = default_headers or {}
+        self.session = None
+
+    async def __aenter__(self):
+        self.session = aiohttp.ClientSession(
+            connector=self.connector, timeout=self.timeout, headers=self.default_headers
+        )
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.session.close()
+
+    async def request(
+        self,
+        method: str,
+        url: str,
+        params: Optional[Dict[str, Any]] = None,
+        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """核心请求方法"""
+        request_headers = {**self.default_headers, **(headers or {})}
+
+        try:
+            async with self.session.request(
+                method,
+                url,
+                params=params,
+                data=data,
+                json=json,
+                headers=request_headers,
+            ) as response:
+                response.raise_for_status()
+                content_type = response.headers.get("Content-Type", "")
+
+                if "application/json" in content_type:
+                    return await response.json()
+                return await response.text()
+
+        except aiohttp.ClientResponseError as e:
+            print(f"HTTP error: {e.status} {e.message}")
+            raise
+        except aiohttp.ClientError as e:
+            print(f"Network error: {str(e)}")
+            raise
+
+    async def get(
+        self,
+        url: str,
+        params: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """GET 请求"""
+        return await self.request("GET", url, params=params, headers=headers)
+
+    async def post(
+        self,
+        url: str,
+        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """POST 请求"""
+        return await self.request("POST", url, data=data, json=json, headers=headers)
+
+    async def put(
+        self,
+        url: str,
+        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """
+        PUT 请求
+
+        通常用于更新资源,可以发送表单数据或 JSON 数据
+        """
+        return await self.request("PUT", url, data=data, json=json, headers=headers)

+ 185 - 0
applications/utils/common.py

@@ -0,0 +1,185 @@
+"""
+@author: luojunhui
+"""
+
+import hashlib
+
+from datetime import datetime, timezone, date, timedelta
+from requests import RequestException
+from urllib.parse import urlparse, parse_qs
+from tenacity import (
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+)
+
+
+def str_to_md5(strings):
+    """
+    字符串转化为 md5 值
+    :param strings:
+    :return:
+    """
+    # 将字符串转换为字节
+    original_bytes = strings.encode("utf-8")
+    # 创建一个md5 hash对象
+    md5_hash = hashlib.md5()
+    # 更新hash对象,传入原始字节
+    md5_hash.update(original_bytes)
+    # 获取16进制形式的MD5哈希值
+    md5_value = md5_hash.hexdigest()
+    return md5_value
+
+
+def proxy():
+    """
+    快代理
+    """
+    # 隧道域名:端口号
+    tunnel = "j685.kdltps.com:15818"
+
+    # Username/password authentication
+    username = "t14070979713487"
+    password = "hqwanfvy"
+    proxies = {
+        "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
+        % {"user": username, "pwd": password, "proxy": tunnel},
+        "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
+        % {"user": username, "pwd": password, "proxy": tunnel},
+    }
+    return proxies
+
+
+def request_retry(retry_times, min_retry_delay, max_retry_delay):
+    """
+    :param retry_times:
+    :param min_retry_delay:
+    :param max_retry_delay:
+    """
+    common_retry = dict(
+        stop=stop_after_attempt(retry_times),
+        wait=wait_exponential(min=min_retry_delay, max=max_retry_delay),
+        retry=retry_if_exception_type((RequestException, TimeoutError)),
+        reraise=True,  # re-raise the last exception once retries are exhausted
+    )
+    return common_retry
+
+
+def yield_batch(data, batch_size):
+    """
+    生成批次数据
+    :param data:
+    :param batch_size:
+    :return:
+    """
+    for i in range(0, len(data), batch_size):
+        yield data[i : i + batch_size]
+
+
+def extract_root_source_id(path: str) -> dict:
+    """
+    提取path参数
+    :param path:
+    :return:
+    """
+    params = parse_qs(urlparse(path).query)
+    jump_page = params.get("jumpPage", [None])[0]
+    if jump_page:
+        params2 = parse_qs(jump_page)
+        res = {
+            "video_id": params2["pages/user-videos?id"][0],
+            "root_source_id": params2["rootSourceId"][0],
+        }
+        return res
+    else:
+        return {}
+
+
+def show_desc_to_sta(show_desc):
+    """
+    Parse a WeChat show_desc display string into a statistics dict.
+    """
+
+    def decode_show_v(show_v):
+        """
+
+        :param show_v:
+        :return:
+        """
+        foo = show_v.replace("千", "e3").replace("万", "e4").replace("亿", "e8")
+        foo = eval(foo)
+        return int(foo)
+
+    def decode_show_k(show_k):
+        """
+
+        :param show_k:
+        :return:
+        """
+        this_dict = {
+            "阅读": "show_view_count",  # 文章
+            "看过": "show_view_count",  # 图文
+            "观看": "show_view_count",  # 视频
+            "赞": "show_like_count",
+            "付费": "show_pay_count",
+            "赞赏": "show_zs_count",
+        }
+        if show_k not in this_dict:
+            print(f"error from decode_show_k, show_k not found: {show_k}")
+        return this_dict.get(show_k, "show_unknown")
+
+    show_desc = show_desc.replace("+", "")
+    sta = {}
+    for show_kv in show_desc.split("\u2004\u2005"):
+        if not show_kv:
+            continue
+        show_k, show_v = show_kv.split("\u2006")
+        k = decode_show_k(show_k)
+        v = decode_show_v(show_v)
+        sta[k] = v
+    res = {
+        "show_view_count": sta.get("show_view_count", 0),
+        "show_like_count": sta.get("show_like_count", 0),
+        "show_pay_count": sta.get("show_pay_count", 0),
+        "show_zs_count": sta.get("show_zs_count", 0),
+    }
+    return res
+
+
+def generate_gzh_id(url):
+    biz = url.split("biz=")[1].split("&")[0]
+    idx = url.split("&idx=")[1].split("&")[0]
+    sn = url.split("&sn=")[1].split("&")[0]
+    url_bit = "{}-{}-{}".format(biz, idx, sn).encode()
+    md5_hash = hashlib.md5()
+    md5_hash.update(url_bit)
+    md5_value = md5_hash.hexdigest()
+    return md5_value
+
+
+def timestamp_to_str(timestamp, string_format="%Y-%m-%d %H:%M:%S") -> str:
+    """
+    :param string_format:
+    :param timestamp:
+    """
+    dt_object = (
+        datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
+    )
+    date_string = dt_object.strftime(string_format)
+    return date_string
+
+
+def days_remaining_in_month():
+    # Today's date
+    today = date.today()
+
+    # First day of the next month
+    if today.month == 12:
+        next_month = today.replace(year=today.year + 1, month=1, day=1)
+    else:
+        next_month = today.replace(month=today.month + 1, day=1)
+
+    # Last day of this month (first day of next month minus one day)
+    last_day_of_month = next_month - timedelta(days=1)
+
+    # Remaining days in the current month
+    remaining_days = (last_day_of_month - today).days
+
+    return remaining_days
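
A sketch of how two of the helpers above compose: `request_retry` feeds keyword arguments straight into tenacity's `@retry` decorator, and `yield_batch` chunks a list for batched processing. The URL is a placeholder.

```python
import requests
from tenacity import retry

from applications.utils.common import request_retry, yield_batch


# Retry up to 3 times with exponential backoff between 2 and 30 seconds
@retry(**request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30))
def fetch(url: str) -> dict:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()


for batch in yield_batch(list(range(10)), batch_size=4):
    print(batch)  # [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]
```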

+ 3 - 3
applications/utils/get_cover.py

@@ -10,7 +10,7 @@ async def fetch_channel_info(pools, content_id):
         where plan_exe_id = '{content_id}';
     """
     fetch_response = await pools.async_fetch(
-        query=fetch_query, db_name="aigc_db_pool", cursor_type=DictCursor
+        query=fetch_query, db_name="aigc", cursor_type=DictCursor
     )
     return fetch_response
 
@@ -25,7 +25,7 @@ async def fetch_aigc_cover(pools, channel_content_id):
         where channel_content_id = '{channel_content_id}' and image_type = 2;
     """
     fetch_response = await pools.async_fetch(
-        query=fetch_query, db_name="aigc_db_pool", cursor_type=DictCursor
+        query=fetch_query, db_name="aigc", cursor_type=DictCursor
     )
 
     return fetch_response
@@ -41,6 +41,6 @@ async def fetch_long_video_cover(pools, channel_content_id):
         where video_id = '{channel_content_id}';
     """
     fetch_response = await pools.async_fetch(
-        query=fetch_query, db_name="long_video_db_pool", cursor_type=DictCursor
+        query=fetch_query, db_name="long_video", cursor_type=DictCursor
     )
     return fetch_response

+ 24 - 0
applications/utils/response.py

@@ -0,0 +1,24 @@
+class Response:
+
+    @classmethod
+    def success_response(cls, data):
+        return {"code": 0, "status": "success", "data": data}
+
+    @classmethod
+    def error_response(cls, error_code, error_message):
+        return {"code": error_code, "status": "error", "message": error_message}
+
+
+class TaskScheduleResponse:
+    @classmethod
+    async def fail_response(cls, error_code, error_message):
+        return {"code": error_code, "status": "error", "message": error_message}
+
+    @classmethod
+    async def success_response(cls, task_name, data):
+        return {
+            "code": 0,
+            "status": "task execute successfully",
+            "data": data,
+            "task_name": task_name,
+        }

+ 7 - 2
requirements.txt

@@ -11,5 +11,10 @@ aliyun-python-sdk-core
 aliyun-python-sdk-kms
 odps~=3.5.1
 apscheduler~=3.10.4
-tqdm
-pyapollos~=0.1.5
+tqdm~=4.66.6
+pyapollos~=0.1.5
+pyotp~=2.9.0
+elasticsearch~=8.17.2
+openai~=1.97.0
+tenacity~=9.0.0
+fake-useragent~=2.1.0

+ 15 - 1
routes/blueprint.py

@@ -1,10 +1,12 @@
 from quart import Blueprint, jsonify, request
 from applications.service import GetCoverService
 
+from applications.tasks.task_scheduler import TaskScheduler
+
 server_blueprint = Blueprint("api", __name__, url_prefix="/api")
 
 
-def server_routes(pools):
+def server_routes(pools, log_service):
 
     @server_blueprint.route("/get_cover", methods=["POST"])
     async def get_cover():
@@ -12,4 +14,16 @@ def server_routes(pools):
         task = GetCoverService(pools, params)
         return jsonify(await task.deal())
 
+    @server_blueprint.route("/run_task", methods=["POST"])
+    async def run_task():
+        data = await request.get_json()
+        task_scheduler = TaskScheduler(data, log_service, pools)
+        response = await task_scheduler.deal()
+        return jsonify(response)
+
+    @server_blueprint.route("/finish_task", methods=["POST"])
+    async def finish_task():
+        data = await request.get_json()
+        return jsonify({"message": "hello world"})
+
     return server_blueprint
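
A hedged sketch of calling the new `/api/run_task` endpoint with the HTTP client added in this commit. The host, port, and the `task_name` field are assumptions; the actual payload depends on what `TaskScheduler` expects.

```python
import asyncio

from applications.utils.async_http_client import AsyncHttPClient


async def main():
    async with AsyncHttPClient() as client:
        response = await client.post(
            "http://127.0.0.1:6060/api/run_task",        # host and port assumed
            json={"task_name": "kimi_balance_monitor"},  # field name and value assumed
        )
        print(response)


asyncio.run(main())
```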

+ 17 - 3
task_app.py

@@ -1,19 +1,33 @@
+import logging
+
 from quart import Quart
+from applications.config import aliyun_log_config
 from applications.database import mysql_manager
+from applications.service import LogService
 from routes import server_routes
 
+log_service = LogService(**aliyun_log_config)
+
 app = Quart(__name__)
-routes = server_routes(mysql_manager)
+routes = server_routes(mysql_manager, log_service)
 app.register_blueprint(routes)
 
+logging.basicConfig(level=logging.INFO)
+
 
 @app.before_serving
 async def startup():
-    print("🚀 Starting application...")
+    logging.info("Starting application...")
     await mysql_manager.init_pools()
+    logging.info("Mysql pools init successfully")
+    await log_service.start()
+    logging.info("aliyun log service init successfully")
 
 
 @app.after_serving
 async def shutdown():
-    print("🛑 Shutting down application...")
+    logging.info("Shutting down application...")
     await mysql_manager.close_pools()
+    logging.info("Mysql pools close successfully")
+    await log_service.stop()
+    logging.info("aliyun log service stop successfully")