ソースを参照

add_recycle_fwh_task

luojunhui 2 ヶ月 前
コミット
896b995eb0

+ 47 - 0
applications/pipeline/recycle_published_articles_pipiline.py

@@ -0,0 +1,47 @@
# Persist one published article into the recycle pool table.
async def save_article_to_recycle_pool(
    pool,
    log_client,
    info_tuple: tuple,
    content_url: str,
    account_info: dict,
    table_name: str = "official_articles_v2",
):
    """Insert an article row; on duplicate key, refresh its view/like counts.

    Errors never propagate: failures are reported through *log_client* only
    (best effort), so callers can fire-and-forget.
    """
    # NOTE: table_name is interpolated directly into the SQL text, so it must
    # come from trusted code — never from external input.
    insert_sql = f"""
        INSERT INTO {table_name}
        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest,
         ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1,
         CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content,
         show_view_count, show_like_count, show_zs_count, show_pay_count,
         wx_sn, baseInfo, title_md5, status)
        VALUES
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
         %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            show_view_count = VALUES(show_view_count),
            show_like_count = VALUES(show_like_count);
    """

    try:
        await pool.async_save(
            query=insert_sql,
            params=info_tuple,
            db_name="piaoquan_crawler",
        )
    except Exception as err:
        # Swallow the failure but leave a structured trace for debugging.
        await log_client.log(
            contents={
                "function": "save_article_to_recycle_pool",
                "status": "fail",
                "message": "insert/update article failed",
                "data": {
                    "error": str(err),
                    "content_link": content_url,
                    "account_name": account_info["name"],
                },
            }
        )
    else:
        print("insert/update article success")

+ 431 - 0
applications/tasks/data_recycle_tasks/recycle_fwh_group_publish_articles.py

@@ -0,0 +1,431 @@
+import asyncio
+import json
+import time
+import urllib.parse
+from datetime import datetime
+from typing import Optional, List, Dict
+
+from tqdm import tqdm
+
+from applications.api import feishu_robot
+from applications.utils import str_to_md5, days_remaining_in_month
+from applications.crawler.wechat import get_article_detail
+
+
class RecycleFwhGroupPublishArticlesConst:
    """Status codes and tuning constants shared by the FWH recycle tasks."""

    # Recycle-status lifecycle codes for long_articles_group_send_result rows.
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99

    # A group-send task with this `status` value was published successfully.
    PUBLISH_SUCCESS_STATUS = 2

    # Articles keep receiving read-count updates for this many seconds (3 days).
    STAT_PERIOD = 3 * 24 * 3600

    # Spider response code meaning the article was flagged as illegal.
    ARTICLE_ILLEGAL_CODE = 25012

    # Service accounts deliberately excluded from processing.
    NOT_USED_SERVER_ACCOUNT = {"gh_84e744b16b3a", "gh_5855bed97938", "gh_61a72b720de3"}
+
+
class RecycleFwhGroupPublishArticlesBase(RecycleFwhGroupPublishArticlesConst):
    """Common helpers shared by the FWH group-publish recycle tasks."""

    def __init__(self, pool, log_client):
        # Async MySQL pool and structured log client injected by the task runner.
        self.pool = pool
        self.log_client = log_client

    async def get_server_group_publish_accounts(self) -> List[str]:
        """Return every developer gzh_id except the deliberately skipped ones."""
        query = "select gzh_id from article_gzh_developer;"
        rows = await self.pool.async_fetch(query=query, db_name="piaoquan_crawler")
        skipped = self.NOT_USED_SERVER_ACCOUNT
        return [row["gzh_id"] for row in rows if row["gzh_id"] not in skipped]

    async def get_account_name(self, gh_id: str) -> Optional[str]:
        """Look up the display name recorded for *gh_id*, or None if unknown."""
        query = """select account_name from long_articles_group_send_result where gh_id = %s limit 1;"""
        rows = await self.pool.async_fetch(query=query, params=(gh_id,))
        return rows[0]["account_name"] if rows else None

    async def alert_illegal_article(
        self,
        gh_id: str,
        account_name: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
    ):
        """Send a Feishu alert about an article flagged as illegal."""
        await feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                "publish_date": str(publish_date),
            },
            env="server_account_publish_monitor",
        )
+
+
class RecordFwhGroupPublishArticles(RecycleFwhGroupPublishArticlesBase):
    """Backfill article URLs for successfully group-published FWH articles."""

    async def get_group_published_articles(self):
        """Fetch group-send records that were published but not yet recycled."""
        query = """
            SELECT 
                id, publish_content_id, gh_id, user_group_id
            FROM long_articles_group_send_result
            WHERE status = %s AND recycle_status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.PUBLISH_SUCCESS_STATUS, self.INIT_STATUS)
        )

    async def get_article_call_back_from_aigc(
        self, publish_content_id: str, user_group_id: str
    ) -> Optional[Dict]:
        """Return the AIGC callback row holding the published article's URL."""
        query = """
            SELECT t1.publish_stage_url
            FROM publish_content_stage_url t1
            LEFT JOIN publish_content t2 ON t1.publish_content_id = t2.id
            WHERE t1.publish_content_id = %s AND t1.user_group_id = %s;
        """
        rows = await self.pool.async_fetch(
            query=query, db_name="aigc", params=(publish_content_id, user_group_id)
        )
        return rows[0] if rows else None

    async def update_recycle_status(
        self, record_id: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set the recycle status; returns affected row count.

        A zero return means another worker already changed the status.
        """
        query = """
            UPDATE long_articles_group_send_result
            SET recycle_status = %s
            WHERE id = %s AND recycle_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, record_id, ori_status)
        )

    async def set_article_url(self, record_id: int, url: str) -> int:
        """Store the resolved article URL and mark the record as recycled."""
        query = """
            UPDATE long_articles_group_send_result
            SET url = %s, recycle_status = %s
            WHERE id = %s and recycle_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(url, self.SUCCESS_STATUS, record_id, self.INIT_STATUS)
        )

    async def deal(self):
        """Claim each pending record, resolve its article URL, and store it."""
        pending_records = await self.get_group_published_articles()

        for record in tqdm(pending_records):
            record_id = record["id"]

            # Claim the record via compare-and-set; skip if another worker won.
            claimed = await self.update_recycle_status(
                record_id, self.INIT_STATUS, self.PROCESSING_STATUS
            )
            if not claimed:
                continue

            callback = await self.get_article_call_back_from_aigc(
                record["publish_content_id"], record["user_group_id"]
            )
            article_link = callback["publish_stage_url"] if callback else None
            if article_link:
                await self.set_article_url(record_id, article_link)
            else:
                # No callback row or empty link: mark the claimed record failed.
                await self.update_recycle_status(
                    record_id, self.PROCESSING_STATUS, self.FAILED_STATUS
                )
+
+
class SaveFwhDataToDatabase(RecycleFwhGroupPublishArticlesBase):
    """Crawl detail stats for recycled FWH articles and persist them."""

    async def update_read_count(self, wx_sn, read_count):
        """Refresh the read count of an already-saved article (keyed by wx_sn)."""
        query = """
            UPDATE official_articles_v2
            SET show_view_count = %s
            WHERE wx_sn = %s;
        """
        return await self.pool.async_save(
            query=query, db_name="piaoquan_crawler", params=(read_count, wx_sn)
        )

    async def save_data_to_database(self, article):
        """Insert one article stat row; raises on conflict (e.g. duplicate key)."""
        # Plain string: the original carried an f-prefix with no placeholders.
        query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count, 
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp) 
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);      
        """
        return await self.pool.async_save(
            query=query, params=article, db_name="piaoquan_crawler"
        )

    async def get_stat_published_articles(self, gh_id):
        """Fetch this account's recycled articles still inside the stat window.

        Only rows created within the last ``STAT_PERIOD`` seconds qualify.
        """
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        return await self.pool.async_fetch(
            query=fetch_query, params=(gh_id, self.SUCCESS_STATUS, earliest_time)
        )

    async def process_each_fwh_account(self, gh_id):
        """Crawl and persist detail stats for one service account's articles.

        Best-effort per article: any crawl/parse failure is printed and the
        loop moves on to the next article.
        """
        stat_articles = await self.get_stat_published_articles(gh_id)
        if not stat_articles:
            return

        for article in stat_articles:
            account_name = article["account_name"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]

            try:
                article_detail_info = await get_article_detail(
                    article_link=url, is_count=True, is_cache=False
                )
                if article_detail_info["code"] == self.ARTICLE_ILLEGAL_CODE:
                    await self.alert_illegal_article(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                    )
                    # BUGFIX: an illegal-article response has no usable detail
                    # payload; previously execution fell through and crashed
                    # while parsing it, relying on the outer except to recover.
                    continue

                await asyncio.sleep(1)  # throttle requests to the spider
                detail = article_detail_info["data"]["data"]
                content_url = detail["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                # NOTE(review): keeps everything after "sn=", including any
                # trailing query params — confirm links always end with sn.
                wx_sn = content_url.split("sn=")[-1]
                # Spider reports milliseconds; the table stores seconds.
                publish_timestamp = int(detail["publish_timestamp"] / 1000)
                show_view_count = detail["view_count"]
                title = detail["title"]
                # Collect the rootSourceId of every attached mini-program card.
                root_source_id_list = json.dumps(
                    [
                        urllib.parse.parse_qs(urllib.parse.unquote(item["path"]))[
                            "rootSourceId"
                        ][0]
                        for item in detail["mini_program"]["item_list"]
                    ]
                )
                try:
                    await self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",  # Type: fixed channel code for FWH articles
                            publish_timestamp,  # createTime
                            publish_timestamp,  # updateTime
                            detail["item_index"],
                            url,
                            show_view_count,
                            wx_sn,
                            str_to_md5(title),
                            user_group_id,
                            detail["channel_content_id"],
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # Row already exists (duplicate key): just refresh reads.
                    await self.update_read_count(
                        wx_sn=wx_sn, read_count=show_view_count
                    )
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    async def deal(self):
        """Process every FWH service account sequentially."""
        account_id_list = await self.get_server_group_publish_accounts()
        if not account_id_list:
            return

        for account_id in account_id_list:
            await self.process_each_fwh_account(gh_id=account_id)
+
+
class FwhGroupPublishMonitor(RecycleFwhGroupPublishArticlesBase):
    """Morning monitor that reports FWH group-publish progress to Feishu."""

    async def get_sent_fans(self, date_string: str, gh_id: str) -> int:
        """Return total fans reached by the account's group sends on a date.

        Each push_id contributes the average sent_count of its rows; the
        per-push averages are then summed. Returns 0 when there are no rows.
        """
        query = """
            select push_id, avg(sent_count) as 'total_sent_fans'
            from long_articles_group_send_result
            where publish_date = %s and gh_id = %s and status = %s
            group by push_id;
        """
        fetch_response = await self.pool.async_fetch(
            query=query,
            params=(date_string, gh_id, self.PUBLISH_SUCCESS_STATUS),
        )
        fans_list = [i["total_sent_fans"] for i in fetch_response]
        return sum(fans_list) if fans_list else 0

    async def get_remain_fans(self, gh_id: str):
        """Count active user groups that still have publish quota left."""
        # NOTE(review): unlike get_remain_publish_times below, this queries the
        # pool's default database rather than "piaoquan_crawler" — confirm the
        # article_user_group table really exists in both places.
        query = """
            select count(1) as 'remain_fans'
            from article_user_group
            where gzh_id = %s and is_delete = %s and remaining_count > %s;
        """
        fetch_response = await self.pool.async_fetch(query=query, params=(gh_id, 0, 0))
        response = fetch_response[0]["remain_fans"]
        return response if response else 0

    async def get_remain_publish_times(self, gh_id: str):
        """Return the account's total remaining publish quota (0 if none)."""
        fetch_query = """
            select sum(remaining_count) as 'remain_publish_times'
            from article_user_group
            where gzh_id = %s and is_delete = %s;
        """
        fetch_response = await self.pool.async_fetch(
            query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, 0)
        )
        response = fetch_response[0]["remain_publish_times"]
        return response if response else 0

    async def get_remain_fans_and_publish_times(self, gh_id: str, date_string: str):
        """Return pre-publish fan/quota snapshot for the day, or None."""
        fetch_query = """
            select fans_before_publish, publish_times_before_publish
            from fwh_daily_publish_detail
            where gh_id = %s and publish_date = %s;
        """
        fetch_response = await self.pool.async_fetch(
            query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, date_string)
        )
        return fetch_response[0] if fetch_response else None

    async def deal(self, date_string: Optional[str] = None):
        """Send the daily publish-progress table to Feishu.

        Args:
            date_string: report date as "YYYY-MM-DD"; defaults to today.
        """
        # BUGFIX: the default used to be datetime.today() evaluated once at
        # import time, so a long-lived worker kept reporting the process start
        # date forever. It is now resolved on every call.
        if date_string is None:
            date_string = datetime.today().strftime("%Y-%m-%d")

        # Only run during the morning window (before 13:00).
        now = datetime.now()
        if now.hour > 12:
            return

        gh_id_list = await self.get_server_group_publish_accounts()
        if not gh_id_list:
            return

        # Days left in the current month, shown as a countdown column.
        remain_days = days_remaining_in_month()

        # Feishu table column definitions.
        columns = [
            feishu_robot.create_feishu_columns_sheet(
                sheet_type="plain_text",
                sheet_name="account_name",
                display_name="公众号名称",
            ),
            feishu_robot.create_feishu_columns_sheet(
                sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
            ),
            feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_times",
                display_name="发文前剩余发文次数",
            ),
            feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="rest_publish_fans",
                display_name="发文前剩余发文粉丝数",
            ),
            feishu_robot.create_feishu_columns_sheet(
                sheet_type="number",
                sheet_name="remain_days",
                display_name="本月剩余天数",
            ),
        ]

        monitor_table = []

        for gh_id in gh_id_list:
            account_name = await self.get_account_name(gh_id)

            sent_fans = await self.get_sent_fans(date_string, gh_id)

            detail = await self.get_remain_fans_and_publish_times(gh_id, date_string)
            if not detail:
                # No snapshot recorded for this account: alert and move on.
                await feishu_robot.bot(
                    title=f"{date_string}服务号发文详情",
                    detail=f"{gh_id}--{account_name} 没有发布详情",
                    env="server_account_publish_monitor",
                )
                continue

            remain_fans, remain_publish_times = (
                detail["fans_before_publish"],
                detail["publish_times_before_publish"],
            )
            monitor_table.append(
                {
                    "account_name": account_name,
                    "rest_publish_times": int(remain_publish_times),
                    "rest_publish_fans": int(remain_fans),
                    "remain_days": int(remain_days),
                    "sent_fans": int(sent_fans),
                }
            )

        await feishu_robot.bot(
            title=f"{date_string}服务号发文详情",
            detail={"columns": columns, "rows": monitor_table},
            table=True,
            mention=False,
            env="server_account_publish_monitor",
        )