Quellcode durchsuchen

Merge branch '2024-10-30-luojunhui-add-daily-accounts-crawler' of luojunhui/LongArticlesJob into master

luojunhui vor 11 Monaten
Ursprung
Commit
db9b734d3c

+ 114 - 0
account_cold_start_daily.py

@@ -0,0 +1,114 @@
+"""
+@author: luojunhui
+"""
+import datetime
+import traceback
+
+from applications import longArticlesMySQL, bot
+from coldStartTasks.crawler.weixinCategoryCrawler import weixinCategory
+from coldStartTasks.publish.publishCategoryArticles import CategoryColdStartTask
+
+
+class AccountColdStartDailyTask(object):
+    """
+    账号冷启动代码
+    """
+
+    def __init__(self):
+        """
+        """
+        self.db_client = None
+        self.default_category = '1030-手动挑号'
+
+    def init_db(self):
+        """
+        初始化数据库
+        :return:
+        """
+        try:
+            self.db_client = longArticlesMySQL()
+            return True
+        except Exception as e:
+            bot(
+                title='账号抓取任务, 冷启动数据库连接失败',
+                detail={
+                    "error": str(e),
+                    "error_msg": traceback.format_exc()
+                }
+            )
+            return False
+
+    def crawler_task(self, category_list=None):
+        """
+        :return:
+        """
+        if not category_list:
+            category_list = [self.default_category]
+
+        # 初始化category抓取类
+        try:
+            weixin_category_crawler = weixinCategory()
+            weixin_category_crawler.deal(category_list=category_list)
+            bot(
+                title="账号冷启动任务,抓取完成",
+                detail={
+                    "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
+                    "category": category_list
+                },
+                mention=False
+            )
+        except Exception as e:
+            bot(
+                title="账号抓取冷启动任务,抓取失败",
+                detail={
+                    "error": str(e),
+                    "error_msg": traceback.format_exc()
+                }
+            )
+
+    def publish_task(self, category_list=None):
+        """
+        将账号文章发布到aigc抓取计划,并且绑定生成计划
+        :param category_list:
+        :return:
+        """
+        if not category_list:
+            category_list = [self.default_category]
+        try:
+            weixin_category_publisher = CategoryColdStartTask(db_client=self.db_client)
+            weixin_category_publisher.do_job(
+                category_list=category_list
+            )
+            bot(
+                title="账号冷启任务,发布完成",
+                detail={
+                    "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
+                    "category": category_list
+                },
+                mention=False
+            )
+        except Exception as e:
+            bot(
+                title="账号发布冷启动任务,抓取失败",
+                detail={
+                    "error": str(e),
+                    "error_msg": traceback.format_exc()
+                }
+            )
+
+
+def main():
+    """
+    main job, use crontab to do job daily
+    todo: 1. 开放一个输入可以输入指定品类  2. 增加对指定账号的抓取&&发布
+    :return:
+    """
+    task = AccountColdStartDailyTask()
+    if task.init_db():
+        category_list = None
+        task.crawler_task(category_list=category_list)
+        task.publish_task(category_list=category_list)
+
+
+if __name__ == '__main__':
+    main()

+ 25 - 0
applications/aiditApi.py

@@ -30,6 +30,31 @@ PERSON_COOKIE = {
     "uid": 1
 }
 
+def get_generated_article_list(plan_id):
+    db = DeNetMysql()
+    sql = f"""
+        SELECT 
+            account.wx_gh,
+            content.title,
+            content.content_link,
+            content.view_count,
+            content.like_count,
+            from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
+            from_unixtime(content.publish_timestamp / 1000) AS 发布时间
+        FROM crawler_plan_result_rel cprr
+        JOIN crawler_plan plan ON cprr.plan_id = plan.id
+        JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
+        JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
+        WHERE plan_id IN (
+            SELECT
+                input_source_value
+            FROM
+                produce_plan_input_source
+            WHERE plan_id = '{plan_id}'
+            );
+                """
+    article_list = db.select(sql)
+    return article_list
 
 def get_generated_article_title(generate_task_id):
     """

+ 4 - 0
applications/longArticlesMysql.py

@@ -25,7 +25,9 @@ class longArticlesMySQL(object):
         """
         cursor = cls.connection.cursor()
         cursor.execute(sql, params)
+        affected_rows = cursor.rowcount
         cls.connection.commit()
+        return affected_rows
 
     @classmethod
     def select(cls, sql):
@@ -49,7 +51,9 @@ class longArticlesMySQL(object):
         cursor = cls.connection.cursor()
         try:
             cursor.executemany(query=sql, args=params_list)
+            affected_rows = cursor.rowcount
             cls.connection.commit()
+            return affected_rows
         except Exception as e:
             print("Insert Many Defeat--{}".format(e))
             cls.connection.rollback()

+ 32 - 0
coldStartTasks/crawler/weixinCategoryCrawler.py

@@ -16,6 +16,7 @@ DEFAULT_LIKE_COUNT = 0
 DEFAULT_ARTICLE_STATUS = 1
 DEFAULT_TIMESTAMP = 1704038400
 
+
 class weixinCategory(object):
     """
     微信全局品类账号抓取
@@ -163,3 +164,34 @@ class weixinCategory(object):
                     print("success")
                 except Exception as e:
                     print("fail because of {}".format(e))
+
+    def deal_accounts(self, account_list):
+        """
+        input account list
+        :param account_list:
+        :return:
+        """
+        account_tuple = tuple(account_list)
+        sql = f"""
+            SELECT gh_id, account_name, account_category, latest_update_time 
+            FROM long_articles_accounts
+            WHERE account_name in {account_tuple};
+        """
+        response = self.db_client_lam.select(sql)
+        for account in tqdm(response):
+            try:
+                gh_id = account[0]
+                category = account[2]
+                try:
+                    latest_timestamp = account[3].timestamp()
+                except Exception as e:
+                    latest_timestamp = DEFAULT_TIMESTAMP
+                self.update_each_account(
+                    gh_id=gh_id,
+                    category=category,
+                    latest_time_stamp=latest_timestamp
+                )
+            except Exception as e:
+                print(e)
+
+

+ 49 - 45
coldStartTasks/publish/publishCategoryArticles.py

@@ -36,7 +36,6 @@ class CategoryColdStartTask(object):
             task="category_publish_task",
             function="__init__",
             message="数据库初始化连接完成,apollo配置获取完成",
-            status="success",
             data={
                 "category": self.category_map,
                 "threshold": self.category_cold_start_threshold
@@ -61,7 +60,6 @@ class CategoryColdStartTask(object):
             task="category_publish_task",
             function="get_articles_from_meta_table",
             message="获取品类文章总数",
-            status="success",
             data={
                 "total_articles": len(article_list),
                 "category": category
@@ -77,28 +75,7 @@ class CategoryColdStartTask(object):
         """
         plan_id = self.category_map.get(category)
         if plan_id:
-            sql = f"""
-            SELECT 
-                account.wx_gh,
-                content.title,
-                content.content_link,
-                content.view_count,
-                content.like_count,
-                from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
-                from_unixtime(content.publish_timestamp / 1000) AS 发布时间
-            FROM crawler_plan_result_rel cprr
-            JOIN crawler_plan plan ON cprr.plan_id = plan.id
-            JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
-            JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
-            WHERE plan_id IN (
-                SELECT
-                    input_source_value
-                FROM
-                    produce_plan_input_source
-                WHERE plan_id = '{plan_id}'
-                );
-            """
-            article_list = self.db_client.select(sql)
+            article_list = aiditApi.get_generated_article_list(plan_id)
             title_list = [i[1] for i in article_list]
             if title_list:
                 # update
@@ -153,35 +130,64 @@ class CategoryColdStartTask(object):
         """
         articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
         articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
-        filter_df = articles_df[
-            (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
-            & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
-            & (articles_df['title'].str.len() > self.LIMIT_TITLE_LENGTH)
-            & (~articles_df['title'].str.contains('农历'))
-            & (~articles_df['title'].str.contains('太极'))
-            & (~articles_df['title'].str.contains('节'))
-            & (~articles_df['title'].str.contains('早上好'))
-            & (~articles_df['title'].str.contains('赖清德'))
-            & (~articles_df['title'].str.contains('普京'))
-            & (~articles_df['title'].str.contains('俄'))
-            & (~articles_df['title'].str.contains('南海'))
-            & (~articles_df['title'].str.contains('台海'))
-            & (~articles_df['title'].str.contains('解放军'))
-            & (~articles_df['title'].str.contains('蔡英文'))
-            & (~articles_df['title'].str.contains('中国'))
-            ]
+        total_length = articles_df.shape[0]
+        # 第一层漏斗通过阅读均值倍数过滤
+        first_level_funnel_df = articles_df[articles_df['read_times'] >= self.READ_TIMES_THRESHOLD]
+        first_level_funnel_length = first_level_funnel_df.shape[0]
+
+        # 第二层漏斗通过阅读量过滤
+        second_level_funnel_df = first_level_funnel_df[
+            first_level_funnel_df['read_cnt'] >= self.READ_THRESHOLD
+        ]
+        second_level_funnel_length = second_level_funnel_df.shape[0]
+
+        # 第三层漏斗通过标题长度过滤
+        third_level_funnel_df = second_level_funnel_df[
+            second_level_funnel_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH
+        ]
+        third_level_funnel_length = third_level_funnel_df.shape[0]
 
+        # 最后一层通过敏感词过滤
+        filter_df = third_level_funnel_df[
+            (~third_level_funnel_df['title'].str.contains('农历'))
+            & (~third_level_funnel_df['title'].str.contains('太极'))
+            & (~third_level_funnel_df['title'].str.contains('节'))
+            & (~third_level_funnel_df['title'].str.contains('早上好'))
+            & (~third_level_funnel_df['title'].str.contains('赖清德'))
+            & (~third_level_funnel_df['title'].str.contains('普京'))
+            & (~third_level_funnel_df['title'].str.contains('俄'))
+            & (~third_level_funnel_df['title'].str.contains('南海'))
+            & (~third_level_funnel_df['title'].str.contains('台海'))
+            & (~third_level_funnel_df['title'].str.contains('解放军'))
+            & (~third_level_funnel_df['title'].str.contains('蔡英文'))
+            & (~third_level_funnel_df['title'].str.contains('中国'))
+            ]
+        final_length = filter_df.shape[0]
         url_list = filter_df['link'].values.tolist()
         log(
             task="category_publish_task",
             function="publish_filter_articles",
             message="过滤后文章总数",
-            status="success",
             data={
-                "total_articles": len(url_list),
+                "total_articles": final_length,
                 "category": category
             }
         )
+        bot(
+            title="冷启任务发布通知",
+            detail={
+                "总文章数量": total_length,
+                "通过阅读均值倍数过滤": "过滤数量: {}    剩余数量: {}".format(total_length - first_level_funnel_length, first_level_funnel_length),
+                "通过阅读量过滤": "过滤数量: {}    剩余数量: {}".format(first_level_funnel_length - second_level_funnel_length, second_level_funnel_length),
+                "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(second_level_funnel_length - third_level_funnel_length, third_level_funnel_length),
+                "通过敏感词过滤":  "过滤数量: {}    剩余数量: {}".format(third_level_funnel_length - final_length, final_length),
+                "品类": category,
+                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
+                "阅读量阈值": self.READ_THRESHOLD,
+                "标题长度阈值": self.LIMIT_TITLE_LENGTH
+            },
+            mention=False
+        )
         if url_list:
             crawler_plan_response = aiditApi.auto_create_crawler_task(
                 plan_id=None,
@@ -193,7 +199,6 @@ class CategoryColdStartTask(object):
                 task="category_publish_task",
                 function="publish_filter_articles",
                 message="成功创建抓取计划",
-                status="success",
                 data=crawler_plan_response
             )
             # auto bind to generate plan
@@ -217,7 +222,6 @@ class CategoryColdStartTask(object):
                 task="category_publish_task",
                 function="publish_filter_articles",
                 message="成功绑定到生成计划",
-                status="success",
                 data=generate_plan_response
             )
             article_id_list = filter_df['article_id'].values.tolist()

+ 24 - 0
config/crontab_backup

@@ -0,0 +1,24 @@
+# update read rate daily at 10:00 AM everyday
+0 10 * * * bash /root/luojunhui/LongArticlesJob/sh/run_update_account_read_rate_avg.sh
+
+# update account read avg at 10:40 AM everyday
+40 10 * * * bash /root/luojunhui/LongArticlesJob/sh/run_update_account_avg_v3.sh
+
+# check publish videos status at the 20 min of each hour
+20 * * * * bash /root/luojunhui/LongArticlesJob/sh/run_check_video_status_hourly.sh
+
+
+# update published articles at 20:50 everyday
+50 20 * * * bash /root/luojunhui/LongArticlesJob/sh/run_update_published_articles_daily.sh
+
+
+# 每天上午 9:30 点,下午 2 点,晚上 7 点执行下架视频任务
+30 9 * * * bash /root/luojunhui/LongArticlesJob/sh/run_get_off_videos_three_times_per_day.sh
+
+0 14 * * * bash /root/luojunhui/LongArticlesJob/sh/run_get_off_videos_three_times_per_day.sh
+
+0 19 * * * bash /root/luojunhui/LongArticlesJob/sh/run_get_off_videos_three_times_per_day.sh
+
+# 每天上午8点执行账号冷启动任务
+0 8 * * * bash /root/luojunhui/LongArticlesJob/sh/run_account_cold_start_daily.sh
+

+ 26 - 0
sh/run_account_cold_start_daily.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/account_cold_start_task_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 account_cold_start_daily.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - account_cold_start_daily.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart account_cold_start_daily.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 account_cold_start_daily.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted account_cold_start_daily.py"
+fi