ソースを参照

2024-06-12
第一版缓存搜索接口
记录搜索信息

罗俊辉 10 ヶ月 前
コミット
8ae1c49702

+ 21 - 0
applications/deal/recall_deal.py

@@ -0,0 +1,21 @@
+"""
+@author: luojunhui
+"""
+
+
+class RecallDeal(object):
+    """
+    召回逻辑
+    """
+    def __init__(self, trace_id, mysql_client):
+        self.trace_id = trace_id
+        self.mysql_client = mysql_client
+
+    def deal(self):
+        """
+        Recall Deal
+        :return:
+        """
+
+
+

+ 50 - 18
applications/deal/search_deal.py

@@ -38,11 +38,11 @@ class SearchDeal(object):
             self.account_name = self.params['accountName'].replace("'", "")
             self.content_id = self.params['articleId']
             logging(
-                code="2000",
-                info="搜索视频内容接口请求成功",
+                code="1001",
+                info="搜索视频内容接口请求成功, 参数校验成功",
                 port="title_to_search",
-                function="search_videos_from_the_web",
-                trace_id=self.trace_id
+                trace_id=self.trace_id,
+                data=self.params
             )
             return None
         except Exception as e:
@@ -52,6 +52,13 @@ class SearchDeal(object):
                 "message": str(e),
                 "info": "params check error"
             }
+            logging(
+                code="4001",
+                info="搜索视频内容接口请求成功, 参数校验失败",
+                port="title_to_search",
+                trace_id=self.trace_id,
+                data=self.params
+            )
             return result
 
     async def record(self):
@@ -61,10 +68,15 @@ class SearchDeal(object):
         """
         insert_sql = f"""
                         INSERT INTO {db_article}
-                            (trace_id, gh_id, article_title, article_text, account_name)
+                            (trace_id, gh_id, article_title, article_text, account_name, content_id)
                         VALUES 
-                            ('{self.trace_id}', '{self.gh_id}', '{self.title}', '{self.contents}', '{self.account_name}');"""
+                            ('{self.trace_id}', '{self.gh_id}', '{self.title}', '{self.contents}', '{self.account_name}', '{self.content_id}');"""
         await self.mysql_client.async_insert(insert_sql)
+        logging(
+            code="1002",
+            info="成功记录请求数据到mysql中",
+            trace_id=self.trace_id
+        )
 
     async def process_video_id(self):
         """
@@ -85,9 +97,14 @@ class SearchDeal(object):
             "code": 0,
             "traceId": self.trace_id
         }
+        logging(
+            code="1003",
+            info="视频生成文本服务请求,video_id = {}".format(video_id),
+            trace_id=self.trace_id
+        )
         return res
 
-    async def insert_history_contents_videos(self, vid1, vid2, vid3):
+    async def insert_history_contents_videos(self, vid1, vid2, vid3, kimi_title):
         """
         插入历史视频id
         :return:
@@ -95,26 +112,35 @@ class SearchDeal(object):
         update_sql = f"""
         UPDATE {db_article}
         SET 
-            recall_video_id1={vid1}, recall_video_id2={vid2}, recall_video_id3={vid3}
-        WHERE  trace_id = {self.trace_id}
+            kimi_title='{kimi_title}',
+            recall_video_id1={vid1}, 
+            recall_video_id2={vid2}, 
+            recall_video_id3={vid3}
+        WHERE  trace_id = '{self.trace_id}'
         """
-        self.mysql_client.async_insert(update_sql)
+        await self.mysql_client.async_insert(update_sql)
 
     async def get_history_contents(self):
         """
         check whether the content id exists
         :return:
+
         """
         select_sql = f"""
-            SELECT recall_video_id1, recall_video_id2, recall_video_id3
+            SELECT recall_video_id1, recall_video_id2, recall_video_id3, kimi_title
             FROM {db_article}
-            WHERE content_id = '{self.content_id}'
+            WHERE content_id = '{self.content_id}' and trace_id != '{self.trace_id}'
             ORDER BY id DESC;
         """
-        result = await self.mysql_client.async_select(select_sql)[0]
-        video_1, video_2, video_3 = result
-        if video_1 and video_2 and video_3:
-            return [video_1, video_2, video_3]
+        result = await self.mysql_client.async_select(select_sql)
+        if result:
+            for item in result:
+                video_1, video_2, video_3, kimi_title = item
+                if video_1 and video_2 and video_3 and kimi_title:
+                    return [video_1, video_2, video_3, kimi_title]
+                else:
+                    continue
+            return None
         else:
             return None
 
@@ -129,16 +155,22 @@ class SearchDeal(object):
         else:
             # 记录
             await self.record()
-            # 处理video_id
             if "video_id=" in self.title:
                 return await self.process_video_id()
             else:
                 video_ids = await self.get_history_contents()
                 if video_ids:
+                    logging(
+                        code="1004",
+                        info="获取历史到文章视频",
+                        data=video_ids,
+                        trace_id=self.trace_id
+                    )
                     await self.insert_history_contents_videos(
                         video_ids[0],
                         video_ids[1],
-                        video_ids[2]
+                        video_ids[2],
+                        video_ids[3]
                     )
                     return {"status": "success", "code": 0, "traceId": self.trace_id}
                 else:

+ 3 - 3
applications/functions/kimi.py

@@ -28,7 +28,7 @@ class KimiServer(object):
             kimi_title = await cls.kimi_title(title)
         except Exception as e:
             logging(
-                code="9000",
+                code="4002",
                 info="kimi 挖掘失败--{}, 采用 gpt".format(e),
                 trace_id=trace_id
             )
@@ -45,7 +45,7 @@ class KimiServer(object):
             kimi_info = await cls.kimi_mining(contents)
         except Exception as e:
             logging(
-                code="9000",
+                code="4002",
                 info="kimi 挖掘失败--{}, 采用 gpt".format(e),
                 trace_id=trace_id
             )
@@ -61,7 +61,7 @@ class KimiServer(object):
         kimi_info['k_title'] = kimi_title
         kimi_info['ori_title'] = title
         logging(
-            code="8000",
+            code="1005",
             info="kimi_mining",
             data=kimi_info,
             trace_id=trace_id

+ 0 - 1
applications/routes.py

@@ -1,7 +1,6 @@
 """
 @author: luojunhui
 """
-import asyncio
 from quart import Blueprint, jsonify, request
 
 from applications.functions.log import logging

+ 18 - 113
applications/schedule/search_schedule.py

@@ -187,46 +187,6 @@ class SearchMethod(object):
     """
     s_words = select_sensitive_words()
 
-    @classmethod
-    async def search_v0(cls, text, trace_id):
-        """
-        搜索顺序-wx --> baidu --> xigua
-        一共需要返回三条视频
-        :return:
-        """
-        wx_result = []
-        if wx_result:
-            return {"platform": "wx_search", "result": wx_result[0]}
-        else:
-            logging(
-                code="7001",
-                info="通过微信搜索失败---{}".format(text),
-                trace_id=trace_id,
-            )
-            # 微信搜不到的话,采用好看视频搜索
-            time.sleep(1)
-            baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
-            if baidu_result:
-                return {"platform": "baidu_search", "result": baidu_result[0]}
-            else:
-                # 若好看视频未搜到,则采用西瓜搜索
-                logging(
-                    code="7001",
-                    info="通过baidu搜索失败---{}".format(text),
-                    trace_id=trace_id,
-                )
-                # return None
-                xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
-                if xigua_result:
-                    return {"platform": "xg_search", "result": xigua_result[0]}
-                else:
-                    logging(
-                        code="7001",
-                        info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
-                        trace_id=trace_id,
-                    )
-                    return None
-
     @classmethod
     async def search_v1(cls, text, trace_id):
         """
@@ -235,36 +195,19 @@ class SearchMethod(object):
         :param trace_id:
         :return:
         """
-        douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words)
+        douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
         if douyin_result:
             return {"platform": "dy_search", "result": douyin_result[0]}
         else:
-            logging(
-                code="7001",
-                info="抖音搜索失败--{}".format(text),
-                trace_id=trace_id
-            )
             time.sleep(1)
-            baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
+            baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
             if baidu_result:
                 return {"platform": "baidu_search", "result": baidu_result[0]}
             else:
-                # 若好看视频未搜到,则采用西瓜搜索
-                logging(
-                    code="7001",
-                    info="通过baidu搜索失败---{}".format(text),
-                    trace_id=trace_id,
-                )
-                # return None
                 xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
                 if xigua_result:
                     return {"platform": "xg_search", "result": xigua_result[0]}
                 else:
-                    logging(
-                        code="7001",
-                        info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
-                        trace_id=trace_id,
-                    )
                     return None
 
     @classmethod
@@ -276,14 +219,13 @@ class SearchMethod(object):
         :return:
         """
         L = []
-        print(trace_id)
-        douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words)
+        douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
         for vid_obj in douyin_result:
             L.append({"platform": "dy_search", "result": vid_obj})
         if len(L) >= 3:
             return L
         else:
-            baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
+            baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
             if baidu_result:
                 L.append({"platform": "baidu_search", "result": baidu_result[0]})
             xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
@@ -336,7 +278,7 @@ async def video_sender(video_obj, user, trace_id, platform, index):
         "Content-Type": "application/json",
     }
     await request_etl(
-        url="http://192.168.203.137:4612/etl",
+        url="http://localhost:4612/etl",
         headers=header,
         json_data=mq_obj
     )
@@ -353,7 +295,6 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
     """
     K = KimiServer()
     kimi_info = await K.search_kimi_schedule(params=params)
-    print("{}---kimi 挖掘正常".format(trace_id))
     kimi_title = kimi_info['k_title']
     content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
     content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
@@ -369,7 +310,12 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
     SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
     # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
     recall_list = await SearchAB.ab_5()
-    print("一共搜索到{}条视频".format(len(recall_list)))
+    logging(
+        code="1006",
+        info="搜索到{}条视频".format(len(recall_list)),
+        data=recall_list,
+        trace_id=trace_id
+    )
     index = 0
     for recall_obj in recall_list:
         if recall_obj:
@@ -385,60 +331,19 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
                     index=index
                 )
                 logging(
-                    code="7004",
+                    code="1007",
                     info="成功请求etl",
+                    data=recall_video,
                     trace_id=trace_id
                 )
                 if index >= 3:
                     print("already downloaded 3 videos")
+                    logging(
+                        code="1008",
+                        info="成功下载三条视频",
+                        trace_id=trace_id
+                    )
                     break
-    # SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
-    # recall_obj_1 = await SearchAB.ab_1()
-    # # recall_obj_1 = await SearchAB.ab_0()
-    # await asyncio.sleep(3)
-    # recall_obj_2 = await SearchAB.ab_2()
-    # await asyncio.sleep(3)
-    # recall_obj_3 = await SearchAB.ab_3()
-    # print("{}---视频搜索正常".format(trace_id))
-    # recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
-    # un_empty_list = [i for i in recall_list if i]
-    # if len(un_empty_list) < 3:
-    #     await asyncio.sleep(3)
-    #     recall_obj_4 = await SearchAB.ab_4()
-    #     if recall_obj_4:
-    #         un_empty_list.append(recall_obj_4)
-    #
-    # # 逐条下载,逐条写表
-    # if un_empty_list:
-    #     for index, recall_obj in enumerate(un_empty_list, 1):
-    #         platform = recall_obj["platform"]
-    #         recall_video = recall_obj["result"]
-    #         if recall_video:
-    #             logging(
-    #                 code="7002",
-    #                 info="视频搜索成功, 搜索平台为--{}".format(platform),
-    #                 trace_id=trace_id,
-    #                 data=recall_video,
-    #             )
-    #             response = await video_sender(
-    #                 video_obj=recall_video,
-    #                 user=gh_id_dict.get(gh_id),
-    #                 trace_id=trace_id,
-    #                 platform=platform,
-    #                 index=index
-    #             )
-    #             logging(
-    #                 code="7004",
-    #                 info="成功请求etl",
-    #                 trace_id=trace_id,
-    #                 data=response
-    #             )
-    # else:
-    #     logging(
-    #         code="7003",
-    #         info="视频搜索失败, 被敏感词过滤",
-    #         trace_id=trace_id
-    #     )
 
 
 async def re_search_videos(params, trace_id, gh_id):

+ 18 - 2
applications/search/dy_search.py

@@ -5,12 +5,14 @@ import json
 import requests
 
 from applications.functions.common import sensitive_flag
+from applications.functions.log import logging
 
 
-def douyin_search(keyword, sensitive_words):
+def douyin_search(keyword, sensitive_words, trace_id):
     """
     Search with dou cha cha
     rank the relevance and recall the best three videos
+    :param trace_id:
     :param sensitive_words: sensitive words in pq
     :param keyword: the words needs to be searched
     :return:
@@ -42,9 +44,23 @@ def douyin_search(keyword, sensitive_words):
                     continue
             except Exception as e:
                 continue
+        logging(
+            code="8001",
+            info="抖音搜索",
+            data={
+                "keys": keyword,
+                "search_count": len(dt_list),
+                "useful_count": len(L)
+            },
+            trace_id=trace_id
+        )
         return L
     except Exception as e:
-        print("search_fail---{}, error---{}".format(keyword, e))
+        logging(
+            code="4003",
+            info="抖音搜索失败-搜索词:{} 原因:-{}".format(keyword, e),
+            trace_id=trace_id
+        )
         return []
 
 

+ 22 - 3
applications/search/hksp_search.py

@@ -12,10 +12,14 @@ from uuid import uuid4
 from fake_useragent import FakeUserAgent
 
 from applications.functions.common import sensitive_flag
+from applications.functions.log import logging
 
 
 def tunnel_proxies():
-    # 隧道域名:端口号
+    """
+    快代理
+    :return:
+    """
     tunnel = "q796.kdltps.com:15818"
     # 用户名密码方式
     username = "t17772369458618"
@@ -61,7 +65,7 @@ def get_video_detail(video_id):
     return response['data']['apiData']['curVideoMeta']
 
 
-def hksp_search(key, sensitive_words):
+def hksp_search(key, sensitive_words, trace_id):
     """
     好看视频搜索爬虫
     """
@@ -114,6 +118,21 @@ def hksp_search(key, sensitive_words):
                     continue
             except Exception as e:
                 pass
+        logging(
+            code="8001",
+            info="百度搜索",
+            data={
+                "keys": key,
+                "search_count": len(data_list),
+                "useful_count": len(L)
+            },
+            trace_id=trace_id
+        )
         return L
-    except:
+    except Exception as e:
+        logging(
+            code="4003",
+            info="百度搜索失败-搜索词:{} 原因:-{}".format(key, e),
+            trace_id=trace_id
+        )
         return []

+ 1 - 3
hypercorn_config.toml → match_server.toml

@@ -3,6 +3,4 @@ bind = "0.0.0.0:8111"
 workers = 4
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
-loglevel = "debug"  # 日志级别
-accesslog = "access.log"  # 访问日志文件
-errorlog = "error.log"  # 错误日志文件
+loglevel = "debug"  # 日志级别