1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738 |
- package datahub
- import (
- "errors"
- "fmt"
- "github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
- "time"
- )
- type DataHub struct {
- Client *RestClient
- // for batch client
- cType CompressorType
- schemaClient *schemaRegistryClient
- }
- // ListProjects list all projects
- func (datahub *DataHub) ListProject() (*ListProjectResult, error) {
- path := projectsPath
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- responseBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListProjectResult(responseBody, commonResp)
- }
- // ListProjects list projects with filter
- func (datahub *DataHub) ListProjectWithFilter(filter string) (*ListProjectResult, error) {
- path := projectsPath
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- Query: map[string]string{httpFilterQuery: filter},
- }
- responseBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListProjectResult(responseBody, commonResp)
- }
- // CreateProject create new project
- func (datahub *DataHub) CreateProject(projectName, comment string) (*CreateProjectResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckComment(comment) {
- return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
- }
- path := fmt.Sprintf(projectPath, projectName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- requestBody := &CreateProjectRequest{
- Comment: comment,
- }
- _, commonResp, err := datahub.Client.Post(path, requestBody, reqPara)
- if err != nil {
- return nil, err
- }
- return NewCreateProjectResult(commonResp)
- }
- // UpdateProject update project
- func (datahub *DataHub) UpdateProject(projectName, comment string) (*UpdateProjectResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckComment(comment) {
- return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
- }
- path := fmt.Sprintf(projectPath, projectName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- requestBody := &UpdateProjectRequest{
- Comment: comment,
- }
- _, commonResp, err := datahub.Client.Put(path, requestBody, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateProjectResult(commonResp)
- }
- // DeleteProject delete project
- func (datahub *DataHub) DeleteProject(projectName string) (*DeleteProjectResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- path := fmt.Sprintf(projectPath, projectName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- _, commonResp, err := datahub.Client.Delete(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewDeleteProjectResult(commonResp)
- }
- // GetProject get a project deatil named the given name
- func (datahub *DataHub) GetProject(projectName string) (*GetProjectResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- path := fmt.Sprintf(projectPath, projectName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- result, err := NewGetProjectResult(respBody, commonResp)
- if err != nil {
- return nil, err
- }
- result.ProjectName = projectName
- return result, nil
- }
- // Update project vpc white list.
- func (datahub *DataHub) UpdateProjectVpcWhitelist(projectName, vpcIds string) (*UpdateProjectVpcWhitelistResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- path := fmt.Sprintf(projectPath, projectName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- requestBody := &UpdateProjectVpcWhitelistRequest{
- VpcIds: vpcIds,
- }
- _, commonResp, err := datahub.Client.Put(path, requestBody, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateProjectVpcWhitelistResult(commonResp)
- }
- func (datahub *DataHub) WaitAllShardsReady(projectName, topicName string) bool {
- return datahub.WaitAllShardsReadyWithTime(projectName, topicName, minWaitingTimeInMs/1000)
- }
- func (datahub *DataHub) WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool {
- ready := make(chan bool)
- if timeout > 0 {
- go func(timeout int64) {
- time.Sleep(time.Duration(timeout) * time.Second)
- ready <- false
- }(timeout)
- }
- go func(datahub DataHubApi) {
- for {
- ls, err := datahub.ListShard(projectName, topicName)
- if err != nil {
- time.Sleep(1 * time.Microsecond)
- continue
- }
- ok := true
- for _, shard := range ls.Shards {
- switch shard.State {
- case ACTIVE, CLOSED:
- continue
- default:
- ok = false
- break
- }
- }
- if ok {
- break
- }
- }
- ready <- true
- }(datahub)
- return <-ready
- }
- func (datahub *DataHub) ListTopic(projectName string) (*ListTopicResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- path := fmt.Sprintf(topicsPath, projectName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListTopicResult(respBody, commonResp)
- }
- func (datahub *DataHub) ListTopicWithFilter(projectName, filter string) (*ListTopicResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- path := fmt.Sprintf(topicsPath, projectName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- Query: map[string]string{httpFilterQuery: filter},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListTopicResult(respBody, commonResp)
- }
- func (datahub *DataHub) CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) (*CreateBlobTopicResult, error) {
- para := &CreateTopicParameter{
- ShardCount: shardCount,
- LifeCycle: lifeCycle,
- Comment: comment,
- RecordType: BLOB,
- RecordSchema: nil,
- ExpandMode: SPLIT_EXTEND,
- }
- ret, err := datahub.CreateTopicWithPara(projectName, topicName, para)
- if err != nil {
- return nil, err
- }
- return NewCreateBlobTopicResult(&ret.CommonResponseResult)
- }
- func (datahub *DataHub) CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) (*CreateTupleTopicResult, error) {
- para := &CreateTopicParameter{
- ShardCount: shardCount,
- LifeCycle: lifeCycle,
- Comment: comment,
- RecordType: TUPLE,
- RecordSchema: recordSchema,
- ExpandMode: SPLIT_EXTEND,
- }
- ret, err := datahub.CreateTopicWithPara(projectName, topicName, para)
- if err != nil {
- return nil, err
- }
- return NewCreateTupleTopicResult(&ret.CommonResponseResult)
- }
- func (datahub *DataHub) CreateTopicWithPara(projectName, topicName string, para *CreateTopicParameter) (*CreateTopicWithParaResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if para == nil {
- return nil, NewInvalidParameterErrorWithMessage(parameterNull)
- }
- if !util.CheckComment(para.Comment) {
- return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
- }
- if para.RecordType != TUPLE && para.RecordType != BLOB {
- return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("Invalid RecordType: %s", para.RecordType))
- }
- if para.RecordType == TUPLE && para.RecordSchema == nil {
- return nil, NewInvalidParameterErrorWithMessage("Tuple topic must set RecordSchema")
- }
- if para.LifeCycle <= 0 {
- return nil, NewInvalidParameterErrorWithMessage(lifecycleInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ctr := &CreateTopicRequest{
- Action: "create",
- ShardCount: para.ShardCount,
- Lifecycle: para.LifeCycle,
- RecordType: para.RecordType,
- RecordSchema: para.RecordSchema,
- Comment: para.Comment,
- ExpandMode: para.ExpandMode,
- }
- _, commonResp, err := datahub.Client.Post(path, ctr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewCreateTopicWithParaResult(commonResp)
- }
- func (datahub *DataHub) UpdateTopic(projectName, topicName, comment string) (*UpdateTopicResult, error) {
- para := &UpdateTopicParameter{
- Comment: comment,
- }
- return datahub.UpdateTopicWithPara(projectName, topicName, para)
- }
- // Update topic meta information. Only support comment and lifeCycle now.
- func (datahub *DataHub) UpdateTopicWithPara(projectName, topicName string, para *UpdateTopicParameter) (*UpdateTopicResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if para == nil {
- return nil, NewInvalidParameterErrorWithMessage(parameterNull)
- }
- if !util.CheckComment(para.Comment) {
- return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ut := &UpdateTopicRequest{
- Lifecycle: para.LifeCycle,
- Comment: para.Comment,
- }
- _, commonResp, err := datahub.Client.Put(path, ut, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateTopicResult(commonResp)
- }
- func (datahub *DataHub) DeleteTopic(projectName, topicName string) (*DeleteTopicResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- _, commonResp, err := datahub.Client.Delete(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewDeleteTopicResult(commonResp)
- }
- func (datahub *DataHub) GetTopic(projectName, topicName string) (*GetTopicResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- result, err := NewGetTopicResult(respBody, commonResp)
- if err != nil {
- return nil, err
- }
- result.ProjectName = projectName
- result.TopicName = topicName
- return result, nil
- }
- func (datahub *DataHub) ListShard(projectName, topicName string) (*ListShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(shardsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListShardResult(respBody, commonResp)
- }
- func (datahub *DataHub) SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- splitKey, err := generateSpliteKey(projectName, topicName, shardId, datahub)
- if err != nil {
- return nil, err
- }
- path := fmt.Sprintf(shardsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ssr := &SplitShardRequest{
- Action: "split",
- ShardId: shardId,
- SplitKey: splitKey,
- }
- respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewSplitShardResult(respBody, commonResp)
- }
- func (datahub *DataHub) SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ssr := &SplitShardRequest{
- Action: "split",
- ShardId: shardId,
- SplitKey: splitKey,
- }
- respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewSplitShardResult(respBody, commonResp)
- }
- func (datahub *DataHub) MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) || !util.CheckShardId(adjacentShardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- mss := &MergeShardRequest{
- Action: "merge",
- ShardId: shardId,
- AdjacentShardId: adjacentShardId,
- }
- respBody, commonResp, err := datahub.Client.Post(path, mss, reqPara)
- if err != nil {
- return nil, err
- }
- return NewMergeShardResult(respBody, commonResp)
- }
- func (datahub *DataHub) ExtendShard(projectName, topicName string, shardCount int) (*ExtendShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if shardCount <= 0 {
- return nil, NewInvalidParameterErrorWithMessage("shardCount is invalid")
- }
- path := fmt.Sprintf(shardsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- mss := &ExtendShardRequest{
- Action: "extend",
- ExtendMode: "TO",
- ShardCount: shardCount,
- }
- _, commonResp, err := datahub.Client.Post(path, mss, reqPara)
- if err != nil {
- return nil, err
- }
- return NewExtendShardResult(commonResp)
- }
- func (datahub *DataHub) GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- if len(param) > 1 {
- return nil, NewInvalidParameterErrorWithMessage(parameterNumInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- gcr := &GetCursorRequest{
- Action: "cursor",
- CursorType: ctype,
- }
- switch ctype {
- case OLDEST, LATEST:
- if len(param) != 0 {
- return nil, NewInvalidParameterErrorWithMessage("Not need extra parameter when CursorType OLDEST or LATEST")
- }
- case SYSTEM_TIME:
- if len(param) != 1 {
- return nil, NewInvalidParameterErrorWithMessage("Timestamp must be set when CursorType is SYSTEM_TIME")
- }
- gcr.SystemTime = param[0]
- case SEQUENCE:
- if len(param) != 1 {
- return nil, NewInvalidParameterErrorWithMessage("Sequence must be set when CursorType is SEQUENCE")
- }
- gcr.Sequence = param[0]
- }
- respBody, commonResp, err := datahub.Client.Post(path, gcr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetCursorResult(respBody, commonResp)
- }
- func (datahub *DataHub) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if records == nil || len(records) == 0 {
- return nil, NewInvalidParameterErrorWithMessage(recordsInvalid)
- }
- path := fmt.Sprintf(shardsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- prr := &PutRecordsRequest{
- Action: "pub",
- Records: records,
- }
- respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewPutRecordsResult(respBody, commonResp)
- }
- func (datahub *DataHub) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
- return nil, errors.New("not support this method")
- }
- func (datahub *DataHub) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- if recordSchema == nil {
- return nil, NewInvalidParameterErrorWithMessage(missingRecordSchema)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- grr := &GetRecordRequest{
- Action: "sub",
- Cursor: cursor,
- Limit: limit,
- }
- respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
- if err != nil {
- return nil, err
- }
- ret, err := NewGetRecordsResult(respBody, recordSchema, commonResp)
- if err != nil {
- return nil, err
- }
- for _, record := range ret.Records {
- if _, ok := record.(*TupleRecord); !ok {
- return nil, NewInvalidParameterErrorWithMessage("Shouldn't call this method for BLOB topic")
- }
- }
- return ret, nil
- }
- func (datahub *DataHub) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- grr := &GetRecordRequest{
- Action: "sub",
- Cursor: cursor,
- Limit: limit,
- }
- respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetRecordsResult(respBody, nil, commonResp)
- }
- func (datahub *DataHub) AppendField(projectName, topicName string, field Field) (*AppendFieldResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- afr := &AppendFieldRequest{
- Action: "AppendField",
- FieldName: field.Name,
- FieldType: field.Type,
- }
- _, commonResp, err := datahub.Client.Post(path, afr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewAppendFieldResult(commonResp)
- }
- func (datahub *DataHub) GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- gmir := &GetMeterInfoRequest{
- Action: "meter",
- }
- respBody, commonResp, err := datahub.Client.Post(path, gmir, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetMeterInfoResult(respBody, commonResp)
- }
- func (datahub *DataHub) ListConnector(projectName, topicName string) (*ListConnectorResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(connectorsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- Query: map[string]string{httpHeaderConnectorMode: "id"},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListConnectorResult(respBody, commonResp)
- }
- func (datahub *DataHub) CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error) {
- return datahub.CreateConnectorWithStartTime(projectName, topicName, cType, columnFields, -1, config)
- }
- func (datahub *DataHub) CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType,
- columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error) {
- para := &CreateConnectorParameter{
- SinkStartTime: sinkStartTime,
- ConnectorType: cType,
- ColumnFields: columnFields,
- ColumnNameMap: nil,
- Config: config,
- }
- return datahub.CreateConnectorWithPara(projectName, topicName, para)
- }
- func (datahub *DataHub) CreateConnectorWithPara(projectName, topicName string, para *CreateConnectorParameter) (*CreateConnectorResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if para == nil {
- return nil, NewInvalidParameterErrorWithMessage(parameterNull)
- }
- if !validateConnectorType(para.ConnectorType) {
- return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, para.ConnectorType.String())
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ccr := &CreateConnectorRequest{
- Action: "create",
- Type: para.ConnectorType,
- SinkStartTime: para.SinkStartTime,
- ColumnFields: para.ColumnFields,
- ColumnNameMap: para.ColumnNameMap,
- Config: para.Config,
- }
- respBody, commonResp, err := datahub.Client.Post(path, ccr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewCreateConnectorResult(respBody, commonResp)
- }
- func (datahub *DataHub) GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetConnectorResult(respBody, commonResp)
- }
- func (datahub *DataHub) UpdateConnector(projectName, topicName, connectorId string, config interface{}) (*UpdateConnectorResult, error) {
- para := &UpdateConnectorParameter{
- ColumnFields: nil,
- ColumnNameMap: nil,
- Config: config,
- }
- return datahub.UpdateConnectorWithPara(projectName, topicName, connectorId, para)
- }
- func (datahub *DataHub) UpdateConnectorWithPara(projectName, topicName, connectorId string, para *UpdateConnectorParameter) (*UpdateConnectorResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if para == nil {
- return nil, NewInvalidParameterErrorWithMessage(parameterNull)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ucr := &UpdateConnectorRequest{
- Action: "updateconfig",
- ColumnFields: para.ColumnFields,
- ColumnNameMap: para.ColumnNameMap,
- Config: para.Config,
- }
- _, commonResp, err := datahub.Client.Post(path, ucr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateConnectorResult(commonResp)
- }
- func (datahub *DataHub) DeleteConnector(projectName, topicName, connectorId string) (*DeleteConnectorResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- _, commonResp, err := datahub.Client.Delete(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewDeleteConnectorResult(commonResp)
- }
- func (datahub *DataHub) GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- Query: map[string]string{"donetime": ""},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetConnectorDoneTimeResult(respBody, commonResp)
- }
- func (datahub *DataHub) GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- gcss := &GetConnectorShardStatusRequest{
- Action: "Status",
- }
- respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetConnectorShardStatusResult(respBody, commonResp)
- }
- func (datahub *DataHub) GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*GetConnectorShardStatusByShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- gcss := &GetConnectorShardStatusRequest{
- Action: "Status",
- ShardId: shardId,
- }
- respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetConnectorShardStatusByShardResult(respBody, commonResp)
- }
- func (datahub *DataHub) ReloadConnector(projectName, topicName, connectorId string) (*ReloadConnectorResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- rcr := &ReloadConnectorRequest{
- Action: "Reload",
- }
- _, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewReloadConnectorResult(commonResp)
- }
- func (datahub *DataHub) ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) (*ReloadConnectorByShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- rcr := &ReloadConnectorRequest{
- Action: "Reload",
- ShardId: shardId,
- }
- _, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewReloadConnectorByShardResult(commonResp)
- }
- func (datahub *DataHub) UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) (*UpdateConnectorStateResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !validateConnectorState(state) {
- return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ucsr := &UpdateConnectorStateRequest{
- Action: "updatestate",
- State: state,
- }
- _, commonResp, err := datahub.Client.Post(path, ucsr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateConnectorStateResult(commonResp)
- }
- func (datahub *DataHub) UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) (*UpdateConnectorOffsetResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ucor := &UpdateConnectorOffsetRequest{
- Action: "updateshardcontext",
- ShardId: shardId,
- Timestamp: offset.Timestamp,
- Sequence: offset.Sequence,
- }
- _, commonResp, err := datahub.Client.Post(path, ucor, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateConnectorOffsetResult(commonResp)
- }
- func (datahub *DataHub) AppendConnectorField(projectName, topicName, connectorId, fieldName string) (*AppendConnectorFieldResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- acfr := &AppendConnectorFieldRequest{
- Action: "appendfield",
- FieldName: fieldName,
- }
- _, commonResp, err := datahub.Client.Post(path, acfr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewAppendConnectorFieldResult(commonResp)
- }
- func (datahub *DataHub) ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- lsr := &ListSubscriptionRequest{
- Action: "list",
- PageIndex: pageIndex,
- PageSize: pageSize,
- }
- respBody, commonResp, err := datahub.Client.Post(path, lsr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListSubscriptionResult(respBody, commonResp)
- }
- func (datahub *DataHub) CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckComment(comment) {
- return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
- }
- path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- csr := &CreateSubscriptionRequest{
- Action: "create",
- Comment: comment,
- }
- respBody, commonResp, err := datahub.Client.Post(path, csr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewCreateSubscriptionResult(respBody, commonResp)
- }
- func (datahub *DataHub) UpdateSubscription(projectName, topicName, subId, comment string) (*UpdateSubscriptionResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckComment(comment) {
- return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
- }
- path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- usr := &UpdateSubscriptionRequest{
- Comment: comment,
- }
- _, commonResp, err := datahub.Client.Put(path, usr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateSubscriptionResult(commonResp)
- }
- func (datahub *DataHub) DeleteSubscription(projectName, topicName, subId string) (*DeleteSubscriptionResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- _, commonResp, err := datahub.Client.Delete(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewDeleteSubscriptionResult(commonResp)
- }
- func (datahub *DataHub) GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- respBody, commonResp, err := datahub.Client.Get(path, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetSubscriptionResult(respBody, commonResp)
- }
- func (datahub *DataHub) UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) (*UpdateSubscriptionStateResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- usr := &UpdateSubscriptionStateRequest{
- State: state,
- }
- _, commonResp, err := datahub.Client.Put(path, usr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewUpdateSubscriptionStateResult(commonResp)
- }
- func (datahub *DataHub) OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- for _, id := range shardIds {
- if !util.CheckShardId(id) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- }
- path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- ossr := &OpenSubscriptionSessionRequest{
- Action: "open",
- ShardIds: shardIds,
- }
- respBody, commonResp, err := datahub.Client.Post(path, ossr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewOpenSubscriptionSessionResult(respBody, commonResp)
- }
- func (datahub *DataHub) GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- for _, id := range shardIds {
- if !util.CheckShardId(id) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- }
- path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- gsor := &GetSubscriptionOffsetRequest{
- Action: "get",
- ShardIds: shardIds,
- }
- respBody, commonResp, err := datahub.Client.Post(path, gsor, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetSubscriptionOffsetResult(respBody, commonResp)
- }
- func (datahub *DataHub) CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*CommitSubscriptionOffsetResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- req := &CommitSubscriptionOffsetRequest{
- Action: "commit",
- Offsets: offsets,
- }
- _, commonResp, err := datahub.Client.Put(path, req, reqPara)
- if err != nil {
- return nil, err
- }
- return NewCommitSubscriptionOffsetResult(commonResp)
- }
- func (datahub *DataHub) ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*ResetSubscriptionOffsetResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- req := &ResetSubscriptionOffsetRequest{
- Action: "reset",
- Offsets: offsets,
- }
- _, commonResp, err := datahub.Client.Put(path, req, reqPara)
- if err != nil {
- return nil, err
- }
- return NewResetSubscriptionOffsetResult(commonResp)
- }
- func (datahub *DataHub) Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- for _, id := range holdShardList {
- if !util.CheckShardId(id) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- }
- for _, id := range readEndShardList {
- if !util.CheckShardId(id) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- }
- path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- hr := &HeartbeatRequest{
- Action: "heartbeat",
- ConsumerId: consumerId,
- VersionId: versionId,
- HoldShardList: holdShardList,
- ReadEndShardList: readEndShardList,
- }
- respBody, commonResp, err := datahub.Client.Post(path, hr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewHeartbeatResult(respBody, commonResp)
- }
- func (datahub *DataHub) JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- jgr := &JoinGroupRequest{
- Action: "joinGroup",
- SessionTimeout: sessionTimeout,
- }
- respBody, commonResp, err := datahub.Client.Post(path, jgr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewJoinGroupResult(respBody, commonResp)
- }
- func (datahub *DataHub) SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) (*SyncGroupResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if len(releaseShardList) == 0 || len(readEndShardList) == 0 {
- return nil, NewInvalidParameterErrorWithMessage(shardListInvalid)
- }
- for _, id := range releaseShardList {
- if !util.CheckShardId(id) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- }
- for _, id := range readEndShardList {
- if !util.CheckShardId(id) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- }
- path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- sgr := &SyncGroupRequest{
- Action: "syncGroup",
- ConsumerId: consumerId,
- VersionId: versionId,
- ReleaseShardList: releaseShardList,
- ReadEndShardList: readEndShardList,
- }
- _, commonResp, err := datahub.Client.Post(path, sgr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewSyncGroupResult(commonResp)
- }
- func (datahub *DataHub) LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) (*LeaveGroupResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- lgr := &LeaveGroupRequest{
- Action: "leaveGroup",
- ConsumerId: consumerId,
- VersionId: versionId,
- }
- _, commonResp, err := datahub.Client.Post(path, lgr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewLeaveGroupResult(commonResp)
- }
- func (datahub *DataHub) ListTopicSchema(projectName, topicName string) (*ListTopicSchemaResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- lts := &ListTopicSchemaRequest{
- Action: "ListSchema",
- }
- respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
- if err != nil {
- return nil, err
- }
- return NewListTopicSchemaResult(respBody, commonResp)
- }
- func (datahub *DataHub) GetTopicSchemaByVersion(projectName, topicName string, versionId int) (*GetTopicSchemaResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- lts := &GetTopicSchemaRequest{
- Action: "GetSchema",
- VersionId: versionId,
- RecordSchema: nil,
- }
- respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetTopicSchemaResult(respBody, commonResp)
- }
- func (datahub *DataHub) GetTopicSchemaBySchema(projectName, topicName string, recordSchema *RecordSchema) (*GetTopicSchemaResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- lts := &GetTopicSchemaRequest{
- Action: "GetSchema",
- VersionId: -1,
- RecordSchema: recordSchema,
- }
- respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetTopicSchemaResult(respBody, commonResp)
- }
- func (datahub *DataHub) RegisterTopicSchema(projectName, topicName string, recordSchema *RecordSchema) (*RegisterTopicSchemaResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- lts := &RegisterTopicSchemaRequest{
- Action: "RegisterSchema",
- RecordSchema: recordSchema,
- }
- respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
- if err != nil {
- return nil, err
- }
- return NewRegisterTopicSchemaResult(respBody, commonResp)
- }
- func (datahub *DataHub) DeleteTopicSchema(projectName, topicName string, versionId int) (*DeleteTopicSchemaResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(topicPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{httpHeaderContentType: httpJsonContent},
- }
- lts := &DeleteTopicSchemaRequest{
- Action: "DeleteSchema",
- VersionId: versionId,
- }
- _, commonResp, err := datahub.Client.Post(path, lts, reqPara)
- if err != nil {
- return nil, err
- }
- return NewDeleteTopicSchemaResult(commonResp)
- }
- func (datahub *DataHub) getSchemaRegistry() *schemaRegistryClient {
- return datahub.schemaClient
- }
- type DataHubPB struct {
- DataHub
- }
- func (datahub *DataHubPB) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- path := fmt.Sprintf(shardsPath, projectName, topicName)
- reqPara := &RequestParameter{
- Header: map[string]string{
- httpHeaderContentType: httpProtoContent,
- httpHeaderRequestAction: httpPublistContent},
- }
- prr := &PutPBRecordsRequest{
- Records: records,
- }
- respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewPutPBRecordsResult(respBody, commonResp)
- }
- func (datahub *DataHubPB) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{
- httpHeaderContentType: httpProtoContent,
- httpHeaderRequestAction: httpPublistContent},
- }
- prr := &PutPBRecordsRequest{
- Records: records,
- }
- _, commonResp, err := datahub.Client.Post(path, prr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewPutRecordsByShardResult(commonResp)
- }
- func (datahub *DataHubPB) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{
- httpHeaderContentType: httpProtoContent,
- httpHeaderRequestAction: httpSubscribeContent},
- }
- grr := &GetPBRecordRequest{
- Cursor: cursor,
- Limit: limit,
- }
- respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetPBRecordsResult(respBody, recordSchema, commonResp)
- }
- func (datahub *DataHubPB) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{
- httpHeaderContentType: httpProtoContent,
- httpHeaderRequestAction: httpSubscribeContent},
- }
- grr := &GetPBRecordRequest{
- Cursor: cursor,
- Limit: limit,
- }
- respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewGetPBRecordsResult(respBody, nil, commonResp)
- }
- type DataHubBatch struct {
- DataHub
- }
- func (datahub *DataHubBatch) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
- return nil, errors.New("not support this method")
- }
- func (datahub *DataHubBatch) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{
- httpHeaderContentType: httpProtoBatchContent,
- httpHeaderRequestAction: httpPublistContent},
- }
- serializer := newBatchSerializer(projectName, topicName, datahub.cType, datahub.schemaClient)
- prr := &PutBatchRecordsRequest{
- serializer: serializer,
- Records: records,
- }
- _, commonResp, err := datahub.Client.Post(path, prr, reqPara)
- if err != nil {
- return nil, err
- }
- return NewPutRecordsByShardResult(commonResp)
- }
- func (datahub *DataHubBatch) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
- if !util.CheckProjectName(projectName) {
- return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
- }
- if !util.CheckTopicName(topicName) {
- return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
- }
- if !util.CheckShardId(shardId) {
- return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
- }
- path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
- reqPara := &RequestParameter{
- Header: map[string]string{
- httpHeaderContentType: httpProtoBatchContent,
- httpHeaderRequestAction: httpSubscribeContent},
- }
- gbr := &GetBatchRecordRequest{
- GetPBRecordRequest{
- Cursor: cursor,
- Limit: limit,
- },
- }
- respBody, commonResp, err := datahub.Client.Post(path, gbr, reqPara)
- if err != nil {
- return nil, err
- }
- deserializer := newBatchDeserializer(projectName, topicName, shardId, recordSchema, datahub.schemaClient)
- return NewGetBatchRecordsResult(respBody, recordSchema, commonResp, deserializer)
- }
- func (datahub *DataHubBatch) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
- return datahub.GetTupleRecords(projectName, topicName, shardId, cursor, limit, nil)
- }
|