client.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package gohbase
  6. import (
  7. "encoding/binary"
  8. "fmt"
  9. "sync"
  10. "time"
  11. "github.com/golang/protobuf/proto"
  12. log "github.com/sirupsen/logrus"
  13. "github.com/tsuna/gohbase/hrpc"
  14. "github.com/tsuna/gohbase/pb"
  15. "github.com/tsuna/gohbase/region"
  16. "github.com/tsuna/gohbase/zk"
  17. "modernc.org/b"
  18. )
  19. const (
  20. defaultRPCQueueSize = 100
  21. defaultFlushInterval = 20 * time.Millisecond
  22. defaultZkRoot = "/hbase"
  23. defaultZkTimeout = 30 * time.Second
  24. defaultEffectiveUser = "root"
  25. )
  26. // Client a regular HBase client
  27. type Client interface {
  28. Scan(s *hrpc.Scan) hrpc.Scanner
  29. Get(g *hrpc.Get) (*hrpc.Result, error)
  30. Put(p *hrpc.Mutate) (*hrpc.Result, error)
  31. Delete(d *hrpc.Mutate) (*hrpc.Result, error)
  32. Append(a *hrpc.Mutate) (*hrpc.Result, error)
  33. Increment(i *hrpc.Mutate) (int64, error)
  34. CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
  35. expectedValue []byte) (bool, error)
  36. Close()
  37. }
  38. // RPCClient is core client of gohbase. It's exposed for testing.
  39. type RPCClient interface {
  40. SendRPC(rpc hrpc.Call) (proto.Message, error)
  41. }
  42. // Option is a function used to configure optional config items for a Client.
  43. type Option func(*client)
  44. // A Client provides access to an HBase cluster.
  45. type client struct {
  46. clientType region.ClientType
  47. regions keyRegionCache
  48. // Maps a hrpc.RegionInfo to the *region.Client that we think currently
  49. // serves it.
  50. clients clientRegionCache
  51. metaRegionInfo hrpc.RegionInfo
  52. adminRegionInfo hrpc.RegionInfo
  53. // The maximum size of the RPC queue in the region client
  54. rpcQueueSize int
  55. // zkClient is zookeeper for retrieving meta and admin information
  56. zkClient zk.Client
  57. // The root zookeeper path for Hbase. By default, this is usually "/hbase".
  58. zkRoot string
  59. // The zookeeper session timeout
  60. zkTimeout time.Duration
  61. // The timeout before flushing the RPC queue in the region client
  62. flushInterval time.Duration
  63. // The user used when accessing regions.
  64. effectiveUser string
  65. // How long to wait for a region lookup (either meta lookup or finding
  66. // meta in ZooKeeper). Should be greater than or equal to the ZooKeeper
  67. // session timeout.
  68. regionLookupTimeout time.Duration
  69. // regionReadTimeout is the maximum amount of time to wait for regionserver reply
  70. regionReadTimeout time.Duration
  71. done chan struct{}
  72. closeOnce sync.Once
  73. newRegionClientFn func(string, region.ClientType, int, time.Duration,
  74. string, time.Duration) hrpc.RegionClient
  75. }
  76. // NewClient creates a new HBase client.
  77. func NewClient(zkquorum string, options ...Option) Client {
  78. return newClient(zkquorum, options...)
  79. }
  80. func newClient(zkquorum string, options ...Option) *client {
  81. log.WithFields(log.Fields{
  82. "Host": zkquorum,
  83. }).Debug("Creating new client.")
  84. c := &client{
  85. clientType: region.RegionClient,
  86. regions: keyRegionCache{regions: b.TreeNew(region.CompareGeneric)},
  87. clients: clientRegionCache{
  88. regions: make(map[hrpc.RegionClient]map[hrpc.RegionInfo]struct{}),
  89. },
  90. rpcQueueSize: defaultRPCQueueSize,
  91. flushInterval: defaultFlushInterval,
  92. metaRegionInfo: region.NewInfo(
  93. 0,
  94. []byte("hbase"),
  95. []byte("meta"),
  96. []byte("hbase:meta,,1"),
  97. nil,
  98. nil),
  99. zkRoot: defaultZkRoot,
  100. zkTimeout: defaultZkTimeout,
  101. effectiveUser: defaultEffectiveUser,
  102. regionLookupTimeout: region.DefaultLookupTimeout,
  103. regionReadTimeout: region.DefaultReadTimeout,
  104. done: make(chan struct{}),
  105. newRegionClientFn: region.NewClient,
  106. }
  107. for _, option := range options {
  108. option(c)
  109. }
  110. //Have to create the zkClient after the Options have been set
  111. //since the zkTimeout could be changed as an option
  112. c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)
  113. return c
  114. }
  115. // RpcQueueSize will return an option that will set the size of the RPC queues
  116. // used in a given client
  117. func RpcQueueSize(size int) Option {
  118. return func(c *client) {
  119. c.rpcQueueSize = size
  120. }
  121. }
  122. // ZookeeperRoot will return an option that will set the zookeeper root path used in a given client.
  123. func ZookeeperRoot(root string) Option {
  124. return func(c *client) {
  125. c.zkRoot = root
  126. }
  127. }
  128. // ZookeeperTimeout will return an option that will set the zookeeper session timeout.
  129. func ZookeeperTimeout(to time.Duration) Option {
  130. return func(c *client) {
  131. c.zkTimeout = to
  132. }
  133. }
  134. // RegionLookupTimeout will return an option that sets the region lookup timeout
  135. func RegionLookupTimeout(to time.Duration) Option {
  136. return func(c *client) {
  137. c.regionLookupTimeout = to
  138. }
  139. }
  140. // RegionReadTimeout will return an option that sets the region read timeout
  141. func RegionReadTimeout(to time.Duration) Option {
  142. return func(c *client) {
  143. c.regionReadTimeout = to
  144. }
  145. }
  146. // EffectiveUser will return an option that will set the user used when accessing regions.
  147. func EffectiveUser(user string) Option {
  148. return func(c *client) {
  149. c.effectiveUser = user
  150. }
  151. }
  152. // FlushInterval will return an option that will set the timeout for flushing
  153. // the RPC queues used in a given client
  154. func FlushInterval(interval time.Duration) Option {
  155. return func(c *client) {
  156. c.flushInterval = interval
  157. }
  158. }
  159. // Close closes connections to hbase master and regionservers
  160. func (c *client) Close() {
  161. c.closeOnce.Do(func() {
  162. close(c.done)
  163. if c.clientType == region.MasterClient {
  164. if ac := c.adminRegionInfo.Client(); ac != nil {
  165. ac.Close()
  166. }
  167. }
  168. c.clients.closeAll()
  169. })
  170. }
  171. func (c *client) Scan(s *hrpc.Scan) hrpc.Scanner {
  172. return newScanner(c, s)
  173. }
  174. func (c *client) Get(g *hrpc.Get) (*hrpc.Result, error) {
  175. pbmsg, err := c.SendRPC(g)
  176. if err != nil {
  177. return nil, err
  178. }
  179. r, ok := pbmsg.(*pb.GetResponse)
  180. if !ok {
  181. return nil, fmt.Errorf("sendRPC returned not a GetResponse")
  182. }
  183. return hrpc.ToLocalResult(r.Result), nil
  184. }
  185. func (c *client) Put(p *hrpc.Mutate) (*hrpc.Result, error) {
  186. return c.mutate(p)
  187. }
  188. func (c *client) Delete(d *hrpc.Mutate) (*hrpc.Result, error) {
  189. return c.mutate(d)
  190. }
  191. func (c *client) Append(a *hrpc.Mutate) (*hrpc.Result, error) {
  192. return c.mutate(a)
  193. }
  194. func (c *client) Increment(i *hrpc.Mutate) (int64, error) {
  195. r, err := c.mutate(i)
  196. if err != nil {
  197. return 0, err
  198. }
  199. if len(r.Cells) != 1 {
  200. return 0, fmt.Errorf("increment returned %d cells, but we expected exactly one",
  201. len(r.Cells))
  202. }
  203. val := binary.BigEndian.Uint64(r.Cells[0].Value)
  204. return int64(val), nil
  205. }
  206. func (c *client) mutate(m *hrpc.Mutate) (*hrpc.Result, error) {
  207. pbmsg, err := c.SendRPC(m)
  208. if err != nil {
  209. return nil, err
  210. }
  211. r, ok := pbmsg.(*pb.MutateResponse)
  212. if !ok {
  213. return nil, fmt.Errorf("sendRPC returned not a MutateResponse")
  214. }
  215. return hrpc.ToLocalResult(r.Result), nil
  216. }
  217. func (c *client) CheckAndPut(p *hrpc.Mutate, family string,
  218. qualifier string, expectedValue []byte) (bool, error) {
  219. cas, err := hrpc.NewCheckAndPut(p, family, qualifier, expectedValue)
  220. if err != nil {
  221. return false, err
  222. }
  223. pbmsg, err := c.SendRPC(cas)
  224. if err != nil {
  225. return false, err
  226. }
  227. r, ok := pbmsg.(*pb.MutateResponse)
  228. if !ok {
  229. return false, fmt.Errorf("sendRPC returned a %T instead of MutateResponse", pbmsg)
  230. }
  231. if r.Processed == nil {
  232. return false, fmt.Errorf("protobuf in the response didn't contain the field "+
  233. "indicating whether the CheckAndPut was successful or not: %s", r)
  234. }
  235. return r.GetProcessed(), nil
  236. }