# -*- coding: utf-8 -*- import datetime from odps.udf import annotate FMT_STR_MAP = { "yyyyMMdd": "%Y%m%d", "yyyy-MM-dd": "%Y-%m-%d", "yyyy年MM月dd日": "%Y年%m月%d日" } DEFAULT_OUTPUT_MODE = "default" CN_OUTPUT_MODE = "cn" class LunarDate(object): def __init__(self, year, month, day, is_leap=False): self.year = year self.month = month self.day = day self.is_leap = is_leap class InputDateParser(object): """第一部分: 解析输入日期。""" def __init__(self, fmt_map=None): self._fmt_map = fmt_map or FMT_STR_MAP def parse(self, date_str, fmt_str): py_fmt_str = self._fmt_map.get(fmt_str) if py_fmt_str is None: raise ValueError("unsupported input format {fmt_str}".format(fmt_str=fmt_str)) dt = datetime.datetime.strptime(date_str, py_fmt_str) return dt.year, dt.month, dt.day class LunarFormatterBase(object): """第三部分: 根据 fmt_str 和 output 输出最终结果。""" LUNAR_MONTH_NAME = [ "", "正月", "二月", "三月", "四月", "五月", "六月", "七月", "八月", "九月", "十月", "冬月", "腊月" ] LUNAR_DAY_NAME = [ "", "初一", "初二", "初三", "初四", "初五", "初六", "初七", "初八", "初九", "初十", "十一", "十二", "十三", "十四", "十五", "十六", "十七", "十八", "十九", "二十", "廿一", "廿二", "廿三", "廿四", "廿五", "廿六", "廿七", "廿八", "廿九", "三十" ] DIGIT_MAP = { '0': '〇', '1': '一', '2': '二', '3': '三', '4': '四', '5': '五', '6': '六', '7': '七', '8': '八', '9': '九' } @staticmethod def _apply_leap_prefix(value, is_leap): return "闰" + str(value) if is_leap else str(value) def _year_to_cnt(self, year): return ''.join(self.DIGIT_MAP[ch] for ch in str(year)) def _to_cn_number(self, value): return self._year_to_cnt(value) @staticmethod def _to_arabic_number(value, width=None): # width 为空时只做普通字符串转换;需要固定宽度时由 formatter 传入。 value_str = str(value) if width is not None: return value_str.zfill(width) return value_str def format(self, lunar_date): raise NotImplementedError class YyyyMMddDefaultFormatter(LunarFormatterBase): def format(self, lunar_date): ly = self._to_arabic_number(lunar_date.year) lm = self._apply_leap_prefix(self._to_arabic_number(lunar_date.month, 2), lunar_date.is_leap) ld = self._to_arabic_number(lunar_date.day, 2) return "{ly}{lm}{ld}".format(ly=ly, lm=lm, ld=ld) class YyyyMMddCnFormatter(LunarFormatterBase): def format(self, lunar_date): ly = self._to_cn_number(lunar_date.year) lm = self._apply_leap_prefix(self._to_cn_number(lunar_date.month), lunar_date.is_leap) ld = self._to_cn_number(lunar_date.day) return "{ly}{lm}{ld}".format(ly=ly, lm=lm, ld=ld) class YyyyDashMmDashDdDefaultFormatter(LunarFormatterBase): def format(self, lunar_date): ly = self._to_arabic_number(lunar_date.year) lm = self._apply_leap_prefix(self._to_arabic_number(lunar_date.month, 2), lunar_date.is_leap) ld = self._to_arabic_number(lunar_date.day, 2) return "{ly}-{lm}-{ld}".format(ly=ly, lm=lm, ld=ld) class YyyyDashMmDashDdCnFormatter(LunarFormatterBase): def format(self, lunar_date): ly = self._to_cn_number(lunar_date.year) lm = self._apply_leap_prefix(self._to_cn_number(lunar_date.month), lunar_date.is_leap) ld = self._to_cn_number(lunar_date.day) return "{ly}-{lm}-{ld}".format(ly=ly, lm=lm, ld=ld) class YyyyCnFormatter(LunarFormatterBase): def format(self, lunar_date): ly = self._year_to_cnt(lunar_date.year) lm = self.LUNAR_MONTH_NAME[lunar_date.month] ld = self.LUNAR_DAY_NAME[lunar_date.day] lm = self._apply_leap_prefix(lm, lunar_date.is_leap) return "{ly}年{lm}{ld}日".format(ly=ly, lm=lm, ld=ld) class LunarFormatterRegistry(object): """按 fmt_str 选择输出格式化策略。""" _registry = { "yyyyMMdd": { DEFAULT_OUTPUT_MODE: YyyyMMddDefaultFormatter(), CN_OUTPUT_MODE: YyyyMMddCnFormatter() }, "yyyy-MM-dd": { DEFAULT_OUTPUT_MODE: YyyyDashMmDashDdDefaultFormatter(), CN_OUTPUT_MODE: YyyyDashMmDashDdCnFormatter() }, "yyyy年MM月dd日": { DEFAULT_OUTPUT_MODE: YyyyCnFormatter() } } SUPPORTED_FMT_STRS = frozenset(_registry.keys()) @classmethod def get(cls, fmt_str, output_mode): if fmt_str not in cls.SUPPORTED_FMT_STRS: return None mode_registry = cls._registry.get(fmt_str, {}) return mode_registry.get(output_mode) class SolarToLunarConverter(object): # 农历数据(1900-2100) LUNAR_INFO = [ 0x04bd8, 0x04ae0, 0x0a570, 0x054d5, 0x0d260, 0x0d950, 0x16554, 0x056a0, 0x09ad0, 0x055d2, 0x04ae0, 0x0a5b6, 0x0a4d0, 0x0d250, 0x1d255, 0x0b540, 0x0d6a0, 0x0ada2, 0x095b0, 0x14977, 0x04970, 0x0a4b0, 0x0b4b5, 0x06a50, 0x06d40, 0x1ab54, 0x02b60, 0x09570, 0x052f2, 0x04970, 0x06566, 0x0d4a0, 0x0ea50, 0x06e95, 0x05ad0, 0x02b60, 0x186e3, 0x092e0, 0x1c8d7, 0x0c950, 0x0d4a0, 0x1d8a6, 0x0b550, 0x056a0, 0x1a5b4, 0x025d0, 0x092d0, 0x0d2b2, 0x0a950, 0x0b557, 0x06ca0, 0x0b550, 0x15355, 0x04da0, 0x0a5b0, 0x14573, 0x052b0, 0x0a9a8, 0x0e950, 0x06aa0, 0x0aea6, 0x0ab50, 0x04b60, 0x0aae4, 0x0a570, 0x05260, 0x0f263, 0x0d950, 0x05b57, 0x056a0, 0x096d0, 0x04dd5, 0x04ad0, 0x0a4d0, 0x0d4d4, 0x0d250, 0x0d558, 0x0b540, 0x0b5a0, 0x195a6, 0x095b0, 0x049b0, 0x0a974, 0x0a4b0, 0x0b27a, 0x06a50, 0x06d40, 0x0af46, 0x0ab60, 0x09570, 0x04af5, 0x04970, 0x064b0, 0x074a3, 0x0ea50, 0x06b58, 0x05ac0, 0x0ab60, 0x096d5, 0x092e0, 0x0c960, 0x0d954, 0x0d4a0, 0x0da50, 0x07552, 0x056a0, 0x0abb7, 0x025d0, 0x092d0, 0x0cab5, 0x0a950, 0x0b4a0, 0x0baa4, 0x0ad50, 0x055d9, 0x04ba0, 0x0a5b0, 0x15176, 0x052b0, 0x0a930, 0x07954, 0x06aa0, 0x0ad50, 0x05b52, 0x04b60, 0x0a6e6, 0x0a4e0, 0x0d260, 0x0ea65, 0x0d530, 0x05aa0, 0x076a3, 0x096d0, 0x04bd7, 0x04ad0, 0x0a4d0, 0x1d0b6, 0x0d250, 0x0d520, 0x0dd45, 0x0b5a0, 0x056d0, 0x055b2, 0x049b0, 0x0a577, 0x0a4b0, 0x0aa50, 0x1b255, 0x06d20, 0x0ada0 ] def __init__(self): self._input_parser = InputDateParser() @staticmethod def _leap_month(year): return SolarToLunarConverter.LUNAR_INFO[year - 1900] & 0xf @staticmethod def _leap_days(year): if SolarToLunarConverter._leap_month(year): return 30 if (SolarToLunarConverter.LUNAR_INFO[year - 1900] & 0x10000) else 29 return 0 @staticmethod def _month_days(year, month): return 30 if (SolarToLunarConverter.LUNAR_INFO[year - 1900] & (0x10000 >> month)) else 29 @staticmethod def _year_days(year): sum_days = 348 i = 0x8000 while i > 0x8: if SolarToLunarConverter.LUNAR_INFO[year - 1900] & i: sum_days += 1 i >>= 1 return sum_days + SolarToLunarConverter._leap_days(year) @staticmethod def _days_since_base_date(year, month, day): base = datetime.date(1900, 1, 31) obj = datetime.date(year, month, day) return (obj - base).days def _find_lunar_year_and_offset(self, offset): """先把公历偏移量落到具体的农历年份里。""" lunar_year = 1900 while lunar_year < 2100: year_days = self._year_days(lunar_year) if offset < year_days: return lunar_year, offset offset -= year_days lunar_year += 1 raise ValueError("date is out of supported lunar range") def _iter_lunar_month_schedule(self, year): """按顺序列出该农历年的月份,闰月会在对应普通月后额外插入一次。""" leap = self._leap_month(year) for lunar_month in range(1, 13): yield lunar_month, False, self._month_days(year, lunar_month) if leap == lunar_month: yield lunar_month, True, self._leap_days(year) def _find_lunar_month_day(self, year, offset): """在指定农历年内,根据剩余偏移量定位到月和日。""" for lunar_month, is_leap, month_days in self._iter_lunar_month_schedule(year): if offset < month_days: return lunar_month, offset + 1, is_leap offset -= month_days raise ValueError("date is out of supported lunar range") def solar_to_lunar_core(self, year, month, day): """把公历日期转换成农历日期。""" offset = self._days_since_base_date(year, month, day) if offset < 0: raise ValueError("date is out of supported lunar range") lunar_year, lunar_offset = self._find_lunar_year_and_offset(offset) lunar_month, lunar_day, is_leap = self._find_lunar_month_day(lunar_year, lunar_offset) return lunar_year, lunar_month, lunar_day, is_leap def convert(self, date_str, fmt_str="yyyy-MM-dd", output=DEFAULT_OUTPUT_MODE): if not date_str: return None output = DEFAULT_OUTPUT_MODE if output in (None, "") else output y, m, d = self._input_parser.parse(date_str, fmt_str) lunar_year, lunar_month, lunar_day, is_leap = self.solar_to_lunar_core(y, m, d) lunar_date = LunarDate(lunar_year, lunar_month, lunar_day, is_leap) formatter = LunarFormatterRegistry.get(fmt_str, output) if formatter is None: raise ValueError( "unsupported output mode {output} for format {fmt_str}".format( output=output, fmt_str=fmt_str ) ) return formatter.format(lunar_date) @annotate("*->string") class solar_to_lunar(object): def __init__(self): self._converter = SolarToLunarConverter() def evaluate(self, date_str, fmt_str="yyyy-MM-dd", output=DEFAULT_OUTPUT_MODE): try: return self._converter.convert(date_str, fmt_str, output) except ValueError as e: return "ERROR: {type}: {e}".format(type=type(e).__name__, e=str(e)) except Exception as e: return "ERROR: {type}: {e}".format(type=type(e).__name__, e=str(e))