浏览代码

2024-0627-处理mysql问题

罗俊辉 1 年之前
父节点
当前提交
028b5188c6

+ 60 - 60
applications/deal/process_deal_2.py → applications/deal/get_done_content_deal.py

@@ -1,9 +1,6 @@
 """
 @author: luojunhui
 """
-"""
-@author: luojunhui
-"""
 import asyncio
 
 from applications.static.config import db_article
@@ -33,7 +30,7 @@ class ProcessDeal2(object):
             WHERE content_status = 0 and process_times <= 5
             ORDER BY request_time_stamp
             ASC
-            LIMIT 30
+            LIMIT 30;
         """
         task_list = await self.mysql_client.async_select(sql=select_sql)
         task_obj_list = [
@@ -114,15 +111,26 @@ class ProcessDeal2(object):
         update_sql = f"""
         UPDATE {db_article}
         SET 
-            kimi_title='{kimi_title}',
-            recall_video_id1={vid1}, 
-            recall_video_id2={"NULL" if vid2 is None else vid2}, 
-            recall_video_id3={"NULL" if vid3 is None else vid3},
-            content_status=2,
-            process_times = {int(params['process_times']) + 1}
-        WHERE  trace_id = '{params['trace_id']}'
+            kimi_title=%s,
+            recall_video_id1=%s, 
+            recall_video_id2=%s, 
+            recall_video_id3=%s,
+            content_status=%s,
+            process_times = %s
+        WHERE  trace_id = %s;
         """
-        await self.mysql_client.async_insert(update_sql)
+        await self.mysql_client.async_insert(
+            update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                {"NULL" if vid2 is None else vid2},
+                {"NULL" if vid2 is None else vid3},
+                2,
+                int(params['process_times']) + 1,
+                params['trace_id']
+            )
+        )
         logging(
             code="9002",
             info="已从历史文章更新,历史id: {}".format(history_trace_id),
@@ -139,12 +147,15 @@ class ProcessDeal2(object):
             UPDATE  
                 {db_article}
             SET 
-                recall_video_id1 = '{video_id}',
-                content_status = 2,
-                process_times = {int(process_times) + 1}
+                recall_video_id1 = %s,
+                content_status = %s,
+                process_times = %s
             WHERE  
-                trace_id = '{trace_id}';"""
-        await self.mysql_client.async_insert(update_sql)
+                trace_id = %s;"""
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(video_id, 2, int(process_times) + 1, trace_id)
+        )
 
     async def start_process(self, params):
         """
@@ -156,11 +167,14 @@ class ProcessDeal2(object):
         update_sql = f"""
             UPDATE {db_article}
             SET 
-                content_status = 1
+                content_status = %s
             WHERE 
-                trace_id = '{params["trace_id"]}'
+                trace_id = %s;
         """
-        await self.mysql_client.async_insert(sql=update_sql)
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(1, params['trace_id'])
+        )
         try:
             # 判断标题中是否包含video_id
             if "video_id=" in params['title']:
@@ -193,11 +207,14 @@ class ProcessDeal2(object):
                     update_sql2 = f"""
                         UPDATE {db_article}
                         SET 
-                           content_status = 2,
-                           process_times = {int(params['process_times']) + 1}
-                           WHERE trace_id = '{params["trace_id"]}';
+                           content_status = %s,
+                           process_times = %s
+                           WHERE trace_id = %s;
                     """
-                    await self.mysql_client.async_insert(sql=update_sql2)
+                    await self.mysql_client.async_insert(
+                        sql=update_sql2,
+                        params=(2, int(params['process_times']) + 1, params['trace_id'])
+                    )
                     logging(
                         code="9008",
                         info="视频搜索成功, 状态修改为2",
@@ -205,13 +222,16 @@ class ProcessDeal2(object):
                     )
                 else:
                     update_sql3 = f"""
-                                        UPDATE {db_article}
-                                        SET 
-                                           content_status = 0,
-                                           process_times = {int(params['process_times']) + 1}
-                                        WHERE trace_id = '{params["trace_id"]}';
-                                    """
-                    await self.mysql_client.async_insert(sql=update_sql3)
+                        UPDATE {db_article}
+                        SET 
+                           content_status = %s,
+                           process_times = %s
+                        WHERE trace_id = %s;
+                    """
+                    await self.mysql_client.async_insert(
+                        sql=update_sql3,
+                        params=(0, int(params['process_times']) + 1, params["trace_id"])
+                    )
                     logging(
                         code="9018",
                         info="视频搜索失败,回退状态为0",
@@ -226,11 +246,14 @@ class ProcessDeal2(object):
             update_sql4 = f"""
                 UPDATE {db_article}
                 SET 
-                   content_status = 0,
-                   process_times = {int(params['process_times']) + 1}
-                WHERE trace_id = '{params["trace_id"]}';
+                   content_status = %s,
+                   process_times = %s
+                WHERE trace_id = %s;
             """
-            await self.mysql_client.async_insert(sql=update_sql4)
+            await self.mysql_client.async_insert(
+                sql=update_sql4,
+                params=(0, int(params['process_times']) + 1, params["trace_id"])
+            )
 
     async def process_task(self, params):
         """
@@ -247,7 +270,8 @@ class ProcessDeal2(object):
             logging(
                 code="9001",
                 info="存在历史文章",
-                trace_id=trace_id
+                trace_id=trace_id,
+                function="find_history_article"
             )
             await self.insert_history_contents_videos(history_trace_id, params)
         else:
@@ -257,20 +281,6 @@ class ProcessDeal2(object):
                 trace_id=trace_id,
                 function="find_history_article"
             )
-            # flag = await self.judge_content_processing(content_id)
-            # if flag:
-            #     logging(
-            #         code="9004",
-            #         info="无正在处理的文章ID, 开始处理",
-            #         trace_id=trace_id
-            #     )
-            #     await self.start_process(params=params)
-            # else:
-            #     logging(
-            #         code="9003",
-            #         info="该文章ID正在请求--文章ID {}".format(content_id),
-            #         trace_id=trace_id
-            #     )
 
     async def deal(self):
         """
@@ -278,17 +288,7 @@ class ProcessDeal2(object):
         :return:
         """
         task_list = await self.get_task()
-        # print(task_list)
-        # task_dict = {}
-        # for task in task_list:
-        #     key = task['content_id']
-        #     task_dict[key] = task
-        # process_list = []
-        # for item in task_dict:
-        #     process_list.append(task_dict[item])
         if task_list:
-            # for task in task_list:
-            #     await self.process_task(task)
             tasks = [self.process_task(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:

+ 120 - 49
applications/deal/process_deal.py

@@ -41,9 +41,9 @@ class ProcessDeal(object):
             select_sql = f"""
                 SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
                 FROM {db_article} 
-                WHERE content_id in {content_ids_tuple}
+                WHERE content_id in {content_ids_tuple} and process_times <= 5 
                 ORDER BY request_time_stamp
-                ASC
+                ASC;
             """
             print(select_sql)
             task_list = await self.mysql_client.async_select(sql=select_sql)
@@ -127,15 +127,26 @@ class ProcessDeal(object):
         update_sql = f"""
         UPDATE {db_article}
         SET 
-            kimi_title='{kimi_title}',
-            recall_video_id1={vid1}, 
-            recall_video_id2={"NULL" if vid2 is None else vid2}, 
-            recall_video_id3={"NULL" if vid3 is None else vid3},
-            content_status=2,
-            process_times = {int(params['process_times']) + 1}
-        WHERE  trace_id = '{params['trace_id']}'
+            kimi_title=%s,
+            recall_video_id1=%s, 
+            recall_video_id2=%s, 
+            recall_video_id3=%s,
+            content_status=%s,
+            process_times = %s
+        WHERE  trace_id = %s
         """
-        await self.mysql_client.async_insert(update_sql)
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                "NULL" if vid2 is None else vid2,
+                "NULL" if vid3 is None else vid3,
+                2,
+                int(params['process_times']) + 1,
+                params['trace_id']
+            )
+        )
         logging(
             code="9002",
             info="已从历史文章更新,历史id: {}".format(history_trace_id),
@@ -152,12 +163,20 @@ class ProcessDeal(object):
             UPDATE  
                 {db_article}
             SET 
-                recall_video_id1 = '{video_id}',
-                content_status = 2,
-                process_times = {int(process_times) + 1}
+                recall_video_id1 = %s,
+                content_status = %s,
+                process_times = %s
             WHERE  
-                trace_id = '{trace_id}';"""
-        await self.mysql_client.async_insert(update_sql)
+                trace_id = %s;"""
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                video_id,
+                2,
+                {int(process_times) + 1},
+                trace_id
+            )
+        )
 
     async def start_process(self, params):
         """
@@ -169,11 +188,16 @@ class ProcessDeal(object):
         update_sql = f"""
             UPDATE {db_article}
             SET 
-                content_status = 1
+                content_status = %s
             WHERE 
-                trace_id = '{params["trace_id"]}'
+                trace_id = %s
         """
-        await self.mysql_client.async_insert(sql=update_sql)
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                1, params['trace_id']
+            )
+        )
         try:
             # 判断标题中是否包含video_id
             if "video_id=" in params['title']:
@@ -202,48 +226,95 @@ class ProcessDeal(object):
                 """
                 result = await self.mysql_client.async_select(sql=select_sql)
                 vid1, vid2, vid3 = result[0]
-                if vid1:
+                if vid1 or vid2 or vid3:
                     update_sql2 = f"""
                         UPDATE {db_article}
                         SET 
-                           content_status = 2,
-                           process_times = {int(params['process_times']) + 1}
-                           WHERE trace_id = '{params["trace_id"]}';
+                           content_status = %s,
+                           process_times = %s
+                           WHERE trace_id = %s;
                     """
-                    await self.mysql_client.async_insert(sql=update_sql2)
+                    await self.mysql_client.async_insert(
+                        sql=update_sql2,
+                        params=(
+                            2, {int(params['process_times']) + 1}, params['trace_id']
+                        )
+                    )
                     logging(
                         code="9008",
                         info="视频搜索成功, 状态修改为2",
                         trace_id=params['trace_id']
                     )
                 else:
-                    update_sql3 = f"""
-                                        UPDATE {db_article}
-                                        SET 
-                                           content_status = 0,
-                                           process_times = {int(params['process_times']) + 1}
-                                        WHERE trace_id = '{params["trace_id"]}';
-                                    """
-                    await self.mysql_client.async_insert(sql=update_sql3)
-                    logging(
-                        code="9018",
-                        info="视频搜索失败,回退状态为0",
-                        trace_id=params['trace_id']
-                    )
+                    if int(params['process_times']) < 5:
+                        update_sql3 = f"""
+                            UPDATE {db_article}
+                            SET 
+                               content_status = %s,
+                               process_times = %s
+                            WHERE trace_id = %s;
+                                        """
+                        await self.mysql_client.async_insert(
+                            sql=update_sql3,
+                            params=(0, int(params['process_times']) + 1, params['trace_id'])
+                        )
+                        logging(
+                            code="9018",
+                            info="视频搜索失败,回退状态为0",
+                            trace_id=params['trace_id']
+                        )
+                    else:
+                        update_sql3 = f"""
+                            UPDATE {db_article}
+                            SET 
+                               content_status = %s,
+                               process_times = %s
+                            WHERE trace_id = %s;
+                                        """
+                        await self.mysql_client.async_insert(
+                            sql=update_sql3,
+                            params=(3, int(params['process_times']) + 1, params['trace_id'])
+                        )
+                        logging(
+                            code="9019",
+                            info="视频多次搜索失败,状态修改为3",
+                            trace_id=params['trace_id']
+                        )
         except Exception as e:
-            logging(
-                code="9018",
-                info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
-                trace_id=params['trace_id']
-            )
-            update_sql4 = f"""
-                UPDATE {db_article}
-                SET 
-                   content_status = 0,
-                   process_times = {int(params['process_times']) + 1}
-                WHERE trace_id = '{params["trace_id"]}';
-            """
-            await self.mysql_client.async_insert(sql=update_sql4)
+            if int(params['process_times']) < 5:
+                logging(
+                    code="9018",
+                    info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
+                    trace_id=params['trace_id']
+                )
+                update_sql4 = f"""
+                    UPDATE {db_article}
+                    SET 
+                       content_status = %s,
+                       process_times = %s
+                    WHERE trace_id = %s;
+                """
+                await self.mysql_client.async_insert(
+                    sql=update_sql4,
+                    params=(0, int(params['process_times']) + 1, params['trace_id'])
+                )
+            else:
+                logging(
+                    code="9019",
+                    info="{}异常错误:{}, 状态修改为3".format(params['trace_id'], e),
+                    trace_id=params['trace_id']
+                )
+                update_sql4 = f"""
+                                    UPDATE {db_article}
+                                    SET 
+                                       content_status = %s,
+                                       process_times = %s
+                                    WHERE trace_id = %s;
+                                """
+                await self.mysql_client.async_insert(
+                    sql=update_sql4,
+                    params=(3, int(params['process_times']) + 1, params['trace_id'])
+                )
 
     async def process_task(self, params):
         """

+ 27 - 11
applications/deal/search_deal.py

@@ -70,8 +70,12 @@ class SearchDeal(object):
                         INSERT INTO {db_article}
                             (trace_id, gh_id, article_title, article_text, account_name, content_id)
                         VALUES 
-                            ('{self.trace_id}', '{self.gh_id}', '{self.title}', '{self.contents}', '{self.account_name}', '{self.content_id}');"""
-        await self.mysql_client.async_insert(insert_sql)
+                            (%s, %s, %s, %s, %s, %s);
+                            """
+        await self.mysql_client.async_insert(
+            sql=insert_sql,
+            params=(self.trace_id, self.gh_id, self.title, self.contents, self.account_name, self.content_id)
+        )
         logging(
             code="1002",
             info="成功记录请求数据到mysql中",
@@ -88,10 +92,13 @@ class SearchDeal(object):
             UPDATE  
                 {db_article}
             SET 
-                recall_video_id1 = '{video_id}'
+                recall_video_id1 = %s
             WHERE  
-                trace_id = '{self.trace_id}';"""
-        await self.mysql_client.async_insert(update_sql)
+                trace_id = %s;"""
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(video_id, self.trace_id)
+        )
         res = {
             "status": "success",
             "code": 0,
@@ -112,14 +119,23 @@ class SearchDeal(object):
         update_sql = f"""
         UPDATE {db_article}
         SET 
-            kimi_title='{kimi_title}',
-            recall_video_id1={vid1}, 
-            recall_video_id2={"NULL" if vid2 is None else vid2}, 
-            recall_video_id3={"NULL" if vid3 is None else vid3}
-        WHERE  trace_id = '{self.trace_id}'
+            kimi_title=%s,
+            recall_video_id1=%s, 
+            recall_video_id2=%s, 
+            recall_video_id3=%s
+        WHERE  trace_id = %s
         """
         print(update_sql)
-        await self.mysql_client.async_insert(update_sql)
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                {"NULL" if vid2 is None else vid2},
+                {"NULL" if vid3 is None else vid3},
+                self.trace_id
+            )
+        )
 
     async def get_history_contents(self):
         """

+ 5 - 2
applications/deal/search_deal_v2.py

@@ -69,8 +69,11 @@ class SearchDeal2(object):
                         INSERT INTO {db_article}
                             (trace_id, content_id, gh_id, account_name, article_title, article_text, content_status, success, request_time_stamp)
                         VALUES 
-                            ('{self.trace_id}', '{self.content_id}','{self.gh_id}', '{self.account_name}', '{self.title}', '{self.contents}', 0, 0, {request_time});"""
-        await self.mysql_client.async_insert(insert_sql)
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s);"""
+        await self.mysql_client.async_insert(
+            sql=insert_sql,
+            params=(self.trace_id, self.content_id, self.gh_id, self.account_name, self.title, self.contents, 0, 0, request_time)
+        )
         logging(
             code="1002",
             info="成功记录请求数据到mysql中",

+ 3 - 2
applications/functions/async_mysql.py

@@ -48,13 +48,14 @@ class AsyncMySQLClient(object):
                 result = await cursor.fetchall()
                 return result
 
-    async def async_insert(self, sql):
+    async def async_insert(self, sql, params):
         """
         insert and update method
+        :param params:
         :param sql:
         :return:
         """
         async with self.app.mysql_pool.acquire() as coon:
             async with coon.cursor() as cursor:
-                await cursor.execute(sql)
+                await cursor.execute(sql, params)
                 await coon.commit()

+ 15 - 9
applications/schedule/process_schedule.py

@@ -47,12 +47,15 @@ async def return_info_v2(video_id, trace_id, mysql_client):
     update_result_sql = f"""
                         UPDATE {db_article}
                         SET
-                            result1 = '{json.dumps(result, ensure_ascii=False)}',
-                            success = 1
+                            result1 = %s,
+                            success = %s
                         WHERE
-                            trace_id = '{trace_id}'
+                            trace_id = %s;
                     """
-    await mysql_client.async_insert(update_result_sql)
+    await mysql_client.async_insert(
+        sql=update_result_sql,
+        params=(json.dumps(result, ensure_ascii=False), 1, trace_id)
+    )
     logging(
         code="2000",
         info="root_share_id和source_id",
@@ -108,12 +111,15 @@ async def return_info(video_id, kimi_title, trace_id, mysql_client, index):
     update_result_sql = f"""
                     UPDATE {db_article}
                     SET
-                        result{index} = '{json.dumps(result, ensure_ascii=False)}',
-                        success = 1
+                        result{index} = %s,
+                        success = %s
                     WHERE
-                        trace_id = '{trace_id}'
+                        trace_id = %s;
                 """
-    await mysql_client.async_insert(update_result_sql)
+    await mysql_client.async_insert(
+        sql=update_result_sql,
+        params=(json.dumps(result, ensure_ascii=False), 1, trace_id)
+    )
     logging(
         code="2000",
         info="统计 root_share_id && video_id",
@@ -150,7 +156,7 @@ async def recall_videos(trace_id, mysql_client):
             result = {
                 "traceId": trace_id,
                 "code": 0,
-                "Message": "匹配失败,检查原因"
+                "error": "匹配失败,检查原因"
             }
         else:
             result = {

+ 8 - 5
applications/schedule/search_schedule.py

@@ -305,12 +305,15 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
     content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
     update_kimi_sql = f"""
                     UPDATE {db_article} SET
-                    kimi_title = '{kimi_title}',
-                    kimi_summary = '{content_title}',
-                    kimi_keys = '{content_keys}'
-                    WHERE trace_id = '{trace_id}';
+                    kimi_title = %s,
+                    kimi_summary = %s,
+                    kimi_keys = %s
+                    WHERE trace_id = %s;
                 """
-    await mysql_client.async_insert(update_kimi_sql)
+    await mysql_client.async_insert(
+        sql=update_kimi_sql,
+        params=(kimi_title, content_title, content_keys, trace_id)
+    )
     kimi_info["trace_id"] = trace_id
     SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
     # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索

+ 1 - 1
task.py

@@ -8,7 +8,7 @@ import asyncio
 import aiomysql
 
 from applications.deal import ProcessDeal
-from applications.deal.process_deal_2 import ProcessDeal2
+from applications.deal.get_done_content_deal import ProcessDeal2
 
 
 class TaskMySQLClient(object):