package datahub import ( "encoding/json" "errors" "fmt" "github.com/golang/protobuf/proto" "github.com/aliyun/aliyun-datahub-sdk-go/datahub/pbmodel" "github.com/aliyun/aliyun-datahub-sdk-go/datahub/util" "reflect" ) // handel the http request type RequestModel interface { // serialize the requestModel and maybe need add some message on http header requestBodyEncode() ([]byte, error) } // empty request type EmptyRequest struct { } func (br *EmptyRequest) requestBodyEncode() ([]byte, error) { return nil, nil } type CreateProjectRequest struct { Comment string `json:"Comment"` } func (cpr *CreateProjectRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(cpr) } type UpdateProjectRequest struct { Comment string `json:"Comment"` } func (upr *UpdateProjectRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(upr) } type UpdateProjectVpcWhitelistRequest struct { VpcIds string `json:"VpcIds"` } func (upv *UpdateProjectVpcWhitelistRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(upv) } type CreateTopicRequest struct { Action string `json:"Action"` ShardCount int `json:"ShardCount"` Lifecycle int `json:"Lifecycle"` RecordType RecordType `json:"RecordType"` RecordSchema *RecordSchema `json:"RecordSchema,omitempty"` Comment string `json:"Comment"` ExpandMode ExpandMode `json:"ExpandMode"` } func (ctr *CreateTopicRequest) MarshalJSON() ([]byte, error) { msg := &struct { Action string `json:"Action"` ShardCount int `json:"ShardCount"` Lifecycle int `json:"Lifecycle"` RecordType RecordType `json:"RecordType"` RecordSchema string `json:"RecordSchema,omitempty"` Comment string `json:"Comment"` ExpandMode ExpandMode `json:"ExpandMode"` }{ Action: ctr.Action, ShardCount: ctr.ShardCount, Lifecycle: ctr.Lifecycle, RecordType: ctr.RecordType, Comment: ctr.Comment, ExpandMode: ctr.ExpandMode, } switch ctr.RecordType { case TUPLE: msg.RecordSchema = ctr.RecordSchema.String() default: msg.RecordSchema = "" } return json.Marshal(msg) } func (ctr *CreateTopicRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(ctr) } type UpdateTopicRequest struct { Comment string `json:"Comment,omitempty"` Lifecycle int `json:"Lifecycle,omitempty"` } func (utr *UpdateTopicRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(utr) } type SplitShardRequest struct { Action string `json:"Action"` ShardId string `json:"ShardId"` SplitKey string `json:"SplitKey,omitempty"` } func (ssr *SplitShardRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(ssr) } type MergeShardRequest struct { Action string `json:"Action"` ShardId string `json:"ShardId"` AdjacentShardId string `json:"AdjacentShardId"` } func (msr *MergeShardRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(msr) } type ExtendShardRequest struct { Action string `json:"Action"` ExtendMode string `json:"ExtendMode"` ShardCount int `json:"ShardNumber"` } func (esr *ExtendShardRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(esr) } type GetCursorRequest struct { Action string `json:"Action"` CursorType CursorType `json:"Type"` SystemTime int64 `json:"SystemTime"` Sequence int64 `json:"Sequence"` } func (gcr *GetCursorRequest) requestBodyEncode() ([]byte, error) { type ReqMsg struct { Action string `json:"Action"` Type CursorType `json:"Type"` } reqMsg := ReqMsg{ Action: gcr.Action, Type: gcr.CursorType, } switch gcr.CursorType { case OLDEST, LATEST: return json.Marshal(reqMsg) case SYSTEM_TIME: return json.Marshal(struct { ReqMsg SystemTime int64 `json:"SystemTime"` }{ ReqMsg: reqMsg, SystemTime: gcr.SystemTime, }) case SEQUENCE: return json.Marshal(struct { ReqMsg Sequence int64 `json:"Sequence"` }{ ReqMsg: reqMsg, Sequence: gcr.Sequence, }) default: return nil, errors.New(fmt.Sprintf("Cursor not support type %s", gcr.CursorType)) } } type PutRecordsRequest struct { Action string `json:"Action"` Records []IRecord `json:"Records"` } func (prr *PutRecordsRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(prr) } func (ptr *PutRecordsRequest) MarshalJSON() ([]byte, error) { msg := &struct { Action string `json:"Action"` Records []RecordEntry `json:"Records"` }{ Action: ptr.Action, Records: make([]RecordEntry, len(ptr.Records)), } for idx, val := range ptr.Records { msg.Records[idx].Data = val.GetData() msg.Records[idx].BaseRecord = val.GetBaseRecord() } return json.Marshal(msg) } type GetRecordRequest struct { Action string `json:"Action"` Cursor string `json:"Cursor"` Limit int `json:"Limit"` } func (grr *GetRecordRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(grr) } type AppendFieldRequest struct { Action string `json:"Action"` FieldName string `json:"FieldName"` FieldType FieldType `json:"FieldType"` } func (afr *AppendFieldRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(afr) } type GetMeterInfoRequest struct { Action string `json:"Action"` } func (gmir *GetMeterInfoRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(gmir) } type CreateConnectorRequest struct { Action string `json:"Action"` Type ConnectorType `json:"Type"` SinkStartTime int64 `json:"SinkStartTime"` ColumnFields []string `json:"ColumnFields"` ColumnNameMap map[string]string `json:"ColumnNameMap"` Config interface{} `json:"Config"` } func (ccr *CreateConnectorRequest) requestBodyEncode() ([]byte, error) { switch ccr.Type { case SinkOdps: return marshalCreateOdpsConnector(ccr) case SinkOss: return marshalCreateOssConnector(ccr) case SinkEs: return marshalCreateEsConnector(ccr) case SinkAds: return marshalCreateAdsConnector(ccr) case SinkMysql: return marshalCreateMysqlConnector(ccr) case SinkFc: return marshalCreateFcConnector(ccr) case SinkOts: return marshalCreateOtsConnector(ccr) case SinkDatahub: return marshalCreateDatahubConnector(ccr) case SinkHologres: return marshalCreateHologresConnector(ccr) default: return nil, errors.New(fmt.Sprintf("not support connector type config: %s", ccr.Type.String())) } } type UpdateConnectorRequest struct { Action string `json:"Action"` ColumnFields []string `json:"ColumnFields"` ColumnNameMap map[string]string `json:"ColumnNameMap"` Config interface{} `json:"Config"` } func (ucr *UpdateConnectorRequest) requestBodyEncode() ([]byte, error) { if ucr.Config == nil { return marshalUpdateConnector(ucr) } switch ucr.Config.(type) { case SinkOdpsConfig: return marshalUpdateOdpsConnector(ucr) case SinkOssConfig: return marshalUpdateOssConnector(ucr) case SinkEsConfig: return marshalUpdateEsConnector(ucr) case SinkAdsConfig: return marshalUpdateAdsConnector(ucr) case SinkMysqlConfig: return marshalUpdateMysqlConnector(ucr) case SinkFcConfig: return marshalUpdateFcConnector(ucr) case SinkOtsConfig: return marshalUpdateOtsConnector(ucr) case SinkDatahubConfig: return marshalUpdateDatahubConnector(ucr) case SinkHologresConfig: return marshalUpdateHologresConnector(ucr) default: return nil, errors.New(fmt.Sprintf("this connector type not support, %t", reflect.TypeOf(ucr.Config))) } } type ReloadConnectorRequest struct { Action string `json:"Action"` ShardId string `json:"ShardId,omitempty"` } func (rcr *ReloadConnectorRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(rcr) } type UpdateConnectorStateRequest struct { Action string `json:"Action"` State ConnectorState `json:"State"` } func (ucsr *UpdateConnectorStateRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(ucsr) } type UpdateConnectorOffsetRequest struct { Action string `json:"Action"` ShardId string `json:"ShardId"` Timestamp int64 `json:"CurrentTime"` Sequence int64 `json:"CurrentSequence"` } func (ucor *UpdateConnectorOffsetRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(ucor) } type GetConnectorShardStatusRequest struct { Action string `json:"Action"` ShardId string `json:"ShardId,omitempty"` } func (gcss *GetConnectorShardStatusRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(gcss) } type AppendConnectorFieldRequest struct { Action string `json:"Action"` FieldName string `json:"FieldName"` } func (acfr *AppendConnectorFieldRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(acfr) } type CreateSubscriptionRequest struct { Action string `json:"Action"` Comment string `json:"Comment"` } func (csr *CreateSubscriptionRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(csr) } type ListSubscriptionRequest struct { Action string `json:"Action"` PageIndex int `json:"PageIndex"` PageSize int `json:"PageSize"` } func (lsr *ListSubscriptionRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(lsr) } type UpdateSubscriptionRequest struct { //Action string `json:"Action"` Comment string `json:"Comment"` } func (usr *UpdateSubscriptionRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(usr) } type UpdateSubscriptionStateRequest struct { State SubscriptionState `json:"State"` } func (ussr *UpdateSubscriptionStateRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(ussr) } type OpenSubscriptionSessionRequest struct { Action string `json:"Action"` ShardIds []string `json:"ShardIds"` } func (ossr *OpenSubscriptionSessionRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(ossr) } type GetSubscriptionOffsetRequest struct { Action string `json:"Action"` ShardIds []string `json:"ShardIds"` } func (gsor *GetSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(gsor) } type CommitSubscriptionOffsetRequest struct { Action string `json:"Action"` Offsets map[string]SubscriptionOffset `json:"Offsets"` } func (csor *CommitSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(csor) } type ResetSubscriptionOffsetRequest struct { Action string `json:"Action"` Offsets map[string]SubscriptionOffset `json:"Offsets"` } func (rsor *ResetSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(rsor) } type HeartbeatRequest struct { Action string `json:"Action"` ConsumerId string `json:"ConsumerId"` VersionId int64 `json:"VersionId"` HoldShardList []string `json:"HoldShardList,omitempty"` ReadEndShardList [] string `json:"ReadEndShardList,omitempty"` } func (hr *HeartbeatRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(hr) } type JoinGroupRequest struct { Action String `json:"Action"` SessionTimeout int64 `json:"SessionTimeout"` } func (jgr *JoinGroupRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(jgr) } type SyncGroupRequest struct { Action string `json:"Action"` ConsumerId string `json:"ConsumerId"` VersionId int64 `json:"VersionId"` ReleaseShardList []string `json:"ReleaseShardList,omitempty"` ReadEndShardList []string `json:"ReadEndShardList,omitempty"` } func (sgr *SyncGroupRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(sgr) } type LeaveGroupRequest struct { Action string `json:"Action"` ConsumerId string `json:"ConsumerId"` VersionId int64 `json:"VersionId"` } func (lgr *LeaveGroupRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(lgr) } type ListTopicSchemaRequest struct { Action string `json:"Action"` } func (lts *ListTopicSchemaRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(lts) } type GetTopicSchemaRequest struct { Action string `json:"Action"` VersionId int `json:"VersionId"` RecordSchema *RecordSchema `json:"RecordSchema,omitempty"` } func (gts *GetTopicSchemaRequest) requestBodyEncode() ([]byte, error) { msg := &struct { Action string `json:"Action"` VersionId int `json:"VersionId"` RecordSchema string `json:"RecordSchema,omitempty"` }{ Action: gts.Action, VersionId: gts.VersionId, } if gts.RecordSchema != nil { msg.RecordSchema = gts.RecordSchema.String() } return json.Marshal(msg) } type RegisterTopicSchemaRequest struct { Action string `json:"Action"` RecordSchema *RecordSchema `json:"RecordSchema"` } func (rts *RegisterTopicSchemaRequest) requestBodyEncode() ([]byte, error) { msg := &struct { Action string `json:"Action"` RecordSchema string `json:"RecordSchema,omitempty"` }{ Action: rts.Action, } if rts.RecordSchema != nil { msg.RecordSchema = rts.RecordSchema.String() } return json.Marshal(msg) } type DeleteTopicSchemaRequest struct { Action string `json:"Action"` VersionId int `json:"VersionId"` } func (lgr *DeleteTopicSchemaRequest) requestBodyEncode() ([]byte, error) { return json.Marshal(lgr) } type PutPBRecordsRequest struct { Records []IRecord `json:"Records"` } func (pr *PutPBRecordsRequest) requestBodyEncode() ([]byte, error) { res := make([]*pbmodel.RecordEntry, len(pr.Records)) for idx, val := range pr.Records { bRecord := val.GetBaseRecord() data := val.GetData() fds := make([]*pbmodel.FieldData, 0) switch data.(type) { case []byte: fd := &pbmodel.FieldData{ Value: data.([]byte), } fds = append(fds, fd) default: v, ok := data.([]interface{}) if !ok { return nil, errors.New("data format is invalid") } for _, str := range v { fd := &pbmodel.FieldData{} if str == nil { fd.Value = nil } else { fd.Value = []byte(fmt.Sprintf("%s", str)) } fds = append(fds, fd) } } rd := &pbmodel.RecordData{ Data: fds, } recordEntry := &pbmodel.RecordEntry{ ShardId: proto.String(bRecord.ShardId), Data: rd, } if len(bRecord.Attributes) > 0 { sps := make([]*pbmodel.StringPair, len(bRecord.Attributes)) index := 0 for k, v := range bRecord.Attributes { strv := fmt.Sprintf("%v", v) sp := &pbmodel.StringPair{ Key: proto.String(k), Value: proto.String(strv), } sps[index] = sp index++ } ra := &pbmodel.RecordAttributes{ Attributes: sps, } recordEntry.Attributes = ra } res[idx] = recordEntry } prr := &pbmodel.PutRecordsRequest{ Records: res, } buf, err := proto.Marshal(prr) if err != nil { return nil, err } x := util.WrapMessage(buf) return x, nil } type GetPBRecordRequest struct { Cursor string `json:"Cursor"` Limit int `json:"Limit"` } func (gpr *GetPBRecordRequest) requestBodyEncode() ([]byte, error) { limit := int32(gpr.Limit) grr := &pbmodel.GetRecordsRequest{ Cursor: &gpr.Cursor, Limit: &limit, } buf, err := proto.Marshal(grr) if err != nil { return nil, err } wBuf := util.WrapMessage(buf) return wBuf, nil } type PutBatchRecordsRequest struct { serializer *batchSerializer Records []IRecord } func (pbr *PutBatchRecordsRequest) requestBodyEncode() ([]byte, error) { batchBuf, err := pbr.serializer.serialize(pbr.Records) if err != nil { return nil, err } entry := &pbmodel.BinaryRecordEntry{ Data: batchBuf, } protoReq := &pbmodel.PutBinaryRecordsRequest{ Records: []*pbmodel.BinaryRecordEntry{entry}, } buf, err := proto.Marshal(protoReq) if err != nil { return nil, err } return util.WrapMessage(buf), nil } type GetBatchRecordRequest struct { GetPBRecordRequest }