Преглед изворни кода

task4.py 增加超时处理,超时设置20s
update_msg.py 增加报警
wxSpiderApi.py 回滚至v0.3

罗俊辉 пре 1 година
родитељ
комит
1260e57c04
3 измењених фајлова са 25 додато и 208 уклоњено
  1. 8 10
      applications/wxSpiderApi.py
  2. 1 4
      tasks/task4.py
  3. 16 194
      update_msg.py

+ 8 - 10
applications/wxSpiderApi.py

@@ -2,9 +2,10 @@
 @author: luojunhui
 @author: luojunhui
 """
 """
 import json
 import json
+import time
 import requests
 import requests
 
 
-from applications.decoratorApi import retryOnNone, retryOnTimeout
+from applications.decoratorApi import retryOnNone
 
 
 
 
 class WeixinSpider(object):
 class WeixinSpider(object):
@@ -13,7 +14,6 @@ class WeixinSpider(object):
     """
     """
 
 
     @classmethod
     @classmethod
-    @retryOnTimeout()
     @retryOnNone()
     @retryOnNone()
     def search_articles(cls, title):
     def search_articles(cls, title):
         """
         """
@@ -29,11 +29,10 @@ class WeixinSpider(object):
             'Content-Type': 'application/json'
             'Content-Type': 'application/json'
         }
         }
 
 
-        response = requests.request("POST", url, headers=headers, data=payload, timeout=10)
+        response = requests.request("POST", url, headers=headers, data=payload)
         return response.json()
         return response.json()
 
 
     @classmethod
     @classmethod
-    @retryOnTimeout()
     @retryOnNone()
     @retryOnNone()
     def get_article_text(cls, content_link):
     def get_article_text(cls, content_link):
         """
         """
@@ -50,11 +49,10 @@ class WeixinSpider(object):
         headers = {
         headers = {
             'Content-Type': 'application/json'
             'Content-Type': 'application/json'
         }
         }
-        response = requests.request("POST", url, headers=headers, data=payload, timeout=10)
+        response = requests.request("POST", url, headers=headers, data=payload)
         return response.json()
         return response.json()
 
 
     @classmethod
     @classmethod
-    @retryOnTimeout()
     @retryOnNone()
     @retryOnNone()
     def update_msg_list(cls, ghId, index):
     def update_msg_list(cls, ghId, index):
         """
         """
@@ -65,14 +63,14 @@ class WeixinSpider(object):
             'account_id': ghId,
             'account_id': ghId,
             'cursor': index,
             'cursor': index,
         }
         }
+        print(payload)
         headers = {
         headers = {
             'Content-Type': 'application/json'
             'Content-Type': 'application/json'
         }
         }
-        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=10)
+        response = requests.post(url, headers=headers, data=json.dumps(payload))
         return response.json()
         return response.json()
 
 
     @classmethod
     @classmethod
-    @retryOnTimeout()
     @retryOnNone()
     @retryOnNone()
     def get_account_by_url(cls, content_url):
     def get_account_by_url(cls, content_url):
         """
         """
@@ -84,7 +82,7 @@ class WeixinSpider(object):
             "POST",
             "POST",
             url='http://8.217.190.241:8888/crawler/wei_xin/account_info',
             url='http://8.217.190.241:8888/crawler/wei_xin/account_info',
             headers={'Content-Type': 'application/json'},
             headers={'Content-Type': 'application/json'},
-            json={"content_link": content_url},
-            timeout=10
+            json={"content_link": content_url}
         )
         )
         return response.json()
         return response.json()
+

+ 1 - 4
tasks/task4.py

@@ -2,9 +2,6 @@
 @author: luojunhui
 @author: luojunhui
 """
 """
 import requests
 import requests
-from config import accountBaseInfo
-
-from tqdm import tqdm
 
 
 
 
 def update_articles(gh_id):
 def update_articles(gh_id):
@@ -15,6 +12,6 @@ def update_articles(gh_id):
     url = "http://61.48.133.26:6060/article_crawler"
     url = "http://61.48.133.26:6060/article_crawler"
     headers = {"Content-Type": "application/json"}
     headers = {"Content-Type": "application/json"}
     body = {"ghId": gh_id}
     body = {"ghId": gh_id}
-    response = requests.request("POST", url=url, headers=headers, json=body)
+    response = requests.request("POST", url=url, headers=headers, json=body, timeout=20)
     print(response.json())
     print(response.json())
 
 

+ 16 - 194
update_msg.py

@@ -12,7 +12,8 @@ from tqdm import tqdm
 from datetime import datetime
 from datetime import datetime
 
 
 from config import accountBaseInfo
 from config import accountBaseInfo
-from applications import PQMySQL, WeixinSpider, Functions
+from applications import PQMySQL
+from tasks.task4 import update_articles
 from applications.decoratorApi import retryOnTimeout
 from applications.decoratorApi import retryOnTimeout
 
 
 
 
@@ -22,8 +23,6 @@ class UpdateMsgDaily(object):
     """
     """
 
 
     db_client = PQMySQL()
     db_client = PQMySQL()
-    spider = WeixinSpider()
-    functions = Functions()
 
 
     @classmethod
     @classmethod
     def getAccountIdDict(cls):
     def getAccountIdDict(cls):
@@ -72,189 +71,6 @@ class UpdateMsgDaily(object):
         }
         }
         requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
         requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
 
 
-    @classmethod
-    def findAccountLatestUpdateTime(cls, gh_id):
-        """
-        获取账号的最近更新id
-        :param gh_id:
-        :return:
-        """
-        sql = f"""
-            select accountName, updateTime 
-            from official_articles_v2 
-            where ghId = '{gh_id}' 
-            order by updateTime DESC;
-            """
-        result = cls.db_client.select(sql)
-        if result:
-            account_name, update_time = result[0]
-            return {"update_time": update_time, "account_type": "history"}
-        else:
-            return {
-                "update_time": int(time.time()) - 30 * 24 * 60 * 60,
-                "account_type": "new",
-            }
-
-    @classmethod
-    def updateMsgList(cls, gh_id, account_name, msg_list):
-        """
-        把消息数据更新到数据库中
-        :param account_name:
-        :param gh_id:
-        :param msg_list:
-        :return:
-        """
-        for info in msg_list:
-            baseInfo = info.get("BaseInfo", {})
-            appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
-            createTime = (
-                info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
-            )
-            updateTime = (
-                info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
-            )
-            Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
-            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
-            if detail_article_list:
-                for article in detail_article_list:
-                    title = article.get("Title", None)
-                    Digest = article.get("Digest", None)
-                    ItemIndex = article.get("ItemIndex", None)
-                    ContentUrl = article.get("ContentUrl", None)
-                    SourceUrl = article.get("SourceUrl", None)
-                    CoverImgUrl = article.get("CoverImgUrl", None)
-                    CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
-                    CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
-                    ItemShowType = article.get("ItemShowType", None)
-                    IsOriginal = article.get("IsOriginal", None)
-                    ShowDesc = article.get("ShowDesc", None)
-                    show_stat = cls.functions.show_desc_to_sta(ShowDesc)
-                    ori_content = article.get("ori_content", None)
-                    show_view_count = show_stat.get("show_view_count", 0)
-                    show_like_count = show_stat.get("show_like_count", 0)
-                    show_zs_count = show_stat.get("show_zs_count", 0)
-                    show_pay_count = show_stat.get("show_pay_count", 0)
-                    wx_sn = (
-                        ContentUrl.split("&sn=")[1].split("&")[0]
-                        if ContentUrl
-                        else None
-                    )
-                    info_tuple = (
-                        gh_id,
-                        account_name,
-                        appMsgId,
-                        title,
-                        Type,
-                        createTime,
-                        updateTime,
-                        Digest,
-                        ItemIndex,
-                        ContentUrl,
-                        SourceUrl,
-                        CoverImgUrl,
-                        CoverImgUrl_1_1,
-                        CoverImgUrl_235_1,
-                        ItemShowType,
-                        IsOriginal,
-                        ShowDesc,
-                        ori_content,
-                        show_view_count,
-                        show_like_count,
-                        show_zs_count,
-                        show_pay_count,
-                        wx_sn,
-                        json.dumps(baseInfo, ensure_ascii=False),
-                    )
-                    try:
-                        insert_sql = f"""
-                                    INSERT INTO official_articles_v2
-                                    (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
-                                    values
-                                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                                    """
-                        cls.db_client.update(sql=insert_sql, params=info_tuple)
-                    except Exception as e:
-                        try:
-                            update_sql = f"""
-                                    UPDATE official_articles_v2
-                                    SET show_view_count = %s, show_like_count=%s
-                                    WHERE wx_sn = %s;
-                                    """
-                            cls.db_client.update(
-                                sql=update_sql,
-                                params=(show_view_count, show_like_count, wx_sn),
-                            )
-                        except Exception as e:
-                            print("失败-{}".format(e))
-                            continue
-
-    @classmethod
-    def getAccountArticleList(cls, gh_id, last_update_time, cursor=None):
-        """
-        输入ghid获取账号的文章list
-        :return:
-        """
-        try:
-            response = cls.spider.update_msg_list(ghId=gh_id, index=cursor)
-        except Exception as e:
-            response = {
-                "error": str(e),
-                "info": "更新文章接口请求失败",
-                "gh_id": gh_id,
-                "time": datetime.now().__str__()
-            }
-            # 之后可以考虑抛出阿里云日志
-            print(response)
-            return
-        msg_list = response.get("data", {}).get("data")
-        if msg_list:
-            last_article_in_this_msg = msg_list[-1]
-            last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"][
-                "BaseInfo"
-            ]["UpdateTime"]
-            last_url = last_article_in_this_msg["AppMsg"]["DetailInfo"][0]["ContentUrl"]
-            # 校验是否抓到的是同一个账号
-            try:
-                resdata = cls.spider.get_account_by_url(last_url)
-            except Exception as e:
-                resdata = {
-                    "error": str(e),
-                    "info": "通过链接获取账号信息失败",
-                    "gh_id": gh_id,
-                    "time": datetime.now().__str__()
-                }
-                print(resdata)
-                return
-            check_name = resdata["data"].get("data", {}).get("account_name")
-            check_id = resdata["data"].get("data", {}).get("wx_gh")
-            if check_id == gh_id:
-                cls.updateMsgList(gh_id, check_name, msg_list)
-                if last_time_stamp_in_this_msg > last_update_time:
-                    next_cursor = response["data"]["next_cursor"]
-                    return cls.getAccountArticleList(
-                        gh_id=gh_id,
-                        last_update_time=last_update_time,
-                        cursor=next_cursor,
-                    )
-            else:
-                response = {
-                    "code": 1002,
-                    "info": "抓取时候账号校验失败",
-                    "error": None,
-                    "gh_id": gh_id,
-                    "time_stamp": datetime.now().__str__(),
-                }
-                print(response)
-        else:
-            response = {
-                "code": 1003,
-                "info": "账号为抓取到内容",
-                "error": None,
-                "gh_id": gh_id,
-                "time_stamp": datetime.now().__str__(),
-            }
-            print(response)
-
     @classmethod
     @classmethod
     def checkEachAccount(cls, gh_id):
     def checkEachAccount(cls, gh_id):
         """
         """
@@ -285,14 +101,11 @@ class UpdateMsgDaily(object):
         更新文章任务
         更新文章任务
         :return:
         :return:
         """
         """
-        account_list = cls.getAccountIdDict()
+        account_dict = cls.getAccountIdDict()
+        account_list = list(account_dict.keys())
         for account_id in tqdm(account_list):
         for account_id in tqdm(account_list):
-            account_info = cls.findAccountLatestUpdateTime(account_id)
-            latest_time = account_info["update_time"]
             try:
             try:
-                cls.getAccountArticleList(
-                    gh_id=account_id, last_update_time=latest_time
-                )
+                update_articles(gh_id=account_id)
             except Exception as e:
             except Exception as e:
                 response = {
                 response = {
                     "code": 1001,
                     "code": 1001,
@@ -337,8 +150,17 @@ def job_with_thread(job_func):
 if __name__ == "__main__":
 if __name__ == "__main__":
     UMD = UpdateMsgDaily()
     UMD = UpdateMsgDaily()
 
 
-    schedule.every().day.at("21:00").do(job_with_thread, UMD.updateJob)
-    schedule.every().day.at("21:30").do(job_with_thread, UMD.checkJob)
+    try:
+        schedule.every().day.at("21:00").do(job_with_thread, UMD.updateJob)
+
+    except Exception as error:
+        UMD.bot(account_list=["更新文章定时任务异常终止", str(error)])
+
+    try:
+        schedule.every().day.at("21:30").do(job_with_thread, UMD.checkJob)
+
+    except Exception as error:
+        UMD.bot(account_list=['校验账号任务异常终止', str(error)])
 
 
     while True:
     while True:
         schedule.run_pending()
         schedule.run_pending()