
Deploy the article update code to production

罗俊辉 8 months ago
Parent
Commit
852eb199e5

+ 1 - 1
applications/spiderTool.py

@@ -24,7 +24,7 @@ class SpiderTools(object):
         :param category:
         :return:
         """
-        response = cls.spider_client.update_msg_list(gh_id=gh_id, index=None)
+        response = cls.spider_client.update_msg_list(ghId=gh_id, index=None)
         msg_list = response.get("data", {}).get("data")
         if msg_list:
             cls.updateDataIntoMysql(

+ 6 - 5
applications/wxSpiderApi.py

@@ -2,7 +2,6 @@
 @author: luojunhui
 """
 import json
-import time
 import requests
 
 from applications.decoratorApi import retryOnNone
@@ -40,7 +39,7 @@ class WeixinSpider(object):
         :param content_link:
         :return:
         """
-        url = "http://8.217.190.241:8888/crawler/wei_xin/detail"
+        url = "http://47.98.154.124:8888/crawler/wei_xin/detail"
         payload = json.dumps({
             "content_link": content_link,
             "is_count": False,
@@ -49,7 +48,10 @@ class WeixinSpider(object):
         headers = {
             'Content-Type': 'application/json'
         }
+        # print(url)
+        # print(payload)
         response = requests.request("POST", url, headers=headers, data=payload)
+        print("info", response.text)
         return response.json()
 
     @classmethod
@@ -67,7 +69,7 @@ class WeixinSpider(object):
         headers = {
             'Content-Type': 'application/json'
         }
-        response = requests.post(url, headers=headers, data=json.dumps(payload))
+        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=120)
         return response.json()
 
     @classmethod
@@ -84,5 +86,4 @@ class WeixinSpider(object):
             headers={'Content-Type': 'application/json'},
             json={"content_link": content_url}
         )
-        return response.json()
-
+        return response.json()
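
Note that the `timeout=120` added above only guards the second POST; the detail request a few lines earlier still has no timeout. Below is a minimal standalone sketch, not the repository's code, of how that detail call could be given the same timeout and a failure-to-None result so that a retry wrapper such as `retryOnNone` can re-invoke it. The endpoint URL and payload shape are copied from the diff; the function name and error handling are illustrative.

import requests

def fetch_article_detail(content_link, timeout=120):
    """Hypothetical standalone sketch of the detail call shown in the diff above.
    Returns None on any network or JSON failure instead of raising."""
    url = "http://47.98.154.124:8888/crawler/wei_xin/detail"  # endpoint from this commit
    payload = {"content_link": content_link, "is_count": False}
    try:
        response = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            json=payload,
            timeout=timeout,  # mirrors the timeout=120 added to the other call
        )
        return response.json()
    except (requests.RequestException, ValueError):
        return None

Called as `fetch_article_detail(content_link)`, a hung endpoint then surfaces as None after 120 seconds instead of blocking the crawl loop indefinitely.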

+ 261 - 0
dev/hurry_up.py

@@ -0,0 +1,261 @@
+import json
+
+import pandas as pd
+from tqdm import tqdm
+from datetime import datetime, timedelta
+
+from applications import AdMySQL, PQMySQL, WeixinSpider
+
+class DailyDataManager(object):
+    """
+    Daily data update
+    """
+    ad_mysql = AdMySQL()
+    pq_mysql = PQMySQL()
+    wx_spider = WeixinSpider()
+
+    @classmethod
+    def getPublishedArticles(cls):
+        """
+        Fetch info on already-published articles
+        :return:
+        """
+        # sql = f"""
+        # SELECT ContentUrl, wx_sn, createTime
+        # FROM official_articles_v2
+        # WHERE createTime > 1719763200;
+        # """
+        sql2 = f"""
+        select ContentUrl, wx_sn, createTime from official_articles_v2 where createTime > 1719763200 and accountName in (
+            select distinct account_name from account_avg_info_v2
+        );
+        """
+        result_list = cls.pq_mysql.select(sql2)
+        return result_list
+
+    @classmethod
+    def getRootSourceIds(cls, data_info):
+        """
+        Get data_info via the crawler API
+        :return:
+        """
+        url = data_info[0]
+        article_detail = cls.wx_spider.get_article_text(url)
+        print(url)
+        print(article_detail)
+        # print(json.dumps(article_detail, ensure_ascii=False, indent=4))
+        mini_info = article_detail['data']['data']['mini_program']
+        return data_info[1].decode(), mini_info, data_info[2]
+
+    @classmethod
+    def getMinigramInfo(cls, rootSourceId):
+        """
+
+        :param rootSourceId:
+        :return:
+        """
+        sql = f"""
+        select type, machinecode, create_time, first_level_dt 
+        from changwen_data_base
+        where rootsourceid = '{rootSourceId}';
+        """
+        result_list = cls.ad_mysql.select(sql)
+
+        def summarize(values):
+            """
+            :param values:
+            :return:
+            """
+            L = {}
+            first_level = {}
+            fission_level = {}
+            for line in values:
+                # count the first-level rows first
+                if line[0] == '首层':
+                    c_time = line[-2].__str__().split(" ")[0]
+                    if first_level.get(c_time):
+                        first_level[c_time].add(line[1])
+                    else:
+                        first_level[c_time] = {line[1]}
+                else:
+                    dt = str(line[-1])
+                    first_level_dt = datetime.strptime(dt, '%Y%m%d')
+                    create_level_dt = line[-2]
+                    delta = create_level_dt - first_level_dt
+                    days = int(delta.days)
+                    key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
+                    if fission_level.get(key_dt):
+                        fission_level[key_dt].append((line[1], days))
+                    else:
+                        fission_level[key_dt] = [(line[1], days)]
+
+            tt = {}
+            for key in fission_level:
+                detail_list = fission_level[key]
+                temp = {}
+                for item in detail_list:
+                    mid, days = item
+                    if temp.get(days):
+                        temp[days].add(mid)
+                    else:
+                        temp[days] = {mid}
+                final = {}
+                for sub_key in temp:
+                    length = len(temp[sub_key])
+                    final[sub_key] = length
+                tt[key] = final
+            for key in first_level:
+                temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0), tt.get(key, {}).get(2, 0)]
+                L[key] = temp
+            return L
+
+        return summarize(result_list)
+
+    @classmethod
+    def getArticleInfo(cls, trace_id):
+        """
+        Get article info by trace_id
+        :param trace_id:
+        :return:
+        """
+        sql = f"""
+        SELECT account_name, article_title
+        FROM long_articles_video
+        WHERE trace_id = '{trace_id}';
+        """
+        info = cls.pq_mysql.select(sql)
+        return info[0]
+
+    @classmethod
+    def updateDetail(cls):
+        """
+
+        :return:
+        """
+        sql = f"""
+        select distinct root_source_id
+        from long_articles_detail_info
+        """
+        source_id_list = cls.pq_mysql.select(sql)
+        for item in tqdm(source_id_list):
+            s_id = item[0]
+            try:
+                result = cls.getMinigramInfo(s_id)
+                for key in result:
+                    recall_dt = key
+                    first_level = result[key][0]
+                    fission_0 = result[key][1]
+                    fission_1 = result[key][2]
+                    fission_2 = result[key][3]
+                    print(key, first_level, fission_0, fission_1, fission_2)
+                    update_sql = f"""
+                    UPDATE long_articles_detail_info
+                    set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
+                    where root_source_id = %s and recall_dt = %s;
+                    """
+                    try:
+                        cls.pq_mysql.update(
+                            sql=update_sql,
+                            params=(
+                                first_level, fission_0, fission_1, fission_2, s_id, recall_dt
+                            )
+                        )
+                    except Exception as e:
+                        print("insert error", e)
+            except Exception as e:
+                print(e)
+
+
+if __name__ == '__main__':
+    DM = DailyDataManager()
+    # DM.updateDetail()
+    publishArticles = DM.getPublishedArticles()
+    print(len(publishArticles))
+    for line in tqdm(publishArticles):
+        try:
+            wx_sn, mini_info, create_time = DM.getRootSourceIds(line)
+            dt_object = datetime.fromtimestamp(create_time)
+            publish_dt = dt_object.strftime('%Y-%m-%d')
+            one_day = timedelta(days=1)
+            two_day = timedelta(days=2)
+            next_day = dt_object + one_day
+            next_next_day = dt_object + two_day
+            recall_dt_list = [dt_object, next_day, next_next_day]
+            recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
+            for dt_str in recall_dt_str_list:
+                for index, item in enumerate(mini_info, 1):
+                    image_url = item['image_url']
+                    nick_name = item['nike_name']
+                    root_source_id = item['path'].split("rootSourceId%3D")[-1]
+                    video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
+                    kimi_title = item['title']
+                    insert_sql = f"""
+                        INSERT INTO long_articles_detail_info
+                        (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+                        values
+                        (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+                    """
+                    DM.pq_mysql.update(
+                        sql=insert_sql,
+                        params=(
+                            wx_sn,
+                            kimi_title,
+                            nick_name,
+                            image_url,
+                            index,
+                            root_source_id,
+                            video_id,
+                            publish_dt,
+                            dt_str
+                        )
+                    )
+        except Exception as e:
+            print(e)
+            pass
+
+
+
+
+
+    # for line in DailyIds:
+    #     try:
+    #         source_id_tuple = DM.getRootSourceIds(trace_id=line)
+    #         result = DM.getMinigramInfo(source_id_tuple)
+    #         print(line)
+    #         print(result)
+    #         print("\n")
+    #     except Exception as e:
+    #         print(e)
+    #     print(line)
+    # L = {}
+    # trace_id = "search-a9bb246a-57fa-49f4-88d7-eec575813130-1723608633"
+    # source_id_tuple = DM.getRootSourceIds(trace_id=trace_id)
+    # result = DM.getMinigramInfo(source_id_tuple)
+    # print(result)
+    # for t_id in tqdm(DailyIds):
+    #     s_tuple = DM.getRootSourceIds(trace_id=t_id)
+    #     first_, fission_ = DM.getMinigramInfo(s_tuple)
+    #     obj = {
+    #         "first_": first_,
+    #         "fission_": fission_,
+    #         "rate": fission_ / first_ if first_ > 0 else 0
+    #     }
+    #     L[t_id] = obj
+    # Df = []
+    # with open("t.json", encoding="utf-8") as f:
+    #     L = json.loads(f.read())
+    # for key in L:
+    #     print(key)
+    #     value = L[key]
+    #     result = DM.getArticleInfo(trace_id=key)
+    #     account_name, article_title = result
+    #     temp = [
+    #         account_name,
+    #         article_title,
+    #         value['first_'],
+    #         value['fission_'],
+    #         value['rate']
+    #     ]
+    #     Df.append(temp)
+    # df = pd.DataFrame(Df, columns=['account_name', 'article_title', 'first_', 'fission_', 'rate0'])
+    # df.to_excel("0825.xlsx", index=False)
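
The core of `getMinigramInfo` above is the nested `summarize` helper: rows tagged 首层 are bucketed by the day part of `create_time` and counted as unique machine codes, every other row is bucketed by its `first_level_dt` plus the day offset between `create_time` and that date, and each recall day ends up as `[first_level, fission_0, fission_1, fission_2]`. A compact standalone sketch of that grouping, run on made-up sample rows (the tuple layout follows the SELECT above; the data and the '裂变' label are illustrative):

from datetime import datetime

def summarize(rows):
    """Standalone restatement of the summarize() helper above.
    rows are (type, machinecode, create_time, first_level_dt) tuples."""
    first_level, fission = {}, {}
    for typ, mid, create_time, first_level_dt in rows:
        if typ == '首层':
            day = str(create_time).split(" ")[0]
            first_level.setdefault(day, set()).add(mid)
        else:
            base = datetime.strptime(str(first_level_dt), '%Y%m%d')
            offset = (create_time - base).days
            fission.setdefault(base.strftime('%Y-%m-%d'), {}).setdefault(offset, set()).add(mid)
    return {
        day: [len(mids)] + [len(fission.get(day, {}).get(d, set())) for d in (0, 1, 2)]
        for day, mids in first_level.items()
    }

# Made-up sample: one first-level visit on 2024-07-01 and two fission visits
# attributed to that day, one on the same day and one a day later.
rows = [
    ('首层', 'mid-1', datetime(2024, 7, 1, 10, 0), 20240701),
    ('裂变', 'mid-2', datetime(2024, 7, 1, 18, 0), 20240701),
    ('裂变', 'mid-3', datetime(2024, 7, 2, 9, 0), 20240701),
]
print(summarize(rows))  # {'2024-07-01': [1, 1, 1, 0]}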

+ 1 - 1
spider/weixinAssociationCrawler.py

@@ -38,7 +38,7 @@ class weixinAssociation(object):
         :return:
         """
         account_info_list = cls.getAssociationAccounts()
-        for line in tqdm(account_info_list):
+        for line in tqdm(account_info_list[1:]):
             gh_id = line[0]
             cls.spider_tool.searchEachAccountArticlesSinglePage(
                 gh_id=gh_id,

+ 48 - 27
spider/weixinCategoryCrawler.py

@@ -7,7 +7,7 @@ import time
 
 from tqdm import tqdm
 
-from applications import WeixinSpider, Functions, DeNetMysql
+from applications import WeixinSpider, Functions, DeNetMysql, PQMySQL
 
 
 class weixinCategory(object):
@@ -16,19 +16,23 @@ class weixinCategory(object):
     """
 
     def __init__(self):
-        self.spider_client = DeNetMysql()
+        self.db_client_pq = PQMySQL()
+        self.db_client_dt = DeNetMysql()
         self.spider = WeixinSpider()
         self.function = Functions()
 
-    def getAccountList(self):
+    def getAccountList(self, account_category):
         """
         Fetch accounts
+        :param account_category: account category
         :return:
         """
-        now_time = int(time.time())
-        twenty_hours_ago = now_time - 3600 * 20
-        sql = f"""select * from long_article_accounts_outside where latest_article_timestamp < {twenty_hours_ago};"""
-        account_tuple = self.spider_client.select(sql)
+        sql = f"""
+            select distinct gh_id, account_source, account_name, account_category, latest_update_time
+            from long_articles_accounts 
+            where account_category = '{account_category}';
+            """
+        account_tuple = self.db_client_pq.select(sql)
         result = [
             {
                 "gh_id": i[0],
@@ -59,7 +63,7 @@ class weixinCategory(object):
                         VALUES 
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                     """
-                    self.spider_client.update(
+                    self.db_client_dt.update(
                         sql=insert_sql,
                         params=(
                             "weixin",
@@ -87,19 +91,20 @@ class weixinCategory(object):
         :return:
         """
         select_sql = f"""
-        SELECT publish_time 
-        From crawler_meta_article 
-        WHERE out_account_id = '{gh_id}'
-        ORDER BY publish_time DESC LIMIT 1;
+            SELECT publish_time 
+            From crawler_meta_article 
+            WHERE out_account_id = '{gh_id}'
+            ORDER BY publish_time DESC LIMIT 1;
         """
-        result = self.spider_client.select(select_sql)
+        result = self.db_client_dt.select(select_sql)
         time_stamp = result[0][0]
+        dt_str = self.function.time_stamp_to_str(time_stamp)
         update_sql = f"""
-            update long_article_accounts_outside
-            set latest_article_timestamp = %s
-            where account_id = %s;
+            update long_articles_accounts
+            set latest_update_time = %s
+            where gh_id = %s;
         """
-        self.spider_client.update(sql=update_sql, params=(time_stamp, gh_id))
+        self.db_client_pq.update(sql=update_sql, params=(dt_str, gh_id))
 
     def updateEachAccountArticles(self, gh_id, category, latest_time_stamp, index=None):
         """
@@ -125,19 +130,35 @@ class weixinCategory(object):
             else:
                 # update the latest crawl timestamp
                 self.updateLatestAccountTimeStamp(gh_id=gh_id)
+                print("账号时间更新成功")
         else:
             print("No more data")
 
 
 if __name__ == "__main__":
     wxCategory = weixinCategory()
-    account_list = wxCategory.getAccountList()
-    for account in tqdm(account_list):
-        try:
-            wxCategory.updateEachAccountArticles(
-                gh_id=account["gh_id"],
-                category=account["category"],
-                latest_time_stamp=account["latest_timestamp"],
-            )
-        except Exception as e:
-            print("fail because of {}".format(e))
+    category_list = [
+        '军事',
+        '历史',
+        '娱乐八卦',
+        '情感生活',
+        '健康养生',
+        '新闻媒体'
+    ]
+    for category in category_list[4:]:
+        account_list = wxCategory.getAccountList(category)
+        for account in tqdm(account_list):
+            try:
+                gh_id = account['gh_id']
+                category = account['category']
+                try:
+                    timestamp = int(account['latest_timestamp'].timestamp())
+                except:
+                    timestamp = 1704038400
+                wxCategory.updateEachAccountArticles(
+                    gh_id=gh_id,
+                    category=category,
+                    latest_time_stamp=timestamp
+                )
+            except Exception as e:
+                print("fail because of {}".format(e))
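
The `latest_timestamp` conversion in the new `__main__` loop relies on a bare `except:` to fall back to 1704038400 (2024-01-01 00:00, UTC+8) whenever the column is empty or malformed. A narrower sketch of that fallback, with names invented here purely for illustration:

from datetime import datetime

DEFAULT_TIMESTAMP = 1704038400  # 2024-01-01 00:00 (UTC+8), the fallback used above

def to_timestamp(latest_update_time):
    """Hypothetical helper: convert latest_update_time to a unix timestamp,
    falling back to DEFAULT_TIMESTAMP when the value is missing or not a
    datetime, instead of swallowing every exception with a bare except."""
    if isinstance(latest_update_time, datetime):
        return int(latest_update_time.timestamp())
    return DEFAULT_TIMESTAMP

print(to_timestamp(None))                         # 1704038400
print(to_timestamp(datetime(2024, 8, 1, 12, 0)))  # depends on the local timezone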

+ 12 - 12
spider/weixinRelativeAccountCrawler.py

@@ -187,17 +187,17 @@ class weixinRelationAccountGoodArticles(object):
                 except Exception as e:
                     print(e)
 
-    @classmethod
-    def searchResultFilter(cls, filter_type, info):
-        """
-        Filter search results
-        :param info: information to be filtered
-        :param filter_type: filter type; "account" filters accounts, "article" filters articles
-        :return: filtered results
-        """
-        match filter_type:
-            case "account":
-                return account
+    # @classmethod
+    # def searchResultFilter(cls, filter_type, info):
+    #     """
+    #     Filter search results
+    #     :param info: information to be filtered
+    #     :param filter_type: filter type; "account" filters accounts, "article" filters articles
+    #     :return: filtered results
+    #     """
+    #     match filter_type:
+    #         case "account":
+    #             return account
 
 
 if __name__ == "__main__":
@@ -209,7 +209,7 @@ if __name__ == "__main__":
             account_id=source_account
         )
         goodArticles = weixin.filterGoodArticle(accountArticlesDataFrame)
-        for title in goodArticles[1:2]:
+        for title in goodArticles:
             account_list = weixin.searchGoodArticlesAccounts(
                 source_account=source_account, source_title=title
             )
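
For reference, the commented-out `searchResultFilter` above returned an undefined `account` name, which is presumably why it was disabled rather than called. Purely as a sketch of what its docstring describes, with hypothetical field names and selection rules (this is not the repository's implementation):

def search_result_filter(filter_type, info):
    """Illustrative guess at the intent of searchResultFilter:
    'account' keeps unique accounts, 'article' keeps items that look like articles."""
    match filter_type:
        case "account":
            seen, kept = set(), []
            for item in info:
                account_id = item.get("account_id")  # hypothetical field name
                if account_id not in seen:
                    seen.add(account_id)
                    kept.append(item)
            return kept
        case "article":
            return [item for item in info if item.get("title")]  # hypothetical field name
        case _:
            return info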