ソースを参照

2024-06-11
优化重搜索逻辑
视频生成文章接口返回三条视频

罗俊辉 11 ヶ月 前
コミット
011a4e19de

+ 13 - 0
applications/functions/common.py

@@ -8,6 +8,7 @@ import uuid
 import random
 import hashlib
 import requests
+import aiohttp
 import urllib.parse
 
 
@@ -185,3 +186,15 @@ def account_info_map(gh_id):
         return ""
     else:
         return ""
+
+
+async def request_etl(url, headers, json_data):
+    """
+    异步请求ETL
+    :return:
+    """
+    async with aiohttp.ClientSession() as session:
+        async with session.post(url, headers=headers, json=json_data, timeout=60) as response:
+            response_data = await response.json()
+            return response_data
+

+ 5 - 10
applications/routes.py

@@ -62,7 +62,7 @@ def Routes(mysql_client):
         if "video_id=" in title:
             video_id = title.split("video_id=")[-1]
             insert_sql = f"""
-                            INSERT INTO long_articles_video 
+                            INSERT INTO long_articles_video_dev
                                 (trace_id, gh_id, article_title, article_text, account_name, recall_video_id1)
                             VALUES 
                                 ('{trace_id}', '{gh_id}', '{title}', '{contents}', '{account_name}', '{video_id}');"""
@@ -75,7 +75,7 @@ def Routes(mysql_client):
             return jsonify(res)
         else:
             insert_sql = f"""
-                INSERT INTO long_articles_video 
+                INSERT INTO long_articles_video_dev
                     (trace_id, gh_id, article_title, article_text, account_name)
                 VALUES 
                     ('{trace_id}', '{gh_id}', '{title}', '{contents}', '{account_name}');"""
@@ -146,20 +146,15 @@ def Routes(mysql_client):
             function="re_search_videos_from_the_web",
             trace_id=trace_id
         )
+
         try:
-            asyncio.ensure_future(
-                re_search_videos(
-                    params=params,
-                    trace_id=trace_id,
-                    gh_id=gh_id,
-                    mysql_client=mysql_client
-                )
-            )
+            asyncio.ensure_future(re_search_videos(params, trace_id, gh_id))
             res = {
                 "status": "success",
                 "code": 0,
                 "traceId": trace_id
             }
+            return jsonify(res)
         except Exception as e:
             res = {
                 "status": "fail",

+ 3 - 3
applications/schedule/process_schedule.py

@@ -44,7 +44,7 @@ async def return_info_v2(video_id, trace_id, mysql_client):
         "paragraphPosition": 0.25
     }
     update_result_sql = f"""
-                        UPDATE long_articles_video
+                        UPDATE long_articles_video_dev
                         SET
                             result1 = '{json.dumps(result, ensure_ascii=False)}',
                             success = 1
@@ -105,7 +105,7 @@ async def return_info(video_id, kimi_title, trace_id, mysql_client, index):
         "paragraphPosition": index * 0.25
     }
     update_result_sql = f"""
-                    UPDATE long_articles_video
+                    UPDATE long_articles_video_dev
                     SET
                         result{index} = '{json.dumps(result, ensure_ascii=False)}',
                         success = 1
@@ -136,7 +136,7 @@ async def recall_videos(trace_id, mysql_client):
     """
     select_sql = f"""
         SELECT recall_video_id1, recall_video_id2, recall_video_id3, kimi_title 
-        FROM long_articles_video
+        FROM long_articles_video_dev
         WHERE trace_id = '{trace_id}';
     """
     info_tuple = await mysql_client.async_select(select_sql)

+ 91 - 88
applications/schedule/search_schedule.py

@@ -3,16 +3,15 @@
 调用接口在微信内搜索视频
 """
 import json
-import asyncio
 import time
 
 from applications.search import *
 from applications.static.config import gh_id_dict
 from applications.functions.log import logging
 from applications.functions.video_item import VideoProducer
-from applications.functions.async_etl import AsyncETL
 from applications.functions.mysql import select_sensitive_words
 from applications.functions.kimi import KimiServer
+from applications.functions.common import request_etl
 
 
 class SearchABTest(object):
@@ -293,17 +292,17 @@ class SearchMethod(object):
             return L
 
 
-async def video_sender(video_obj, user, trace_id, platform):
+async def video_sender(video_obj, user, trace_id, platform, index):
     """
     异步处理微信 video_obj
     公众号和站内账号一一对应
+    :param index:
     :param platform:
     :param user:
     :param trace_id:
     :param video_obj:
     :return:
     """
-    # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
     Video = VideoProducer()
     if platform == "xg_search":
         mq_obj = Video.xg_video_producer(
@@ -331,15 +330,16 @@ async def video_sender(video_obj, user, trace_id, platform):
         )
     else:
         mq_obj = {}
-    AE = AsyncETL(video_obj=mq_obj)
-    video_id = await AE.etl_deal()
-    logging(
-        code="6002",
-        info="视频下载完成, 平台是---{}".format(platform),
-        data=mq_obj,
-        trace_id=trace_id,
+    mq_obj['index'] = index
+    mq_obj['trace_id'] = trace_id
+    header = {
+        "Content-Type": "application/json",
+    }
+    await request_etl(
+        url="http://47.99.132.47:4612/etl",
+        headers=header,
+        json_data=mq_obj
     )
-    return video_id
 
 
 async def search_videos(params, trace_id, gh_id, mysql_client):
@@ -358,7 +358,7 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
     content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
     content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
     update_kimi_sql = f"""
-                    UPDATE long_articles_video SET
+                    UPDATE long_articles_video_dev SET
                     kimi_title = '{kimi_title}',
                     kimi_summary = '{content_title}',
                     kimi_keys = '{content_keys}'
@@ -367,87 +367,86 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
     await mysql_client.async_insert(update_kimi_sql)
     kimi_info["trace_id"] = trace_id
     SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
-    recall_obj_1 = await SearchAB.ab_1()
-    # recall_obj_1 = await SearchAB.ab_0()
-    await asyncio.sleep(3)
-    recall_obj_2 = await SearchAB.ab_2()
-    await asyncio.sleep(3)
-    recall_obj_3 = await SearchAB.ab_3()
-    print("{}---视频搜索正常".format(trace_id))
-    recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
-    un_empty_list = [i for i in recall_list if i]
-    if len(un_empty_list) < 3:
-        await asyncio.sleep(3)
-        recall_obj_4 = await SearchAB.ab_4()
-        if recall_obj_4:
-            un_empty_list.append(recall_obj_4)
-
-    # 逐条下载,逐条写表
-    if un_empty_list:
-        for index, recall_obj in enumerate(un_empty_list, 1):
-            platform = recall_obj["platform"]
-            recall_video = recall_obj["result"]
+    # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
+    recall_list = await SearchAB.ab_5()
+    print("一共搜索到{}条视频".format(len(recall_list)))
+    index = 0
+    for recall_obj in recall_list:
+        if recall_obj:
+            platform = recall_obj['platform']
+            recall_video = recall_obj['result']
             if recall_video:
-                logging(
-                    code="7002",
-                    info="视频搜索成功, 搜索平台为--{}".format(platform),
-                    trace_id=trace_id,
-                    data=recall_video,
-                )
-                video_id = await video_sender(
+                index += 1
+                await video_sender(
                     video_obj=recall_video,
                     user=gh_id_dict.get(gh_id),
                     trace_id=trace_id,
                     platform=platform,
+                    index=index
                 )
-                update_id_sql = f"""
-                    UPDATE long_articles_video
-                    SET
-                    recall_video_id{index} = {video_id}
-                    WHERE
-                    trace_id = '{trace_id}'
-                    """
-                await mysql_client.async_insert(update_id_sql)
-    else:
-        logging(
-            code="7003",
-            info="视频搜索失败, 被敏感词过滤",
-            trace_id=trace_id
-        )
-
-
-async def insert_into_mysql(index, mysql_client, recall_video, gh_id, trace_id, platform):
-    """
-    :param platform:
-    :param trace_id:
-    :param gh_id:
-    :param index:
-    :param mysql_client:
-    :param recall_video:
-    """
-    video_id = await video_sender(
-        video_obj=recall_video,
-        user=gh_id_dict.get(gh_id),
-        trace_id=trace_id,
-        platform=platform,
-    )
-    update_id_sql = f"""
-        UPDATE long_articles_video
-        SET
-        recall_video_id{index} = {video_id}
-        WHERE
-        trace_id = '{trace_id}'
-    """
-    await mysql_client.async_insert(update_id_sql)
+                logging(
+                    code="7004",
+                    info="成功请求etl",
+                    trace_id=trace_id
+                )
+                if index >= 3:
+                    print("already downloaded 3 videos")
+                    break
+    # SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
+    # recall_obj_1 = await SearchAB.ab_1()
+    # # recall_obj_1 = await SearchAB.ab_0()
+    # await asyncio.sleep(3)
+    # recall_obj_2 = await SearchAB.ab_2()
+    # await asyncio.sleep(3)
+    # recall_obj_3 = await SearchAB.ab_3()
+    # print("{}---视频搜索正常".format(trace_id))
+    # recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
+    # un_empty_list = [i for i in recall_list if i]
+    # if len(un_empty_list) < 3:
+    #     await asyncio.sleep(3)
+    #     recall_obj_4 = await SearchAB.ab_4()
+    #     if recall_obj_4:
+    #         un_empty_list.append(recall_obj_4)
+    #
+    # # 逐条下载,逐条写表
+    # if un_empty_list:
+    #     for index, recall_obj in enumerate(un_empty_list, 1):
+    #         platform = recall_obj["platform"]
+    #         recall_video = recall_obj["result"]
+    #         if recall_video:
+    #             logging(
+    #                 code="7002",
+    #                 info="视频搜索成功, 搜索平台为--{}".format(platform),
+    #                 trace_id=trace_id,
+    #                 data=recall_video,
+    #             )
+    #             response = await video_sender(
+    #                 video_obj=recall_video,
+    #                 user=gh_id_dict.get(gh_id),
+    #                 trace_id=trace_id,
+    #                 platform=platform,
+    #                 index=index
+    #             )
+    #             logging(
+    #                 code="7004",
+    #                 info="成功请求etl",
+    #                 trace_id=trace_id,
+    #                 data=response
+    #             )
+    # else:
+    #     logging(
+    #         code="7003",
+    #         info="视频搜索失败, 被敏感词过滤",
+    #         trace_id=trace_id
+    #     )
 
 
-async def re_search_videos(params, trace_id, gh_id, mysql_client):
+async def re_search_videos(params, trace_id, gh_id):
     """
     重新搜索接口
     :param params:
     :param trace_id:
     :param gh_id:
-    :param mysql_client:
     :return:
     """
     obj = {
@@ -467,16 +466,20 @@ async def re_search_videos(params, trace_id, gh_id, mysql_client):
             recall_video = recall_obj['result']
             if recall_video:
                 index += 1
-                await insert_into_mysql(
-                    index=index,
-                    mysql_client=mysql_client,
-                    recall_video=recall_video,
-                    gh_id=gh_id,
+                await video_sender(
+                    video_obj=recall_video,
+                    user=gh_id_dict.get(gh_id),
                     trace_id=trace_id,
-                    platform=platform
+                    platform=platform,
+                    index=index
+                )
+                logging(
+                    code="7004",
+                    info="成功请求etl",
+                    trace_id=trace_id
                 )
                 if index >= 3:
                     print("already downloaded 3 videos")
                     break
 
-    print("一个匹配到{}条文章".format(index))
+    print("一个匹配到{}条".format(index))

+ 1 - 1
hypercorn_config.toml

@@ -1,5 +1,5 @@
 reload = true
-bind = "0.0.0.0:8111"
+bind = "0.0.0.0:2111"
 workers = 4
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间