罗俊辉 committed 8 months ago
parent commit 6f78f26f73

+ 2 - 1
applications/__init__.py

@@ -2,4 +2,5 @@
 @author: luojunhui
 """
 from .aliyunLog import log
-from .asyncDB import AsyncMySQLClient
+from .asyncDB import AsyncMySQLClient
+from .odpsServer import ODPSApi

+ 34 - 0
applications/odpsServer.py

@@ -0,0 +1,34 @@
+"""
+@author: luojunhui
+"""
+from odps import ODPS
+
+
+class ODPSApi(object):
+    """
+    PyODPS class, get data from odps server
+    """
+
+    def __init__(self):
+        self.endpoint = "http://service.cn.maxcompute.aliyun.com/api"
+        self.access_id = "LTAIWYUujJAm7CbH"
+        self.access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+        self.project = "loghubods"
+
+        self.od = ODPS(
+            access_id=self.access_id,
+            secret_access_key=self.access_key,
+            endpoint=self.endpoint,
+            project=self.project,
+        )
+
+    def select(self, sql):
+        """
+        :param sql: SQL query to execute
+        :return: list of PyODPS Record objects (at most 5000)
+        """
+        result = []
+        with self.od.execute_sql(sql).open_reader() as reader:
+            for record in reader[:5000]:
+                result.append(record)
+        return result

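For reference, a minimal usage sketch of the new ODPSApi wrapper; the query string below is only an illustrative placeholder (the real daily task builds its own SQL against loghubods.lastday_return):

from applications import ODPSApi

odps_client = ODPSApi()
# hypothetical query for illustration; select() caps the result at 5000 records
records = odps_client.select("SELECT videoid, title FROM loghubods.lastday_return WHERE dt = '20240101';")
for record in records:
    # each item is a PyODPS Record whose fields are addressable by column name
    print(record['videoid'], record['title'])
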
+ 4 - 1
config/__init__.py

@@ -4,4 +4,7 @@ config
 """
 # environment
 env = "prod"
-# env = "dev"
+# env = "dev"
+
+# video table
+daily_video = "top_video_daily"

+ 4 - 0
dailyTasks/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .updateDataFromOdpsDaily import updateFromOdps

+ 116 - 0
dailyTasks/updateDataFromOdpsDaily.py

@@ -0,0 +1,116 @@
+"""
+@author: luojunhui
+Daily task: pull the top 5000 last-day return records from ODPS and store them in the database
+"""
+import time
+import asyncio
+
+import aiohttp
+from datetime import datetime, timedelta
+
+from applications import ODPSApi
+
+
+# asynchronous POST request with simple retry on timeout
+async def asyncPost(url, headers, payload):
+    """
+    :param url: target URL
+    :param headers: request headers
+    :param payload: dict sent as the JSON request body
+    :return: parsed JSON response
+    """
+    retries = 3
+    async with aiohttp.ClientSession() as session:
+        for attempt in range(retries):
+            try:
+                async with session.post(url, headers=headers, json=payload, timeout=10) as response:
+                    return await response.json()
+            except asyncio.TimeoutError:
+                if attempt < retries - 1:
+                    await asyncio.sleep(2)  # wait briefly before retrying
+                else:
+                    raise
+
+
+# get yesterday's date as a string
+def getYesterdayStr():
+    """Return yesterday's date formatted as YYYYMMDD"""
+    today = datetime.now()
+    # compute yesterday's date
+    yesterday = today - timedelta(days=1)
+    return yesterday.strftime('%Y%m%d')
+
+
+class updateFromOdps(object):
+    """
+    Update data from ODPS
+    """
+    odps_server = ODPSApi()
+
+    @classmethod
+    def getVideoFromOdps(cls):
+        """
+        Fetch the video list from ODPS
+        :return:
+        """
+        date_info = getYesterdayStr()
+        sql = f"""
+        select videoid, title, return_lastday, uid, lastday_return, share_total, 品类标签, dt
+        from loghubods.lastday_return 
+        where dt = '{date_info}';
+        """
+        result = cls.odps_server.select(sql)
+        response = [
+            {
+                "video_id": i['videoid'],
+                "title": i['title'],
+                "last_day_return": i['return_lastday'],
+                "uid": i['uid'],
+                "last_day_view": i['lastday_return'],
+                "last_day_share": i['share_total'],
+                "category": i['品类标签'],
+                "dt": i['dt']
+            }
+            for i in result
+        ]
+        return response
+
+    @classmethod
+    async def insertIntoDB(cls, data_list):
+        """
+        Insert the videos into MySQL via the /insertVideo endpoint, in batches of 50
+        :return:
+        """
+        # group the records, 50 per batch
+        def chunk_list(lst, chunk_size):
+            """
+            Split a list into chunks of the given size.
+
+            :param lst: the list to split.
+            :param chunk_size: size of each chunk.
+            :return: a generator of chunks.
+            """
+            for i in range(0, len(lst), chunk_size):
+                yield lst[i:i + chunk_size]
+
+        task_list = chunk_list(data_list, chunk_size=50)
+        for tasks in task_list:
+            task_ = [cls.insertSingleVideoToDB(params) for params in tasks]
+            await asyncio.gather(*task_)
+
+    @classmethod
+    async def insertSingleVideoToDB(cls, video_obj):
+        """
+        Insert a single video record
+        :param video_obj:
+        :return:
+        """
+        url = "http://localhost:8813/insertVideo"
+        headers = {"Content-Type": "application/json"}
+        response = await asyncPost(
+            url=url,
+            headers=headers,
+            payload=video_obj
+        )
+        # asyncPost already returns the parsed JSON body
+        return response
+

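The insertIntoDB method above batches requests in groups of 50, so at most 50 POSTs run concurrently per asyncio.gather call. A self-contained sketch of the same chunk-and-gather pattern, with a stand-in coroutine instead of the real HTTP call:

import asyncio


async def fake_insert(item):
    # stand-in for insertSingleVideoToDB: pretend to do some I/O
    await asyncio.sleep(0.01)
    return item


async def run_in_chunks(items, chunk_size=50):
    results = []
    for start in range(0, len(items), chunk_size):
        chunk = items[start:start + chunk_size]
        # each batch of up to chunk_size coroutines runs concurrently
        results.extend(await asyncio.gather(*(fake_insert(i) for i in chunk)))
    return results


if __name__ == "__main__":
    print(len(asyncio.run(run_in_chunks(list(range(120))))))  # prints 120
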
+ 3 - 1
requirements.txt

@@ -8,4 +8,6 @@ numpy
 pandas
 aliyun-log-python-sdk
 aliyun-python-sdk-core
-aliyun-python-sdk-kms
+aliyun-python-sdk-kms
+pyodps
+apscheduler

+ 18 - 1
routes/__init__.py

@@ -2,9 +2,11 @@
 @author: luojunhui
 Touliu (paid traffic) -- routes
 """
+import time
+import uuid
 from quart import Blueprint, jsonify, request
 
-from config import env
+from .insertVideoRoute import insert
 
 TL_blueprint = Blueprint("TouLiu", __name__)
 
@@ -31,5 +33,20 @@ def Routes(db_client):
         """
         return jsonify({"message": "this function is developing"})
 
+    @TL_blueprint.route("/insertVideo", methods=['POST'])
+    async def insertVideos():
+        """
+        Insert video info
+        :return:
+        """
+        request_id = "insertVideos_{}_{}".format(uuid.uuid4(), int(time.time()))
+        data = await request.get_json()
+        response = await insert(
+            db_client=db_client,
+            params=data,
+            request_id=request_id
+        )
+        return jsonify(response)
+
     return TL_blueprint
 

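A hedged client-side example of calling the new /insertVideo route once the Quart app is listening on port 8813 (the URL the daily task uses); all field values below are made up:

import asyncio

import aiohttp


async def demo_insert():
    payload = {
        "video_id": 12345,          # made-up example values
        "title": "example title",
        "last_day_return": 10,
        "uid": 678,
        "last_day_view": 1000,
        "last_day_share": 50,
        "category": "example",
        "dt": "20240101",
    }
    async with aiohttp.ClientSession() as session:
        async with session.post("http://localhost:8813/insertVideo", json=payload) as resp:
            # expected shape: {"info": "success", "requestId": "..."} or an error message
            print(await resp.json())


asyncio.run(demo_insert())
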
+ 54 - 0
routes/insertVideoRoute.py

@@ -0,0 +1,54 @@
+"""
+@author: luojunhui
+"""
+from config import daily_video
+
+
+async def insert(db_client, params, request_id):
+    """
+    Insert one video record into the daily video table.
+
+    :param db_client: async MySQL client
+    :param params: parsed JSON payload for one video
+    :param request_id: unique request identifier
+    :return: response dict containing the requestId
+    """
+    try:
+        video_id = params['video_id']
+        title = params['title']
+        return_cnt = params['last_day_return']
+        uid = params['uid']
+        view_cnt = params['last_day_view']
+        share_cnt = params['last_day_share']
+        category = params['category']
+        dt = params['dt']
+    except Exception as e:
+        response = {
+            "error": str(e),
+            "message": "params check failed",
+            "requestId": request_id
+        }
+        return response
+
+    insert_sql = f"""
+    INSERT INTO {daily_video}
+    (video_id, uid, title, return_cnt, view_cnt, share_cnt, category, dt, ros, rov)
+    VALUES
+    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+    """
+    await db_client.async_insert(
+        insert_sql,
+        params=(
+            video_id,
+            uid,
+            title,
+            return_cnt,
+            view_cnt,
+            share_cnt,
+            category,
+            dt,
+            float(int(return_cnt) / int(view_cnt)),
+            float(int(share_cnt) / int(view_cnt))
+        )
+    )
+    response = {
+        "info": "success",
+        "requestId": request_id
+    }
+    return response

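For clarity, the two derived columns are plain ratios of the posted counters; a small sketch of the same arithmetic on made-up numbers (note that a zero last_day_view would raise ZeroDivisionError in the handler):

# mirror of the handler's derived-column arithmetic, on made-up inputs
params = {"last_day_return": 10, "last_day_view": 1000, "last_day_share": 50}

return_ratio = float(int(params["last_day_return"]) / int(params["last_day_view"]))  # stored in the ros column
share_ratio = float(int(params["last_day_share"]) / int(params["last_day_view"]))    # stored in the rov column

print(return_ratio, share_ratio)  # 0.01 0.05
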
+ 46 - 0
touliu_schedule_app.py

@@ -0,0 +1,46 @@
+"""
+@author: luojunhui
+Daily touliu (paid traffic) tasks
+"""
+import asyncio
+
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.cron import CronTrigger
+
+from dailyTasks import updateFromOdps
+
+
+async def asyncUpdatePQVideosTask():
+    """
+    Daily update task: pull videos from ODPS and post them to the insert API
+    :return:
+    """
+    ufo = updateFromOdps()
+    video_list = ufo.getVideoFromOdps()
+    await ufo.insertIntoDB(data_list=video_list)
+
+
+def asyncScheduleJob():
+    """
+    Register the scheduled job
+    :return:
+    """
+    scheduler = AsyncIOScheduler()
+    # cron trigger: run the task every day at 10:00
+    trigger = CronTrigger(hour=10, minute=0)
+    scheduler.add_job(asyncUpdatePQVideosTask, trigger)
+    scheduler.start()
+
+
+if __name__ == '__main__':
+    loop = asyncio.get_event_loop()
+    asyncScheduleJob()
+    try:
+        loop.run_forever()  # keep the event loop running
+    except (KeyboardInterrupt, SystemExit):
+        pass
+
+
+
+
+
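
To smoke-test the daily job without waiting for the 10:00 trigger, the coroutine can be run once by hand; this assumes the ODPS credentials are valid and the local /insertVideo service is up:

import asyncio

from touliu_schedule_app import asyncUpdatePQVideosTask

# one-off manual run of the daily update, bypassing APScheduler
asyncio.run(asyncUpdatePQVideosTask())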