123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # vim:fenc=utf-8
- from enum import Enum, auto
- from typing import Optional
- import rocketmq
- from pydantic import BaseModel
- # class MessageType(Enum):
- # DEFAULT = (-1, "未分类的消息")
- # TEXT = (1, "文本")
- # VOICE = (2, "语音")
- # GIF = (3, "GIF")
- # IMAGE_GW = (4, "个微图片")
- # IMAGE_QW = (5, "企微图片")
- # MINI_PROGRAM = (6, "小程序")
- # LINK = (7, "链接")
- # SHI_PIN_HAO = (8, "视频号")
- # NAME_CARD = (9, "名片")
- # POSITION = (10, "位置")
- # RED_PACKET = (11, "红包")
- # FILE_GW = (12, "个微文件")
- # FILE_QW = (13, "企微文件")
- # VIDEO_GW = (14, "个微视频")
- # VIDEO_QW = (15, "企微视频")
- # AGGREGATION_MSG = (16, "聚合消息")
- #
- # ACTIVE_TRIGGER = (101, "主动触发器")
- # AGGREGATION_TRIGGER = (102, "消息聚合触发器")
- #
- # def __init__(self, code, description):
- # self.code = code
- # self.description = description
- #
- # def __repr__(self):
- # return f"{self.__class__.__name__}.{self.name}"
- class MessageType(int, Enum):
- DEFAULT = -1
- TEXT = 1
- VOICE = 2
- GIF = 3
- IMAGE_GW = 4
- IMAGE_QW = 5
- MINI_PROGRAM = 6
- LINK = 7
- SHI_PIN_HAO = 8
- NAME_CARD = 9
- POSITION = 10
- RED_PACKET = 11
- FILE_GW = 12
- FILE_QW = 13
- VIDEO_GW = 14
- VIDEO_QW = 15
- AGGREGATION_MSG = 16
- ACTIVE_TRIGGER = 101
- AGGREGATION_TRIGGER = 102
- def __init__(self, code):
- self.description = {
- -1: "未分类的消息",
- 1: "文本",
- 2: "语音",
- 3: "GIF",
- 4: "个微图片",
- 5: "企微图片",
- 6: "小程序",
- 7: "链接",
- 8: "视频号",
- 9: "名片",
- 10: "位置",
- 11: "红包",
- 12: "个微文件",
- 13: "企微文件",
- 14: "个微视频",
- 15: "企微视频",
- 16: "聚合消息",
- 101: "主动触发器",
- 102: "消息聚合触发器"
- }[code]
- # class MessageChannel(Enum):
- # CORP_WECHAT = (1, "企业微信")
- # MINI_PROGRAM = (2, "小程序")
- #
- # SYSTEM = (101, "系统内部")
- #
- # def __init__(self, code, description):
- # self.code = code
- # self.description = description
- #
- # def __repr__(self):
- # return f"{self.__class__.__name__}.{self.name}"
- class MessageChannel(int, Enum):
- CORP_WECHAT = 1
- MINI_PROGRAM = 2
- SYSTEM = 101
- def __init__(self, code):
- self.description = {
- 1: "企业微信",
- 2: "小程序",
- 101: "系统内部"
- }[code]
- class Message(BaseModel):
- msgId: Optional[int] = None
- type: MessageType
- channel: MessageChannel
- sender: Optional[str] = None
- senderUnionId: Optional[str] = None
- receiver: str
- content: Optional[str] = None
- # 由于需要和其它语言如Java进行序列化和反序列化交互,因此使用camelCase命名法
- sendTime: int
- refMsgId: Optional[int] = None
- # 原始的RocketMQ消息体,用于ack
- _rmq_message: Optional[rocketmq.Message] = None
- @staticmethod
- def build(type, channel, sender, receiver, content, timestamp):
- return Message(
- msgId=0,
- type=type,
- channel=channel,
- sender=sender,
- receiver=receiver,
- content=content,
- sendTime=timestamp
- )
- def to_json(self):
- return self.model_dump_json(include={
- "msgId", "type", "channel", "sender", "senderUnionId",
- "receiver", "content", "sendTime", "refMsgId"
- })
- @staticmethod
- def from_json(json_str):
- return Message.model_validate_json(json_str)
|