util.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984
  1. package tablestore
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol"
  6. "github.com/golang/protobuf/proto"
  7. "io"
  8. "io/ioutil"
  9. "math"
  10. "net/http"
  11. "reflect"
  12. "sort"
  13. )
  14. const (
  15. maxTableNameLength = 100
  16. maxPrimaryKeyLength = 255
  17. maxPrimaryKeyNum = 10
  18. maxMultiDeleteRows = 100
  19. )
  20. type ColumnType int32
  21. const (
  22. ColumnType_STRING ColumnType = 1
  23. ColumnType_INTEGER ColumnType = 2
  24. ColumnType_BOOLEAN ColumnType = 3
  25. ColumnType_DOUBLE ColumnType = 4
  26. ColumnType_BINARY ColumnType = 5
  27. )
  28. const (
  29. Version = "1.0"
  30. ApiVersion = "2015-12-31"
  31. xOtsDateFormat = "2006-01-02T15:04:05.123Z"
  32. xOtsInstanceName = "x-ots-instancename"
  33. xOtsRequestId = "x-ots-requestid"
  34. )
  35. type ColumnValue struct {
  36. Type ColumnType
  37. Value interface{}
  38. }
  39. func (cv *ColumnValue) writeCellValue(w io.Writer) {
  40. writeTag(w, TAG_CELL_VALUE)
  41. if cv == nil {
  42. writeRawLittleEndian32(w, 1)
  43. writeRawByte(w, VT_AUTO_INCREMENT)
  44. return
  45. }
  46. switch cv.Type {
  47. case ColumnType_STRING:
  48. v := cv.Value.(string)
  49. writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_32_SIZE+1+len(v))) // length + type + value
  50. writeRawByte(w, VT_STRING)
  51. writeRawLittleEndian32(w, int32(len(v)))
  52. writeBytes(w, []byte(v))
  53. case ColumnType_INTEGER:
  54. v := cv.Value.(int64)
  55. writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_64_SIZE+1))
  56. writeRawByte(w, VT_INTEGER)
  57. writeRawLittleEndian64(w, v)
  58. case ColumnType_BOOLEAN:
  59. v := cv.Value.(bool)
  60. writeRawLittleEndian32(w, 2)
  61. writeRawByte(w, VT_BOOLEAN)
  62. writeBoolean(w, v)
  63. case ColumnType_DOUBLE:
  64. v := cv.Value.(float64)
  65. writeRawLittleEndian32(w, LITTLE_ENDIAN_64_SIZE+1)
  66. writeRawByte(w, VT_DOUBLE)
  67. writeDouble(w, v)
  68. case ColumnType_BINARY:
  69. v := cv.Value.([]byte)
  70. writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_32_SIZE+1+len(v))) // length + type + value
  71. writeRawByte(w, VT_BLOB)
  72. writeRawLittleEndian32(w, int32(len(v)))
  73. writeBytes(w, v)
  74. }
  75. }
  76. func (cv *ColumnValue) writeCellValueWithoutLengthPrefix() []byte {
  77. var b bytes.Buffer
  78. w := &b
  79. switch cv.Type {
  80. case ColumnType_STRING:
  81. v := cv.Value.(string)
  82. writeRawByte(w, VT_STRING)
  83. writeRawLittleEndian32(w, int32(len(v)))
  84. writeBytes(w, []byte(v))
  85. case ColumnType_INTEGER:
  86. v := cv.Value.(int64)
  87. writeRawByte(w, VT_INTEGER)
  88. writeRawLittleEndian64(w, v)
  89. case ColumnType_BOOLEAN:
  90. v := cv.Value.(bool)
  91. writeRawByte(w, VT_BOOLEAN)
  92. writeBoolean(w, v)
  93. case ColumnType_DOUBLE:
  94. v := cv.Value.(float64)
  95. writeRawByte(w, VT_DOUBLE)
  96. writeDouble(w, v)
  97. case ColumnType_BINARY:
  98. v := cv.Value.([]byte)
  99. writeRawByte(w, VT_BLOB)
  100. writeRawLittleEndian32(w, int32(len(v)))
  101. writeBytes(w, v)
  102. }
  103. return b.Bytes()
  104. }
  105. func (cv *ColumnValue) getCheckSum(crc byte) byte {
  106. if cv == nil {
  107. return crc8Byte(crc, VT_AUTO_INCREMENT)
  108. }
  109. switch cv.Type {
  110. case ColumnType_STRING:
  111. v := cv.Value.(string)
  112. crc = crc8Byte(crc, VT_STRING)
  113. crc = crc8Int32(crc, int32(len(v)))
  114. crc = crc8Bytes(crc, []byte(v))
  115. case ColumnType_INTEGER:
  116. v := cv.Value.(int64)
  117. crc = crc8Byte(crc, VT_INTEGER)
  118. crc = crc8Int64(crc, v)
  119. case ColumnType_BOOLEAN:
  120. v := cv.Value.(bool)
  121. crc = crc8Byte(crc, VT_BOOLEAN)
  122. if v {
  123. crc = crc8Byte(crc, 0x1)
  124. } else {
  125. crc = crc8Byte(crc, 0x0)
  126. }
  127. case ColumnType_DOUBLE:
  128. v := cv.Value.(float64)
  129. crc = crc8Byte(crc, VT_DOUBLE)
  130. crc = crc8Int64(crc, int64(math.Float64bits(v)))
  131. case ColumnType_BINARY:
  132. v := cv.Value.([]byte)
  133. crc = crc8Byte(crc, VT_BLOB)
  134. crc = crc8Int32(crc, int32(len(v)))
  135. crc = crc8Bytes(crc, v)
  136. }
  137. return crc
  138. }
  139. type Column struct {
  140. Name []byte
  141. Value ColumnValue
  142. Type byte
  143. Timestamp int64
  144. HasType bool
  145. HasTimestamp bool
  146. IgnoreValue bool
  147. }
  148. func NewColumn(name []byte, value interface{}) *Column {
  149. v := &Column{}
  150. v.Name = name
  151. if value != nil {
  152. t := reflect.TypeOf(value)
  153. switch t.Kind() {
  154. case reflect.String:
  155. v.Value.Type = ColumnType_STRING
  156. case reflect.Int64:
  157. v.Value.Type = ColumnType_INTEGER
  158. case reflect.Bool:
  159. v.Value.Type = ColumnType_BOOLEAN
  160. case reflect.Float64:
  161. v.Value.Type = ColumnType_DOUBLE
  162. case reflect.Slice:
  163. v.Value.Type = ColumnType_BINARY
  164. default:
  165. panic(errInvalidInput)
  166. }
  167. v.Value.Value = value
  168. }
  169. return v
  170. }
  171. func (c *Column) toPlainBufferCell(ignoreValue bool) *PlainBufferCell {
  172. cell := &PlainBufferCell{}
  173. cell.cellName = c.Name
  174. cell.ignoreValue = ignoreValue
  175. if ignoreValue == false {
  176. cell.cellValue = &c.Value
  177. }
  178. if c.HasType {
  179. cell.hasCellType = c.HasType
  180. cell.cellType = byte(c.Type)
  181. }
  182. if c.HasTimestamp {
  183. cell.hasCellTimestamp = c.HasTimestamp
  184. cell.cellTimestamp = c.Timestamp
  185. }
  186. return cell
  187. }
  188. type PrimaryKeyColumnInner struct {
  189. Name []byte
  190. Type otsprotocol.PrimaryKeyType
  191. Value interface{}
  192. }
  193. func NewPrimaryKeyColumnINF_MAX(name []byte) *PrimaryKeyColumnInner {
  194. v := &PrimaryKeyColumnInner{}
  195. v.Name = name
  196. v.Type = 0
  197. v.Value = "INF_MAX"
  198. return v
  199. }
  200. func NewPrimaryKeyColumnINF_MIN(name []byte) *PrimaryKeyColumnInner {
  201. v := &PrimaryKeyColumnInner{}
  202. v.Name = name
  203. v.Type = 0
  204. v.Value = "INF_MIN"
  205. return v
  206. }
  207. func NewPrimaryKeyColumnAuto_Increment(name []byte) *PrimaryKeyColumnInner {
  208. v := &PrimaryKeyColumnInner{}
  209. v.Name = name
  210. v.Type = 0
  211. v.Value = "AUTO_INCRMENT"
  212. return v
  213. }
  214. func NewPrimaryKeyColumn(name []byte, value interface{}, option PrimaryKeyOption) *PrimaryKeyColumnInner {
  215. if option == NONE {
  216. v := &PrimaryKeyColumnInner{}
  217. v.Name = name
  218. t := reflect.TypeOf(value)
  219. switch t.Kind() {
  220. case reflect.String:
  221. v.Type = otsprotocol.PrimaryKeyType_STRING
  222. case reflect.Int64:
  223. v.Type = otsprotocol.PrimaryKeyType_INTEGER
  224. case reflect.Slice:
  225. v.Type = otsprotocol.PrimaryKeyType_BINARY
  226. default:
  227. panic(errInvalidInput)
  228. }
  229. v.Value = value
  230. return v
  231. } else if option == AUTO_INCREMENT {
  232. return NewPrimaryKeyColumnAuto_Increment(name)
  233. } else if option == MIN {
  234. return NewPrimaryKeyColumnINF_MIN(name)
  235. } else {
  236. return NewPrimaryKeyColumnINF_MAX(name)
  237. }
  238. }
  239. func (pkc *PrimaryKeyColumnInner) toColumnValue() *ColumnValue {
  240. switch pkc.Type {
  241. case otsprotocol.PrimaryKeyType_INTEGER:
  242. return &ColumnValue{ColumnType_INTEGER, pkc.Value}
  243. case otsprotocol.PrimaryKeyType_STRING:
  244. return &ColumnValue{ColumnType_STRING, pkc.Value}
  245. case otsprotocol.PrimaryKeyType_BINARY:
  246. return &ColumnValue{ColumnType_BINARY, pkc.Value}
  247. }
  248. return nil
  249. }
  250. func (pkc *PrimaryKeyColumnInner) toPlainBufferCell() *PlainBufferCell {
  251. cell := &PlainBufferCell{}
  252. cell.cellName = pkc.Name
  253. cell.cellValue = pkc.toColumnValue()
  254. return cell
  255. }
  256. func (pkc *PrimaryKeyColumnInner) isInfMin() bool {
  257. if pkc.Type == 0 && pkc.Value.(string) == "INF_MIN" {
  258. return true
  259. }
  260. return false
  261. }
  262. func (pkc *PrimaryKeyColumnInner) isInfMax() bool {
  263. if pkc.Type == 0 && pkc.Value.(string) == "INF_MAX" {
  264. return true
  265. }
  266. return false
  267. }
  268. func (pkc *PrimaryKeyColumnInner) isAutoInc() bool {
  269. if pkc.Type == 0 && pkc.Value.(string) == "AUTO_INCRMENT" {
  270. return true
  271. }
  272. return false
  273. }
  274. func (pkc *PrimaryKeyColumnInner) getCheckSum(crc byte) byte {
  275. if pkc.isInfMin() {
  276. return crc8Byte(crc, VT_INF_MIN)
  277. }
  278. if pkc.isInfMax() {
  279. return crc8Byte(crc, VT_INF_MAX)
  280. }
  281. if pkc.isAutoInc() {
  282. return crc8Byte(crc, VT_AUTO_INCREMENT)
  283. }
  284. return pkc.toColumnValue().getCheckSum(crc)
  285. }
  286. func (pkc *PrimaryKeyColumnInner) writePrimaryKeyColumn(w io.Writer) {
  287. writeTag(w, TAG_CELL)
  288. writeCellName(w, []byte(pkc.Name))
  289. if pkc.isInfMin() {
  290. writeTag(w, TAG_CELL_VALUE)
  291. writeRawLittleEndian32(w, 1)
  292. writeRawByte(w, VT_INF_MIN)
  293. return
  294. }
  295. if pkc.isInfMax() {
  296. writeTag(w, TAG_CELL_VALUE)
  297. writeRawLittleEndian32(w, 1)
  298. writeRawByte(w, VT_INF_MAX)
  299. return
  300. }
  301. if pkc.isAutoInc() {
  302. writeTag(w, TAG_CELL_VALUE)
  303. writeRawLittleEndian32(w, 1)
  304. writeRawByte(w, VT_AUTO_INCREMENT)
  305. return
  306. }
  307. pkc.toColumnValue().writeCellValue(w)
  308. }
  309. type PrimaryKey2 struct {
  310. primaryKey []*PrimaryKeyColumnInner
  311. }
  312. func (pk *PrimaryKey) Build(isDelete bool) []byte {
  313. var b bytes.Buffer
  314. writeHeader(&b)
  315. writeTag(&b, TAG_ROW_PK)
  316. rowChecksum := byte(0x0)
  317. var cellChecksum byte
  318. for _, column := range pk.PrimaryKeys {
  319. primaryKeyColumn := NewPrimaryKeyColumn([]byte(column.ColumnName), column.Value, column.PrimaryKeyOption)
  320. cellChecksum = crc8Bytes(byte(0x0), []byte(primaryKeyColumn.Name))
  321. cellChecksum = primaryKeyColumn.getCheckSum(cellChecksum)
  322. rowChecksum = crc8Byte(rowChecksum, cellChecksum)
  323. primaryKeyColumn.writePrimaryKeyColumn(&b)
  324. writeTag(&b, TAG_CELL_CHECKSUM)
  325. writeRawByte(&b, cellChecksum)
  326. }
  327. // 没有deleteMarker, 要与0x0做crc.
  328. if isDelete {
  329. writeTag(&b, TAG_DELETE_ROW_MARKER)
  330. rowChecksum = crc8Byte(rowChecksum, byte(0x1))
  331. } else {
  332. rowChecksum = crc8Byte(rowChecksum, byte(0x0))
  333. }
  334. writeTag(&b, TAG_ROW_CHECKSUM)
  335. writeRawByte(&b, rowChecksum)
  336. return b.Bytes()
  337. }
  338. type RowPutChange struct {
  339. primaryKey []*PrimaryKeyColumnInner
  340. columnsToPut []*Column
  341. }
  342. type RowUpdateChange struct {
  343. primaryKey []*PrimaryKeyColumnInner
  344. columnsToUpdate []*Column
  345. }
  346. func (rpc *RowPutChange) Build() []byte {
  347. pkCells := make([]*PlainBufferCell, len(rpc.primaryKey))
  348. for i, pkc := range rpc.primaryKey {
  349. pkCells[i] = pkc.toPlainBufferCell()
  350. }
  351. cells := make([]*PlainBufferCell, len(rpc.columnsToPut))
  352. for i, c := range rpc.columnsToPut {
  353. cells[i] = c.toPlainBufferCell(false)
  354. }
  355. row := &PlainBufferRow{
  356. primaryKey: pkCells,
  357. cells: cells}
  358. var b bytes.Buffer
  359. row.writeRowWithHeader(&b)
  360. return b.Bytes()
  361. }
  362. func (ruc *RowUpdateChange) Build() []byte {
  363. pkCells := make([]*PlainBufferCell, len(ruc.primaryKey))
  364. for i, pkc := range ruc.primaryKey {
  365. pkCells[i] = pkc.toPlainBufferCell()
  366. }
  367. cells := make([]*PlainBufferCell, len(ruc.columnsToUpdate))
  368. for i, c := range ruc.columnsToUpdate {
  369. cells[i] = c.toPlainBufferCell(c.IgnoreValue)
  370. }
  371. row := &PlainBufferRow{
  372. primaryKey: pkCells,
  373. cells: cells}
  374. var b bytes.Buffer
  375. row.writeRowWithHeader(&b)
  376. return b.Bytes()
  377. }
  378. const (
  379. MaxValue = "_get_range_max"
  380. MinValue = "_get_range_min"
  381. )
  382. func (comparatorType *ComparatorType) ConvertToPbComparatorType() otsprotocol.ComparatorType {
  383. switch *comparatorType {
  384. case CT_EQUAL:
  385. return otsprotocol.ComparatorType_CT_EQUAL
  386. case CT_NOT_EQUAL:
  387. return otsprotocol.ComparatorType_CT_NOT_EQUAL
  388. case CT_GREATER_THAN:
  389. return otsprotocol.ComparatorType_CT_GREATER_THAN
  390. case CT_GREATER_EQUAL:
  391. return otsprotocol.ComparatorType_CT_GREATER_EQUAL
  392. case CT_LESS_THAN:
  393. return otsprotocol.ComparatorType_CT_LESS_THAN
  394. default:
  395. return otsprotocol.ComparatorType_CT_LESS_EQUAL
  396. }
  397. }
  398. func (columnType DefinedColumnType) ConvertToPbDefinedColumnType() otsprotocol.DefinedColumnType {
  399. switch columnType {
  400. case DefinedColumn_INTEGER:
  401. return otsprotocol.DefinedColumnType_DCT_INTEGER
  402. case DefinedColumn_DOUBLE:
  403. return otsprotocol.DefinedColumnType_DCT_DOUBLE
  404. case DefinedColumn_BOOLEAN:
  405. return otsprotocol.DefinedColumnType_DCT_BOOLEAN
  406. case DefinedColumn_STRING:
  407. return otsprotocol.DefinedColumnType_DCT_STRING
  408. default:
  409. return otsprotocol.DefinedColumnType_DCT_BLOB
  410. }
  411. }
  412. func (loType *LogicalOperator) ConvertToPbLoType() otsprotocol.LogicalOperator {
  413. switch *loType {
  414. case LO_NOT:
  415. return otsprotocol.LogicalOperator_LO_NOT
  416. case LO_AND:
  417. return otsprotocol.LogicalOperator_LO_AND
  418. default:
  419. return otsprotocol.LogicalOperator_LO_OR
  420. }
  421. }
  422. func ConvertToPbCastType(variantType VariantType) *otsprotocol.VariantType {
  423. switch variantType {
  424. case Variant_INTEGER:
  425. return otsprotocol.VariantType_VT_INTEGER.Enum()
  426. case Variant_DOUBLE:
  427. return otsprotocol.VariantType_VT_DOUBLE.Enum()
  428. case Variant_STRING:
  429. return otsprotocol.VariantType_VT_STRING.Enum()
  430. default:
  431. panic("invalid VariantType")
  432. }
  433. }
  434. func NewValueTransferRule(regex string, vt VariantType) *ValueTransferRule{
  435. return &ValueTransferRule{Regex: regex, Cast_type: vt}
  436. }
  437. func NewSingleColumnValueRegexFilter(columnName string, comparator ComparatorType, rule *ValueTransferRule, value interface{}) *SingleColumnCondition {
  438. return &SingleColumnCondition{ColumnName: &columnName, Comparator: &comparator, ColumnValue: value, TransferRule: rule}
  439. }
  440. func NewSingleColumnValueFilter(condition *SingleColumnCondition) *otsprotocol.SingleColumnValueFilter {
  441. filter := new(otsprotocol.SingleColumnValueFilter)
  442. comparatorType := condition.Comparator.ConvertToPbComparatorType()
  443. filter.Comparator = &comparatorType
  444. filter.ColumnName = condition.ColumnName
  445. col := NewColumn([]byte(*condition.ColumnName), condition.ColumnValue)
  446. filter.ColumnValue = col.toPlainBufferCell(false).cellValue.writeCellValueWithoutLengthPrefix()
  447. filter.FilterIfMissing = proto.Bool(condition.FilterIfMissing)
  448. filter.LatestVersionOnly = proto.Bool(condition.LatestVersionOnly)
  449. if condition.TransferRule != nil {
  450. filter.ValueTransRule = &otsprotocol.ValueTransferRule{ Regex: proto.String(condition.TransferRule.Regex), CastType: ConvertToPbCastType(condition.TransferRule.Cast_type) }
  451. }
  452. return filter
  453. }
  454. func NewCompositeFilter(filters []ColumnFilter, lo LogicalOperator) *otsprotocol.CompositeColumnValueFilter {
  455. ccvfilter := new(otsprotocol.CompositeColumnValueFilter)
  456. combinator := lo.ConvertToPbLoType()
  457. ccvfilter.Combinator = &combinator
  458. for _, cf := range filters {
  459. filter := cf.ToFilter()
  460. ccvfilter.SubFilters = append(ccvfilter.SubFilters, filter)
  461. }
  462. return ccvfilter
  463. }
  464. func NewPaginationFilter(filter *PaginationFilter) *otsprotocol.ColumnPaginationFilter {
  465. pageFilter := new(otsprotocol.ColumnPaginationFilter)
  466. pageFilter.Offset = proto.Int32(filter.Offset)
  467. pageFilter.Limit = proto.Int32(filter.Limit)
  468. return pageFilter
  469. }
  470. func (otsClient *TableStoreClient) postReq(req *http.Request, url string) ([]byte, error, string) {
  471. resp, err := otsClient.httpClient.Do(req)
  472. if err != nil {
  473. return nil, err, ""
  474. }
  475. defer resp.Body.Close()
  476. reqId := getRequestId(resp)
  477. body, err := ioutil.ReadAll(resp.Body)
  478. if err != nil {
  479. return nil, err, reqId
  480. }
  481. if (resp.StatusCode >= 200 && resp.StatusCode < 300) == false {
  482. var retErr *OtsError
  483. perr := new(otsprotocol.Error)
  484. errUm := proto.Unmarshal(body, perr)
  485. if errUm != nil {
  486. retErr = rawHttpToOtsError(resp.StatusCode, body, reqId)
  487. } else {
  488. retErr = pbErrToOtsError(resp.StatusCode, perr, reqId)
  489. }
  490. return nil, retErr, reqId
  491. }
  492. return body, nil, reqId
  493. }
  494. func rawHttpToOtsError(code int, body []byte, reqId string) *OtsError {
  495. oerr := &OtsError{
  496. Message: string(body),
  497. RequestId: reqId,
  498. HttpStatusCode: code,
  499. }
  500. if code >= 500 && code < 600 {
  501. oerr.Code = SERVER_UNAVAILABLE
  502. } else {
  503. oerr.Code = OTS_CLIENT_UNKNOWN
  504. }
  505. return oerr
  506. }
  507. func pbErrToOtsError(statusCode int, pbErr *otsprotocol.Error, reqId string) *OtsError {
  508. return &OtsError{
  509. Code: pbErr.GetCode(),
  510. Message: pbErr.GetMessage(),
  511. RequestId: reqId,
  512. HttpStatusCode : statusCode,
  513. }
  514. }
  515. func getRequestId(response *http.Response) string {
  516. if response == nil || response.Header == nil {
  517. return ""
  518. }
  519. return response.Header.Get(xOtsRequestId)
  520. }
  521. func buildRowPutChange(primarykey *PrimaryKey, columns []AttributeColumn) *RowPutChange {
  522. row := new(RowPutChange)
  523. row.primaryKey = make([]*PrimaryKeyColumnInner, len(primarykey.PrimaryKeys))
  524. for i, p := range primarykey.PrimaryKeys {
  525. row.primaryKey[i] = NewPrimaryKeyColumn([]byte(p.ColumnName), p.Value, p.PrimaryKeyOption)
  526. }
  527. row.columnsToPut = make([]*Column, len(columns))
  528. for i, p := range columns {
  529. row.columnsToPut[i] = NewColumn([]byte(p.ColumnName), p.Value)
  530. if p.Timestamp != 0 {
  531. row.columnsToPut[i].HasTimestamp = true
  532. row.columnsToPut[i].Timestamp = p.Timestamp
  533. }
  534. }
  535. return row
  536. }
  537. func buildRowUpdateChange(primarykey *PrimaryKey, columns []ColumnToUpdate) *RowUpdateChange {
  538. row := new(RowUpdateChange)
  539. row.primaryKey = make([]*PrimaryKeyColumnInner, len(primarykey.PrimaryKeys))
  540. for i, p := range primarykey.PrimaryKeys {
  541. row.primaryKey[i] = NewPrimaryKeyColumn([]byte(p.ColumnName), p.Value, p.PrimaryKeyOption)
  542. }
  543. row.columnsToUpdate = make([]*Column, len(columns))
  544. for i, p := range columns {
  545. row.columnsToUpdate[i] = NewColumn([]byte(p.ColumnName), p.Value)
  546. row.columnsToUpdate[i].HasTimestamp = p.HasTimestamp
  547. row.columnsToUpdate[i].HasType = p.HasType
  548. row.columnsToUpdate[i].Type = p.Type
  549. row.columnsToUpdate[i].Timestamp = p.Timestamp
  550. row.columnsToUpdate[i].IgnoreValue = p.IgnoreValue
  551. }
  552. return row
  553. }
  554. func (condition *RowCondition) buildCondition() *otsprotocol.RowExistenceExpectation {
  555. switch condition.RowExistenceExpectation {
  556. case RowExistenceExpectation_IGNORE:
  557. return otsprotocol.RowExistenceExpectation_IGNORE.Enum()
  558. case RowExistenceExpectation_EXPECT_EXIST:
  559. return otsprotocol.RowExistenceExpectation_EXPECT_EXIST.Enum()
  560. case RowExistenceExpectation_EXPECT_NOT_EXIST:
  561. return otsprotocol.RowExistenceExpectation_EXPECT_NOT_EXIST.Enum()
  562. }
  563. panic(errInvalidInput)
  564. }
  565. // build primary key for create table, put row, delete row and update row
  566. // value only support int64,string,[]byte or you will get panic
  567. func buildPrimaryKey(primaryKeyName string, value interface{}) *PrimaryKeyColumn {
  568. // Todo: validate the input
  569. return &PrimaryKeyColumn{ColumnName: primaryKeyName, Value: value, PrimaryKeyOption: NONE}
  570. }
  571. // value only support int64,string,bool,float64,[]byte. other type will get panic
  572. func (rowchange *PutRowChange) AddColumn(columnName string, value interface{}) {
  573. // Todo: validate the input
  574. column := &AttributeColumn{ColumnName: columnName, Value: value}
  575. rowchange.Columns = append(rowchange.Columns, *column)
  576. }
  577. func (rowchange *PutRowChange) SetReturnPk() {
  578. rowchange.ReturnType = ReturnType(ReturnType_RT_PK)
  579. }
  580. func (rowchange *UpdateRowChange) SetReturnIncrementValue() {
  581. rowchange.ReturnType = ReturnType(ReturnType_RT_AFTER_MODIFY)
  582. }
  583. func (rowchange *UpdateRowChange) AppendIncrementColumnToReturn(name string) {
  584. rowchange.ColumnNamesToReturn = append(rowchange.ColumnNamesToReturn, name)
  585. }
  586. // value only support int64,string,bool,float64,[]byte. other type will get panic
  587. func (rowchange *PutRowChange) AddColumnWithTimestamp(columnName string, value interface{}, timestamp int64) {
  588. // Todo: validate the input
  589. column := &AttributeColumn{ColumnName: columnName, Value: value}
  590. column.Timestamp = timestamp
  591. rowchange.Columns = append(rowchange.Columns, *column)
  592. }
  593. func (pk *PrimaryKey) AddPrimaryKeyColumn(primaryKeyName string, value interface{}) {
  594. pk.PrimaryKeys = append(pk.PrimaryKeys, buildPrimaryKey(primaryKeyName, value))
  595. }
  596. func (pk *PrimaryKey) AddPrimaryKeyColumnWithAutoIncrement(primaryKeyName string) {
  597. pk.PrimaryKeys = append(pk.PrimaryKeys, &PrimaryKeyColumn{ColumnName: primaryKeyName, PrimaryKeyOption: AUTO_INCREMENT})
  598. }
  599. func (pk *PrimaryKey) AddPrimaryKeyColumnWithMinValue(primaryKeyName string) {
  600. pk.PrimaryKeys = append(pk.PrimaryKeys, &PrimaryKeyColumn{ColumnName: primaryKeyName, PrimaryKeyOption: MIN})
  601. }
  602. // Only used for range query
  603. func (pk *PrimaryKey) AddPrimaryKeyColumnWithMaxValue(primaryKeyName string) {
  604. pk.PrimaryKeys = append(pk.PrimaryKeys, &PrimaryKeyColumn{ColumnName: primaryKeyName, PrimaryKeyOption: MAX})
  605. }
  606. func (rowchange *PutRowChange) SetCondition(rowExistenceExpectation RowExistenceExpectation) {
  607. rowchange.Condition = &RowCondition{RowExistenceExpectation: rowExistenceExpectation}
  608. }
  609. func (rowchange *DeleteRowChange) SetCondition(rowExistenceExpectation RowExistenceExpectation) {
  610. rowchange.Condition = &RowCondition{RowExistenceExpectation: rowExistenceExpectation}
  611. }
  612. func (Criteria *SingleRowQueryCriteria) SetFilter(filter ColumnFilter) {
  613. Criteria.Filter = filter
  614. }
  615. func (Criteria *MultiRowQueryCriteria) SetFilter(filter ColumnFilter) {
  616. Criteria.Filter = filter
  617. }
  618. func NewSingleColumnCondition(columnName string, comparator ComparatorType, value interface{}) *SingleColumnCondition {
  619. return &SingleColumnCondition{ColumnName: &columnName, Comparator: &comparator, ColumnValue: value}
  620. }
  621. func NewCompositeColumnCondition(lo LogicalOperator) *CompositeColumnValueFilter {
  622. return &CompositeColumnValueFilter{Operator: lo}
  623. }
  624. func (rowchange *PutRowChange) SetColumnCondition(condition ColumnFilter) {
  625. rowchange.Condition.ColumnCondition = condition
  626. }
  627. func (rowchange *UpdateRowChange) SetCondition(rowExistenceExpectation RowExistenceExpectation) {
  628. rowchange.Condition = &RowCondition{RowExistenceExpectation: rowExistenceExpectation}
  629. }
  630. func (rowchange *UpdateRowChange) SetColumnCondition(condition ColumnFilter) {
  631. rowchange.Condition.ColumnCondition = condition
  632. }
  633. func (rowchange *DeleteRowChange) SetColumnCondition(condition ColumnFilter) {
  634. rowchange.Condition.ColumnCondition = condition
  635. }
  636. func (meta *TableMeta) AddPrimaryKeyColumn(name string, keyType PrimaryKeyType) {
  637. meta.SchemaEntry = append(meta.SchemaEntry, &PrimaryKeySchema{Name: &name, Type: &keyType})
  638. }
  639. func (meta *TableMeta) AddPrimaryKeyColumnOption(name string, keyType PrimaryKeyType, keyOption PrimaryKeyOption) {
  640. meta.SchemaEntry = append(meta.SchemaEntry, &PrimaryKeySchema{Name: &name, Type: &keyType, Option: &keyOption})
  641. }
  642. // value only support int64,string,bool,float64,[]byte. other type will get panic
  643. func (rowchange *UpdateRowChange) PutColumn(columnName string, value interface{}) {
  644. // Todo: validate the input
  645. column := &ColumnToUpdate{ColumnName: columnName, Value: value}
  646. rowchange.Columns = append(rowchange.Columns, *column)
  647. }
  648. func (rowchange *UpdateRowChange) DeleteColumn(columnName string) {
  649. // Todo: validate the input
  650. column := &ColumnToUpdate{ColumnName: columnName, Value: nil, Type: DELETE_ALL_VERSION, HasType: true, IgnoreValue: true}
  651. rowchange.Columns = append(rowchange.Columns, *column)
  652. }
  653. func (rowchange *UpdateRowChange) DeleteColumnWithTimestamp(columnName string, timestamp int64) {
  654. // Todo: validate the input
  655. column := &ColumnToUpdate{ColumnName: columnName, Value: nil, Type: DELETE_ONE_VERSION, HasType: true, HasTimestamp: true, Timestamp: timestamp, IgnoreValue: true}
  656. rowchange.Columns = append(rowchange.Columns, *column)
  657. }
  658. func (rowchange *UpdateRowChange) IncrementColumn(columnName string, value int64) {
  659. // Todo: validate the input
  660. column := &ColumnToUpdate{ColumnName: columnName, Value: value, Type: INCREMENT, HasType: true, IgnoreValue: false}
  661. rowchange.Columns = append(rowchange.Columns, *column)
  662. }
  663. func (rowchange *DeleteRowChange) Serialize() []byte {
  664. return rowchange.PrimaryKey.Build(true)
  665. }
  666. func (rowchange *PutRowChange) Serialize() []byte {
  667. row := buildRowPutChange(rowchange.PrimaryKey, rowchange.Columns)
  668. return row.Build()
  669. }
  670. func (rowchange *UpdateRowChange) Serialize() []byte {
  671. row := buildRowUpdateChange(rowchange.PrimaryKey, rowchange.Columns)
  672. return row.Build()
  673. }
  674. func (rowchange *DeleteRowChange) GetTableName() string {
  675. return rowchange.TableName
  676. }
  677. func (rowchange *PutRowChange) GetTableName() string {
  678. return rowchange.TableName
  679. }
  680. func (rowchange *UpdateRowChange) GetTableName() string {
  681. return rowchange.TableName
  682. }
  683. func (rowchange *DeleteRowChange) getOperationType() otsprotocol.OperationType {
  684. return otsprotocol.OperationType_DELETE
  685. }
  686. func (rowchange *PutRowChange) getOperationType() otsprotocol.OperationType {
  687. return otsprotocol.OperationType_PUT
  688. }
  689. func (rowchange *UpdateRowChange) getOperationType() otsprotocol.OperationType {
  690. return otsprotocol.OperationType_UPDATE
  691. }
  692. func (rowchange *DeleteRowChange) getCondition() *otsprotocol.Condition {
  693. condition := new(otsprotocol.Condition)
  694. condition.RowExistence = rowchange.Condition.buildCondition()
  695. if rowchange.Condition.ColumnCondition != nil {
  696. condition.ColumnCondition = rowchange.Condition.ColumnCondition.Serialize()
  697. }
  698. return condition
  699. }
  700. func (rowchange *UpdateRowChange) getCondition() *otsprotocol.Condition {
  701. condition := new(otsprotocol.Condition)
  702. condition.RowExistence = rowchange.Condition.buildCondition()
  703. if rowchange.Condition.ColumnCondition != nil {
  704. condition.ColumnCondition = rowchange.Condition.ColumnCondition.Serialize()
  705. }
  706. return condition
  707. }
  708. func (rowchange *PutRowChange) getCondition() *otsprotocol.Condition {
  709. condition := new(otsprotocol.Condition)
  710. condition.RowExistence = rowchange.Condition.buildCondition()
  711. if rowchange.Condition.ColumnCondition != nil {
  712. condition.ColumnCondition = rowchange.Condition.ColumnCondition.Serialize()
  713. }
  714. return condition
  715. }
  716. func (request *BatchWriteRowRequest) AddRowChange(change RowChange) {
  717. if request.RowChangesGroupByTable == nil {
  718. request.RowChangesGroupByTable = make(map[string][]RowChange)
  719. }
  720. request.RowChangesGroupByTable[change.GetTableName()] = append(request.RowChangesGroupByTable[change.GetTableName()], change)
  721. }
  722. func (direction Direction) ToDirection() otsprotocol.Direction {
  723. if direction == FORWARD {
  724. return otsprotocol.Direction_FORWARD
  725. } else {
  726. return otsprotocol.Direction_BACKWARD
  727. }
  728. }
  729. func (columnMap *ColumnMap) GetRange(start int, count int) ([]*AttributeColumn, error) {
  730. columns := []*AttributeColumn{}
  731. end := start + count
  732. if len(columnMap.columnsKey) < end {
  733. return nil, fmt.Errorf("invalid arugment")
  734. }
  735. for i := start; i < end; i++ {
  736. subColumns := columnMap.Columns[columnMap.columnsKey[i]]
  737. for _, column := range subColumns {
  738. columns = append(columns, column)
  739. }
  740. }
  741. return columns, nil
  742. }
  743. func (response *GetRowResponse) GetColumnMap() *ColumnMap {
  744. if response == nil {
  745. return nil
  746. }
  747. if response.columnMap != nil {
  748. return response.columnMap
  749. } else {
  750. response.columnMap = &ColumnMap{}
  751. response.columnMap.Columns = make(map[string][]*AttributeColumn)
  752. if len(response.Columns) == 0 {
  753. return response.columnMap
  754. } else {
  755. for _, column := range response.Columns {
  756. if _, ok := response.columnMap.Columns[column.ColumnName]; ok {
  757. response.columnMap.Columns[column.ColumnName] = append(response.columnMap.Columns[column.ColumnName], column)
  758. } else {
  759. response.columnMap.columnsKey = append(response.columnMap.columnsKey, column.ColumnName)
  760. value := []*AttributeColumn{}
  761. value = append(value, column)
  762. response.columnMap.Columns[column.ColumnName] = value
  763. }
  764. }
  765. sort.Strings(response.columnMap.columnsKey)
  766. return response.columnMap
  767. }
  768. }
  769. }
  770. func Assert(cond bool, msg string) {
  771. if !cond {
  772. panic(msg)
  773. }
  774. }
  775. func (meta *TableMeta) AddDefinedColumn(name string, definedType DefinedColumnType) {
  776. meta.DefinedColumns = append(meta.DefinedColumns, &DefinedColumnSchema{Name: name, ColumnType: definedType})
  777. }
  778. func (meta *IndexMeta) AddDefinedColumn(name string) {
  779. meta.DefinedColumns = append(meta.DefinedColumns, name)
  780. }
  781. func (meta *IndexMeta) AddPrimaryKeyColumn(name string) {
  782. meta.Primarykey = append(meta.Primarykey, name)
  783. }
  784. func (request *CreateTableRequest) AddIndexMeta(meta *IndexMeta) {
  785. request.IndexMetas = append(request.IndexMetas, meta)
  786. }
  787. func (meta *IndexMeta) ConvertToPbIndexMeta() *otsprotocol.IndexMeta {
  788. return &otsprotocol.IndexMeta {
  789. Name: &meta.IndexName,
  790. PrimaryKey: meta.Primarykey,
  791. DefinedColumn: meta.DefinedColumns,
  792. IndexUpdateMode: otsprotocol.IndexUpdateMode_IUM_ASYNC_INDEX.Enum(),
  793. IndexType: otsprotocol.IndexType_IT_GLOBAL_INDEX.Enum(),
  794. }
  795. }
  796. func ConvertPbIndexTypeToIndexType(indexType *otsprotocol.IndexType) IndexType {
  797. switch *indexType {
  798. case otsprotocol.IndexType_IT_GLOBAL_INDEX:
  799. return IT_GLOBAL_INDEX
  800. default:
  801. return IT_LOCAL_INDEX
  802. }
  803. }
  804. func ConvertPbIndexMetaToIndexMeta(meta *otsprotocol.IndexMeta) *IndexMeta {
  805. indexmeta := &IndexMeta {
  806. IndexName: *meta.Name,
  807. IndexType: ConvertPbIndexTypeToIndexType(meta.IndexType),
  808. }
  809. for _, pk := range meta.PrimaryKey {
  810. indexmeta.Primarykey = append(indexmeta.Primarykey, pk)
  811. }
  812. for _, col := range meta.DefinedColumn {
  813. indexmeta.DefinedColumns = append(indexmeta.DefinedColumns, col)
  814. }
  815. return indexmeta
  816. }