luojunhui 3 months ago
parent
commit
62f590bd32

+ 1 - 1
account_cold_start_daily.py

@@ -108,4 +108,4 @@ def main(category_list=None):
 
 
 if __name__ == '__main__':
-    main()
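+    # restrict the daily cold-start job to the new account-association category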
+    main(['account_association'])

+ 214 - 0
coldStartTasks/crawler/weixin_account_association_crawler.py

@@ -0,0 +1,214 @@
+"""
+@author: luojunhui
+WeChat account association crawler
+"""
+import datetime
+import json
+import traceback
+from typing import List, Set, Dict
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import WeixinSpider, longArticlesMySQL, log, bot, aiditApi
+from applications.const import WeixinVideoCrawlerConst
+from applications.functions import Functions
+from config import apolloConfig
+
+const = WeixinVideoCrawlerConst()
+function = Functions()
+config = apolloConfig()
+
+
+def get_inner_account_gh_id() -> Set[str]:
+    """
+    获取内部账号名称
+    :return:
+    """
+    accounts = aiditApi.get_publish_account_from_aigc()
+    gh_id_list = [i['ghId'] for i in accounts]
+    return set(gh_id_list)
+
+
+class AccountAssociationCrawler(object):
+    """
+    账号抓取
+    """
+
+    def __init__(self):
+        self.db_client = longArticlesMySQL()
+        self.spider = WeixinSpider()
+        self.account_name_filter = json.loads(config.getConfigValue('account_name_filter'))
+        self.crawler_account_count = 0
+        self.total_crawler_count = 0
+        self.inner_account_count = 0
+        self.account_name_filter_count = 0
+        self.already_crawler_account_count = 0
+
+    def is_bad_account(self, account_name: str) -> bool:
+        """
+        判断账号是否为bad account
+        :param account_name:
+        :return:
+        """
+        if account_name == "":
+            return True
+        for key in self.account_name_filter:
+            if key in account_name:
+                return True
+        return False
+
+    def get_seed_titles(self, run_date: datetime.datetime) -> List[Dict]:
+        """
+        Select recent high-performing articles as seed titles
+        :param run_date:
+        :return:
+        """
+        publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
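+        # seed pool: articles within the stat period whose read rate and view
+        # count clear the thresholds in WeixinVideoCrawlerConst, top 100 by read rate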
+        sql = f"""
+            SELECT DISTINCT t1.account_name, t1.title, t2.kimi_summary, t2.kimi_keys
+            FROM datastat_sort_strategy t1
+            JOIN long_articles_text t2
+            ON t1.source_id = t2.content_id
+            WHERE t1.read_rate > {const.READ_AVG_MULTIPLE} 
+                AND t1.view_count > {const.MIN_READ_COUNT} 
+                AND publish_timestamp > {publish_timestamp_threshold}
+            ORDER BY read_rate DESC
+            LIMIT 100;
+        """
+        article_obj_list = self.db_client.select(sql, cursor_type=DictCursor)
+        return article_obj_list
+
+    def search_account_in_weixin(self, article_obj: Dict) -> Dict:
+        """
+        通过文章信息使用搜索接口搜索账号
+        :param article_obj:
+        :return:
+        """
+        ori_title = article_obj['title']
+        summary = article_obj['kimi_summary']
+        kimi_keys = json.loads(article_obj['kimi_keys']) if article_obj['kimi_keys'] else None
+        response_1 = self.spider.search_articles(title=ori_title)
+        response_2 = self.spider.search_articles(title=summary) if summary else {}
+        response_3 = self.spider.search_articles(title=", ".join(kimi_keys)) if kimi_keys else {}
+        response = {
+            "title": response_1,
+            "summary": response_2,
+            "kimi_keys": response_3
+        }
+        return response
+
+    def insert_account_into_database(self, account_name: str, gh_id: str, category: str, biz_date: str) -> int:
+        """
+        :param biz_date:
+        :param category:
+        :param account_name:
+        :param gh_id:
+        :return:
+        """
+        insert_sql = """
+            INSERT INTO long_articles_accounts
+            (gh_id, account_source, account_name, account_category, init_date)
+            VALUES
+            (%s, %s, %s, %s, %s)
+        """
+        affected_rows = self.db_client.update(
+            sql=insert_sql,
+            params=(gh_id, "weixin", account_name, category, biz_date)
+        )
+        return affected_rows
+
+    def save_account_into_db(self, search_response: Dict, inner_account_gh_id_set: Set, biz_date: str) -> None:
+        """
+        保存账号信息
+        :param biz_date:
+        :param search_response:
+        :param inner_account_gh_id_set:
+        :return:
+        """
+        for value in search_response.values():
+            if value:
+                search_article_list = value['data']['data']
+                for article in tqdm(search_article_list):
+                    article_url = article['url']
+                    try:
+                        account_info = self.spider.get_account_by_url(article_url)
+                        self.total_crawler_count += 1
+                        account_name = account_info['data']['data']['account_name']
+                        gh_id = account_info['data']['data']['wx_gh']
+                        # skip internal accounts
+                        if gh_id in inner_account_gh_id_set:
+                            self.inner_account_count += 1
+                            continue
+
+                        # filter out bad or dangerous accounts by account name
+                        if self.is_bad_account(account_name):
+                            self.account_name_filter_count += 1
+                            continue
+                        try:
+                            self.insert_account_into_database(
+                                account_name=account_name,
+                                gh_id=gh_id,
+                                category="account_association",
+                                biz_date=biz_date
+                            )
+                        except Exception as e:
+                            self.already_crawler_account_count += 1
+                            print(e)
+                            continue
+
+                        self.crawler_account_count += 1
+                    except Exception as e:
+                        log(
+                            task="account_association",
+                            function="save_account_into_db",
+                            data={
+                                "biz_date": biz_date,
+                                "article": article,
+                                "trace_back": traceback.format_exc(),
+                                "error": f"{e}"
+                            }
+                        )
+                        continue
+
+    def run_account_association(self, biz_date: datetime.datetime):
+        """
+        Run the account-association crawl for one business date
+        :param biz_date:
+        :return:
+        """
+        inner_account_gh_id_set = get_inner_account_gh_id()
+        seed_articles = self.get_seed_titles(biz_date)
+        for article in tqdm(seed_articles):
+            try:
+                # search WeChat through the search API
+                search_response = self.search_account_in_weixin(article)
+                # persist the discovered accounts
+                self.save_account_into_db(
+                    search_response=search_response,
+                    inner_account_gh_id_set=inner_account_gh_id_set,
+                    biz_date=biz_date.strftime("%Y-%m-%d")
+                )
+            except Exception as e:
+                log(
+                    task="account_association",
+                    function="run_account_association",
+                    data={
+                        "biz_date": biz_date,
+                        "article": article,
+                        "trace_back": traceback.format_exc(),
+                        "error": f"{e}"
+                    }
+                )
+        bot(
+            title="Account association - account crawl finished",
+            detail={
+                "total accounts found": self.total_crawler_count,
+                "filtered as internal accounts": self.inner_account_count,
+                "filtered by account name": self.account_name_filter_count,
+                "accounts already crawled": self.already_crawler_account_count,
+                "new accounts added": self.crawler_account_count
+            }
+        )
+
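+
+if __name__ == '__main__':
+    # Minimal ad-hoc usage sketch; assumes today's date as biz_date. In
+    # production this task is triggered from account_cold_start_daily.py
+    # via main(['account_association']).
+    crawler = AccountAssociationCrawler()
+    crawler.run_account_association(biz_date=datetime.datetime.today())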