api.go 44 KB


  1. package tablestore
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "fmt"
  7. "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol"
  8. "github.com/golang/protobuf/proto"
  9. "io"
  10. "math/rand"
  11. "net"
  12. "net/http"
  13. "strings"
  14. "time"
  15. )
  16. const (
  17. userAgent = "aliyun-tablestore-sdk-golang/4.0.2"
  18. createTableUri = "/CreateTable"
  19. listTableUri = "/ListTable"
  20. deleteTableUri = "/DeleteTable"
  21. describeTableUri = "/DescribeTable"
  22. updateTableUri = "/UpdateTable"
  23. putRowUri = "/PutRow"
  24. deleteRowUri = "/DeleteRow"
  25. getRowUri = "/GetRow"
  26. updateRowUri = "/UpdateRow"
  27. batchGetRowUri = "/BatchGetRow"
  28. batchWriteRowUri = "/BatchWriteRow"
  29. getRangeUri = "/GetRange"
  30. listStreamUri = "/ListStream"
  31. describeStreamUri = "/DescribeStream"
  32. getShardIteratorUri = "/GetShardIterator"
  33. getStreamRecordUri = "/GetStreamRecord"
  34. computeSplitPointsBySizeRequestUri = "/ComputeSplitPointsBySize"
  35. searchUri = "/Search"
  36. createSearchIndexUri = "/CreateSearchIndex"
  37. listSearchIndexUri = "/ListSearchIndex"
  38. deleteSearchIndexUri = "/DeleteSearchIndex"
  39. describeSearchIndexUri = "/DescribeSearchIndex"
  40. createIndexUri = "/CreateIndex"
  41. dropIndexUri = "/DropIndex"
  42. createlocaltransactionuri = "/StartLocalTransaction"
  43. committransactionuri = "/CommitTransaction"
  44. aborttransactionuri = "/AbortTransaction"
  45. )
  46. // Constructor: to create the client of TableStore service.
  47. // 构造函数:创建表格存储服务的客户端。
  48. //
  49. // @param endPoint The address of TableStore service. 表格存储服务地址。
  50. // @param instanceName
  51. // @param accessId The Access ID. 用于标示用户的ID。
  52. // @param accessKey The Access Key. 用于签名和验证的密钥。
  53. // @param options set client config
  54. func NewClient(endPoint, instanceName, accessKeyId, accessKeySecret string, options ...ClientOption) *TableStoreClient {
  55. client := NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret, "", nil)
  56. // client options parse
  57. for _, option := range options {
  58. option(client)
  59. }
  60. return client
  61. }
  62. type GetHttpClient func() IHttpClient
  63. var currentGetHttpClientFunc GetHttpClient = func() IHttpClient {
  64. return &TableStoreHttpClient{}
  65. }
  66. // Constructor: to create the client of OTS service. 传入config
  67. // 构造函数:创建OTS服务的客户端。
  68. func NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret string, securityToken string, config *TableStoreConfig) *TableStoreClient {
  69. tableStoreClient := new(TableStoreClient)
  70. tableStoreClient.endPoint = endPoint
  71. tableStoreClient.instanceName = instanceName
  72. tableStoreClient.accessKeyId = accessKeyId
  73. tableStoreClient.accessKeySecret = accessKeySecret
  74. tableStoreClient.securityToken = securityToken
  75. if config == nil {
  76. config = NewDefaultTableStoreConfig()
  77. }
  78. tableStoreClient.config = config
  79. var tableStoreTransportProxy http.RoundTripper
  80. if config.Transport != nil {
  81. tableStoreTransportProxy = config.Transport
  82. } else {
  83. tableStoreTransportProxy = &http.Transport{
  84. MaxIdleConnsPerHost: config.MaxIdleConnections,
  85. Dial: (&net.Dialer{
  86. Timeout: config.HTTPTimeout.ConnectionTimeout,
  87. }).Dial,
  88. }
  89. }
  90. tableStoreClient.httpClient = currentGetHttpClientFunc()
  91. httpClient := &http.Client{
  92. Transport: tableStoreTransportProxy,
  93. Timeout: tableStoreClient.config.HTTPTimeout.RequestTimeout,
  94. }
  95. tableStoreClient.httpClient.New(httpClient)
  96. tableStoreClient.random = rand.New(rand.NewSource(time.Now().Unix()))
  97. return tableStoreClient
  98. }
  99. func NewClientWithExternalHeader(endPoint, instanceName, accessKeyId, accessKeySecret string, securityToken string, config *TableStoreConfig, header map[string]string) *TableStoreClient {
  100. tableStoreClient := NewClientWithConfig(endPoint, instanceName, accessKeyId, accessKeySecret, securityToken, config)
  101. tableStoreClient.externalHeader = header
  102. return tableStoreClient
  103. }
  104. // 请求服务端
  105. func (tableStoreClient *TableStoreClient) doRequestWithRetry(uri string, req, resp proto.Message, responseInfo *ResponseInfo) error {
  106. end := time.Now().Add(tableStoreClient.config.MaxRetryTime)
  107. url := fmt.Sprintf("%s%s", tableStoreClient.endPoint, uri)
  108. /* request body */
  109. var body []byte
  110. var err error
  111. if req != nil {
  112. body, err = proto.Marshal(req)
  113. if err != nil {
  114. return err
  115. }
  116. } else {
  117. body = nil
  118. }
  119. var value int64
  120. var i uint
  121. var respBody []byte
  122. var requestId string
  123. for i = 0; ; i++ {
  124. respBody, err, requestId = tableStoreClient.doRequest(url, uri, body, resp)
  125. responseInfo.RequestId = requestId
  126. if err == nil {
  127. break
  128. } else {
  129. value = getNextPause(tableStoreClient, err, i, end, value, uri)
  130. // fmt.Println("hit retry", uri, err, *e.Code, value)
  131. if value <= 0 {
  132. return err
  133. }
  134. time.Sleep(time.Duration(value) * time.Millisecond)
  135. }
  136. }
  137. if respBody == nil || len(respBody) == 0 {
  138. return nil
  139. }
  140. err = proto.Unmarshal(respBody, resp)
  141. if err != nil {
  142. return fmt.Errorf("decode resp failed: %s", err)
  143. }
  144. return nil
  145. }
  146. func getNextPause(tableStoreClient *TableStoreClient, err error, count uint, end time.Time, lastInterval int64, action string) int64 {
  147. if tableStoreClient.config.RetryTimes <= count || time.Now().After(end) {
  148. return 0
  149. }
  150. var retry bool
  151. if otsErr, ok := err.(*OtsError); ok {
  152. retry = shouldRetry(tableStoreClient, otsErr.Code, otsErr.Message, action, otsErr.HttpStatusCode)
  153. } else {
  154. if err == io.EOF || err == io.ErrUnexpectedEOF || //retry on special net error contains EOF or reset
  155. strings.Contains(err.Error(), io.EOF.Error()) ||
  156. strings.Contains(err.Error(), "Connection reset by peer") ||
  157. strings.Contains(err.Error(), "connection reset by peer") {
  158. retry = true
  159. } else if nErr, ok := err.(net.Error); ok {
  160. retry = nErr.Temporary()
  161. }
  162. }
  163. if retry {
  164. value := lastInterval*2 + tableStoreClient.random.Int63n(DefaultRetryInterval-1) + 1
  165. if value > MaxRetryInterval {
  166. value = MaxRetryInterval
  167. }
  168. return value
  169. }
  170. return 0
  171. }
  172. func shouldRetry(tableStoreClient *TableStoreClient, errorCode string, errorMsg string, action string, statusCode int) bool {
  173. if tableStoreClient.CustomizedRetryFunc != nil {
  174. if tableStoreClient.CustomizedRetryFunc(errorCode, errorMsg, action, statusCode) == true {
  175. return true
  176. }
  177. }
  178. if retryNotMatterActions(errorCode, errorMsg) == true {
  179. return true
  180. }
  181. if isIdempotent(action) &&
  182. (errorCode == STORAGE_TIMEOUT || errorCode == INTERNAL_SERVER_ERROR || errorCode == SERVER_UNAVAILABLE) {
  183. return true
  184. }
  185. return false
  186. }
  187. type CustomizedRetryNotMatterActions func(errorCode string, errorMsg string, action string, httpStatus int) bool
  188. func retryNotMatterActions(errorCode string, errorMsg string) bool {
  189. if errorCode == ROW_OPERATION_CONFLICT || errorCode == NOT_ENOUGH_CAPACITY_UNIT ||
  190. errorCode == TABLE_NOT_READY || errorCode == PARTITION_UNAVAILABLE ||
  191. errorCode == SERVER_BUSY || errorCode == STORAGE_SERVER_BUSY || (errorCode == QUOTA_EXHAUSTED && errorMsg == "Too frequent table operations.") {
  192. return true
  193. } else {
  194. return false
  195. }
  196. }
  197. func isIdempotent(action string) bool {
  198. if action == batchGetRowUri || action == describeTableUri ||
  199. action == getRangeUri || action == getRowUri ||
  200. action == listTableUri || action == listStreamUri ||
  201. action == getStreamRecordUri || action == describeStreamUri {
  202. return true
  203. } else {
  204. return false
  205. }
  206. }
  207. func (tableStoreClient *TableStoreClient) doRequest(url string, uri string, body []byte, resp proto.Message) ([]byte, error, string) {
  208. hreq, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
  209. if err != nil {
  210. return nil, err, ""
  211. }
  212. /* set headers */
  213. hreq.Header.Set("User-Agent", userAgent)
  214. date := time.Now().UTC().Format(xOtsDateFormat)
  215. hreq.Header.Set(xOtsDate, date)
  216. hreq.Header.Set(xOtsApiversion, ApiVersion)
  217. hreq.Header.Set(xOtsAccesskeyid, tableStoreClient.accessKeyId)
  218. hreq.Header.Set(xOtsInstanceName, tableStoreClient.instanceName)
  219. for key, value := range tableStoreClient.externalHeader {
  220. hreq.Header[key] = []string{value}
  221. }
  222. md5Byte := md5.Sum(body)
  223. md5Base64 := base64.StdEncoding.EncodeToString(md5Byte[:16])
  224. hreq.Header.Set(xOtsContentmd5, md5Base64)
  225. otshead := createOtsHeaders(tableStoreClient.accessKeySecret)
  226. otshead.set(xOtsDate, date)
  227. otshead.set(xOtsApiversion, ApiVersion)
  228. otshead.set(xOtsAccesskeyid, tableStoreClient.accessKeyId)
  229. if tableStoreClient.securityToken != "" {
  230. hreq.Header.Set(xOtsHeaderStsToken, tableStoreClient.securityToken)
  231. otshead.set(xOtsHeaderStsToken, tableStoreClient.securityToken)
  232. }
  233. otshead.set(xOtsContentmd5, md5Base64)
  234. otshead.set(xOtsInstanceName, tableStoreClient.instanceName)
  235. for key, value := range tableStoreClient.externalHeader {
  236. if strings.HasPrefix(key, xOtsPrefix) {
  237. otshead.set(key, value)
  238. }
  239. }
  240. sign, err := otshead.signature(uri, "POST", tableStoreClient.accessKeySecret)
  241. if err != nil {
  242. return nil, err, ""
  243. }
  244. hreq.Header.Set(xOtsSignature, sign)
  245. /* end set headers */
  246. return tableStoreClient.postReq(hreq, url)
  247. }
  248. // table API
  249. // Create a table with the CreateTableRequest, in which the table name and
  250. // primary keys are required.
  251. // 根据CreateTableRequest创建一个表,其中表名和主健列是必选项
  252. //
  253. // @param request of CreateTableRequest.
  254. // @return Void. 无返回值。
  255. func (tableStoreClient *TableStoreClient) CreateTable(request *CreateTableRequest) (*CreateTableResponse, error) {
  256. if len(request.TableMeta.TableName) > maxTableNameLength {
  257. return nil, errTableNameTooLong(request.TableMeta.TableName)
  258. }
  259. if len(request.TableMeta.SchemaEntry) > maxPrimaryKeyNum {
  260. return nil, errPrimaryKeyTooMuch
  261. }
  262. if len(request.TableMeta.SchemaEntry) == 0 {
  263. return nil, errCreateTableNoPrimaryKey
  264. }
  265. req := new(otsprotocol.CreateTableRequest)
  266. req.TableMeta = new(otsprotocol.TableMeta)
  267. req.TableMeta.TableName = proto.String(request.TableMeta.TableName)
  268. if len(request.TableMeta.DefinedColumns) > 0 {
  269. for _, value := range request.TableMeta.DefinedColumns {
  270. req.TableMeta.DefinedColumn = append(req.TableMeta.DefinedColumn, &otsprotocol.DefinedColumnSchema{Name: &value.Name, Type: value.ColumnType.ConvertToPbDefinedColumnType().Enum()})
  271. }
  272. }
  273. if len(request.IndexMetas) > 0 {
  274. for _, value := range request.IndexMetas {
  275. req.IndexMetas = append(req.IndexMetas, value.ConvertToPbIndexMeta())
  276. }
  277. }
  278. for _, key := range request.TableMeta.SchemaEntry {
  279. keyType := otsprotocol.PrimaryKeyType(*key.Type)
  280. if key.Option != nil {
  281. keyOption := otsprotocol.PrimaryKeyOption(*key.Option)
  282. req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption})
  283. } else {
  284. req.TableMeta.PrimaryKey = append(req.TableMeta.PrimaryKey, &otsprotocol.PrimaryKeySchema{Name: key.Name, Type: &keyType})
  285. }
  286. }
  287. req.ReservedThroughput = new(otsprotocol.ReservedThroughput)
  288. req.ReservedThroughput.CapacityUnit = new(otsprotocol.CapacityUnit)
  289. req.ReservedThroughput.CapacityUnit.Read = proto.Int32(int32(request.ReservedThroughput.Readcap))
  290. req.ReservedThroughput.CapacityUnit.Write = proto.Int32(int32(request.ReservedThroughput.Writecap))
  291. req.TableOptions = new(otsprotocol.TableOptions)
  292. req.TableOptions.TimeToLive = proto.Int32(int32(request.TableOption.TimeToAlive))
  293. req.TableOptions.MaxVersions = proto.Int32(int32(request.TableOption.MaxVersion))
  294. if request.TableOption.DeviationCellVersionInSec > 0 {
  295. req.TableOptions.DeviationCellVersionInSec = proto.Int64(request.TableOption.DeviationCellVersionInSec)
  296. }
  297. if request.StreamSpec != nil {
  298. var ss otsprotocol.StreamSpecification
  299. if request.StreamSpec.EnableStream {
  300. ss = otsprotocol.StreamSpecification{
  301. EnableStream: &request.StreamSpec.EnableStream,
  302. ExpirationTime: &request.StreamSpec.ExpirationTime}
  303. } else {
  304. ss = otsprotocol.StreamSpecification{
  305. EnableStream: &request.StreamSpec.EnableStream}
  306. }
  307. req.StreamSpec = &ss
  308. }
  309. resp := new(otsprotocol.CreateTableResponse)
  310. response := &CreateTableResponse{}
  311. if err := tableStoreClient.doRequestWithRetry(createTableUri, req, resp, &response.ResponseInfo); err != nil {
  312. return nil, err
  313. }
  314. return response, nil
  315. }
  316. func (tableStoreClient *TableStoreClient) CreateIndex(request *CreateIndexRequest) (*CreateIndexResponse, error) {
  317. if len(request.MainTableName) > maxTableNameLength {
  318. return nil, errTableNameTooLong(request.MainTableName)
  319. }
  320. req := new(otsprotocol.CreateIndexRequest)
  321. req.IndexMeta = request.IndexMeta.ConvertToPbIndexMeta()
  322. req.IncludeBaseData = proto.Bool(request.IncludeBaseData)
  323. req.MainTableName = proto.String(request.MainTableName)
  324. resp := new(otsprotocol.CreateIndexResponse)
  325. response := &CreateIndexResponse{}
  326. if err := tableStoreClient.doRequestWithRetry(createIndexUri, req, resp, &response.ResponseInfo); err != nil {
  327. return nil, err
  328. }
  329. return response, nil
  330. }
  331. func (tableStoreClient *TableStoreClient) DeleteIndex(request *DeleteIndexRequest) (*DeleteIndexResponse, error) {
  332. if len(request.MainTableName) > maxTableNameLength {
  333. return nil, errTableNameTooLong(request.MainTableName)
  334. }
  335. req := new(otsprotocol.DropIndexRequest)
  336. req.IndexName = proto.String(request.IndexName)
  337. req.MainTableName = proto.String(request.MainTableName)
  338. resp := new(otsprotocol.DropIndexResponse)
  339. response := &DeleteIndexResponse{}
  340. if err := tableStoreClient.doRequestWithRetry(dropIndexUri, req, resp, &response.ResponseInfo); err != nil {
  341. return nil, err
  342. }
  343. return response, nil
  344. }
  345. // List all tables. If done, all table names will be returned.
  346. // 列出所有的表,如果操作成功,将返回所有表的名称。
  347. //
  348. // @param tableNames The returned table names. 返回的表名集合。
  349. // @return Void. 无返回值。
  350. func (tableStoreClient *TableStoreClient) ListTable() (*ListTableResponse, error) {
  351. resp := new(otsprotocol.ListTableResponse)
  352. response := &ListTableResponse{}
  353. if err := tableStoreClient.doRequestWithRetry(listTableUri, nil, resp, &response.ResponseInfo); err != nil {
  354. return response, err
  355. }
  356. response.TableNames = resp.TableNames
  357. return response, nil
  358. }
  359. // Delete a table and all its views will be deleted.
  360. // 删除一个表
  361. //
  362. // @param tableName The table name. 表名。
  363. // @return Void. 无返回值。
  364. func (tableStoreClient *TableStoreClient) DeleteTable(request *DeleteTableRequest) (*DeleteTableResponse, error) {
  365. req := new(otsprotocol.DeleteTableRequest)
  366. req.TableName = proto.String(request.TableName)
  367. response := &DeleteTableResponse{}
  368. if err := tableStoreClient.doRequestWithRetry(deleteTableUri, req, nil, &response.ResponseInfo); err != nil {
  369. return nil, err
  370. }
  371. return response, nil
  372. }
  373. // Query the tablemeta, tableoption and reservedthroughtputdetails
  374. // @param DescribeTableRequest
  375. // @param DescribeTableResponse
  376. func (tableStoreClient *TableStoreClient) DescribeTable(request *DescribeTableRequest) (*DescribeTableResponse, error) {
  377. req := new(otsprotocol.DescribeTableRequest)
  378. req.TableName = proto.String(request.TableName)
  379. resp := new(otsprotocol.DescribeTableResponse)
  380. response := new(DescribeTableResponse)
  381. if err := tableStoreClient.doRequestWithRetry(describeTableUri, req, resp, &response.ResponseInfo); err != nil {
  382. return &DescribeTableResponse{}, err
  383. }
  384. response.ReservedThroughput = &ReservedThroughput{Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)), Writecap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Write))}
  385. responseTableMeta := new(TableMeta)
  386. responseTableMeta.TableName = *resp.TableMeta.TableName
  387. for _, key := range resp.TableMeta.PrimaryKey {
  388. keyType := PrimaryKeyType(*key.Type)
  389. // enable it when we support kep option in describe table
  390. if key.Option != nil {
  391. keyOption := PrimaryKeyOption(*key.Option)
  392. responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType, Option: &keyOption})
  393. } else {
  394. responseTableMeta.SchemaEntry = append(responseTableMeta.SchemaEntry, &PrimaryKeySchema{Name: key.Name, Type: &keyType})
  395. }
  396. }
  397. response.TableMeta = responseTableMeta
  398. response.TableOption = &TableOption{TimeToAlive: int(*resp.TableOptions.TimeToLive), MaxVersion: int(*resp.TableOptions.MaxVersions), DeviationCellVersionInSec: *resp.TableOptions.DeviationCellVersionInSec}
  399. if resp.StreamDetails != nil && *resp.StreamDetails.EnableStream {
  400. response.StreamDetails = &StreamDetails{
  401. EnableStream: *resp.StreamDetails.EnableStream,
  402. StreamId: (*StreamId)(resp.StreamDetails.StreamId),
  403. ExpirationTime: *resp.StreamDetails.ExpirationTime,
  404. LastEnableTime: *resp.StreamDetails.LastEnableTime}
  405. } else {
  406. response.StreamDetails = &StreamDetails{
  407. EnableStream: false}
  408. }
  409. for _, meta := range resp.IndexMetas {
  410. response.IndexMetas = append(response.IndexMetas, ConvertPbIndexMetaToIndexMeta(meta))
  411. }
  412. return response, nil
  413. }
  414. // Update the table info includes tableoptions and reservedthroughput
  415. // @param UpdateTableRequest
  416. // @param UpdateTableResponse
  417. func (tableStoreClient *TableStoreClient) UpdateTable(request *UpdateTableRequest) (*UpdateTableResponse, error) {
  418. req := new(otsprotocol.UpdateTableRequest)
  419. req.TableName = proto.String(request.TableName)
  420. if request.ReservedThroughput != nil {
  421. req.ReservedThroughput = new(otsprotocol.ReservedThroughput)
  422. req.ReservedThroughput.CapacityUnit = new(otsprotocol.CapacityUnit)
  423. req.ReservedThroughput.CapacityUnit.Read = proto.Int32(int32(request.ReservedThroughput.Readcap))
  424. req.ReservedThroughput.CapacityUnit.Write = proto.Int32(int32(request.ReservedThroughput.Writecap))
  425. }
  426. if request.TableOption != nil {
  427. req.TableOptions = new(otsprotocol.TableOptions)
  428. req.TableOptions.TimeToLive = proto.Int32(int32(request.TableOption.TimeToAlive))
  429. req.TableOptions.MaxVersions = proto.Int32(int32(request.TableOption.MaxVersion))
  430. if request.TableOption.DeviationCellVersionInSec > 0 {
  431. req.TableOptions.DeviationCellVersionInSec = proto.Int64(request.TableOption.DeviationCellVersionInSec)
  432. }
  433. }
  434. if request.StreamSpec != nil {
  435. if request.StreamSpec.EnableStream == true {
  436. req.StreamSpec = &otsprotocol.StreamSpecification{
  437. EnableStream: &request.StreamSpec.EnableStream,
  438. ExpirationTime: &request.StreamSpec.ExpirationTime}
  439. } else {
  440. req.StreamSpec = &otsprotocol.StreamSpecification{EnableStream: &request.StreamSpec.EnableStream}
  441. }
  442. }
  443. resp := new(otsprotocol.UpdateTableResponse)
  444. response := new(UpdateTableResponse)
  445. if err := tableStoreClient.doRequestWithRetry(updateTableUri, req, resp, &response.ResponseInfo); err != nil {
  446. return nil, err
  447. }
  448. response.ReservedThroughput = &ReservedThroughput{
  449. Readcap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Read)),
  450. Writecap: int(*(resp.ReservedThroughputDetails.CapacityUnit.Write))}
  451. response.TableOption = &TableOption{
  452. TimeToAlive: int(*resp.TableOptions.TimeToLive),
  453. MaxVersion: int(*resp.TableOptions.MaxVersions),
  454. DeviationCellVersionInSec: *resp.TableOptions.DeviationCellVersionInSec}
  455. if *resp.StreamDetails.EnableStream {
  456. response.StreamDetails = &StreamDetails{
  457. EnableStream: *resp.StreamDetails.EnableStream,
  458. StreamId: (*StreamId)(resp.StreamDetails.StreamId),
  459. ExpirationTime: *resp.StreamDetails.ExpirationTime,
  460. LastEnableTime: *resp.StreamDetails.LastEnableTime}
  461. } else {
  462. response.StreamDetails = &StreamDetails{
  463. EnableStream: false}
  464. }
  465. return response, nil
  466. }
  467. // Put or update a row in a table. The operation is determined by CheckingType,
  468. // which has three options: NO, UPDATE, INSERT. The transaction id is optional.
  469. // 插入或更新行数据。操作针对数据的存在性包含三种检查类型:NO(不检查),UPDATE
  470. // (更新,数据必须存在)和INSERT(插入,数据必须不存在)。事务ID是可选项。
  471. //
  472. // @param builder The builder for putting a row. 插入或更新数据的Builder。
  473. // @return Void. 无返回值。
  474. func (tableStoreClient *TableStoreClient) PutRow(request *PutRowRequest) (*PutRowResponse, error) {
  475. if request == nil {
  476. return nil, nil
  477. }
  478. if request.PutRowChange == nil {
  479. return nil, nil
  480. }
  481. req := new(otsprotocol.PutRowRequest)
  482. req.TableName = proto.String(request.PutRowChange.TableName)
  483. req.Row = request.PutRowChange.Serialize()
  484. condition := new(otsprotocol.Condition)
  485. condition.RowExistence = request.PutRowChange.Condition.buildCondition()
  486. if request.PutRowChange.Condition.ColumnCondition != nil {
  487. condition.ColumnCondition = request.PutRowChange.Condition.ColumnCondition.Serialize()
  488. }
  489. if request.PutRowChange.ReturnType == ReturnType_RT_PK {
  490. content := otsprotocol.ReturnContent{ReturnType: otsprotocol.ReturnType_RT_PK.Enum()}
  491. req.ReturnContent = &content
  492. }
  493. if request.PutRowChange.TransactionId != nil {
  494. req.TransactionId = request.PutRowChange.TransactionId
  495. }
  496. req.Condition = condition
  497. resp := new(otsprotocol.PutRowResponse)
  498. response := &PutRowResponse{}
  499. if err := tableStoreClient.doRequestWithRetry(putRowUri, req, resp, &response.ResponseInfo); err != nil {
  500. return nil, err
  501. }
  502. response.ConsumedCapacityUnit = &ConsumedCapacityUnit{}
  503. response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
  504. response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
  505. if request.PutRowChange.ReturnType == ReturnType_RT_PK {
  506. rows, err := readRowsWithHeader(bytes.NewReader(resp.Row))
  507. if err != nil {
  508. return response, err
  509. }
  510. for _, pk := range rows[0].primaryKey {
  511. pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
  512. response.PrimaryKey.PrimaryKeys = append(response.PrimaryKey.PrimaryKeys, pkColumn)
  513. }
  514. }
  515. return response, nil
  516. }
  517. // Delete row with pk
  518. // @param DeleteRowRequest
  519. func (tableStoreClient *TableStoreClient) DeleteRow(request *DeleteRowRequest) (*DeleteRowResponse, error) {
  520. req := new(otsprotocol.DeleteRowRequest)
  521. req.TableName = proto.String(request.DeleteRowChange.TableName)
  522. req.Condition = request.DeleteRowChange.getCondition()
  523. req.PrimaryKey = request.DeleteRowChange.PrimaryKey.Build(true)
  524. if request.DeleteRowChange.TransactionId != nil {
  525. req.TransactionId = request.DeleteRowChange.TransactionId
  526. }
  527. resp := new(otsprotocol.DeleteRowResponse)
  528. response := &DeleteRowResponse{}
  529. if err := tableStoreClient.doRequestWithRetry(deleteRowUri, req, resp, &response.ResponseInfo); err != nil {
  530. return nil, err
  531. }
  532. response.ConsumedCapacityUnit = &ConsumedCapacityUnit{}
  533. response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
  534. response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
  535. return response, nil
  536. }
  537. // row API
  538. // Get the data of a row or some columns.
  539. //
  540. // @param getrowrequest
  541. func (tableStoreClient *TableStoreClient) GetRow(request *GetRowRequest) (*GetRowResponse, error) {
  542. req := new(otsprotocol.GetRowRequest)
  543. resp := new(otsprotocol.GetRowResponse)
  544. req.TableName = proto.String(request.SingleRowQueryCriteria.TableName)
  545. if (request.SingleRowQueryCriteria.getColumnsToGet() != nil) && len(request.SingleRowQueryCriteria.getColumnsToGet()) > 0 {
  546. req.ColumnsToGet = request.SingleRowQueryCriteria.getColumnsToGet()
  547. }
  548. req.PrimaryKey = request.SingleRowQueryCriteria.PrimaryKey.Build(false)
  549. if request.SingleRowQueryCriteria.MaxVersion != 0 {
  550. req.MaxVersions = proto.Int32(int32(request.SingleRowQueryCriteria.MaxVersion))
  551. }
  552. if request.SingleRowQueryCriteria.TransactionId != nil {
  553. req.TransactionId = request.SingleRowQueryCriteria.TransactionId
  554. }
  555. if request.SingleRowQueryCriteria.StartColumn != nil {
  556. req.StartColumn = request.SingleRowQueryCriteria.StartColumn
  557. }
  558. if request.SingleRowQueryCriteria.EndColumn != nil {
  559. req.EndColumn = request.SingleRowQueryCriteria.EndColumn
  560. }
  561. if request.SingleRowQueryCriteria.TimeRange != nil {
  562. if request.SingleRowQueryCriteria.TimeRange.Specific != 0 {
  563. req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Specific)}
  564. } else {
  565. req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.SingleRowQueryCriteria.TimeRange.End)}
  566. }
  567. } else if request.SingleRowQueryCriteria.MaxVersion == 0 {
  568. return nil, errInvalidInput
  569. }
  570. if request.SingleRowQueryCriteria.Filter != nil {
  571. req.Filter = request.SingleRowQueryCriteria.Filter.Serialize()
  572. }
  573. response := &GetRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}}
  574. if err := tableStoreClient.doRequestWithRetry(getRowUri, req, resp, &response.ResponseInfo); err != nil {
  575. return nil, err
  576. }
  577. response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
  578. response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
  579. if len(resp.Row) == 0 {
  580. return response, nil
  581. }
  582. rows, err := readRowsWithHeader(bytes.NewReader(resp.Row))
  583. if err != nil {
  584. return nil, err
  585. }
  586. for _, pk := range rows[0].primaryKey {
  587. pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
  588. response.PrimaryKey.PrimaryKeys = append(response.PrimaryKey.PrimaryKeys, pkColumn)
  589. }
  590. for _, cell := range rows[0].cells {
  591. dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
  592. response.Columns = append(response.Columns, dataColumn)
  593. }
  594. return response, nil
  595. }
  596. // Update row
  597. // @param UpdateRowRequest
  598. func (tableStoreClient *TableStoreClient) UpdateRow(request *UpdateRowRequest) (*UpdateRowResponse, error) {
  599. req := new(otsprotocol.UpdateRowRequest)
  600. resp := new(otsprotocol.UpdateRowResponse)
  601. req.TableName = proto.String(request.UpdateRowChange.TableName)
  602. req.Condition = request.UpdateRowChange.getCondition()
  603. req.RowChange = request.UpdateRowChange.Serialize()
  604. if request.UpdateRowChange.TransactionId != nil {
  605. req.TransactionId = request.UpdateRowChange.TransactionId
  606. }
  607. response := &UpdateRowResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}}
  608. if request.UpdateRowChange.ReturnType == ReturnType_RT_AFTER_MODIFY {
  609. content := otsprotocol.ReturnContent{ReturnType: otsprotocol.ReturnType_RT_AFTER_MODIFY.Enum()}
  610. for _, column := range request.UpdateRowChange.ColumnNamesToReturn {
  611. content.ReturnColumnNames = append(content.ReturnColumnNames, column)
  612. }
  613. req.ReturnContent = &content
  614. }
  615. if err := tableStoreClient.doRequestWithRetry(updateRowUri, req, resp, &response.ResponseInfo); err != nil {
  616. return nil, err
  617. }
  618. if request.UpdateRowChange.ReturnType == ReturnType_RT_AFTER_MODIFY {
  619. plainbufferRow, err := readRowsWithHeader(bytes.NewReader(resp.Row))
  620. if err != nil {
  621. return response, err
  622. }
  623. for _, cell := range plainbufferRow[0].cells {
  624. attribute := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
  625. response.Columns = append(response.Columns, attribute)
  626. }
  627. }
  628. response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
  629. response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
  630. return response, nil
  631. }
  632. // Batch Get Row
  633. // @param BatchGetRowRequest
  634. func (tableStoreClient *TableStoreClient) BatchGetRow(request *BatchGetRowRequest) (*BatchGetRowResponse, error) {
  635. req := new(otsprotocol.BatchGetRowRequest)
  636. var tablesInBatch []*otsprotocol.TableInBatchGetRowRequest
  637. for _, Criteria := range request.MultiRowQueryCriteria {
  638. table := new(otsprotocol.TableInBatchGetRowRequest)
  639. table.TableName = proto.String(Criteria.TableName)
  640. table.ColumnsToGet = Criteria.ColumnsToGet
  641. if Criteria.StartColumn != nil {
  642. table.StartColumn = Criteria.StartColumn
  643. }
  644. if Criteria.EndColumn != nil {
  645. table.EndColumn = Criteria.EndColumn
  646. }
  647. if Criteria.Filter != nil {
  648. table.Filter = Criteria.Filter.Serialize()
  649. }
  650. if Criteria.MaxVersion != 0 {
  651. table.MaxVersions = proto.Int32(int32(Criteria.MaxVersion))
  652. }
  653. if Criteria.TimeRange != nil {
  654. if Criteria.TimeRange.Specific != 0 {
  655. table.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(Criteria.TimeRange.Specific)}
  656. } else {
  657. table.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(Criteria.TimeRange.Start), EndTime: proto.Int64(Criteria.TimeRange.End)}
  658. }
  659. } else if Criteria.MaxVersion == 0 {
  660. return nil, errInvalidInput
  661. }
  662. for _, pk := range Criteria.PrimaryKey {
  663. pkWithBytes := pk.Build(false)
  664. table.PrimaryKey = append(table.PrimaryKey, pkWithBytes)
  665. }
  666. tablesInBatch = append(tablesInBatch, table)
  667. }
  668. req.Tables = tablesInBatch
  669. resp := new(otsprotocol.BatchGetRowResponse)
  670. response := &BatchGetRowResponse{TableToRowsResult: make(map[string][]RowResult)}
  671. if err := tableStoreClient.doRequestWithRetry(batchGetRowUri, req, resp, &response.ResponseInfo); err != nil {
  672. return nil, err
  673. }
  674. for _, table := range resp.Tables {
  675. index := int32(0)
  676. for _, row := range table.Rows {
  677. rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index}
  678. index++
  679. if *row.IsOk == false {
  680. rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message}
  681. } else {
  682. // len == 0 means row not exist
  683. if len(row.Row) > 0 {
  684. rows, err := readRowsWithHeader(bytes.NewReader(row.Row))
  685. if err != nil {
  686. return nil, err
  687. }
  688. for _, pk := range rows[0].primaryKey {
  689. pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
  690. rowResult.PrimaryKey.PrimaryKeys = append(rowResult.PrimaryKey.PrimaryKeys, pkColumn)
  691. }
  692. for _, cell := range rows[0].cells {
  693. dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
  694. rowResult.Columns = append(rowResult.Columns, dataColumn)
  695. }
  696. }
  697. rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read
  698. rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write
  699. }
  700. response.TableToRowsResult[*table.TableName] = append(response.TableToRowsResult[*table.TableName], *rowResult)
  701. }
  702. }
  703. return response, nil
  704. }
  705. // Batch Write Row
  706. // @param BatchWriteRowRequest
  707. func (tableStoreClient *TableStoreClient) BatchWriteRow(request *BatchWriteRowRequest) (*BatchWriteRowResponse, error) {
  708. req := new(otsprotocol.BatchWriteRowRequest)
  709. var tablesInBatch []*otsprotocol.TableInBatchWriteRowRequest
  710. for key, value := range request.RowChangesGroupByTable {
  711. table := new(otsprotocol.TableInBatchWriteRowRequest)
  712. table.TableName = proto.String(key)
  713. for _, row := range value {
  714. rowInBatch := &otsprotocol.RowInBatchWriteRowRequest{}
  715. rowInBatch.Condition = row.getCondition()
  716. rowInBatch.RowChange = row.Serialize()
  717. rowInBatch.Type = row.getOperationType().Enum()
  718. table.Rows = append(table.Rows, rowInBatch)
  719. }
  720. tablesInBatch = append(tablesInBatch, table)
  721. }
  722. req.Tables = tablesInBatch
  723. resp := new(otsprotocol.BatchWriteRowResponse)
  724. response := &BatchWriteRowResponse{TableToRowsResult: make(map[string][]RowResult)}
  725. if err := tableStoreClient.doRequestWithRetry(batchWriteRowUri, req, resp, &response.ResponseInfo); err != nil {
  726. return nil, err
  727. }
  728. for _, table := range resp.Tables {
  729. index := int32(0)
  730. for _, row := range table.Rows {
  731. rowResult := &RowResult{TableName: *table.TableName, IsSucceed: *row.IsOk, ConsumedCapacityUnit: &ConsumedCapacityUnit{}, Index: index}
  732. index++
  733. if *row.IsOk == false {
  734. rowResult.Error = Error{Code: *row.Error.Code, Message: *row.Error.Message}
  735. } else {
  736. rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read
  737. rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write
  738. } /*else {
  739. rows, err := readRowsWithHeader(bytes.NewReader(row.Row))
  740. if err != nil {
  741. return nil, err
  742. }
  743. for _, pk := range (rows[0].primaryKey) {
  744. pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
  745. rowResult.PrimaryKey.PrimaryKeys = append(rowResult.PrimaryKey.PrimaryKeys, pkColumn)
  746. }
  747. for _, cell := range (rows[0].cells) {
  748. dataColumn := &DataColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value}
  749. rowResult.Columns = append(rowResult.Columns, dataColumn)
  750. }
  751. rowResult.ConsumedCapacityUnit.Read = *row.Consumed.CapacityUnit.Read
  752. rowResult.ConsumedCapacityUnit.Write = *row.Consumed.CapacityUnit.Write
  753. }*/
  754. response.TableToRowsResult[*table.TableName] = append(response.TableToRowsResult[*table.TableName], *rowResult)
  755. }
  756. }
  757. return response, nil
  758. }
  759. // Get Range
  760. // @param GetRangeRequest
  761. func (tableStoreClient *TableStoreClient) GetRange(request *GetRangeRequest) (*GetRangeResponse, error) {
  762. req := new(otsprotocol.GetRangeRequest)
  763. req.TableName = proto.String(request.RangeRowQueryCriteria.TableName)
  764. req.Direction = request.RangeRowQueryCriteria.Direction.ToDirection().Enum()
  765. if request.RangeRowQueryCriteria.MaxVersion != 0 {
  766. req.MaxVersions = proto.Int32(request.RangeRowQueryCriteria.MaxVersion)
  767. }
  768. if request.RangeRowQueryCriteria.TransactionId != nil {
  769. req.TransactionId = request.RangeRowQueryCriteria.TransactionId
  770. }
  771. if request.RangeRowQueryCriteria.TimeRange != nil {
  772. if request.RangeRowQueryCriteria.TimeRange.Specific != 0 {
  773. req.TimeRange = &otsprotocol.TimeRange{SpecificTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Specific)}
  774. } else {
  775. req.TimeRange = &otsprotocol.TimeRange{StartTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.Start), EndTime: proto.Int64(request.RangeRowQueryCriteria.TimeRange.End)}
  776. }
  777. } else if request.RangeRowQueryCriteria.MaxVersion == 0 {
  778. return nil, errInvalidInput
  779. }
  780. if request.RangeRowQueryCriteria.Limit != 0 {
  781. req.Limit = proto.Int32(request.RangeRowQueryCriteria.Limit)
  782. }
  783. if (request.RangeRowQueryCriteria.ColumnsToGet != nil) && len(request.RangeRowQueryCriteria.ColumnsToGet) > 0 {
  784. req.ColumnsToGet = request.RangeRowQueryCriteria.ColumnsToGet
  785. }
  786. if request.RangeRowQueryCriteria.Filter != nil {
  787. req.Filter = request.RangeRowQueryCriteria.Filter.Serialize()
  788. }
  789. if request.RangeRowQueryCriteria.StartColumn != nil {
  790. req.StartColumn = request.RangeRowQueryCriteria.StartColumn
  791. }
  792. if request.RangeRowQueryCriteria.EndColumn != nil {
  793. req.EndColumn = request.RangeRowQueryCriteria.EndColumn
  794. }
  795. req.InclusiveStartPrimaryKey = request.RangeRowQueryCriteria.StartPrimaryKey.Build(false)
  796. req.ExclusiveEndPrimaryKey = request.RangeRowQueryCriteria.EndPrimaryKey.Build(false)
  797. resp := new(otsprotocol.GetRangeResponse)
  798. response := &GetRangeResponse{ConsumedCapacityUnit: &ConsumedCapacityUnit{}}
  799. if err := tableStoreClient.doRequestWithRetry(getRangeUri, req, resp, &response.ResponseInfo); err != nil {
  800. return nil, err
  801. }
  802. response.ConsumedCapacityUnit.Read = *resp.Consumed.CapacityUnit.Read
  803. response.ConsumedCapacityUnit.Write = *resp.Consumed.CapacityUnit.Write
  804. if len(resp.NextStartPrimaryKey) != 0 {
  805. currentRows, err := readRowsWithHeader(bytes.NewReader(resp.NextStartPrimaryKey))
  806. if err != nil {
  807. return nil, err
  808. }
  809. response.NextStartPrimaryKey = &PrimaryKey{}
  810. for _, pk := range currentRows[0].primaryKey {
  811. pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
  812. response.NextStartPrimaryKey.PrimaryKeys = append(response.NextStartPrimaryKey.PrimaryKeys, pkColumn)
  813. }
  814. }
  815. if len(resp.Rows) == 0 {
  816. return response, nil
  817. }
  818. rows, err := readRowsWithHeader(bytes.NewReader(resp.Rows))
  819. if err != nil {
  820. return response, err
  821. }
  822. for _, row := range rows {
  823. currentRow := &Row{}
  824. currentpk := new(PrimaryKey)
  825. for _, pk := range row.primaryKey {
  826. pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
  827. currentpk.PrimaryKeys = append(currentpk.PrimaryKeys, pkColumn)
  828. }
  829. currentRow.PrimaryKey = currentpk
  830. for _, cell := range row.cells {
  831. dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
  832. currentRow.Columns = append(currentRow.Columns, dataColumn)
  833. }
  834. response.Rows = append(response.Rows, currentRow)
  835. }
  836. return response, nil
  837. }
  838. func (client *TableStoreClient) ListStream(req *ListStreamRequest) (*ListStreamResponse, error) {
  839. pbReq := &otsprotocol.ListStreamRequest{}
  840. pbReq.TableName = req.TableName
  841. pbResp := otsprotocol.ListStreamResponse{}
  842. resp := ListStreamResponse{}
  843. if err := client.doRequestWithRetry(listStreamUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
  844. return nil, err
  845. }
  846. streams := make([]Stream, len(pbResp.Streams))
  847. for i, pbStream := range pbResp.Streams {
  848. streams[i] = Stream{
  849. Id: (*StreamId)(pbStream.StreamId),
  850. TableName: pbStream.TableName,
  851. CreationTime: *pbStream.CreationTime}
  852. }
  853. resp.Streams = streams[:]
  854. return &resp, nil
  855. }
  856. func (client *TableStoreClient) DescribeStream(req *DescribeStreamRequest) (*DescribeStreamResponse, error) {
  857. pbReq := &otsprotocol.DescribeStreamRequest{}
  858. {
  859. pbReq.StreamId = (*string)(req.StreamId)
  860. pbReq.InclusiveStartShardId = (*string)(req.InclusiveStartShardId)
  861. pbReq.ShardLimit = req.ShardLimit
  862. }
  863. pbResp := otsprotocol.DescribeStreamResponse{}
  864. resp := DescribeStreamResponse{}
  865. if err := client.doRequestWithRetry(describeStreamUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
  866. return nil, err
  867. }
  868. resp.StreamId = (*StreamId)(pbResp.StreamId)
  869. resp.ExpirationTime = *pbResp.ExpirationTime
  870. resp.TableName = pbResp.TableName
  871. resp.CreationTime = *pbResp.CreationTime
  872. Assert(pbResp.StreamStatus != nil, "StreamStatus in DescribeStreamResponse is required.")
  873. switch *pbResp.StreamStatus {
  874. case otsprotocol.StreamStatus_STREAM_ENABLING:
  875. resp.Status = SS_Enabling
  876. case otsprotocol.StreamStatus_STREAM_ACTIVE:
  877. resp.Status = SS_Active
  878. }
  879. resp.NextShardId = (*ShardId)(pbResp.NextShardId)
  880. shards := make([]*StreamShard, len(pbResp.Shards))
  881. for i, pbShard := range pbResp.Shards {
  882. shards[i] = &StreamShard{
  883. SelfShard: (*ShardId)(pbShard.ShardId),
  884. FatherShard: (*ShardId)(pbShard.ParentId),
  885. MotherShard: (*ShardId)(pbShard.ParentSiblingId)}
  886. }
  887. resp.Shards = shards[:]
  888. return &resp, nil
  889. }
  890. func (client *TableStoreClient) GetShardIterator(req *GetShardIteratorRequest) (*GetShardIteratorResponse, error) {
  891. pbReq := &otsprotocol.GetShardIteratorRequest{
  892. StreamId: (*string)(req.StreamId),
  893. ShardId: (*string)(req.ShardId)}
  894. if req.Timestamp != nil {
  895. pbReq.Timestamp = req.Timestamp
  896. }
  897. if req.Token != nil {
  898. pbReq.Token = req.Token
  899. }
  900. pbResp := otsprotocol.GetShardIteratorResponse{}
  901. resp := GetShardIteratorResponse{}
  902. if err := client.doRequestWithRetry(getShardIteratorUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
  903. return nil, err
  904. }
  905. resp.ShardIterator = (*ShardIterator)(pbResp.ShardIterator)
  906. resp.Token = pbResp.NextToken
  907. return &resp, nil
  908. }
  909. func (client TableStoreClient) GetStreamRecord(req *GetStreamRecordRequest) (*GetStreamRecordResponse, error) {
  910. pbReq := &otsprotocol.GetStreamRecordRequest{
  911. ShardIterator: (*string)(req.ShardIterator)}
  912. if req.Limit != nil {
  913. pbReq.Limit = req.Limit
  914. }
  915. pbResp := otsprotocol.GetStreamRecordResponse{}
  916. resp := GetStreamRecordResponse{}
  917. if err := client.doRequestWithRetry(getStreamRecordUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
  918. return nil, err
  919. }
  920. if pbResp.NextShardIterator != nil {
  921. resp.NextShardIterator = (*ShardIterator)(pbResp.NextShardIterator)
  922. }
  923. records := make([]*StreamRecord, len(pbResp.StreamRecords))
  924. for i, pbRecord := range pbResp.StreamRecords {
  925. record := StreamRecord{}
  926. records[i] = &record
  927. switch *pbRecord.ActionType {
  928. case otsprotocol.ActionType_PUT_ROW:
  929. record.Type = AT_Put
  930. case otsprotocol.ActionType_UPDATE_ROW:
  931. record.Type = AT_Update
  932. case otsprotocol.ActionType_DELETE_ROW:
  933. record.Type = AT_Delete
  934. }
  935. plainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord.Record))
  936. if err != nil {
  937. return nil, err
  938. }
  939. Assert(len(plainRows) == 1,
  940. "There must be exactly one row in a StreamRecord.")
  941. plainRow := plainRows[0]
  942. pkey := PrimaryKey{}
  943. record.PrimaryKey = &pkey
  944. pkey.PrimaryKeys = make([]*PrimaryKeyColumn, len(plainRow.primaryKey))
  945. for i, pk := range plainRow.primaryKey {
  946. pkc := PrimaryKeyColumn{
  947. ColumnName: string(pk.cellName),
  948. Value: pk.cellValue.Value}
  949. pkey.PrimaryKeys[i] = &pkc
  950. }
  951. Assert(plainRow.extension != nil,
  952. "extension in a stream record is required.")
  953. record.Info = plainRow.extension
  954. record.Columns = make([]*RecordColumn, len(plainRow.cells))
  955. for i, plainCell := range plainRow.cells {
  956. cell := RecordColumn{}
  957. record.Columns[i] = &cell
  958. name := string(plainCell.cellName)
  959. cell.Name = &name
  960. if plainCell.cellValue != nil {
  961. cell.Type = RCT_Put
  962. } else {
  963. if plainCell.cellTimestamp > 0 {
  964. cell.Type = RCT_DeleteOneVersion
  965. } else {
  966. cell.Type = RCT_DeleteAllVersions
  967. }
  968. }
  969. switch cell.Type {
  970. case RCT_Put:
  971. cell.Value = plainCell.cellValue.Value
  972. fallthrough
  973. case RCT_DeleteOneVersion:
  974. cell.Timestamp = &plainCell.cellTimestamp
  975. case RCT_DeleteAllVersions:
  976. break
  977. }
  978. }
  979. }
  980. resp.Records = records
  981. return &resp, nil
  982. }
  983. func (client TableStoreClient) ComputeSplitPointsBySize(req *ComputeSplitPointsBySizeRequest) (*ComputeSplitPointsBySizeResponse, error) {
  984. pbReq := &otsprotocol.ComputeSplitPointsBySizeRequest{
  985. TableName: &(req.TableName),
  986. SplitSize: &(req.SplitSize),
  987. }
  988. pbResp := otsprotocol.ComputeSplitPointsBySizeResponse{}
  989. resp := ComputeSplitPointsBySizeResponse{}
  990. if err := client.doRequestWithRetry(computeSplitPointsBySizeRequestUri, pbReq, &pbResp, &resp.ResponseInfo); err != nil {
  991. return nil, err
  992. }
  993. beginPk := &PrimaryKey{}
  994. endPk := &PrimaryKey{}
  995. for _, pkSchema := range pbResp.Schema {
  996. beginPk.AddPrimaryKeyColumnWithMinValue(*pkSchema.Name)
  997. endPk.AddPrimaryKeyColumnWithMaxValue(*pkSchema.Name)
  998. }
  999. lastPk := beginPk
  1000. nowPk := endPk
  1001. for _, pbRecord := range pbResp.SplitPoints {
  1002. plainRows, err := readRowsWithHeader(bytes.NewReader(pbRecord))
  1003. if err != nil {
  1004. return nil, err
  1005. }
  1006. nowPk = &PrimaryKey{}
  1007. for _, pk := range plainRows[0].primaryKey {
  1008. nowPk.AddPrimaryKeyColumn(string(pk.cellName), pk.cellValue.Value)
  1009. }
  1010. if len(pbResp.Schema) > 1 {
  1011. for i := 1; i < len(pbResp.Schema); i++ {
  1012. nowPk.AddPrimaryKeyColumnWithMinValue(*pbResp.Schema[i].Name)
  1013. }
  1014. }
  1015. newSplit := &Split{LowerBound: lastPk, UpperBound: nowPk}
  1016. resp.Splits = append(resp.Splits, newSplit)
  1017. lastPk = nowPk
  1018. }
  1019. newSplit := &Split{LowerBound: lastPk, UpperBound: endPk}
  1020. resp.Splits = append(resp.Splits, newSplit)
  1021. index := 0
  1022. for _, pbLocation := range pbResp.Locations {
  1023. count := *pbLocation.Repeat
  1024. value := *pbLocation.Location
  1025. for i := int64(0); i < count; i++ {
  1026. resp.Splits[index].Location = value
  1027. index++
  1028. }
  1029. }
  1030. return &resp, nil
  1031. }
  1032. func (client *TableStoreClient) StartLocalTransaction(request *StartLocalTransactionRequest) (*StartLocalTransactionResponse, error) {
  1033. req := new(otsprotocol.StartLocalTransactionRequest)
  1034. resp := new(otsprotocol.StartLocalTransactionResponse)
  1035. req.TableName = proto.String(request.TableName)
  1036. req.Key = request.PrimaryKey.Build(false)
  1037. response := &StartLocalTransactionResponse{}
  1038. if err := client.doRequestWithRetry(createlocaltransactionuri, req, resp, &response.ResponseInfo); err != nil {
  1039. return nil, err
  1040. }
  1041. response.TransactionId = resp.TransactionId
  1042. return response, nil
  1043. }
  1044. func (client *TableStoreClient) CommitTransaction(request *CommitTransactionRequest) (*CommitTransactionResponse, error) {
  1045. req := new(otsprotocol.CommitTransactionRequest)
  1046. resp := new(otsprotocol.CommitTransactionResponse)
  1047. req.TransactionId = request.TransactionId
  1048. response := &CommitTransactionResponse{}
  1049. if err := client.doRequestWithRetry(committransactionuri, req, resp, &response.ResponseInfo); err != nil {
  1050. return nil, err
  1051. }
  1052. return response, nil
  1053. }
  1054. func (client *TableStoreClient) AbortTransaction(request *AbortTransactionRequest) (*AbortTransactionResponse, error) {
  1055. req := new(otsprotocol.AbortTransactionRequest)
  1056. resp := new(otsprotocol.AbortTransactionResponse)
  1057. req.TransactionId = request.TransactionId
  1058. response := &AbortTransactionResponse{}
  1059. if err := client.doRequestWithRetry(aborttransactionuri, req, resp, &response.ResponseInfo); err != nil {
  1060. return nil, err
  1061. }
  1062. return response, nil
  1063. }