import json
from typing import List, Dict

from applications.utils import show_desc_to_sta, str_to_md5


def _extract_wx_sn(content_url):
    """Return the ``sn`` value that follows the first ``&sn=`` in *content_url*.

    Returns ``None`` when the URL is empty/``None`` or carries no ``&sn=``
    marker, instead of raising ``IndexError`` on the missing segment.
    """
    if not content_url:
        return None
    _, marker, tail = content_url.partition("&sn=")
    if not marker:
        return None
    # The value ends at the next query-string separator (or end of string).
    return tail.split("&")[0]


async def insert_article_into_recycle_pool(
    pool, log_client, msg_list: List[Dict], account_info: Dict
) -> None:
    """Insert every article found in *msg_list* into the recycle pool table.

    For each message, the articles under ``AppMsg.DetailInfo`` are written one
    row at a time into ``official_articles_v2``. When the insert raises, the
    function falls back to updating ``show_view_count`` / ``show_like_count``
    of the row with the same ``wx_sn``; if that update raises as well, a
    failure entry is sent to *log_client* and processing moves on to the next
    article. A message without articles produces a failure log entry.

    Args:
        pool: Object exposing an awaitable
            ``async_save(query=..., params=..., db_name=...)``.
        log_client: Object exposing an awaitable ``log(contents=...)``.
        msg_list: Message dicts; the keys read here are ``BaseInfo`` and
            ``AppMsg`` (with ``BaseInfo`` and ``DetailInfo`` inside it).
        account_info: Must provide ``gh_id``, ``name`` and ``using_status``
            whenever at least one article is processed.

    Raises:
        KeyError: If *account_info* lacks a required key.
    """
    table_name = "official_articles_v2"

    # Both statements are loop-invariant, so they are built once up front.
    # NOTE(review): the column is spelled CoverImgUrl_255_1 while the payload
    # key is CoverImgUrl_235_1. Left untouched because the table schema is not
    # visible from this file -- confirm against the database.
    insert_query = f"""
        insert into {table_name}
        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""

    for info in msg_list:
        base_info = info.get("BaseInfo", {})

        # Resolve the nested containers once; ``or`` also covers keys that are
        # present but explicitly set to None.
        app_msg = info.get("AppMsg") or {}
        app_msg_base = app_msg.get("BaseInfo") or {}
        app_msg_id = app_msg_base.get("AppMsgId", None)
        create_timestamp = app_msg_base.get("CreateTime", None)
        update_timestamp = app_msg_base.get("UpdateTime", None)
        publish_type = app_msg_base.get("Type", None)
        detail_article_list = app_msg.get("DetailInfo") or []

        if not detail_article_list:
            await log_client.log(
                contents={
                    "function": "insert_article_into_recycle_pool",
                    "status": "fail",
                    "message": "account has no articles",
                    "data": {"account_name": account_info["name"]},
                }
            )
            continue

        for article in detail_article_list:
            title = article.get("Title", None)
            digest = article.get("Digest", None)
            item_index = article.get("ItemIndex", None)
            content_url = article.get("ContentUrl", None)
            source_url = article.get("SourceUrl", None)
            cover_img_url = article.get("CoverImgUrl", None)
            cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
            cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
            item_show_type = article.get("ItemShowType", None)
            is_original = article.get("IsOriginal", None)
            show_desc = article.get("ShowDesc", None)
            ori_content = article.get("ori_content", None)

            # show_desc_to_sta is a project helper; only its dict-like result
            # (read via .get below) is relied upon here.
            show_stat = show_desc_to_sta(show_desc)
            show_view_count = show_stat.get("show_view_count", 0)
            show_like_count = show_stat.get("show_like_count", 0)
            show_zs_count = show_stat.get("show_zs_count", 0)
            show_pay_count = show_stat.get("show_pay_count", 0)

            wx_sn = _extract_wx_sn(content_url)
            status = account_info["using_status"]

            # Order must match the column list of insert_query.
            info_tuple = (
                account_info["gh_id"],
                account_info["name"],
                app_msg_id,
                title,
                publish_type,
                create_timestamp,
                update_timestamp,
                digest,
                item_index,
                content_url,
                source_url,
                cover_img_url,
                cover_img_url_1_1,
                cover_img_url_235_1,
                item_show_type,
                is_original,
                show_desc,
                ori_content,
                show_view_count,
                show_like_count,
                show_zs_count,
                show_pay_count,
                wx_sn,
                json.dumps(base_info, ensure_ascii=False),
                str_to_md5(title),
                status,
            )

            try:
                await pool.async_save(
                    query=insert_query,
                    params=info_tuple,
                    db_name="piaoquan_crawler",
                )
                await log_client.log(
                    contents={
                        "function": "insert_article_into_recycle_pool",
                        "status": "success",
                        "data": info_tuple,
                    }
                )
                print("insert_article_into_recycle_pool success")
            except Exception as insert_error:
                # Best-effort fallback: any failure above (presumably most
                # often an already-stored article -- verify against the table
                # constraints) is answered by refreshing the counters.
                try:
                    await pool.async_save(
                        query=update_sql,
                        params=(show_view_count, show_like_count, wx_sn),
                        db_name="piaoquan_crawler",
                    )
                    print("update_article_into_recycle_pool success")
                except Exception as update_error:
                    await log_client.log(
                        contents={
                            "function": "insert_article_into_recycle_pool",
                            "status": "fail",
                            "message": "更新文章失败",
                            "data": {
                                "error": str(update_error),
                                # Keep the original cause so it is not lost.
                                "insert_error": str(insert_error),
                                "content_link": content_url,
                                "account_name": account_info["name"],
                            },
                        }
                    )