
2025-06-13 WeChat service account data statistics

luojunhui 1 month ago
parent
commit
42db8fd011
1 changed file with 118 additions and 56 deletions
  1. +118 -56 tasks/data_tasks/fwh_data_recycle.py

+ 118 - 56
tasks/data_tasks/fwh_data_recycle.py

@@ -31,6 +31,15 @@ class FwhDataRecycle:
         self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
         self.piaoquan_client.connect()
 
+    def get_group_server_accounts(self):
+        fetch_query = f"""
+            select gzh_id from article_gzh_developer;
+        """
+        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
+        gh_id_list = [i["gzh_id"] for i in fetch_response]
+        # gh_id_list = ['gh_5e543853d8f0']
+        return gh_id_list
+
 
 class FwhGroupPublishRecordManager(FwhDataRecycle):
 
@@ -49,7 +58,7 @@ class FwhGroupPublishRecordManager(FwhDataRecycle):
 
     def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
         sql = f"""
-            select t1.publish_stage_url, t2.publish_timestamp
+            select t1.publish_stage_url
             from publish_content_stage_url t1
             left join publish_content t2 on t1.publish_content_id = t2.id
             where t1.publish_content_id = %s and t1.user_group_id = %s;
@@ -74,15 +83,20 @@ class FwhGroupPublishRecordManager(FwhDataRecycle):
             update_query, (new_status, record_id, ori_status)
         )
 
-    def set_article_url(self, record_id, article_url, publish_timestamp):
+    def set_article_url(self, record_id, article_url):
         update_query = f"""
             update long_articles_group_send_result
-            set url = %s, publish_timestamp = %s, recycle_status = %s
+            set url = %s, recycle_status = %s
             where id = %s and recycle_status = %s;
         """
         return self.long_articles_client.save(
             query=update_query,
-            params=(article_url, publish_timestamp, self.RECYCLE_SUCCESS_STATUS, record_id, self.RECYCLE_PROCESSING_STATUS)
+            params=(
+                article_url,
+                self.RECYCLE_SUCCESS_STATUS,
+                record_id,
+                self.RECYCLE_PROCESSING_STATUS,
+            ),
         )
 
     def deal(self):
@@ -92,21 +106,30 @@ class FwhGroupPublishRecordManager(FwhDataRecycle):
             record_id = publish_record["id"]
             group_id = publish_record["user_group_id"]
             # lock
-            self.update_recycle_status(record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS)
+            self.update_recycle_status(
+                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
+            )
 
-            publish_call_back_info = self.get_article_url_from_aigc_system(publish_content_id, group_id)
+            publish_call_back_info = self.get_article_url_from_aigc_system(
+                publish_content_id, group_id
+            )
             if publish_call_back_info:
                 article_url = publish_call_back_info["publish_stage_url"]
-                publish_timestamp = int(publish_call_back_info["publish_timestamp"] / 1000)
-                if article_url and publish_timestamp:
+                if article_url:
                     # set record and unlock
-                    self.set_article_url(record_id, article_url, publish_timestamp)
+                    self.set_article_url(record_id, article_url)
                 else:
                     # unlock
-                    self.update_recycle_status(record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS)
+                    self.update_recycle_status(
+                        record_id,
+                        self.RECYCLE_PROCESSING_STATUS,
+                        self.RECYCLE_INIT_STATUS,
+                    )
             else:
                 # unlock
-                self.update_recycle_status(record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS)
+                self.update_recycle_status(
+                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
+                )
 
 
 class SaveFwhDataToDatabase(FwhDataRecycle):
@@ -134,60 +157,78 @@ class SaveFwhDataToDatabase(FwhDataRecycle):
         """
         return self.piaoquan_client.save(insert_query, article)
 
-    def get_group_server_accounts(self):
-        fetch_query = f"""
-            select gzh_id from article_gzh_developer;
-        """
-        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
-        gh_id_list = [i['gzh_id'] for i in fetch_response]
-        gh_id_list = ['gh_5e543853d8f0']
-        return gh_id_list
-
     def get_stat_published_articles(self, gh_id):
         earliest_timestamp = int(time.time()) - self.STAT_PERIOD
         fetch_query = f"""
             select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
             from long_articles_group_send_result
-            where gh_id = %s and recycle_status = %s and publish_timestamp > %s;
+            where gh_id = %s and recycle_status = %s and create_time > %s;
         """
-        return self.long_articles_client.fetch(fetch_query, DictCursor,(gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp))
+        return self.long_articles_client.fetch(
+            fetch_query,
+            DictCursor,
+            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
+        )
 
     def process_each_account_data(self, account_published_article_list):
         if not account_published_article_list:
             return
 
         for article in account_published_article_list:
-            account_name = article['account_name']
-            gh_id = article['gh_id']
-            user_group_id = article['user_group_id']
-            url = article['url']
-            publish_timestamp = article['publish_timestamp']
+            account_name = article["account_name"]
+            gh_id = article["gh_id"]
+            user_group_id = article["user_group_id"]
+            url = article["url"]
             # get article detail info with spider
 
             try:
-                article_detail_info = get_article_detail(url, is_count=True, is_cache=False)
-                time.sleep(3)
+                article_detail_info = get_article_detail(
+                    url, is_count=True, is_cache=False
+                )
+                time.sleep(1)
                 content_url = article_detail_info["data"]["data"]["content_link"]
                 app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                 wx_sn = content_url.split("sn=")[-1]
+                publish_timestamp = int(
+                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
+                )
                 create_time = publish_timestamp
                 update_time = publish_timestamp
                 item_index = article_detail_info["data"]["data"]["item_index"]
                 show_view_count = article_detail_info["data"]["data"]["view_count"]
                 title = article_detail_info["data"]["data"]["title"]
                 title_md5 = str_to_md5(title)
-                channel_content_id = article_detail_info["data"]["data"]["channel_content_id"]
+                channel_content_id = article_detail_info["data"]["data"][
+                    "channel_content_id"
+                ]
                 mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                 root_source_id_list = [
-                    urllib.parse.parse_qs(
-                        urllib.parse.unquote(i['path'])
-                    )['rootSourceId'][0]
+                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
+                        "rootSourceId"
+                    ][0]
                     for i in mini_program_info
                 ]
                 root_source_id_list = json.dumps(root_source_id_list)
                 try:
                     self.save_data_to_database(
-                        article=(gh_id, account_name, app_msg_id, title, '9', create_time, update_time, item_index, content_url, show_view_count, wx_sn, title_md5, user_group_id, channel_content_id, root_source_id_list, publish_timestamp)
+                        article=(
+                            gh_id,
+                            account_name,
+                            app_msg_id,
+                            title,
+                            "9",
+                            create_time,
+                            update_time,
+                            item_index,
+                            content_url,
+                            show_view_count,
+                            wx_sn,
+                            title_md5,
+                            user_group_id,
+                            channel_content_id,
+                            root_source_id_list,
+                            publish_timestamp,
+                        )
                     )
                 except Exception as e:
                     self.update_article_read_cnt(wx_sn, show_view_count)
@@ -198,19 +239,22 @@ class SaveFwhDataToDatabase(FwhDataRecycle):
     def deal(self):
         account_id_list = self.get_group_server_accounts()
         for account_id in account_id_list:
-            publish_articles = tqdm(self.get_stat_published_articles(account_id), desc=f"<crawling> {account_id}")
+            publish_articles = tqdm(
+                self.get_stat_published_articles(account_id),
+                desc=f"<crawling> {account_id}",
+            )
             self.process_each_account_data(publish_articles)
 
 
 class FwhDataExportTemp(FwhDataRecycle):
 
-    def get_publish_articles(self):
+    def get_publish_articles(self, gh_id):
         sql = f"""
             -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
-            select accountName, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), root_source_id_list
+            select accountName, ContentUrl, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), group_concat(article_group) as 'group', root_source_id_list
             from official_articles_v2
-            where accountName = '票圈精彩'
-            and from_unixtime(publish_timestamp) between '2025-06-07' and '2025-06-10'
+            where from_unixtime(publish_timestamp) between '2025-06-09' and '2025-06-13'
+            and ghId = '{gh_id}' and article_group is not null
             group by accountName, title, ItemIndex;
         """
         return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)
@@ -226,25 +270,43 @@ class FwhDataExportTemp(FwhDataRecycle):
             where root_source_id
             in %s;
         """
-        return self.long_articles_client.fetch(query=query, cursor_type=DictCursor, params=(root_source_id_tuple,))
+        return self.long_articles_client.fetch(
+            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
+        )
+
+    def get_fans_num(self, gh_id, group_id_tuple):
+        sql = f"""
+            select count(1) as 'fans_count'
+            from article_user_group
+            where gzh_id = %s and user_group_id in %s
+            and is_delete = 0;
+        """
+        return self.piaoquan_client.fetch(
+            query=sql, cursor_type=DictCursor, params=(gh_id, group_id_tuple)
+        )
 
     def deal(self):
         import pandas as pd
-        publish_articles = self.get_publish_articles()
+
+        gh_id_list = self.get_group_server_accounts()
         L = []
-        for article in publish_articles:
-            root_source_id_list = article['root_source_id_list']
-            fission_info = self.get_fission_info(root_source_id_list)
-            article['uv'] = fission_info[0]['uv']
-            article['first_uv'] = fission_info[0]['first_uv']
-            article['split_uv'] = fission_info[0]['split_uv']
-            article['T+0_fission'] = fission_info[0]['T+0_fission']
-            L.append(article)
+        for gh_id in gh_id_list:
+            publish_articles = self.get_publish_articles(gh_id)
+            for article in publish_articles:
+                try:
+                    group_id_tuple = tuple(article["group"].split(","))
+                    fans_count = self.get_fans_num(gh_id, group_id_tuple)[0][
+                        "fans_count"
+                    ]
+                    root_source_id_list = article["root_source_id_list"]
+                    fission_info = self.get_fission_info(root_source_id_list)
+                    article["uv"] = fission_info[0]["uv"]
+                    article["first_uv"] = fission_info[0]["first_uv"]
+                    article["split_uv"] = fission_info[0]["split_uv"]
+                    article["T+0_fission"] = fission_info[0]["T+0_fission"]
+                    article["fans_count"] = fans_count
+                    L.append(article)
+                except Exception as e:
+                    print(f"article {article['ContentUrl']} is not available, skip it")
         df = pd.DataFrame(L)
-        df.to_csv('temp.csv', index=False)
-
-
-if __name__ == '__main__':
-    FwhGroupPublishRecordManager().deal()
-    SaveFwhDataToDatabase().deal()
-    # FwhDataExportTemp().deal()
+        df.to_csv("temp2.csv", index=False)