소스 검색

auto publish toutiao articles

luojunhui 9 달 전
부모
커밋
f443a34eec
4개의 변경된 파일235개의 추가작업 그리고 76개의 파일을 삭제
  1. 6 0
      applications/const.py
  2. 9 0
      applications/functions.py
  3. 105 3
      tasks/update_article_info_from_aigc.py
  4. 115 73
      tasks/update_published_articles_read_detail.py

+ 6 - 0
applications/const.py

@@ -199,3 +199,9 @@ class ArticleCollectorConst:
     # 发布状态
     PUBLISHED_STATUS = 2
 
+    # 爬虫接口
+    ARTICLE_ILLEGAL_CODE = 25012
+    ARTICLE_DELETE_CODE = 25005
+    ARTICLE_SUCCESS_CODE = 0
+    ARTICLE_UNKNOWN_CODE = 10000
+

+ 9 - 0
applications/functions.py

@@ -287,3 +287,12 @@ class Functions(object):
             return res
         else:
             return {}
+
+    @classmethod
+    def extract_params_from_url(cls, url: str, key: str):
+        """
+        extract params from url
+        """
+        params = parse_qs(urlparse(url).query)
+        info = params.get(key, [])
+        return info[0] if info else None

+ 105 - 3
tasks/update_article_info_from_aigc.py

@@ -3,11 +3,16 @@
 """
 
 import json
+import time
 from typing import List, Dict
 
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
+from applications import aiditApi
+from applications import bot
+from applications import log
+from applications import WeixinSpider
 from applications.const import ArticleCollectorConst
 from applications.db import DatabaseConnector
 from applications.functions import Functions
@@ -16,6 +21,7 @@ from config import denet_config, long_articles_config
 empty_dict = {}
 const = ArticleCollectorConst()
 functions = Functions()
+spider = WeixinSpider()
 
 
 class UpdateArticleInfoFromAIGC(object):
@@ -57,6 +63,23 @@ class UpdateArticleInfoFromAIGC(object):
         else:
             return empty_dict
 
+    def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
+        """
+        通过trace_id来查询文章信息
+        """
+        select_sql = f"""
+            SELECT t1.gh_id, t1.account_name, t2.article_title
+            FROM long_articles_match_videos t1
+            JOIN long_articles_text t2
+            ON t1.content_id = t2.content_id
+            WHERE t1.trace_id = '{trace_id}';
+        """
+        article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
+        if article_info:
+            return article_info[0]
+        else:
+            return empty_dict
+
     def update_each_article(self, article: Dict):
         """
         更新每个文章的信息
@@ -73,21 +96,100 @@ class UpdateArticleInfoFromAIGC(object):
             root_source_id_list = [
                 functions.extract_path(item["productionPath"])["root_source_id"] for item in result_data
             ]
+            wx_sn = None
             if published_url:
-                status = const.SUCCESS_STATUS
+                response = spider.get_article_text(content_link=published_url)
+                code = response['code']
+                match code:
+                    case const.ARTICLE_SUCCESS_CODE:
+                        long_url = response['data']['data']['content_link']
+                        wx_sn = functions.extract_params_from_url(url=long_url, key="sn")
+                        status = const.SUCCESS_STATUS
+                    case const.ARTICLE_DELETE_CODE:
+                        log(
+                            task="update_article_info_from_aigc",
+                            function="update_each_article",
+                            status="fail",
+                            message=trace_id,
+                            data={
+                                "msg": "文章被删文",
+                                "publish_timestamp": publish_timestamp,
+                                "article_delete_timestamp": int(time.time()),
+                                "duration": int(time.time()) - publish_timestamp
+                            }
+                        )
+                        status = const.FAIL_STATUS
+
+                    case const.ARTICLE_ILLEGAL_CODE:
+                        log(
+                            task="update_article_info_from_aigc",
+                            function="update_each_article",
+                            status="fail",
+                            message=trace_id,
+                            data={
+                                "msg": "文章被判断违规",
+                                "publish_timestamp": publish_timestamp,
+                                "illegal_timestamp": int(time.time()),
+                                "duration": int(time.time()) - publish_timestamp
+                            }
+                        )
+                        article_info = self.get_article_info_by_trace_id(trace_id)
+                        if article_info:
+                            error_detail = response.get("msg")
+                            insert_sql = f"""
+                                INSERT IGNORE INTO illegal_articles 
+                                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+                                VALUES 
+                                (%s, %s, %s, %s, %s, %s);
+                            """
+
+                            affected_rows = self.long_articles_db_client.save(
+                                query=insert_sql,
+                                params=(
+                                    article_info['gh_id'],
+                                    article_info['account_name'],
+                                    article_info['article_title'],
+                                    wx_sn,
+                                    functions.timestamp_to_str(publish_timestamp),
+                                    error_detail
+                                )
+                            )
+                            if affected_rows:
+                                bot(
+                                    title="文章违规告警(new task)",
+                                    detail={
+                                        "account_name": article_info['account_name'],
+                                        "gh_id": article_info['gh_id'],
+                                        "title": article_info['article_title'],
+                                        "wx_sn": wx_sn,
+                                        "publish_date": functions.timestamp_to_str(publish_timestamp),
+                                        "error_detail": error_detail,
+                                    },
+                                    mention=False
+                                )
+                                aiditApi.delete_articles(
+                                    gh_id=article_info['gh_id'],
+                                    title=article_info['article_title']
+                                )
+                        status = const.FAIL_STATUS
+
+                    case _:
+                        status = const.FAIL_STATUS
+
             else:
                 if push_type == const.BULK_AUTO_PUSH:
                     status = const.INIT_STATUS
                 else:
                     status = const.SUCCESS_STATUS
+
             update_sql = f"""
                 UPDATE long_articles_published_trace_id
-                SET published_url = %s, status = %s, publish_timestamp = %s, crawler_channel_content_id = %s, root_source_id_list = %s
+                SET published_url = %s, status = %s, wx_sn = %s, publish_timestamp = %s, crawler_channel_content_id = %s, root_source_id_list = %s
                 WHERE trace_id = %s;
             """
             self.long_articles_db_client.save(
                 query=update_sql,
-                params=(published_url, status, publish_timestamp, channel_content_id, json.dumps(root_source_id_list), trace_id)
+                params=(published_url, status, wx_sn, publish_timestamp, channel_content_id, json.dumps(root_source_id_list), trace_id)
             )
         else:
             update_sql = f"""

+ 115 - 73
tasks/update_published_articles_read_detail.py

@@ -294,61 +294,102 @@ class UpdatePublishedArticlesReadDetail(object):
         更新单个账号的文章
         """
         gh_id = account_info['ghId']
+        account_name = account_info['name']
         select_sql = f"""
             SELECT published_url, publish_timestamp, root_source_id_list, create_timestamp
             FROM long_articles_published_trace_id
-            WHERE gh_id = '{gh_id}' AND publish_timestamp> UNIX_TIMESTAMP(DATE_SUB('{run_date}', INTERVAL 3 DAY));
+            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP(DATE_SUB('{run_date}', INTERVAL 3 DAY));
         """
         result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
         for article in result:
             published_url = article['published_url']
             article_info = spider.get_article_text(content_link=published_url, is_cache=False, is_count=True)
             response_code = article_info['code']
-            if response_code == 0:
-                response_data = article_info['data']['data']
-                title = response_data['title']
-                article_url = response_data['content_link']
-                show_view_count = response_data['view_count']
-                show_like_count = response_data['like_count']
-                show_zs_count = 0
-                show_pay_count = 0
-                wx_sn = article_url.split("&sn=")[1].split("&")[0] if article_url else None
-                app_msg_id = article_url.split("&mid=")[1].split("&")[0] if article_url else None
-                status = account_info['using_status']
-                info_tuple = (
-                    gh_id,
-                    account_info['name'],
-                    app_msg_id,
-                    title,
-                    "9",
-                    article['create_timestamp'],
-                    response_data['update_timestamp'],
-                    None,
-                    response_data['item_index'],
-                    response_data['content_link'],
-                    None,
-                    None,
-                    None,
-                    None,
-                    None,
-                    response_data.get("is_original", None),
-                    None,
-                    None,
-                    show_view_count,
-                    show_like_count,
-                    show_zs_count,
-                    show_pay_count,
-                    wx_sn,
-                    None,
-                    functions.str_to_md5(title),
-                    status
-                )
-                self.insert_each_article(
-                    info_tuple=info_tuple,
-                    show_view_count=show_view_count,
-                    show_like_count=show_like_count,
-                    wx_sn=wx_sn
-                )
+            match response_code:
+                case const.ARTICLE_SUCCESS_CODE:
+                    response_data = article_info['data']['data']
+                    title = response_data['title']
+                    article_url = response_data['content_link']
+                    show_view_count = response_data['view_count']
+                    show_like_count = response_data['like_count']
+                    show_zs_count = 0
+                    show_pay_count = 0
+                    wx_sn = article_url.split("&sn=")[1].split("&")[0] if article_url else None
+                    app_msg_id = article_url.split("&mid=")[1].split("&")[0] if article_url else None
+                    status = account_info['using_status']
+                    info_tuple = (
+                        gh_id,
+                        account_info['name'],
+                        app_msg_id,
+                        title,
+                        "9",
+                        article['create_timestamp'],
+                        response_data['update_timestamp'],
+                        None,
+                        response_data['item_index'],
+                        response_data['content_link'],
+                        None,
+                        None,
+                        None,
+                        None,
+                        None,
+                        response_data.get("is_original", None),
+                        None,
+                        None,
+                        show_view_count,
+                        show_like_count,
+                        show_zs_count,
+                        show_pay_count,
+                        wx_sn,
+                        None,
+                        functions.str_to_md5(title),
+                        status
+                    )
+                    self.insert_each_article(
+                        info_tuple=info_tuple,
+                        show_view_count=show_view_count,
+                        show_like_count=show_like_count,
+                        wx_sn=wx_sn
+                    )
+                case const.ARTICLE_DELETE_CODE:
+                    log(
+                        task="updatePublishedMsgDaily",
+                        function="update_account_by_aigc",
+                        message="文章被删除",
+                        data={
+                            "ghId": gh_id,
+                            "publishedUrl": published_url
+                        }
+                    )
+                case const.ARTICLE_ILLEGAL_CODE:
+                    error_detail = article_info.get("msg")
+                    insert_sql = f"""
+                        INSERT IGNORE INTO illegal_articles 
+                        (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+                        VALUES 
+                        (%s, %s, %s, %s, %s, %s);
+                    """
+                    affected_rows = self.long_articles_db_client.save(
+                        query=insert_sql,
+                        params=(gh_id, account_name, title, wx_sn, publish_date, error_detail),
+                    )
+                    if affected_rows:
+                        bot(
+                            title="文章违规告警",
+                            detail={
+                                "account_name": account_name,
+                                "gh_id": gh_id,
+                                "title": title,
+                                "wx_sn": wx_sn.decode("utf-8"),
+                                "publish_date": str(publish_date),
+                                "error_detail": error_detail,
+                            },
+                            mention=False
+                        )
+                        aiditApi.delete_articles(
+                            gh_id=gh_id,
+                            title=title
+                        )
 
     def get_account_info(self, gh_id: str) -> int:
         """
@@ -494,7 +535,7 @@ class UpdatePublishedArticlesReadDetail(object):
         subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
         success_count = 0
         fail_count = 0
-        for account in tqdm(subscription_accounts):
+        for account in tqdm(subscription_accounts[:10]):
             try:
                 self.process_single_account(account_info=account, run_date=biz_date)
                 success_count += 1
@@ -525,14 +566,14 @@ class UpdatePublishedArticlesReadDetail(object):
                     "failRate": fail_count / (success_count + fail_count)
                 }
             )
-        bot(
-            title="更新每日发布文章任务完成通知",
-            detail={
-                "msg": "订阅号更新完成",
-                "finish_time": datetime.today().__str__()
-            },
-            mention=False
-        )
+        # bot(
+        #     title="更新每日发布文章任务完成通知",
+        #     detail={
+        #         "msg": "订阅号更新完成",
+        #         "finish_time": datetime.today().__str__()
+        #     },
+        #     mention=False
+        # )
 
         # 服务号
         server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
@@ -542,14 +583,14 @@ class UpdatePublishedArticlesReadDetail(object):
                 time.sleep(1)
             except Exception as e:
                 print(e)
-        bot(
-            title="更新每日发布文章任务完成通知",
-            detail={
-                "msg": "服务号更新完成",
-                "finish_time": datetime.today().__str__()
-            },
-            mention=False
-        )
+        # bot(
+        #     title="更新每日发布文章任务完成通知",
+        #     detail={
+        #         "msg": "服务号更新完成",
+        #         "finish_time": datetime.today().__str__()
+        #     },
+        #     mention=False
+        # )
 
     def check_job(self, biz_date: str = None):
         """
@@ -562,13 +603,13 @@ class UpdatePublishedArticlesReadDetail(object):
         subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
         fail_list = []
         # check and rework if fail
-        for sub_item in tqdm(subscription_accounts):
+        for sub_item in tqdm(subscription_accounts[:10]):
             res = self.check_single_account(sub_item)
             if not res:
                 self.process_single_account(sub_item, biz_date)
 
         # check whether success and bot if fails
-        for sub_item in tqdm(subscription_accounts):
+        for sub_item in tqdm(subscription_accounts[:10]):
             res = self.check_single_account(sub_item)
             if not res:
                 # 去掉三个不需要查看的字段
@@ -578,14 +619,15 @@ class UpdatePublishedArticlesReadDetail(object):
                 fail_list.append(sub_item)
         if fail_list:
             try:
-                bot(
-                    title="更新当天发布文章,存在未更新的账号",
-                    detail={
-                        "columns": generate_bot_columns(),
-                        "rows": fail_list
-                    },
-                    table=True
-                )
+                print(fail_list)
+                # bot(
+                #     title="更新当天发布文章,存在未更新的账号",
+                #     detail={
+                #         "columns": generate_bot_columns(),
+                #         "rows": fail_list
+                #     },
+                #     table=True
+                # )
             except Exception as e:
                 print("Timeout Error: {}".format(e))
         else: