Просмотр исходного кода

自动下架视频
迁移 rootsourceid
两个任务增加了阿里云日志

罗俊辉 7 месяцев назад
Родитель
Коммит
910f84081d
4 измененных файлов с 152 добавлено и 28 удалено
  1. 2 1
      applications/__init__.py
  2. 59 0
      applications/aliyunLogApi.py
  3. 58 12
      getOffVideosDaily.py
  4. 33 15
      migrateRootSourceId.py

+ 2 - 1
applications/__init__.py

@@ -8,4 +8,5 @@ from .pqMysql import PQMySQL
 from .functions import Functions
 from .data_works import ODPSApi
 from .wxSpiderApi import WeixinSpider
-from .algApi import AlgApi
+from .algApi import AlgApi
+from .aliyunLogApi import log

+ 59 - 0
applications/aliyunLogApi.py

@@ -0,0 +1,59 @@
+"""
+@author: luojunhui
+"""
+import datetime
+import json
+import time
+
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+
+
+def log(
+        task,
+        function,
+        status="success",
+        message=None,
+        data=None
+):
+    """
+    Send one log record to the Aliyun log store "long_articles_job" in project "changwen-alg".
+    :param task, function, status, message, data: stored as string fields of the same name (data as JSON, empty string if falsy).
+    :return: None. An exception raised by put_logs is printed to stdout and not re-raised.
+    """
+    if data is None:
+        data = {}
+    accessKeyId = "LTAIP6x1l3DXfSxm"  # NOTE(review): credential hard-coded in source; consider loading it from config/env
+    accessKey = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"  # NOTE(review): secret hard-coded in source; consider rotating and externalising it
+    project = "changwen-alg"
+    log_store = "long_articles_job"
+    endpoint = "cn-hangzhou.log.aliyuncs.com"
+    # Create the LogClient instance (a new client is built on every call to log()).
+    client = LogClient(endpoint, accessKeyId, accessKey)
+    log_group = []
+    log_item = LogItem()
+    contents = [
+        (f"task", str(task)),  # the f-prefix has no placeholders; the key is the plain string "task"
+        (f"function", str(function)),
+        (f"message", str(message)),  # message=None is stored as the text "None"
+        (f"status", str(status)),
+        (f"data", json.dumps(data, ensure_ascii=False) if data else ""),  # runs outside the try below, so a non-serialisable value raises to the caller
+        ("dateTime", datetime.datetime.now().__str__()),  # naive local time, no timezone info
+        ("timestamp", str(int(time.time())))  # Unix time in whole seconds
+    ]
+
+    log_item.set_contents(contents)
+    log_group.append(log_item)
+    # Write the log: a single-item request with empty topic and source.
+    request = PutLogsRequest(
+        project=project,
+        logstore=log_store,
+        topic="",
+        source="",
+        logitems=log_group,
+        compress=False,
+    )
+    try:
+        client.put_logs(request)
+    except Exception as e:
+        print("日志失败")  # prints "logging failed"; best-effort, the error is not propagated
+        print(e)

+ 58 - 12
getOffVideosDaily.py

@@ -4,13 +4,12 @@
 """
 import json
 import time
-from concurrent.futures.thread import ThreadPoolExecutor
 
 import requests
 import schedule
 from tqdm import tqdm
 
-from applications import PQMySQL, Functions
+from applications import PQMySQL, Functions, log
 from applications.decoratorApi import retryOnTimeout
 
 
@@ -66,6 +65,11 @@ class AutoGetOffVideos(object):
         WHERE video_status = 1 and publish_time < {time_stamp};
         """
         result = cls.pqMysql.select(sql=select_sql)
+        log(
+            task="getOffVideosDaily",
+            function="getLongArticlesVideos",
+            message="查找到视频 id_list,一共{}条视频".format(len(result))
+        )
         return result
 
     @classmethod
@@ -81,11 +85,24 @@ class AutoGetOffVideos(object):
                 SET video_status = 0, get_off_time = {time_stamp}
                 WHERE video_id = %s;
                 """
-        cls.pqMysql.update(
-            sql=select_sql,
-            params=video_id
-        )
-        print("更新成功")
+        try:
+            cls.pqMysql.update(
+                sql=select_sql,
+                params=video_id
+            )
+            log(
+                task="getOffVideosDaily",
+                function="updateVideoIdStatus",
+                message="成功修改视频状态",
+                data={"video_id": video_id}
+            )
+        except Exception as e:
+            log(
+                task="getOffVideosDaily",
+                function="updateVideoIdStatus",
+                message="修改视频状态失败--- 推测 sql 问题,报错信息:{}".format(e),
+                status="fail"
+            )
 
     @classmethod
     def changeVideoIdStatus(cls, video_id):
@@ -115,12 +132,27 @@ class AutoGetOffVideos(object):
             "POST",
             url,
             headers=headers,
-            data=payload
+            data=payload,
+            timeout=10
         )
         if response.status_code == 200:
             result = response.json()
             if result.get("code", None) == 0:
                 cls.updateVideoIdStatus(video_id=video_id)
+            else:
+                log(
+                    task="getOffVideosDaily",
+                    function="changeVideoIdStatus",
+                    message="请求票圈修改状态异常---video_id = {}".format(video_id),
+                    data=result
+                )
+        else:
+            log(
+                task="getOffVideosDaily",
+                function="changeVideoIdStatus",
+                status="fail",
+                message="请求票圈修改状态异常,状态码非 200 ---video_id = {}".format(video_id),
+            )
 
     @classmethod
     def task1(cls):
@@ -133,9 +165,15 @@ class AutoGetOffVideos(object):
         video_set = cls.getLongArticlesVideos(time_stamp=three_days_before)
         vid_list = [i[0] for i in video_set]
         for video_id in tqdm(vid_list):
-            cls.changeVideoIdStatus(video_id=video_id)
-        # with ThreadPoolExecutor(max_workers=8) as Pool1:
-        #     Pool1.map(cls.changeVideoIdStatus, vid_list)
+            try:
+                cls.changeVideoIdStatus(video_id=video_id)
+            except Exception as e:
+                log(
+                    task="getOffVideosDaily",
+                    function="task1",
+                    status="fail",
+                    message="task1下架单个视频失败,video_id={}, 报错信息={}".format(video_id, e),
+                )
 
     @classmethod
     def task2(cls):
@@ -153,7 +191,15 @@ class AutoGetOffVideos(object):
         if vid_tuple:
             vid_list = [i[0] for i in vid_tuple]
             for vid in vid_list:
-                cls.changeVideoIdStatus(video_id=vid)
+                try:
+                    cls.changeVideoIdStatus(video_id=vid)
+                except Exception as e:
+                    log(
+                        task="getOffVideosDaily",
+                        function="task2",
+                        status="fail",
+                        message="task2下架单个视频失败,video_id={}, 报错信息={}".format(vid, e),
+                    )
             time.sleep(10)
             vid_tuple2 = cls.pqMysql.select(sql)
             if vid_tuple2:

+ 33 - 15
migrateRootSourceId.py

@@ -5,13 +5,11 @@
 import json
 import time
 
-import pymysql
 import datetime
 import schedule
 from tqdm import tqdm
-from concurrent.futures.thread import ThreadPoolExecutor
 
-from applications import Functions, PQMySQL
+from applications import Functions, PQMySQL, log
 
 
 class UpdateRootSourceId(object):
@@ -55,6 +53,11 @@ class UpdateRootSourceId(object):
             and content_status = 2;
             """
         result = cls.db_client.select(sql)
+        log(
+            task="migrateRootSourceId",
+            function="getDataList",
+            message="一共找到了: {} 条记录".format(len(result))
+        )
         return result
 
     @classmethod
@@ -83,19 +86,34 @@ class UpdateRootSourceId(object):
                 values 
                 (%s, %s, %s, %s, %s, %s, %s, %s);
                 """
-                cls.db_client.update(
-                    sql=sql,
-                    params=(
-                        source_id,
-                        account_name,
-                        gh_id,
-                        title,
-                        request_time_stamp,
-                        trace_id,
-                        cls.source_id_list.get(source_id, 2),
-                        video_id
+                try:
+                    cls.db_client.update(
+                        sql=sql,
+                        params=(
+                            source_id,
+                            account_name,
+                            gh_id,
+                            title,
+                            request_time_stamp,
+                            trace_id,
+                            cls.source_id_list.get(source_id, 2),
+                            video_id
+                        )
+                    )
+                    log(
+                        task="migrateRootSourceId",
+                        function="processEachData",
+                        message="更新消息成功",
+                        data={"trace_id": trace_id}
+                    )
+                except Exception as e:
+                    log(
+                        task="migrateRootSourceId",
+                        function="processEachData",
+                        message="更新消息失败,报错信息是: {}".format(e),
+                        status="fail",
+                        data={"trace_id": trace_id}
                     )
-                )
             else:
                 print("No result")