123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606 |
- package gohbase
- import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "io"
- "strconv"
- "time"
- "github.com/golang/protobuf/proto"
- log "github.com/sirupsen/logrus"
- "github.com/tsuna/gohbase/hrpc"
- "github.com/tsuna/gohbase/region"
- "github.com/tsuna/gohbase/zk"
- )
- var (
-
- metaTableName = []byte("hbase:meta")
- infoFamily = map[string][]string{
- "info": nil,
- }
-
-
- TableNotFound = errors.New("table not found")
-
-
-
- ErrCannotFindRegion = errors.New("cannot find region for the rpc")
-
- ErrClientClosed = errors.New("client is closed")
- )
- const (
-
- maxFindRegionTries = 10
- backoffStart = 16 * time.Millisecond
- )
- func (c *client) getRegionForRpc(rpc hrpc.Call) (hrpc.RegionInfo, error) {
- for i := 0; i < maxFindRegionTries; i++ {
-
- if reg := c.getRegionFromCache(rpc.Table(), rpc.Key()); reg != nil {
- return reg, nil
- }
- if reg, err := c.findRegion(rpc.Context(), rpc.Table(), rpc.Key()); reg != nil {
- return reg, nil
- } else if err != nil {
- return nil, err
- }
- }
- return nil, ErrCannotFindRegion
- }
- func (c *client) SendRPC(rpc hrpc.Call) (proto.Message, error) {
- reg, err := c.getRegionForRpc(rpc)
- if err != nil {
- return nil, err
- }
- backoff := backoffStart
- for {
- msg, err := c.sendRPCToRegion(rpc, reg)
- switch err.(type) {
- case region.RetryableError:
- backoff, err = sleepAndIncreaseBackoff(rpc.Context(), backoff)
- if err != nil {
- return msg, err
- }
- case region.ServerError, region.NotServingRegionError:
- if ch := reg.AvailabilityChan(); ch != nil {
-
-
- select {
- case <-rpc.Context().Done():
- return nil, rpc.Context().Err()
- case <-c.done:
- return nil, ErrClientClosed
- case <-ch:
- }
- }
- if reg.Context().Err() != nil {
-
-
- reg, err = c.getRegionForRpc(rpc)
- if err != nil {
- return nil, err
- }
- }
- default:
- return msg, err
- }
- }
- }
- func sendBlocking(rc hrpc.RegionClient, rpc hrpc.Call) (hrpc.RPCResult, error) {
- rc.QueueRPC(rpc)
- var res hrpc.RPCResult
-
- select {
- case res = <-rpc.ResultChan():
- return res, nil
- case <-rpc.Context().Done():
- return res, rpc.Context().Err()
- }
- }
- func (c *client) sendRPCToRegion(rpc hrpc.Call, reg hrpc.RegionInfo) (proto.Message, error) {
- if reg.IsUnavailable() {
- return nil, region.NotServingRegionError{}
- }
- rpc.SetRegion(reg)
-
- client := reg.Client()
- if client == nil {
-
-
- if reg.MarkUnavailable() {
-
-
- go c.reestablishRegion(reg)
- }
- return nil, region.NotServingRegionError{}
- }
- res, err := sendBlocking(client, rpc)
- if err != nil {
- return nil, err
- }
-
- switch res.Error.(type) {
- case region.NotServingRegionError:
-
-
-
-
-
- if reg.MarkUnavailable() {
- go c.reestablishRegion(reg)
- }
- case region.ServerError:
-
-
- if reg == c.adminRegionInfo {
-
-
-
- if reg.MarkUnavailable() {
- go c.reestablishRegion(reg)
- }
- } else {
- c.clientDown(client)
- }
- }
- return res.Msg, res.Error
- }
- func (c *client) clientDown(client hrpc.RegionClient) {
- downregions := c.clients.clientDown(client)
- for downreg := range downregions {
- if downreg.MarkUnavailable() {
- downreg.SetClient(nil)
- go c.reestablishRegion(downreg)
- }
- }
- }
- func (c *client) lookupRegion(ctx context.Context,
- table, key []byte) (hrpc.RegionInfo, string, error) {
- var reg hrpc.RegionInfo
- var addr string
- var err error
- backoff := backoffStart
- for {
-
- lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout)
- if c.clientType == region.MasterClient {
- log.WithField("resource", zk.Master).Debug("looking up master")
- addr, err = c.zkLookup(lookupCtx, zk.Master)
- cancel()
- reg = c.adminRegionInfo
- } else if bytes.Compare(table, metaTableName) == 0 {
- log.WithField("resource", zk.Meta).Debug("looking up region server of hbase:meta")
- addr, err = c.zkLookup(lookupCtx, zk.Meta)
- cancel()
- reg = c.metaRegionInfo
- } else {
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- }).Debug("looking up region")
- reg, addr, err = c.metaLookup(lookupCtx, table, key)
- cancel()
- if err == TableNotFound {
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- "err": err,
- }).Debug("hbase:meta does not know about this table/key")
- return nil, "", err
- } else if err == ErrClientClosed {
- return nil, "", err
- }
- }
- if err == nil {
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- "region": reg,
- "addr": addr,
- }).Debug("looked up a region")
- return reg, addr, nil
- }
- log.WithFields(log.Fields{
- "table": strconv.Quote(string(table)),
- "key": strconv.Quote(string(key)),
- "backoff": backoff,
- "err": err,
- }).Error("failed looking up region")
-
- backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
- if err != nil {
- return nil, "", err
- }
- }
- }
- func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
-
-
- reg, addr, err := c.lookupRegion(ctx, table, key)
- if err != nil {
- return nil, err
- }
-
-
- reg.MarkUnavailable()
- if reg != c.metaRegionInfo && reg != c.adminRegionInfo {
-
-
- overlaps, replaced := c.regions.put(reg)
- if !replaced {
-
- return nil, nil
- }
-
- for _, r := range overlaps {
- c.clients.del(r)
- }
- }
-
- go c.establishRegion(reg, addr)
-
-
- return reg, nil
- }
- func (c *client) getRegionFromCache(table, key []byte) hrpc.RegionInfo {
- if c.clientType == region.MasterClient {
- return c.adminRegionInfo
- } else if bytes.Equal(table, metaTableName) {
- return c.metaRegionInfo
- }
- regionName := createRegionSearchKey(table, key)
- _, region := c.regions.get(regionName)
- if region == nil {
- return nil
- }
-
- if !bytes.Equal(fullyQualifiedTable(region), table) {
-
- return nil
- }
- if len(region.StopKey()) != 0 &&
-
-
- bytes.Compare(key, region.StopKey()) >= 0 {
- return nil
- }
- return region
- }
- func createRegionSearchKey(table, key []byte) []byte {
- metaKey := make([]byte, 0, len(table)+len(key)+3)
- metaKey = append(metaKey, table...)
- metaKey = append(metaKey, ',')
- metaKey = append(metaKey, key...)
- metaKey = append(metaKey, ',')
-
-
-
- metaKey = append(metaKey, ':')
- return metaKey
- }
- func (c *client) metaLookup(ctx context.Context,
- table, key []byte) (hrpc.RegionInfo, string, error) {
- metaKey := createRegionSearchKey(table, key)
- rpc, err := hrpc.NewScanRange(ctx, metaTableName, metaKey, table,
- hrpc.Families(infoFamily),
- hrpc.Reversed(),
- hrpc.CloseScanner(),
- hrpc.NumberOfRows(1))
- if err != nil {
- return nil, "", err
- }
- scanner := c.Scan(rpc)
- resp, err := scanner.Next()
- if err == io.EOF {
- return nil, "", TableNotFound
- }
- if err != nil {
- return nil, "", err
- }
- reg, addr, err := region.ParseRegionInfo(resp)
- if err != nil {
- return nil, "", err
- }
- if !bytes.Equal(table, fullyQualifiedTable(reg)) {
-
- return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong table!"+
- " Looked up table=%q key=%q got region=%s", table, key, reg)
- } else if len(reg.StopKey()) != 0 &&
- bytes.Compare(key, reg.StopKey()) >= 0 {
-
- return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong region!"+
- " Looked up table=%q key=%q got region=%s", table, key, reg)
- }
- return reg, addr, nil
- }
- func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
- namespace := reg.Namespace()
- table := reg.Table()
- if namespace == nil {
- return table
- }
-
- fqTable := make([]byte, 0, len(namespace)+1+len(table))
- fqTable = append(fqTable, namespace...)
- fqTable = append(fqTable, byte(':'))
- fqTable = append(fqTable, table...)
- return fqTable
- }
- func (c *client) reestablishRegion(reg hrpc.RegionInfo) {
- select {
- case <-c.done:
- return
- default:
- }
- log.WithField("region", reg).Debug("reestablishing region")
- c.establishRegion(reg, "")
- }
- func probeKey(reg hrpc.RegionInfo) []byte {
-
- probe := make([]byte, len(reg.StartKey())+17)
- copy(probe, reg.StartKey())
- return probe
- }
- func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
- probe, err := hrpc.NewGet(context.Background(), fullyQualifiedTable(reg), probeKey(reg),
- hrpc.SkipBatch())
- if err != nil {
- panic(fmt.Sprintf("should not happen: %s", err))
- }
- probe.ExistsOnly()
- probe.SetRegion(reg)
- res, err := sendBlocking(rc, probe)
- if err != nil {
- panic(fmt.Sprintf("should not happen: %s", err))
- }
- switch res.Error.(type) {
- case region.ServerError, region.NotServingRegionError, region.RetryableError:
- return res.Error
- default:
- return nil
- }
- }
- func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
- var backoff time.Duration
- var err error
- for {
- backoff, err = sleepAndIncreaseBackoff(reg.Context(), backoff)
- if err != nil {
-
- reg.MarkAvailable()
- return
- }
- if addr == "" {
-
- originalReg := reg
-
- reg, addr, err = c.lookupRegion(originalReg.Context(),
- fullyQualifiedTable(originalReg), originalReg.StartKey())
- if err == TableNotFound {
-
- c.regions.del(originalReg)
- c.clients.del(originalReg)
- originalReg.MarkAvailable()
- log.WithFields(log.Fields{
- "region": originalReg.String(),
- "err": err,
- "backoff": backoff,
- }).Info("region does not exist anymore")
- return
- } else if originalReg.Context().Err() != nil {
-
- originalReg.MarkAvailable()
- log.WithFields(log.Fields{
- "region": originalReg.String(),
- "err": err,
- "backoff": backoff,
- }).Info("region became dead while establishing client for it")
- return
- } else if err == ErrClientClosed {
-
- return
- } else if err != nil {
- log.WithFields(log.Fields{
- "region": originalReg.String(),
- "err": err,
- "backoff": backoff,
- }).Fatal("unknown error occured when looking up region")
- }
- if !bytes.Equal(reg.Name(), originalReg.Name()) {
-
-
- reg.MarkUnavailable()
- overlaps, replaced := c.regions.put(reg)
- if !replaced {
-
- reg.MarkAvailable()
- originalReg.MarkAvailable()
- return
- }
-
- for _, r := range overlaps {
- c.clients.del(r)
- }
-
-
- originalReg.MarkAvailable()
- } else {
-
- reg = originalReg
- }
- }
- var client hrpc.RegionClient
- if reg == c.adminRegionInfo {
-
-
-
- client = c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval,
- c.effectiveUser, c.regionReadTimeout)
- } else {
- client = c.clients.put(addr, reg, func() hrpc.RegionClient {
- return c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval,
- c.effectiveUser, c.regionReadTimeout)
- })
- }
-
-
-
- dialCtx, cancel := context.WithTimeout(reg.Context(), c.regionLookupTimeout)
- err = client.Dial(dialCtx)
- cancel()
- if err == nil {
- if reg == c.adminRegionInfo {
- reg.SetClient(client)
- reg.MarkAvailable()
- return
- }
- if err = isRegionEstablished(client, reg); err == nil {
-
-
- reg.SetClient(client)
- reg.MarkAvailable()
- return
- } else if _, ok := err.(region.ServerError); ok {
-
- c.clientDown(client)
- }
- } else if err == context.Canceled {
-
- reg.MarkAvailable()
- return
- } else {
-
-
-
- c.clientDown(client)
- }
- log.WithFields(log.Fields{
- "region": reg,
- "backoff": backoff,
- "err": err,
- }).Debug("region was not established, retrying")
-
-
- addr = ""
- }
- }
- func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
- if backoff == 0 {
- return backoffStart, nil
- }
- select {
- case <-time.After(backoff):
- case <-ctx.Done():
- return 0, ctx.Err()
- }
-
- if backoff < 5000*time.Millisecond {
- return backoff * 2, nil
- }
- return backoff + 5000*time.Millisecond, nil
- }
- type zkResult struct {
- addr string
- err error
- }
- func (c *client) zkLookup(ctx context.Context, resource zk.ResourceName) (string, error) {
-
-
-
- reschan := make(chan zkResult, 1)
- go func() {
- addr, err := c.zkClient.LocateResource(resource.Prepend(c.zkRoot))
-
- reschan <- zkResult{addr, err}
- }()
- select {
- case res := <-reschan:
- return res.addr, res.err
- case <-ctx.Done():
- return "", ctx.Err()
- }
- }
|