Browse Source

Merge branch 'feature/luojunhui/20260423-add-es-server' of Server/LongArticleTaskServer into master

luojunhui 1 week ago
parent
commit
5486b209df

+ 2 - 0
app/api/service/__init__.py

@@ -4,6 +4,7 @@
 from .task_manager_service import TaskManager
 from .gzh_cookie_manager import GzhCookieManager
 from .daily_rank_manager import DailyRankManager
+from .elastic_search_manager import ElasticSearchManager
 
 # 任务调度器
 from .task_scheduler import TaskScheduler
@@ -13,5 +14,6 @@ __all__ = [
     "TaskManager",
     "GzhCookieManager",
     "DailyRankManager",
+    "ElasticSearchManager",
     "TaskScheduler",
 ]

+ 17 - 0
app/api/service/elastic_search_manager.py

@@ -0,0 +1,17 @@
+from __future__ import annotations
+
+from app.core.config import GlobalConfigSettings
+from app.infra.external import AsyncElasticSearchClient
+
+
class ElasticSearchManager:
    """Thin service layer over the async Elasticsearch client.

    A fresh ``AsyncElasticSearchClient`` is opened per call, so the
    manager itself holds no connection state beyond the settings object.
    """

    def __init__(self, config: GlobalConfigSettings):
        # Global settings; the ES-specific section is consumed by the client.
        self.config = config

    async def search(self, keywords: str, size: int = 10) -> dict:
        """Run a keyword search and wrap the hits in a count-plus-list payload."""
        async with AsyncElasticSearchClient(self.config) as client:
            hits = await client.search(search_keys=keywords, size=size)
        return {"total": len(hits), "results": hits}

+ 2 - 0
app/api/v1/endpoints/__init__.py

@@ -4,6 +4,7 @@ from .tasks import create_tasks_bp
 from .tokens import create_tokens_bp
 from .monitor import create_monitor_bp
 from .rank_log import create_rank_log_bp
+from .elasticsearch import create_elasticsearch_bp
 # from .mcp import create_mcp_bp
 
 __all__ = [
@@ -13,5 +14,6 @@ __all__ = [
     "create_tokens_bp",
     "create_monitor_bp",
     "create_rank_log_bp",
+    "create_elasticsearch_bp",
     # "create_mcp_bp",
 ]

+ 27 - 0
app/api/v1/endpoints/elasticsearch.py

@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+from pydantic import ValidationError
+from quart import Blueprint, jsonify
+
+from app.api.service import ElasticSearchManager
+from app.api.v1.utils import ApiDependencies
+from app.api.v1.utils import ElasticSearchRequest
+from app.api.v1.utils import parse_json, validation_error_response
+
+
def create_elasticsearch_bp(deps: ApiDependencies) -> Blueprint:
    """Build the blueprint exposing the Elasticsearch search endpoint."""
    blueprint = Blueprint("elasticsearch", __name__)

    @blueprint.route("/es_search", methods=["POST"])
    async def es_search():
        # Validate the JSON body against the request schema before doing work.
        try:
            req, _ = await parse_json(ElasticSearchRequest)
        except ValidationError as exc:
            body, status = validation_error_response(exc)
            return jsonify(body), status

        data = await ElasticSearchManager(config=deps.config).search(
            keywords=req.keywords, size=req.size
        )
        return jsonify({"code": 0, "message": "success", "data": data})

    return blueprint

+ 2 - 0
app/api/v1/routes/routes.py

@@ -10,6 +10,7 @@ from app.api.v1.endpoints import (
     create_tokens_bp,
     create_monitor_bp,
     create_rank_log_bp,
+    create_elasticsearch_bp,
     # create_mcp_bp
 )
 from app.core.config import GlobalConfigSettings
@@ -35,6 +36,7 @@ def register_v1_blueprints(deps: ApiDependencies) -> Blueprint:
     api.register_blueprint(create_abtest_bp(deps))
     api.register_blueprint(create_monitor_bp(deps))
     api.register_blueprint(create_rank_log_bp(deps))
+    api.register_blueprint(create_elasticsearch_bp(deps))
     # api.register_blueprint(create_mcp_bp(deps))
 
     return api

+ 2 - 0
app/api/v1/utils/__init__.py

@@ -7,6 +7,7 @@ from .schemas import (
     GetCoverRequest,
     LongArticlesMcpRequest,
     DailyRankQueryRequest,
+    ElasticSearchRequest,
 )
 
 
@@ -19,5 +20,6 @@ __all__ = [
     "GetCoverRequest",
     "LongArticlesMcpRequest",
     "DailyRankQueryRequest",
+    "ElasticSearchRequest",
     "ApiDependencies",
 ]

+ 7 - 0
app/api/v1/utils/schemas.py

@@ -69,3 +69,10 @@ class DailyRankQueryRequest(BaseRequest):
 
     sort_by: str = Field(default="new_score", description="排序字段")
     sort_dir: str = Field(default="desc", description="排序方向")
+
+
class ElasticSearchRequest(BaseRequest):
    """Request body for the POST /es_search endpoint."""

    # Search keywords; must be a non-empty string.
    keywords: str = Field(..., min_length=1, description="搜索关键词")
    # Number of hits to return, constrained to 1..100 (default 10).
    size: int = Field(default=10, ge=1, le=100, description="返回结果数量")

File diff suppressed because it is too large
+ 13 - 4
app/core/config/settings/elasticsearch.py


+ 1 - 1
app/domains/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -55,7 +55,7 @@ class Const:
         "gh_9eef14ad6c16",
         "gh_c5cdf60d9ab4",
         "gh_bff0bcb0694a",
-        "gh_3ed305b5817f"
+        "gh_3ed305b5817f",
     ]
 
     # NOT USED SERVER ACCOUNT

+ 1 - 1
app/domains/monitor_tasks/rank_log_monitor/__init__.py

@@ -1,3 +1,3 @@
 from .entrance import RankLogMonitor
 
-__all__ = ['RankLogMonitor']
+__all__ = ["RankLogMonitor"]

+ 0 - 1
app/domains/monitor_tasks/rank_log_monitor/_const.py

@@ -1,5 +1,4 @@
 class RankLogMonitorConst:
-
     DEFAULT_HIS_FISSION_OPEN_RATE_SCORE = 0.32
 
     DETAIL_COLUMN_MAP = {

+ 6 - 3
app/domains/monitor_tasks/rank_log_monitor/_utils.py

@@ -5,7 +5,6 @@ from ._const import RankLogMonitorConst
 
 
 class RankLogMonitorUtils(RankLogMonitorConst):
-
     @staticmethod
     def safe_float(raw: Any) -> float | None:
         if raw in ("", None):
@@ -35,7 +34,9 @@ class RankLogMonitorUtils(RankLogMonitorConst):
             normalized[col] = self.safe_float(record.get(col))
         return normalized
 
-    def parse_score_map(self, score_map: str | None, content_pool_type: str | None = None) -> dict[str, Any]:
+    def parse_score_map(
+        self, score_map: str | None, content_pool_type: str | None = None
+    ) -> dict[str, Any]:
         if not score_map:
             return {}
         try:
@@ -48,7 +49,9 @@ class RankLogMonitorUtils(RankLogMonitorConst):
             result[column] = score_map_json.get(key, 0)
 
         his_fission_column = self.DETAIL_COLUMN_MAP["HisFissionOpenRateStrategy"]
-        if content_pool_type == "autoArticlePoolLevel1" and result.get(his_fission_column) in (0, None):
+        if content_pool_type == "autoArticlePoolLevel1" and result.get(
+            his_fission_column
+        ) in (0, None):
             result[his_fission_column] = self.DEFAULT_HIS_FISSION_OPEN_RATE_SCORE
 
         return result

+ 1 - 2
app/domains/recommend/i2i_recommend/_const.py

@@ -1,5 +1,4 @@
 class I2IRecommendDataSyncConst:
-
     RANK_BOT = "rank_bot"
 
     class TaskStatus:
@@ -11,4 +10,4 @@ class I2IRecommendDataSyncConst:
     class VersionStatus:
         INIT = 0
         ONLINE = 1
-        ARCHIVED = 2
+        ARCHIVED = 2

+ 3 - 3
app/domains/recommend/i2i_recommend/_utils.py

@@ -6,19 +6,19 @@ from ._const import I2IRecommendDataSyncConst
 
 
 class I2IRecommendDataSyncUtil(I2IRecommendDataSyncConst):
-
    @staticmethod
    def filter_accounts_to_sync(account_version_list: list[dict]) -> list[dict]:
        """Return rows whose ``status`` equals ``VersionStatus.INIT`` (0).

        NOTE(review): the original docstring described this as selecting
        accounts whose max-version status is OFFLINE(0), but the code
        compares against ``INIT`` — confirm which constant is intended.
        """
        return [
            row
            for row in account_version_list
            if row["status"] == I2IRecommendDataSyncConst.VersionStatus.INIT
        ]
 
     # 飞书通知
-    async def bot(self, title: str, detail: Dict, mention: bool=False):
+    async def bot(self, title: str, detail: Dict, mention: bool = False):
         return await feishu_robot.bot(
             title=title,
             detail=detail,

+ 6 - 2
app/domains/recommend/i2i_recommend/entrance.py

@@ -30,7 +30,9 @@ class I2IRecommendDataSyncTask(I2IRecommendDataSyncConst):
                 # 上游数据未更新,无待激活账号
                 await self.tool.bot(
                     title="i2i推荐数据同步-上游数据未更新",
-                    detail={"message": "未查询到status=0的待激活数据,请检查上游大数据任务是否正常"},
+                    detail={
+                        "message": "未查询到status=0的待激活数据,请检查上游大数据任务是否正常"
+                    },
                     mention=True,
                 )
                 return
@@ -40,7 +42,9 @@ class I2IRecommendDataSyncTask(I2IRecommendDataSyncConst):
             for account in accounts:
                 gh_id = account["gh_id"]
                 try:
-                    version_info = await self.mapper.fetch_max_version_for_account(gh_id)
+                    version_info = await self.mapper.fetch_max_version_for_account(
+                        gh_id
+                    )
                     if not version_info:
                         continue
 

+ 20 - 8
app/infra/external/elastic_search.py

@@ -1,20 +1,32 @@
 import ssl
+import base64
 
 from elasticsearch import AsyncElasticsearch
 from elasticsearch.helpers import async_bulk
 
+from app.core.config import GlobalConfigSettings
+
 
 class AsyncElasticSearchClient:
-    def __init__(self, index_):
-        self.password = "nkvvASQuQ0XUGRq5OLvm"
-        self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
-        self.ctx = ssl.create_default_context(
-            cafile="app/core/config/cert/es_certs.crt"
-        )
    def __init__(self, global_config: GlobalConfigSettings):
        """Build an ``AsyncElasticsearch`` client from global settings.

        TLS trust is resolved in priority order: base64-encoded certificate
        content, then an on-disk certificate path, then system defaults.
        """
        config = global_config.elasticsearch
        self.hosts = config.hosts

        # Prefer cert_content (base64-encoded certificate body); fall back to cert_path.
        if config.cert_content:
            cert_data = base64.b64decode(config.cert_content).decode("utf-8")
            self.ctx = ssl.create_default_context(cadata=cert_data)
        elif config.cert_path:
            self.ctx = ssl.create_default_context(cafile=config.cert_path)
        else:
            # No certificate configured — trust the system CA store.
            self.ctx = ssl.create_default_context()

        self.es = AsyncElasticsearch(
            self.hosts,
            basic_auth=(config.username, config.password),
            ssl_context=self.ctx,
        )
        self.index_name = config.index_name
 
     async def create_index(self, settings, mappings):
         exists = await self.es.indices.exists(index=self.index_name)

+ 4 - 0
app/infra/internal/aigc_system.py

@@ -206,3 +206,7 @@ async def get_titles_from_produce_plan(pool, plan_id, threshold=None):
         query=query, db_name="aigc", params=(plan_id, threshold)
     )
     return tuple([i["title"] for i in response])
+
+
async def create_aigc_decode_task():
    """Placeholder for creating an AIGC decode task.

    TODO(review): empty stub introduced with this commit — implement it
    or remove it before anything depends on it; currently a no-op.
    """
    pass

+ 1 - 1
app/infra/internal/piaoquan.py

@@ -75,4 +75,4 @@ async def publish_video_to_piaoquan(oss_path: str, uid: str, title: str) -> Dict
     async with AsyncHttpClient() as client:
         response = await client.post(url, data=payload, headers=headers)
 
-    return response
+    return response

+ 1 - 1
app/jobs/domains/algorithm.py

@@ -1,3 +1,3 @@
 from app.domains.algorithm_tasks import AccountCategoryAnalysis
 
-__all__ = ['AccountCategoryAnalysis']
+__all__ = ["AccountCategoryAnalysis"]

+ 1 - 1
app/jobs/domains/anaylsis.py

@@ -10,4 +10,4 @@ __all__ = [
     "AccountPositionReadAvg",
     "AccountPositionOpenRateAvg",
     "RateLimitedArticleFilter",
-]
+]

+ 1 - 1
app/jobs/domains/cold_start.py

@@ -4,4 +4,4 @@ from app.domains.cold_start_tasks import AdPlatformArticlePublishTask
 __all__ = [
     "ArticlePoolColdStart",
     "AdPlatformArticlePublishTask",
-]
+]

+ 2 - 2
app/jobs/domains/crawler_tasks.py

@@ -8,5 +8,5 @@ __all__ = [
     "CrawlerToutiao",
     "WeixinAccountManager",
     "CrawlerGzhAccountArticles",
-    "CrawlerGzhSearchArticles"
-]
+    "CrawlerGzhSearchArticles",
+]

+ 1 - 1
app/jobs/domains/data_recycle.py

@@ -17,4 +17,4 @@ __all__ = [
     "UpdateOutsideRootSourceIdAndUpdateTimeTask",
     "RecycleFwhDailyPublishArticlesTask",
     "RecycleMiniProgramDetailTask",
-]
+]

+ 1 - 1
app/jobs/domains/llm_task.py

@@ -16,4 +16,4 @@ __all__ = [
     "ArticlePoolCategoryGeneration",
     "CandidateAccountQualityScoreRecognizer",
     "ExtractTitleFeatures",
-]
+]

+ 1 - 1
app/jobs/domains/monitor_task.py

@@ -25,4 +25,4 @@ __all__ = [
     "LimitedAccountAnalysisTask",
     "AdPlatformAccountsMonitorTask",
     "RankLogMonitor",
-]
+]

+ 1 - 1
app/jobs/domains/recommend.py

@@ -3,4 +3,4 @@ from app.domains.recommend.i2i_recommend import I2IRecommendDataSyncTask
 
 __all__ = [
     "I2IRecommendDataSyncTask",
-]
+]

+ 3 - 6
app/jobs/task_handler.py

@@ -449,18 +449,14 @@ class TaskHandler:
    @register("fetch_decode_result")
    async def _fetch_decode_result(self) -> int:
        """Fetch the results of decode (deconstruction) tasks."""
        task = FetchDecodeResults(pool=self.db_client, log_service=self.log_client)
        await task.deal()
        return TaskStatus.SUCCESS
 
    @register("extract_decode_result")
    async def _extract_ad_platform_accounts_decode_result(self) -> int:
        """Extract detail rows from decode (deconstruction) task results."""
        task = ExtractDecodeTaskDetail(pool=self.db_client, log_service=self.log_client)
        await task.deal()
        return TaskStatus.SUCCESS
 
@@ -473,4 +469,5 @@ class TaskHandler:
         await task.deal()
         return TaskStatus.SUCCESS
 
+
 __all__ = ["TaskHandler"]

Some files were not shown because too many files changed in this diff