# coding:utf-8
"""ODPS Python UDFs for keyword extraction, tag ranking and category cleanup."""
import re
import sys

from odps.udf import annotate
from odps.distcache import get_cache_file

# Only the first 10000 lines of the tag resource are considered.
_MAX_TAG_LINES = 10000

# Everything except CJK ideographs (U+4E00..U+9FA5) and ASCII letters.
_NON_CJK_ALPHA_RE = re.compile(r'[^\u4e00-\u9fa5a-zA-Z]')
# Same as above, but ASCII digits are kept as well.
_NON_CJK_ALNUM_RE = re.compile(r"[^\u4e00-\u9fa5a-zA-Z0-9]")

# First-level category names matched by get_cate1.
_CATE1_NAMES = (
    "音乐", "剧情", "舞蹈", "动物", "三农", "科技", "财经", "母婴", "法律", "科普",
    "情感", "文化", "搞笑", "名人", "体育", "医疗健康", "时政", "奇人异象", "历史", "军事",
    "艺术", "美食", "旅行", "地域本地", "生活记录", "生活家居", "二次元", "游戏", "公益", "随拍",
    "职场", "教育", "摄影摄像", "时尚", "综艺", "电影", "电视剧", "汽车", "宗教", "短剧",
    "收藏品",
)

# Second-level category names shared by get_cate2_all and get_cate2_only.
# Order matters for get_cate2_only, which returns the first name that matches.
_CATE2_NAMES = (
    "祝福音乐", "中国战争史", "中国历史影像", "知识科普", "正能量剧情",
    "杂技柔术", "早中晚好", "益智解密", "饮食健康", "戏曲戏剧",
    "未来科幻", "天气变化", "他国政策", "贪污腐败", "书法",
    "食品安全", "社会风气", "生活小妙招", "生活技巧科普", "省份城市亮点",
    "人生忠告", "人财诈骗", "亲子日常", "亲情音乐", "木工",
    "魔术特效", "迷信祝福", "民族异域音乐", "民生政策", "名画赏析",
    "美食教程", "麻将", "旅行攻略", "历史名人", "老综艺影像",
    "老年相关法律科普", "老年时尚", "老年审美美女", "老年生活", "老明星",
    "惊奇事件", "节日祝福", "健身操", "健康知识", "惠民新闻",
    "绘画", "怀念时光", "红歌老歌", "罕见画面", "国际文化",
    "国家统一", "国家力量", "国家科技力量", "搞笑段子", "风景实拍",
    "对口型表演", "动物萌宠", "动物表演", "大型集体艺术", "当代正能量人物",
    "传统文化", "吃播探店", "长寿知识", "本地生活", "K12教育",
    "(老)电影切片",
)


def _split_entries(packed):
    """Split a comma-separated string into its non-empty segments.

    Returns an empty list for ``None`` or ``""`` so callers do not crash on
    missing input; empty segments (e.g. from a trailing comma) are dropped.
    """
    if not packed:
        return []
    return [entry for entry in packed.split(",") if entry]


def _join_top_tags(stats, top):
    """Rank ``{tag: (score, latest_ts)}`` and join the best ``top`` tags.

    Tags are ordered by score descending, then by latest timestamp
    descending; remaining ties keep insertion order because the sort is
    stable. Returns ``""`` when ``stats`` is empty.
    """
    if not stats:
        return ""
    ranked = sorted(stats.items(), key=lambda item: (-item[1][0], -item[1][1]))
    return ",".join(tag for tag, _ in ranked[:top])


@annotate("string,bigint->string")
class fenci_str2str(object):
    """Extract keywords from a title and keep those found in 'top_tags.txt'."""

    def __init__(self):
        # jieba is loaded from a zip archive; it has to be on sys.path before
        # evaluate() imports it.
        sys.path.insert(0, './work/jieba-0.42.1-py3-none-any.zip')

        kv = {}
        cache_file = get_cache_file('top_tags.txt')
        try:
            for index, line in enumerate(cache_file):
                if index == _MAX_TAG_LINES:
                    break
                line = line.strip()
                if not line:
                    continue
                # Each non-blank line must hold exactly two tab-separated
                # fields; only the first one (the tag) is used.
                k, _ = line.split('\t')
                kv[k] = index
        finally:
            # Close the resource even when a malformed line raises.
            cache_file.close()
        # Maps tag -> line index in the resource file.
        self.kv = kv

    def evaluate(self, title, top_size):
        """Return the whitelisted keywords of ``title`` joined by commas.

        Args:
            title: Text to extract keywords from.
            top_size: Passed to jieba as ``topK``.

        Returns:
            Comma-separated keywords in the order jieba returned them, or
            ``""`` when the title is empty/None or nothing survives filtering.
        """
        import jieba.analyse

        if not title:
            return ""
        keys = jieba.analyse.extract_tags(
            title, topK=top_size, withWeight=False, allowPOS=('n', 'v'))
        if not keys:
            return ""
        return ",".join(k for k in keys if k in self.kv)


@annotate("string,bigint,bigint->string")
class get_top_tags(object):
    """Rank tags by how often they occur within a timestamp window."""

    def evaluate(self, tag_ts, ts_limit, top):
        """Return the ``top`` most frequent tags as a comma-separated string.

        Args:
            tag_ts: Comma-separated ``tag:ts`` entries.
            ts_limit: Only entries with ``0 <= ts < ts_limit`` are counted.
            top: Maximum number of tags to return.

        Returns:
            Tags ordered by count descending, then by largest ts descending;
            ``""`` when the input is None/empty or nothing is in range.

        Raises:
            ValueError: If a non-empty entry is not exactly ``tag:ts`` or
                ``ts`` is not an integer.
        """
        stats = {}
        for entry in _split_entries(tag_ts):
            tag, ts = entry.split(":")
            ts = int(ts)
            if 0 <= ts < ts_limit:
                count, latest = stats.get(tag, (0, -1))
                stats[tag] = (count + 1, max(latest, ts))
        return _join_top_tags(stats, top)


@annotate("string,bigint,bigint->string")
class get_top_tags_v2(object):
    """Rank tags by summed ``uv`` within a timestamp window."""

    def evaluate(self, tag_ts, ts_limit, top):
        """Return the ``top`` tags with the largest summed ``uv``.

        Args:
            tag_ts: Comma-separated ``tag:ts:uv`` entries.
            ts_limit: Only entries with ``0 <= ts < ts_limit`` are counted.
            top: Maximum number of tags to return.

        Returns:
            Tags ordered by summed uv descending, then by largest ts
            descending; ``""`` when the input is None/empty or nothing is in
            range.

        Raises:
            ValueError: If a non-empty entry is not exactly ``tag:ts:uv`` or
                ``ts``/``uv`` is not an integer.
        """
        stats = {}
        for entry in _split_entries(tag_ts):
            tag, ts, uv = entry.split(":")
            ts = int(ts)
            uv = int(uv)
            if 0 <= ts < ts_limit:
                total, latest = stats.get(tag, (0, -1))
                stats[tag] = (total + uv, max(latest, ts))
        return _join_top_tags(stats, top)


@annotate("string,bigint,bigint->string")
class mid_ts_dedup(object):
    """Deduplicate ``id:ts`` pairs, keeping the smallest in-window ts per id."""

    def evaluate(self, mid_ts, ts_exp, time_diff):
        """Return one ``id:ts`` pair per id, joined by commas.

        Args:
            mid_ts: Comma-separated ``id:ts`` entries (fields after the
                second colon-separated one are ignored).
            ts_exp: Reference timestamp.
            time_diff: Largest accepted value of ``ts - ts_exp``.

        Returns:
            For every id with at least one entry satisfying
            ``0 <= ts - ts_exp <= time_diff``, the pair with the smallest
            such ts, in order of first accepted appearance; ``""`` when the
            input is None/empty or nothing qualifies.
        """
        earliest = {}
        for entry in _split_entries(mid_ts):
            fields = entry.split(":")
            mid, ts = fields[0], int(fields[1])
            delta = ts - ts_exp
            if delta > time_diff or delta < 0:
                continue
            if mid not in earliest or earliest[mid] > ts:
                earliest[mid] = ts
        return ",".join(mid + ":" + str(ts) for mid, ts in earliest.items())


@annotate("string->string")
class get_cate1(object):
    """Find every known first-level category mentioned in a string."""

    def __init__(self):
        self.cate_list = list(_CATE1_NAMES)

    def evaluate(self, cate_str):
        """Return the matching first-level categories joined by commas.

        Returns ``"unknown"`` for None, ``""`` or ``"无"``, and
        ``"unknown_" + cate_str`` when no known category is a substring of
        the input.
        """
        if cate_str is None or cate_str == "" or cate_str == "无":
            return "unknown"
        matched = [cate for cate in self.cate_list if cate in cate_str]
        if not matched:
            return "unknown_" + cate_str
        return ",".join(matched)


@annotate("string->string")
class get_cate2_all(object):
    """Find every known second-level category mentioned in a string."""

    def __init__(self):
        self.cate_list = list(_CATE2_NAMES)

    def evaluate(self, cate_str):
        """Return the matching second-level categories joined by commas.

        Returns ``"unknown"`` when the input is None or no known category is
        a substring of it.
        """
        if cate_str is None:
            return "unknown"
        matched = [cate for cate in self.cate_list if cate in cate_str]
        if not matched:
            return "unknown"
        return ",".join(matched)


@annotate("string->string")
class get_cate2_only(object):
    """Pick a single second-level category for a string."""

    def __init__(self):
        self.cate_list = list(_CATE2_NAMES)

    def evaluate(self, cate_str):
        """Return the first known category found in ``cate_str``.

        When no known category matches, fall back to the text before the
        first "、", with a few boilerplate fragments removed and everything
        except CJK ideographs and ASCII letters stripped. Returns
        ``"unknown"`` for None input or when the fallback ends up empty.
        """
        if cate_str is None:
            return "unknown"
        for cate in self.cate_list:
            if cate in cate_str:
                return cate

        fallback = cate_str.split("、")[0]
        # "品类-" is removed before the bare "品类" so the dash goes with it.
        for fragment in ("品类-", "品类", "分数", "一人"):
            fallback = fallback.replace(fragment, "")
        fallback = self.clean_text(fallback)
        return fallback if fallback else "unknown"

    def clean_text(self, input_text):
        """Strip punctuation, keeping only Chinese characters and ASCII letters.

        Args:
            input_text (str): The string to clean.

        Returns:
            str: The cleaned string.
        """
        return _NON_CJK_ALPHA_RE.sub('', input_text)


@annotate("string->string")
class clean_text(object):
    """Strip everything except Chinese characters, ASCII letters and digits."""

    def __init__(self):
        # Kept as an attribute for backward compatibility.
        self.re = re

    def evaluate(self, input_text):
        """Return the cleaned text, or ``""`` when the input is None."""
        if input_text is None:
            return ""
        return _NON_CJK_ALNUM_RE.sub("", input_text)


@annotate("string->string")
class deduplication4list(object):
    """Remove duplicate items from a comma-separated list."""

    def evaluate(self, input_text):
        """Return the unique items joined by commas, or None for None input.

        Items keep the order of their first occurrence, so the result is
        deterministic; a plain ``set`` would yield a hash-dependent order
        that can differ between interpreter processes.
        """
        if input_text is None:
            return None
        return ",".join(dict.fromkeys(input_text.split(",")))