瀏覽代碼

异步轮询接口

罗俊辉 1 年之前
父節點
當前提交
151b9c3c98

+ 0 - 147
applications/functions/calculate.py

@@ -1,147 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-import os
-
-from applications.log import logging
-from applications.functions.date import generate_daily_strings, five_days_before
-
-
-def read_single_file(filename):
-    """
-    :param filename:
-    """
-    with open(filename, encoding="utf-8") as f:
-        data = json.loads(f.read())
-    if data:
-        return data
-    else:
-        return {}
-
-
-def compute_similarity(file_1, file_2):
-    """
-    计算
-    :param file_1:
-    :param file_2:
-    :return:
-    """
-    data_1 = read_single_file(file_1)
-    data_2 = read_single_file(file_2)
-
-    def calculate_v1(d1, d2):
-        """
-        通过交并集来判断
-        :param d1:
-        :param d2:
-        :return:
-        """
-        f1_keys = set(d1["key_words"])
-        f2_keys = set(d2["key_words"])
-        keys_union = f1_keys | f2_keys
-        keys_intersection = f1_keys & f2_keys
-        f1_search_keys = set(d1["search_keys"])
-        f2_search_keys = set(d2["search_keys"])
-        search_keys_union = f1_search_keys | f2_search_keys
-        search_keys_intersection = f1_search_keys & f2_search_keys
-        f1_extra_keys = set(d1["extra_keys"])
-        f2_extra_keys = set(d2["extra_keys"])
-        extra_keys_union = f1_extra_keys | f2_extra_keys
-        extra_keys_intersection = f1_extra_keys & f2_extra_keys
-        score_1 = len(keys_intersection) / len(keys_union)
-        score_2 = len(search_keys_intersection) / len(search_keys_union)
-        score_3 = len(extra_keys_intersection) / len(extra_keys_union)
-        return score_1 * 0.4 + score_2 * 0.4 + score_3 * 0.2
-
-    def calculate_v2(d1, d2):
-        """
-        计算方法 v2
-        :param d1:
-        :param d2:
-        :return:
-        """
-        score = 0
-        tone_1 = d1["tone"]
-        tone_2 = d2["tone"]
-        if tone_1 == tone_2:
-            score += 0.1
-        target_audience_1 = d1["target_audience"]
-        target_audience_2 = d2["target_audience"]
-        if target_audience_1 == target_audience_2:
-            score += 0.2
-        target_age_1 = d1["target_age"]
-        target_age_2 = d2["target_age"]
-        if target_age_1 == target_age_2:
-            score += 0.2
-        address_1 = d1["address"]
-        address_2 = d2["address"]
-        if address_1 == address_2:
-            score += 0.2
-        gender_1 = d1["theme"]
-        gender_2 = d2["theme"]
-        if gender_1 == gender_2:
-            score += 0.5
-        return score
-
-    if data_1 and data_2:
-        try:
-            score_1 = calculate_v1(data_1, data_2)
-            return score_1
-            # score_2 = calculate_v2(data_1, data_2)
-            # return score_1, score_2
-        except Exception as e:
-            return 0
-    else:
-        return 0
-
-
-def title_mix(title_p, dt, trace_id):
-    """
-    执行代码
-    :param trace_id: 请求唯一 id
-    :param title_p:
-    :param dt: dt
-    """
-    five_days_ago = five_days_before(ori_dt=dt)
-    days_list = generate_daily_strings(five_days_ago, dt)
-    L = []
-    for day_str in days_list:
-        json_path = os.path.join(os.getcwd(), 'applications', 'static', day_str)
-        # 处理标题信息
-        files = os.listdir(json_path)
-        for file in files:
-            if file.endswith(".json"):
-                L.append(os.path.join(json_path, file))
-    print("召回的视频量", len(L))
-    score_list_1 = []
-    # score_list_2 = []
-    for file in L:
-        file_name = file.split('/')[-1].replace(".json", "")
-        v_id = file_name.split('_')[1]
-        uid = file_name.split('_')[0]
-        # score1, score2 = compute_similarity(title_p, file)
-        score1 = compute_similarity(title_p, file)
-        score_list_1.append([score1, v_id, uid])
-        # score_list_2.append([score2, v_id, uid])
-
-    s1_list = sorted(score_list_1, key=lambda x: x[0], reverse=True)
-    # s2_list = sorted(score_list_2, key=lambda x: x[0], reverse=True)
-    title = title_p.split("/")[-1].replace(".json", "")
-    obj = {
-        "title": title,
-        "s1_vid": s1_list[0][1],
-        "s1_score": s1_list[0][0],
-        "s1_uid": s1_list[0][2],
-        # "s2_vid": s2_list[0][1],
-        # "s2_score": s2_list[0][0],
-        # "s2_uid": s2_list[0][2]
-    }
-    logging(
-        code="1003",
-        info="计算结果得分",
-        data=obj,
-        function="title_mix",
-        trace_id=trace_id
-    )
-    return obj

+ 71 - 0
applications/functions/common.py

@@ -1,12 +1,47 @@
 """
 @author: luojunhui
 """
+import os
 import json
 import uuid
 import requests
 import urllib.parse
+from concurrent.futures import ThreadPoolExecutor
 
 from applications.functions.auto_white import auto_white
+from applications.functions.mysql import select
+from applications.functions.ask_kimi import ask_kimi
+from applications.log import logging
+
+
+def ask_kimi_and_save_to_local(info_tuple):
+    """
+    Mine one title with kimi and cache the answer as a JSON file.
+
+    :param info_tuple: sequence whose first three items are
+        (title, trace_id, save_path)
+    :return: None; the result is written to save_path
+    """
+    title = info_tuple[0]
+    trace_id = info_tuple[1]
+    save_path = info_tuple[2]
+
+    # A file already present at save_path means this entry was handled before.
+    if os.path.exists(save_path):
+        logging(
+            code="1002",
+            info="该 video 信息已经挖掘完成---{}".format(title),
+            function="ask_kimi_and_save_to_local",
+            trace_id=trace_id,
+        )
+        return
+
+    os.makedirs(os.path.dirname(save_path), exist_ok=True)
+    # An empty title is never sent to kimi; an empty object is cached instead.
+    result = ask_kimi(title) if title else {}
+    logging(
+        code="1002",
+        info="kimi-result",
+        data=result,
+        trace_id=trace_id,
+        function="ask_kimi_and_save_to_local"
+    )
+    with open(save_path, "w", encoding="utf-8") as out_file:
+        out_file.write(json.dumps(result, ensure_ascii=False))
 
 
 def create_gzh_path(video_id, shared_uid):
@@ -48,3 +83,39 @@ def choose_video(result):
         return result['s1_uid'], result['s1_vid']
     else:
         return None, None
+
+
+def find_videos_in_mysql(trace_id):
+    """
+    Find the crawled videos recorded for a trace id and mine their titles.
+
+    Selects up to 10 (video_id, video_title) rows from crawler_video whose
+    out_user_id equals trace_id. Every returned video that has no JSON file
+    in applications/static/out_videos yet is passed to
+    ask_kimi_and_save_to_local on a thread pool.
+
+    :param trace_id: trace id used as the out_user_id filter
+    :return: dict with "search_videos" ("success" or "failed"), "trace_id"
+        and "video_list" (ids of the rows found, empty on failure)
+    """
+    import re
+
+    failed = {
+        "search_videos": "failed",
+        "trace_id": trace_id,
+        "video_list": []
+    }
+    # trace_id is spliced into the SQL text below and callers pass a value
+    # taken from the request body, so anything but letters, digits, "_" and
+    # "-" is refused without querying.
+    if not re.fullmatch(r"[A-Za-z0-9_-]+", str(trace_id)):
+        return failed
+
+    sql = "select video_id, video_title from crawler_video where out_user_id = '{}' limit 10;".format(trace_id)
+    out_video_list = select(sql=sql)
+    if not out_video_list:
+        return failed
+
+    vid_list = [row[0] for row in out_video_list]
+    dir_path = os.path.join(os.getcwd(), 'applications', 'static', "out_videos")
+    # Create out_videos itself: makedirs(dirname(dir_path)) only created the
+    # parent directory, so listdir failed when out_videos did not exist yet.
+    os.makedirs(dir_path, exist_ok=True)
+    done_files = set(os.listdir(dir_path))
+    process_list = []
+    for row in out_video_list:
+        file_name = "{}.json".format(row[0])
+        if file_name not in done_files:
+            process_list.append((row[1], trace_id, os.path.join(dir_path, file_name)))
+
+    # The map() results are never read, so an exception raised inside a
+    # worker is not re-raised here; leaving the with-block waits for all
+    # submitted work to finish.
+    with ThreadPoolExecutor(max_workers=10) as pool:
+        pool.map(ask_kimi_and_save_to_local, process_list)
+
+    return {
+        "search_videos": "success",
+        "trace_id": trace_id,
+        "video_list": vid_list
+    }

+ 4 - 3
applications/match_alg/rank.py

@@ -29,12 +29,12 @@ def jac_score(d1, d2):
     return score_1 * 0.4 + score_2 * 0.4 + score_3 * 0.2, d2['video_id']
 
 
-async def best_choice(params_obj, request_param, trace_id):
+async def best_choice(params_obj, trace_id, search_videos):
     """
     计算,返回出最合适的 video_id
     :return: video_id
     """
-    pq_list, search_list = await recall_videos(params=request_param, trace_id=trace_id)
+    pq_list, search_list = await recall_videos(trace_id=trace_id, s_videos=search_videos)
 
     def best_video_id(target_list):
         """
@@ -49,7 +49,8 @@ async def best_choice(params_obj, request_param, trace_id):
             except Exception as e:
                 print(e)
         sorted_list = sorted(score_list, key=lambda x: x[1], reverse=True)
-        return sorted_list[0]
+        return sorted_list[0] if sorted_list else (0, 0)
+
     if search_list:
         best_search_tuple = best_video_id(search_list)
         if best_search_tuple[1] > 0:

+ 16 - 71
applications/match_alg/recall.py

@@ -3,16 +3,9 @@
 """
 import os
 import json
-import time
-import asyncio
-from concurrent.futures import ThreadPoolExecutor
-
-import httpx
-import requests
 
 from applications.log import logging
-from applications.functions.mysql import select, select_pq_videos
-from applications.functions.ask_kimi import ask_kimi
+from applications.functions.mysql import select_pq_videos
 
 
 gh_id_dict = {
@@ -315,80 +308,23 @@ gh_id_dict = {
 }
 
 
-def ask_kimi_and_save_to_local(info_tuple):
-    """
-    save file to local
-    :return:
-    """
-    title, trace_id, save_path = info_tuple[0], info_tuple[1], info_tuple[2]
-    if os.path.exists(save_path):
-        logging(
-            code="1002",
-            info="该 video 信息已经挖掘完成---{}".format(title),
-            function="ask_kimi_and_save_to_local",
-            trace_id=trace_id,
-        )
-    else:
-        os.makedirs(os.path.dirname(save_path), exist_ok=True)
-        if not title:
-            result = {}
-        else:
-            result = ask_kimi(title)
-        logging(
-            code="1002",
-            info="kimi-result",
-            data=result,
-            trace_id=trace_id,
-            function="ask_kimi_and_save_to_local"
-        )
-        with open(save_path, "w", encoding="utf-8") as f:
-            f.write(json.dumps(result, ensure_ascii=False))
-
-
-async def recall_videos(params, trace_id):
+async def recall_videos(trace_id, s_videos):
     """
     通过请求的数据来召回视频
+    :param s_videos:
     :param trace_id:
-    :param params: 请求参数
     :return: file_list
     """
-    title = params['title']
+    # title = params['title']
     # content = params['content']
-    ghId = params['ghId']
-    user_id = gh_id_dict[ghId]['uid']
-
-    # 在外面搜索视频
-    # payload = {
-    #     "ghId": ghId,
-    #     "search_keys": [title],
-    #     "trace_id": trace_id
-    # }
-    # # print(payload)
-    # url = "http://61.48.133.26:8111/search_videos"
-    # requests.post(url, json=payload)
-    # # print("请求完成")
-    await asyncio.sleep(15)
-    # select_sql = "select video_id, video_title from crawler_video where platform='weixin_search' and user_id = '{}' order by update_time DESC limit 10".format(
-    #     user_id)
-    # out_video_list = select(sql=select_sql)
-    # dir_path = os.path.join(os.getcwd(), 'applications', 'static', "out_videos")
-    # os.makedirs(os.path.dirname(dir_path), exist_ok=True)
-    # done_list = os.listdir(dir_path)
-    # process_list = [
-    #     (
-    #         i[1],
-    #         trace_id,
-    #         os.path.join(dir_path, "{}.json".format(i[0]))
-    #     ) for i in out_video_list if not "{}.json".format(i[0]) in done_list
-    # ]
-    # with ThreadPoolExecutor(max_workers=10) as pool:
-    #     pool.map(ask_kimi_and_save_to_local, process_list)
+    # ghId = params['ghId']
+    # user_id = gh_id_dict[ghId]['uid']
 
     # 在两边召回视频
     # pq_videos
     recall_video_list = select_pq_videos()
     dirs_1 = os.path.join(os.getcwd(), 'applications', 'static', 'out_videos')
-    file_list = [os.path.join(dirs_1, file) for file in os.listdir(dirs_1) if file.endswith(".json")]
+    file_list = [os.path.join(dirs_1, "{}.json".format(vid)) for vid in s_videos]
     search_list = []
     for file in file_list:
         with open(file, encoding="utf-8") as f:
@@ -396,4 +332,13 @@ async def recall_videos(params, trace_id):
             if obj:
                 obj['video_id'] = file.split("/")[-1].replace('.json', '')
         search_list.append(obj)
+    logging(
+        code="1002",
+        info="召回视频",
+        data={
+            "pq_list": recall_video_list,
+            "search_list": search_list
+        },
+        trace_id=trace_id
+    )
     return recall_video_list, search_list

+ 6 - 46
applications/process.py

@@ -2,11 +2,8 @@
 @author: luojunhui
 对请求进行操作
 """
+import time
 
-import os
-
-from applications.log import logging
-from applications.functions.ask_kimi import ask_kimi
 from applications.match_alg import best_choice
 from applications.functions.common import *
 
@@ -37,63 +34,26 @@ class ProcessParams(object):
         )
         return data
 
-    def ask_kimi_and_save_to_local(self, title):
-        """
-        save file to local
-        :param title:
-        :return:
-        """
-        save_path = os.path.join(os.getcwd(), 'applications', 'static', "titles", "{}.json".format(title))
-        if os.path.exists(save_path):
-            logging(
-                code="1002",
-                info="该 video 信息已经挖掘完成---{}".format(title),
-                function="ask_kimi_and_save_to_local",
-                trace_id=self.trace_id,
-            )
-            return
-        else:
-            os.makedirs(os.path.dirname(save_path), exist_ok=True)
-            if not title:
-                result = {}
-            else:
-                result = ask_kimi(title)
-            logging(
-                code="1002",
-                info="kimi-result",
-                data=result,
-                trace_id=self.trace_id,
-                function="ask_kimi_and_save_to_local"
-            )
-            with open(save_path, "w", encoding="utf-8") as f:
-                f.write(json.dumps(result, ensure_ascii=False))
-
     async def deal(self, data):
         """执行代码"""
         params = self.get_params(data)
         title = params['title']
         # account_name = params['accountName']
         # ghId = params['ghId']
+        video_list = params['videoList']
 
         title_p = os.path.join(os.getcwd(), 'applications', 'static', "titles", "{}.json".format(title))
-        if os.path.exists(title_p):
-            logging(
-                code="1002",
-                info="该标题已经被 kimi 处理过,跳过请求 kimi 操作--- {}".format(title),
-                function="process",
-                trace_id=self.trace_id
-            )
-        else:
-            self.ask_kimi_and_save_to_local(title)
 
         with open(title_p, encoding="utf-8") as f:
             params_obj = json.loads(f.read())
-
+        s = time.time()
         best_video_id = await best_choice(
             params_obj=params_obj,
             trace_id=self.trace_id,
-            request_param=params
+            search_videos=video_list
         )
+        e = time.time()
+        print(e - s)
         logging(
             code="1002",
             info="best video_id --{}".format(best_video_id),

+ 55 - 22
applications/routes.py

@@ -1,14 +1,15 @@
 """
 @author: luojunhui
 """
+import os
 import time
-import json
 import uuid
-from quart import Blueprint, jsonify, request, websocket
+from quart import Blueprint, jsonify, request
 
 from applications.log import logging
 from applications.process import ProcessParams
 from applications.mq import MQ
+from applications.functions.common import find_videos_in_mysql, ask_kimi_and_save_to_local
 
 my_blueprint = Blueprint('kimi', __name__)
 
@@ -27,41 +28,73 @@ async def hello():
     return jsonify({'message': 'Hello, World!'})
 
 
-@my_blueprint.route('/title_to_video', methods=['POST'])
-async def post_data():
+@my_blueprint.route('/title_to_search', methods=['POST'])
+async def search_videos_from_the_web():
     """
-    请求接口代码
+    Search the web for videos and store them in the Piaoquan video library.
+
+    This handler generates a trace id, makes sure a kimi result file exists
+    for the request's 'title', and publishes the request body (with
+    'trace_id' added) to the "search_spider_prod" MQ topic. The searching
+    and storing is presumably done by the consumer of that topic, which is
+    not visible here.
+    Responds with JSON holding the generated trace_id and code 0.
     :return:
     """
-    trace_id = str(uuid.uuid4()) + "-" + str(int(time.time()))
+    mq = MQ(topic_name="search_spider_prod")
+    trace_id = "search-{}-{}".format(str(uuid.uuid4()), str(int(time.time())))
     logging(
         code="1001",
-        info="请求接口成功",
-        port="title_to_video",
+        info="搜索视频内容接口请求成功",
+        port="title_to_search",
         trace_id=trace_id
     )
-    p = ProcessParams(t_id=trace_id)
+    params = await request.get_json()
+    params['trace_id'] = trace_id
+    title = params['title']
+    # The kimi result for a title is cached as static/titles/<title>.json;
+    # kimi is only asked when that file does not exist yet.
+    title_p = os.path.join(os.getcwd(), 'applications', 'static', "titles", "{}.json".format(title))
+    if os.path.exists(title_p):
+        logging(
+            code="1002",
+            info="该标题已经被 kimi 处理过,跳过请求 kimi 操作--- {}".format(title),
+            function="process",
+            trace_id=trace_id
+        )
+    else:
+        # NOTE(review): this is a plain (non-awaited) call inside an async
+        # handler; confirm whether blocking here is acceptable.
+        ask_kimi_and_save_to_local((title, trace_id, title_p))
+    mq.send_msg(params=params)
+    res = {
+        "trace_id": trace_id,
+        "code": 0
+    }
+    return jsonify(res)
+
+
+@my_blueprint.route('/out_videos', methods=['POST'])
+async def find_in_mysql():
+    """
+    Check whether off-site videos exist for a trace id.
+
+    Reads 'traceId' from the JSON request body and returns the result of
+    find_videos_in_mysql for it as JSON.
+    :return:
+    """
     data = await request.get_json()
-    processed_data = await p.deal(data)
-    return jsonify(processed_data)
+    trace_id = data['traceId']
+    # Log under this endpoint's own name; it was previously logged as
+    # "title_to_video", which belongs to a different route.
+    logging(
+        code="1001",
+        info="请求接口成功",
+        port="out_videos",
+        trace_id=trace_id
+    )
+    res = find_videos_in_mysql(trace_id=trace_id)
+    return jsonify(res)
 
 
-@my_blueprint.route('/search_videos', methods=['POST'])
-async def search_data():
+@my_blueprint.route('/title_to_video', methods=['POST'])
+async def post_data():
     """
-    通过搜索词去搜索获取视频信息
+    Handle a /title_to_video request.
+
+    Reads 'traceId' from the JSON request body (the id is no longer
+    generated here), then returns the result of ProcessParams.deal for the
+    body as JSON.
     :return:
     """
-    mq = MQ(topic_name="search_spider_prod")
-    trace_id = "search-{}-{}".format(str(uuid.uuid4()), str(int(time.time())))
+    data = await request.get_json()
+    trace_id = data['traceId']
     logging(
         code="1001",
         info="请求接口成功",
-        port="search_videos",
+        port="title_to_video",
         trace_id=trace_id
     )
-    data = await request.get_json()
-    mq.send_msg(params=data)
-    return jsonify({
-        "code": 0
-    })
+    # The whole request body is handed to deal(); trace_id is only used for
+    # logging and for constructing ProcessParams.
+    p = ProcessParams(t_id=trace_id)
+    processed_data = await p.deal(data)
+    return jsonify(processed_data)