罗俊辉 1 рік тому
батько
коміт
441916bcb5
3 змінених файлів з 358 додано та 31 видалено
  1. 41 0
      applications/pipeline.py
  2. 22 14
      spider/weixinRelativeAccountCrawler.py
  3. 295 17
      update_msg.py

+ 41 - 0
applications/pipeline.py

@@ -0,0 +1,41 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications import WeixinSpider, PQMySQL
+
+
+class LongArticlesPipeline(object):
+    """
+    长文管道, 对数据进行加工过滤
+    """
+
+    db_client = PQMySQL()
+
+    @classmethod
+    def checkAccountExists(cls, gh_id):
+        """
+        校验账号是否已经抓过
+        :param gh_id:
+        :return:
+        """
+        sql = f"""
+            INSERT 
+        """
+
+    @classmethod
+    def accountFilter(cls, account_info):
+        """
+        账号过滤
+        :param account_info:
+        :return:
+        """
+        # 查询账号是否已经存在
+
+
+
+s = WeixinSpider()
+response = s.get_account_by_url("https://mp.weixin.qq.com/s/ZOa1JMX-TFCLwfnaXiB7nA")
+print(json.dumps(response, ensure_ascii=False, indent=4))
+

+ 22 - 14
spider/weixinRelativeAccountCrawler.py

@@ -72,7 +72,7 @@ class weixinRelationAccountGoodArticles(object):
         """
         insert_sql = f"""
             INSERT INTO long_articles_assiciation_accounts
-            (account_outside_id, accout_name, source_article_title, source_account, association_time, is_using)
+            (account_outside_id, account_name, source_article_title, source_account, association_time, is_using)
             values 
             (%s, %s, %s, %s, %s, %s);
         """
@@ -188,15 +188,16 @@ class weixinRelationAccountGoodArticles(object):
                     print(e)
 
     @classmethod
-    def searchResultFilter(cls, filter_type, info_list):
+    def searchResultFilter(cls, filter_type, info):
         """
         搜索结果过滤
-        :param info_list: 待过滤的数据list
+        :param info: 待过滤信息
         :param filter_type: 过滤类型,account表示账号过滤, article表示文章过滤
         :return: 过滤后的结果
         """
-        # if filter_type == 'account':
-        #     for item in info_list:
+        match filter_type:
+            case "account":
+                return account
 
 
 if __name__ == "__main__":
@@ -208,20 +209,27 @@ if __name__ == "__main__":
             account_id=source_account
         )
         goodArticles = weixin.filterGoodArticle(accountArticlesDataFrame)
-        for title in goodArticles[:1]:
+        for title in goodArticles[1:2]:
             account_list = weixin.searchGoodArticlesAccounts(
                 source_account=source_account, source_title=title
             )
+            print(title)
+            print(source_account)
             for associated_account in account_list:
                 source_title = associated_account[0]
                 associated_account_info = associated_account[1]
                 account_name = associated_account_info["data"]["data"]["account_name"]
                 gh_id = associated_account_info["data"]["data"]["wx_gh"]
-                # 初始化账号
-                weixin.init_account(gh_id=gh_id, account_name=account_name)
-                weixin.putIntoAssociationGraph(
-                    gh_id=gh_id,
-                    account_name=account_name,
-                    source_account=source_account,
-                    source_title=source_title,
-                )
+                if '新闻' in account_name:
+                    continue
+                elif '央视' in account_name:
+                    continue
+                else:
+                    # 初始化账号
+                    weixin.initAccount(gh_id=gh_id, account_name=account_name)
+                    weixin.putIntoAssociationGraph(
+                        gh_id=gh_id,
+                        account_name=account_name,
+                        source_account=source_account,
+                        source_title=title
+                    )

+ 295 - 17
update_msg.py

@@ -1,30 +1,308 @@
 """
 @author: luojunhui
 """
+
 import time
-from config import accountBaseInfo
-from tqdm import tqdm
-from tasks.task4 import update_articles
+import json
+
+import requests
 import schedule
+from tqdm import tqdm
+from datetime import datetime
+
+from config import accountBaseInfo
+from applications import PQMySQL, WeixinSpider, Functions
+
+
+class UpdateMsgDaily(object):
+    """
+    日常更新文章
+    """
+
+    db_client = PQMySQL()
+    spider = WeixinSpider()
+    functions = Functions()
+
+    @classmethod
+    def getAccountIdDict(cls):
+        """
+        获取全部内部账号的id
+        :return:
+        """
+        gh_id_dict = {}
+        for key in accountBaseInfo:
+            gh_id = accountBaseInfo[key]["ghId"]
+            name = accountBaseInfo[key]["accountName"]
+            gh_id_dict[gh_id] = name
+        return gh_id_dict
+
+    @classmethod
+    def bot(cls, account_list):
+        """
+        机器人
+        """
+        url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
+        headers = {"Content-Type": "application/json"}
+        payload = {
+            "msg_type": "interactive",
+            "card": {
+                "elements": [
+                    {
+                        "tag": "div",
+                        "text": {
+                            "content": "存在文章更新失败<at id=all></at>\n",
+                            "tag": "lark_md",
+                        },
+                    },
+                    {
+                        "tag": "div",
+                        "text": {
+                            "content": json.dumps(
+                                account_list, ensure_ascii=False, indent=4
+                            ),
+                            "tag": "lark_md",
+                        },
+                    },
+                ],
+                "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
+            },
+        }
+        requests.request("POST", url=url, headers=headers, data=json.dumps(payload))
+
+    @classmethod
+    def findAccountLatestUpdateTime(cls, gh_id):
+        """
+        获取账号的最近更新id
+        :param gh_id:
+        :return:
+        """
+        sql = f"""
+            select accountName, updateTime 
+            from official_articles_v2 
+            where ghId = '{gh_id}' 
+            order by updateTime DESC;
+            """
+        result = cls.db_client.select(sql)
+        if result:
+            account_name, update_time = result[0]
+            return {"update_time": update_time, "account_type": "history"}
+        else:
+            return {
+                "update_time": int(time.time()) - 30 * 24 * 60 * 60,
+                "account_type": "new",
+            }
+
+    @classmethod
+    def updateMsgList(cls, gh_id, account_name, msg_list):
+        """
+        把消息数据更新到数据库中
+        :param account_name:
+        :param gh_id:
+        :param msg_list:
+        :return:
+        """
+        for info in msg_list:
+            baseInfo = info.get("BaseInfo", {})
+            appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
+            createTime = (
+                info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            )
+            updateTime = (
+                info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
+            )
+            Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in detail_article_list:
+                    title = article.get("Title", None)
+                    Digest = article.get("Digest", None)
+                    ItemIndex = article.get("ItemIndex", None)
+                    ContentUrl = article.get("ContentUrl", None)
+                    SourceUrl = article.get("SourceUrl", None)
+                    CoverImgUrl = article.get("CoverImgUrl", None)
+                    CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
+                    CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
+                    ItemShowType = article.get("ItemShowType", None)
+                    IsOriginal = article.get("IsOriginal", None)
+                    ShowDesc = article.get("ShowDesc", None)
+                    show_stat = cls.functions.show_desc_to_sta(ShowDesc)
+                    ori_content = article.get("ori_content", None)
+                    show_view_count = show_stat.get("show_view_count", 0)
+                    show_like_count = show_stat.get("show_like_count", 0)
+                    show_zs_count = show_stat.get("show_zs_count", 0)
+                    show_pay_count = show_stat.get("show_pay_count", 0)
+                    wx_sn = (
+                        ContentUrl.split("&sn=")[1].split("&")[0]
+                        if ContentUrl
+                        else None
+                    )
+                    info_tuple = (
+                        gh_id,
+                        account_name,
+                        appMsgId,
+                        title,
+                        Type,
+                        createTime,
+                        updateTime,
+                        Digest,
+                        ItemIndex,
+                        ContentUrl,
+                        SourceUrl,
+                        CoverImgUrl,
+                        CoverImgUrl_1_1,
+                        CoverImgUrl_235_1,
+                        ItemShowType,
+                        IsOriginal,
+                        ShowDesc,
+                        ori_content,
+                        show_view_count,
+                        show_like_count,
+                        show_zs_count,
+                        show_pay_count,
+                        wx_sn,
+                        json.dumps(baseInfo, ensure_ascii=False),
+                    )
+                    try:
+                        insert_sql = f"""
+                                    INSERT INTO official_articles_v2
+                                    (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
+                                    values
+                                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                                    """
+                        cls.db_client.update(sql=insert_sql, params=info_tuple)
+                    except Exception as e:
+                        try:
+                            update_sql = f"""
+                                    UPDATE official_articles_v2
+                                    SET show_view_count = %s, show_like_count=%s
+                                    WHERE wx_sn = %s;
+                                    """
+                            cls.db_client.update(
+                                sql=update_sql,
+                                params=(show_view_count, show_like_count, wx_sn),
+                            )
+                        except Exception as e:
+                            print("失败-{}".format(e))
+                            continue
+
+    @classmethod
+    def getAccountArticleList(cls, gh_id, last_update_time, cursor=None):
+        """
+        输入ghid获取账号的文章list
+        :return:
+        """
+        response = cls.spider.update_msg_list(ghId=gh_id, index=cursor)
+        msg_list = response.get("data", {}).get("data")
+        if msg_list:
+            last_article_in_this_msg = msg_list[-1]
+            last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"][
+                "BaseInfo"
+            ]["UpdateTime"]
+            last_url = last_article_in_this_msg["AppMsg"]["DetailInfo"][0]["ContentUrl"]
+            # 校验是否抓到的是同一个账号
+            resdata = cls.spider.get_account_by_url(last_url)
+            check_name = resdata["data"].get("data", {}).get("account_name")
+            check_id = resdata["data"].get("data", {}).get("wx_gh")
+            if check_id == gh_id:
+                cls.updateMsgList(gh_id, check_name, msg_list)
+                if last_time_stamp_in_this_msg > last_update_time:
+                    next_cursor = response["data"]["next_cursor"]
+                    return cls.getAccountArticleList(
+                        gh_id=gh_id,
+                        last_update_time=last_update_time,
+                        cursor=next_cursor,
+                    )
+            else:
+                response = {
+                    "code": 1002,
+                    "info": "抓取时候账号校验失败",
+                    "error": None,
+                    "gh_id": gh_id,
+                    "time_stamp": datetime.now().__str__(),
+                }
+                print(response)
+        else:
+            response = {
+                "code": 1003,
+                "info": "账号为抓取到内容",
+                "error": None,
+                "gh_id": gh_id,
+                "time_stamp": datetime.now().__str__(),
+            }
+            print(response)
+
+    @classmethod
+    def checkEachAccount(cls, gh_id):
+        """
+        验证单个账号是否当天有更新
+        :param gh_id:
+        :return:
+        """
+        today_str = datetime.today().strftime("%Y-%m-%d")
+        today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
+        today_timestamp = today_date_time.timestamp()
+        sql = f"""
+            select updateTime
+            from official_articles_v2
+            where ghId = '{gh_id}'
+            order by updateTime
+            desc
+        """
+        latest_update_time = cls.db_client.select(sql)[0][0]
+        # 判断该账号当天发布的文章是否被收集
+        if int(latest_update_time) > int(today_timestamp):
+            return True
+        else:
+            return False
+
+    @classmethod
+    def updateJob(cls):
+        """
+        更新文章任务
+        :return:
+        """
+        account_list = cls.getAccountIdDict()
+        for account_id in tqdm(account_list):
+            account_info = cls.findAccountLatestUpdateTime(account_id)
+            latest_time = account_info["update_time"]
+            try:
+                cls.getAccountArticleList(
+                    gh_id=account_id, last_update_time=latest_time
+                )
+            except Exception as e:
+                response = {
+                    "code": 1001,
+                    "info": "单个账号更新失败",
+                    "error": str(e),
+                    "time_stamp": datetime.now().__str__(),
+                }
+                print(response)
 
+    @classmethod
+    def checkJob(cls):
+        """
+        验证所有账号是否已经有更新数据
+        :return:
+        todo: 被封禁账号&&服务号需要做区分
+        """
+        account_dict = cls.getAccountIdDict()
+        error_account_list = []
+        for account_id in tqdm(account_dict):
+            if cls.checkEachAccount(account_id):
+                continue
+            else:
+                name = account_dict[account_id]
+                error_account_list.append(name)
+        if error_account_list:
+            cls.bot(error_account_list)
 
-def run():
-    gh_id_set = set()
-    for key in accountBaseInfo:
-        value = accountBaseInfo[key]['ghId']
-        gh_id_set.add(value)
 
-    for gh_id in tqdm(gh_id_set):
-        try:
-            update_articles(gh_id)
-        except Exception as e:
-            print(e)
-            continue
+if __name__ == "__main__":
+    UMD = UpdateMsgDaily()
 
+    schedule.every().day.at("21:00").do(UMD.updateJob)
+    schedule.every().day.at("21:30").do(UMD.checkJob)
 
-if __name__ == '__main__':
-    schedule.every().day.at("21:00").do(run)
     while True:
         schedule.run_pending()
-        print("定时任务正在执行")
         time.sleep(1)