luojunhui преди 4 месеца
родител
ревизия
513d37dc0f
променени са 4 файла, в които са добавени 231 реда и са изтрити 54 реда
  1. 0 1
      cold_start/crawler/baidu/__init__.py
  2. 0 0
      tasks/crawler_tasks/crawler_video/crawler_baidu_videos.py
  3. 174 0
      tasks/data_tasks/fwh_data_recycle.py
  4. 57 53
      updatePublishedMsgDaily.py

+ 0 - 1
cold_start/crawler/baidu/__init__.py

@@ -1,3 +1,2 @@
-from .video_crawler import BaiduVideoCrawler
 from .spider import haokan_search_videos
 from .spider import haokan_fetch_video_detail

+ 0 - 0
cold_start/crawler/baidu/video_crawler.py → tasks/crawler_tasks/crawler_video/crawler_baidu_videos.py


+ 174 - 0
tasks/data_tasks/fwh_data_recycle.py

@@ -0,0 +1,174 @@
+import time
+from datetime import datetime
+
+from pymysql.cursors import DictCursor
+
+from applications.db import DatabaseConnector
+from cold_start.crawler.wechat import get_article_detail
+from config import denet_config, long_articles_config, piaoquan_crawler_config
+
+
+class FwhDataRecycle:
+    RECYCLE_INIT_STATUS = 0
+    RECYCLE_PROCESSING_STATUS = 1
+    RECYCLE_SUCCESS_STATUS = 2
+    RECYCLE_FAILED_STATUS = 99
+
+    PUBLISH_SUCCESS_STATUS = 2
+
+    STAT_PERIOD = 3 * 24 * 3600
+
+    def __init__(self):
+        self.denet_client = DatabaseConnector(denet_config)
+        self.denet_client.connect()
+
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.long_articles_client.connect()
+
+        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
+        self.piaoquan_client.connect()
+
+class FwhGroupPublishRecordManager(FwhDataRecycle):
+
+    def get_published_articles(self):
+        fetch_query = f"""
+            select id, publish_content_id, gh_id, user_group_id
+            from long_articles_group_send_result
+            where status = %s and recycle_status = %s;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            query=fetch_query,
+            cursor_type=DictCursor,
+            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
+        )
+        return fetch_response
+
+    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
+        sql = f"""
+            select t1.publish_stage_url, t2.publish_timestamp
+            from publish_content_stage_url t1
+            left join publish_content t2 on t1.publish_content_id = t2.id
+            where t1.publish_content_id = %s and t1.user_group_id = %s;
+        """
+        article_info = self.denet_client.fetch(
+            sql,
+            cursor_type=DictCursor,
+            params=(publish_content_id, user_group_id),
+        )
+        if article_info:
+            return article_info[0]
+        else:
+            return None
+
+    def update_recycle_status(self, record_id, ori_status, new_status):
+        update_query = f"""
+            update long_articles_group_send_result
+            set recycle_status = %s
+            where id = %s and recycle_status = %s;
+        """
+        return self.long_articles_client.save(
+            update_query, (new_status, record_id, ori_status)
+        )
+
+    def set_article_url(self, record_id, article_url, publish_timestamp):
+        update_query = f"""
+            update long_articles_group_send_result
+            set url = %s, publish_timestamp = %s, recycle_status = %s
+            where id = %s and recycle_status = %s;
+        """
+        return self.long_articles_client.save(
+            query=update_query,
+            params=(article_url, publish_timestamp, self.RECYCLE_SUCCESS_STATUS, record_id, self.RECYCLE_PROCESSING_STATUS)
+        )
+
+    def deal(self):
+        publish_records = self.get_published_articles()
+        for publish_record in publish_records:
+            publish_content_id = publish_record["publish_content_id"]
+            record_id = publish_record["id"]
+            # lock
+            self.update_recycle_status(record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS)
+
+            publish_call_back_info = self.get_article_url_from_aigc_system(publish_content_id)
+            if publish_call_back_info:
+                article_url = publish_call_back_info["publish_stage_url"]
+                publish_timestamp = int(publish_call_back_info["publish_timestamp"] / 1000)
+                if article_url and publish_timestamp:
+                    # set record and unlock
+                    self.set_article_url(record_id, article_url, publish_timestamp)
+                else:
+                    # unlock
+                    self.update_recycle_status(record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS)
+            else:
+                # unlock
+                self.update_recycle_status(record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS)
+
+
+class SaveFwhDataToDatabase(FwhDataRecycle):
+
+    def update_article_read_cnt(self, wx_sn, new_read_cnt):
+        """
+        update article read cnt
+        """
+        update_query = f"""
+            update official_articles_v2
+            set show_view_count = %s
+            where wx_sn = %s;
+        """
+        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))
+
+    def save_data_to_database(self, article_list):
+        """
+        save data to db
+        """
+        insert_query = f"""
+            insert into official_articles_v2
+            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,  wx_sn, title_md5, article_group, channel_content_id, root_source_id_list) 
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);      
+        """
+        return self.piaoquan_client.save(insert_query, article_list)
+
+    def get_group_server_accounts(self):
+        fetch_query = f"""
+            select gzh_id from article_gzh_developer;
+        """
+        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
+        gh_id_list = [i['gzh_id'] for i in fetch_response]
+        return gh_id_list
+
+    def get_stat_published_articles(self, gh_id):
+        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
+        fetch_query = f"""
+            select account_name, gh_id, user_group_id, url, publish_timestamp
+            from long_articles_group_send_result
+            where gh_id = %s and recycle_status = %s and publish_timestamp > %s;
+        """
+        return self.long_articles_client.fetch(fetch_query, DictCursor,(gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp))
+
+    def process_each_account_data(self, account_published_article_list):
+        if not account_published_article_list:
+            return
+
+        for article in account_published_article_list:
+            account_name = article['account_name']
+            gh_id = article['gh_id']
+            user_group_id = article['user_group_id']
+            url = article['url']
+            publish_timestamp = article['publish_timestamp']
+
+            # get article detail info with spider
+            article_detail_info = get_article_detail(url)
+
+            time.sleep(5)
+
+
+
+    def deal(self):
+        account_id_list = self.get_group_server_accounts()
+        for account_id in account_id_list:
+            publish_articles = self.get_stat_published_articles(account_id)
+
+
+if __name__ == '__main__':
+    FwhGroupPublishRecordManager().deal()
+

+ 57 - 53
updatePublishedMsgDaily.py

@@ -339,7 +339,9 @@ def get_articles(db_client: DatabaseConnector):
     sql = f"""
     SELECT ContentUrl, wx_sn 
     FROM {ARTICLE_TABLE}
-    WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
+    WHERE from_unixtime(publish_timestamp) > '2025-06-07' 
+    and root_source_id_list = '[]';
+    """
     response = db_client.fetch(sql)
     return response
 
@@ -354,7 +356,9 @@ def update_publish_timestamp(db_client: DatabaseConnector, row: Tuple):
     url = row[0]
     wx_sn = row[1]
     try:
+        print(url)
         response = spider.get_article_text(url)
+        print(response)
         response_code = response['code']
 
         if response_code == const.ARTICLE_DELETE_CODE:
@@ -398,10 +402,10 @@ def update_publish_timestamp(db_client: DatabaseConnector, row: Tuple):
             json.dumps(root_source_id_list, ensure_ascii=False),
             wx_sn
         ))
-    if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
-        return row
-    else:
-        return None
+    # if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
+    #     return row
+    # else:
+    #     return None
 
 
 def get_article_detail_job(db_client: DatabaseConnector):
@@ -418,52 +422,52 @@ def get_article_detail_job(db_client: DatabaseConnector):
             print(e)
             error_msg = traceback.format_exc()
             print(error_msg)
-    # check 一遍存在请求失败-1 && 0 的文章
-    process_failed_articles = get_articles(db_client)
-    fail_list = []
-    if process_failed_articles:
-        for article in tqdm(process_failed_articles):
-            try:
-                res = update_publish_timestamp(db_client=db_client, row=article)
-                fail_list.append({"wx_sn": res[1], "url": res[0]})
-            except Exception as e:
-                print(e)
-                error_msg = traceback.format_exc()
-                print(error_msg)
-
-    # 通过msgId 来修改publish_timestamp
-    update_sql = f"""
-        UPDATE {ARTICLE_TABLE} oav 
-        JOIN (
-            SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp 
-            FROM {ARTICLE_TABLE} 
-            WHERE publish_timestamp > %s 
-            GROUP BY ghId, appMsgId
-            ) vv 
-            ON oav.appMsgId = vv.appMsgId AND oav.ghId = vv.ghId
-        SET oav.publish_timestamp = vv.publish_timestamp
-        WHERE oav.publish_timestamp <= %s;
-    """
-    db_client.save(
-        query=update_sql,
-        params=(0, 0)
-    )
-
-    # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
-    update_sql_2 = f"""
-        UPDATE {ARTICLE_TABLE}
-        SET publish_timestamp = updateTime
-        WHERE publish_timestamp < %s;
-    """
-    db_client.save(
-        query=update_sql_2,
-        params=0
-    )
-    if fail_list:
-        bot(
-            title="更新文章任务,请求detail失败",
-            detail=fail_list
-        )
+    # # check 一遍存在请求失败-1 && 0 的文章
+    # process_failed_articles = get_articles(db_client)
+    # fail_list = []
+    # if process_failed_articles:
+    #     for article in tqdm(process_failed_articles):
+    #         try:
+    #             update_publish_timestamp(db_client=db_client, row=article)
+    #             # fail_list.append({"wx_sn": res[1], "url": res[0]})
+    #         except Exception as e:
+    #             print(e)
+    #             error_msg = traceback.format_exc()
+    #             print(error_msg)
+
+    # # 通过msgId 来修改publish_timestamp
+    # update_sql = f"""
+    #     UPDATE {ARTICLE_TABLE} oav
+    #     JOIN (
+    #         SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp
+    #         FROM {ARTICLE_TABLE}
+    #         WHERE publish_timestamp > %s
+    #         GROUP BY ghId, appMsgId
+    #         ) vv
+    #         ON oav.appMsgId = vv.appMsgId AND oav.ghId = vv.ghId
+    #     SET oav.publish_timestamp = vv.publish_timestamp
+    #     WHERE oav.publish_timestamp <= %s;
+    # """
+    # db_client.save(
+    #     query=update_sql,
+    #     params=(0, 0)
+    # )
+    #
+    # # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
+    # update_sql_2 = f"""
+    #     UPDATE {ARTICLE_TABLE}
+    #     SET publish_timestamp = updateTime
+    #     WHERE publish_timestamp < %s;
+    # """
+    # db_client.save(
+    #     query=update_sql_2,
+    #     params=0
+    # )
+    # if fail_list:
+    #     bot(
+    #         title="更新文章任务,请求detail失败",
+    #         detail=fail_list
+    #     )
 
 
 def whether_title_unsafe(db_client: DatabaseConnector, title: str):
@@ -724,8 +728,8 @@ def main():
             case _:
                 print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
     else:
-        update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-        check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
+        # update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
+        # check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
         get_article_detail_job(db_client=piaoquan_crawler_db_client)