@@ -1,10 +1,11 @@
 from __future__ import annotations

 import asyncio
-import time
+import time, json
 import traceback
 from datetime import datetime, date, timedelta
 from typing import List, Dict
+from tqdm.asyncio import tqdm

 from applications.api import feishu_robot
 from applications.crawler.wechat import weixin_search
@@ -27,6 +28,14 @@ class CrawlerGzhConst:
     STAT_DURATION = 30  # days
     DEFAULT_TIMESTAMP = 1735660800

+    DAILY_SCRAPE_POSITIVE = 1  # daily_scrape flag: account is crawled every day
+    DAILY_SCRAPE_NEGATIVE = 0
+
+    USING_STATUS = 1  # is_using / status flag: record is active
+    NOT_USING_STATUS = 0
+
+    CRAWL_ACCOUNT_FIRST_LEVEL = 500  # row limit for the V2 account query
+

 class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
     def __init__(self, pool, log_client, trace_id):
@@ -42,24 +51,32 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     from long_articles_accounts
                     where account_category = %s and is_using = %s and daily_scrape = %s;
                 """
-                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
+                return await self.pool.async_fetch(
+                    query=query, params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSITIVE)
+                )
+
             case "V2":
                 query = """
                     select gh_id, account_name, latest_update_time
                     from long_articles_accounts
-                    where account_category = %s and is_using = %s order by recent_score_ci_lower desc limit %s;
+                    where account_category = %s and is_using = %s
+                    order by recent_score_ci_lower desc limit %s;
                 """
-                return await self.pool.async_fetch(query=query, params=(method, 1, 500))
+                return await self.pool.async_fetch(
+                    query=query, params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL)
+                )
+
             case _:
                 raise Exception("strategy not supported")

     async def get_account_latest_update_timestamp(self, account_id: str) -> int:
         """get latest update time"""
-        query = """ select max(publish_time) as publish_time from crawler_meta_article where out_account_id = %s;"""
-        latest_timestamp_obj = await self.pool.async_fetch(
-            query=query, params=(account_id,)
-        )
-        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None
+        query = """
+            select max(publish_time) as publish_time
+            from crawler_meta_article where out_account_id = %s;
+        """
+        fetch_response = await self.pool.async_fetch(query=query, params=(account_id,))
+        return next((item.get("publish_time") for item in fetch_response or []), None)

     async def crawl_each_article(
         self, article_raw_data, mode, account_method, account_id, source_title=None
@@ -117,7 +134,7 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
         await self.save_item_to_database(
             media_type="article", item=new_item, trace_id=self.trace_id
         )
-        await asyncio.sleep(self.STAT_DURATION)
+        await asyncio.sleep(self.SLEEP_SECONDS)

     async def update_account_read_avg_info(self, gh_id, account_name):
         """update account read avg info"""
@@ -136,7 +153,7 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
             read_avg = sum(read_cnt_list) / n
             max_publish_dt = fetch_response[0]["publish_dt"]
             remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
-            insert_query = f"""
+            insert_query = """
                 insert ignore into crawler_meta_article_accounts_read_avg
                 (gh_id, account_name, position, read_avg, dt, status, remark)
                 values
@@ -144,24 +161,16 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
            """
            insert_rows = await self.pool.async_save(
                query=insert_query,
-                params=(
-                    gh_id,
-                    account_name,
-                    position,
-                    read_avg,
-                    today_dt,
-                    1,
-                    remark,
-                ),
+                params=(gh_id, account_name, position, read_avg, today_dt, self.USING_STATUS, remark),
            )
            if insert_rows:
-                update_query = f"""
+                update_query = """
                    update crawler_meta_article_accounts_read_avg
                    set status = %s
                    where gh_id = %s and position = %s and dt < %s;
                """
                await self.pool.async_save(
-                    update_query, (0, gh_id, position, today_dt)
+                    update_query, (self.NOT_USING_STATUS, gh_id, position, today_dt)
                )

     async def get_hot_titles_with_strategy(self, strategy):
@@ -177,6 +186,7 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                 timedelta_days = 5
             case _:
                 raise Exception(f"unknown strategy: {strategy}")
+
         date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
             "%Y%m%d"
         )
@@ -247,8 +257,8 @@ class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):

     async def deal(self, method: str, strategy: str = "V1"):
         account_list = await self.get_crawler_accounts(method, strategy)
-        for account in account_list:
-            print(account)
+        for account in tqdm(account_list, desc="抓取单个账号"):
+            print(f"{datetime.now()}: start crawling account: {json.dumps(account, ensure_ascii=False)}")
             try:
                 await self.crawler_single_account(method, account)
                 await self.update_account_read_avg_info(
@@ -267,20 +277,20 @@ class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
                         },
                     }
                 )
+            print(f"{datetime.now()}: finished crawling account: {json.dumps(account, ensure_ascii=False)}")


 class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)

-    async def crawl_search_articles_detail(
-        self, article_list: List[Dict], source_title: str
-    ):
+    async def crawl_search_articles_detail(self, article_list: List[Dict], source_title: str):
+        """
+        @description: for each article returned by the search, fetch the article detail and save it to the meta table
+        """
         for article in article_list:
             url = article["url"]
-            detail_response = await get_article_detail(
-                url, is_count=True, is_cache=False
-            )
+            detail_response = await get_article_detail(url, is_count=True, is_cache=False)
             if not detail_response:
                 continue

@@ -335,13 +345,14 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):

     async def deal(self, strategy: str = "V1"):
         hot_titles = await self.get_hot_titles_with_strategy(strategy)
-        for hot_title in hot_titles:
-            print("hot title:", hot_title)
+        for hot_title in tqdm(hot_titles, desc="在微信内搜索文章"):
+            print(f"{datetime.now()}: start searching hot title: {hot_title}")
             try:
                 await self.search_each_title(hot_title)
             except Exception as e:
-                print(f"crawler_gzh_articles error:{e}")
+                print(f"crawler_gzh_articles error:{e}\nexception:{traceback.format_exc()}")

+            print(f"{datetime.now()}: finished searching hot title: {hot_title}")
         await feishu_robot.bot(
             title="公众号搜索任务执行完成",
             detail={