Kaynağa Gözat

写文件接口

罗俊辉 9 ay önce
ebeveyn
işleme
24271f2fdb

+ 40 - 1
applications/functions/__init__.py

@@ -3,4 +3,43 @@
 """
 from .article_account import ArticleRank
 from .article_tools import title_sim_v2_by_list
-from .server_article_account import get_article_title_url_list, get_article_titles
+from .server_article_account import get_article_title_url_list, get_article_titles
+
+
+def decode_show_k(show_k):
+    this_dict = {
+        '阅读': 'show_view_count',  # 文章
+        '看过': 'show_view_count',  # 图文
+        '观看': 'show_view_count',  # 视频
+        '赞': 'show_like_count',
+        '付费': 'show_pay_count',
+        '赞赏': 'show_zs_count',
+    }
+    if show_k not in this_dict:
+        print(f'error from decode_show_k, show_k not found: {show_k}')
+    return this_dict.get(show_k, 'show_unknown')
+
+
+def decode_show_v(show_v):
+    foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
+    foo = eval(foo)
+    return int(foo)
+
+
+def show_desc_to_sta(show_desc):
+    show_desc = show_desc.replace('+', '')
+    sta = {}
+    for show_kv in show_desc.split('\u2004\u2005'):
+        if not show_kv:
+            continue
+        show_k, show_v = show_kv.split('\u2006')
+        k = decode_show_k(show_k)
+        v = decode_show_v(show_v)
+        sta[k] = v
+    res = {
+        'show_view_count': sta.get('show_view_count', 0),
+        'show_like_count': sta.get('show_like_count', 0),
+        'show_pay_count': sta.get('show_pay_count', 0),
+        'show_zs_count': sta.get('show_zs_count', 0),
+    }
+    return res

+ 0 - 81
applications/updateAccountArticles.py

@@ -1,81 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-import requests
-
-
-class ArticleManager(object):
-    """
-    Update account articles
-    """
-
-    @classmethod
-    def search_articles(cls, title):
-        """
-        search articles in wx
-        :return:
-        """
-        url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
-        payload = json.dumps({
-            "keyword": title,
-            "cursor": "1"
-        })
-        headers = {
-            'Content-Type': 'application/json'
-        }
-
-        response = requests.request("POST", url, headers=headers, data=payload)
-        return response.json()
-
-    @classmethod
-    def get_article_text(cls, content_link):
-        """
-        获取文章
-        :param content_link:
-        :return:
-        """
-        url = "http://8.217.190.241:8888/crawler/wei_xin/detail"
-        payload = json.dumps({
-            "content_link": content_link,
-            "is_count": False,
-            "is_ad": False
-        })
-        headers = {
-            'Content-Type': 'application/json'
-        }
-        response = requests.request("POST", url, headers=headers, data=payload)
-        return response.json()
-
-    @classmethod
-    def getAccountArticleList(cls, gh_id, cursor):
-        """
-        获取账号的文章list
-        :return:
-        """
-        url = 'http://8.217.190.241:8888/crawler/wei_xin/blogger'
-        payload = {
-            'account_id': gh_id,
-            'cursor': cursor,
-        }
-        msg_list = []
-        next_cursor = None
-        has_more = None
-        try:
-            res_data = requests.request("POST", url, headers={}, data=json.dumps(payload)).json()['data']
-            msg_list = res_data['data']
-            next_cursor = res_data['next_cursor']
-            has_more = res_data['has_more']
-            print(json.dumps(res_data, ensure_ascii=False, indent=4))
-            # for msg in msg_list:
-            #     msg['cursor'] = curso
-            #     msg['next_cursor'] = next_cursor
-            #     msg['has_more'] = has_more
-        except Exception as e:
-            print(e)
-        return msg_list, next_cursor, has_more
-
-
-if __name__ == '__main__':
-    AM = ArticleManager()
-    AM.getAccountArticleList(gh_id="gh_5ae65db96cb7", cursor=None)

+ 159 - 0
applications/wxSpider.py

@@ -0,0 +1,159 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import asyncio
+import aiohttp
+import requests
+
+
+def retry_on_none_data():
+    """
+    基于None类型数据的重试装饰器
+    :return:
+    """
+
+    def decorator(func):
+        """
+        :param func:
+        :return:
+        """
+        max_retries = 5
+        wait_seconds = 1
+
+        def wrapper(*args, **kwargs):
+            """
+
+            :param args:
+            :param kwargs:
+            :return:
+            """
+            for attempt in range(max_retries):
+                response = func(*args, **kwargs)
+                if response['data'] is not None:
+                    return response
+                time.sleep(wait_seconds)
+            return None
+
+        return wrapper
+
+    return decorator
+
+
+def retryAsyncOnNoneData():
+    """
+    异步装饰器
+    :return:
+    """
+
+    def decorator(func):
+        """
+        :param func:
+        :return:
+        """
+        max_retries = 5
+        wait_seconds = 1
+
+        async def wrapper(*args, **kwargs):
+            """
+
+            :param args:
+            :param kwargs:
+            :return:
+            """
+            for attempt in range(max_retries):
+                response = await func(*args, **kwargs)
+                if response.get('data') is not None:
+                    return response
+                await asyncio.sleep(wait_seconds)
+            return None
+
+        return wrapper
+
+    return decorator
+
+
+class ArticleManager(object):
+    """
+    Update account articles
+    """
+
+    @classmethod
+    @retry_on_none_data()
+    def search_articles(cls, title):
+        """
+        search articles in wx
+        :return:
+        """
+        url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
+        payload = json.dumps({
+            "keyword": title,
+            "cursor": "1"
+        })
+        headers = {
+            'Content-Type': 'application/json'
+        }
+
+        response = requests.request("POST", url, headers=headers, data=payload)
+        return response.json()
+
+    @classmethod
+    @retry_on_none_data()
+    def get_article_text(cls, content_link):
+        """
+        获取文章
+        :param content_link:
+        :return:
+        """
+        url = "http://8.217.190.241:8888/crawler/wei_xin/detail"
+        payload = json.dumps({
+            "content_link": content_link,
+            "is_count": False,
+            "is_ad": False
+        })
+        headers = {
+            'Content-Type': 'application/json'
+        }
+        response = requests.request("POST", url, headers=headers, data=payload)
+        return response.json()
+
+    @classmethod
+    @retry_on_none_data()
+    def update_msg_list(cls, ghId, index):
+        """
+        :return:
+        """
+        url = 'http://8.217.190.241:8888/crawler/wei_xin/blogger'
+        payload = {
+            'account_id': ghId,
+            'cursor': index,
+        }
+        headers = {
+            'Content-Type': 'application/json'
+        }
+        response = requests.post(url, headers=headers, data=json.dumps(payload))
+        return response.json()
+
+    @classmethod
+    @retryAsyncOnNoneData()
+    async def get_account_by_url(cls, content_url):
+        """
+        通过文章获取账号信息
+        :param content_url:
+        :return:
+        """
+        async with aiohttp.ClientSession() as session:
+            async with session.post(
+                    url='http://8.217.190.241:8888/crawler/wei_xin/account_info',
+                    headers={'Content-Type': 'application/json'},
+                    json={"content_link": content_url}
+            ) as response:
+                return await response.json()
+        # response = requests.request(
+        #     "POST",
+        #     url='http://8.217.190.241:8888/crawler/wei_xin/account_info',
+        #     headers={'Content-Type': 'application/json'},
+        #     json={"content_link": content_url}
+        # )
+        # return response.json()

+ 3 - 3
routes/__init__.py

@@ -6,7 +6,7 @@ from quart import Blueprint, jsonify, request
 
 from .accountArticleRank import AccountArticleRank
 from .nlpServer import NLPServer
-from .articleDBServer import ArticleDB
+from .articleDBServer import ArticleSpider
 from .accountServer import AccountServer
 
 
@@ -60,14 +60,14 @@ def AlgRoutes(mysql_client, model):
         response = await AS.deal()
         return jsonify(response)
 
-    @blueprint.route("/article_db", methods=["POST"])
+    @blueprint.route("/article_crawler", methods=["POST"])
     async def articleMysql():
         """
         长文数据库相关接口
         :return:
         """
         params = await request.get_json()
-        ADB = ArticleDB(params=params, mysql_client=mysql_client)
+        ADB = ArticleSpider(params=params, mysql_client=mysql_client)
         response = await ADB.deal()
         return jsonify(response)
 

+ 165 - 9
routes/articleDBServer.py

@@ -1,35 +1,191 @@
 """
 @author: luojunhui
 """
-from applications.articleTools import ArticleDBTools
+import json
+import time
 
+from applications.wxSpider import ArticleManager
+from applications.functions import show_desc_to_sta
 
-class ArticleDB(object):
+
+class ArticleSpider(object):
     """
-    长文数据库功能
+    input: ghId, AccountName
     """
+
     def __init__(self, params, mysql_client):
+        self.endTime = None
+        self.startTime = None
+        self.ghId = None
         self.params = params
         self.mysql_client = mysql_client
-        self.tools = ArticleDBTools(self.mysql_client)
+        self.tools = ArticleManager()
 
     def checkParams(self):
         """
         校验参数
         :return:
         """
-        a = self.params
+        try:
+            self.ghId = self.params['ghId']
+            # self.startTime = self.params['startTime']
+            # self.endTime = self.params['endTime']
+            return None
+        except Exception as e:
+            return {
+                "error": "Params Error",
+                "msg": str(e),
+                "params": self.params
+            }
+
+    async def checkAccount(self):
+        """
+        判断账号是否是新账号, 内部账号还是外部账号
+        :return:
+        """
+        sql = f"""
+        select accountName, updateTime 
+        from official_articles 
+        where ghId = '{self.ghId}' 
+        order by updateTime DESC;"""
+        result = await self.mysql_client.async_select(sql)
+        if result:
+            account_name, update_time = result[0]
+            return {
+                "account_name": account_name,
+                "update_time": update_time,
+                "account_type": "history"
+            }
+        else:
+            return {
+                "account_name": "",
+                "update_time": int(time.time()) - 30 * 24 * 60 * 60,
+                "account_type": "new"
+            }
 
-    def task_schedule(self):
+    async def process_msg_list(self, gh_id, account_name, msg_list):
         """
-        调度任务
+        把消息数据更新到数据库中
+        :param account_name:
+        :param gh_id:
+        :param msg_list:
         :return:
         """
+        for info in msg_list:
+            baseInfo = info.get("BaseInfo", {})
+            appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
+            createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
+            if int(time.time()) - int(updateTime) <= 20 * 60 * 60:
+                continue
+            Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in detail_article_list:
+                    try:
+                        title = article.get("Title", None)
+                        Digest = article.get("Digest", None)
+                        ItemIndex = article.get("ItemIndex", None)
+                        ContentUrl = article.get("ContentUrl", None)
+                        SourceUrl = article.get("SourceUrl", None)
+                        CoverImgUrl = article.get("CoverImgUrl", None)
+                        CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
+                        CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
+                        ItemShowType = article.get("ItemShowType", None)
+                        IsOriginal = article.get("IsOriginal", None)
+                        ShowDesc = article.get("ShowDesc", None)
+                        show_stat = show_desc_to_sta(ShowDesc)
+                        ori_content = article.get("ori_content", None)
+                        show_view_count = show_stat.get("show_view_count", 0)
+                        show_like_count = show_stat.get("show_like_count", 0)
+                        show_zs_count = show_stat.get("show_zs_count", 0)
+                        show_pay_count = show_stat.get("show_pay_count", 0)
+                        wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
+                        info_tuple = (
+                            gh_id,
+                            account_name,
+                            appMsgId,
+                            title,
+                            Type,
+                            createTime,
+                            updateTime,
+                            Digest,
+                            ItemIndex,
+                            ContentUrl,
+                            SourceUrl,
+                            CoverImgUrl,
+                            CoverImgUrl_1_1,
+                            CoverImgUrl_235_1,
+                            ItemShowType,
+                            IsOriginal,
+                            ShowDesc,
+                            ori_content,
+                            show_view_count,
+                            show_like_count,
+                            show_zs_count,
+                            show_pay_count,
+                            wx_sn,
+                            json.dumps(baseInfo, ensure_ascii=False)
+                        )
+                        insert_sql = f"""
+                            INSERT INTO official_articles
+                            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
+                            values
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            """
+                        await self.mysql_client.async_insert(sql=insert_sql, params=info_tuple)
+                        print("更新成功")
+                    except Exception as e:
+                        print("error")
+                        print(e)
+                        continue
+
+    async def getAccountArticleList(self, gh_id, account_name, last_update_time, cursor=None):
+        """
+        输入ghid获取账号的文章list
+        :return:
+        """
+        response = self.tools.update_msg_list(ghId=gh_id, index=cursor)
+        msg_list = response.get("data", {}).get("data")
+        if msg_list:
+            # print(msg_list)
+            print("获取msg_list成功")
+            last_article_in_this_msg = msg_list[-1]
+            last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
+            last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
+            resdata = await self.tools.get_account_by_url(last_url)
+            check_name = resdata['data'].get('data', {}).get('account_name')
+            check_id = resdata['data'].get('data', {}).get('wx_gh')
+            print(check_name, check_id, last_url)
+            if check_id == gh_id:
+                print("校验成功")
+                await self.process_msg_list(gh_id, check_name, msg_list)
+                if last_time_stamp_in_this_msg > last_update_time:
+                    next_cursor = response['data']['next_cursor']
+                    return await self.getAccountArticleList(
+                        gh_id=gh_id,
+                        account_name=check_name,
+                        last_update_time=last_update_time,
+                        cursor=next_cursor
+                    )
+            else:
+                print("校验失败")
 
     async def deal(self):
         """
         deal function
         :return:
         """
-        return {"message": "此接口正在开发中"}
-
+        if self.checkParams():
+            return self.checkParams()
+        else:
+            account_info = await self.checkAccount()
+            account_name = account_info['account_name']
+            update_time = account_info['update_time']
+            print("开始执行")
+            await self.getAccountArticleList(
+                gh_id=self.ghId,
+                account_name=account_name,
+                last_update_time=update_time
+            )
+            return {"message": "successful"}

+ 19 - 0
test/mysql_dev.py

@@ -0,0 +1,19 @@
+"""
+@author: luojunhui
+"""
+import pymysql
+
+connection = pymysql.connect(
+        host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+        port=3306,
+        user='crawler',
+        password='crawler123456@',
+        db='piaoquan-crawler',
+        charset='utf8mb4'
+    )
+
+sql = "select accountName, updateTime from official_articles where ghId = '' order by updateTime DESC;"
+cursor = connection.cursor()
+cursor.execute(sql)
+response = cursor.fetchall()
+print(response[0])

+ 5 - 5
test/rank_dev.py

@@ -2,14 +2,14 @@ import json
 import requests
 import time
 
-url = "http://localhost:6060/articleRank"
+url = "http://192.168.100.31:8179/score_list"
 
 with open("body.json") as f:
     data = json.loads(f.read())
 body = json.loads(data['data'])
 body['strategy'] = "ArticleRankV2"
-body['publishNum'] = 8
-body['accountName'] = "指尖奇文"
+body['publishNum'] = 4
+# body['accountName'] = "指尖奇文"
 
 headers = {"Content-Type": "application/json"}
 
@@ -18,5 +18,5 @@ response = requests.post(url=url, headers=headers, json=body)
 b = time.time()
 print(json.dumps(response.json(), ensure_ascii=False, indent=4))
 print(b - a)
-for res in response.json()['data']['rank_list']:
-    print(res['title'], res['producePlanName'], res['score'], res['crawlerViewCount'])
+# for res in response.json()['data']['rank_list']:
+#     print(res['title'], res['producePlanName'], res['score'], res['crawlerViewCount'])

+ 2 - 1
test/score_list_dev.py

@@ -13,6 +13,7 @@ class ArticleRank(object):
     url = "http://192.168.100.31:8179/score_list"
     url1 = "http://47.98.154.124:6060/score_list"
     # url1 = "http://localhost:6060/score_list"
+    url2 = "http://192.168.100.31:8179/score_list"
 
     @classmethod
     def rank(cls, account_list, text_list):
@@ -31,7 +32,7 @@ class ArticleRank(object):
             "sim_type": "mean",
             "rate": 0.1
         }
-        response = requests.post(url=cls.url1, headers={}, json=body).json()
+        response = requests.post(url=cls.url, headers={}, json=body).json()
         return response
 
 

+ 13 - 0
test/spider_dev.py

@@ -0,0 +1,13 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications.wxSpider import ArticleManager
+
+
+url = "https://mp.weixin.qq.com/s?__biz=MzkyMTUwODkxOA==&mid=2247488156&idx=1&sn=ae7e723070e445dd2162f830143cb855&chksm=c040af8b5e0ddb1c2e0c50487e35de0bbd8603e756682d2ffbdb08e74bf62719adf91e5fe840&scene=126&sessionid=1679649075#rd"
+res = ArticleManager().get_account_by_url(
+    content_url=url
+)
+print(json.dumps(res, ensure_ascii=False, indent=4))