123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- package hrpc
- import (
- "context"
- "encoding/binary"
- "errors"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/tsuna/gohbase/pb"
- )
- var attributeNameTTL = "_ttl"
- type DurabilityType int32
- const (
-
- UseDefault DurabilityType = iota
-
- SkipWal
-
- AsyncWal
-
- SyncWal
-
- FsyncWal
- )
- type Mutate struct {
- base
- mutationType pb.MutationProto_MutationType
-
- values map[string]map[string][]byte
- ttl []byte
- timestamp uint64
- durability DurabilityType
- deleteOneVersion bool
- skipbatch bool
- }
- func TTL(t time.Duration) func(Call) error {
- return func(o Call) error {
- m, ok := o.(*Mutate)
- if !ok {
- return errors.New("'TTL' option can only be used with mutation queries")
- }
- buf := make([]byte, 8)
- binary.BigEndian.PutUint64(buf, uint64(t.Nanoseconds()/1e6))
- m.ttl = buf
- return nil
- }
- }
- func Timestamp(ts time.Time) func(Call) error {
- return func(o Call) error {
- m, ok := o.(*Mutate)
- if !ok {
- return errors.New("'Timestamp' option can only be used with mutation queries")
- }
- m.timestamp = uint64(ts.UnixNano() / 1e6)
- return nil
- }
- }
- func TimestampUint64(ts uint64) func(Call) error {
- return func(o Call) error {
- m, ok := o.(*Mutate)
- if !ok {
- return errors.New("'TimestampUint64' option can only be used with mutation queries")
- }
- m.timestamp = ts
- return nil
- }
- }
- func Durability(d DurabilityType) func(Call) error {
- return func(o Call) error {
- m, ok := o.(*Mutate)
- if !ok {
- return errors.New("'Durability' option can only be used with mutation queries")
- }
- if d < UseDefault || d > FsyncWal {
- return errors.New("invalid durability value")
- }
- m.durability = d
- return nil
- }
- }
- func DeleteOneVersion() func(Call) error {
- return func(o Call) error {
- m, ok := o.(*Mutate)
- if !ok {
- return errors.New("'DeleteOneVersion' option can only be used with mutation queries")
- }
- m.deleteOneVersion = true
- return nil
- }
- }
- func baseMutate(ctx context.Context, table, key []byte, values map[string]map[string][]byte,
- options ...func(Call) error) (*Mutate, error) {
- m := &Mutate{
- base: base{
- table: table,
- key: key,
- ctx: ctx,
- resultch: make(chan RPCResult, 1),
- },
- values: values,
- timestamp: MaxTimestamp,
- }
- err := applyOptions(m, options...)
- if err != nil {
- return nil, err
- }
- return m, nil
- }
- func NewPut(ctx context.Context, table, key []byte,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- m, err := baseMutate(ctx, table, key, values, options...)
- if err != nil {
- return nil, err
- }
- m.mutationType = pb.MutationProto_PUT
- return m, nil
- }
- func NewPutStr(ctx context.Context, table, key string,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- return NewPut(ctx, []byte(table), []byte(key), values, options...)
- }
- func NewDel(ctx context.Context, table, key []byte,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- m, err := baseMutate(ctx, table, key, values, options...)
- if err != nil {
- return nil, err
- }
- if len(m.values) == 0 && m.deleteOneVersion {
- return nil, errors.New(
- "'DeleteOneVersion' option cannot be specified for delete entire row request")
- }
- m.mutationType = pb.MutationProto_DELETE
- return m, nil
- }
- func NewDelStr(ctx context.Context, table, key string,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- return NewDel(ctx, []byte(table), []byte(key), values, options...)
- }
- func NewApp(ctx context.Context, table, key []byte,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- m, err := baseMutate(ctx, table, key, values, options...)
- if err != nil {
- return nil, err
- }
- m.mutationType = pb.MutationProto_APPEND
- return m, nil
- }
- func NewAppStr(ctx context.Context, table, key string,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- return NewApp(ctx, []byte(table), []byte(key), values, options...)
- }
- func NewIncSingle(ctx context.Context, table, key []byte, family, qualifier string,
- amount int64, options ...func(Call) error) (*Mutate, error) {
- buf := make([]byte, 8)
- binary.BigEndian.PutUint64(buf, uint64(amount))
- value := map[string]map[string][]byte{family: map[string][]byte{qualifier: buf}}
- return NewInc(ctx, table, key, value, options...)
- }
- func NewIncStrSingle(ctx context.Context, table, key, family, qualifier string,
- amount int64, options ...func(Call) error) (*Mutate, error) {
- return NewIncSingle(ctx, []byte(table), []byte(key), family, qualifier, amount, options...)
- }
- func NewInc(ctx context.Context, table, key []byte,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- m, err := baseMutate(ctx, table, key, values, options...)
- if err != nil {
- return nil, err
- }
- m.mutationType = pb.MutationProto_INCREMENT
- return m, nil
- }
- func NewIncStr(ctx context.Context, table, key string,
- values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
- return NewInc(ctx, []byte(table), []byte(key), values, options...)
- }
- func (m *Mutate) Name() string {
- return "Mutate"
- }
- func (m *Mutate) SkipBatch() bool {
- return m.skipbatch
- }
- func (m *Mutate) setSkipBatch(v bool) {
- m.skipbatch = v
- }
- func (m *Mutate) toProto() *pb.MutateRequest {
- var ts *uint64
- if m.timestamp != MaxTimestamp {
- ts = &m.timestamp
- }
-
-
- cvs := make([]*pb.MutationProto_ColumnValue, len(m.values))
- i := 0
- for k, v := range m.values {
-
-
-
- var dt *pb.MutationProto_DeleteType
- if m.mutationType == pb.MutationProto_DELETE {
- if len(v) == 0 {
-
- if m.deleteOneVersion {
- dt = pb.MutationProto_DELETE_FAMILY_VERSION.Enum()
- } else {
- dt = pb.MutationProto_DELETE_FAMILY.Enum()
- }
-
- if v == nil {
- v = make(map[string][]byte)
- }
- v[""] = nil
- } else {
-
- if m.deleteOneVersion {
- dt = pb.MutationProto_DELETE_ONE_VERSION.Enum()
- } else {
- dt = pb.MutationProto_DELETE_MULTIPLE_VERSIONS.Enum()
- }
- }
- }
- qvs := make([]*pb.MutationProto_ColumnValue_QualifierValue, len(v))
- j := 0
- for k1, v1 := range v {
- qvs[j] = &pb.MutationProto_ColumnValue_QualifierValue{
- Qualifier: []byte(k1),
- Value: v1,
- Timestamp: ts,
- DeleteType: dt,
- }
- j++
- }
- cvs[i] = &pb.MutationProto_ColumnValue{
- Family: []byte(k),
- QualifierValue: qvs,
- }
- i++
- }
- mProto := &pb.MutationProto{
- Row: m.key,
- MutateType: &m.mutationType,
- ColumnValue: cvs,
- Durability: pb.MutationProto_Durability(m.durability).Enum(),
- Timestamp: ts,
- }
- if len(m.ttl) > 0 {
- mProto.Attribute = append(mProto.Attribute, &pb.NameBytesPair{
- Name: &attributeNameTTL,
- Value: m.ttl,
- })
- }
- return &pb.MutateRequest{
- Region: m.regionSpecifier(),
- Mutation: mProto,
- }
- }
- func (m *Mutate) ToProto() proto.Message {
- return m.toProto()
- }
- func (m *Mutate) NewResponse() proto.Message {
- return &pb.MutateResponse{}
- }
- func (m *Mutate) DeserializeCellBlocks(pm proto.Message, b []byte) (uint32, error) {
- resp := pm.(*pb.MutateResponse)
- if resp.Result == nil {
-
- return 0, nil
- }
- cells, read, err := deserializeCellBlocks(b, uint32(resp.Result.GetAssociatedCellCount()))
- if err != nil {
- return 0, err
- }
- resp.Result.Cell = append(resp.Result.Cell, cells...)
- return read, nil
- }
|