| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- # -*- coding: utf-8 -*-
- import datetime
- from odps.udf import annotate
# Names of the output modes a caller may request.
DEFAULT_OUTPUT_MODE = "default"
CN_OUTPUT_MODE = "cn"

# Supported input patterns, each mapped to the strptime pattern used to
# parse a date string written in that pattern.
FMT_STR_MAP = {
    "yyyyMMdd": "%Y%m%d",
    "yyyy-MM-dd": "%Y-%m-%d",
    "yyyy年MM月dd日": "%Y年%m月%d日",
}
class LunarDate(object):
    """Plain value holder for a single lunar calendar date.

    Attributes:
        year: Lunar year number.
        month: Lunar month number.
        day: Day number within the lunar month.
        is_leap: True when the month is a leap month; defaults to False.
    """

    def __init__(self, year, month, day, is_leap=False):
        self.year, self.month, self.day = year, month, day
        self.is_leap = is_leap
class InputDateParser(object):
    """Stage one: parse the input date string."""

    def __init__(self, fmt_map=None):
        # Any falsy fmt_map (None, empty mapping) selects the module default.
        self._fmt_map = fmt_map if fmt_map else FMT_STR_MAP

    def parse(self, date_str, fmt_str):
        """Parse ``date_str`` written in the pattern named by ``fmt_str``.

        Args:
            date_str: The date text to parse.
            fmt_str: A key of the format map, e.g. ``"yyyy-MM-dd"``.

        Returns:
            A ``(year, month, day)`` tuple of ints.

        Raises:
            ValueError: If ``fmt_str`` is not in the format map, or if
                ``strptime`` rejects ``date_str`` for the mapped pattern.
        """
        strptime_pattern = self._fmt_map.get(fmt_str)
        if strptime_pattern is not None:
            parsed = datetime.datetime.strptime(date_str, strptime_pattern)
            return parsed.year, parsed.month, parsed.day
        raise ValueError("unsupported input format {fmt_str}".format(fmt_str=fmt_str))
class LunarFormatterBase(object):
    """Stage three: produce the final text according to fmt_str and output.

    This base class holds the shared name tables and conversion helpers;
    subclasses implement ``format`` for one concrete layout.
    """

    # Index 0 is an empty placeholder so month numbers 1-12 index directly.
    LUNAR_MONTH_NAME = [
        "",
        "正月", "二月", "三月", "四月", "五月", "六月",
        "七月", "八月", "九月", "十月", "冬月", "腊月",
    ]

    # Index 0 is an empty placeholder so day numbers 1-30 index directly.
    LUNAR_DAY_NAME = [
        "",
        "初一", "初二", "初三", "初四", "初五", "初六", "初七", "初八", "初九", "初十",
        "十一", "十二", "十三", "十四", "十五", "十六", "十七", "十八", "十九", "二十",
        "廿一", "廿二", "廿三", "廿四", "廿五", "廿六", "廿七", "廿八", "廿九", "三十",
    ]

    # Per-character mapping from Arabic digits to Chinese numerals.
    DIGIT_MAP = {
        '0': '〇', '1': '一', '2': '二', '3': '三', '4': '四',
        '5': '五', '6': '六', '7': '七', '8': '八', '9': '九',
    }

    @staticmethod
    def _apply_leap_prefix(value, is_leap):
        """Return ``str(value)``, prefixed with "闰" when ``is_leap`` is truthy."""
        text = str(value)
        if is_leap:
            return "闰" + text
        return text

    def _year_to_cnt(self, year):
        """Map every character of ``str(year)`` through DIGIT_MAP and join them."""
        mapped = [self.DIGIT_MAP[ch] for ch in str(year)]
        return ''.join(mapped)

    def _to_cn_number(self, value):
        """Digit-by-digit Chinese rendering of ``value`` (same as _year_to_cnt)."""
        return self._year_to_cnt(value)

    @staticmethod
    def _to_arabic_number(value, width=None):
        """Return ``str(value)``, left-padded with zeros to ``width`` if given."""
        # With no width this is a plain string conversion; formatters that
        # need a fixed width pass it in.
        text = str(value)
        return text if width is None else text.zfill(width)

    def format(self, lunar_date):
        """Render ``lunar_date`` as text; must be overridden by subclasses."""
        raise NotImplementedError
class YyyyMMddDefaultFormatter(LunarFormatterBase):
    """Render a lunar date as compact Arabic digits, e.g. ``2024闰0415``.

    Month and day are zero-padded to two digits; a leap month gets the
    "闰" prefix in front of its two-digit number.
    """

    def format(self, lunar_date):
        """Return year + month + day concatenated without separators."""
        padded_month = self._to_arabic_number(lunar_date.month, 2)
        fields = {
            "ly": self._to_arabic_number(lunar_date.year),
            "lm": self._apply_leap_prefix(padded_month, lunar_date.is_leap),
            "ld": self._to_arabic_number(lunar_date.day, 2),
        }
        return "{ly}{lm}{ld}".format(**fields)
class YyyyMMddCnFormatter(LunarFormatterBase):
    """Render a lunar date as compact Chinese digits, e.g. ``二〇二四〇一一五``.

    Month and day are zero-padded to two digits before the digit-by-digit
    mapping. This layout has no separators, so without padding different
    dates collapse to the same text (month 1 / day 11 and month 11 / day 1
    would both end in ``一一一``). Padding keeps every field fixed-width,
    matching the Arabic-digit ``yyyyMMdd`` layout.
    """

    def format(self, lunar_date):
        """Return year + month + day as Chinese digits without separators.

        Args:
            lunar_date: Object exposing ``year``, ``month``, ``day`` and
                ``is_leap``.

        Returns:
            The concatenated text; a leap month is prefixed with "闰".
        """
        ly = self._to_cn_number(lunar_date.year)
        # Pad first ("1" -> "01"), then map each digit ("01" -> "〇一").
        month_digits = self._to_cn_number(self._to_arabic_number(lunar_date.month, 2))
        lm = self._apply_leap_prefix(month_digits, lunar_date.is_leap)
        ld = self._to_cn_number(self._to_arabic_number(lunar_date.day, 2))
        return "{ly}{lm}{ld}".format(ly=ly, lm=lm, ld=ld)
class YyyyDashMmDashDdDefaultFormatter(LunarFormatterBase):
    """Render a lunar date as dash-separated Arabic digits, e.g. ``2024-闰04-15``.

    Month and day are zero-padded to two digits; a leap month gets the
    "闰" prefix in front of its two-digit number.
    """

    def format(self, lunar_date):
        """Return ``year-month-day`` using Arabic digits."""
        padded_month = self._to_arabic_number(lunar_date.month, 2)
        fields = {
            "ly": self._to_arabic_number(lunar_date.year),
            "lm": self._apply_leap_prefix(padded_month, lunar_date.is_leap),
            "ld": self._to_arabic_number(lunar_date.day, 2),
        }
        return "{ly}-{lm}-{ld}".format(**fields)
class YyyyDashMmDashDdCnFormatter(LunarFormatterBase):
    """Render a lunar date as dash-separated Chinese digits, e.g. ``二〇二四-闰四-一五``.

    Each number is mapped digit by digit with no zero padding; a leap
    month gets the "闰" prefix.
    """

    def format(self, lunar_date):
        """Return ``year-month-day`` using Chinese digits."""
        month_digits = self._to_cn_number(lunar_date.month)
        fields = {
            "ly": self._to_cn_number(lunar_date.year),
            "lm": self._apply_leap_prefix(month_digits, lunar_date.is_leap),
            "ld": self._to_cn_number(lunar_date.day),
        }
        return "{ly}-{lm}-{ld}".format(**fields)
class YyyyCnFormatter(LunarFormatterBase):
    """Render a lunar date with traditional names, e.g. ``二〇二四年闰四月十五日``.

    The year is mapped digit by digit; month and day come from the
    LUNAR_MONTH_NAME / LUNAR_DAY_NAME tables.
    """

    def format(self, lunar_date):
        """Return ``<year>年<month name><day name>日``, with "闰" before a leap month."""
        year_text = self._year_to_cnt(lunar_date.year)
        month_name = self.LUNAR_MONTH_NAME[lunar_date.month]
        day_name = self.LUNAR_DAY_NAME[lunar_date.day]
        return "{ly}年{lm}{ld}日".format(
            ly=year_text,
            lm=self._apply_leap_prefix(month_name, lunar_date.is_leap),
            ld=day_name
        )
class LunarFormatterRegistry(object):
    """Select the output formatting strategy by fmt_str and output mode."""

    # fmt_str -> {output mode -> shared formatter instance}.
    # The "yyyy年MM月dd日" layout only offers the default mode.
    _registry = {
        "yyyyMMdd": {
            DEFAULT_OUTPUT_MODE: YyyyMMddDefaultFormatter(),
            CN_OUTPUT_MODE: YyyyMMddCnFormatter(),
        },
        "yyyy-MM-dd": {
            DEFAULT_OUTPUT_MODE: YyyyDashMmDashDdDefaultFormatter(),
            CN_OUTPUT_MODE: YyyyDashMmDashDdCnFormatter(),
        },
        "yyyy年MM月dd日": {
            DEFAULT_OUTPUT_MODE: YyyyCnFormatter(),
        },
    }

    SUPPORTED_FMT_STRS = frozenset(_registry)

    @classmethod
    def get(cls, fmt_str, output_mode):
        """Return the formatter for the given pair, or None if there is none.

        Args:
            fmt_str: Layout key such as ``"yyyy-MM-dd"``.
            output_mode: Output mode key such as ``"default"`` or ``"cn"``.

        Returns:
            The registered formatter instance, or None when either the
            layout or the mode for that layout is not registered.
        """
        if fmt_str in cls.SUPPORTED_FMT_STRS:
            return cls._registry.get(fmt_str, {}).get(output_mode)
        return None
class SolarToLunarConverter(object):
    """Convert solar (Gregorian) dates to lunar dates using a packed table.

    The supported range is derived from the length of ``LUNAR_INFO``, so
    any date outside the table raises ``ValueError`` rather than failing
    on a list index.
    """

    # First lunar year described by LUNAR_INFO. Day offset 0 of that year
    # is the solar date 1900-01-31 (see _days_since_base_date).
    MIN_YEAR = 1900

    # One packed integer per lunar year, starting at MIN_YEAR
    # (150 entries, i.e. 1900-2049):
    #   bits 0-3  : number of the leap month, 0 when the year has none
    #   bits 4-15 : lengths of months 1-12 (bit 15 = month 1 ... bit 4 =
    #               month 12); set = 30 days, clear = 29 days
    #   bit 16    : length of the leap month; set = 30 days, clear = 29 days
    # NOTE(review): the 2033 entry (0x04bd7) encodes leap month 7, while
    # other published versions of this table reportedly encode leap month
    # 11 for 2033 -- verify against an authoritative almanac.
    LUNAR_INFO = [
        0x04bd8, 0x04ae0, 0x0a570, 0x054d5, 0x0d260, 0x0d950, 0x16554,
        0x056a0, 0x09ad0, 0x055d2, 0x04ae0, 0x0a5b6, 0x0a4d0, 0x0d250,
        0x1d255, 0x0b540, 0x0d6a0, 0x0ada2, 0x095b0, 0x14977, 0x04970,
        0x0a4b0, 0x0b4b5, 0x06a50, 0x06d40, 0x1ab54, 0x02b60, 0x09570,
        0x052f2, 0x04970, 0x06566, 0x0d4a0, 0x0ea50, 0x06e95, 0x05ad0,
        0x02b60, 0x186e3, 0x092e0, 0x1c8d7, 0x0c950, 0x0d4a0, 0x1d8a6,
        0x0b550, 0x056a0, 0x1a5b4, 0x025d0, 0x092d0, 0x0d2b2, 0x0a950,
        0x0b557, 0x06ca0, 0x0b550, 0x15355, 0x04da0, 0x0a5b0, 0x14573,
        0x052b0, 0x0a9a8, 0x0e950, 0x06aa0, 0x0aea6, 0x0ab50, 0x04b60,
        0x0aae4, 0x0a570, 0x05260, 0x0f263, 0x0d950, 0x05b57, 0x056a0,
        0x096d0, 0x04dd5, 0x04ad0, 0x0a4d0, 0x0d4d4, 0x0d250, 0x0d558,
        0x0b540, 0x0b5a0, 0x195a6, 0x095b0, 0x049b0, 0x0a974, 0x0a4b0,
        0x0b27a, 0x06a50, 0x06d40, 0x0af46, 0x0ab60, 0x09570, 0x04af5,
        0x04970, 0x064b0, 0x074a3, 0x0ea50, 0x06b58, 0x05ac0, 0x0ab60,
        0x096d5, 0x092e0, 0x0c960, 0x0d954, 0x0d4a0, 0x0da50, 0x07552,
        0x056a0, 0x0abb7, 0x025d0, 0x092d0, 0x0cab5, 0x0a950, 0x0b4a0,
        0x0baa4, 0x0ad50, 0x055d9, 0x04ba0, 0x0a5b0, 0x15176, 0x052b0,
        0x0a930, 0x07954, 0x06aa0, 0x0ad50, 0x05b52, 0x04b60, 0x0a6e6,
        0x0a4e0, 0x0d260, 0x0ea65, 0x0d530, 0x05aa0, 0x076a3, 0x096d0,
        0x04bd7, 0x04ad0, 0x0a4d0, 0x1d0b6, 0x0d250, 0x0d520, 0x0dd45,
        0x0b5a0, 0x056d0, 0x055b2, 0x049b0, 0x0a577, 0x0a4b0, 0x0aa50,
        0x1b255, 0x06d20, 0x0ada0
    ]

    # Last lunar year that has a table entry; derived so it can never drift
    # away from the actual table length.
    MAX_YEAR = MIN_YEAR + len(LUNAR_INFO) - 1

    def __init__(self):
        self._input_parser = InputDateParser()

    @classmethod
    def _year_info(cls, year):
        """Return the packed table entry for ``year``.

        Raises:
            ValueError: If ``year`` has no table entry. The explicit check
                also stops years before MIN_YEAR from silently wrapping
                around through a negative list index.
        """
        if not cls.MIN_YEAR <= year <= cls.MAX_YEAR:
            raise ValueError("date is out of supported lunar range")
        return cls.LUNAR_INFO[year - cls.MIN_YEAR]

    @classmethod
    def _leap_month(cls, year):
        """Return the leap month number of ``year``, or 0 if it has none."""
        return cls._year_info(year) & 0xf

    @classmethod
    def _leap_days(cls, year):
        """Return the length of the leap month of ``year`` (0 if none)."""
        if cls._leap_month(year):
            return 30 if (cls._year_info(year) & 0x10000) else 29
        return 0

    @classmethod
    def _month_days(cls, year, month):
        """Return the length (29 or 30) of regular month ``month`` (1-12)."""
        return 30 if (cls._year_info(year) & (0x10000 >> month)) else 29

    @classmethod
    def _year_days(cls, year):
        """Return the total number of days in lunar ``year``, leap month included."""
        regular_days = sum(cls._month_days(year, month) for month in range(1, 13))
        return regular_days + cls._leap_days(year)

    @staticmethod
    def _days_since_base_date(year, month, day):
        """Return the signed day count from 1900-01-31 to the given solar date."""
        base = datetime.date(1900, 1, 31)
        obj = datetime.date(year, month, day)
        return (obj - base).days

    def _find_lunar_year_and_offset(self, offset):
        """Locate the lunar year containing day ``offset``.

        Args:
            offset: Non-negative day count since the start of MIN_YEAR.

        Returns:
            ``(lunar_year, remaining_offset)`` where the remaining offset
            is the zero-based day index inside that lunar year.

        Raises:
            ValueError: If ``offset`` lies beyond the last table year.
        """
        # Bounded by the table itself rather than a hard-coded end year, so
        # running past the data yields ValueError instead of IndexError.
        for lunar_year in range(self.MIN_YEAR, self.MAX_YEAR + 1):
            year_days = self._year_days(lunar_year)
            if offset < year_days:
                return lunar_year, offset
            offset -= year_days
        raise ValueError("date is out of supported lunar range")

    def _iter_lunar_month_schedule(self, year):
        """Yield ``(month, is_leap, days)`` for each month of ``year`` in order.

        The leap month, if any, is yielded right after the regular month
        carrying the same number.
        """
        leap = self._leap_month(year)
        for lunar_month in range(1, 13):
            yield lunar_month, False, self._month_days(year, lunar_month)
            if leap == lunar_month:
                yield lunar_month, True, self._leap_days(year)

    def _find_lunar_month_day(self, year, offset):
        """Locate month and day inside lunar ``year`` for the remaining offset.

        Returns:
            ``(lunar_month, lunar_day, is_leap)`` with a one-based day.

        Raises:
            ValueError: If ``offset`` exceeds the length of the year.
        """
        for lunar_month, is_leap, month_days in self._iter_lunar_month_schedule(year):
            if offset < month_days:
                return lunar_month, offset + 1, is_leap
            offset -= month_days
        raise ValueError("date is out of supported lunar range")

    def solar_to_lunar_core(self, year, month, day):
        """Convert a solar date to a lunar date.

        Args:
            year: Solar year.
            month: Solar month.
            day: Solar day of month.

        Returns:
            ``(lunar_year, lunar_month, lunar_day, is_leap)``.

        Raises:
            ValueError: If the date is before 1900-01-31, past the end of
                the table, or not a valid calendar date.
        """
        offset = self._days_since_base_date(year, month, day)
        if offset < 0:
            raise ValueError("date is out of supported lunar range")
        lunar_year, lunar_offset = self._find_lunar_year_and_offset(offset)
        lunar_month, lunar_day, is_leap = self._find_lunar_month_day(lunar_year, lunar_offset)
        return lunar_year, lunar_month, lunar_day, is_leap

    def convert(self, date_str, fmt_str="yyyy-MM-dd", output=DEFAULT_OUTPUT_MODE):
        """Parse a solar date string and return its lunar date as text.

        Args:
            date_str: Solar date text; a falsy value returns None.
            fmt_str: Layout of ``date_str``, also used to pick the output
                layout.
            output: Output mode; None or "" selects the default mode.

        Returns:
            The formatted lunar date, or None when ``date_str`` is falsy.

        Raises:
            ValueError: If the format is unsupported, the date cannot be
                parsed or is out of range, or no formatter is registered
                for the ``fmt_str``/``output`` pair.
        """
        if not date_str:
            return None
        output = DEFAULT_OUTPUT_MODE if output in (None, "") else output
        y, m, d = self._input_parser.parse(date_str, fmt_str)
        lunar_year, lunar_month, lunar_day, is_leap = self.solar_to_lunar_core(y, m, d)
        lunar_date = LunarDate(lunar_year, lunar_month, lunar_day, is_leap)
        formatter = LunarFormatterRegistry.get(fmt_str, output)
        if formatter is None:
            raise ValueError(
                "unsupported output mode {output} for format {fmt_str}".format(
                    output=output,
                    fmt_str=fmt_str
                )
            )
        return formatter.format(lunar_date)
@annotate("*->string")
class solar_to_lunar(object):
    """UDF entry point: convert a solar date string to lunar date text.

    Failures are never raised to the caller; they are returned as text of
    the form ``ERROR: <exception type>: <message>``.
    """

    def __init__(self):
        self._converter = SolarToLunarConverter()

    def evaluate(self, date_str, fmt_str="yyyy-MM-dd", output=DEFAULT_OUTPUT_MODE):
        """Return the converted date, or an ``ERROR: ...`` string on failure.

        Args:
            date_str: Solar date text.
            fmt_str: Layout of ``date_str``.
            output: Output mode passed through to the converter.
        """
        try:
            result = self._converter.convert(date_str, fmt_str, output)
        except Exception as err:
            # ValueError and every other Exception are reported the same way.
            return "ERROR: {type}: {e}".format(type=type(err).__name__, e=str(err))
        return result
|