luojunhui 2 miesięcy temu
rodzic
commit
71028245f7

+ 23 - 11
account_cold_start_daily.py

@@ -4,6 +4,8 @@
 import datetime
 import traceback
 
+from argparse import ArgumentParser
+
 from applications import longArticlesMySQL, bot
 from coldStartTasks.crawler.weixinCategoryCrawler import weixinCategory
 from coldStartTasks.publish.publishCategoryArticles import CategoryColdStartTask
@@ -39,14 +41,14 @@ class AccountColdStartDailyTask(object):
             )
             return False
 
-    def crawler_task(self, category_list):
+    def crawler_task(self, category_list, date_str):
         """
         :return:
         """
         # 初始化category抓取类
         try:
             weixin_category_crawler = weixinCategory(db_client=self.db_client)
-            weixin_category_crawler.deal(category_list=category_list)
+            weixin_category_crawler.deal(category_list=category_list, date_str=date_str)
             bot(
                 title="账号冷启动任务,抓取完成",
                 detail={
@@ -95,7 +97,7 @@ class AccountColdStartDailyTask(object):
             )
 
 
-def main(category_list=None, article_source=None):
+def main(date_str, category_list=None, article_source=None):
     """
     main job, use crontab to do job daily
     :return:
@@ -107,19 +109,29 @@ def main(category_list=None, article_source=None):
     task = AccountColdStartDailyTask()
     if task.init_db():
         if article_source == 'weixin':
-            task.crawler_task(category_list=category_list)
+            task.crawler_task(category_list=category_list, date_str=date_str)
 
         task.publish_task(category_list=category_list, article_source=article_source)
 
 
 if __name__ == '__main__':
-    # 执行微信抓取发布
-    main(['1030-手动挑号'])
+    parser = ArgumentParser()
+    # argparse %-formats help strings, so literal percent signs must be doubled;
+    # an unescaped "%Y" makes help rendering fail with ValueError.
+    parser.add_argument("--run_date", help="--run_date format: %%Y-%%m-%%d")
+    args = parser.parse_args()
 
-    # # 执行头条发布
-    # main(
-    #     category_list=['history', 'tech', 'finance', 'entertainment'],
-    #     article_source='toutiao'
-    # )
+    # Fall back to today's date in ISO format when --run_date is not given.
+    if args.run_date:
+        run_date = args.run_date
+    else:
+        run_date = datetime.date.today().isoformat()
+
+    # Run the weixin crawl-and-publish job
+    main(date_str=run_date)
+
+    # Run the toutiao publish job
+    main(
+        date_str=run_date,
+        category_list=['history', 'tech', 'finance', 'entertainment'],
+        article_source='toutiao'
+    )
 
 

+ 66 - 30
coldStartTasks/crawler/weixinCategoryCrawler.py

@@ -6,12 +6,19 @@
 import time
 
 from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
 from applications import WeixinSpider, Functions, llm_sensitivity, log
 from coldStartTasks.filter import article_crawler_duplicate_filter
 
 # 常量
 ACCOUNT_GOOD_STATUS = 1
+
+# 账号是否每日抓取
 ACCOUNT_DAILY_SCRAPE = 1
+ACCOUNT_NOT_DAILY_SCRAPE = 0
+
+# 默认值
 DEFAULT_VIEW_COUNT = 0
 DEFAULT_LIKE_COUNT = 0
 DEFAULT_ARTICLE_STATUS = 1
@@ -52,6 +59,20 @@ class weixinCategory(object):
         ]
         return result
 
+    def get_association_account_list(self, date_str):
+        """
+        Return the account-association accounts to crawl for the given date.
+
+        Rows with account_category 'account_association' that are in use and
+        not scraped daily are fetched, then filtered to those whose account_id
+        ends with the same character as date_str, so the accounts are polled
+        in rotating groups.
+        :param date_str: run date string; only its last character is used
+        :return: list of rows fetched with DictCursor (account_id, gh_id,
+            account_name, latest_update_time)
+        """
+        group_id = date_str[-1]
+        # NOTE(review): is_using looks like an account-status flag, so it is
+        # compared with ACCOUNT_GOOD_STATUS instead of ACCOUNT_DAILY_SCRAPE.
+        # Both constants are 1, so the generated SQL is unchanged.
+        sql = f"""
+            select account_id, gh_id, account_name, latest_update_time
+            from long_articles_accounts
+            where account_category = 'account_association' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
+        """
+        account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
+        # Keep only the accounts whose id falls in today's group.
+        today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id]
+        return today_crawler_account_list
+
     def insert_data_into_db(self, gh_id, category, article_list):
         """
         将数据更新到数据库
@@ -176,43 +197,57 @@ class weixinCategory(object):
             print("No more data")
             return []
 
-    def deal(self, category_list):
+    def crawler_each_category(self, account_list, category):
         """
+        Crawl every account in account_list, then sensitivity-check new titles.
+
+        A failure on one account is printed and skipped so the remaining
+        accounts are still processed.
+        :param account_list: account rows; each needs 'gh_id' and may carry its
+            last update time under 'latest_timestamp' or 'latest_update_time'
+        :param category: category passed to update_each_account and
+            update_article_sensitive_status
+        :return:
+        """
+        success_records = []
+        for account in tqdm(account_list, desc="crawler_each_category"):
+            try:
+                gh_id = account['gh_id']
+                try:
+                    # get_association_account_list selects this column as
+                    # 'latest_update_time'; reading only 'latest_timestamp'
+                    # raised a swallowed KeyError, so those accounts always
+                    # restarted from DEFAULT_TIMESTAMP. Accept either key.
+                    if 'latest_timestamp' in account:
+                        latest_time = account['latest_timestamp']
+                    else:
+                        latest_time = account['latest_update_time']
+                    timestamp = int(latest_time.timestamp())
+                except Exception:
+                    # Missing key or a value without .timestamp(): use the default.
+                    timestamp = DEFAULT_TIMESTAMP
+                success_records += self.update_each_account(
+                    gh_id=gh_id,
+                    category=category,
+                    latest_time_stamp=timestamp
+                )
+                print("success")
+            except Exception as e:
+                print("fail because of {}".format(e))
+        success_titles = [x['title'] for x in success_records]
+        if success_titles:
+            try:
+                # Results are paired with records by position via zip.
+                sensitive_results = llm_sensitivity.check_titles(success_titles)
+                for record, sensitive_result in zip(success_records, sensitive_results):
+                    self.update_article_sensitive_status(
+                        category=category,
+                        unique_index=record['unique_index'],
+                        status=sensitive_result['hit_rule']
+                    )
+            except Exception as e:
+                print("failed to update sensitive status: {}".format(e))
 
+    def deal(self, category_list, date_str):
+        """
+        Crawl the accounts of each category, then the account-association
+        accounts selected for date_str.
         :param category_list:
+        :param date_str: YYYY-MM-DD
         :return:
         """
+        # Crawl the daily accounts of each category
         for category in category_list:
-            success_records = []
             account_list = self.get_account_list(category)
-            for account in tqdm(account_list):
-                try:
-                    gh_id = account['gh_id']
-                    category = account['category']
-                    try:
-                        timestamp = int(account['latest_timestamp'].timestamp())
-                    except Exception as e:
-                        timestamp = DEFAULT_TIMESTAMP
-                    success_records += self.update_each_account(
-                        gh_id=gh_id,
-                        category=category,
-                        latest_time_stamp=timestamp
-                    )
-                    print("success")
-                except Exception as e:
-                    print("fail because of {}".format(e))
-            success_titles = [x['title'] for x in success_records]
-            if success_titles:
-                try:
-                    sensitive_results = llm_sensitivity.check_titles(success_titles)
-                    for record, sensitive_result in zip(success_records, sensitive_results):
-                        self.update_article_sensitive_status(
-                            category=category,
-                            unique_index=record['unique_index'],
-                            status=sensitive_result['hit_rule']
-                        )
-                except Exception as e:
-                    print("failed to update sensitive status: {}".format(e))
+            self.crawler_each_category(account_list=account_list, category=category)
+
+        # Rotating crawl of the account-association accounts
+        association_account_list = self.get_association_account_list(date_str)
+        self.crawler_each_category(account_list=association_account_list, category="association")
+
+        # After crawling, run the similarity scoring task
+        # (TODO: no such call exists in this method yet)
+        return
 
     def deal_accounts(self, account_list):
         """
@@ -234,6 +269,7 @@ class weixinCategory(object):
                 try:
                     latest_timestamp = account[3].timestamp()
                 except Exception as e:
+                    print(e)
                     latest_timestamp = DEFAULT_TIMESTAMP
                 self.update_each_account(
                     gh_id=gh_id,