罗俊辉 hace 10 meses
padre
commit
0f4ddb2a51

+ 3 - 1
applications/deal/__init__.py

@@ -2,4 +2,6 @@
 @author: luojunhui
 """
 from .search_deal import SearchDeal
-from .re_search_deal import ReSearchDeal
+from .re_search_deal import ReSearchDeal
+from .process_deal import ProcessDeal
+from .search_deal_v2 import SearchDeal2

+ 216 - 0
applications/deal/process_deal.py

@@ -0,0 +1,216 @@
+"""
+@author: luojunhui
+"""
+from applications.static.config import db_article
+from applications.schedule import search_videos
+
+
+class ProcessDeal(object):
+    """
+    定时执行任务
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client:
+        """
+        self.mysql_client = mysql_client
+
+    async def get_task(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql = f"""
+            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
+            FROM {db_article} 
+            WHERE content_status = 0 and process_times <= 5
+            ORDER BY request_time_stamp
+            ASC
+            LIMIT 10;
+        """
+        print(select_sql)
+        task_list = await self.mysql_client.async_select(sql=select_sql)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "gh_id": item[2],
+                "title": item[3],
+                "text": item[4],
+                "content_status": item[5],
+                "process_times": item[6]
+            } for item in task_list
+        ]
+        return task_obj_list
+
+    async def get_history_contents(self, content_id):
+        """
+        check whether the content id exists
+        :return: trace_id or None
+        """
+        select_sql = f"""
+               SELECT trace_id, content_status
+               FROM {db_article}
+               WHERE content_id = '{content_id}'
+               ORDER BY id DESC;
+           """
+        result = await self.mysql_client.async_select(select_sql)
+        if result:
+            for item in result:
+                trace_id, content_status = item
+                if content_status == 2:
+                    return trace_id
+                else:
+                    continue
+            return None
+        else:
+            return None
+
+    async def judge_content_processing(self, content_id):
+        """
+        判断该content_id是否在处理中
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+                       SELECT trace_id, content_status
+                       FROM {db_article}
+                       WHERE content_id = '{content_id}'
+                       ORDER BY id DESC;
+                   """
+        result = await self.mysql_client.async_select(select_sql)
+        if result:
+            for item in result:
+                trace_id, content_status = item
+                if content_status == 1:
+                    return False
+            return True
+        else:
+            return True
+
+    async def insert_history_contents_videos(self, history_trace_id, params):
+        """
+        插入历史视频id
+        :return:
+        """
+        select_sql = f"""
+            SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3
+            FROM {db_article}
+            WHERE trace_id = '{history_trace_id}';
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        kimi_title, vid1, vid2, vid3 = info[0]
+        update_sql = f"""
+        UPDATE {db_article}
+        SET 
+            kimi_title='{kimi_title}',
+            recall_video_id1={vid1}, 
+            recall_video_id2={"NULL" if vid2 is None else vid2}, 
+            recall_video_id3={"NULL" if vid3 is None else vid3},
+            content_status=2,
+            process_times = {int(params['process_times']) + 1}
+        WHERE  trace_id = '{params['trace_id']}'
+        """
+        await self.mysql_client.async_insert(update_sql)
+
+    async def process_video_id(self, title, trace_id, process_times):
+        """
+        如果video_id在标题中,则做特殊处理
+        :return:
+        """
+        video_id = title.split("video_id=")[-1]
+        update_sql = f"""
+            UPDATE  
+                {db_article}
+            SET 
+                recall_video_id1 = '{video_id}',
+                content_status = 2,
+                process_times = {int(process_times) + 1}
+            WHERE  
+                trace_id = '{trace_id}';"""
+        await self.mysql_client.async_insert(update_sql)
+
+    async def start_process(self, params):
+        """
+        开始处理
+        :param params:
+        :return:
+        """
+        # 更新文章contentId为1, 说明该文章正在处理中
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                content_status = 1
+            WHERE 
+                trace_id = '{params["trace_id"]}'
+        """
+        await self.mysql_client.async_insert(sql=update_sql)
+        # 判断标题中是否包含video_id
+        if "video_id=" in params['title']:
+            await self.process_video_id(
+                title=params['title'],
+                trace_id=params['trace_id'],
+                process_times=params['process_times']
+            )
+        else:
+            print("开始搜索视频")
+            await search_videos(
+                params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
+                trace_id=params['trace_id'],
+                gh_id=params['gh_id'],
+                mysql_client=self.mysql_client
+            )
+            # 执行完成之后,判断是否存在视频id
+            select_sql = f"""
+                SELECT recall_video_id1, recall_video_id2, recall_video_id3
+                FROM {db_article}
+                WHERE trace_id = '{params["trace_id"]}';
+            """
+            result = await self.mysql_client.async_select(sql=select_sql)
+            vid1, vid2, vid3 = result[0]
+            if vid1:
+                update_sql2 = f"""
+                    UPDATE {db_article}
+                    SET 
+                       content_status = 2,
+                       process_times = {int(params['process_times']) + 1}
+                       WHERE trace_id = '{params["trace_id"]}';
+                """
+                await self.mysql_client.async_insert(sql=update_sql2)
+                print("搜索视频成功")
+            else:
+                print("搜索视频失败")
+                update_sql3 = f"""
+                                    UPDATE {db_article}
+                                    SET 
+                                       content_status = 0,
+                                       process_times = {int(params['process_times']) + 1}
+                                    WHERE trace_id = '{params["trace_id"]}';
+                                """
+                await self.mysql_client.async_insert(sql=update_sql3)
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.get_task()
+        if task_list:
+            for params in task_list:
+                content_id = params['content_id']
+                # 判断该文章是否已经生成了
+                history_trace_id = await self.get_history_contents(content_id)
+                if history_trace_id:
+                    # 说明已经存在了结果, 将该条记录下的video_id拿出来
+                    print("该文章已经成功请求")
+                    await self.insert_history_contents_videos(history_trace_id, params)
+                else:
+                    flag = await self.judge_content_processing(content_id)
+                    if flag:
+                        print("开始处理这条视频")
+                        await self.start_process(params=params)
+                    else:
+                        print("该文章id正在处理中")
+        else:
+            print("没有要处理的视频")

+ 97 - 0
applications/deal/search_deal_v2.py

@@ -0,0 +1,97 @@
+"""
+@author: luojunhui
+"""
+import time
+
+from uuid import uuid4
+
+from applications.functions.log import logging
+from applications.static.config import db_article
+
+
+class SearchDeal2(object):
+    """
+    搜索接口处理逻辑
+    """
+
+    def __init__(self, params, mysql_client):
+        self.content_id = None
+        self.account_name = None
+        self.contents = None
+        self.title = None
+        self.gh_id = None
+        self.params = params
+        self.mysql_client = mysql_client
+        self.trace_id = "search-{}-{}".format(str(uuid4()), str(int(time.time())))
+
+    def check_params(self):
+        """
+        检查请求params
+        :return:
+        """
+        try:
+            self.gh_id = self.params['ghId']
+            self.title = self.params['title'].split("@@")[-1].replace("'", "")
+            self.contents = self.params['content'].replace("'", "")
+            self.account_name = self.params['accountName'].replace("'", "")
+            self.content_id = self.params['articleId']
+            logging(
+                code="1001",
+                info="搜索视频内容接口请求成功, 参数校验成功",
+                port="title_to_search",
+                trace_id=self.trace_id,
+                data=self.params
+            )
+            return None
+        except Exception as e:
+            result = {
+                "status": "fail",
+                "code": 1,
+                "message": str(e),
+                "info": "params check error"
+            }
+            logging(
+                code="4001",
+                info="搜索视频内容接口请求成功, 参数校验失败",
+                port="title_to_search",
+                trace_id=self.trace_id,
+                data=self.params
+            )
+            return result
+
+    async def input_queue(self):
+        """
+        把数据插入待处理队列
+        :return:
+        """
+        request_time = int(time.time())
+        insert_sql = f"""
+                        INSERT INTO {db_article}
+                            (trace_id, content_id, gh_id, account_name, article_title, article_text, content_status, success, request_time_stamp)
+                        VALUES 
+                            ('{self.trace_id}', '{self.content_id}','{self.gh_id}', '{self.account_name}', '{self.title}', '{self.contents}', 0, 0, {request_time});"""
+        print(insert_sql)
+        await self.mysql_client.async_insert(insert_sql)
+        logging(
+            code="1002",
+            info="成功记录请求数据到mysql中",
+            trace_id=self.trace_id
+        )
+
+    async def deal(self):
+        """
+        deal
+        :return:
+        """
+        params_error = self.check_params()
+        if params_error:
+            return params_error
+        else:
+            # 记录
+            await self.input_queue()
+            res = {
+                "status": "success input to article queue",
+                "code": 0,
+                "traceId": self.trace_id
+            }
+            return res

+ 12 - 2
applications/routes.py

@@ -5,7 +5,7 @@ from quart import Blueprint, jsonify, request
 
 from applications.functions.log import logging
 from applications.schedule import recall_videos
-from applications.deal import SearchDeal, ReSearchDeal
+from applications.deal import ReSearchDeal, SearchDeal2, ProcessDeal
 
 my_blueprint = Blueprint('LongArticles', __name__)
 
@@ -30,7 +30,7 @@ def Routes(mysql_client):
         :return:
         """
         params = await request.get_json()
-        SD = SearchDeal(params=params, mysql_client=mysql_client)
+        SD = SearchDeal2(params=params, mysql_client=mysql_client)
         result = await SD.deal()
         return jsonify(result)
 
@@ -73,4 +73,14 @@ def Routes(mysql_client):
         res = await RSD.deal()
         return jsonify(res)
 
+    @my_blueprint.route("/task")
+    async def schedule_task():
+        """
+        执行代码
+        :return:
+        """
+        PD = ProcessDeal(mysql_client=mysql_client)
+        await PD.deal()
+        return jsonify({"success": "true"})
+
     return my_blueprint

+ 16 - 4
applications/schedule/process_schedule.py

@@ -136,16 +136,28 @@ async def recall_videos(trace_id, mysql_client):
     :return:
     """
     select_sql = f"""
-        SELECT recall_video_id1, recall_video_id2, recall_video_id3, kimi_title 
+        SELECT recall_video_id1, recall_video_id2, recall_video_id3, kimi_title, content_status
         FROM {db_article}
         WHERE trace_id = '{trace_id}';
     """
     info_tuple = await mysql_client.async_select(select_sql)
-    vid1, vid2, vid3, kimi_title = info_tuple[0]
+    vid1, vid2, vid3, kimi_title, content_status = info_tuple[0]
     vid_list = [vid1, vid2, vid3]
     unEmptyList = [i for i in vid_list if i]
     L = []
-    if unEmptyList:
+    if content_status == 0:
+        result = {
+            "traceId": trace_id,
+            "code": 0,
+            "Message": "该请求还没处理"
+        }
+    elif content_status == 1:
+        result = {
+            "traceId": trace_id,
+            "code": 1,
+            "Message": "该请求正在处理中"
+        }
+    elif content_status == 2:
         logging(
             code="1002",
             info="vid_list: {}".format(json.dumps(unEmptyList, ensure_ascii=False)),
@@ -181,7 +193,7 @@ async def recall_videos(trace_id, mysql_client):
     else:
         result = {
             "traceId": trace_id,
-            "Message": "No Videos Found now, Please try again in one minute"
+            "Message": "UnKnow Error"
         }
     logging(
         code="1002",

+ 5 - 0
applications/schedule/search_schedule.py

@@ -282,6 +282,11 @@ async def video_sender(video_obj, user, trace_id, platform, index):
         headers=header,
         json_data=mq_obj
     )
+    # await request_etl(
+    #     url="http://localhost:4612/etl",
+    #     headers=header,
+    #     json_data=mq_obj
+    # )
 
 
 async def search_videos(params, trace_id, gh_id, mysql_client):

La diferencia del archivo ha sido suprimido porque es demasiado grande
+ 0 - 0
applications/static/config.py


+ 9 - 0
dev/task_test.py

@@ -0,0 +1,9 @@
+"""
+@author: luojunhui
+"""
+import requests
+
+url = "http://{}:8111/task".format("localhost")
+
+w = requests.get(url)
+print(w)

+ 79 - 0
task.py

@@ -0,0 +1,79 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+
+import aiomysql
+
+from applications.deal import ProcessDeal
+
+
+class TaskMySQLClient(object):
+    """
+    Async MySQL
+    """
+
+    def __init__(self):
+        self.mysql_pool = None
+
+    async def init_pool(self):
+        """
+        初始化连接
+        :return:
+        """
+        self.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',
+            db='piaoquan-crawler',
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        关闭 mysql 连接
+        :return:
+        """
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
+
+    async def async_select(self, sql):
+        """
+        select method
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def async_insert(self, sql):
+        """
+        insert and update method
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                await cursor.execute(sql)
+                await coon.commit()
+
+
+async def main():
+    """
+    main job
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = ProcessDeal(TMC)
+    await PD.deal()
+
+
+if __name__ == '__main__':
+    asyncio.run(main())

Algunos archivos no se mostraron porque demasiados archivos cambiaron en este cambio