admin_client.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package gohbase
  6. import (
  7. "context"
  8. "errors"
  9. "fmt"
  10. "time"
  11. log "github.com/sirupsen/logrus"
  12. "github.com/tsuna/gohbase/hrpc"
  13. "github.com/tsuna/gohbase/pb"
  14. "github.com/tsuna/gohbase/region"
  15. "github.com/tsuna/gohbase/zk"
  16. )
  17. const (
  18. // snapshotValidateInterval specifies the amount of time to wait before
  19. // polling the hbase server about the status of a snapshot operation.
  20. snaphotValidateInterval time.Duration = time.Second / 2
  21. )
  22. // AdminClient to perform admistrative operations with HMaster
  23. type AdminClient interface {
  24. CreateTable(t *hrpc.CreateTable) error
  25. DeleteTable(t *hrpc.DeleteTable) error
  26. EnableTable(t *hrpc.EnableTable) error
  27. DisableTable(t *hrpc.DisableTable) error
  28. CreateSnapshot(t *hrpc.Snapshot) error
  29. DeleteSnapshot(t *hrpc.Snapshot) error
  30. ListSnapshots(t *hrpc.ListSnapshots) ([]*pb.SnapshotDescription, error)
  31. RestoreSnapshot(t *hrpc.Snapshot) error
  32. ClusterStatus() (*pb.ClusterStatus, error)
  33. ListTableNames(t *hrpc.ListTableNames) ([]*pb.TableName, error)
  34. }
  35. // NewAdminClient creates an admin HBase client.
  36. func NewAdminClient(zkquorum string, options ...Option) AdminClient {
  37. return newAdminClient(zkquorum, options...)
  38. }
  39. func newAdminClient(zkquorum string, options ...Option) AdminClient {
  40. log.WithFields(log.Fields{
  41. "Host": zkquorum,
  42. }).Debug("Creating new admin client.")
  43. c := &client{
  44. clientType: region.MasterClient,
  45. rpcQueueSize: defaultRPCQueueSize,
  46. flushInterval: defaultFlushInterval,
  47. // empty region in order to be able to set client to it
  48. adminRegionInfo: region.NewInfo(0, nil, nil, nil, nil, nil),
  49. zkTimeout: defaultZkTimeout,
  50. zkRoot: defaultZkRoot,
  51. effectiveUser: defaultEffectiveUser,
  52. regionLookupTimeout: region.DefaultLookupTimeout,
  53. regionReadTimeout: region.DefaultReadTimeout,
  54. newRegionClientFn: region.NewClient,
  55. }
  56. for _, option := range options {
  57. option(c)
  58. }
  59. c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)
  60. return c
  61. }
  62. //Get the status of the cluster
  63. func (c *client) ClusterStatus() (*pb.ClusterStatus, error) {
  64. pbmsg, err := c.SendRPC(hrpc.NewClusterStatus())
  65. if err != nil {
  66. return nil, err
  67. }
  68. r, ok := pbmsg.(*pb.GetClusterStatusResponse)
  69. if !ok {
  70. return nil, fmt.Errorf("sendRPC returned not a ClusterStatusResponse")
  71. }
  72. return r.GetClusterStatus(), nil
  73. }
  74. func (c *client) CreateTable(t *hrpc.CreateTable) error {
  75. pbmsg, err := c.SendRPC(t)
  76. if err != nil {
  77. return err
  78. }
  79. r, ok := pbmsg.(*pb.CreateTableResponse)
  80. if !ok {
  81. return fmt.Errorf("sendRPC returned not a CreateTableResponse")
  82. }
  83. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  84. }
  85. func (c *client) DeleteTable(t *hrpc.DeleteTable) error {
  86. pbmsg, err := c.SendRPC(t)
  87. if err != nil {
  88. return err
  89. }
  90. r, ok := pbmsg.(*pb.DeleteTableResponse)
  91. if !ok {
  92. return fmt.Errorf("sendRPC returned not a DeleteTableResponse")
  93. }
  94. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  95. }
  96. func (c *client) EnableTable(t *hrpc.EnableTable) error {
  97. pbmsg, err := c.SendRPC(t)
  98. if err != nil {
  99. return err
  100. }
  101. r, ok := pbmsg.(*pb.EnableTableResponse)
  102. if !ok {
  103. return fmt.Errorf("sendRPC returned not a EnableTableResponse")
  104. }
  105. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  106. }
  107. func (c *client) DisableTable(t *hrpc.DisableTable) error {
  108. pbmsg, err := c.SendRPC(t)
  109. if err != nil {
  110. return err
  111. }
  112. r, ok := pbmsg.(*pb.DisableTableResponse)
  113. if !ok {
  114. return fmt.Errorf("sendRPC returned not a DisableTableResponse")
  115. }
  116. return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
  117. }
  118. func (c *client) checkProcedureWithBackoff(ctx context.Context, procID uint64) error {
  119. backoff := backoffStart
  120. for {
  121. pbmsg, err := c.SendRPC(hrpc.NewGetProcedureState(ctx, procID))
  122. if err != nil {
  123. return err
  124. }
  125. res := pbmsg.(*pb.GetProcedureResultResponse)
  126. switch res.GetState() {
  127. case pb.GetProcedureResultResponse_NOT_FOUND:
  128. return fmt.Errorf("procedure not found")
  129. case pb.GetProcedureResultResponse_FINISHED:
  130. if fe := res.Exception; fe != nil {
  131. ge := fe.GenericException
  132. if ge == nil {
  133. return errors.New("got unexpected empty exception")
  134. }
  135. return fmt.Errorf("procedure exception: %s: %s", ge.GetClassName(), ge.GetMessage())
  136. }
  137. return nil
  138. default:
  139. backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
  140. if err != nil {
  141. return err
  142. }
  143. }
  144. }
  145. }
  146. // CreateSnapshot creates a snapshot in HBase.
  147. //
  148. // If a context happens during creation, no cleanup is done.
  149. func (c *client) CreateSnapshot(t *hrpc.Snapshot) error {
  150. pbmsg, err := c.SendRPC(t)
  151. if err != nil {
  152. return err
  153. }
  154. _, ok := pbmsg.(*pb.SnapshotResponse)
  155. if !ok {
  156. return errors.New("sendPRC returned not a SnapshotResponse")
  157. }
  158. ticker := time.NewTicker(snaphotValidateInterval)
  159. defer ticker.Stop()
  160. check := hrpc.NewSnapshotDone(t)
  161. ctx := t.Context()
  162. for {
  163. select {
  164. case <-ticker.C:
  165. pbmsgs, err := c.SendRPC(check)
  166. if err != nil {
  167. return err
  168. }
  169. r, ok := pbmsgs.(*pb.IsSnapshotDoneResponse)
  170. if !ok {
  171. return errors.New("sendPRC returned not a IsSnapshotDoneResponse")
  172. }
  173. if r.GetDone() {
  174. return nil
  175. }
  176. case <-ctx.Done():
  177. return ctx.Err()
  178. }
  179. }
  180. }
  181. // DeleteSnapshot deletes a snapshot in HBase.
  182. func (c *client) DeleteSnapshot(t *hrpc.Snapshot) error {
  183. rt := hrpc.NewDeleteSnapshot(t)
  184. pbmsg, err := c.SendRPC(rt)
  185. if err != nil {
  186. return err
  187. }
  188. _, ok := pbmsg.(*pb.DeleteSnapshotResponse)
  189. if !ok {
  190. return errors.New("sendPRC returned not a DeleteSnapshotResponse")
  191. }
  192. return nil
  193. }
  194. func (c *client) ListSnapshots(t *hrpc.ListSnapshots) ([]*pb.SnapshotDescription, error) {
  195. pbmsg, err := c.SendRPC(t)
  196. if err != nil {
  197. return nil, err
  198. }
  199. r, ok := pbmsg.(*pb.GetCompletedSnapshotsResponse)
  200. if !ok {
  201. return nil, errors.New("sendPRC returned not a GetCompletedSnapshotsResponse")
  202. }
  203. return r.GetSnapshots(), nil
  204. }
  205. func (c *client) RestoreSnapshot(t *hrpc.Snapshot) error {
  206. rt := hrpc.NewRestoreSnapshot(t)
  207. pbmsg, err := c.SendRPC(rt)
  208. if err != nil {
  209. return err
  210. }
  211. _, ok := pbmsg.(*pb.RestoreSnapshotResponse)
  212. if !ok {
  213. return errors.New("sendPRC returned not a RestoreSnapshotResponse")
  214. }
  215. return nil
  216. }
  217. func (c *client) ListTableNames(t *hrpc.ListTableNames) ([]*pb.TableName, error) {
  218. pbmsg, err := c.SendRPC(t)
  219. if err != nil {
  220. return nil, err
  221. }
  222. res, ok := pbmsg.(*pb.GetTableNamesResponse)
  223. if !ok {
  224. return nil, errors.New("sendPRC returned not a GetTableNamesResponse")
  225. }
  226. return res.GetTableNames(), nil
  227. }