import time

from applications.api import AsyncApolloApi
from applications.utils import CrawlerMetaArticle


class CrawlerPipeline(AsyncApolloApi):
    """Persist crawled items after de-duplication and title-sensitivity checks."""

    def __init__(self, pool, log_client):
        super().__init__()
        self.pool = pool
        self.log_client = log_client
    async def whether_title_sensitive(self, title: str) -> bool:
        """Return True if the title contains any configured sensitive word."""
        sensitive_word_list = await self.get_config_value("sensitive_word_list")
        for word in sensitive_word_list:
            if word in title:
                return True
        return False
    async def whether_article_title_duplicate(self, title: str) -> bool:
        """Return True if an article with the same title already exists."""
        query = "select article_id from crawler_meta_article where title = %s;"
        duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
        return bool(duplicated_id)
    async def save_article(self, article_item: dict) -> None:
        """Save an article into the database with an explicit column list."""
        query = """
            insert into crawler_meta_article
                (platform, mode, category, out_account_id, article_index, title, link,
                 read_cnt, like_cnt, description, publish_time, crawler_time, score,
                 status, unique_index, source_article_title, source_account,
                 title_sensitivity)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        await self.pool.async_save(
            query=query,
            params=(
                article_item.get("platform", "weixin"),
                article_item.get("mode"),
                article_item.get("category"),
                article_item.get("out_account_id"),
                article_item.get("article_index"),
                article_item.get("title"),
                article_item.get("link"),
                article_item.get("read_cnt", 0),
                article_item.get("like_cnt", 0),
                article_item.get("description"),
                article_item.get("publish_time"),
                article_item.get("crawler_time", int(time.time())),
                article_item.get("score"),
                article_item.get("status", 1),
                article_item.get("unique_index"),
                article_item.get("source_article_title"),
                article_item.get("source_account"),
                article_item.get("title_sensitivity", 0),
            ),
        )
    async def save_article_v2(self, article_item: dict) -> None:
        """Save an article, deriving the column list from the model fields."""
        new_article = CrawlerMetaArticle(**article_item)
        new_article_dict = new_article.model_dump()
        insert_template = "insert into crawler_meta_article ({columns}) values ({values});"
        # Drop None fields so omitted columns fall back to their database defaults.
        insert_data = {k: v for k, v in new_article_dict.items() if v is not None}
        columns = ", ".join(insert_data.keys())
        values = ", ".join(["%s"] * len(insert_data))
        query = insert_template.format(columns=columns, values=values)
        await self.pool.async_save(query=query, params=tuple(insert_data.values()))
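
    # Illustration of the dynamic insert above (values are hypothetical): an
    # item whose model dump is {"platform": "weixin", "title": "foo", "status": 1},
    # with every other field None, would produce
    #   insert into crawler_meta_article (platform, title, status) values (%s, %s, %s);
    # executed with params ("weixin", "foo", 1).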
    async def save_video(self, video_item: dict) -> None:
        # Placeholder: video persistence is not implemented yet.
        pass
    async def save_item_to_database(self, media_type: str, item: dict):
        """Dispatch a crawled item to the matching save routine."""
        match media_type:
            case "video":
                await self.save_video(item)
            case "article":
                log_data = {
                    "title": item["title"],
                    "platform": item["platform"],
                    "mode": item["mode"],
                    "source": item["category"],
                }
                await self.log_client.log(
                    contents={
                        "task": "save_article",
                        "data": log_data,
                        "message": "start crawler article",
                        "code": 1001,
                    }
                )
                # Check whether the article title already exists.
                if await self.whether_article_title_duplicate(log_data["title"]):
                    await self.log_client.log(
                        contents={
                            "task": "save_article",
                            "data": log_data,
                            "message": "duplicate article title",
                            "code": 1002,
                        }
                    )
                    return
                # Check whether the title contains sensitive words.
                if await self.whether_title_sensitive(item["title"]):
                    await self.log_client.log(
                        contents={
                            "task": "save_article",
                            "data": log_data,
                            "message": "title_sensitive",
                            "code": 1003,
                        }
                    )
                    # Use the same key as the title_sensitivity column/model field.
                    item["title_sensitivity"] = 1
                # Save the article.
                await self.save_article_v2(item)
            case _:
                raise ValueError(f"unknown media type: {media_type}")
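

# A minimal usage sketch (assumptions, not project code): any async MySQL pool
# exposing async_fetch/async_save and any logger exposing an async
# log(contents=...) method satisfies the interface used above. Kept as a
# comment because the concrete pool and log clients live elsewhere; my_pool
# and my_log_client are hypothetical names.
#
#   import asyncio
#
#   async def main():
#       pipeline = CrawlerPipeline(pool=my_pool, log_client=my_log_client)
#       await pipeline.save_item_to_database(
#           media_type="article",
#           item={
#               "platform": "weixin",
#               "mode": "account",
#               "category": "tech",
#               "title": "example title",
#               "link": "https://example.com/article",
#           },
#       )
#
#   asyncio.run(main())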
|