Просмотр исходного кода

添加飞书机器人方法
更新账号均值,
更新小程序信息
更新账号每日发布信息
三个任务增加 aliyun 日志和报警

罗俊辉 7 месяцев назад
Родитель
Коммит
58891ed651
5 измененных файлов с 296 добавлено и 108 удалено
  1. 2 1
      applications/__init__.py
  2. 41 0
      applications/feishuBotApi.py
  3. 72 29
      updateAccountAvgDaily.py
  4. 103 22
      updateMinigramInfoDaily.py
  5. 78 56
      updatePublishedMsgDaily.py

+ 2 - 1
applications/__init__.py

@@ -9,4 +9,5 @@ from .functions import Functions
 from .data_works import ODPSApi
 from .wxSpiderApi import WeixinSpider
 from .algApi import AlgApi
-from .aliyunLogApi import log
+from .aliyunLogApi import log
+from .feishuBotApi import bot

+ 41 - 0
applications/feishuBotApi.py

@@ -0,0 +1,41 @@
+"""
+@author: luojunhui
+"""
+import json
+import requests
+
+from applications.decoratorApi import retryOnTimeout
+
+
+@retryOnTimeout()  # retry behaviour comes from applications.decoratorApi (not shown here)
+def bot(title, detail):
+    """Post an interactive card to the hard-coded Feishu bot webhook.
+    `title` is rendered first, followed by an `<at id=all>` tag; `detail` is rendered below it as indented JSON.
+    """
+    url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"  # NOTE(review): webhook token is committed in source; consider loading it from configuration
+    headers = {"Content-Type": "application/json"}
+    payload = {
+        "msg_type": "interactive",
+        "card": {
+            "elements": [
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": "{}<at id=all></at>\n".format(title),  # title line plus the at-all tag
+                        "tag": "lark_md",
+                    },
+                },
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": json.dumps(  # detail must be JSON-serialisable, otherwise this raises before anything is sent
+                            detail, ensure_ascii=False, indent=4
+                        ),
+                        "tag": "lark_md",
+                    },
+                },
+            ],
+            "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},  # fixed card header text
+        },
+    }
+    requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)  # 10-second timeout; the HTTP response is not inspected

+ 72 - 29
updateAccountAvgDaily.py

@@ -21,7 +21,7 @@ from datetime import datetime
 from pandas import DataFrame
 from tqdm import tqdm
 
-from applications import PQMySQL, DeNetMysql, Functions
+from applications import PQMySQL, DeNetMysql, Functions, log, bot
 
 
 def filter_outlier_data(group, key='show_view_count'):
@@ -52,7 +52,7 @@ class UpdateAvgDaily(object):
     @classmethod
     def getAccountList(cls):
         """
-        获取账号 list
+        get official accounts and its ghid, fans, and account_type
         :return:
         """
         sql = f"""
@@ -64,6 +64,11 @@ class UpdateAvgDaily(object):
             ON t2.group_source_name = t3.account_source_name
         """
         response = cls.deNetClient.select(sql)
+        log(
+            task="updateAccountAvgDaily",
+            function="getAccountList",
+            message="获取账号成功,一共获取: {} 个账号".format(len(response))
+        )
         L = []
         for item in response:
             temp = {
@@ -78,20 +83,13 @@ class UpdateAvgDaily(object):
                 continue
             else:
                 L.append(temp)
+        log(
+            task="updateAccountAvgDaily",
+            function="getAccountList",
+            message="过滤账号成功,过滤后一共获取: {} 个账号".format(len(L))
+        )
         return L
 
-    @classmethod
-    def getAccountIdDict(cls):
-        """
-        获取全部内部账号的id
-        :return:
-        """
-        gh_id_dict = {}
-        for line in cls.account_list:
-            gh_id = line['gh_id']
-            gh_id_dict[gh_id] = line
-        return gh_id_dict
-
     @classmethod
     def insertIntoMysql(cls, data):
         """
@@ -105,19 +103,34 @@ class UpdateAvgDaily(object):
         values 
         (%s, %s, %s, %s, %s, %s, %s, %s);
         """
-        cls.pqClient.update(
-            sql=sql,
-            params=(
-                data['gh_id'],
-                data['position'],
-                data['account_name'],
-                data['fans'],
-                data['avg_read'],
-                data['avg_like'],
-                data['update_time'],
-                1
+        try:
+            cls.pqClient.update(
+                sql=sql,
+                params=(
+                    data['gh_id'],
+                    data['position'],
+                    data['account_name'],
+                    data['fans'],
+                    data['avg_read'],
+                    data['avg_like'],
+                    data['update_time'],
+                    1
+                )
+            )
+            log(
+                task="updateAccountAgvDaily",
+                function="insertIntoMysql",
+                message="数据插入成功",
+                data=data
+            )
+        except Exception as e:
+            log(
+                task="updateAccountAgvDaily",
+                function="insertIntoMysql",
+                message="数据插入失败, 失败原因是: {}".format(e),
+                status="fail",
+                data=data
             )
-        )
 
     @classmethod
     def getAllAvgRead(cls):
@@ -151,17 +164,47 @@ class UpdateAvgDaily(object):
                     cls.insertIntoMysql(obj)
                     L.append(obj)
                 except Exception as e:
-                    print(e)
+                    log(
+                        task="updateAccountAvgDaily",
+                        function="getAllAvgRead",
+                        status="fail",
+                        message="更新单个账号单个位置的账号均值失败, 失败原因是: {}".format(e)
+                    )
 
         with open("new_account_avg_v3.json", "w", encoding="utf-8") as f:
             f.write(json.dumps(L, ensure_ascii=False, indent=4))
 
+        log(
+            task="updateAccountAvgDaily",
+            function="getAllAvgRead",
+            message="账号均值数据写入文件成功"
+        )
+
         update_sql = f"""
         UPDATE account_avg_info_v2
         SET status = %s
         where update_time != '{dt_str}';
         """
-        cls.pqClient.update(sql=update_sql, params=0)
+        try:
+            cls.pqClient.update(sql=update_sql, params=0)
+            log(
+                task="updateAccountAvgDaily",
+                function="getAllAvgRead",
+                message="修改非当日数据状态为 0 成功"
+            )
+        except Exception as e:
+            bot(
+                title="账号均值表,更新非当日数据状态失败",
+                detail={
+                    "task": "updateAccountAvgDaily"
+                }
+            )
+            log(
+                task="updateAccountAvgDaily",
+                function="getAllAvgRead",
+                status="fail",
+                message="修改非当日数据状态为 0 失败, 报错为 {}".format(e)
+            )
 
     @classmethod
     def getEachAvgRead(cls, account_name, index):
@@ -229,7 +272,7 @@ def updateAvgJob():
 
 
 if __name__ == "__main__":
-    # updateAvgJob()
+
     schedule.every().day.at("22:30").do(Functions().job_with_thread, updateAvgJob)
 
     while True:

+ 103 - 22
updateMinigramInfoDaily.py

@@ -8,7 +8,7 @@ from tqdm import tqdm
 from datetime import datetime, timedelta
 import schedule
 
-from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions
+from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions, log, bot
 
 
 class DailyDataManager(object):
@@ -40,6 +40,11 @@ class DailyDataManager(object):
                         );
         """
         result_list = cls.pqMysql.select(sql2)
+        log(
+            task="updateMinigramInfoDaily",
+            function="getPublishedArticles",
+            message="一共获取 {} 篇文章数据".format(len(result_list))
+        )
         return result_list
 
     @classmethod
@@ -86,9 +91,18 @@ class DailyDataManager(object):
                             dt_str
                         )
                     )
+                    log(
+                        task="updateMinigramInfoDaily",
+                        function="updateInfo",
+                        message="插入数据成功, video_id 是: {}".format(video_id)
+                    )
         except Exception as e:
-            print(e)
-            pass
+            log(
+                task="updateMinigramInfoDaily",
+                function="updateInfo",
+                status="fail",
+                message="插入数据失败, 失败原因是".format(e)
+            )
 
     @classmethod
     def getRootSourceIds(cls, data_info):
@@ -97,9 +111,32 @@ class DailyDataManager(object):
         :return:
         """
         url = data_info[0]
-        article_detail = cls.wxSpider.get_article_text(url)
-        mini_info = article_detail['data']['data']['mini_program']
-        return data_info[1].decode(), mini_info, data_info[2]
+        try:
+            article_detail = cls.wxSpider.get_article_text(url)
+            mini_info = article_detail['data']['data']['mini_program']
+            log(
+                task="updateMinigramInfoDaily",
+                function="getRootSourceIds",
+                message="获取文章链接对应的 rootSourceId 成功",
+                data={
+                    "ContentUrl": url,
+                    "wxSn": data_info[1].decode(),
+                    "createTime": data_info[2],
+                    "miniInfo": mini_info
+                }
+            )
+            return data_info[1].decode(), mini_info, data_info[2]
+        except Exception as e:
+            log(
+                task="updateMinigramInfoDaily",
+                function="getRootSourceIds",
+                status="fail",
+                message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e),
+                data={
+                    "ContentUrl": url
+                }
+            )
+            return
 
     @classmethod
     def getMinigramInfo(cls, rootSourceId):
@@ -171,7 +208,23 @@ class DailyDataManager(object):
                 L[key] = temp
             return L
 
-        return summarize(result_list)
+        try:
+            response = summarize(result_list)
+            log(
+                task="updateMinigramInfoDaily",
+                function="getMinigramInfo",
+                message="计算source_id信息成功",
+                data=response
+            )
+            return response
+        except Exception as e:
+            log(
+                task="updateMinigramInfoDaily",
+                function="getMinigramInfo",
+                message="获取 source_id信息失败, 报错信息是: {}".format(e),
+                status="fail"
+            )
+            return None
 
     @classmethod
     def updateDetail(cls):
@@ -179,16 +232,20 @@ class DailyDataManager(object):
         :return:
         """
         today = datetime.today()
-        # 获取昨天的日期
+        # 获取三天前的日期
         yesterday = today - timedelta(days=3)
         yesterday_str = yesterday.__str__().split(" ")[0]
-        print(yesterday_str)
         sql = f"""
-        select distinct root_source_id
-        from long_articles_detail_info
-        where publish_dt >= '{yesterday_str}';
+            select distinct root_source_id
+            from long_articles_detail_info
+            where publish_dt >= '{yesterday_str}';
         """
         source_id_list = cls.pqMysql.select(sql)
+        log(
+            task="updateMinigramInfoDaily",
+            function="updateDetail",
+            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list))
+        )
         for item in tqdm(source_id_list):
             s_id = item[0]
             try:
@@ -201,9 +258,9 @@ class DailyDataManager(object):
                     fission_2 = result[key][3]
                     # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2)
                     update_sql = f"""
-                    UPDATE long_articles_detail_info
-                    set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
-                    where root_source_id = %s and recall_dt = %s;
+                        UPDATE long_articles_detail_info
+                        set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
+                        where root_source_id = %s and recall_dt = %s;
                     """
                     try:
                         cls.pqMysql.update(
@@ -213,9 +270,19 @@ class DailyDataManager(object):
                             )
                         )
                     except Exception as e:
-                        print("insert error", e)
+                        log(
+                            task="updateMinigramInfoDaily",
+                            function="updateDetail",
+                            status="fail",
+                            message="mysql 更新失败, 报错信息是 {}".format(e)
+                        )
             except Exception as e:
-                print(e)
+                log(
+                    task="updateMinigramInfoDaily",
+                    function="updateDetail",
+                    status="fail",
+                    message="更新单条数据失败, 报错信息是 {}".format(e)
+                )
 
 
 def updateArticlesJob():
@@ -227,7 +294,11 @@ def updateArticlesJob():
     article_list = DDM.getPublishedArticles()
     for article in tqdm(article_list):
         DDM.updateInfo(article)
-    print("文章更新完成---{}".format(datetime.today().__str__()))
+    log(
+        task="updateMinigramInfoDaily",
+        function="updateArticlesJob",
+        message="文章更新完成---{}".format(datetime.today().__str__())
+    )
 
 
 def updateMinigramInfoJob():
@@ -236,14 +307,24 @@ def updateMinigramInfoJob():
     :return:
     """
     DDM = DailyDataManager()
-    DDM.updateDetail()
-    print("小程序更新完成---{}".format(datetime.today().__str__()))
+    try:
+        DDM.updateDetail()
+        log(
+            task="updateMinigramInfoDaily",
+            function="updateArticlesJob",
+            message="小程序更新完成---{}".format(datetime.today().__str__())
+        )
+    except Exception as e:
+        log(
+            task="updateMinigramInfoDaily",
+            function="updateArticlesJob",
+            status="fail",
+            message="小程序更新失败---{}, 报错信息是: {}".format(datetime.today().__str__(), e)
+        )
 
 
 if __name__ == '__main__':
 
-    # updateArticlesJob()
-    # updateMinigramInfoJob()
     schedule.every().day.at("01:00").do(Functions().job_with_thread, updateArticlesJob)
 
     schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)

+ 78 - 56
updatePublishedMsgDaily.py

@@ -7,49 +7,11 @@ import time
 import json
 import threading
 
-import requests
 import schedule
 from tqdm import tqdm
 from datetime import datetime
 
-from applications import PQMySQL
-from applications.decoratorApi import retryOnTimeout
-from applications import WeixinSpider
-from applications import Functions
-
-
-@retryOnTimeout()
-def bot(account_list):
-    """
-    机器人
-    """
-    url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
-    headers = {"Content-Type": "application/json"}
-    payload = {
-        "msg_type": "interactive",
-        "card": {
-            "elements": [
-                {
-                    "tag": "div",
-                    "text": {
-                        "content": "存在文章更新失败<at id=all></at>\n",
-                        "tag": "lark_md",
-                    },
-                },
-                {
-                    "tag": "div",
-                    "text": {
-                        "content": json.dumps(
-                            account_list, ensure_ascii=False, indent=4
-                        ),
-                        "tag": "lark_md",
-                    },
-                },
-            ],
-            "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
-        },
-    }
-    requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
+from applications import PQMySQL, WeixinSpider, Functions, log, bot
 
 
 def getAccounts():
@@ -78,8 +40,6 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
         appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
         createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
         updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
-        # if int(time.time()) - int(updateTime) <= 20 * 60 * 60:
-        #     continue
         Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
         detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
         if detail_article_list:
@@ -128,7 +88,6 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                     wx_sn,
                     json.dumps(baseInfo, ensure_ascii=False)
                 )
-                # print(info_tuple)
                 try:
                     insert_sql = f"""
                         INSERT INTO official_articles_v2
@@ -137,7 +96,15 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
                     db_client.update(sql=insert_sql, params=info_tuple)
-                    print("插入成功")
+                    log(
+                        task="updatePublishedMsgDaily",
+                        function="insertEachMsg",
+                        message="插入文章数据成功",
+                        data={
+                            "info": info_tuple
+                        }
+
+                    )
                 except Exception as e:
                     try:
                         update_sql = f"""
@@ -147,9 +114,24 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                         """
                         db_client.update(sql=update_sql,
                                          params=(show_view_count, show_like_count, wx_sn))
-                        print("更新成功")
+                        log(
+                            task="updatePublishedMsgDaily",
+                            function="insertEachMsg",
+                            message="更新文章数据成功",
+                            data={
+                                "wxSn": wx_sn,
+                                "likeCount": show_like_count,
+                                "viewCount": show_view_count
+                            }
+
+                        )
                     except Exception as e:
-                        print("失败-{}".format(e))
+                        log(
+                            task="updatePublishedMsgDaily",
+                            function="insertEachMsg",
+                            message="更新文章失败, 报错原因是: {}".format(e),
+                            status="fail"
+                        )
                         continue
 
 
@@ -189,7 +171,20 @@ def updateEachAccount(db_client, gh_id, account_name, latest_update_time, cursor
                     latest_update_time=latest_update_time,
                     cursor=next_cursor
                 )
+            log(
+                task="updatePublishedMsgDaily",
+                function="updateEachAccount",
+                message="账号文章更新成功",
+                data=response
+            )
     else:
+        log(
+            task="updatePublishedMsgDaily",
+            function="updateEachAccount",
+            message="账号文章更新失败",
+            status="fail",
+            data=response
+        )
         return
 
 
@@ -284,20 +279,45 @@ def updateJob():
     """
     db_client = PQMySQL()
     sub_accounts, server_accounts = getAccounts()
-
+    s_count = 0
+    f_count = 0
     for sub_item in tqdm(sub_accounts):
-        # updateSingleJob(db_client, sub_item['ghId'])
         try:
             updateSingleJob(db_client, sub_item['ghId'])
-            time.sleep(2)
+            s_count += 1
+            time.sleep(5)
         except Exception as e:
-            print(e)
-    print("订阅号更新完成")
+            f_count += 1
+            log(
+                task="updatePublishedMsgDaily",
+                function="updateJob",
+                message="单个账号文章更新失败, 报错信息是: {}".format(e),
+                status="fail",
+            )
+    log(
+        task="updatePublishedMsgDaily",
+        function="updateJob",
+        message="订阅号更新完成",
+        data={
+            "success": s_count,
+            "fail": f_count
+        }
+    )
+
+    if f_count / (s_count + f_count) > 0.3:
+        bot(
+            title="订阅号超过 30% 的账号更新失败",
+            detail={
+                "success": s_count,
+                "fail": f_count,
+                "failRate": f_count / (s_count + f_count)
+            }
+        )
 
     for sub_item in tqdm(server_accounts):
         try:
             updateSingleJob(db_client, sub_item['ghId'])
-            time.sleep(2)
+            time.sleep(5)
         except Exception as e:
             print(e)
 
@@ -325,7 +345,10 @@ def checkJob():
             fail_list.append(sub_item)
     if fail_list:
         try:
-            bot(fail_list)
+            bot(
+                title="日常报警, 存在账号更新失败",
+                detail=fail_list
+            )
         except Exception as e:
             print("Timeout Error: {}".format(e))
 
@@ -342,10 +365,9 @@ def job_with_thread(job_func):
 
 if __name__ == '__main__':
 
-    # updateJob()
-    # schedule.every().day.at("20:50").do(job_with_thread, updateJob)
-    #
-    schedule.every().day.at("22:30").do(job_with_thread, checkJob)
+    schedule.every().day.at("20:50").do(job_with_thread, updateJob)
+
+    schedule.every().day.at("22:00").do(job_with_thread, checkJob)
 
     while True:
         schedule.run_pending()