罗俊辉 6 месяцев назад
Родитель
Сommit
0107fdbf2a

+ 1 - 1
applications/aiditApi.py

@@ -187,7 +187,7 @@ def bind_crawler_task_to_generate_task(crawler_task_list, generate_task_id):
         "baseInfo": PERSON_COOKIE
     })
     response = requests.request("POST", url, headers=HEADERS, data=payload)
-    return response
+    return response.json()
 
 
 @retryOnTimeout()

+ 2 - 1
applications/denetMysql.py

@@ -25,8 +25,9 @@ class DeNetMysql(object):
         :return:
         """
         cursor = cls.connection.cursor()
-        cursor.execute(sql, params)
+        affect_rows = cursor.execute(sql, params)
         cls.connection.commit()
+        return affect_rows
 
     @classmethod
     def select(cls, sql):

+ 0 - 0
coldStartTasks/__init__.py → coldStartTasks/crawler/__init__.py


+ 0 - 0
spider/weixinAssociationCrawler.py → coldStartTasks/crawler/weixinAssociationCrawler.py


+ 0 - 0
spider/weixinCategoryCrawler.py → coldStartTasks/crawler/weixinCategoryCrawler.py


+ 0 - 0
spider/weixinRelativeAccountCrawler.py → coldStartTasks/crawler/weixinRelativeAccountCrawler.py


+ 3 - 0
coldStartTasks/publish/__init__.py

@@ -0,0 +1,3 @@
+"""
+@author: luojunhui
+"""

+ 0 - 0
coldStartTasks/publishAccountAssociationArticles.py → coldStartTasks/publish/publishAccountAssociationArticles.py


+ 0 - 0
coldStartTasks/publishArticleAssociationArticles.py → coldStartTasks/publish/publishArticleAssociationArticles.py


+ 256 - 0
coldStartTasks/publish/publishCategoryArticles.py

@@ -0,0 +1,256 @@
+"""
+@author: luojunhui
+品类文章发布到aigc系统的冷启层
+"""
+import datetime
+import json
+
+from pandas import DataFrame
+
+from applications import aiditApi, log, bot, DeNetMysql
+from config import apolloConfig
+
+apollo = apolloConfig()
+
+
+class CategoryColdStartTask(object):
+    """
+    品类冷启动发布任务
+    """
+    PUBLISHED_STATUS = 2
+    INIT_STATUS = 1
+    BAD_STATUS = 0
+
+    def __init__(self, db_client):
+        """
+
+        :param db_client:
+        """
+        self.db_client = db_client
+        self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
+        self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
+        self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
+        self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
+        self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
+        log(
+            task="category_publish_task",
+            function="__init__",
+            message="数据库初始化连接完成,apollo配置获取完成",
+            status="success",
+            data={
+                "category": self.category_map,
+                "threshold": self.category_cold_start_threshold
+            }
+        )
+
+    def get_articles_from_meta_table(self, category):
+        """
+        从长文 meta 库中获取冷启文章
+        :return:
+        """
+        sql = f"""
+        SELECT 
+            article_id, out_account_id, article_index, title, link, read_cnt
+        FROM
+            crawler_meta_article
+        WHERE 
+            category = "{category}" and status = '{self.INIT_STATUS}';
+        """
+        article_list = self.db_client.select(sql)
+        log(
+            task="category_publish_task",
+            function="get_articles_from_meta_table",
+            message="获取品类文章总数",
+            status="success",
+            data={
+                "total_articles": len(article_list),
+                "category": category
+            }
+        )
+        article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt'])
+        return article_df
+
+    def change_article_status(self, category):
+        """
+        已经发布到生成计划中的 id,
+        :return:
+        """
+        plan_id = self.category_map.get(category)
+        if plan_id:
+            sql = f"""
+            SELECT 
+                account.wx_gh,
+                content.title,
+                content.content_link,
+                content.view_count,
+                content.like_count,
+                from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
+                from_unixtime(content.publish_timestamp / 1000) AS 发布时间
+            FROM crawler_plan_result_rel cprr
+            JOIN crawler_plan plan ON cprr.plan_id = plan.id
+            JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
+            JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
+            WHERE plan_id IN (
+                SELECT
+                    input_source_value
+                FROM
+                    produce_plan_input_source
+                WHERE plan_id = '{plan_id}'
+                );
+            """
+            article_list = self.db_client.select(sql)
+            title_list = [i[1] for i in article_list]
+            if title_list:
+                # update
+                update_sql = f"""
+                UPDATE 
+                    crawler_meta_article
+                SET
+                    status = %s
+                WHERE
+                    title in %s and status = %s;
+                """
+                self.db_client.update(
+                    sql=update_sql,
+                    params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
+                )
+        else:
+            return
+
+    def change_article_status_while_publishing(self, article_id_list):
+        """
+
+        :param: article_id_list: 文章的唯一 id
+        :return:
+        """
+        update_sql = f"""
+                    UPDATE 
+                        crawler_meta_article
+                    SET
+                        status = %s
+                    WHERE
+                        article_id in %s and status = %s;
+                    """
+        affect_rows = self.db_client.update(
+            sql=update_sql,
+            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
+        )
+        if affect_rows != len(article_id_list):
+            bot(
+                title="品类冷启任务中,出现更新状文章状态失败异常",
+                detail={
+                    "affected_rows": affect_rows,
+                    "task_rows": len(article_id_list)
+                }
+            )
+
+    def publish_filter_articles(self, category, articles_df):
+        """
+        过滤文章
+        :param category:
+        :param articles_df:
+        :return:
+        """
+        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
+        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
+        filter_df = articles_df[
+            (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
+            & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
+            & (articles_df['title'].str.len() > self.LIMIT_TITLE_LENGTH)
+            & (~articles_df['title'].str.contains('农历'))
+            & (~articles_df['title'].str.contains('太极'))
+            & (~articles_df['title'].str.contains('节'))
+            & (~articles_df['title'].str.contains('早上好'))
+            & (~articles_df['title'].str.contains('赖清德'))
+            & (~articles_df['title'].str.contains('普京'))
+            & (~articles_df['title'].str.contains('俄'))
+            & (~articles_df['title'].str.contains('南海'))
+            & (~articles_df['title'].str.contains('台海'))
+            & (~articles_df['title'].str.contains('解放军'))
+            & (~articles_df['title'].str.contains('蔡英文'))
+            & (~articles_df['title'].str.contains('中国'))
+            ]
+
+        url_list = filter_df['link'].values.tolist()
+        log(
+            task="category_publish_task",
+            function="publish_filter_articles",
+            message="过滤后文章总数",
+            status="success",
+            data={
+                "total_articles": len(url_list),
+                "category": category
+            }
+        )
+        if url_list:
+            crawler_plan_response = aiditApi.auto_create_crawler_task(
+                plan_id=None,
+                plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
+                plan_tag="品类冷启动",
+                url_list=url_list
+            )
+            log(
+                task="category_publish_task",
+                function="publish_filter_articles",
+                message="成功创建抓取计划",
+                status="success",
+                data=crawler_plan_response
+            )
+            # auto bind to generate plan
+            new_crawler_task_list = [
+                {
+                    "contentType": 1,
+                    "inputSourceType": 2,
+                    "inputSourceSubType": None,
+                    "fieldName": None,
+                    "inputSourceValue": crawler_plan_response['data']['id'],
+                    "inputSourceLabel": crawler_plan_response['data']['name'],
+                    "inputSourceModal": 3,
+                    "inputSourceChannel": 5
+                }
+            ]
+            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
+                crawler_task_list=new_crawler_task_list,
+                generate_task_id=self.category_map[category]
+            )
+            log(
+                task="category_publish_task",
+                function="publish_filter_articles",
+                message="成功绑定到生成计划",
+                status="success",
+                data=generate_plan_response
+            )
+            article_id_list = filter_df['article_id'].values.tolist()
+            self.change_article_status_while_publishing(article_id_list=article_id_list)
+
+    def do_job(self):
+        """
+        执行任务
+        :return:
+        """
+        category_list = self.category_map.keys()
+        log(
+            task="category_publish_task",
+            function="do_job",
+            message="开始自动创建品类文章抓取计划",
+            status="success",
+            data={
+                "category_list": list(category_list)
+            }
+        )
+        for category in category_list:
+            try:
+                category_df = self.get_articles_from_meta_table(category=category)
+                self.publish_filter_articles(
+                    category=category,
+                    articles_df=category_df
+                )
+            except Exception as e:
+                bot(
+                    title="品类冷启任务报错",
+                    detail={
+                        "category": category,
+                        "error": str(e),
+                        "function": "do_job"
+                    }
+                )

+ 0 - 146
coldStartTasks/publishCategoryArticles.py

@@ -1,146 +0,0 @@
-"""
-@author: luojunhui
-品类文章发布到aigc系统的冷启层
-"""
-import datetime
-
-from pandas import DataFrame
-
-from applications import DeNetMysql, aiditApi
-
-
-class CategoryColdStartTask(object):
-    """
-    品类冷启动
-    """
-    CATEGORY_MAP = {
-        "军事": "20240805154433785506170",
-        "历史": "20240805154359027876170",
-        "娱乐八卦": "20241016121719447880753",
-        "情感生活": "20240731052727708334827",
-        "健康养生": "20240731052921849744134",
-        # "新闻媒体": "20240731052554397387407"
-    }
-    PUBLISHED_STATUS = 2
-    INIT_STATUS = 1
-    BAD_STATUS = 0
-    READ_THRESHOLD = 5000
-    READ_TIMES_THRESHOLD = 1.3
-    LIMIT_TITLE_LENGTH = 15
-
-    def __init__(self, db_client):
-        """
-
-        :param db_client:
-        """
-        self.db_client = db_client
-
-    def get_articles_from_meta_table(self, category):
-        """
-        从长文 meta 库中获取冷启文章
-        :return:
-        """
-        sql = f"""
-        SELECT 
-            out_account_id, article_index, title, link, read_cnt
-        FROM
-            crawler_meta_article
-        WHERE 
-            category = "{category}" and status = '{self.INIT_STATUS}';
-        """
-        article_list = self.db_client.select(sql)
-        article_df = DataFrame(article_list, columns=['gh_id', 'position', 'title', 'link', 'read_cnt'])
-        return article_df
-
-    def change_article_status(self, category):
-        """
-        已经发布到生成计划中的 id,
-        :return:
-        """
-        plan_id = self.CATEGORY_MAP.get(category)
-        if plan_id:
-            sql = f"""
-            SELECT 
-                account.wx_gh,
-                content.title,
-                content.content_link,
-                content.view_count,
-                content.like_count,
-                from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
-                from_unixtime(content.publish_timestamp / 1000) AS 发布时间
-            FROM crawler_plan_result_rel cprr
-            JOIN crawler_plan plan ON cprr.plan_id = plan.id
-            JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
-            JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
-            WHERE plan_id IN (
-                SELECT
-                    input_source_value
-                FROM
-                    produce_plan_input_source
-                WHERE plan_id = '{plan_id}'
-                );
-            """
-            article_list = self.db_client.select(sql)
-            title_list = [i[1] for i in article_list]
-            # update
-            update_sql = f"""
-            UPDATE 
-                crawler_meta_article
-            SET
-                status = %s
-            WHERE
-                title in %s and status = %s;
-            """
-            self.db_client.update(
-                sql=update_sql,
-                params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
-            )
-        else:
-            return
-
-    def filter_articles(self, category, articles_df):
-        """
-        过滤文章
-        :param articles_df:
-        :return:
-        """
-        print(articles_df.size)
-        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
-        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
-        filter_df = articles_df[
-            (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
-            & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
-            & (articles_df['title'].str.len() > 15)
-            & (~articles_df['title'].str.contains('农历'))
-            & (~articles_df['title'].str.contains('太极'))
-            & (~articles_df['title'].str.contains('节'))
-            & (~articles_df['title'].str.contains('早上好'))
-            & (~articles_df['title'].str.contains('赖清德'))
-            & (~articles_df['title'].str.contains('普京'))
-            & (~articles_df['title'].str.contains('俄'))
-            & (~articles_df['title'].str.contains('南海'))
-            & (~articles_df['title'].str.contains('台海'))
-            & (~articles_df['title'].str.contains('解放军'))
-            & (~articles_df['title'].str.contains('蔡英文'))
-            & (~articles_df['title'].str.contains('中国'))
-            ]
-        url_list = filter_df['link'].values.tolist()
-        # title_list = filter_df['title'].values.tolist()
-        # for line in title_list:
-        #     print(line + "\n")
-        aiditApi.auto_create_crawler_task(
-            plan_id=None,
-            plan_name="{}--{}".format(category, datetime.date.today().__str__()),
-            plan_tag="品类冷启动",
-            url_list=url_list
-        )
-
-
-d = DeNetMysql()
-
-c = CategoryColdStartTask(d)
-
-for ca in c.CATEGORY_MAP.keys():
-    all_articles = c.get_articles_from_meta_table(category=ca)
-    c.filter_articles(ca, all_articles)
-

+ 0 - 4
spider/__init__.py

@@ -1,4 +0,0 @@
-"""
-@author: luojunhui
-"""
-from .weixinCategoryCrawler import weixinCategory