浏览代码

测试环境

罗俊辉 11 月之前
父节点
当前提交
da967e60db
共有 6 个文件被更改,包括 49 次插入40 次删除
  1. 3 3
      config/__init__.py
  2. 5 6
      dailyTasks/updateDataFromOdpsDaily.py
  3. 2 1
      requirements.txt
  4. 1 1
      routes/__init__.py
  5. 27 21
      routes/insertVideoRoute.py
  6. 11 8
      touliu_schedule_app.py

+ 3 - 3
config/__init__.py

@@ -3,8 +3,8 @@
 config
 """
 # 环境
-env = "prod"
-# env = "dev"
+# env = "prod"
+env = "dev"
 
 # 视频表
-daily_video = "top_video_daily"
+daily_video = "top_videos_daily"

+ 5 - 6
dailyTasks/updateDataFromOdpsDaily.py

@@ -2,7 +2,6 @@
 @author: luojunhui
 每日将odps的回流前5000的数据存储迁移的数据库中
 """
-import time
 import asyncio
 
 import aiohttp
@@ -23,7 +22,7 @@ async def asyncPost(url, headers, payload):
     async with aiohttp.ClientSession() as session:
         for attempt in range(3):
             try:
-                async with session.post(url, headers=headers, data=payload, timeout=10) as response:
+                async with session.post(url, headers=headers, json=payload, timeout=10) as response:
                     return await response.json()
             except asyncio.TimeoutError:
                 if attempt < retries - 1:
@@ -55,7 +54,7 @@ class updateFromOdps(object):
         """
         date_info = getYesterdayStr()
         sql = f"""
-        select videoid, title, return_lastday, uid, lastday_return, share_total, 品类标签, dt
+        select videoid, title, 回流人数, uid, 总曝光, share_total, 品类标签, dt
         from loghubods.lastday_return 
         where dt = '{date_info}';
         """
@@ -64,9 +63,9 @@ class updateFromOdps(object):
             {
                 "video_id": i['videoid'],
                 "title": i['title'],
-                "last_day_return": i['return_lastday'],
+                "last_day_return": i['回流人数'],
                 "uid": i['uid'],
-                "last_day_view": i['lastday_return'],
+                "last_day_view": i['总曝光'],
                 "last_day_share": i['share_total'],
                 "category": i['品类标签'],
                 "dt": i['dt']
@@ -112,5 +111,5 @@ class updateFromOdps(object):
             headers=headers,
             payload=video_obj
         )
-        return response.json
+        return response
 

+ 2 - 1
requirements.txt

@@ -10,4 +10,5 @@ aliyun-log-python-sdk
 aliyun-python-sdk-core
 aliyun-python-sdk-kms
 odps
-apscheduler
+apscheduler
+tqdm

+ 1 - 1
routes/__init__.py

@@ -40,7 +40,7 @@ def Routes(db_client):
         :return:
         """
         request_id = "insertVideos_{}_{}".format(uuid.uuid4(), int(time.time()))
-        data = await request.get_data()
+        data = await request.get_json()
         response = await insert(
             db_client=db_client,
             params=data,

+ 27 - 21
routes/insertVideoRoute.py

@@ -28,27 +28,33 @@ async def insert(db_client, params, request_id):
 
     insert_sql = f"""
     INSERT INTO {daily_video}
-    (video_id, uid, title, return_cnt, view_cnt, share_cnt, category, dt, ros, rov)
-    values 
-    (%s, %s, %s, %s, %s, %s, %s, %s);
+    (video_id, uid, title, return_cnt, view_cnt, share_cnt, category, ros, rov, dt)
+    values
+    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
     """
-    await db_client.async_insert(
-        insert_sql,
-        params=(
-            video_id,
-            title,
-            return_cnt,
-            uid,
-            view_cnt,
-            share_cnt,
-            category,
-            dt,
-            float(int(return_cnt) / int(view_cnt)),
-            float(int(share_cnt) / int(view_cnt))
+    try:
+        await db_client.asyncInsert(
+            insert_sql,
+            params=(
+                video_id,
+                uid,
+                title,
+                return_cnt,
+                view_cnt,
+                share_cnt,
+                category,
+                float(int(return_cnt) / int(view_cnt)) if int(view_cnt) > 0 else 0,
+                float(int(return_cnt) / int(share_cnt)) if int(share_cnt) > 0 else 0,
+                dt
+            )
         )
-    )
-    response = {
-        "info": "success",
-        "requestId": request_id
-    }
+        response = {
+            "info": "success",
+            "requestId": request_id
+        }
+    except Exception as e:
+        response = {
+            "error": str(e),
+            "requestId": request_id
+        }
     return response

+ 11 - 8
touliu_schedule_app.py

@@ -26,19 +26,22 @@ def asyncScheduleJob():
     :return:
     """
     scheduler = AsyncIOScheduler()
-    # 设置一个cron触发器,每天的9点执行任务
-    trigger = CronTrigger(hour=10, minute=0)
+    # 设置一个cron触发器,hour是整时,minute是分钟
+    trigger = CronTrigger(hour=10, minute=26)
     scheduler.add_job(asyncUpdatePQVideosTask, trigger)
     scheduler.start()
 
 
 if __name__ == '__main__':
-    loop = asyncio.get_event_loop()
-    asyncScheduleJob()
-    try:
-        loop.run_forever()  # 保持事件循环运行
-    except (KeyboardInterrupt, SystemExit):
-        pass
+    # 直接执行
+    asyncio.run(asyncUpdatePQVideosTask())
+    # 定时执行
+    # loop = asyncio.get_event_loop()
+    # asyncScheduleJob()
+    # try:
+    #     loop.run_forever()  # 保持事件循环运行
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass