luojunhui 1 week ago
Parent
Commit
7ba6abb261

+ 1 - 0
applications/api/__init__.py

@@ -5,6 +5,7 @@ from .aigc_system_api import AigcSystemApi
 from .apollo_api import ApolloApi
 from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .deep_seek_api_official import fetch_deepseek_completion
+from .es_api import ElasticSearchClient
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list
 from .gewe_api import WechatChannelAPI

+ 42 - 107
applications/api/es_api.py

@@ -1,114 +1,49 @@
 import ssl
-import datetime
-import time
-import requests
-import json
-from pymysql.cursors import DictCursor
 
 from elasticsearch import Elasticsearch, ApiError
 from elasticsearch import helpers
-from applications.db import DatabaseConnector
-from config import long_articles_config
-from config.es_mappings import index_name, mappings, settings
+from config.es_mappings import index_name
 
-db_client = DatabaseConnector(long_articles_config)
-db_client.connect()
 
-ctx = ssl.create_default_context(cafile="es_certs.crt")
-
-es_password = 'nkvvASQuQ0XUGRq5OLvm'
-es = Elasticsearch(
-    "https://192.168.205.85:9200",
-    # if username/password auth is enabled
-    basic_auth=("elastic", es_password),
-    ssl_context=ctx
-)
-
-
-def create_index():
-    # 2. defensively delete the old index first (optional)
-    if es.indices.exists(index=index_name):
-        es.indices.delete(index=index_name)
-
-    # 3. create the index
-    try:
-        es.indices.create(
-            index=index_name,
-            settings=settings,
-            mappings=mappings
+class ElasticSearchClient:
+    def __init__(self, index_=index_name):
+        self.password = "nkvvASQuQ0XUGRq5OLvm"
+        self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
+        self.ctx = ssl.create_default_context(cafile="config/es_certs.crt")
+        self.es = Elasticsearch(
+            self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
         )
-        print(f"✅ Index <{index_name}> created.")
-    except ApiError as e:
-        # print the specific error returned by ES
-        print(f"❌ Failed: {e.meta.error['type']} – {e.meta.error['reason']}")
-
-
-def get_max_article_id():
-    response = es.search(
-        index=index_name,
-        size=1,
-        sort="article_id:desc",
-        _source=["article_id"]
-    )
-    # print(json.dumps(response.body, indent=4))
-    return response['hits']['hits'][0]['_source']['article_id']
-
-
-def get_articles(id_):
-    fetch_query = f"""
-        select article_id, platform, out_account_id, title
-        from crawler_meta_article
-        where status = 1 and article_id > %s
-        order by article_id limit 10000;
-    """
-    # run the query
-    results = db_client.fetch(fetch_query, cursor_type=DictCursor, params=(id_,))
-    docs = [
-    {
-        "_index": index_name,
-        "_id": item['article_id'],
-        "_source": {
-            "article_id": item['article_id'],
-            "platform":  item['platform'],
-            "out_account_id": item['out_account_id'],
-            "title": item['title']
-    }
-    } for item in results
-]
-    return docs
-
-
-def search(key_string):
-    query = {
-        "query": {
-            "match": {
-                "title": key_string
-            }
-        },
-        "_source": ["article_id", "title"],
-        "size": 100
-    }
-
-    a = time.time()
-    resp = es.search(index=index_name, body=query)
-    b = time.time()
-    for hit in resp["hits"]["hits"]:
-        print(hit["_source"])
-
-    print(b - a)
-
-def get_cluster_docs_stats():
-    """获取集群文档统计信息"""
-    stats = es.nodes.stats()
-    # print(stats)
-    # print(type(stats))
-    print(json.dumps(stats.body, indent=4, ensure_ascii=False))
-
-
-if __name__ == "__main__":
-    with open("search_keys.txt", encoding="utf-8") as f:
-        key_list = f.readlines()
-    import random
-    search_title = random.choice(key_list).strip()
-    print(search_title)
-    search(search_title)
+        self.index_name = index_
+
+    def create_index(self, settings, mappings):
+        if self.es.indices.exists(index=self.index_name):
+            self.es.indices.delete(index=self.index_name)
+
+        try:
+            self.es.indices.create(
+                index=self.index_name, settings=settings, mappings=mappings
+            )
+            print("Index created successfully")
+        except ApiError as e:
+            print(f"❌ Failed: {e.meta.error['type']} – {e.meta.error['reason']}")
+
+    def get_max_article_id(self):
+        response = self.es.search(
+            index=self.index_name,
+            size=1,
+            sort="article_id:desc",
+            _source=["article_id"],
+        )
+        return response["hits"]["hits"][0]["_source"]["article_id"]
+
+    def search(self, search_keys, size=10):
+        query = {
+            "query": {"match": {"title": search_keys}},
+            "_source": ["article_id", "title"],
+            "size": size
+        }
+        resp = self.es.search(index=self.index_name, body=query)
+        return [i["_source"] for i in resp["hits"]["hits"]]
+
+    def bulk_insert(self, docs):
+        helpers.bulk(self.es, docs)
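
For reference, a minimal usage sketch of the new client (a sketch, not part of the commit; it assumes config/es_certs.crt and config.es_mappings are in place as referenced above, and the sample document values are placeholders mirroring the action shape built in migrate_article_to_es):

    from applications.api import ElasticSearchClient
    from config.es_mappings import index_name, mappings, settings

    client = ElasticSearchClient(index_=index_name)

    # one-time setup: drops any existing index and recreates it
    client.create_index(settings, mappings)

    # bulk-load documents in the same action shape that migrate_article_to_es builds
    docs = [
        {
            "_index": index_name,
            "_id": 1001,  # placeholder article_id
            "_source": {
                "article_id": 1001,
                "platform": "weixin",           # placeholder
                "out_account_id": "account_x",  # placeholder
                "title": "示例标题",             # placeholder
            },
        }
    ]
    client.bulk_insert(docs)

    # full-text match on the title field; returns a list of _source dicts
    hits = client.search(search_keys="示例", size=10)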

+ 2 - 5
long_articles_job.py

@@ -50,10 +50,7 @@ def run_fwh_data_manager():
 
 def run_top_article_generalize_from_article_pool():
     task = TopArticleGeneralizeFromArticlePool()
-    top_articles = task.fetch_distinct_top_titles()
-    for top_article in top_articles:
-        keys = task.get_keys_by_ai(top_article)
-        print(",".join(keys))
+    task.deal()
 
 
 def main():
@@ -89,4 +86,4 @@ def main():
 
 
 if __name__ == "__main__":
-    run_top_article_generalize_from_article_pool()
+    main()

+ 55 - 20
tasks/publish_tasks/top_article_generalize.py

@@ -9,10 +9,12 @@ from typing import List, Dict
 from pymysql.cursors import DictCursor
 
 from applications import aiditApi
+from applications.api import ElasticSearchClient
 from applications.api import fetch_deepseek_completion
 from applications.api import similarity_between_title_list
 from applications.db import DatabaseConnector
 from config import long_articles_config, denet_config
+from config.es_mappings import index_name
 
 extract_keywords_prompt = """
 你是一名优秀的中文专家
@@ -36,6 +38,8 @@ class TopArticleGeneralize:
         self.denet_client = DatabaseConnector(denet_config)
         self.denet_client.connect()
 
+        self.elastic_search = ElasticSearchClient(index_=index_name)
+
     def fetch_distinct_top_titles(self) -> List[Dict]:
         """
         Fetch the distinct article titles from the top-100 generation plans
@@ -94,19 +98,45 @@ class TopArticleGeneralize:
         )
         return response["keys"]
 
+    def migrate_article_to_es(self, max_article_id: int = 0):
+        fetch_query = f"""
+            select article_id, platform, out_account_id, title
+            from crawler_meta_article
+            where status = 1 and article_id > %s
+            order by article_id limit 10000;
+        """
+        # run the query
+        results = self.long_articles_client.fetch(
+            fetch_query, cursor_type=DictCursor, params=(max_article_id,)
+        )
+        docs = [
+            {
+                "_index": index_name,
+                "_id": item["article_id"],
+                "_source": {
+                    "article_id": item["article_id"],
+                    "platform": item["platform"],
+                    "out_account_id": item["out_account_id"],
+                    "title": item["title"],
+                },
+            }
+            for item in results
+        ]
+        self.elastic_search.bulk_insert(docs)
+
 
 class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
 
-    def get_candidate_articles(self, key):
+    def get_candidate_articles(self, article_id_tuple):
         fetch_query = f"""
             select article_id, title, link, llm_sensitivity, score, category_by_ai
             from crawler_meta_article
-            where status = 1
-              and title_sensitivity = 0
-              and title like '%{key}%';
+            where status = %s
+              and title_sensitivity = %s
+              and article_id in %s;
         """
         fetch_response = self.long_articles_client.fetch(
-            fetch_query, cursor_type=DictCursor
+            fetch_query, cursor_type=DictCursor, params=(1, 0, article_id_tuple)
         )
         return fetch_response
 
@@ -117,34 +147,43 @@ class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
         :return:
         """
         update_sql = f"""
-                    update crawler_meta_article
-                    set status = %s
-                    where article_id in %s and status = %s;
+            update crawler_meta_article
+            set status = %s
+            where article_id in %s and status = %s;
         """
         affect_rows = self.long_articles_client.save(
             query=update_sql, params=(2, tuple(article_id_list), 1)
         )
 
     def deal(self):
+        # migrate articles
+        max_id = self.elastic_search.get_max_article_id()
+        self.migrate_article_to_es(max_id)
+
+        # fetch titles
         title_obj_list = self.fetch_distinct_top_titles()
         publishing_article_list = []
         for title_obj in tqdm(title_obj_list):
             if self.get_title_read_info_detail(title_obj["title"]):
                 try:
-                    temp = []
                     keys = self.get_keys_by_ai(title_obj)
-                    for key in keys:
-                        candidate_articles = self.get_candidate_articles(key)
-                        temp += candidate_articles
+                    related_articles = self.elastic_search.search(
+                        search_keys=",".join(keys), size=50
+                    )
+                    if related_articles:
+                        article_id_list = [i["article_id"] for i in related_articles]
+                        article_list = self.get_candidate_articles(
+                            tuple(article_id_list)
+                        )
 
-                    if temp:
-                        title_list = [i["title"] for i in temp]
+                        title_list = [i["title"] for i in article_list]
                         # sort by relevance to the top title
                         similarity_array = similarity_between_title_list(
                             title_list, [title_obj["title"]]
                         )
+
                         response_with_similarity_list = []
-                        for index, item in enumerate(temp):
+                        for index, item in enumerate(article_list):
                             item["similarity"] = similarity_array[index][0]
                             response_with_similarity_list.append(item)
 
@@ -199,6 +238,7 @@ class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
             article_id_list = [i["article_id"] for i in publishing_article_list]
             self.change_article_status_while_publishing(article_id_list=article_id_list)
 
+
 class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize):
     def get_candidate_videos(self, key):
         fetch_query = f"""
@@ -223,8 +263,3 @@ class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize):
                     candidate_articles = self.get_candidate_videos(key)
                     temp += candidate_articles
                 print(json.dumps(temp, ensure_ascii=False, indent=4))
-
-#
-# TopArticleGeneralizeFromVideoPool().deal()
-
-
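
A quick illustration of the parameterized IN clause introduced above (a sketch under assumptions: connection parameters and ids below are placeholders; in the task itself the query goes through DatabaseConnector.fetch). pymysql escapes a Python tuple as a parenthesized value list, which is why "article_id in %s" with params=(1, 0, article_id_tuple) works and replaces the old string-interpolated LIKE filter:

    import pymysql
    from pymysql.cursors import DictCursor

    # placeholder connection; the task uses DatabaseConnector(long_articles_config) instead
    conn = pymysql.connect(host="127.0.0.1", user="reader", password="secret", db="long_articles")

    article_id_tuple = (101, 102, 103)  # placeholder ids returned by the ES title search
    query = """
        select article_id, title, link, llm_sensitivity, score, category_by_ai
        from crawler_meta_article
        where status = %s and title_sensitivity = %s and article_id in %s;
    """
    with conn.cursor(DictCursor) as cursor:
        # pymysql renders the tuple as (101, 102, 103); no manual SQL string building needed
        cursor.execute(query, (1, 0, article_id_tuple))
        rows = cursor.fetchall()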