|
- // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
- // This file is part of GoHBase.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- package gohbase
- import (
- "context"
- "errors"
- "fmt"
- "time"
- log "github.com/sirupsen/logrus"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/pb"
- "github.com/tsuna/gohbase/region"
- "github.com/tsuna/gohbase/zk"
- )
- const (
- // snapshotValidateInterval specifies the amount of time to wait before
- // polling the hbase server about the status of a snapshot operation.
- snaphotValidateInterval time.Duration = time.Second / 2
- )
- // AdminClient to perform admistrative operations with HMaster
- type AdminClient interface {
- CreateTable(t *hrpc.CreateTable) error
- DeleteTable(t *hrpc.DeleteTable) error
- EnableTable(t *hrpc.EnableTable) error
- DisableTable(t *hrpc.DisableTable) error
- CreateSnapshot(t *hrpc.Snapshot) error
- DeleteSnapshot(t *hrpc.Snapshot) error
- ListSnapshots(t *hrpc.ListSnapshots) ([]*pb.SnapshotDescription, error)
- RestoreSnapshot(t *hrpc.Snapshot) error
- ClusterStatus() (*pb.ClusterStatus, error)
- ListTableNames(t *hrpc.ListTableNames) ([]*pb.TableName, error)
- }
- // NewAdminClient creates an admin HBase client.
- func NewAdminClient(zkquorum string, options ...Option) AdminClient {
- return newAdminClient(zkquorum, options...)
- }
- func newAdminClient(zkquorum string, options ...Option) AdminClient {
- log.WithFields(log.Fields{
- "Host": zkquorum,
- }).Debug("Creating new admin client.")
- c := &client{
- clientType: region.MasterClient,
- rpcQueueSize: defaultRPCQueueSize,
- flushInterval: defaultFlushInterval,
- // empty region in order to be able to set client to it
- adminRegionInfo: region.NewInfo(0, nil, nil, nil, nil, nil),
- zkTimeout: defaultZkTimeout,
- zkRoot: defaultZkRoot,
- effectiveUser: defaultEffectiveUser,
- regionLookupTimeout: region.DefaultLookupTimeout,
- regionReadTimeout: region.DefaultReadTimeout,
- newRegionClientFn: region.NewClient,
- }
- for _, option := range options {
- option(c)
- }
- c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)
- return c
- }
- //Get the status of the cluster
- func (c *client) ClusterStatus() (*pb.ClusterStatus, error) {
- pbmsg, err := c.SendRPC(hrpc.NewClusterStatus())
- if err != nil {
- return nil, err
- }
- r, ok := pbmsg.(*pb.GetClusterStatusResponse)
- if !ok {
- return nil, fmt.Errorf("sendRPC returned not a ClusterStatusResponse")
- }
- return r.GetClusterStatus(), nil
- }
- func (c *client) CreateTable(t *hrpc.CreateTable) error {
- pbmsg, err := c.SendRPC(t)
- if err != nil {
- return err
- }
- r, ok := pbmsg.(*pb.CreateTableResponse)
- if !ok {
- return fmt.Errorf("sendRPC returned not a CreateTableResponse")
- }
- return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
- }
- func (c *client) DeleteTable(t *hrpc.DeleteTable) error {
- pbmsg, err := c.SendRPC(t)
- if err != nil {
- return err
- }
- r, ok := pbmsg.(*pb.DeleteTableResponse)
- if !ok {
- return fmt.Errorf("sendRPC returned not a DeleteTableResponse")
- }
- return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
- }
- func (c *client) EnableTable(t *hrpc.EnableTable) error {
- pbmsg, err := c.SendRPC(t)
- if err != nil {
- return err
- }
- r, ok := pbmsg.(*pb.EnableTableResponse)
- if !ok {
- return fmt.Errorf("sendRPC returned not a EnableTableResponse")
- }
- return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
- }
- func (c *client) DisableTable(t *hrpc.DisableTable) error {
- pbmsg, err := c.SendRPC(t)
- if err != nil {
- return err
- }
- r, ok := pbmsg.(*pb.DisableTableResponse)
- if !ok {
- return fmt.Errorf("sendRPC returned not a DisableTableResponse")
- }
- return c.checkProcedureWithBackoff(t.Context(), r.GetProcId())
- }
- func (c *client) checkProcedureWithBackoff(ctx context.Context, procID uint64) error {
- backoff := backoffStart
- for {
- pbmsg, err := c.SendRPC(hrpc.NewGetProcedureState(ctx, procID))
- if err != nil {
- return err
- }
- res := pbmsg.(*pb.GetProcedureResultResponse)
- switch res.GetState() {
- case pb.GetProcedureResultResponse_NOT_FOUND:
- return fmt.Errorf("procedure not found")
- case pb.GetProcedureResultResponse_FINISHED:
- if fe := res.Exception; fe != nil {
- ge := fe.GenericException
- if ge == nil {
- return errors.New("got unexpected empty exception")
- }
- return fmt.Errorf("procedure exception: %s: %s", ge.GetClassName(), ge.GetMessage())
- }
- return nil
- default:
- backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
- if err != nil {
- return err
- }
- }
- }
- }
- // CreateSnapshot creates a snapshot in HBase.
- //
- // If a context happens during creation, no cleanup is done.
- func (c *client) CreateSnapshot(t *hrpc.Snapshot) error {
- pbmsg, err := c.SendRPC(t)
- if err != nil {
- return err
- }
- _, ok := pbmsg.(*pb.SnapshotResponse)
- if !ok {
- return errors.New("sendPRC returned not a SnapshotResponse")
- }
- ticker := time.NewTicker(snaphotValidateInterval)
- defer ticker.Stop()
- check := hrpc.NewSnapshotDone(t)
- ctx := t.Context()
- for {
- select {
- case <-ticker.C:
- pbmsgs, err := c.SendRPC(check)
- if err != nil {
- return err
- }
- r, ok := pbmsgs.(*pb.IsSnapshotDoneResponse)
- if !ok {
- return errors.New("sendPRC returned not a IsSnapshotDoneResponse")
- }
- if r.GetDone() {
- return nil
- }
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- }
- // DeleteSnapshot deletes a snapshot in HBase.
- func (c *client) DeleteSnapshot(t *hrpc.Snapshot) error {
- rt := hrpc.NewDeleteSnapshot(t)
- pbmsg, err := c.SendRPC(rt)
- if err != nil {
- return err
- }
- _, ok := pbmsg.(*pb.DeleteSnapshotResponse)
- if !ok {
- return errors.New("sendPRC returned not a DeleteSnapshotResponse")
- }
- return nil
- }
- func (c *client) ListSnapshots(t *hrpc.ListSnapshots) ([]*pb.SnapshotDescription, error) {
- pbmsg, err := c.SendRPC(t)
- if err != nil {
- return nil, err
- }
- r, ok := pbmsg.(*pb.GetCompletedSnapshotsResponse)
- if !ok {
- return nil, errors.New("sendPRC returned not a GetCompletedSnapshotsResponse")
- }
- return r.GetSnapshots(), nil
- }
- func (c *client) RestoreSnapshot(t *hrpc.Snapshot) error {
- rt := hrpc.NewRestoreSnapshot(t)
- pbmsg, err := c.SendRPC(rt)
- if err != nil {
- return err
- }
- _, ok := pbmsg.(*pb.RestoreSnapshotResponse)
- if !ok {
- return errors.New("sendPRC returned not a RestoreSnapshotResponse")
- }
- return nil
- }
- func (c *client) ListTableNames(t *hrpc.ListTableNames) ([]*pb.TableName, error) {
- pbmsg, err := c.SendRPC(t)
- if err != nil {
- return nil, err
- }
- res, ok := pbmsg.(*pb.GetTableNamesResponse)
- if !ok {
- return nil, errors.New("sendPRC returned not a GetTableNamesResponse")
- }
- return res.GetTableNames(), nil
- }
|