mutate.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package hrpc
  6. import (
  7. "context"
  8. "encoding/binary"
  9. "errors"
  10. "time"
  11. "github.com/golang/protobuf/proto"
  12. "github.com/tsuna/gohbase/pb"
  13. )
  14. var attributeNameTTL = "_ttl"
  15. // DurabilityType is used to set durability for Durability option
  16. type DurabilityType int32
  17. const (
  18. // UseDefault is USER_DEFAULT
  19. UseDefault DurabilityType = iota
  20. // SkipWal is SKIP_WAL
  21. SkipWal
  22. // AsyncWal is ASYNC_WAL
  23. AsyncWal
  24. // SyncWal is SYNC_WAL
  25. SyncWal
  26. // FsyncWal is FSYNC_WAL
  27. FsyncWal
  28. )
  29. // Mutate represents a mutation on HBase.
  30. type Mutate struct {
  31. base
  32. mutationType pb.MutationProto_MutationType //*int32
  33. // values is a map of column families to a map of column qualifiers to bytes
  34. values map[string]map[string][]byte
  35. ttl []byte
  36. timestamp uint64
  37. durability DurabilityType
  38. deleteOneVersion bool
  39. skipbatch bool
  40. }
  41. // TTL sets a time-to-live for mutation queries.
  42. // The value will be in millisecond resolution.
  43. func TTL(t time.Duration) func(Call) error {
  44. return func(o Call) error {
  45. m, ok := o.(*Mutate)
  46. if !ok {
  47. return errors.New("'TTL' option can only be used with mutation queries")
  48. }
  49. buf := make([]byte, 8)
  50. binary.BigEndian.PutUint64(buf, uint64(t.Nanoseconds()/1e6))
  51. m.ttl = buf
  52. return nil
  53. }
  54. }
  55. // Timestamp sets timestamp for mutation queries.
  56. // The time object passed will be rounded to a millisecond resolution, as by default,
  57. // if no timestamp is provided, HBase sets it to current time in milliseconds.
  58. // In order to have custom time precision, use TimestampUint64 call option for
  59. // mutation requests and corresponding TimeRangeUint64 for retrieval requests.
  60. func Timestamp(ts time.Time) func(Call) error {
  61. return func(o Call) error {
  62. m, ok := o.(*Mutate)
  63. if !ok {
  64. return errors.New("'Timestamp' option can only be used with mutation queries")
  65. }
  66. m.timestamp = uint64(ts.UnixNano() / 1e6)
  67. return nil
  68. }
  69. }
  70. // TimestampUint64 sets timestamp for mutation queries.
  71. func TimestampUint64(ts uint64) func(Call) error {
  72. return func(o Call) error {
  73. m, ok := o.(*Mutate)
  74. if !ok {
  75. return errors.New("'TimestampUint64' option can only be used with mutation queries")
  76. }
  77. m.timestamp = ts
  78. return nil
  79. }
  80. }
  81. // Durability sets durability for mutation queries.
  82. func Durability(d DurabilityType) func(Call) error {
  83. return func(o Call) error {
  84. m, ok := o.(*Mutate)
  85. if !ok {
  86. return errors.New("'Durability' option can only be used with mutation queries")
  87. }
  88. if d < UseDefault || d > FsyncWal {
  89. return errors.New("invalid durability value")
  90. }
  91. m.durability = d
  92. return nil
  93. }
  94. }
  95. // DeleteOneVersion is a delete option that can be passed in order to delete only
  96. // one latest version of the specified qualifiers. Without timestamp specified,
  97. // it will have no effect for delete specific column families request.
  98. // If a Timestamp option is passed along, only the version at that timestamp will be removed
  99. // for delete specific column families and/or qualifier request.
  100. // This option cannot be used for delete entire row request.
  101. func DeleteOneVersion() func(Call) error {
  102. return func(o Call) error {
  103. m, ok := o.(*Mutate)
  104. if !ok {
  105. return errors.New("'DeleteOneVersion' option can only be used with mutation queries")
  106. }
  107. m.deleteOneVersion = true
  108. return nil
  109. }
  110. }
  111. // baseMutate returns a Mutate struct without the mutationType filled in.
  112. func baseMutate(ctx context.Context, table, key []byte, values map[string]map[string][]byte,
  113. options ...func(Call) error) (*Mutate, error) {
  114. m := &Mutate{
  115. base: base{
  116. table: table,
  117. key: key,
  118. ctx: ctx,
  119. resultch: make(chan RPCResult, 1),
  120. },
  121. values: values,
  122. timestamp: MaxTimestamp,
  123. }
  124. err := applyOptions(m, options...)
  125. if err != nil {
  126. return nil, err
  127. }
  128. return m, nil
  129. }
  130. // NewPut creates a new Mutation request to insert the given
  131. // family-column-values in the given row key of the given table.
  132. func NewPut(ctx context.Context, table, key []byte,
  133. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  134. m, err := baseMutate(ctx, table, key, values, options...)
  135. if err != nil {
  136. return nil, err
  137. }
  138. m.mutationType = pb.MutationProto_PUT
  139. return m, nil
  140. }
  141. // NewPutStr is just like NewPut but takes table and key as strings.
  142. func NewPutStr(ctx context.Context, table, key string,
  143. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  144. return NewPut(ctx, []byte(table), []byte(key), values, options...)
  145. }
  146. // NewDel is used to perform Delete operations on a single row.
  147. // To delete entire row, values should be nil.
  148. //
  149. // To delete specific families, qualifiers map should be nil:
  150. // map[string]map[string][]byte{
  151. // "cf1": nil,
  152. // "cf2": nil,
  153. // }
  154. //
  155. // To delete specific qualifiers:
  156. // map[string]map[string][]byte{
  157. // "cf": map[string][]byte{
  158. // "q1": nil,
  159. // "q2": nil,
  160. // },
  161. // }
  162. //
  163. // To delete all versions before and at a timestamp, pass hrpc.Timestamp() option.
  164. // By default all versions will be removed.
  165. //
  166. // To delete only a specific version at a timestamp, pass hrpc.DeleteOneVersion() option
  167. // along with a timestamp. For delete specific qualifiers request, if timestamp is not
  168. // passed, only the latest version will be removed. For delete specific families request,
  169. // the timestamp should be passed or it will have no effect as it's an expensive
  170. // operation to perform.
  171. func NewDel(ctx context.Context, table, key []byte,
  172. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  173. m, err := baseMutate(ctx, table, key, values, options...)
  174. if err != nil {
  175. return nil, err
  176. }
  177. if len(m.values) == 0 && m.deleteOneVersion {
  178. return nil, errors.New(
  179. "'DeleteOneVersion' option cannot be specified for delete entire row request")
  180. }
  181. m.mutationType = pb.MutationProto_DELETE
  182. return m, nil
  183. }
  184. // NewDelStr is just like NewDel but takes table and key as strings.
  185. func NewDelStr(ctx context.Context, table, key string,
  186. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  187. return NewDel(ctx, []byte(table), []byte(key), values, options...)
  188. }
  189. // NewApp creates a new Mutation request to append the given
  190. // family-column-values into the existing cells in HBase (or create them if
  191. // needed), in given row key of the given table.
  192. func NewApp(ctx context.Context, table, key []byte,
  193. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  194. m, err := baseMutate(ctx, table, key, values, options...)
  195. if err != nil {
  196. return nil, err
  197. }
  198. m.mutationType = pb.MutationProto_APPEND
  199. return m, nil
  200. }
  201. // NewAppStr is just like NewApp but takes table and key as strings.
  202. func NewAppStr(ctx context.Context, table, key string,
  203. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  204. return NewApp(ctx, []byte(table), []byte(key), values, options...)
  205. }
  206. // NewIncSingle creates a new Mutation request that will increment the given value
  207. // by amount in HBase under the given table, key, family and qualifier.
  208. func NewIncSingle(ctx context.Context, table, key []byte, family, qualifier string,
  209. amount int64, options ...func(Call) error) (*Mutate, error) {
  210. buf := make([]byte, 8)
  211. binary.BigEndian.PutUint64(buf, uint64(amount))
  212. value := map[string]map[string][]byte{family: map[string][]byte{qualifier: buf}}
  213. return NewInc(ctx, table, key, value, options...)
  214. }
  215. // NewIncStrSingle is just like NewIncSingle but takes table and key as strings.
  216. func NewIncStrSingle(ctx context.Context, table, key, family, qualifier string,
  217. amount int64, options ...func(Call) error) (*Mutate, error) {
  218. return NewIncSingle(ctx, []byte(table), []byte(key), family, qualifier, amount, options...)
  219. }
  220. // NewInc creates a new Mutation request that will increment the given values
  221. // in HBase under the given table and key.
  222. func NewInc(ctx context.Context, table, key []byte,
  223. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  224. m, err := baseMutate(ctx, table, key, values, options...)
  225. if err != nil {
  226. return nil, err
  227. }
  228. m.mutationType = pb.MutationProto_INCREMENT
  229. return m, nil
  230. }
  231. // NewIncStr is just like NewInc but takes table and key as strings.
  232. func NewIncStr(ctx context.Context, table, key string,
  233. values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
  234. return NewInc(ctx, []byte(table), []byte(key), values, options...)
  235. }
  236. // Name returns the name of this RPC call.
  237. func (m *Mutate) Name() string {
  238. return "Mutate"
  239. }
  240. // SkipBatch returns true if the Mutate request shouldn't be batched,
  241. // but should be sent to Region Server right away.
  242. func (m *Mutate) SkipBatch() bool {
  243. return m.skipbatch
  244. }
  245. func (m *Mutate) setSkipBatch(v bool) {
  246. m.skipbatch = v
  247. }
  248. func (m *Mutate) toProto() *pb.MutateRequest {
  249. var ts *uint64
  250. if m.timestamp != MaxTimestamp {
  251. ts = &m.timestamp
  252. }
  253. // We need to convert everything in the values field
  254. // to a protobuf ColumnValue
  255. cvs := make([]*pb.MutationProto_ColumnValue, len(m.values))
  256. i := 0
  257. for k, v := range m.values {
  258. // And likewise, each item in each column needs to be converted to a
  259. // protobuf QualifierValue
  260. // if it's a delete, figure out the type
  261. var dt *pb.MutationProto_DeleteType
  262. if m.mutationType == pb.MutationProto_DELETE {
  263. if len(v) == 0 {
  264. // delete the whole column family
  265. if m.deleteOneVersion {
  266. dt = pb.MutationProto_DELETE_FAMILY_VERSION.Enum()
  267. } else {
  268. dt = pb.MutationProto_DELETE_FAMILY.Enum()
  269. }
  270. // add empty qualifier
  271. if v == nil {
  272. v = make(map[string][]byte)
  273. }
  274. v[""] = nil
  275. } else {
  276. // delete specific qualifiers
  277. if m.deleteOneVersion {
  278. dt = pb.MutationProto_DELETE_ONE_VERSION.Enum()
  279. } else {
  280. dt = pb.MutationProto_DELETE_MULTIPLE_VERSIONS.Enum()
  281. }
  282. }
  283. }
  284. qvs := make([]*pb.MutationProto_ColumnValue_QualifierValue, len(v))
  285. j := 0
  286. for k1, v1 := range v {
  287. qvs[j] = &pb.MutationProto_ColumnValue_QualifierValue{
  288. Qualifier: []byte(k1),
  289. Value: v1,
  290. Timestamp: ts,
  291. DeleteType: dt,
  292. }
  293. j++
  294. }
  295. cvs[i] = &pb.MutationProto_ColumnValue{
  296. Family: []byte(k),
  297. QualifierValue: qvs,
  298. }
  299. i++
  300. }
  301. mProto := &pb.MutationProto{
  302. Row: m.key,
  303. MutateType: &m.mutationType,
  304. ColumnValue: cvs,
  305. Durability: pb.MutationProto_Durability(m.durability).Enum(),
  306. Timestamp: ts,
  307. }
  308. if len(m.ttl) > 0 {
  309. mProto.Attribute = append(mProto.Attribute, &pb.NameBytesPair{
  310. Name: &attributeNameTTL,
  311. Value: m.ttl,
  312. })
  313. }
  314. return &pb.MutateRequest{
  315. Region: m.regionSpecifier(),
  316. Mutation: mProto,
  317. }
  318. }
  319. // ToProto converts this mutate RPC into a protobuf message
  320. func (m *Mutate) ToProto() proto.Message {
  321. return m.toProto()
  322. }
  323. // NewResponse creates an empty protobuf message to read the response of this RPC.
  324. func (m *Mutate) NewResponse() proto.Message {
  325. return &pb.MutateResponse{}
  326. }
  327. // DeserializeCellBlocks deserializes mutate result from cell blocks
  328. func (m *Mutate) DeserializeCellBlocks(pm proto.Message, b []byte) (uint32, error) {
  329. resp := pm.(*pb.MutateResponse)
  330. if resp.Result == nil {
  331. // TODO: is this possible?
  332. return 0, nil
  333. }
  334. cells, read, err := deserializeCellBlocks(b, uint32(resp.Result.GetAssociatedCellCount()))
  335. if err != nil {
  336. return 0, err
  337. }
  338. resp.Result.Cell = append(resp.Result.Cell, cells...)
  339. return read, nil
  340. }