package datahub

import (
	"errors"
	"fmt"
	"github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
	"time"
)

// DataHub bundles the REST client used by the API methods in this file with
// the state kept for the batch client.
type DataHub struct {
	// Client performs the REST calls issued by the methods below.
	Client *RestClient

	// for batch client
	cType        CompressorType
	schemaClient *schemaRegistryClient
}

// ListProject lists all projects by issuing a Get on the projects path.
func (datahub *DataHub) ListProject() (*ListProjectResult, error) {
	params := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}

	body, resp, err := datahub.Client.Get(projectsPath, params)
	if err != nil {
		return nil, err
	}
	return NewListProjectResult(body, resp)
}

// ListProjectWithFilter lists projects, passing filter to the service as the
// filter query parameter.
func (datahub *DataHub) ListProjectWithFilter(filter string) (*ListProjectResult, error) {
	params := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
		Query:  map[string]string{httpFilterQuery: filter},
	}

	body, resp, err := datahub.Client.Get(projectsPath, params)
	if err != nil {
		return nil, err
	}
	return NewListProjectResult(body, resp)
}

// CreateProject creates a new project with the given comment. The project
// name and the comment are validated before any request is sent.
func (datahub *DataHub) CreateProject(projectName, comment string) (*CreateProjectResult, error) {
	switch {
	case !util.CheckProjectName(projectName):
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	case !util.CheckComment(comment):
		return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
	}

	endpoint := fmt.Sprintf(projectPath, projectName)
	params := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	req := &CreateProjectRequest{Comment: comment}

	_, resp, err := datahub.Client.Post(endpoint, req, params)
	if err != nil {
		return nil, err
	}
	return NewCreateProjectResult(resp)
}

// UpdateProject updates the comment of an existing project. The project name
// and the comment are validated before any request is sent.
func (datahub *DataHub) UpdateProject(projectName, comment string) (*UpdateProjectResult, error) {
	switch {
	case !util.CheckProjectName(projectName):
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	case !util.CheckComment(comment):
		return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
	}

	endpoint := fmt.Sprintf(projectPath, projectName)
	params := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	req := &UpdateProjectRequest{Comment: comment}

	_, resp, err := datahub.Client.Put(endpoint, req, params)
	if err != nil {
		return nil, err
	}
	return NewUpdateProjectResult(resp)
}

// DeleteProject deletes the named project after validating its name.
func (datahub *DataHub) DeleteProject(projectName string) (*DeleteProjectResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}

	endpoint := fmt.Sprintf(projectPath, projectName)
	params := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}

	_, resp, err := datahub.Client.Delete(endpoint, params)
	if err != nil {
		return nil, err
	}
	return NewDeleteProjectResult(resp)
}

// GetProject fetches the details of the named project. The response does not
// populate ProjectName here, so it is copied from the argument onto the result.
func (datahub *DataHub) GetProject(projectName string) (*GetProjectResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}

	endpoint := fmt.Sprintf(projectPath, projectName)
	params := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}

	body, resp, err := datahub.Client.Get(endpoint, params)
	if err != nil {
		return nil, err
	}

	ret, err := NewGetProjectResult(body, resp)
	if err != nil {
		return nil, err
	}
	ret.ProjectName = projectName
	return ret, nil
}

// UpdateProjectVpcWhitelist updates the VPC whitelist of a project.
func (datahub *DataHub) UpdateProjectVpcWhitelist(projectName, vpcIds string) (*UpdateProjectVpcWhitelistResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } path := fmt.Sprintf(projectPath, projectName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } requestBody := &UpdateProjectVpcWhitelistRequest{ VpcIds: vpcIds, } _, commonResp, err := datahub.Client.Put(path, requestBody, reqPara) if err != nil { return nil, err } return NewUpdateProjectVpcWhitelistResult(commonResp) } func (datahub *DataHub) WaitAllShardsReady(projectName, topicName string) bool { return datahub.WaitAllShardsReadyWithTime(projectName, topicName, minWaitingTimeInMs/1000) } func (datahub *DataHub) WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool { ready := make(chan bool) if timeout > 0 { go func(timeout int64) { time.Sleep(time.Duration(timeout) * time.Second) ready <- false }(timeout) } go func(datahub DataHubApi) { for { ls, err := datahub.ListShard(projectName, topicName) if err != nil { time.Sleep(1 * time.Microsecond) continue } ok := true for _, shard := range ls.Shards { switch shard.State { case ACTIVE, CLOSED: continue default: ok = false break } } if ok { break } } ready <- true }(datahub) return <-ready } func (datahub *DataHub) ListTopic(projectName string) (*ListTopicResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } path := fmt.Sprintf(topicsPath, projectName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } respBody, commonResp, err := datahub.Client.Get(path, reqPara) if err != nil { return nil, err } return NewListTopicResult(respBody, commonResp) } func (datahub *DataHub) ListTopicWithFilter(projectName, filter string) (*ListTopicResult, error) { if !util.CheckProjectName(projectName) { return nil, 
NewInvalidParameterErrorWithMessage(projectNameInvalid) } path := fmt.Sprintf(topicsPath, projectName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, Query: map[string]string{httpFilterQuery: filter}, } respBody, commonResp, err := datahub.Client.Get(path, reqPara) if err != nil { return nil, err } return NewListTopicResult(respBody, commonResp) } func (datahub *DataHub) CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) (*CreateBlobTopicResult, error) { para := &CreateTopicParameter{ ShardCount: shardCount, LifeCycle: lifeCycle, Comment: comment, RecordType: BLOB, RecordSchema: nil, ExpandMode: SPLIT_EXTEND, } ret, err := datahub.CreateTopicWithPara(projectName, topicName, para) if err != nil { return nil, err } return NewCreateBlobTopicResult(&ret.CommonResponseResult) } func (datahub *DataHub) CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) (*CreateTupleTopicResult, error) { para := &CreateTopicParameter{ ShardCount: shardCount, LifeCycle: lifeCycle, Comment: comment, RecordType: TUPLE, RecordSchema: recordSchema, ExpandMode: SPLIT_EXTEND, } ret, err := datahub.CreateTopicWithPara(projectName, topicName, para) if err != nil { return nil, err } return NewCreateTupleTopicResult(&ret.CommonResponseResult) } func (datahub *DataHub) CreateTopicWithPara(projectName, topicName string, para *CreateTopicParameter) (*CreateTopicWithParaResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if para == nil { return nil, NewInvalidParameterErrorWithMessage(parameterNull) } if !util.CheckComment(para.Comment) { return nil, NewInvalidParameterErrorWithMessage(commentInvalid) } if para.RecordType != TUPLE && para.RecordType != BLOB { return nil, 
NewInvalidParameterErrorWithMessage(fmt.Sprintf("Invalid RecordType: %s", para.RecordType)) } if para.RecordType == TUPLE && para.RecordSchema == nil { return nil, NewInvalidParameterErrorWithMessage("Tuple topic must set RecordSchema") } if para.LifeCycle <= 0 { return nil, NewInvalidParameterErrorWithMessage(lifecycleInvalid) } path := fmt.Sprintf(topicPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } ctr := &CreateTopicRequest{ Action: "create", ShardCount: para.ShardCount, Lifecycle: para.LifeCycle, RecordType: para.RecordType, RecordSchema: para.RecordSchema, Comment: para.Comment, ExpandMode: para.ExpandMode, } _, commonResp, err := datahub.Client.Post(path, ctr, reqPara) if err != nil { return nil, err } return NewCreateTopicWithParaResult(commonResp) } func (datahub *DataHub) UpdateTopic(projectName, topicName, comment string) (*UpdateTopicResult, error) { para := &UpdateTopicParameter{ Comment: comment, } return datahub.UpdateTopicWithPara(projectName, topicName, para) } // Update topic meta information. Only support comment and lifeCycle now. 
func (datahub *DataHub) UpdateTopicWithPara(projectName, topicName string, para *UpdateTopicParameter) (*UpdateTopicResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if para == nil { return nil, NewInvalidParameterErrorWithMessage(parameterNull) } if !util.CheckComment(para.Comment) { return nil, NewInvalidParameterErrorWithMessage(commentInvalid) } path := fmt.Sprintf(topicPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } ut := &UpdateTopicRequest{ Lifecycle: para.LifeCycle, Comment: para.Comment, } _, commonResp, err := datahub.Client.Put(path, ut, reqPara) if err != nil { return nil, err } return NewUpdateTopicResult(commonResp) } func (datahub *DataHub) DeleteTopic(projectName, topicName string) (*DeleteTopicResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(topicPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } _, commonResp, err := datahub.Client.Delete(path, reqPara) if err != nil { return nil, err } return NewDeleteTopicResult(commonResp) } func (datahub *DataHub) GetTopic(projectName, topicName string) (*GetTopicResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(topicPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } respBody, commonResp, err := 
datahub.Client.Get(path, reqPara) if err != nil { return nil, err } result, err := NewGetTopicResult(respBody, commonResp) if err != nil { return nil, err } result.ProjectName = projectName result.TopicName = topicName return result, nil } func (datahub *DataHub) ListShard(projectName, topicName string) (*ListShardResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(shardsPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } respBody, commonResp, err := datahub.Client.Get(path, reqPara) if err != nil { return nil, err } return NewListShardResult(respBody, commonResp) } func (datahub *DataHub) SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } splitKey, err := generateSpliteKey(projectName, topicName, shardId, datahub) if err != nil { return nil, err } path := fmt.Sprintf(shardsPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } ssr := &SplitShardRequest{ Action: "split", ShardId: shardId, SplitKey: splitKey, } respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara) if err != nil { return nil, err } return NewSplitShardResult(respBody, commonResp) } func (datahub *DataHub) SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error) { if !util.CheckProjectName(projectName) { return nil, 
NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } path := fmt.Sprintf(shardsPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } ssr := &SplitShardRequest{ Action: "split", ShardId: shardId, SplitKey: splitKey, } respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara) if err != nil { return nil, err } return NewSplitShardResult(respBody, commonResp) } func (datahub *DataHub) MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) || !util.CheckShardId(adjacentShardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } path := fmt.Sprintf(shardsPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } mss := &MergeShardRequest{ Action: "merge", ShardId: shardId, AdjacentShardId: adjacentShardId, } respBody, commonResp, err := datahub.Client.Post(path, mss, reqPara) if err != nil { return nil, err } return NewMergeShardResult(respBody, commonResp) } func (datahub *DataHub) ExtendShard(projectName, topicName string, shardCount int) (*ExtendShardResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if shardCount <= 0 { return nil, NewInvalidParameterErrorWithMessage("shardCount is invalid") } path := fmt.Sprintf(shardsPath, 
projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } mss := &ExtendShardRequest{ Action: "extend", ExtendMode: "TO", ShardCount: shardCount, } _, commonResp, err := datahub.Client.Post(path, mss, reqPara) if err != nil { return nil, err } return NewExtendShardResult(commonResp) } func (datahub *DataHub) GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } if len(param) > 1 { return nil, NewInvalidParameterErrorWithMessage(parameterNumInvalid) } path := fmt.Sprintf(shardPath, projectName, topicName, shardId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } gcr := &GetCursorRequest{ Action: "cursor", CursorType: ctype, } switch ctype { case OLDEST, LATEST: if len(param) != 0 { return nil, NewInvalidParameterErrorWithMessage("Not need extra parameter when CursorType OLDEST or LATEST") } case SYSTEM_TIME: if len(param) != 1 { return nil, NewInvalidParameterErrorWithMessage("Timestamp must be set when CursorType is SYSTEM_TIME") } gcr.SystemTime = param[0] case SEQUENCE: if len(param) != 1 { return nil, NewInvalidParameterErrorWithMessage("Sequence must be set when CursorType is SEQUENCE") } gcr.Sequence = param[0] } respBody, commonResp, err := datahub.Client.Post(path, gcr, reqPara) if err != nil { return nil, err } return NewGetCursorResult(respBody, commonResp) } func (datahub *DataHub) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) 
} if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if records == nil || len(records) == 0 { return nil, NewInvalidParameterErrorWithMessage(recordsInvalid) } path := fmt.Sprintf(shardsPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } prr := &PutRecordsRequest{ Action: "pub", Records: records, } respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara) if err != nil { return nil, err } return NewPutRecordsResult(respBody, commonResp) } func (datahub *DataHub) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) { return nil, errors.New("not support this method") } func (datahub *DataHub) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } if recordSchema == nil { return nil, NewInvalidParameterErrorWithMessage(missingRecordSchema) } path := fmt.Sprintf(shardPath, projectName, topicName, shardId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } grr := &GetRecordRequest{ Action: "sub", Cursor: cursor, Limit: limit, } respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara) if err != nil { return nil, err } ret, err := NewGetRecordsResult(respBody, recordSchema, commonResp) if err != nil { return nil, err } for _, record := range ret.Records { if _, ok := record.(*TupleRecord); !ok { return nil, NewInvalidParameterErrorWithMessage("Shouldn't call this method for BLOB topic") } } return ret, nil } func (datahub *DataHub) 
GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } path := fmt.Sprintf(shardPath, projectName, topicName, shardId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } grr := &GetRecordRequest{ Action: "sub", Cursor: cursor, Limit: limit, } respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara) if err != nil { return nil, err } return NewGetRecordsResult(respBody, nil, commonResp) } func (datahub *DataHub) AppendField(projectName, topicName string, field Field) (*AppendFieldResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(topicPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } afr := &AppendFieldRequest{ Action: "AppendField", FieldName: field.Name, FieldType: field.Type, } _, commonResp, err := datahub.Client.Post(path, afr, reqPara) if err != nil { return nil, err } return NewAppendFieldResult(commonResp) } func (datahub *DataHub) GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } path := fmt.Sprintf(shardPath, projectName, 
topicName, shardId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } gmir := &GetMeterInfoRequest{ Action: "meter", } respBody, commonResp, err := datahub.Client.Post(path, gmir, reqPara) if err != nil { return nil, err } return NewGetMeterInfoResult(respBody, commonResp) } func (datahub *DataHub) ListConnector(projectName, topicName string) (*ListConnectorResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(connectorsPath, projectName, topicName) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, Query: map[string]string{httpHeaderConnectorMode: "id"}, } respBody, commonResp, err := datahub.Client.Get(path, reqPara) if err != nil { return nil, err } return NewListConnectorResult(respBody, commonResp) } func (datahub *DataHub) CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error) { return datahub.CreateConnectorWithStartTime(projectName, topicName, cType, columnFields, -1, config) } func (datahub *DataHub) CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType, columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error) { para := &CreateConnectorParameter{ SinkStartTime: sinkStartTime, ConnectorType: cType, ColumnFields: columnFields, ColumnNameMap: nil, Config: config, } return datahub.CreateConnectorWithPara(projectName, topicName, para) } func (datahub *DataHub) CreateConnectorWithPara(projectName, topicName string, para *CreateConnectorParameter) (*CreateConnectorResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) 
{ return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if para == nil { return nil, NewInvalidParameterErrorWithMessage(parameterNull) } if !validateConnectorType(para.ConnectorType) { return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid) } path := fmt.Sprintf(connectorPath, projectName, topicName, para.ConnectorType.String()) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } ccr := &CreateConnectorRequest{ Action: "create", Type: para.ConnectorType, SinkStartTime: para.SinkStartTime, ColumnFields: para.ColumnFields, ColumnNameMap: para.ColumnNameMap, Config: para.Config, } respBody, commonResp, err := datahub.Client.Post(path, ccr, reqPara) if err != nil { return nil, err } return NewCreateConnectorResult(respBody, commonResp) } func (datahub *DataHub) GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } respBody, commonResp, err := datahub.Client.Get(path, reqPara) if err != nil { return nil, err } return NewGetConnectorResult(respBody, commonResp) } func (datahub *DataHub) UpdateConnector(projectName, topicName, connectorId string, config interface{}) (*UpdateConnectorResult, error) { para := &UpdateConnectorParameter{ ColumnFields: nil, ColumnNameMap: nil, Config: config, } return datahub.UpdateConnectorWithPara(projectName, topicName, connectorId, para) } func (datahub *DataHub) UpdateConnectorWithPara(projectName, topicName, connectorId string, para *UpdateConnectorParameter) (*UpdateConnectorResult, error) { if !util.CheckProjectName(projectName) { return nil, 
NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if para == nil { return nil, NewInvalidParameterErrorWithMessage(parameterNull) } path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } ucr := &UpdateConnectorRequest{ Action: "updateconfig", ColumnFields: para.ColumnFields, ColumnNameMap: para.ColumnNameMap, Config: para.Config, } _, commonResp, err := datahub.Client.Post(path, ucr, reqPara) if err != nil { return nil, err } return NewUpdateConnectorResult(commonResp) } func (datahub *DataHub) DeleteConnector(projectName, topicName, connectorId string) (*DeleteConnectorResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } _, commonResp, err := datahub.Client.Delete(path, reqPara) if err != nil { return nil, err } return NewDeleteConnectorResult(commonResp) } func (datahub *DataHub) GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, Query: map[string]string{"donetime": ""}, } respBody, commonResp, err := datahub.Client.Get(path, reqPara) if err != nil { return nil, 
err } return NewGetConnectorDoneTimeResult(respBody, commonResp) } func (datahub *DataHub) GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } gcss := &GetConnectorShardStatusRequest{ Action: "Status", } respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara) if err != nil { return nil, err } return NewGetConnectorShardStatusResult(respBody, commonResp) } func (datahub *DataHub) GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*GetConnectorShardStatusByShardResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid) } if !util.CheckShardId(shardId) { return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid) } path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId) reqPara := &RequestParameter{ Header: map[string]string{httpHeaderContentType: httpJsonContent}, } gcss := &GetConnectorShardStatusRequest{ Action: "Status", ShardId: shardId, } respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara) if err != nil { return nil, err } return NewGetConnectorShardStatusByShardResult(respBody, commonResp) } func (datahub *DataHub) ReloadConnector(projectName, topicName, connectorId string) (*ReloadConnectorResult, error) { if !util.CheckProjectName(projectName) { return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid) } if !util.CheckTopicName(topicName) { return nil, 
NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	rcr := &ReloadConnectorRequest{
		Action: "Reload",
	}
	_, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewReloadConnectorResult(commonResp)
}

// ReloadConnectorByShard posts a "Reload" action carrying shardId to the
// connector path built from projectName, topicName and connectorId.
//
// It returns an invalid-parameter error, without contacting the service, when
// util.CheckProjectName, util.CheckTopicName or util.CheckShardId rejects the
// corresponding argument. Any error from the client call is returned as-is.
func (datahub *DataHub) ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) (*ReloadConnectorByShardResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckShardId(shardId) {
		return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
	}

	path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	rcr := &ReloadConnectorRequest{
		Action:  "Reload",
		ShardId: shardId,
	}
	_, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewReloadConnectorByShardResult(commonResp)
}

// UpdateConnectorState posts an "updatestate" action carrying state to the
// connector path built from projectName, topicName and connectorId.
//
// It returns an invalid-parameter error when the project or topic name fails
// its util check, or when validateConnectorState rejects state (reported with
// the parameterTypeInvalid message).
func (datahub *DataHub) UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) (*UpdateConnectorStateResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !validateConnectorState(state) {
		return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid)
	}

	path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	ucsr := &UpdateConnectorStateRequest{
		Action: "updatestate",
		State:  state,
	}
	_, commonResp, err := datahub.Client.Post(path, ucsr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewUpdateConnectorStateResult(commonResp)
}

// UpdateConnectorOffset posts an "updateshardcontext" action to the connector
// path, carrying shardId together with offset.Timestamp and offset.Sequence.
//
// It returns an invalid-parameter error when the project name, topic name or
// shard id fails its util check.
func (datahub *DataHub) UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) (*UpdateConnectorOffsetResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckShardId(shardId) {
		return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
	}

	path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	ucor := &UpdateConnectorOffsetRequest{
		Action:    "updateshardcontext",
		ShardId:   shardId,
		Timestamp: offset.Timestamp,
		Sequence:  offset.Sequence,
	}
	_, commonResp, err := datahub.Client.Post(path, ucor, reqPara)
	if err != nil {
		return nil, err
	}
	return NewUpdateConnectorOffsetResult(commonResp)
}

// AppendConnectorField posts an "appendfield" action carrying fieldName to the
// connector path built from projectName, topicName and connectorId.
//
// Only the project and topic names are validated here; fieldName and
// connectorId are passed through unchecked.
func (datahub *DataHub) AppendConnectorField(projectName, topicName, connectorId, fieldName string) (*AppendConnectorFieldResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	acfr := &AppendConnectorFieldRequest{
		Action:    "appendfield",
		FieldName: fieldName,
	}
	_, commonResp, err := datahub.Client.Post(path, acfr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewAppendConnectorFieldResult(commonResp)
}

// ListSubscription posts a "list" action with pageIndex and pageSize to the
// subscriptions path of the given project and topic, and builds the result
// from the response body.
//
// pageIndex and pageSize are forwarded without validation.
func (datahub *DataHub) ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	lsr := &ListSubscriptionRequest{
		Action:    "list",
		PageIndex: pageIndex,
		PageSize:  pageSize,
	}
	respBody, commonResp, err := datahub.Client.Post(path, lsr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewListSubscriptionResult(respBody, commonResp)
}

// CreateSubscription posts a "create" action carrying comment to the
// subscriptions path of the given project and topic, and builds the result
// from the response body.
//
// It returns an invalid-parameter error when the project name, topic name or
// comment fails its util check.
func (datahub *DataHub) CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckComment(comment) {
		return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
	}

	path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	csr := &CreateSubscriptionRequest{
		Action:  "create",
		Comment: comment,
	}
	respBody, commonResp, err := datahub.Client.Post(path, csr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewCreateSubscriptionResult(respBody, commonResp)
}

// UpdateSubscription sends the new comment with a Put request to the
// subscription path identified by projectName, topicName and subId.
//
// It returns an invalid-parameter error when the project name, topic name or
// comment fails its util check; subId is not validated here.
func (datahub *DataHub) UpdateSubscription(projectName, topicName, subId, comment string) (*UpdateSubscriptionResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckComment(comment) {
		return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
	}

	path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	usr := &UpdateSubscriptionRequest{
		Comment: comment,
	}
	_, commonResp, err := datahub.Client.Put(path, usr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewUpdateSubscriptionResult(commonResp)
}

// DeleteSubscription issues a Delete request to the subscription path
// identified by projectName, topicName and subId.
//
// It returns an invalid-parameter error when the project or topic name fails
// its util check.
func (datahub *DataHub) DeleteSubscription(projectName, topicName, subId string) (*DeleteSubscriptionResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	_, commonResp, err := datahub.Client.Delete(path, reqPara)
	if err != nil {
		return nil, err
	}
	return NewDeleteSubscriptionResult(commonResp)
}

// GetSubscription issues a Get request to the subscription path identified by
// projectName, topicName and subId, and builds the result from the response
// body.
//
// It returns an invalid-parameter error when the project or topic name fails
// its util check.
func (datahub *DataHub) GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	respBody, commonResp, err := datahub.Client.Get(path, reqPara)
	if err != nil {
		return nil, err
	}
	return NewGetSubscriptionResult(respBody, commonResp)
}

// UpdateSubscriptionState sends state with a Put request to the subscription
// path identified by projectName, topicName and subId.
//
// Only the project and topic names are validated here; state is forwarded
// unchecked.
func (datahub *DataHub) UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) (*UpdateSubscriptionStateResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	usr := &UpdateSubscriptionStateRequest{
		State: state,
	}
	_, commonResp, err := datahub.Client.Put(path, usr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewUpdateSubscriptionStateResult(commonResp)
}

// OpenSubscriptionSession posts an "open" action listing shardIds to the
// offsets path of the subscription, and builds the result from the response
// body.
//
// Every entry of shardIds must pass util.CheckShardId; the first failing entry
// yields an invalid-parameter error. An empty or nil shardIds is not rejected
// here.
func (datahub *DataHub) OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	for _, id := range shardIds {
		if !util.CheckShardId(id) {
			return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
		}
	}

	path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	ossr := &OpenSubscriptionSessionRequest{
		Action:   "open",
		ShardIds: shardIds,
	}
	respBody, commonResp, err := datahub.Client.Post(path, ossr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewOpenSubscriptionSessionResult(respBody, commonResp)
}

// GetSubscriptionOffset posts a "get" action listing shardIds to the offsets
// path of the subscription, and builds the result from the response body.
//
// Every entry of shardIds must pass util.CheckShardId; an empty or nil
// shardIds is not rejected here.
func (datahub *DataHub) GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	for _, id := range shardIds {
		if !util.CheckShardId(id) {
			return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
		}
	}

	path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	gsor := &GetSubscriptionOffsetRequest{
		Action:   "get",
		ShardIds: shardIds,
	}
	respBody, commonResp, err := datahub.Client.Post(path, gsor, reqPara)
	if err != nil {
		return nil, err
	}
	return NewGetSubscriptionOffsetResult(respBody, commonResp)
}

// CommitSubscriptionOffset sends a "commit" action carrying offsets with a Put
// request to the offsets path of the subscription.
//
// Only the project and topic names are validated here; the keys and values of
// offsets are forwarded unchecked.
func (datahub *DataHub) CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*CommitSubscriptionOffsetResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	req := &CommitSubscriptionOffsetRequest{
		Action:  "commit",
		Offsets: offsets,
	}
	_, commonResp, err := datahub.Client.Put(path, req, reqPara)
	if err != nil {
		return nil, err
	}
	return NewCommitSubscriptionOffsetResult(commonResp)
}

// ResetSubscriptionOffset sends a "reset" action carrying offsets with a Put
// request to the offsets path of the subscription.
//
// Only the project and topic names are validated here; the keys and values of
// offsets are forwarded unchecked.
func (datahub *DataHub) ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*ResetSubscriptionOffsetResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	req := &ResetSubscriptionOffsetRequest{
		Action:  "reset",
		Offsets: offsets,
	}
	_, commonResp, err := datahub.Client.Put(path, req, reqPara)
	if err != nil {
		return nil, err
	}
	return NewResetSubscriptionOffsetResult(commonResp)
}

// Heartbeat posts a "heartbeat" action to the consumer-group path, carrying
// consumerId, versionId, holdShardList and readEndShardList, and builds the
// result from the response body.
//
// Every entry of holdShardList and readEndShardList must pass
// util.CheckShardId; either list may be empty.
func (datahub *DataHub) Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	for _, id := range holdShardList {
		if !util.CheckShardId(id) {
			return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
		}
	}
	for _, id := range readEndShardList {
		if !util.CheckShardId(id) {
			return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
		}
	}

	path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	hr := &HeartbeatRequest{
		Action:           "heartbeat",
		ConsumerId:       consumerId,
		VersionId:        versionId,
		HoldShardList:    holdShardList,
		ReadEndShardList: readEndShardList,
	}
	respBody, commonResp, err := datahub.Client.Post(path, hr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewHeartbeatResult(respBody, commonResp)
}

// JoinGroup posts a "joinGroup" action carrying sessionTimeout to the
// consumer-group path, and builds the result from the response body.
//
// sessionTimeout is forwarded unchecked; its unit is not visible here.
func (datahub *DataHub) JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	jgr := &JoinGroupRequest{
		Action:         "joinGroup",
		SessionTimeout: sessionTimeout,
	}
	respBody, commonResp, err := datahub.Client.Post(path, jgr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewJoinGroupResult(respBody, commonResp)
}

// SyncGroup posts a "syncGroup" action to the consumer-group path, carrying
// consumerId, versionId, releaseShardList and readEndShardList.
//
// It returns an invalid-parameter error when the project or topic name fails
// its util check, when either shard list is empty (shardListInvalid), or when
// any shard id in either list fails util.CheckShardId.
func (datahub *DataHub) SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) (*SyncGroupResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	// NOTE(review): with "||" the call is rejected unless BOTH lists are
	// non-empty. Confirm this is intended rather than rejecting only when both
	// lists are empty.
	if len(releaseShardList) == 0 || len(readEndShardList) == 0 {
		return nil, NewInvalidParameterErrorWithMessage(shardListInvalid)
	}
	for _, id := range releaseShardList {
		if !util.CheckShardId(id) {
			return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
		}
	}
	for _, id := range readEndShardList {
		if !util.CheckShardId(id) {
			return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
		}
	}

	path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	sgr := &SyncGroupRequest{
		Action:           "syncGroup",
		ConsumerId:       consumerId,
		VersionId:        versionId,
		ReleaseShardList: releaseShardList,
		ReadEndShardList: readEndShardList,
	}
	_, commonResp, err := datahub.Client.Post(path, sgr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewSyncGroupResult(commonResp)
}

// LeaveGroup posts a "leaveGroup" action carrying consumerId and versionId to
// the consumer-group path.
//
// Only the project and topic names are validated here.
func (datahub *DataHub) LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) (*LeaveGroupResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	lgr := &LeaveGroupRequest{
		Action:     "leaveGroup",
		ConsumerId: consumerId,
		VersionId:  versionId,
	}
	_, commonResp, err := datahub.Client.Post(path, lgr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewLeaveGroupResult(commonResp)
}

// ListTopicSchema posts a "ListSchema" action to the topic path of the given
// project and topic, and builds the result from the response body.
func (datahub *DataHub) ListTopicSchema(projectName, topicName string) (*ListTopicSchemaResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(topicPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	lts := &ListTopicSchemaRequest{
		Action: "ListSchema",
	}
	respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
	if err != nil {
		return nil, err
	}
	return NewListTopicSchemaResult(respBody, commonResp)
}

// GetTopicSchemaByVersion posts a "GetSchema" action to the topic path with
// VersionId set to versionId and a nil RecordSchema, and builds the result
// from the response body.
func (datahub *DataHub) GetTopicSchemaByVersion(projectName, topicName string, versionId int) (*GetTopicSchemaResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(topicPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	lts := &GetTopicSchemaRequest{
		Action:       "GetSchema",
		VersionId:    versionId,
		RecordSchema: nil,
	}
	respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
	if err != nil {
		return nil, err
	}
	return NewGetTopicSchemaResult(respBody, commonResp)
}

// GetTopicSchemaBySchema posts a "GetSchema" action to the topic path with
// RecordSchema set to recordSchema and VersionId fixed at -1, and builds the
// result from the response body.
//
// recordSchema is forwarded as given; a nil value is not rejected here.
func (datahub *DataHub) GetTopicSchemaBySchema(projectName, topicName string, recordSchema *RecordSchema) (*GetTopicSchemaResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(topicPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	lts := &GetTopicSchemaRequest{
		Action:       "GetSchema",
		VersionId:    -1, // presumably a "no version" marker so the schema is used for lookup — TODO confirm
		RecordSchema: recordSchema,
	}
	respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
	if err != nil {
		return nil, err
	}
	return NewGetTopicSchemaResult(respBody, commonResp)
}

// RegisterTopicSchema posts a "RegisterSchema" action carrying recordSchema to
// the topic path, and builds the result from the response body.
//
// recordSchema is forwarded as given; a nil value is not rejected here.
func (datahub *DataHub) RegisterTopicSchema(projectName, topicName string, recordSchema *RecordSchema) (*RegisterTopicSchemaResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(topicPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	lts := &RegisterTopicSchemaRequest{
		Action:       "RegisterSchema",
		RecordSchema: recordSchema,
	}
	respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
	if err != nil {
		return nil, err
	}
	return NewRegisterTopicSchemaResult(respBody, commonResp)
}

// DeleteTopicSchema posts a "DeleteSchema" action carrying versionId to the
// topic path of the given project and topic.
func (datahub *DataHub) DeleteTopicSchema(projectName, topicName string, versionId int) (*DeleteTopicSchemaResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(topicPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{httpHeaderContentType: httpJsonContent},
	}
	lts := &DeleteTopicSchemaRequest{
		Action:    "DeleteSchema",
		VersionId: versionId,
	}
	_, commonResp, err := datahub.Client.Post(path, lts, reqPara)
	if err != nil {
		return nil, err
	}
	return NewDeleteTopicSchemaResult(commonResp)
}

// getSchemaRegistry returns the schemaClient field of the receiver; the
// result is nil when that field has not been set.
func (datahub *DataHub) getSchemaRegistry() *schemaRegistryClient {
	return datahub.schemaClient
}

// DataHubPB embeds DataHub and defines record put/get methods whose requests
// use the httpProtoContent content type.
type DataHubPB struct {
	DataHub
}

// PutRecords posts records to the shards path of the given project and topic
// using the httpProtoContent content type and the httpPublistContent request
// action, and builds the result from the response body.
//
// Only the project and topic names are validated; records is forwarded as
// given.
func (datahub *DataHubPB) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}

	path := fmt.Sprintf(shardsPath, projectName, topicName)
	reqPara := &RequestParameter{
		Header: map[string]string{
			httpHeaderContentType:   httpProtoContent,
			httpHeaderRequestAction: httpPublistContent},
	}
	prr := &PutPBRecordsRequest{
		Records: records,
	}
	respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewPutPBRecordsResult(respBody, commonResp)
}

// PutRecordsByShard posts records to the path of a single shard using the
// httpProtoContent content type and the httpPublistContent request action.
//
// It returns an invalid-parameter error when the project name, topic name or
// shard id fails its util check. The response body is discarded; the result is
// built from the common response only.
func (datahub *DataHubPB) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckShardId(shardId) {
		return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
	}

	path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
	reqPara := &RequestParameter{
		Header: map[string]string{
			httpHeaderContentType:   httpProtoContent,
			httpHeaderRequestAction: httpPublistContent},
	}
	prr := &PutPBRecordsRequest{
		Records: records,
	}
	_, commonResp, err := datahub.Client.Post(path, prr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewPutRecordsByShardResult(commonResp)
}

// GetTupleRecords posts a get-records request (cursor and limit) to the path
// of a single shard using the httpProtoContent content type and the
// httpSubscribeContent request action. The response body is handed to
// NewGetPBRecordsResult together with recordSchema.
//
// It returns an invalid-parameter error when the project name, topic name or
// shard id fails its util check; cursor and limit are forwarded unchecked.
func (datahub *DataHubPB) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckShardId(shardId) {
		return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
	}

	path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
	reqPara := &RequestParameter{
		Header: map[string]string{
			httpHeaderContentType:   httpProtoContent,
			httpHeaderRequestAction: httpSubscribeContent},
	}
	grr := &GetPBRecordRequest{
		Cursor: cursor,
		Limit:  limit,
	}
	respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewGetPBRecordsResult(respBody, recordSchema, commonResp)
}

// GetBlobRecords issues the same request as GetTupleRecords but passes a nil
// record schema to NewGetPBRecordsResult.
//
// It returns an invalid-parameter error when the project name, topic name or
// shard id fails its util check.
func (datahub *DataHubPB) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckShardId(shardId) {
		return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
	}

	path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
	reqPara := &RequestParameter{
		Header: map[string]string{
			httpHeaderContentType:   httpProtoContent,
			httpHeaderRequestAction: httpSubscribeContent},
	}
	grr := &GetPBRecordRequest{
		Cursor: cursor,
		Limit:  limit,
	}
	respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewGetPBRecordsResult(respBody, nil, commonResp)
}

// DataHubBatch embeds DataHub and defines record put/get methods whose
// requests use the httpProtoBatchContent content type.
type DataHubBatch struct {
	DataHub
}

// PutRecords is not supported by the batch client: it always returns a nil
// result and the error "not support this method". Use PutRecordsByShard
// instead.
func (datahub *DataHubBatch) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
	return nil, errors.New("not support this method")
}

// PutRecordsByShard posts records to the path of a single shard using the
// httpProtoBatchContent content type and the httpPublistContent request
// action. The request carries a batch serializer created from the project
// name, topic name, the client's cType and its schemaClient.
//
// It returns an invalid-parameter error when the project name, topic name or
// shard id fails its util check.
func (datahub *DataHubBatch) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckShardId(shardId) {
		return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
	}

	path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
	reqPara := &RequestParameter{
		Header: map[string]string{
			httpHeaderContentType:   httpProtoBatchContent,
			httpHeaderRequestAction: httpPublistContent},
	}
	serializer := newBatchSerializer(projectName, topicName, datahub.cType, datahub.schemaClient)
	prr := &PutBatchRecordsRequest{
		serializer: serializer,
		Records:    records,
	}
	_, commonResp, err := datahub.Client.Post(path, prr, reqPara)
	if err != nil {
		return nil, err
	}
	return NewPutRecordsByShardResult(commonResp)
}

// GetTupleRecords posts a get-records request (cursor and limit) to the path
// of a single shard using the httpProtoBatchContent content type and the
// httpSubscribeContent request action. The response body is handed to
// NewGetBatchRecordsResult together with recordSchema and a batch deserializer
// created from the project, topic, shard, recordSchema and schemaClient.
//
// It returns an invalid-parameter error when the project name, topic name or
// shard id fails its util check.
func (datahub *DataHubBatch) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
	if !util.CheckProjectName(projectName) {
		return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
	}
	if !util.CheckTopicName(topicName) {
		return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
	}
	if !util.CheckShardId(shardId) {
		return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
	}

	path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
	reqPara := &RequestParameter{
		Header: map[string]string{
			httpHeaderContentType:   httpProtoBatchContent,
			httpHeaderRequestAction: httpSubscribeContent},
	}
	gbr := &GetBatchRecordRequest{
		GetPBRecordRequest{
			Cursor: cursor,
			Limit:  limit,
		},
	}
	respBody, commonResp, err := datahub.Client.Post(path, gbr, reqPara)
	if err != nil {
		return nil, err
	}

	// The deserializer is only built once the request has succeeded.
	deserializer := newBatchDeserializer(projectName, topicName, shardId, recordSchema, datahub.schemaClient)
	return NewGetBatchRecordsResult(respBody, recordSchema, commonResp, deserializer)
}

// GetBlobRecords delegates to GetTupleRecords with a nil record schema.
func (datahub *DataHubBatch) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
	return datahub.GetTupleRecords(projectName, topicName, shardId, cursor, limit, nil)
}