Jelajahi Sumber

Merge branch '2024-12-31-account-association-dev' of luojunhui/LongArticlesJob into master

luojunhui 6 bulan lalu
induk
melakukan
696e921a27

+ 1 - 0
.gitignore

@@ -21,6 +21,7 @@ lib64/
 parts/
 sdist/
 var/
+test/
 *.egg-info/
 .installed.cfg
 *.egg

+ 44 - 0
account_association_task.py

@@ -0,0 +1,44 @@
+"""
+@author: luojunhui
+账号联想---账号搜索任务
+"""
+import traceback
+
+from argparse import ArgumentParser
+from datetime import datetime
+
+from applications import bot
+from coldStartTasks.crawler.weixin_account_association_crawler import AccountAssociationCrawler
+
+
def main():
    """
    Account association -- account-search task entry point.

    Parses an optional ``--run-date`` argument (format ``%Y-%m-%d``); when it
    is absent, today's date is used. Runs the account-association crawler for
    that business date and notifies via bot on any failure.
    """
    parser = ArgumentParser()
    # NOTE: argparse %-formats help text, so literal percent signs must be
    # escaped as '%%' -- a bare '%Y-%m-%d' makes '--help' raise ValueError.
    parser.add_argument("--run-date",
                        help="Run only once for date in format of %%Y-%%m-%%d. \
                                If no specified, run as daily jobs.")
    args = parser.parse_args()

    if args.run_date:
        biz_date_str = args.run_date
    else:
        biz_date_str = datetime.today().strftime('%Y-%m-%d')
    try:
        # ValueError from a malformed --run-date is also caught here and
        # reported through the bot alert below.
        biz_date = datetime.strptime(biz_date_str, "%Y-%m-%d")
        account_association_crawler = AccountAssociationCrawler()
        account_association_crawler.run_account_association(biz_date=biz_date)
    except Exception as e:
        bot(
            title="账号联想任务失败",
            detail={
                "error": str(e),
                "error_stack": traceback.format_exc()
            }
        )


if __name__ == '__main__':
    main()

+ 18 - 0
applications/const.py

@@ -157,6 +157,24 @@ class UpdateMiniProgramDetailConst(updatePublishedMsgTaskConst):
     """
 
 
+# 账号联想
class AccountAssociationTaskConst:
    """
    Constant configuration for the account-association task.
    """
    # Look-back window (in seconds) when selecting seed titles: one week.
    STAT_PERIOD = 60 * 60 * 24 * 7

    # Read-rate threshold (multiple of the account's average reads) a seed article must exceed.
    READ_AVG_MULTIPLE = 1.3

    # Minimum absolute view count a seed article must exceed.
    MIN_READ_COUNT = 2000

    # Maximum number of seed titles fetched per run.
    SEED_TITLE_LIMIT = 100
+
+
 
 
 

+ 248 - 0
coldStartTasks/crawler/weixin_account_association_crawler.py

@@ -0,0 +1,248 @@
+"""
+@author: luojunhui
+微信账号联想
+"""
+import datetime
+import json
+import traceback
+from typing import List, Set, Dict
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import aiditApi
+from applications import bot
+from applications import log
+from applications import longArticlesMySQL
+from applications import WeixinSpider
+from applications.const import AccountAssociationTaskConst
+from applications.functions import Functions
+from config import apolloConfig
+
# Module-level singletons shared by the crawler class below.
const = AccountAssociationTaskConst()  # task thresholds / limits
function = Functions()  # NOTE(review): not referenced in this file's visible code -- confirm before removing
config = apolloConfig()  # Apollo config client; supplies the 'account_name_filter' value
empty_dict = {}  # shared read-only default for chained dict .get() lookups
+
+
def get_inner_account_gh_id() -> Set[str]:
    """
    Collect the gh_id of every in-house publish account registered in AIGC.

    :return: set of gh_id strings
    """
    publish_accounts = aiditApi.get_publish_account_from_aigc()
    return {account['ghId'] for account in publish_accounts}
+
+
class AccountAssociationCrawler(object):
    """
    Account-association crawler.

    Discovers new WeChat accounts by searching WeChat with seed article
    titles / kimi summaries / kimi keywords, resolves each search hit to its
    publishing account, filters out in-house, bad-named and official
    accounts, and inserts the survivors into long_articles_accounts.
    """

    def __init__(self):
        # MySQL client for the long-articles database
        self.db_client = longArticlesMySQL()
        # WeChat spider client (search / account-by-url / msg-list APIs)
        self.spider = WeixinSpider()
        # Substrings that mark an account name as bad/dangerous,
        # loaded from Apollo config key 'account_name_filter' (JSON list)
        self.account_name_filter = json.loads(config.getConfigValue('account_name_filter'))
        # Counters reported in the final bot notification:
        self.crawler_account_count = 0          # newly inserted accounts
        self.total_crawler_count = 0            # every account resolved from search results
        self.inner_account_count = 0            # skipped: in-house account
        self.account_name_filter_count = 0      # skipped: name matched the filter list
        self.already_crawler_account_count = 0  # skipped: insert raised (presumably duplicate)
        self.official_accounts = 0              # skipped: detected as official account

    def is_bad_account(self, account_name: str) -> bool:
        """
        Decide whether an account name marks a bad / dangerous account.

        :param account_name: account display name; empty string counts as bad
        :return: True if the name is empty or contains any filter substring
        """
        if account_name == "":
            return True
        for key in self.account_name_filter:
            if key in account_name:
                return True
        return False

    def is_account_official(self, gh_id: str) -> bool:
        """
        Heuristically decide whether an account is an official account.

        Fetches the account's message list and collects the send dates of
        type-9 items (mass-send headline articles). If two headline pushes
        share the same calendar date, the account is treated as official.
        NOTE(review): the Type == 9 meaning and this duplicate-date heuristic
        come from the spider API's conventions -- confirm against its docs.

        :param gh_id: WeChat account gh_id
        :return: True if considered official, else False
        """
        response = self.spider.update_msg_list(ghId=gh_id, index=None)
        article_list = response['data']['data']
        published_articles_send_date = []
        for item in article_list:
            if item.get("AppMsg", empty_dict).get("BaseInfo", empty_dict).get("Type") == 9:
                # collect the send_time of the mass-send headline article
                send_time = item['AppMsg']['DetailInfo'][0]['send_time']
                send_date = datetime.datetime.fromtimestamp(send_time).strftime('%Y-%m-%d')
                published_articles_send_date.append(send_date)

        published_articles_send_date_set = set(published_articles_send_date)
        # No duplicate dates -> every headline push was on a distinct day -> not official
        if len(published_articles_send_date_set) == len(published_articles_send_date):
            return False
        else:
            return True

    def get_seed_titles(self, run_date: datetime.datetime) -> List[Dict]:
        """
        Fetch seed articles published within the last STAT_PERIOD seconds
        before run_date that beat both the read-rate and view-count
        thresholds, best read-rate first, capped at SEED_TITLE_LIMIT rows.

        :param run_date: business date the task is running for
        :return: list of dict rows (account_name, title, kimi_summary, kimi_keys)
        """
        publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
        sql = f"""
            SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys
            FROM datastat_sort_strategy t1
            JOIN long_articles_text t2
            ON t1.source_id = t2.content_id
            WHERE t1.read_rate > {const.READ_AVG_MULTIPLE} 
                AND t1.view_count > {const.MIN_READ_COUNT} 
                AND publish_timestamp > {publish_timestamp_threshold}
            ORDER BY read_rate DESC
            LIMIT {const.SEED_TITLE_LIMIT};
        """
        article_obj_list = self.db_client.select(sql, cursor_type=DictCursor)
        return article_obj_list

    def search_account_in_weixin(self, article_obj: Dict) -> Dict:
        """
        Search WeChat three ways for one seed article: by original title,
        by kimi summary (if present) and by joined kimi keywords (if present).

        :param article_obj: seed row with 'title', 'kimi_summary', 'kimi_keys'
        :return: dict with keys 'title' / 'summary' / 'kimi_keys', each a raw
                 spider search response ({} when that query was skipped)
        """
        ori_title = article_obj['title']
        summary = article_obj['kimi_summary']
        # kimi_keys is stored as a JSON-encoded list; None when absent
        kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None
        response_1 = self.spider.search_articles(title=ori_title)
        response_2 = self.spider.search_articles(title=summary) if summary else {}
        response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {}
        response = {
            "title": response_1,
            "summary": response_2,
            "kimi_keys": response_3
        }
        return response

    def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int:
        """
        Insert one account row into long_articles_accounts
        (account_source is hard-coded to "weixin").

        :param biz_date: business date string ('%Y-%m-%d'), stored as init_date
        :param category: account_category value (e.g. "account_association")
        :param account_name: account display name
        :param gh_id: WeChat account gh_id
        :return: number of affected rows
        """
        insert_sql = f"""
            INSERT INTO long_articles_accounts
            (gh_id, account_source, account_name, account_category, init_date)
            values 
            (%s, %s, %s, %s, %s)
        """
        affected_rows = self.db_client.update(
            sql=insert_sql,
            params=(gh_id, "weixin", account_name, category, biz_date)
        )
        return affected_rows

    def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
        """
        Resolve every searched article to its account, filter, and persist.

        :param biz_date: business date string ('%Y-%m-%d')
        :param search_response: output of search_account_in_weixin
        :param inner_account_gh_id_set: gh_ids of in-house accounts to skip
        :return: None (updates the instance counters as a side effect)
        """
        for key in search_response:
            value = search_response[key]
            if value:
                search_article_list = value['data']['data']
                for article in tqdm(search_article_list):
                    article_url = article['url']
                    try:
                        account_info = self.spider.get_account_by_url(article_url)
                        self.total_crawler_count += 1
                        account_name = account_info['data']['data']['account_name']
                        gh_id = account_info['data']['data']['wx_gh']
                        # skip in-house accounts
                        if gh_id in inner_account_gh_id_set:
                            self.inner_account_count += 1
                            continue

                        # filter bad / dangerous accounts by name
                        if self.is_bad_account(account_name):
                            self.account_name_filter_count += 1
                            continue

                        # skip accounts detected as official
                        if self.is_account_official(gh_id):
                            self.official_accounts += 1
                            continue

                        try:
                            self.insert_account_into_database(
                                account_name=account_name,
                                gh_id=gh_id,
                                category="account_association",
                                biz_date=biz_date
                            )
                        except Exception as e:
                            # NOTE(review): any insert failure is counted as
                            # "already crawled" -- presumably a unique-key
                            # violation, but other DB errors land here too.
                            self.already_crawler_account_count += 1
                            print(e)
                            continue

                        self.crawler_account_count += 1
                    except Exception as e:
                        # per-article failure: log and keep processing the rest
                        log(
                            task="account_association",
                            function="save_account_into_db",
                            data={
                                "biz_date": biz_date,
                                "article": article,
                                "trace_back": traceback.format_exc(),
                                "error": f"{e}"
                            }
                        )
                        continue
            else:
                continue

    def run_account_association(self, biz_date: datetime.datetime) -> None:
        """
        Run the full account-association pass for one business date, then
        send a bot summary of the crawl counters.

        :param biz_date: business date the task is running for
        :return: None
        """
        inner_account_gh_id_set = get_inner_account_gh_id()
        seed_articles = self.get_seed_titles(biz_date)
        for article in tqdm(seed_articles):
            try:
                # search from weixin
                search_response = self.search_account_in_weixin(article)
                # save
                self.save_account_into_db(
                    search_response=search_response,
                    inner_account_gh_id_set=inner_account_gh_id_set,
                    biz_date=biz_date.strftime("%Y-%m-%d")
                )
            except Exception as e:
                # per-seed failure: log and continue with the next seed
                log(
                    task="account_association",
                    function="run_account_association",
                    data={
                        "biz_date": biz_date,
                        "article": article,
                        "trace_back": traceback.format_exc(),
                        "error": f"{e}"
                    }
                )
        bot(
            title="账号联想-账号抓取完成",
            detail={
                "总共联想到的账号数": self.total_crawler_count,
                "内部账号过滤": self.inner_account_count,
                "账号名称过滤": self.account_name_filter_count,
                "官方账号过滤": self.official_accounts,
                "已经抓取账号": self.already_crawler_account_count,
                "新增账号": self.crawler_account_count
            }
        )