Browse Source

异步处理 kimi

罗俊辉 11 months ago
parent
commit
9b47b96e7a

+ 0 - 5
applications/functions/async_etl.py

@@ -212,9 +212,4 @@ class AsyncETL(object):
             cover=oss_cover
         )
         print(json.dumps(result, ensure_ascii=False, indent=4))
-        a = time.time()
-        os.remove(file_path)
-        os.remove(cover_path)
-        b = time.time()
-        print(b - a)
         return result["data"]["id"]

+ 1 - 16
applications/routes.py

@@ -9,7 +9,6 @@ from quart import Blueprint, jsonify, request
 
 from applications.functions.log import logging
 from applications.schedule import recall_videos, search_videos
-from applications.functions.kimi import KimiServer
 
 my_blueprint = Blueprint('LongArticles', __name__)
 
@@ -39,7 +38,6 @@ def Routes(mysql_client):
         :return:
         """
         params = await request.get_json()
-        K = KimiServer()
         gh_id = params['ghId']
         trace_id = "search-{}-{}".format(str(uuid.uuid4()), str(int(time.time())))
         params['trace_id'] = trace_id
@@ -60,22 +58,9 @@ def Routes(mysql_client):
                 ('{trace_id}', '{gh_id}', '{title}', '{contents}', '{account_name}');"""
         await mysql_client.async_insert(insert_sql)
         try:
-            kimi_info = await K.search_kimi_schedule(params=params)
-            print(json.dumps(kimi_info, ensure_ascii=False, indent=4))
-            kimi_title = kimi_info['k_title']
-            content_title = kimi_info['content_title']
-            content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
-            update_kimi_sql = f"""
-                UPDATE long_articles_video_dev SET
-                kimi_title = '{kimi_title}',
-                kimi_summary = '{content_title}',
-                kimi_keys = '{content_keys}'
-                WHERE trace_id = '{trace_id}';
-            """
-            await mysql_client.async_insert(update_kimi_sql)
             asyncio.ensure_future(
                 search_videos(
-                    kimi_info=kimi_info,
+                    params=params,
                     trace_id=trace_id,
                     gh_id=gh_id,
                     mysql_client=mysql_client

+ 69 - 43
applications/schedule/search_schedule.py

@@ -2,6 +2,7 @@
 @author: luojunhui
 调用接口在微信内搜索视频
 """
+import json
 import asyncio
 
 from applications.search import *
@@ -10,6 +11,7 @@ from applications.functions.log import logging
 from applications.functions.video_item import VideoProducer
 from applications.functions.async_etl import AsyncETL
 from applications.functions.mysql import select_sensitive_words
+from applications.functions.kimi import KimiServer
 
 
 class SearchABTest(object):
@@ -228,56 +230,80 @@ async def video_sender(video_obj, user, trace_id, platform):
     return video_id
 
 
-async def search_videos(kimi_info, trace_id, gh_id, mysql_client):
+async def search_videos(params, trace_id, gh_id, mysql_client):
     """
     search and send msg to ETL
     :param mysql_client:
-    :param kimi_info:
+    :param params:
     :param gh_id: 通过账号 id 来控制实验策略
     :param trace_id:
     :return:
     """
-    kimi_info["trace_id"] = trace_id
-    SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
-    recall_obj_1 = await SearchAB.ab_1()
-    await asyncio.sleep(3)
-    recall_obj_2 = await SearchAB.ab_2()
-    await asyncio.sleep(3)
-    recall_obj_3 = await SearchAB.ab_3()
-    recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
-    un_empty_list = [i for i in recall_list if i]
-    if len(un_empty_list) < 3:
+    K = KimiServer()
+    try:
+        kimi_info = await K.search_kimi_schedule(params=params)
+        print(json.dumps(kimi_info, ensure_ascii=False, indent=4))
+        kimi_title = kimi_info['k_title']
+        content_title = kimi_info['content_title']
+        content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
+        update_kimi_sql = f"""
+                        UPDATE long_articles_video_dev SET
+                        kimi_title = '{kimi_title}',
+                        kimi_summary = '{content_title}',
+                        kimi_keys = '{content_keys}'
+                        WHERE trace_id = '{trace_id}';
+                    """
+        await mysql_client.async_insert(update_kimi_sql)
+        kimi_info["trace_id"] = trace_id
+        SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
+        recall_obj_1 = await SearchAB.ab_1()
         await asyncio.sleep(3)
-        recall_obj_4 = await SearchAB.ab_4()
-        un_empty_list.append(recall_obj_4)
+        recall_obj_2 = await SearchAB.ab_2()
+        await asyncio.sleep(3)
+        recall_obj_3 = await SearchAB.ab_3()
+        recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
+        un_empty_list = [i for i in recall_list if i]
+        if len(un_empty_list) < 3:
+            await asyncio.sleep(3)
+            recall_obj_4 = await SearchAB.ab_4()
+            un_empty_list.append(recall_obj_4)
 
-    # 逐条下载,逐条写表
-    if un_empty_list:
-        for index, recall_obj in enumerate(un_empty_list, 1):
-            platform = recall_obj["platform"]
-            recall_video = recall_obj["result"]
-            if recall_video:
-                logging(
-                    code="7002",
-                    info="视频搜索成功, 搜索平台为--{}".format(platform),
-                    trace_id=trace_id,
-                    data=recall_video,
-                )
-                video_id = await video_sender(
-                    video_obj=recall_video,
-                    user=gh_id_dict.get(gh_id),
-                    trace_id=trace_id,
-                    platform=platform,
-                )
-                update_id_sql = f"""
-                UPDATE long_articles_video_dev
-                SET
-                recall_video_id{index} = {video_id}
-                WHERE
-                trace_id = '{trace_id}'
-                """
-                print(update_id_sql)
-                await mysql_client.async_insert(update_id_sql)
-    else:
-        logging(code="7003", info="视频搜索失败", trace_id=trace_id)
+        # 逐条下载,逐条写表
+        if un_empty_list:
+            for index, recall_obj in enumerate(un_empty_list, 1):
+                platform = recall_obj["platform"]
+                recall_video = recall_obj["result"]
+                if recall_video:
+                    logging(
+                        code="7002",
+                        info="视频搜索成功, 搜索平台为--{}".format(platform),
+                        trace_id=trace_id,
+                        data=recall_video,
+                    )
+                    video_id = await video_sender(
+                        video_obj=recall_video,
+                        user=gh_id_dict.get(gh_id),
+                        trace_id=trace_id,
+                        platform=platform,
+                    )
+                    update_id_sql = f"""
+                        UPDATE long_articles_video_dev
+                        SET
+                        recall_video_id{index} = {video_id}
+                        WHERE
+                        trace_id = '{trace_id}'
+                        """
+                    print(update_id_sql)
+                    await mysql_client.async_insert(update_id_sql)
+        else:
+            logging(code="7003", info="视频搜索失败", trace_id=trace_id)
+            return None
+    except Exception as e:
+        logging(
+            code="9000",
+            info="kimi挖掘失败, 原因是-{}".format(e),
+            trace_id=trace_id
+        )
         return None
+
+