
article cold start task improve

luojunhui, 1 month ago
Parent
Current commit
fa554f726c

+ 45 - 42
account_cold_start_daily.py

@@ -7,13 +7,48 @@ import traceback
 from argparse import ArgumentParser
 
 from applications import longArticlesMySQL, bot
-from cold_start.crawler.weixinCategoryCrawler import weixinCategory
+from tasks.crawler_tasks.crawler_articles import CrawlerDailyScrapeAccountArticles
+from tasks.crawler_tasks.crawler_articles import CrawlerAssociationAccountArticles
 from cold_start.publish.publishCategoryArticles import CategoryColdStartTask
 from cold_start.filter.title_similarity_task import ColdStartTitleSimilarityTask
 
-DEFAULT_CATEGORY_LIST = ['1030-手动挑号', 'account_association']
+DEFAULT_METHOD_LIST = ['account_association']
 
 
+def crawler_task(method_list, date_str):
+    """
+    :return:
+    """
+    # 初始化category抓取类
+    try:
+        daily_scrape_tasks = CrawlerDailyScrapeAccountArticles()
+        daily_scrape_tasks.deal(method_list=method_list)
+
+        association_scrape_tasks = CrawlerAssociationAccountArticles()
+        association_scrape_tasks.deal(date_str=date_str)
+
+        # 抓取完成之后,给抓取到的标题进行相似度打分
+        cold_start_title_similarity_task = ColdStartTitleSimilarityTask()
+        cold_start_title_similarity_task.init_database()
+        cold_start_title_similarity_task.run(meta_source='article')
+
+        bot(
+            title="账号冷启动任务,抓取完成",
+            detail={
+                "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
+                "method": method_list
+            },
+            mention=False
+        )
+    except Exception as e:
+        bot(
+            title="账号抓取冷启动任务,抓取失败",
+            detail={
+                "error": str(e),
+                "error_msg": traceback.format_exc()
+            }
+        )
+
 
 class AccountColdStartDailyTask(object):
     """
@@ -43,37 +78,6 @@ class AccountColdStartDailyTask(object):
             )
             return False
 
-    def crawler_task(self, category_list, date_str):
-        """
-        :return:
-        """
-        # 初始化category抓取类
-        try:
-            weixin_category_crawler = weixinCategory(db_client=self.db_client)
-            weixin_category_crawler.deal(category_list=category_list, date_str=date_str)
-
-            # 抓取完成之后,给抓取到的标题进行相似度打分
-            cold_start_title_similarity_task = ColdStartTitleSimilarityTask()
-            cold_start_title_similarity_task.init_database()
-            cold_start_title_similarity_task.run(meta_source='article')
-
-            bot(
-                title="账号冷启动任务,抓取完成",
-                detail={
-                    "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
-                    "category": category_list
-                },
-                mention=False
-            )
-        except Exception as e:
-            bot(
-                title="账号抓取冷启动任务,抓取失败",
-                detail={
-                    "error": str(e),
-                    "error_msg": traceback.format_exc()
-                }
-            )
-
     def publish_article_task(self, category_list, article_source):
         """
         将账号文章发布到aigc抓取计划,并且绑定生成计划
@@ -105,21 +109,18 @@ class AccountColdStartDailyTask(object):
             )
 
 
-def main(date_str, category_list=None, article_source=None):
+def main(date_str, method_list=None, article_source=None):
     """
     main job, use crontab to do job daily
     :return:
     """
-    if not category_list:
-        category_list = DEFAULT_CATEGORY_LIST
+    if not method_list:
+        method_list = DEFAULT_METHOD_LIST
     if not article_source:
         article_source = 'weixin'
     task = AccountColdStartDailyTask()
     if task.init_db():
-        task.publish_article_task(category_list=category_list, article_source=article_source)
-
-        if article_source == 'weixin':
-            task.crawler_task(category_list=category_list, date_str=date_str)
+        task.publish_article_task(category_list=method_list, article_source=article_source)
 
 
 if __name__ == '__main__':
@@ -135,11 +136,13 @@ if __name__ == '__main__':
     # 执行头条发布
     main(
         date_str=run_date,
-        category_list=['history', 'tech', 'finance', 'entertainment'],
+        method_list=['history', 'tech', 'finance', 'entertainment'],
         article_source='toutiao'
     )
-
     # 执行微信抓取发布
     main(date_str=run_date)
 
+    # 执行抓取
+    crawler_task(method_list=DEFAULT_METHOD_LIST, date_str=run_date)
 

+ 4 - 1
applications/pipeline/__init__.py

@@ -2,4 +2,7 @@
 @author: luojunhui
 """
 from .account_pipeline import scrape_account_entities_process
-from .crawler_pipeline import scrape_video_entities_process
+from .crawler_pipeline import scrape_article_entities_process
+from .crawler_pipeline import scrape_video_entities_process
+from .crawler_pipeline import whether_duplicate_article_title
+from .crawler_pipeline import whether_title_sensitive

+ 30 - 0
applications/pipeline/crawler_pipeline.py

@@ -86,3 +86,33 @@ def scrape_video_entities_process(video_item, db_client) -> dict:
         return video_item
     else:
         return empty_dict
+
+
+def whether_duplicate_article_title(article_title: str, db_client) -> bool:
+    """
+    whether duplicate article title
+    """
+    sql = f"""
+        select article_id from crawler_meta_article
+        where title = %s;
+    """
+    duplicate_id = db_client.fetch(query=sql, params=(article_title,))
+    if duplicate_id:
+        return True
+
+    return False
+
+
+def scrape_article_entities_process(article_item, db_client) -> dict:
+    """
+    article crawler pipeline
+    """
+    article_title = article_item["article_title"]
+    if whether_title_sensitive(article_title):
+        article_item['title_sensitive'] = 1
+        return article_item
+
+    if whether_duplicate_article_title(article_title, db_client):
+        return empty_dict
+
+    return article_item
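
A minimal usage sketch of the new article pipeline entry point (not part of this commit). The stub DB client and the item shape are assumptions for illustration; the real caller passes a connected DatabaseConnector, and importing the pipeline assumes its sensitive-word config is reachable.

from applications.pipeline import scrape_article_entities_process

class StubDBClient:
    # stand-in for DatabaseConnector: pretend no duplicate title exists
    def fetch(self, query, params=None):
        return []

item = {"article_title": "示例标题"}
result = scrape_article_entities_process(item, StubDBClient())
# returns the item itself (with title_sensitive = 1 if the title hits a sensitive word);
# an empty dict means the title already exists in crawler_meta_article and should be skipped
print(result)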

+ 71 - 0
applications/utils/common.py

@@ -4,6 +4,7 @@
 
 import hashlib
 
+from datetime import datetime, timezone
 from requests import RequestException
 from urllib.parse import urlparse, parse_qs
 from tenacity import (
@@ -92,3 +93,73 @@ def extract_root_source_id(path: str) -> dict:
         return res
     else:
         return {}
+
+
+def show_desc_to_sta(show_desc):
+    """
+    parse a WeChat ShowDesc string into view/like/pay/reward counts
+    """
+    def decode_show_v(show_v):
+        """
+
+        :param show_v:
+        :return:
+        """
+        foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
+        foo = eval(foo)
+        return int(foo)
+
+    def decode_show_k(show_k):
+        """
+
+        :param show_k:
+        :return:
+        """
+        this_dict = {
+            '阅读': 'show_view_count',  # 文章
+            '看过': 'show_view_count',  # 图文
+            '观看': 'show_view_count',  # 视频
+            '赞': 'show_like_count',
+            '付费': 'show_pay_count',
+            '赞赏': 'show_zs_count',
+        }
+        if show_k not in this_dict:
+            print(f'error from decode_show_k, show_k not found: {show_k}')
+        return this_dict.get(show_k, 'show_unknown')
+
+    show_desc = show_desc.replace('+', '')
+    sta = {}
+    for show_kv in show_desc.split('\u2004\u2005'):
+        if not show_kv:
+            continue
+        show_k, show_v = show_kv.split('\u2006')
+        k = decode_show_k(show_k)
+        v = decode_show_v(show_v)
+        sta[k] = v
+    res = {
+        'show_view_count': sta.get('show_view_count', 0),
+        'show_like_count': sta.get('show_like_count', 0),
+        'show_pay_count': sta.get('show_pay_count', 0),
+        'show_zs_count': sta.get('show_zs_count', 0),
+    }
+    return res
+
+
+def generate_gzh_id(url):
+    """
+    build a stable md5 id from the biz/idx/sn parts of a gzh article url
+    """
+    biz = url.split("biz=")[1].split("&")[0]
+    idx = url.split("&idx=")[1].split("&")[0]
+    sn = url.split("&sn=")[1].split("&")[0]
+    url_bit = "{}-{}-{}".format(biz, idx, sn).encode()
+    md5_hash = hashlib.md5()
+    md5_hash.update(url_bit)
+    md5_value = md5_hash.hexdigest()
+    return md5_value
+
+
+def timestamp_to_str(timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str:
+    """
+    :param string_format:
+    :param timestamp:
+    """
+    dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
+    date_string = dt_object.strftime(string_format)
+    return date_string
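
A quick, illustrative check of the new helpers (not part of this commit). The import path mirrors the one used by gzh_article_crawler.py below; the sample values are made up.

from applications.utils import show_desc_to_sta, generate_gzh_id, timestamp_to_str

desc = "阅读\u20061.2万\u2004\u2005赞\u2006500"
print(show_desc_to_sta(desc))
# {'show_view_count': 12000, 'show_like_count': 500, 'show_pay_count': 0, 'show_zs_count': 0}

url = "https://mp.weixin.qq.com/s?__biz=MzA3&mid=1&idx=2&sn=abc123&chksm=xyz"
print(generate_gzh_id(url))          # md5 of "MzA3-2-abc123"

print(timestamp_to_str(1717171200))  # '2024-06-01 00:00:00' when local time is UTC+8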

+ 0 - 1
applications/utils/item.py

@@ -2,7 +2,6 @@
 @author: luojunhui
 """
 
-
 import time
 
 default_single_video_table_fields = {

+ 0 - 277
cold_start/crawler/weixinCategoryCrawler.py

@@ -1,277 +0,0 @@
-"""
-@author: luojunhui
-抓取全局品类文章
-"""
-import json
-import time
-
-from tqdm import tqdm
-from pymysql.cursors import DictCursor
-
-from applications import WeixinSpider, Functions, log
-from cold_start.filter import article_crawler_duplicate_filter
-from config import apolloConfig
-
-# 常量
-ACCOUNT_GOOD_STATUS = 1
-
-# 账号是否每日抓取
-ACCOUNT_DAILY_SCRAPE = 1
-ACCOUNT_NOT_DAILY_SCRAPE = 0
-
-# 默认值
-DEFAULT_VIEW_COUNT = 0
-DEFAULT_LIKE_COUNT = 0
-DEFAULT_ARTICLE_STATUS = 1
-DEFAULT_TIMESTAMP = 1717171200
-
-# 标题sensitivity
-TITLE_SENSITIVE = 1
-TITLE_NOT_SENSITIVE = 0
-
-config = apolloConfig()
-sensitive_word_list = json.loads(config.getConfigValue("sensitive_word_list"))
-
-
-def whether_title_sensitive(title: str) -> bool:
-    """
-    : param title:
-    判断视频是否的标题是否包含敏感词
-    """
-    for word in sensitive_word_list:
-        if word in title:
-            return True
-    return False
-
-
-class weixinCategory(object):
-    """
-    微信全局品类账号抓取
-    """
-
-    def __init__(self, db_client):
-        self.db_client_lam = db_client
-        self.spider = WeixinSpider()
-        self.function = Functions()
-
-    def get_account_list(self, account_category):
-        """
-        获取账号
-        :param account_category 品类
-        :return:
-        """
-        sql = f"""
-            select gh_id, account_source, account_name, account_category, latest_update_time
-            from long_articles_accounts 
-            where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
-            """
-        account_tuple = self.db_client_lam.select(sql)
-        result = [
-            {
-                "gh_id": i[0],
-                "platform": i[1],
-                "account_name": i[2],
-                "category": i[3],
-                "latest_timestamp": i[4],
-            }
-            for i in account_tuple
-        ]
-        return result
-
-    def get_association_account_list(self, date_str):
-        """
-        获取账号联想的轮询账号
-        """
-        group_id = date_str[-1]
-        sql = f"""
-            select account_id, gh_id, account_name, latest_update_time
-            from long_articles_accounts
-            where account_category = 'account_association' and is_using = {ACCOUNT_DAILY_SCRAPE} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
-        """
-        account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
-        today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id]
-        return today_crawler_account_list
-
-    def insert_data_into_db(self, gh_id, category, article_list):
-        """
-        将数据更新到数据库
-        :return:
-
-        """
-        success_records = []
-        for article_obj in article_list:
-            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
-            for obj in detail_article_list:
-                try:
-                    # 判断文章是否存在相同的标题
-                    if article_crawler_duplicate_filter(
-                            new_article_title=obj["Title"], db_client=self.db_client_lam
-                    ):
-                        log(
-                            function="weixinCategory",
-                            task="weixinCategory",
-                            message="文章去重",
-                            data={"title": obj["Title"]}
-                        )
-                        continue
-
-                    # 判断标题是否包含敏感词
-                    title_sensitivity = TITLE_SENSITIVE if whether_title_sensitive(obj["Title"]) else TITLE_NOT_SENSITIVE
-                    show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
-                    show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
-                    show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
-                    unique_idx = self.function.generateGzhId(obj["ContentUrl"])
-                    insert_sql = f"""
-                        insert into crawler_meta_article
-                        (
-                         platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
-                         description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
-                        )
-                        VALUES 
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                    """
-                    self.db_client_lam.update(
-                        sql=insert_sql,
-                        params=(
-                            "weixin",
-                            "account",
-                            category,
-                            gh_id,
-                            obj['ItemIndex'],
-                            obj["Title"],
-                            obj["ContentUrl"],
-                            show_view_count,
-                            show_like_count,
-                            obj["Digest"],
-                            obj["send_time"],
-                            int(time.time()),
-                            DEFAULT_ARTICLE_STATUS,
-                            unique_idx,
-                            obj.get("llm_sensitivity", -1),
-                            title_sensitivity
-                        ),
-                    )
-                    success_records.append({
-                        'unique_index': unique_idx, 'title': obj['Title']
-                    })
-                except Exception as e:
-                    print(e)
-        return success_records
-
-    def update_latest_account_timestamp(self, gh_id):
-        """
-        更新账号的最新时间戳
-        :return:
-        """
-        select_sql = f"""
-            SELECT publish_time 
-            From crawler_meta_article 
-            WHERE out_account_id = '{gh_id}'
-            ORDER BY publish_time DESC LIMIT 1;
-        """
-        result = self.db_client_lam.select(select_sql)
-        time_stamp = result[0][0]
-        dt_str = self.function.timestamp_to_str(time_stamp)
-        update_sql = f"""
-            update long_articles_accounts
-            set latest_update_time = %s
-            where gh_id = %s;
-        """
-        self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
-
-    def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
-        """
-        更新账号文章
-        :return:
-        """
-        response = self.spider.update_msg_list(ghId=gh_id, index=index)
-        msg_list = response.get("data", {}).get("data")
-        if msg_list:
-            last_article_in_this_msg = msg_list[-1]
-            success_records = self.insert_data_into_db(
-                gh_id=gh_id, category=category, article_list=msg_list
-            )
-            last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
-            if latest_time_stamp < last_time_stamp_in_this_msg:
-                next_cursor = response["data"]["next_cursor"]
-                return success_records + self.update_each_account(
-                    gh_id=gh_id,
-                    latest_time_stamp=latest_time_stamp,
-                    category=category,
-                    index=next_cursor,
-                )
-            else:
-                # 更新最近抓取时间
-                self.update_latest_account_timestamp(gh_id=gh_id)
-                print("账号时间更新成功")
-                return success_records
-        else:
-            print("No more data")
-            return []
-
-    def crawler_each_category(self, account_list, category):
-        """
-        抓取每个品类
-        :return:
-        """
-        success_records = []
-        for account in tqdm(account_list, desc="crawler_each_category"):
-            try:
-                gh_id = account['gh_id']
-                try:
-                    timestamp = int(account['latest_timestamp'].timestamp())
-                except Exception as e:
-                    timestamp = DEFAULT_TIMESTAMP
-                success_records += self.update_each_account(
-                    gh_id=gh_id,
-                    category=category,
-                    latest_time_stamp=timestamp
-                )
-                print("success")
-            except Exception as e:
-                print("fail because of {}".format(e))
-
-    def deal(self, category_list, date_str):
-        """
-        :param category_list:
-        :param date_str: YYYY-MM-DD
-        :return:
-        """
-        # daily 品类账号抓取
-        for category in category_list:
-            account_list = self.get_account_list(category)
-            self.crawler_each_category(account_list=account_list, category=category)
-
-        # 账号联想账号轮询抓取
-        association_account_list = self.get_association_account_list(date_str)
-        self.crawler_each_category(account_list=association_account_list, category="account_association")
-
-    def deal_accounts(self, account_list):
-        """
-        input account list
-        :param account_list: 具体账号抓取,只抓一页
-        :return:
-        """
-        account_tuple = tuple(account_list)
-        sql = f"""
-            SELECT gh_id, account_name, account_category, latest_update_time 
-            FROM long_articles_accounts
-            WHERE account_name in {account_tuple};
-        """
-        response = self.db_client_lam.select(sql)
-        for account in tqdm(response):
-            try:
-                gh_id = account[0]
-                category = account[2]
-                try:
-                    latest_timestamp = account[3].timestamp()
-                except Exception as e:
-                    print(e)
-                    latest_timestamp = DEFAULT_TIMESTAMP
-                self.update_each_account(
-                    gh_id=gh_id,
-                    category=category,
-                    latest_time_stamp=latest_timestamp
-                )
-            except Exception as e:
-                print(e)

+ 45 - 70
cold_start/publish/publishCategoryArticles.py

@@ -13,7 +13,7 @@ from applications import aiditApi, log, bot, llm_sensitivity
 from config import apolloConfig
 
 apollo = apolloConfig()
-DAILY_CRAWLER_MAX_NUM = 1000
+DAILY_CRAWLER_MAX_NUM = 100
 SIMILARITY_MIN_SCORE = 0.4
 TITLE_NOT_SENSITIVE = 0
 
@@ -86,14 +86,18 @@ class CategoryColdStartTask(object):
 
         """
         sql = f"""
-        SELECT 
-            article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score, category_by_ai
-        FROM
-            crawler_meta_article
-        WHERE 
-            category = "{category}" and platform = "{article_source}" and title_sensitivity = {TITLE_NOT_SENSITIVE}
-        ORDER BY score DESC;
-        """
+            select 
+                article_id, title, link,  llm_sensitivity, score, category_by_ai
+            from crawler_meta_article t1 
+            join crawler_meta_article_accounts_read_avg t2 on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
+            where category = '{category}' 
+                and platform = '{article_source}' 
+                and title_sensitivity = {TITLE_NOT_SENSITIVE} 
+                and t1.status = {self.INIT_STATUS}
+                and t1.read_cnt / t2.read_avg >= {self.READ_TIMES_THRESHOLD}
+                and t1.read_cnt >= {self.READ_THRESHOLD}
+            ORDER BY score DESC;
+            """
         article_list = self.db_client.select(sql)
         log(
             task="category_publish_task",
@@ -105,8 +109,7 @@ class CategoryColdStartTask(object):
             }
         )
         article_df = DataFrame(article_list,
-                               columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
-                                        'llm_sensitivity', 'score', 'category_by_ai'])
+                               columns=['article_id', 'title', 'link', 'llm_sensitivity', 'score', 'category_by_ai'])
         return article_df
 
     def filter_each_category(self, category):
@@ -120,12 +123,9 @@ class CategoryColdStartTask(object):
             if title_list:
                 # update
                 update_sql = f"""
-                UPDATE 
-                    crawler_meta_article
-                SET
-                    status = %s
-                WHERE
-                    title in %s and status = %s;
+                update crawler_meta_article
+                set status = %s
+                where title in %s and status = %s;
                 """
                 affected_rows = self.db_client.update(
                     sql=update_sql,
@@ -164,55 +164,36 @@ class CategoryColdStartTask(object):
         :return:
         """
         update_sql = f"""
-                    UPDATE 
-                        crawler_meta_article
-                    SET
-                        status = %s
-                    WHERE
-                        article_id in %s and status = %s;
-                    """
+                    update crawler_meta_article
+                    set status = %s
+                    where article_id in %s and status = %s;
+        """
         affect_rows = self.db_client.update(
             sql=update_sql,
             params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
         )
-        if affect_rows != len(article_id_list):
-            bot(
-                title="品类冷启任务中,出现更新状文章状态失败异常",
-                detail={
-                    "affected_rows": affect_rows,
-                    "task_rows": len(article_id_list)
-                }
-            )
+        # if affect_rows != len(article_id_list):
+        #     bot(
+        #         title="品类冷启任务中,出现更新状文章状态失败异常",
+        #         detail={
+        #             "affected_rows": affect_rows,
+        #             "task_rows": len(article_id_list)
+        #         }
+        #     )
 
     def filter_weixin_articles(self, articles_df, category):
         """
         微信抓取文章过滤漏斗
         """
-        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
-        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
         total_length = articles_df.shape[0]
-        # 第0层过滤已经发布的文章
-        filter_df = articles_df[articles_df['status'] == self.INIT_STATUS]
-        length_level0 = filter_df.shape[0]
-
-        # 第一层漏斗通过阅读均值倍数过滤
-        filter_df = filter_df[filter_df['read_times'] >= self.READ_TIMES_THRESHOLD]
-        length_level1 = filter_df.shape[0]
-
-        # 第二层漏斗通过阅读量过滤
-        filter_df = filter_df[
-            filter_df['read_cnt'] >= self.READ_THRESHOLD
-            ]
-        length_level2 = filter_df.shape[0]
-
-        # 第三层漏斗通过标题长度过滤
-        filter_df = filter_df[
-            (filter_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)
-            & (filter_df['title'].str.len() <= self.TITLE_LENGTH_MAX)
+        # 第1层漏斗通过标题长度过滤
+        filter_df = articles_df[
+            (articles_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)
+            & (articles_df['title'].str.len() <= self.TITLE_LENGTH_MAX)
             ]
-        length_level3 = filter_df.shape[0]
+        length_level1 = filter_df.shape[0]
 
-        # 第层通过敏感词过滤
+        # 第2层通过敏感词过滤
         filter_df = filter_df[
             (~filter_df['title'].str.contains('农历'))
             & (~filter_df['title'].str.contains('太极'))
@@ -227,23 +208,23 @@ class CategoryColdStartTask(object):
             & (~filter_df['title'].str.contains('蔡英文'))
             & (~filter_df['title'].str.contains('中国'))
             ]
-        length_level4 = filter_df.shape[0]
-        # 第层通过LLM敏感度过滤
+        length_level2 = filter_df.shape[0]
+        # 第3层通过LLM敏感度过滤
         filter_df = filter_df[
             ~(filter_df['llm_sensitivity'] > 0)
         ]
-        length_level5 = filter_df.shape[0]
+        length_level3 = filter_df.shape[0]
 
-        # 第层通过相关性分数过滤
+        # 第4层通过相关性分数过滤
         filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]
-        length_level6 = filter_df.shape[0]
+        length_level4 = filter_df.shape[0]
 
         log(
             task="category_publish_task",
             function="publish_filter_articles",
             message="过滤后文章总数",
             data={
-                "total_articles": length_level5,
+                "total_articles": length_level4,
                 "category": category
             }
         )
@@ -251,21 +232,15 @@ class CategoryColdStartTask(object):
             title="冷启任务发布通知",
             detail={
                 "总文章数量": total_length,
-                "通过已经发布状态过滤": "过滤数量: {}    剩余数量: {}".format(
-                    total_length - length_level0, length_level0),
-                "通过阅读均值倍数过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level0 - length_level1, length_level1),
-                "通过阅读量过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level1 - length_level2, length_level2),
                 "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level2 - length_level3, length_level3),
+                    total_length - length_level1, length_level1),
                 "通过敏感词过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level3 - length_level4, length_level4),
+                    length_level1 - length_level2, length_level2),
                 "通过LLM敏感度过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level4 - length_level5, length_level5
+                    length_level2 - length_level3, length_level3
                 ),
                 "通过相关性分数过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level5 - length_level6, length_level6
+                    length_level3 - length_level4, length_level4
                 ),
                 "品类": category,
                 "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,

+ 2 - 1
requirements.txt

@@ -27,4 +27,5 @@ scikit-learn~=1.6.1
 google~=3.0.0
 cffi~=1.17.1
 lxml~=5.3.2
-scipy~=1.15.2
+scipy~=1.15.2
+pydantic~=2.10.6

+ 2 - 0
tasks/crawler_tasks/crawler_articles/__init__.py

@@ -0,0 +1,2 @@
+from .gzh_article_crawler import CrawlerAssociationAccountArticles
+from .gzh_article_crawler import CrawlerDailyScrapeAccountArticles

+ 275 - 0
tasks/crawler_tasks/crawler_articles/gzh_article_crawler.py

@@ -0,0 +1,275 @@
+"""
+@author: luojunhui
+抓取全局品类文章
+"""
+
+import datetime
+import time
+import traceback
+from typing import Dict, List
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications.db import DatabaseConnector
+from applications.pipeline import (
+    whether_title_sensitive,
+    whether_duplicate_article_title,
+)
+from applications.utils import show_desc_to_sta, generate_gzh_id, timestamp_to_str
+from cold_start.crawler.wechat import get_article_list_from_account
+from config import long_articles_config
+
+
+class Const:
+    ACCOUNT_GOOD_STATUS = 1
+
+    # 账号是否每日抓取
+    ACCOUNT_DAILY_SCRAPE = 1
+    ACCOUNT_NOT_DAILY_SCRAPE = 0
+
+    # 默认值
+    DEFAULT_VIEW_COUNT = 0
+    DEFAULT_LIKE_COUNT = 0
+    DEFAULT_ARTICLE_STATUS = 1
+    DEFAULT_TIMESTAMP = 1717171200
+
+    # 标题sensitivity
+    TITLE_SENSITIVE = 1
+    TITLE_NOT_SENSITIVE = 0
+
+
+class GzhArticleCrawler(Const):
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(long_articles_config)
+        self.db_client.connect()
+
+    def get_latest_timestamp(self, account: dict) -> int:
+        try:
+            timestamp = int(account["latest_update_time"].timestamp())
+        except Exception as e:
+            timestamp = self.DEFAULT_TIMESTAMP
+        return timestamp
+
+    def insert_article_into_meta(self, gh_id, account_mode, article_list):
+        """
+        将数据更新到数据库
+        :return:
+        """
+        for article_obj in article_list:
+            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
+            for obj in detail_article_list:
+                try:
+                    if whether_duplicate_article_title(obj["Title"], self.db_client):
+                        continue
+
+                    # 判断标题是否包含敏感词
+                    title_sensitivity = (
+                        self.TITLE_SENSITIVE
+                        if whether_title_sensitive(obj["Title"])
+                        else self.TITLE_NOT_SENSITIVE
+                    )
+                    show_stat = show_desc_to_sta(obj["ShowDesc"])
+                    show_view_count = show_stat.get(
+                        "show_view_count", self.DEFAULT_VIEW_COUNT
+                    )
+                    show_like_count = show_stat.get(
+                        "show_like_count", self.DEFAULT_LIKE_COUNT
+                    )
+                    unique_idx = generate_gzh_id(obj["ContentUrl"])
+                    insert_sql = f"""
+                        insert into crawler_meta_article
+                        (
+                         platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
+                         description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
+                        )
+                        VALUES 
+                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                    """
+                    self.db_client.save(
+                        query=insert_sql,
+                        params=(
+                            "weixin",
+                            "account",
+                            account_mode,
+                            gh_id,
+                            obj["ItemIndex"],
+                            obj["Title"],
+                            obj["ContentUrl"],
+                            show_view_count,
+                            show_like_count,
+                            obj["Digest"],
+                            obj["send_time"],
+                            int(time.time()),
+                            self.DEFAULT_ARTICLE_STATUS,
+                            unique_idx,
+                            obj.get("llm_sensitivity", -1),
+                            title_sensitivity,
+                        ),
+                    )
+                except Exception as e:
+                    print(e)
+
+    def update_latest_account_timestamp(self, gh_id):
+        """
+        更新账号的最新时间戳
+        :return:
+        """
+        select_sql = f"""
+            SELECT publish_time 
+            From crawler_meta_article 
+            WHERE out_account_id = '{gh_id}'
+            ORDER BY publish_time DESC LIMIT 1;
+        """
+        result = self.db_client.fetch(select_sql)
+        time_stamp = result[0][0]
+        dt_str = timestamp_to_str(time_stamp)
+        update_sql = f"""
+            update long_articles_accounts
+            set latest_update_time = %s
+            where gh_id = %s;
+        """
+        self.db_client.save(query=update_sql, params=(dt_str, gh_id))
+
+    def crawl_each_account(self, gh_id, account_mode, latest_time_stamp):
+        """
+        更新账号文章
+        :return:
+        """
+        current_cursor = None
+        while True:
+            # fetch response from weixin
+            response = get_article_list_from_account(
+                account_id=gh_id, index=current_cursor
+            )
+            print(response)
+            msg_list = response.get("data", {}).get("data")
+            if not msg_list:
+                break
+
+            # process current page
+            self.insert_article_into_meta(gh_id, account_mode, msg_list)
+
+            # whether crawl next page
+            last_article_in_this_page = msg_list[-1]
+            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
+                "BaseInfo"
+            ]["UpdateTime"]
+            if last_time_stamp_in_this_msg > latest_time_stamp:
+                self.update_latest_account_timestamp(gh_id)
+                break
+
+            # update cursor for next page
+            current_cursor = response.get("data", {}).get("next_cursor")
+            if not current_cursor:
+                break
+
+    def crawl_account_list(self, account_list, account_method):
+        for account in tqdm(account_list):
+            try:
+                gh_id = account["gh_id"]
+                account_name = account["account_name"]
+                latest_timestamp = self.get_latest_timestamp(account)
+                self.crawl_each_account(gh_id, account_method, latest_timestamp)
+                self.update_account_read_avg_info(gh_id, account_name)
+
+            except Exception as e:
+                print(f"fail because of {e}")
+                print(traceback.format_exc())
+
+    def update_account_read_avg_info(self, gh_id, account_name):
+        """
+        calculate the per-position read average for the account (last 30 articles per position)
+        """
+        position_list = [i for i in range(1, 9)]
+        today_dt = datetime.date.today().isoformat()
+        for position in position_list:
+            fetch_query = f"""
+                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt from crawler_meta_article
+                where out_account_id = '{gh_id}' and article_index = {position}
+                order by publish_time desc limit 30;
+            """
+            fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
+            if fetch_response:
+                read_cnt_list = [i["read_cnt"] for i in fetch_response]
+                n = len(read_cnt_list)
+                read_avg = sum(read_cnt_list) / n
+                max_publish_dt = fetch_response[0]["publish_dt"]
+                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
+                insert_query = f"""
+                    insert ignore into 
+                        crawler_meta_article_accounts_read_avg
+                        (gh_id, account_name, position, read_avg, dt, status, remark)
+                        values
+                        (%s, %s, %s, %s, %s, %s, %s);
+                """
+                insert_rows = self.db_client.save(
+                    insert_query,
+                    params=(
+                        gh_id,
+                        account_name,
+                        position,
+                        read_avg,
+                        today_dt,
+                        1,
+                        remark,
+                    ),
+                )
+                if insert_rows:
+                    update_query = f"""
+                        update crawler_meta_article_accounts_read_avg
+                        set status = %s 
+                        where gh_id = %s and position = %s and dt < %s;
+                    """
+                    self.db_client.save(update_query, (0, gh_id, position, today_dt))
+
+
+class CrawlerDailyScrapeAccountArticles(GzhArticleCrawler):
+
+    def get_account_list(self, account_method: str) -> List[Dict]:
+        """
+        获取账号
+        :param account_method:
+        :return:
+        """
+        query = f"""
+            select gh_id, account_source, account_name, account_category, latest_update_time
+            from long_articles_accounts 
+            where account_category = '{account_method}' and is_using = {self.ACCOUNT_GOOD_STATUS} and daily_scrape = {self.ACCOUNT_DAILY_SCRAPE};
+        """
+        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
+        return account_list
+
+    def deal(self, method_list):
+        """
+        :param method_list:
+        :return:
+        """
+        # daily 品类账号抓取
+        for account_method in method_list:
+            account_list = self.get_account_list(account_method)
+            self.crawl_account_list(account_list, account_method)
+
+
+class CrawlerAssociationAccountArticles(GzhArticleCrawler):
+
+    def get_association_account_list(self, date_str):
+        """
+        获取账号联想的轮询账号
+        """
+        group_id = date_str[-1]
+        query = f"""
+            select account_id, gh_id, account_name, latest_update_time
+            from long_articles_accounts
+            where account_category = 'account_association' and is_using = {self.ACCOUNT_DAILY_SCRAPE} and daily_scrape = {self.ACCOUNT_NOT_DAILY_SCRAPE};
+        """
+        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
+        today_crawler_account_list = [
+            i for i in account_list if str(i["account_id"])[-1] == group_id
+        ]
+        return today_crawler_account_list
+
+    def deal(self, date_str):
+        account_list = self.get_association_account_list(date_str)
+        self.crawl_account_list(account_list, "account_association")