12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7 |
- package tablestore
- import (
- "bytes"
- "crypto/md5"
- "encoding/base64"
- "fmt"
- "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol"
- "github.com/golang/protobuf/proto"
- "io"
- "math/rand"
- "net"
- "net/http"
- "strings"
- "time"
- )
// Client identification plus the request path of each TableStore action.
// doRequestWithRetry appends the path to the client's endpoint to form the
// request URL.
const (
	// userAgent is the value sent in the User-Agent header of every request.
	userAgent = "aliyun-tablestore-sdk-golang/4.0.2"

	// Table management.
	createTableUri   = "/CreateTable"
	listTableUri     = "/ListTable"
	deleteTableUri   = "/DeleteTable"
	describeTableUri = "/DescribeTable"
	updateTableUri   = "/UpdateTable"

	// Row reads and writes.
	putRowUri        = "/PutRow"
	deleteRowUri     = "/DeleteRow"
	getRowUri        = "/GetRow"
	updateRowUri     = "/UpdateRow"
	batchGetRowUri   = "/BatchGetRow"
	batchWriteRowUri = "/BatchWriteRow"
	getRangeUri      = "/GetRange"

	// Streams.
	listStreamUri       = "/ListStream"
	describeStreamUri   = "/DescribeStream"
	getShardIteratorUri = "/GetShardIterator"
	getStreamRecordUri  = "/GetStreamRecord"

	// Split point computation.
	computeSplitPointsBySizeRequestUri = "/ComputeSplitPointsBySize"

	// Search indexes.
	searchUri              = "/Search"
	createSearchIndexUri   = "/CreateSearchIndex"
	listSearchIndexUri     = "/ListSearchIndex"
	deleteSearchIndexUri   = "/DeleteSearchIndex"
	describeSearchIndexUri = "/DescribeSearchIndex"

	// Table indexes.
	createIndexUri = "/CreateIndex"
	dropIndexUri   = "/DropIndex"

	// Local transactions. Note that the "create" constant maps to the
	// StartLocalTransaction action.
	createlocaltransactionuri = "/StartLocalTransaction"
	committransactionuri      = "/CommitTransaction"
	aborttransactionuri       = "/AbortTransaction"
)
- // Constructor: to create the client of TableStore service.
- // 构造函数:创建表格存储服务的客户端。
- //
- // @param endPoint The address of TableStore service. 表格存储服务地址。
- // @param instanceName
- // @param accessId The Access ID. 用于标示用户的ID。
- // @param accessKey The Access Key. 用于签名和验证的密钥。
- // @param options set client config
- func NewClient(endPoint, instanceName, accessKeyId, accessKeySecret string, options ...ClientOption) *TableStoreClient {
- client := NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret, "", nil)
- // client options parse
- for _, option := range options {
- option(client)
- }
- return client
- }
- type GetHttpClient func() IHttpClient
- var currentGetHttpClientFunc GetHttpClient = func() IHttpClient {
- return &TableStoreHttpClient{}
- }
- // Constructor: to create the client of OTS service. 传入config
- // 构造函数:创建OTS服务的客户端。
- func NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret string, securityToken string, config *TableStoreConfig) *TableStoreClient {
- tableStoreClient := new(TableStoreClient)
- tableStoreClient.endPoint = endPoint
- tableStoreClient.instanceName = instanceName
- tableStoreClient.accessKeyId = accessKeyId
- tableStoreClient.accessKeySecret = accessKeySecret
- tableStoreClient.securityToken = securityToken
- if config == nil {
- config = NewDefaultTableStoreConfig()
- }
- tableStoreClient.config = config
- var tableStoreTransportProxy http.RoundTripper
- if config.Transport != nil {
- tableStoreTransportProxy = config.Transport
- } else {
- tableStoreTransportProxy = &http.Transport{
- MaxIdleConnsPerHost: config.MaxIdleConnections,
- Dial: (&net.Dialer{
- Timeout: config.HTTPTimeout.ConnectionTimeout,
- }).Dial,
- }
- }
- tableStoreClient.httpClient = currentGetHttpClientFunc()
- httpClient := &http.Client{
- Transport: tableStoreTransportProxy,
- Timeout: tableStoreClient.config.HTTPTimeout.RequestTimeout,
- }
- tableStoreClient.httpClient.New(httpClient)
- tableStoreClient.random = rand.New(rand.NewSource(time.Now().Unix()))
- return tableStoreClient
- }
- func NewClientWithExternalHeader(endPoint, instanceName, accessKeyId, accessKeySecret string, securityToken string, config *TableStoreConfig, header map[string]string) *TableStoreClient {
- tableStoreClient := NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret, securityToken, config)
- tableStoreClient.externalHeader = header
- return tableStoreClient
- }
- // 请求服务端
- func (tableStoreClient *TableStoreClient) doRequestWithRetry(uri string, req, resp proto.Message, responseInfo *ResponseInfo) error {
- end := time.Now().Add(tableStoreClient.config.MaxRetryTime)
- url := fmt.Sprintf("%s%s", tableStoreClient.endPoint, uri)
- /* request body */
- var body []byte
- var err error
- if req != nil {
- body, err = proto.Marshal(req)
- if err != nil {
- return err
- }
- } else {
- body = nil
- }
- var value int64
- var i uint
- var respBody []byte
- var requestId string
- for i = 0; ; i++ {
- respBody, err, requestId = tableStoreClient.doRequest(url, uri, body, resp)
- responseInfo.RequestId = requestId
- if err == nil {
- break
- } else {
- value = getNextPause(tableStoreClient, err, i, end, value, uri)
- // fmt.Println("hit retry", uri, err, *e.Code, value)
- if value <= 0 {
- return err
- }
- time.Sleep(time.Duration(value) * time.Millisecond)
- }
- }
- if respBody == nil || len(respBody) == 0 {
- return nil
- }
- err = proto.Unmarshal(respBody, resp)
- if err != nil {
- return fmt.Errorf("decode resp failed: %s", err)
- }
- return nil
- }
- func getNextPause(tableStoreClient *TableStoreClient, err error, count uint, end time.Time, lastInterval int64, action string) int64 {
- if tableStoreClient.config.RetryTimes <= count || time.Now().After(end) {
- return 0
- }
- var retry bool
- if otsErr, ok := err.(*OtsError); ok {
- retry = shouldRetry(tableStoreClient, otsErr.Code, otsErr.Message, action, otsErr.HttpStatusCode)
- } else {
- if err == io.EOF || err == io.ErrUnexpectedEOF || //retry on special net error contains EOF or reset
- strings.Contains(err.Error(), io.EOF.Error()) ||
- strings.Contains(err.Error(), "Connection reset by peer") ||
- strings.Contains(err.Error(), "connection reset by peer") {
- retry = true
- } else if nErr, ok := err.(net.Error); ok {
- retry = nErr.Temporary()
- }
- }
- if retry {
- value := lastInterval*2 + tableStoreClient.random.Int63n(DefaultRetryInterval-1) + 1
- if value > MaxRetryInterval {
- value = MaxRetryInterval
- }
- return value
- }
- return 0
- }
- func shouldRetry(tableStoreClient *TableStoreClient, errorCode string, errorMsg string, action string, statusCode int) bool {
- if tableStoreClient.CustomizedRetryFunc != nil {
- if tableStoreClient.CustomizedRetryFunc(errorCode, errorMsg, action, statusCode) == true {
- return true
- }
- }
- if retryNotMatterActions(errorCode, errorMsg) == true {
- return true
- }
- if isIdempotent(action) &&
- (errorCode == STORAGE_TIMEOUT || errorCode == INTERNAL_SERVER_ERROR || errorCode == SERVER_UNAVAILABLE) {
- return true
- }
- return false
- }
- type CustomizedRetryNotMatterActions func(errorCode string, errorMsg string, action string, httpStatus int) bool
- func retryNotMatterActions(errorCode string, errorMsg string) bool {
- if errorCode == ROW_OPERATION_CONFLICT || errorCode == NOT_ENOUGH_CAPACITY_UNIT ||
- errorCode == TABLE_NOT_READY || errorCode == PARTITION_UNAVAILABLE ||
- errorCode == SERVER_BUSY || errorCode == STORAGE_SERVER_BUSY || (errorCode == QUOTA_EXHAUSTED && errorMsg == "Too frequent table operations.") {
- return true
- } else {
- return false
- }
- }
- func isIdempotent(action string) bool {
- if action == batchGetRowUri || action == describeTableUri ||
- action == getRangeUri || action == getRowUri ||
- action == listTableUri || action == listStreamUri ||
- action == getStreamRecordUri || action == describeStreamUri {
- return true
- } else {
- return false
- }
- }
- func (tableStoreClient *TableStoreClient) doRequest(url string, uri string, body []byte, resp proto.Message) ([]byte, error, string) {
- hreq, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
- if err != nil {
- return nil, err, ""
- }
- /* set headers */
- hreq.Header.Set("User-Agent", userAgent)
- date := time.Now().UTC().Format(xOtsDateFormat)
- hreq.Header.Set(xOtsDate, date)
- hreq.Header.Set(xOtsApiversion, ApiVersion)
- hreq.Header.Set(xOtsAccesskeyid, tableStoreClient.accessKeyId)
- hreq.Header.Set(xOtsInstanceName, tableStoreClient.instanceName)
- for key, value := range tableStoreClient.externalHeader {
- hreq.Header[key] = []string{value}
- }
- md5Byte := md5.Sum(body)
- md5Base64 := base64.StdEncoding.EncodeToString(md5Byte[:16])
- hreq.Header.Set(xOtsContentmd5, md5Base64)
- otshead := createOtsHeaders(tableStoreClient.accessKeySecret)
- otshead.set(xOtsDate, date)
- otshead.set(xOtsApiversion, ApiVersion)
- otshead.set(xOtsAccesskeyid, tableStoreClient.accessKeyId)
- if tableStoreClient.securityToken != "" {
- hreq.Header.Set(xOtsHeaderStsToken, tableStoreClient.securityToken)
- otshead.set(xOtsHeaderStsToken, tableStoreClient.securityToken)
- }
- otshead.set(xOtsContentmd5, md5Base64)
- otshead.set(xOtsInstanceName, tableStoreClient.instanceName)
- for key, value := range tableStoreClient.externalHeader {
- if strings.HasPrefix(key, xOtsPrefix) {
- otshead.set(key, value)
- }
- }
- sign, err := otshead.signature(uri, "POST", tableStoreClient.accessKeySecret)
- if err != nil {
- return nil, err, ""
- }
- hreq.Header.Set(xOtsSignature, sign)
- /* end set headers */
- return tableStoreClient.postReq(hreq, url)
- }
- // table API
- // Create a table with the CreateTableRequest, in which the table name and
- // primary keys are required.
- // 根据CreateTableRequest创建一个表,其中表名和主健列是必选项
- //
- // @param request of CreateTableRequest.
- // @return Void. 无返回值。
- func (tableStoreClient *TableStoreClient) CreateTable(request *CreateTableRequest) (*CreateTableResponse, error) {
- if len(request.TableMeta.TableName) > maxTableNameLength {
- return nil, errTableNameTooLong(request.TableMeta.TableName)
- }
- if len(request.TableMeta.SchemaEntry) > maxPrimaryKeyNum {
- return nil, errPrimaryKeyTooMuch
- }
- if len(request.TableMeta.SchemaEntry) == 0 {
- return nil, errCreateTableNoPrimaryKey
- }
- req := new(otsprotocol.CreateTableRequest)
- req.TableMeta = new(otsprotocol.TableMeta)
- req.TableMeta.TableName = proto.String(request.TableMeta.TableName)
- if len(request.TableMeta.DefinedColumns) > 0 {
- for _, value := range request.TableMeta.DefinedColumns {
- req.TableMeta.DefinedColumn = append(req.TableMeta.DefinedColumn, &otsprotocol.DefinedColumnSchema{Name: &value.Name, Type: value.ColumnType.ConvertToPbDefinedColumnType().Enum()})
- }
- }
- if len(request.IndexMetas) > 0 {
- for _, value := range request.IndexMetas {
- req.IndexMetas = append(req.IndexMetas, value.ConvertToPbIndexMeta())
- }
- }
- for _, key := range request.TableMeta.SchemaEntry {
- keyType := otsprotocol.PrimaryKeyType(*key.Type)
- if key.Option != nil {
- keyOption := otsprotocol.PrimaryKeyOption(*key.Option)
- req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption})
- } else {
- req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType})
- }
- }
- req.ReservedThroughput = new(otsprotocol.ReservedThroughput)
- req.ReservedThroughput.CapacityUnit = new(otsprotocol.CapacityUnit)
- req.ReservedThroughput.CapacityUnit.Read = proto.Int32(int32(request.ReservedThroughput.Readcap))
- req.ReservedThroughput.CapacityUnit.Write = proto.Int32(int32(request.ReservedThroughput.Writecap))
- req.TableOptions = new(otsprotocol.TableOptions)
- req.TableOptions.TimeToLive = proto.Int32(int32(request.TableOption.TimeToAlive))
- req.TableOptions.MaxVersions = proto.Int32(int32(request.TableOption.MaxVersion))
- if request.TableOption.DeviationCellVersionInSec > 0 {
- req.TableOptions.DeviationCellVersionInSec = proto.Int64(request.TableOption.DeviationCellVersionInSec)
- }
- if request.StreamSpec != nil {
- var ss otsprotocol.StreamSpecification
- if request.StreamSpec.EnableStream {
- ss = otsprotocol.StreamSpecification{
- EnableStream: &request.StreamSpec.EnableStream,
- ExpirationTime: &request.StreamSpec.ExpirationTime}
- } else {
- ss = otsprotocol.StreamSpecification{
- EnableStream: &request.StreamSpec.EnableStream}
- }
- req.StreamSpec = &ss
- }
- resp := new(otsprotocol.CreateTableResponse)
- response := &CreateTableResponse{}
- if err := tableStoreClient.doRequestWithRetry(createTableUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- return response, nil
- }
- func (tableStoreClient *TableStoreClient) CreateIndex(request *CreateIndexRequest) (*CreateIndexResponse, error) {
- if len(request.MainTableName) > maxTableNameLength {
- return nil, errTableNameTooLong(request.MainTableName)
- }
- req := new(otsprotocol.CreateIndexRequest)
- req.IndexMeta = request.IndexMeta.ConvertToPbIndexMeta()
- req.IncludeBaseData = proto.Bool(request.IncludeBaseData)
- req.MainTableName = proto.String(request.MainTableName)
- resp := new(otsprotocol.CreateIndexResponse)
- response := &CreateIndexResponse{}
- if err := tableStoreClient.doRequestWithRetry(createIndexUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- return response, nil
- }
- func (tableStoreClient *TableStoreClient) DeleteIndex(request *DeleteIndexRequest) (*DeleteIndexResponse, error) {
- if len(request.MainTableName) > maxTableNameLength {
- return nil, errTableNameTooLong(request.MainTableName)
- }
- req := new(otsprotocol.DropIndexRequest)
- req.IndexName = proto.String(request.IndexName)
- req.MainTableName = proto.String(request.MainTableName)
- resp := new(otsprotocol.DropIndexResponse)
- response := &DeleteIndexResponse{}
- if err := tableStoreClient.doRequestWithRetry(dropIndexUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- return response, nil
- }
- // List all tables. If done, all table names will be returned.
- // 列出所有的表,如果操作成功,将返回所有表的名称。
- //
- // @param tableNames The returned table names. 返回的表名集合。
- // @return Void. 无返回值。
- func (tableStoreClient *TableStoreClient) ListTable() (*ListTableResponse, error) {
- resp := new(otsprotocol.ListTableResponse)
- response := &ListTableResponse{}
- if err := tableStoreClient.doRequestWithRetry(listTableUri, nil, resp, &response.ResponseInfo); err != nil {
- return response, err
- }
- response.TableNames = resp.TableNames
- return response, nil
- }
- // Delete a table and all its views will be deleted.
- // 删除一个表
- //
- // @param tableName The table name. 表名。
- // @return Void. 无返回值。
- func (tableStoreClient *TableStoreClient) DeleteTable(request *DeleteTableRequest) (*DeleteTableResponse, error) {
- req := new(otsprotocol.DeleteTableRequest)
- req.TableName = proto.String(request.TableName)
- response := &DeleteTableResponse{}
- if err := tableStoreClient.doRequestWithRetry(deleteTableUri, req, nil, &response.ResponseInfo); err != nil {
- return nil, err
- }
- return response, nil
- }
- // Query the tablemeta, tableoption and reservedthroughtputdetails
- // @param DescribeTableRequest
- // @param DescribeTableResponse
- func (tableStoreClient *TableStoreClient) DescribeTable(request *DescribeTableRequest) (*DescribeTableResponse, error) {
- req := new(otsprotocol.DescribeTableRequest)
- req.TableName = proto.String(request.TableName)
- resp := new(otsprotocol.DescribeTableResponse)
- response := new(DescribeTableResponse)
- if err := tableStoreClient.doRequestWithRetry(describeTableUri, req, resp, &response.ResponseInfo); err != nil {
- return &DescribeTableResponse{}, err
- }
- response.ReservedThroughput = &ReservedThroughput{Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)), Writecap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Write))}
- responseTableMeta := new(TableMeta)
- responseTableMeta.TableName = *resp.TableMeta.TableName
- for _, key := range resp.TableMeta.PrimaryKey {
- keyType := PrimaryKeyType(*key.Type)
- // enable it when we support kep option in describe table
- if key.Option != nil {
- keyOption := PrimaryKeyOption(*key.Option)
- responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption})
- } else {
- responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType})
- }
- }
- response.TableMeta = responseTableMeta
- response.TableOption = &TableOption{TimeToAlive: int(*resp.TableOptions.TimeToLive), MaxVersion: int(*resp.TableOptions.MaxVersions), DeviationCellVersionInSec: *resp.TableOptions.DeviationCellVersionInSec}
- if resp.StreamDetails != nil && *resp.StreamDetails.EnableStream {
- response.StreamDetails = &StreamDetails{
- EnableStream: *resp.StreamDetails.EnableStream,
- StreamId: (*StreamId)(resp.StreamDetails.StreamId),
- ExpirationTime: *resp.StreamDetails.ExpirationTime,
- LastEnableTime: *resp.StreamDetails.LastEnableTime}
- } else {
- response.StreamDetails = &StreamDetails{
- EnableStream: false}
- }
- for _, meta := range resp.IndexMetas {
- response.IndexMetas = append(response.IndexMetas, ConvertPbIndexMetaToIndexMeta(meta))
- }
- return response, nil
- }
- // Update the table info includes tableoptions and reservedthroughput
- // @param UpdateTableRequest
- // @param UpdateTableResponse
- func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableRequest) (*UpdateTableResponse, error) {
- req := new(otsprotocol.UpdateTableRequest)
- req.TableName = proto.String(request.TableName)
- if request.ReservedThroughput != nil {
- req.ReservedThroughput = new(otsprotocol.ReservedThroughput)
- req.ReservedThroughput.CapacityUnit = new(otsprotocol.CapacityUnit)
- req.ReservedThroughput.CapacityUnit.Read = proto.Int32(int32(request.ReservedThroughput.Readcap))
- req.ReservedThroughput.CapacityUnit.Write = proto.Int32(int32(request.ReservedThroughput.Writecap))
- }
- if request.TableOption != nil {
- req.TableOptions = new(otsprotocol.TableOptions)
- req.TableOptions.TimeToLive = proto.Int32(int32(request.TableOption.TimeToAlive))
- req.TableOptions.MaxVersions = proto.Int32(int32(request.TableOption.MaxVersion))
- if request.TableOption.DeviationCellVersionInSec > 0 {
- req.TableOptions.DeviationCellVersionInSec = proto.Int64(request.TableOption.DeviationCellVersionInSec)
- }
- }
- if request.StreamSpec != nil {
- if request.StreamSpec.EnableStream == true {
- req.StreamSpec = &otsprotocol.StreamSpecification{
- EnableStream: &request.StreamSpec.EnableStream,
- ExpirationTime: &request.StreamSpec.ExpirationTime}
- } else {
- req.StreamSpec = &otsprotocol.StreamSpecification{EnableStream: &request.StreamSpec.EnableStream}
- }
- }
- resp := new(otsprotocol.UpdateTableResponse)
- response := new(UpdateTableResponse)
- if err := tableStoreClient.doRequestWithRetry(updateTableUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- response.ReservedThroughput = &ReservedThroughput{
- Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)),
- Writecap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Write))}
- response.TableOption = &TableOption{
- TimeToAlive: int(*resp.TableOptions.TimeToLive),
- MaxVersion: int(*resp.TableOptions.MaxVersions),
- DeviationCellVersionInSec: *resp.TableOptions.DeviationCellVersionInSec}
- if *resp.StreamDetails.EnableStream {
- response.StreamDetails = &StreamDetails{
- EnableStream: *resp.StreamDetails.EnableStream,
- StreamId: (*StreamId)(resp.StreamDetails.StreamId),
- ExpirationTime: *resp.StreamDetails.ExpirationTime,
- LastEnableTime: *resp.StreamDetails.LastEnableTime}
- } else {
- response.StreamDetails = &StreamDetails{
- EnableStream: false}
- }
- return response, nil
- }
- // Put or update a row in a table. The operation is determined by CheckingType,
- // which has three options: NO, UPDATE, INSERT. The transaction id is optional.
- // 插入或更新行数据。操作针对数据的存在性包含三种检查类型:NO(不检查),UPDATE
- // (更新,数据必须存在)和INSERT(插入,数据必须不存在)。事务ID是可选项。
- //
- // @param builder The builder for putting a row. 插入或更新数据的Builder。
- // @return Void. 无返回值。
- func (tableStoreClient *TableStoreClient) PutRow(request *PutRowRequest) (*PutRowResponse, error) {
- if request == nil {
- return nil, nil
- }
- if request.PutRowChange == nil {
- return nil, nil
- }
- req := new(otsprotocol.PutRowRequest)
- req.TableName = proto.String(request.PutRowChange.TableName)
- req.Row = request.PutRowChange.Serialize()
- condition := new(otsprotocol.Condition)
- condition.RowExistence = request.PutRowChange.Condition.buildCondition()
- if request.PutRowChange.Condition.ColumnCondition != nil {
- condition.ColumnCondition = request.PutRowChange.Condition.ColumnCondition.Serialize()
- }
- if request.PutRowChange.ReturnType == ReturnType_RT_PK {
- content := otsprotocol.ReturnContent{ReturnType: otsprotocol.ReturnType_RT_PK.Enum()}
- req.ReturnContent = &content
- }
- if request.PutRowChange.TransactionId != nil {
- req.TransactionId = request.PutRowChange.TransactionId
- }
- req.Condition = condition
- resp := new(otsprotocol.PutRowResponse)
- response := &PutRowResponse{}
- if err := tableStoreClient.doRequestWithRetry(putRowUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- response.ConsumedCapacityUnit = &ConsumedCapacityUnit{}
- response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
- response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
- if request.PutRowChange.ReturnType == ReturnType_RT_PK {
- rows, err := readRowsWithHeader(bytes.NewReader(resp.Row))
- if err != nil {
- return response, err
- }
- for _, pk := range rows[0].primaryKey {
- pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
- response.PrimaryKey.PrimaryKeys = append(response.PrimaryKey.PrimaryKeys, pkColumn)
- }
- }
- return response, nil
- }
- // Delete row with pk
- // @param DeleteRowRequest
- func (tableStoreClient *TableStoreClient) DeleteRow(request *DeleteRowRequest) (*DeleteRowResponse, error) {
- req := new(otsprotocol.DeleteRowRequest)
- req.TableName = proto.String(request.DeleteRowChange.TableName)
- req.Condition = request.DeleteRowChange.getCondition()
- req.PrimaryKey = request.DeleteRowChange.PrimaryKey.Build(true)
- if request.DeleteRowChange.TransactionId != nil {
- req.TransactionId = request.DeleteRowChange.TransactionId
- }
- resp := new(otsprotocol.DeleteRowResponse)
- response := &DeleteRowResponse{}
- if err := tableStoreClient.doRequestWithRetry(deleteRowUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- response.ConsumedCapacityUnit = &ConsumedCapacityUnit{}
- response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
- response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
- return response, nil
- }
- // row API
- // Get the data of a row or some columns.
- //
- // @param getrowrequest
- func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRowResponse, error) {
- req := new(otsprotocol.GetRowRequest)
- resp := new(otsprotocol.GetRowResponse)
- req.TableName = proto.String(request.SingleRowQueryCriteria.TableName)
- if (request.SingleRowQueryCriteria.getColumnsToGet() != nil) && len(request.SingleRowQueryCriteria.getColumnsToGet()) > 0 {
- req.ColumnsToGet = request.SingleRowQueryCriteria.getColumnsToGet()
- }
- req.PrimaryKey = request.SingleRowQueryCriteria.PrimaryKey.Build(false)
- if request.SingleRowQueryCriteria.MaxVersion != 0 {
- req.MaxVersions = proto.Int32(int32(request.SingleRowQueryCriteria.MaxVersion))
- }
- if request.SingleRowQueryCriteria.TransactionId != nil {
- req.TransactionId = request.SingleRowQueryCriteria.TransactionId
- }
- if request.SingleRowQueryCriteria.StartColumn != nil {
- req.StartColumn = request.SingleRowQueryCriteria.StartColumn
- }
- if request.SingleRowQueryCriteria.EndColumn != nil {
- req.EndColumn = request.SingleRowQueryCriteria.EndColumn
- }
- if request.SingleRowQueryCriteria.TimeRange != nil {
- if request.SingleRowQueryCriteria.TimeRange.Specific != 0 {
- req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Specific)}
- } else {
- req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.End)}
- }
- } else if request.SingleRowQueryCriteria.MaxVersion == 0 {
- return nil, errInvalidInput
- }
- if request.SingleRowQueryCriteria.Filter != nil {
- req.Filter = request.SingleRowQueryCriteria.Filter.Serialize()
- }
- response := &GetRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}}
- if err := tableStoreClient.doRequestWithRetry(getRowUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
- response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
- if len(resp.Row) == 0 {
- return response, nil
- }
- rows, err := readRowsWithHeader(bytes.NewReader(resp.Row))
- if err != nil {
- return nil, err
- }
- for _, pk := range rows[0].primaryKey {
- pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
- response.PrimaryKey.PrimaryKeys = append(response.PrimaryKey.PrimaryKeys, pkColumn)
- }
- for _, cell := range rows[0].cells {
- dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
- response.Columns = append(response.Columns, dataColumn)
- }
- return response, nil
- }
- // Update row
- // @param UpdateRowRequest
- func (tableStoreClient *TableStoreClient) UpdateRow(request *UpdateRowRequest) (*UpdateRowResponse, error) {
- req := new(otsprotocol.UpdateRowRequest)
- resp := new(otsprotocol.UpdateRowResponse)
- req.TableName = proto.String(request.UpdateRowChange.TableName)
- req.Condition = request.UpdateRowChange.getCondition()
- req.RowChange = request.UpdateRowChange.Serialize()
- if request.UpdateRowChange.TransactionId != nil {
- req.TransactionId = request.UpdateRowChange.TransactionId
- }
- response := &UpdateRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}}
- if request.UpdateRowChange.ReturnType == ReturnType_RT_AFTER_MODIFY {
- content := otsprotocol.ReturnContent{ReturnType: otsprotocol.ReturnType_RT_AFTER_MODIFY.Enum()}
- for _, column := range request.UpdateRowChange.ColumnNamesToReturn {
- content.ReturnColumnNames = append(content.ReturnColumnNames, column)
- }
- req.ReturnContent = &content
- }
- if err := tableStoreClient.doRequestWithRetry(updateRowUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- if request.UpdateRowChange.ReturnType == ReturnType_RT_AFTER_MODIFY {
- plainbufferRow, err := readRowsWithHeader(bytes.NewReader(resp.Row))
- if err != nil {
- return response, err
- }
- for _, cell := range plainbufferRow[0].cells {
- attribute := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
- response.Columns = append(response.Columns, attribute)
- }
- }
- response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
- response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
- return response, nil
- }
- // Batch Get Row
- // @param BatchGetRowRequest
- func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowRequest) (*BatchGetRowResponse, error) {
- req := new(otsprotocol.BatchGetRowRequest)
- var tablesInBatch []*otsprotocol.TableInBatchGetRowRequest
- for _, Criteria := range request.MultiRowQueryCriteria {
- table := new(otsprotocol.TableInBatchGetRowRequest)
- table.TableName = proto.String(Criteria.TableName)
- table.ColumnsToGet = Criteria.ColumnsToGet
- if Criteria.StartColumn != nil {
- table.StartColumn = Criteria.StartColumn
- }
- if Criteria.EndColumn != nil {
- table.EndColumn = Criteria.EndColumn
- }
- if Criteria.Filter != nil {
- table.Filter = Criteria.Filter.Serialize()
- }
- if Criteria.MaxVersion != 0 {
- table.MaxVersions = proto.Int32(int32(Criteria.MaxVersion))
- }
- if Criteria.TimeRange != nil {
- if Criteria.TimeRange.Specific != 0 {
- table.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(Criteria.TimeRange.Specific)}
- } else {
- table.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(Criteria.TimeRange.Start), EndTime: proto.Int64(Criteria.TimeRange.End)}
- }
- } else if Criteria.MaxVersion == 0 {
- return nil, errInvalidInput
- }
- for _, pk := range Criteria.PrimaryKey {
- pkWithBytes := pk.Build(false)
- table.PrimaryKey = append(table.PrimaryKey, pkWithBytes)
- }
- tablesInBatch = append(tablesInBatch, table)
- }
- req.Tables = tablesInBatch
- resp := new(otsprotocol.BatchGetRowResponse)
- response := &BatchGetRowResponse{TableToRowsResult: make(map[string][]RowResult)}
- if err := tableStoreClient.doRequestWithRetry(batchGetRowUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- for _, table := range resp.Tables {
- index := int32(0)
- for _, row := range table.Rows {
- rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index}
- index++
- if *row.IsOk == false {
- rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message}
- } else {
- // len == 0 means row not exist
- if len(row.Row) > 0 {
- rows, err := readRowsWithHeader(bytes.NewReader(row.Row))
- if err != nil {
- return nil, err
- }
- for _, pk := range rows[0].primaryKey {
- pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
- rowResult.PrimaryKey.PrimaryKeys = append(rowResult.PrimaryKey.PrimaryKeys, pkColumn)
- }
- for _, cell := range rows[0].cells {
- dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
- rowResult.Columns = append(rowResult.Columns, dataColumn)
- }
- }
- rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read
- rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write
- }
- response.TableToRowsResult[*table.TableName] = append(response.TableToRowsResult[*table.TableName], *rowResult)
- }
- }
- return response, nil
- }
- // Batch Write Row
- // @param BatchWriteRowRequest
- func (tableStoreClient *TableStoreClient) BatchWriteRow(request *BatchWriteRowRequest) (*BatchWriteRowResponse, error) {
- req := new(otsprotocol.BatchWriteRowRequest)
- var tablesInBatch []*otsprotocol.TableInBatchWriteRowRequest
- for key, value := range request.RowChangesGroupByTable {
- table := new(otsprotocol.TableInBatchWriteRowRequest)
- table.TableName = proto.String(key)
- for _, row := range value {
- rowInBatch := &otsprotocol.RowInBatchWriteRowRequest{}
- rowInBatch.Condition = row.getCondition()
- rowInBatch.RowChange = row.Serialize()
- rowInBatch.Type = row.getOperationType().Enum()
- table.Rows = append(table.Rows, rowInBatch)
- }
- tablesInBatch = append(tablesInBatch, table)
- }
- req.Tables = tablesInBatch
- resp := new(otsprotocol.BatchWriteRowResponse)
- response := &BatchWriteRowResponse{TableToRowsResult: make(map[string][]RowResult)}
- if err := tableStoreClient.doRequestWithRetry(batchWriteRowUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- for _, table := range resp.Tables {
- index := int32(0)
- for _, row := range table.Rows {
- rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index}
- index++
- if *row.IsOk == false {
- rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message}
- } else {
- rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read
- rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write
- } /*else {
- rows, err := readRowsWithHeader(bytes.NewReader(row.Row))
- if err != nil {
- return nil, err
- }
- for _, pk := range (rows[0].primaryKey) {
- pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
- rowResult.PrimaryKey.PrimaryKeys = append(rowResult.PrimaryKey.PrimaryKeys, pkColumn)
- }
- for _, cell := range (rows[0].cells) {
- dataColumn := &DataColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value}
- rowResult.Columns = append(rowResult.Columns, dataColumn)
- }
- rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read
- rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write
- }*/
- response.TableToRowsResult[*table.TableName] = append(response.TableToRowsResult[*table.TableName], *rowResult)
- }
- }
- return response, nil
- }
- // Get Range
- // @param GetRangeRequest
- func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*GetRangeResponse, error) {
- req := new(otsprotocol.GetRangeRequest)
- req.TableName = proto.String(request.RangeRowQueryCriteria.TableName)
- req.Direction = request.RangeRowQueryCriteria.Direction.ToDirection().Enum()
- if request.RangeRowQueryCriteria.MaxVersion != 0 {
- req.MaxVersions = proto.Int32(request.RangeRowQueryCriteria.MaxVersion)
- }
- if request.RangeRowQueryCriteria.TransactionId != nil {
- req.TransactionId = request.RangeRowQueryCriteria.TransactionId
- }
- if request.RangeRowQueryCriteria.TimeRange != nil {
- if request.RangeRowQueryCriteria.TimeRange.Specific != 0 {
- req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Specific)}
- } else {
- req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.End)}
- }
- } else if request.RangeRowQueryCriteria.MaxVersion == 0 {
- return nil, errInvalidInput
- }
- if request.RangeRowQueryCriteria.Limit != 0 {
- req.Limit = proto.Int32(request.RangeRowQueryCriteria.Limit)
- }
- if (request.RangeRowQueryCriteria.ColumnsToGet != nil) && len(request.RangeRowQueryCriteria.ColumnsToGet) > 0 {
- req.ColumnsToGet = request.RangeRowQueryCriteria.ColumnsToGet
- }
- if request.RangeRowQueryCriteria.Filter != nil {
- req.Filter = request.RangeRowQueryCriteria.Filter.Serialize()
- }
- if request.RangeRowQueryCriteria.StartColumn != nil {
- req.StartColumn = request.RangeRowQueryCriteria.StartColumn
- }
- if request.RangeRowQueryCriteria.EndColumn != nil {
- req.EndColumn = request.RangeRowQueryCriteria.EndColumn
- }
- req.InclusiveStartPrimaryKey = request.RangeRowQueryCriteria.StartPrimaryKey.Build(false)
- req.ExclusiveEndPrimaryKey = request.RangeRowQueryCriteria.EndPrimaryKey.Build(false)
- resp := new(otsprotocol.GetRangeResponse)
- response := &GetRangeResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}}
- if err := tableStoreClient.doRequestWithRetry(getRangeUri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
- response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
- if len(resp.NextStartPrimaryKey) != 0 {
- currentRows, err := readRowsWithHeader(bytes.NewReader(resp.NextStartPrimaryKey))
- if err != nil {
- return nil, err
- }
- response.NextStartPrimaryKey = &PrimaryKey{}
- for _, pk := range currentRows[0].primaryKey {
- pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
- response.NextStartPrimaryKey.PrimaryKeys = append(response.NextStartPrimaryKey.PrimaryKeys, pkColumn)
- }
- }
- if len(resp.Rows) == 0 {
- return response, nil
- }
- rows, err := readRowsWithHeader(bytes.NewReader(resp.Rows))
- if err != nil {
- return response, err
- }
- for _, row := range rows {
- currentRow := &Row{}
- currentpk := new(PrimaryKey)
- for _, pk := range row.primaryKey {
- pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
- currentpk.PrimaryKeys = append(currentpk.PrimaryKeys, pkColumn)
- }
- currentRow.PrimaryKey = currentpk
- for _, cell := range row.cells {
- dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
- currentRow.Columns = append(currentRow.Columns, dataColumn)
- }
- response.Rows = append(response.Rows, currentRow)
- }
- return response, nil
- }
- func (client *TableStoreClient) ListStream(req *ListStreamRequest) (*ListStreamResponse, error) {
- pbReq := &otsprotocol.ListStreamRequest{}
- pbReq.TableName = req.TableName
- pbResp := otsprotocol.ListStreamResponse{}
- resp := ListStreamResponse{}
- if err := client.doRequestWithRetry(listStreamUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
- return nil, err
- }
- streams := make([]Stream, len(pbResp.Streams))
- for i, pbStream := range pbResp.Streams {
- streams[i] = Stream{
- Id: (*StreamId)(pbStream.StreamId),
- TableName: pbStream.TableName,
- CreationTime: *pbStream.CreationTime}
- }
- resp.Streams = streams[:]
- return &resp, nil
- }
- func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*DescribeStreamResponse, error) {
- pbReq := &otsprotocol.DescribeStreamRequest{}
- {
- pbReq.StreamId = (*string)(req.StreamId)
- pbReq.InclusiveStartShardId = (*string)(req.InclusiveStartShardId)
- pbReq.ShardLimit = req.ShardLimit
- }
- pbResp := otsprotocol.DescribeStreamResponse{}
- resp := DescribeStreamResponse{}
- if err := client.doRequestWithRetry(describeStreamUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
- return nil, err
- }
- resp.StreamId = (*StreamId)(pbResp.StreamId)
- resp.ExpirationTime = *pbResp.ExpirationTime
- resp.TableName = pbResp.TableName
- resp.CreationTime = *pbResp.CreationTime
- Assert(pbResp.StreamStatus != nil, "StreamStatus in DescribeStreamResponse is required.")
- switch *pbResp.StreamStatus {
- case otsprotocol.StreamStatus_STREAM_ENABLING:
- resp.Status = SS_Enabling
- case otsprotocol.StreamStatus_STREAM_ACTIVE:
- resp.Status = SS_Active
- }
- resp.NextShardId = (*ShardId)(pbResp.NextShardId)
- shards := make([]*StreamShard, len(pbResp.Shards))
- for i, pbShard := range pbResp.Shards {
- shards[i] = &StreamShard{
- SelfShard: (*ShardId)(pbShard.ShardId),
- FatherShard: (*ShardId)(pbShard.ParentId),
- MotherShard: (*ShardId)(pbShard.ParentSiblingId)}
- }
- resp.Shards = shards[:]
- return &resp, nil
- }
- func (client *TableStoreClient) GetShardIterator(req *GetShardIteratorRequest) (*GetShardIteratorResponse, error) {
- pbReq := &otsprotocol.GetShardIteratorRequest{
- StreamId: (*string)(req.StreamId),
- ShardId: (*string)(req.ShardId)}
- if req.Timestamp != nil {
- pbReq.Timestamp = req.Timestamp
- }
- if req.Token != nil {
- pbReq.Token = req.Token
- }
- pbResp := otsprotocol.GetShardIteratorResponse{}
- resp := GetShardIteratorResponse{}
- if err := client.doRequestWithRetry(getShardIteratorUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
- return nil, err
- }
- resp.ShardIterator = (*ShardIterator)(pbResp.ShardIterator)
- resp.Token = pbResp.NextToken
- return &resp, nil
- }
- func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*GetStreamRecordResponse, error) {
- pbReq := &otsprotocol.GetStreamRecordRequest{
- ShardIterator: (*string)(req.ShardIterator)}
- if req.Limit != nil {
- pbReq.Limit = req.Limit
- }
- pbResp := otsprotocol.GetStreamRecordResponse{}
- resp := GetStreamRecordResponse{}
- if err := client.doRequestWithRetry(getStreamRecordUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
- return nil, err
- }
- if pbResp.NextShardIterator != nil {
- resp.NextShardIterator = (*ShardIterator)(pbResp.NextShardIterator)
- }
- records := make([]*StreamRecord, len(pbResp.StreamRecords))
- for i, pbRecord := range pbResp.StreamRecords {
- record := StreamRecord{}
- records[i] = &record
- switch *pbRecord.ActionType {
- case otsprotocol.ActionType_PUT_ROW:
- record.Type = AT_Put
- case otsprotocol.ActionType_UPDATE_ROW:
- record.Type = AT_Update
- case otsprotocol.ActionType_DELETE_ROW:
- record.Type = AT_Delete
- }
- plainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord.Record))
- if err != nil {
- return nil, err
- }
- Assert(len(plainRows) == 1,
- "There must be exactly one row in a StreamRecord.")
- plainRow := plainRows[0]
- pkey := PrimaryKey{}
- record.PrimaryKey = &pkey
- pkey.PrimaryKeys = make([]*PrimaryKeyColumn, len(plainRow.primaryKey))
- for i, pk := range plainRow.primaryKey {
- pkc := PrimaryKeyColumn{
- ColumnName: string(pk.cellName),
- Value: pk.cellValue.Value}
- pkey.PrimaryKeys[i] = &pkc
- }
- Assert(plainRow.extension != nil,
- "extension in a stream record is required.")
- record.Info = plainRow.extension
- record.Columns = make([]*RecordColumn, len(plainRow.cells))
- for i, plainCell := range plainRow.cells {
- cell := RecordColumn{}
- record.Columns[i] = &cell
- name := string(plainCell.cellName)
- cell.Name = &name
- if plainCell.cellValue != nil {
- cell.Type = RCT_Put
- } else {
- if plainCell.cellTimestamp > 0 {
- cell.Type = RCT_DeleteOneVersion
- } else {
- cell.Type = RCT_DeleteAllVersions
- }
- }
- switch cell.Type {
- case RCT_Put:
- cell.Value = plainCell.cellValue.Value
- fallthrough
- case RCT_DeleteOneVersion:
- cell.Timestamp = &plainCell.cellTimestamp
- case RCT_DeleteAllVersions:
- break
- }
- }
- }
- resp.Records = records
- return &resp, nil
- }
- func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsBySizeRequest) (*ComputeSplitPointsBySizeResponse, error) {
- pbReq := &otsprotocol.ComputeSplitPointsBySizeRequest{
- TableName: &(req.TableName),
- SplitSize: &(req.SplitSize),
- }
- pbResp := otsprotocol.ComputeSplitPointsBySizeResponse{}
- resp := ComputeSplitPointsBySizeResponse{}
- if err := client.doRequestWithRetry(computeSplitPointsBySizeRequestUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
- return nil, err
- }
- beginPk := &PrimaryKey{}
- endPk := &PrimaryKey{}
- for _, pkSchema := range pbResp.Schema {
- beginPk.AddPrimaryKeyColumnWithMinValue(*pkSchema.Name)
- endPk.AddPrimaryKeyColumnWithMaxValue(*pkSchema.Name)
- }
- lastPk := beginPk
- nowPk := endPk
- for _, pbRecord := range pbResp.SplitPoints {
- plainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord))
- if err != nil {
- return nil, err
- }
- nowPk = &PrimaryKey{}
- for _, pk := range plainRows[0].primaryKey {
- nowPk.AddPrimaryKeyColumn(string(pk.cellName), pk.cellValue.Value)
- }
- if len(pbResp.Schema) > 1 {
- for i := 1; i < len(pbResp.Schema); i++ {
- nowPk.AddPrimaryKeyColumnWithMinValue(*pbResp.Schema[i].Name)
- }
- }
- newSplit := &Split{LowerBound: lastPk, UpperBound: nowPk}
- resp.Splits = append(resp.Splits, newSplit)
- lastPk = nowPk
- }
- newSplit := &Split{LowerBound: lastPk, UpperBound: endPk}
- resp.Splits = append(resp.Splits, newSplit)
- index := 0
- for _, pbLocation := range pbResp.Locations {
- count := *pbLocation.Repeat
- value := *pbLocation.Location
- for i := int64(0); i < count; i++ {
- resp.Splits[index].Location = value
- index++
- }
- }
- return &resp, nil
- }
- func (client *TableStoreClient) StartLocalTransaction(request *StartLocalTransactionRequest) (*StartLocalTransactionResponse, error) {
- req := new(otsprotocol.StartLocalTransactionRequest)
- resp := new(otsprotocol.StartLocalTransactionResponse)
- req.TableName = proto.String(request.TableName)
- req.Key = request.PrimaryKey.Build(false)
- response := &StartLocalTransactionResponse{}
- if err := client.doRequestWithRetry(createlocaltransactionuri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- response.TransactionId = resp.TransactionId
- return response, nil
- }
- func (client *TableStoreClient) CommitTransaction(request *CommitTransactionRequest) (*CommitTransactionResponse, error) {
- req := new(otsprotocol.CommitTransactionRequest)
- resp := new(otsprotocol.CommitTransactionResponse)
- req.TransactionId = request.TransactionId
- response := &CommitTransactionResponse{}
- if err := client.doRequestWithRetry(committransactionuri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- return response, nil
- }
- func (client *TableStoreClient) AbortTransaction(request *AbortTransactionRequest) (*AbortTransactionResponse, error) {
- req := new(otsprotocol.AbortTransactionRequest)
- resp := new(otsprotocol.AbortTransactionResponse)
- req.TransactionId = request.TransactionId
- response := &AbortTransactionResponse{}
- if err := client.doRequestWithRetry(aborttransactionuri, req, resp, &response.ResponseInfo); err != nil {
- return nil, err
- }
- return response, nil
- }
|