
Optimize the mini-program update task

luojunhui · 2 months ago
parent commit 39315b7ed0

+ 160 - 0
applications/tasks/data_recycle_tasks/recycle_root_source_id_detail.py

@@ -0,0 +1,160 @@
+import json
+import traceback
+
+from datetime import timedelta
+from tqdm.asyncio import tqdm
+
+from applications.crawler.wechat import get_article_detail
+from applications.utils import get_beijing_date, handle_spider_exception, transform_to_beijing_date, extract_root_source_id
+
+class Const:
+    # article detail fetched successfully
+    ARTICLE_SUCCESS_CODE = 0
+    # default status for a record
+    DEFAULT_STATUS = 0
+    # API request failed
+    REQUEST_FAIL_STATUS = -1
+    # article has been deleted
+    DELETE_STATUS = -2
+    # no data returned, reason unknown
+    UNKNOWN_STATUS = -3
+    # article flagged for violating platform rules
+    ILLEGAL_STATUS = -4
+
+
+class RecycleRootSourceIdDetail(Const):
+    def __init__(self, pool, log_client, trace_id, run_date):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+        self.run_date = run_date
+
+        if not self.run_date:
+            self.run_date = get_beijing_date()
+
+    async def get_publish_articles_last_day(self):
+        """Fetch all articles published on the previous day."""
+        query = """
+             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
+             FROM official_articles_v2
+             WHERE FROM_UNIXTIME(publish_timestamp)
+             BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
+        """
+        article_list = await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(self.run_date, self.run_date),
+        )
+        return article_list
+
+    async def get_mini_program_info_by_root_source_id(self, root_source_id_list):
+        query = """
+            SELECT video_id, root_source_id
+            FROM long_articles_root_source_id
+            WHERE root_source_id IN %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(tuple(root_source_id_list),))
+
+    async def get_article_mini_program_detail(self, url, root_source_id_list):
+        if not root_source_id_list:
+            # no root_source_id recorded: fall back to crawling the article detail page
+            try:
+                article_detail = await get_article_detail(url)
+                response_code = article_detail["code"]
+                if response_code == self.ARTICLE_SUCCESS_CODE:
+                    mini_info = article_detail["data"]["data"]["mini_program"]
+                    return mini_info
+                else:
+                    return []
+            except Exception as e:
+                await handle_spider_exception(
+                    log_client=self.log_client,
+                    error=e,
+                    traceback=traceback.format_exc(),
+                    trace_id=self.trace_id,
+                    task_name=self.__class__.__name__
+                )
+                return []
+        else:
+            # root_source_id recorded at publish time: look up the bound videos directly
+            mini_program_info = await self.get_mini_program_info_by_root_source_id(
+                root_source_id_list
+            )
+            if mini_program_info:
+                return [
+                    {
+                        "app_id": "wx89e7eb06478361d7",
+                        "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
+                        "image_url": "",
+                        "nike_name": "票圈 l 3亿人喜欢的视频平台",
+                        "root_source_id": item["root_source_id"],
+                        "video_id": item["video_id"],
+                        "service_type": "0",
+                        "title": "",
+                        "type": "card",
+                    } for item in mini_program_info
+                ]
+            else:
+                return []
+
+    async def record_single_article(self, article):
+        url = article["ContentUrl"]
+        wx_sn = article["wx_sn"].decode("utf-8")
+        publish_timestamp = article["publish_timestamp"]
+        root_source_id_list = json.loads(article["root_source_id_list"]) if article["root_source_id_list"] else []
+
+        # get article mini program info
+        article_mini_program_detail = await self.get_article_mini_program_detail(url, root_source_id_list)
+        if not article_mini_program_detail:
+            return {}
+        else:
+            try:
+                publish_date = transform_to_beijing_date(publish_timestamp)
+                # generate T+0, T+1, T+2 date string
+                recall_dt_str_list = [
+                    (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
+                    for i in range(3)
+                ]
+
+                for date_str in recall_dt_str_list:
+                    for video_index, mini_item in enumerate(
+                        article_mini_program_detail, 1
+                    ):
+                        image_url = mini_item["image_url"]
+                        nick_name = mini_item["nike_name"]
+                        # extract video id and root_source_id
+                        if mini_item.get("root_source_id") and mini_item.get(
+                            "video_id"
+                        ):
+                            root_source_id = mini_item["root_source_id"]
+                            video_id = mini_item["video_id"]
+                        else:
+                            id_info = extract_root_source_id(mini_item["path"])
+                            root_source_id = id_info["root_source_id"]
+                            video_id = id_info["video_id"]
+                        kimi_title = mini_item["title"]
+                        # self.insert_each_root_source_id(
+                        #     wx_sn=wx_sn,
+                        #     mini_title=kimi_title,
+                        #     mini_name=nick_name,
+                        #     cover_url=image_url,
+                        #     video_index=video_index,
+                        #     root_source_id=root_source_id,
+                        #     video_id=video_id,
+                        #     publish_dt=publish_date.strftime("%Y-%m-%d"),
+                        #     recall_dt=date_str,
+                        # )
+                return {}
+            except Exception as e:
+                # surface the failure and hand the article back to the caller for inspection
+                print(e)
+                print(traceback.format_exc())
+                return article
+
+    async def deal(self):
+        """Entry point: record yesterday's articles and their mini-program details."""
+        # step 1, record articles to detail table
+        publish_articles_list = await self.get_publish_articles_last_day()
+        for article in tqdm(publish_articles_list, desc="updating articles"):
+            await self.record_single_article(article)
+
+        # step 2, update root_source_id detail info (TODO)
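For reference, here is a minimal sketch of how the new task could be driven from an async entry point. The runner function and the way pool and log_client are obtained are hypothetical; the class itself only relies on pool.async_fetch(...), log_client.log(...), and get_article_detail for the crawler fallback. The module path is taken from the file location above.

    from applications.tasks.data_recycle_tasks.recycle_root_source_id_detail import (
        RecycleRootSourceIdDetail,
    )

    async def run_recycle_root_source_id_detail(pool, log_client, trace_id):
        # run_date=None lets the task default to today's Beijing date
        # via get_beijing_date() in __init__
        task = RecycleRootSourceIdDetail(
            pool=pool,
            log_client=log_client,
            trace_id=trace_id,
            run_date=None,
        )
        await task.deal()

    # e.g. asyncio.run(run_recycle_root_source_id_detail(pool, log_client, "trace-id"))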

+ 3 - 0
applications/utils/__init__.py

@@ -24,4 +24,7 @@ from .item import CrawlerMetaAccount
 # mysql utils
 from .async_mysql_utils import *
 
+# exception
+from .exceptions import *
+
 task_schedule_response = TaskScheduleResponse()

+ 8 - 0
applications/utils/common.py

@@ -250,5 +250,13 @@ def get_task_chinese_name(data):
         return task_name_chinese
 
 
+def get_beijing_date():
+    """Return today's date string (YYYY-MM-DD) in Beijing time (UTC+8)."""
+    return (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d")
+
+
+def transform_to_beijing_date(publish_timestamp):
+    """Convert a unix timestamp (seconds) to a Beijing-time (UTC+8) date string."""
+    utc_time = datetime.utcfromtimestamp(publish_timestamp)
+    # add the 8-hour offset for UTC+8
+    utc_plus_8_time = utc_time + timedelta(hours=8)
+    return utc_plus_8_time.strftime("%Y-%m-%d")
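A quick sanity check of the UTC+8 handling (the input is in unix seconds, matching the FROM_UNIXTIME(publish_timestamp) usage in the recycle task; the concrete values below are picked by hand for illustration):

    from applications.utils.common import get_beijing_date, transform_to_beijing_date

    # 2024-06-10 20:00:00 UTC is already 2024-06-11 04:00 in Beijing,
    # so the date string rolls over to the next day.
    assert transform_to_beijing_date(1718049600) == "2024-06-11"

    # today's date in Beijing time, e.g. "2025-01-01"
    print(get_beijing_date())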
 
 

+ 20 - 0
applications/utils/exceptions.py

@@ -0,0 +1,20 @@
+"""
+Exception handling helpers
+"""
+
+async def handle_spider_exception(log_client, error, traceback, trace_id, task_name):
+    """Log a spider/crawler failure through the shared log client."""
+    await log_client.log(
+        contexts={
+            "data": {
+                # stringify the exception so the log payload serializes cleanly
+                "error": str(error),
+                "traceback": traceback,
+            },
+            "trace_id": trace_id,
+            "message": "spider API request failed",
+            "task": task_name,
+        }
+    )
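A minimal, self-contained sketch of calling the helper outside the recycle task, using a stand-in log client (hypothetical; any object exposing an awaitable log(contexts=...) method works, which is all handle_spider_exception assumes):

    import asyncio
    import traceback

    from applications.utils import handle_spider_exception  # re-exported via __init__.py above

    class StubLogClient:
        async def log(self, contexts):
            # stand-in for the project's real log client
            print(contexts)

    async def demo():
        try:
            raise RuntimeError("simulated spider failure")
        except Exception as e:
            await handle_spider_exception(
                log_client=StubLogClient(),
                error=e,
                traceback=traceback.format_exc(),
                trace_id="demo-trace-id",
                task_name="RecycleRootSourceIdDetail",
            )

    asyncio.run(demo())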