luojunhui 3 days ago
Parent commit 99eefb2105

+ 1 - 0
applications/pipeline/__init__.py

@@ -0,0 +1 @@
+from .data_recycle_pipeline import insert_article_into_recycle_pool

+ 5 - 0
applications/pipeline/crawler_pipeline.py

@@ -0,0 +1,5 @@
+
+
+class CrawlerPipeline:
+    """placeholder for the crawler-side pipeline; not implemented yet"""
+    pass

+ 114 - 0
applications/pipeline/data_recycle_pipeline.py

@@ -0,0 +1,114 @@
+
+import json
+from typing import List, Dict
+
+from applications.utils import show_desc_to_sta, str_to_md5
+
+
+async def insert_article_into_recycle_pool(pool, log_client, msg_list: List[Dict], account_info: Dict):
+        """insert article into recycle pool"""
+        table_name = 'official_articles_v2'
+        for info in msg_list:
+            base_info = info.get("BaseInfo", {})
+            app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
+            create_timestamp = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            update_timestamp = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
+            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in detail_article_list:
+                    title = article.get("Title", None)
+                    digest = article.get("Digest", None)
+                    item_index = article.get("ItemIndex", None)
+                    content_url = article.get("ContentUrl", None)
+                    source_url = article.get("SourceUrl", None)
+                    cover_img_url = article.get("CoverImgUrl", None)
+                    cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
+                    cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
+                    item_show_type = article.get("ItemShowType", None)
+                    is_original = article.get("IsOriginal", None)
+                    show_desc = article.get("ShowDesc", None)
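+                    # show_desc_to_sta parses the "ShowDesc" text into interaction counters (views, likes, etc.)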
+                    show_stat = show_desc_to_sta(show_desc)
+                    ori_content = article.get("ori_content", None)
+                    show_view_count = show_stat.get("show_view_count", 0)
+                    show_like_count = show_stat.get("show_like_count", 0)
+                    show_zs_count = show_stat.get("show_zs_count", 0)
+                    show_pay_count = show_stat.get("show_pay_count", 0)
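+                    # the "sn" parameter in the article URL identifies the article; the update fallback below keys on it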
+                    wx_sn = content_url.split("&sn=")[1].split("&")[0] if content_url and "&sn=" in content_url else None
+                    status = account_info['using_status']
+                    info_tuple = (
+                        account_info['gh_id'],
+                        account_info['account_name'],
+                        app_msg_id,
+                        title,
+                        publish_type,
+                        create_timestamp,
+                        update_timestamp,
+                        digest,
+                        item_index,
+                        content_url,
+                        source_url,
+                        cover_img_url,
+                        cover_img_url_1_1,
+                        cover_img_url_235_1,
+                        item_show_type,
+                        is_original,
+                        show_desc,
+                        ori_content,
+                        show_view_count,
+                        show_like_count,
+                        show_zs_count,
+                        show_pay_count,
+                        wx_sn,
+                        json.dumps(base_info, ensure_ascii=False),
+                        str_to_md5(title),
+                        status
+                    )
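+                    # insert the article snapshot; if the insert fails (e.g. a row with this wx_sn already exists), fall back to refreshing its counters below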
+                    try:
+                        insert_query = f"""
+                            insert into {table_name}
+                            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
+                            values
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            """
+                        await pool.async_save(query=insert_query, params=info_tuple, db_name="piaoquan_crawler")
+                        await log_client.log(
+                            contents={
+                                "function": "insert_article_into_recycle_pool",
+                                "status": "success",
+                                "data": info_tuple
+                            }
+                        )
+
+                    except Exception:
+                        try:
+                            update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
+                            await pool.async_save(query=update_sql, params=(show_view_count, show_like_count, wx_sn), db_name="piaoquan_crawler")
+                        except Exception as e:
+                            await log_client.log(
+                                contents={
+                                    "function": "insert_article_into_recycle_pool",
+                                    "status": "fail",
+                                    "message": "更新文章失败",
+                                    "data": {
+                                        "error": str(e),
+                                        "content_link": content_url,
+                                        "account_name": account_info["account_name"]
+                                    }
+                                }
+                            )
+                            continue
+
+            else:
+                await log_client.log(
+                    contents={
+                        "function": "insert_article_into_recycle_pool",
+                        "status": "fail",
+                        "message": "account has no articles",
+                        "data": {
+                            "account_name": account_info["account_name"]
+                        }
+                    }
+                )
+
+

+ 0 - 0
applications/tasks/data_recycle_tasks/__init__.py


+ 124 - 0
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -0,0 +1,124 @@
+import time
+
+from tqdm import tqdm
+
+from applications.api import feishu_robot
+from applications.crawler.wechat import get_article_list_from_account
+from applications.pipeline import insert_article_into_recycle_pool
+
+class Const:
+    # subscription account types (订阅号)
+    SUBSCRIBE_TYPE_SET = {0, 1}
+
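+    # when an account has no recycled articles yet, crawl back this far (30 days)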
+    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
+
+class RecycleDailyPublishArticlesTask(Const):
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    async def get_publish_accounts(self):
+        """
+        get all publish accounts
+        """
+        query = f"""
+            select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
+                t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
+                group_concat(distinct t5.remark) as account_remark
+            from
+                publish_plan t1
+                join publish_plan_account t2 on t1.id = t2.plan_id
+                join publish_account t3 on t2.account_id = t3.id
+                left join publish_account_wx_type t4 on t3.id = t4.account_id
+                left join publish_account_remark t5 on t3.id = t5.publish_account_id
+            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
+            group by t3.id;
+        """
+        account_list = await self.pool.async_fetch(query, db_name='aigc')
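+        # drop accounts whose remark marks them as auto-reply (自动回复) accounts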
+        return [i for i in account_list if '自动回复' not in str(i['account_remark'])]
+
+    async def get_account_status(self):
+        """get account experiment status"""
+        sql = f"""  
+            select t1.account_id, t2.status
+            from wx_statistics_group_source_account t1
+            join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
+        """
+        account_status_list = await self.pool.async_fetch(sql, db_name='aigc')
+        account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
+        return account_status_dict
+
+    async def recycle_single_account(self, account):
+        """recycle single account"""
+        query = f"""
+            select max(publish_timestamp) as publish_timestamp from official_articles_v2 where ghId = %s;
+        """
+        # official_articles_v2 lives in the piaoquan_crawler database (see data_recycle_pipeline)
+        response = await self.pool.async_fetch(query, params=(account['gh_id'],), db_name='piaoquan_crawler')
+        if response and response[0]['publish_timestamp'] is not None:
+            max_publish_timestamp = response[0]['publish_timestamp']
+        else:
+            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
+
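+        # page through the account's article list with the cursor until we reach already-recycled articles or run out of pages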
+        cursor = None
+        while True:
+            response = get_article_list_from_account(account_id=account['account_id'], cursor=cursor)
+            response_code = response['code']
+            match response_code:
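+                # 25013: the publish account has been banned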
+                case 25013:
+                    await feishu_robot.bot(
+                        title="发布账号封禁",
+                        detail={
+                            "账号名称": account['name'],
+                            "账号id": account['account_id']
+                        },
+                    )
+                    return
+                case 0:
+                    msg_list = response.get("data", {}).get("data", [])
+                    if not msg_list:
+                        return
+
+                    await insert_article_into_recycle_pool(self.pool, self.log_client, msg_list, account)
+
+                    # check last article
+                    last_article = msg_list[-1]
+                    last_publish_timestamp = last_article['AppMsg']['BaseInfo']['UpdateTime']
+                    if last_publish_timestamp <= max_publish_timestamp:
+                        return
+
+                    cursor = response['data'].get('next_cursor')
+                    if not cursor:
+                        return
+                case _:
+                    return
+
+    async def deal(self):
+        """recycle all publish accounts articles"""
+        binding_accounts = await self.get_publish_accounts()
+        account_status = await self.get_account_status()
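+        # accounts in the "实验" (experiment) group get using_status = 0, all others 1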
+        account_list = [
+            {
+                **item,
+                'using_status': 0 if account_status.get(item['account_id']) == '实验' else 1
+            }
+            for item in binding_accounts
+        ]
+
+        # subscription accounts (订阅号) only
+        subscription_accounts = [i for i in account_list if i['account_type'] in self.SUBSCRIBE_TYPE_SET]
+
+        for account in tqdm(subscription_accounts, desc="recycle each account"):
+            try:
+                await self.recycle_single_account(account)
+
+            except Exception as e:
+                print("recycle account error:", e)
+