
Re-rank by fission rate for level1 only

luojunhui 5 months ago
parent
commit
8a2430df2e
5 changed files with 85 additions and 62 deletions
  1. applications/config/__init__.py (+8 -8)
  2. applications/match_algorithm/rank.py (+2 -1)
  3. create_dev_table.sql (+16 -0)
  4. historyTask.py (+6 -5)
  5. tasks/history_task.py (+53 -48)

+ 8 - 8
applications/config/__init__.py

@@ -9,7 +9,7 @@ class Config(object):
     apolloConfig
     """
 
-    def __init__(self, env="pre"):
+    def __init__(self, env="dev"):
         """
         :param env:
         """
@@ -29,15 +29,15 @@ class Config(object):
             case "dev":
                 self.apollo_connection = pyapollos.ApolloClient(
                     app_id="LongArticlesMatchServer",
-                    config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
+                    config_server_url="http://devapolloconfig-internal.piaoquantv.com/",
                     timeout=10
                 )
-                self.article_match_video_table = "long_articles_match_videos_copy1"
-                self.article_text_table = "long_articles_text_copy1"
-                self.article_crawler_video_table = "long_articles_crawler_videos_copy1"
-                self.root_source_id_table = "long_articles_root_source_id_copy1"
-                self.get_off_video_table = "get_off_videos_copy1"
-                self.fission_detail_table = "long_articles_videos_fission_info_copy1"
+                self.article_match_video_table = "long_articles_match_videos_dev"
+                self.article_text_table = "long_articles_text_dev"
+                self.article_crawler_video_table = "long_articles_crawler_videos_dev"
+                self.root_source_id_table = "long_articles_root_source_id_dev"
+                self.get_off_video_table = "get_off_videos_dev"
+                self.fission_detail_table = "long_articles_videos_fission_info_dev"
             case "pre":
                 self.apollo_connection = pyapollos.ApolloClient(
                     app_id="LongArticlesMatchServer",

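For context, after this change a Config constructed without arguments targets the dev Apollo endpoint and the *_dev tables. A minimal usage sketch, assuming the import path implied by the file location above:

    from applications.config import Config

    config = Config()  # env now defaults to "dev"
    print(config.article_text_table)       # long_articles_text_dev
    print(config.fission_detail_table)     # long_articles_videos_fission_info_dev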
+ 2 - 1
applications/match_algorithm/rank.py

@@ -95,7 +95,7 @@ async def get_title_oss_fission_dict(db_client, config, content_id) -> dict[str:
     :return:
     """
     FISSION_DETAIL_TABLE = config.fission_detail_table
-    LONG_ARTICLES_TEXT_TABLE = config.long_article_text_table
+    LONG_ARTICLES_TEXT_TABLE = config.article_text_table
     two_days_ago_dt = (datetime.now() - timedelta(days=2)).strftime('%Y%m%d')
     sql = f"""
         SELECT
@@ -123,5 +123,6 @@ async def get_title_oss_fission_dict(db_client, config, content_id) -> dict[str:
                 continue
             else:
                 fission_info_dict[key] = value
+        return fission_info_dict
     else:
         return {}

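The added return statement matters because the function previously fell through and handed None back to its caller even when fission data existed. A minimal sketch of how the returned dict could be consumed, assuming a direct import from the module above and that every value carries a fission_0_on_read field (as used in tasks/history_task.py below); pick_best_oss_path is an illustrative name, not part of the repository:

    from applications.match_algorithm.rank import get_title_oss_fission_dict

    async def pick_best_oss_path(db_client, config, content_id):
        """Return the oss path with the highest fission_0_on_read, or None."""
        fission_info_dict = await get_title_oss_fission_dict(
            db_client=db_client, config=config, content_id=content_id
        )
        # An empty dict (not None) now signals "no fission data in the last two days".
        if not fission_info_dict:
            return None
        return max(
            fission_info_dict,
            key=lambda path: fission_info_dict[path]["fission_0_on_read"],
        )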
+ 16 - 0
create_dev_table.sql

@@ -0,0 +1,16 @@
+CREATE TABLE long_articles_videos_fission_info_dev LIKE long_articles_videos_fission_info;
+INSERT INTO long_articles_videos_fission_info_dev SELECT * FROM long_articles_videos_fission_info;
+select count(1) from long_articles_videos_fission_info_dev;
+
+CREATE TABLE long_articles_text_dev LIKE long_articles_text;
+INSERT INTO long_articles_text_dev SELECT * FROM long_articles_text;
+select * from long_articles_text_dev;
+
+CREATE TABLE long_articles_crawler_videos_dev LIKE long_articles_crawler_videos;
+INSERT INTO long_articles_crawler_videos_dev SELECT * FROM long_articles_crawler_videos;
+SELECT * FROM long_articles_crawler_videos_dev;
+
+
+CREATE TABLE long_articles_match_videos_dev LIKE long_articles_match_videos;
+INSERT INTO long_articles_match_videos_dev SELECT * FROM long_articles_match_videos order by request_timestamp DESC LIMIT 1000;
+SELECT * FROM long_articles_match_videos_dev;

+ 6 - 5
historyTask.py

@@ -20,8 +20,9 @@ async def main():
 
 
 if __name__ == '__main__':
-    while True:
-        asyncio.run(main())
-        now_str = datetime.datetime.now().__str__()
-        print("{}    请求执行完成, 等待60s".format(now_str))
-        time.sleep(60)
+    asyncio.run(main())
+    # while True:
+    #     asyncio.run(main())
+    #     now_str = datetime.datetime.now().__str__()
+    #     print("{}    请求执行完成, 等待60s".format(now_str))
+    #     time.sleep(60)

+ 53 - 48
tasks/history_task.py

@@ -4,6 +4,7 @@
 import json
 import time
 import asyncio
+import traceback
 
 from applications.config import Config
 from applications.log import logging
@@ -51,9 +52,9 @@ class historyContentIdTask(object):
                 where download_status = 2
                 group by content_id
             ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
-            WHERE ART.content_status = 0 and ART.process_times <= 3
+            WHERE ART.content_status = 0 and ART.process_times <= 3 AND ART.flow_pool_level = 'autoArticlePoolLevel1'
             ORDER BY request_timestamp
-            LIMIT {self.history_coroutines};
+            LIMIT 1;
         """
         tasks = await self.mysql_client.async_select(sql=select_sql1)
         task_obj_list = [
@@ -170,58 +171,60 @@ class historyContentIdTask(object):
                     config=self.config,
                     content_id=content_id
                 )
+                fission_list = [[i] + [fission_dict[i]['fission_0_on_read']] for i in fission_dict.keys()]
+                sorted_fission_list = sorted(fission_list, key=lambda x: x[1], reverse=True)
                 download_videos_with_fission_info = []
-                for video in download_videos:
-                    video["fission_0_rate"] = fission_dict.get(video['video_oss_path'], {}).get("fission_0_rate", 0)
-                    video["fission_0_on_read"] = fission_dict.get(video['video_oss_path'], {}).get("fission_0_on_read", 0)
+                for index, video in enumerate(download_videos[:3]):
+                    video['video_oss_path'] = "https://rescdn.yishihui.com/" + sorted_fission_list[index][0]
+                    video["fission_0_on_read"] = sorted_fission_list[index][1]
                     download_videos_with_fission_info.append(video)
-                # sorted_videos = sorted(download_videos_with_fission_info, key=lambda x: x['fission_0_rate'], reverse=True)
-                sorted_videos = sorted(download_videos_with_fission_info, key=lambda x: x['fission_0_on_read'], reverse=True)
-                video_list = sorted_videos[:3]
+                video_list = download_videos_with_fission_info
             case _:
                 print("未传流量池信息")
                 video_list = download_videos[:3]
         L = []
-        for video_obj in video_list:
-            params = {
-                "videoPath": video_obj['video_oss_path'],
-                "uid": video_obj['uid'],
-                "title": kimi_title
-            }
-            publish_response = await publish_to_pq(params)
-            video_id = publish_response['data']['id']
-            response = await get_pq_video_detail(video_id)
-            # time.sleep(2)
-            obj = {
-                "uid": video_obj['uid'],
-                "source": video_obj['platform'],
-                "kimiTitle": kimi_title,
-                "videoId": response['data'][0]['id'],
-                "videoCover": response['data'][0]['shareImgPath'],
-                "videoPath": response['data'][0]['videoPath'],
-                "videoOss": video_obj['video_oss_path']
-            }
-            L.append(obj)
-        update_sql = f"""
-           UPDATE {self.article_match_video_table}
-           SET content_status = %s, response = %s, process_times = %s
-           WHERE trace_id = %s and content_status = %s;
-           """
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                self.TASK_PUBLISHED_STATUS,
-                json.dumps(L, ensure_ascii=False),
-                process_times + 1,
-                trace_id,
-                self.TASK_PROCESSING_STATUS
-            )
-        )
-        logging(
-            code="9002",
-            info="已经从历史文章更新",
-            trace_id=trace_id
-        )
+        for index, video_obj in enumerate(video_list, 1):
+            print(index)
+            print(json.dumps(video_obj, ensure_ascii=False, indent=4))
+            # params = {
+            #     "videoPath": video_obj['video_oss_path'],
+            #     "uid": video_obj['uid'],
+            #     "title": kimi_title
+            # }
+        #     publish_response = await publish_to_pq(params)
+        #     video_id = publish_response['data']['id']
+        #     response = await get_pq_video_detail(video_id)
+        #     # time.sleep(2)
+        #     obj = {
+        #         "uid": video_obj['uid'],
+        #         "source": video_obj['platform'],
+        #         "kimiTitle": kimi_title,
+        #         "videoId": response['data'][0]['id'],
+        #         "videoCover": response['data'][0]['shareImgPath'],
+        #         "videoPath": response['data'][0]['videoPath'],
+        #         "videoOss": video_obj['video_oss_path']
+        #     }
+        #     L.append(obj)
+        # update_sql = f"""
+        #    UPDATE {self.article_match_video_table}
+        #    SET content_status = %s, response = %s, process_times = %s
+        #    WHERE trace_id = %s and content_status = %s;
+        #    """
+        # await self.mysql_client.async_insert(
+        #     sql=update_sql,
+        #     params=(
+        #         self.TASK_PUBLISHED_STATUS,
+        #         json.dumps(L, ensure_ascii=False),
+        #         process_times + 1,
+        #         trace_id,
+        #         self.TASK_PROCESSING_STATUS
+        #     )
+        # )
+        # logging(
+        #     code="9002",
+        #     info="已经从历史文章更新",
+        #     trace_id=trace_id
+        # )
 
     async def roll_back_content_status_when_fails(self, process_times, trace_id):
         """
@@ -289,6 +292,8 @@ class historyContentIdTask(object):
                     info="history task 在发布的时候出现异常, error = {}".format(e),
                     trace_id=trace_id
                 )
+                error_msg = traceback.format_exc()
+                print(error_msg)
             await self.roll_back_content_status_when_fails(
                 trace_id=trace_id,
                 process_times=process_times
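
Taken together, the level1 branch above builds (oss_path, fission_0_on_read) pairs from the fission dict, sorts them in descending order, and rewrites the first three candidate videos with the CDN-prefixed paths of the best-performing clips. A standalone sketch of that re-ranking step; rerank_level1_videos is an illustrative name, and the CDN prefix is copied from the hunk above:

    CDN_PREFIX = "https://rescdn.yishihui.com/"  # prefix used in the diff above

    def rerank_level1_videos(download_videos, fission_dict, top_n=3):
        """Pair the first top_n candidate videos with the oss paths that have the
        highest fission_0_on_read, mirroring the autoArticlePoolLevel1 branch."""
        sorted_paths = sorted(
            fission_dict.items(),
            key=lambda kv: kv[1].get("fission_0_on_read", 0),
            reverse=True,
        )
        ranked = []
        for video, (oss_path, info) in zip(download_videos[:top_n], sorted_paths):
            video = dict(video)  # keep the caller's objects untouched
            video["video_oss_path"] = CDN_PREFIX + oss_path
            video["fission_0_on_read"] = info.get("fission_0_on_read", 0)
            ranked.append(video)
        return ranked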