rpc.go

// Copyright (C) 2016 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.

package gohbase

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	"github.com/golang/protobuf/proto"
	log "github.com/sirupsen/logrus"
	"github.com/tsuna/gohbase/hrpc"
	"github.com/tsuna/gohbase/region"
	"github.com/tsuna/gohbase/zk"
)

// Constants
var (
	// Name of the meta region.
	metaTableName = []byte("hbase:meta")

	infoFamily = map[string][]string{
		"info": nil,
	}

	// TableNotFound is returned when attempting to access a table that
	// doesn't exist on this cluster.
	TableNotFound = errors.New("table not found")

	// ErrCannotFindRegion is returned when it took too many tries to find a
	// region for the request. It's likely that hbase:meta has overlaps or some other
	// inconsistency.
	ErrCannotFindRegion = errors.New("cannot find region for the rpc")

	// ErrClientClosed is returned when the gohbase client has been closed
	ErrClientClosed = errors.New("client is closed")
)

const (
	// maxFindRegionTries is the maximum number of times to try to find a
	// region for an RPC before giving up.
	maxFindRegionTries = 10

	backoffStart = 16 * time.Millisecond
)

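// getRegionForRpc returns a region that can serve the given RPC, checking the
// cache first and doing a meta lookup on a miss. It gives up after
// maxFindRegionTries attempts and returns ErrCannotFindRegion.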
func (c *client) getRegionForRpc(rpc hrpc.Call) (hrpc.RegionInfo, error) {
	for i := 0; i < maxFindRegionTries; i++ {
		// Check the cache for a region that can handle this request
		if reg := c.getRegionFromCache(rpc.Table(), rpc.Key()); reg != nil {
			return reg, nil
		}

		if reg, err := c.findRegion(rpc.Context(), rpc.Table(), rpc.Key()); reg != nil {
			return reg, nil
		} else if err != nil {
			return nil, err
		}
	}
	return nil, ErrCannotFindRegion
}

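// SendRPC locates the region for the given RPC and sends it, retrying with
// backoff on retryable errors and re-resolving the region when it becomes
// unavailable or is split or merged.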
func (c *client) SendRPC(rpc hrpc.Call) (proto.Message, error) {
	reg, err := c.getRegionForRpc(rpc)
	if err != nil {
		return nil, err
	}

	backoff := backoffStart
	for {
		msg, err := c.sendRPCToRegion(rpc, reg)
		switch err.(type) {
		case region.RetryableError:
			backoff, err = sleepAndIncreaseBackoff(rpc.Context(), backoff)
			if err != nil {
				return msg, err
			}
		case region.ServerError, region.NotServingRegionError:
			if ch := reg.AvailabilityChan(); ch != nil {
				// The region is unavailable. Wait for it to become available,
				// for a new region, or for the deadline to be exceeded.
				select {
				case <-rpc.Context().Done():
					return nil, rpc.Context().Err()
				case <-c.done:
					return nil, ErrClientClosed
				case <-ch:
				}
			}
			if reg.Context().Err() != nil {
				// region is dead because it was split or merged,
				// lookup a new one and retry
				reg, err = c.getRegionForRpc(rpc)
				if err != nil {
					return nil, err
				}
			}
		default:
			return msg, err
		}
	}
}

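// sendBlocking queues the RPC on the given region client and blocks until a
// result arrives or the RPC's context is done.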
func sendBlocking(rc hrpc.RegionClient, rpc hrpc.Call) (hrpc.RPCResult, error) {
	rc.QueueRPC(rpc)

	var res hrpc.RPCResult
	// Wait for the response
	select {
	case res = <-rpc.ResultChan():
		return res, nil
	case <-rpc.Context().Done():
		return res, rpc.Context().Err()
	}
}

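// sendRPCToRegion sends the RPC to the client serving the given region and,
// on region- or server-level errors, marks the region unavailable or tears
// down the client so that it gets reestablished.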
func (c *client) sendRPCToRegion(rpc hrpc.Call, reg hrpc.RegionInfo) (proto.Message, error) {
	if reg.IsUnavailable() {
		return nil, region.NotServingRegionError{}
	}
	rpc.SetRegion(reg)

	// Queue the RPC to be sent to the region
	client := reg.Client()
	if client == nil {
		// There was an error queueing the RPC.
		// Mark the region as unavailable.
		if reg.MarkUnavailable() {
			// If this was the first goroutine to mark the region as
			// unavailable, start a goroutine to reestablish a connection
			go c.reestablishRegion(reg)
		}
		return nil, region.NotServingRegionError{}
	}

	res, err := sendBlocking(client, rpc)
	if err != nil {
		return nil, err
	}

	// Check for errors
	switch res.Error.(type) {
	case region.NotServingRegionError:
		// There's an error specific to this region, but
		// our region client is fine. Mark this region as
		// unavailable (as opposed to all regions sharing
		// the client), and start a goroutine to reestablish
		// it.
		if reg.MarkUnavailable() {
			go c.reestablishRegion(reg)
		}
	case region.ServerError:
		// If it was an unrecoverable error, the region client is
		// considered dead.
		if reg == c.adminRegionInfo {
			// If this is the admin client, mark the region
			// as unavailable and start up a goroutine to
			// reconnect if it wasn't already marked as such.
			if reg.MarkUnavailable() {
				go c.reestablishRegion(reg)
			}
		} else {
			c.clientDown(client)
		}
	}
	return res.Msg, res.Error
}

// clientDown removes the client from the cache, marks all regions
// sharing that client as unavailable, and starts a goroutine to
// reconnect each of them.
func (c *client) clientDown(client hrpc.RegionClient) {
	downregions := c.clients.clientDown(client)
	for downreg := range downregions {
		if downreg.MarkUnavailable() {
			downreg.SetClient(nil)
			go c.reestablishRegion(downreg)
		}
	}
}

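// lookupRegion resolves the region and region server address for the given
// table and key: the master and hbase:meta locations come from ZooKeeper,
// everything else from a scan of hbase:meta. It retries with backoff until
// the lookup succeeds or the context is done.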
func (c *client) lookupRegion(ctx context.Context,
	table, key []byte) (hrpc.RegionInfo, string, error) {
	var reg hrpc.RegionInfo
	var addr string
	var err error
	backoff := backoffStart
	for {
		// If it takes longer than regionLookupTimeout, fail so that we can sleep
		lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout)
		if c.clientType == region.MasterClient {
			log.WithField("resource", zk.Master).Debug("looking up master")

			addr, err = c.zkLookup(lookupCtx, zk.Master)
			cancel()
			reg = c.adminRegionInfo
		} else if bytes.Equal(table, metaTableName) {
			log.WithField("resource", zk.Meta).Debug("looking up region server of hbase:meta")

			addr, err = c.zkLookup(lookupCtx, zk.Meta)
			cancel()
			reg = c.metaRegionInfo
		} else {
			log.WithFields(log.Fields{
				"table": strconv.Quote(string(table)),
				"key":   strconv.Quote(string(key)),
			}).Debug("looking up region")

			reg, addr, err = c.metaLookup(lookupCtx, table, key)
			cancel()
			if err == TableNotFound {
				log.WithFields(log.Fields{
					"table": strconv.Quote(string(table)),
					"key":   strconv.Quote(string(key)),
					"err":   err,
				}).Debug("hbase:meta does not know about this table/key")

				return nil, "", err
			} else if err == ErrClientClosed {
				return nil, "", err
			}
		}
		if err == nil {
			log.WithFields(log.Fields{
				"table":  strconv.Quote(string(table)),
				"key":    strconv.Quote(string(key)),
				"region": reg,
				"addr":   addr,
			}).Debug("looked up a region")

			return reg, addr, nil
		}

		log.WithFields(log.Fields{
			"table":   strconv.Quote(string(table)),
			"key":     strconv.Quote(string(key)),
			"backoff": backoff,
			"err":     err,
		}).Error("failed looking up region")

		// This will be hit if there was an error locating the region
		backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
		if err != nil {
			return nil, "", err
		}
	}
}

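// findRegion looks up the region for the given table and key, adds it to the
// region cache and starts a goroutine to connect to its region server. It
// returns nil, nil if an equivalent region was added to the cache concurrently,
// in which case the caller should retry the cache lookup.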
func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) {
	// The region was not in the cache, it
	// must be looked up in the meta table
	reg, addr, err := c.lookupRegion(ctx, table, key)
	if err != nil {
		return nil, err
	}

	// We are the ones that looked up the region, so we need to
	// mark it unavailable and find a client for it.
	reg.MarkUnavailable()

	if reg != c.metaRegionInfo && reg != c.adminRegionInfo {
		// Check that the region wasn't added to
		// the cache while we were looking it up.
		overlaps, replaced := c.regions.put(reg)
		if !replaced {
			// the same or younger regions are already in cache, retry looking up in cache
			return nil, nil
		}

		// otherwise, new region in cache, delete overlaps from client's cache
		for _, r := range overlaps {
			c.clients.del(r)
		}
	}

	// Start a goroutine to connect to the region
	go c.establishRegion(reg, addr)

	// Wait for the new region to become
	// available, and then send the RPC
	return reg, nil
}

// Searches in the regions cache for the region hosting the given row.
func (c *client) getRegionFromCache(table, key []byte) hrpc.RegionInfo {
	if c.clientType == region.MasterClient {
		return c.adminRegionInfo
	} else if bytes.Equal(table, metaTableName) {
		return c.metaRegionInfo
	}
	regionName := createRegionSearchKey(table, key)
	_, region := c.regions.get(regionName)
	if region == nil {
		return nil
	}

	// make sure the returned region is for the same table
	if !bytes.Equal(fullyQualifiedTable(region), table) {
		// not the same table, can happen if we got the last region
		return nil
	}

	if len(region.StopKey()) != 0 &&
		// If the stop key is an empty byte array, it means this region is the
		// last region for this table and this key ought to be in that region.
		bytes.Compare(key, region.StopKey()) >= 0 {
		return nil
	}

	return region
}

// Creates the META key to search for in order to locate the given key.
func createRegionSearchKey(table, key []byte) []byte {
	metaKey := make([]byte, 0, len(table)+len(key)+3)
	metaKey = append(metaKey, table...)
	metaKey = append(metaKey, ',')

	metaKey = append(metaKey, key...)
	metaKey = append(metaKey, ',')

	// ':' is the first byte greater than '9'. We always want to find the
	// entry with the greatest timestamp, so by looking right before ':'
	// we'll find it.
	metaKey = append(metaKey, ':')

	return metaKey
}

// metaLookup checks the meta table for the region of the given table that
// contains the given row key.
func (c *client) metaLookup(ctx context.Context,
	table, key []byte) (hrpc.RegionInfo, string, error) {
	metaKey := createRegionSearchKey(table, key)
	rpc, err := hrpc.NewScanRange(ctx, metaTableName, metaKey, table,
		hrpc.Families(infoFamily),
		hrpc.Reversed(),
		hrpc.CloseScanner(),
		hrpc.NumberOfRows(1))
	if err != nil {
		return nil, "", err
	}

	scanner := c.Scan(rpc)
	resp, err := scanner.Next()
	if err == io.EOF {
		return nil, "", TableNotFound
	}
	if err != nil {
		return nil, "", err
	}

	reg, addr, err := region.ParseRegionInfo(resp)
	if err != nil {
		return nil, "", err
	}
	if !bytes.Equal(table, fullyQualifiedTable(reg)) {
		// This would indicate a bug in HBase.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong table!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	} else if len(reg.StopKey()) != 0 &&
		bytes.Compare(key, reg.StopKey()) >= 0 {
		// This would indicate a hole in the meta table.
		return nil, "", fmt.Errorf("wtf: meta returned an entry for the wrong region!"+
			" Looked up table=%q key=%q got region=%s", table, key, reg)
	}
	return reg, addr, nil
}

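// fullyQualifiedTable returns the table name of the region prefixed with its
// namespace, or just the table name if the region is in the default namespace.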
func fullyQualifiedTable(reg hrpc.RegionInfo) []byte {
	namespace := reg.Namespace()
	table := reg.Table()
	if namespace == nil {
		return table
	}

	// non-default namespace table
	fqTable := make([]byte, 0, len(namespace)+1+len(table))
	fqTable = append(fqTable, namespace...)
	fqTable = append(fqTable, byte(':'))
	fqTable = append(fqTable, table...)
	return fqTable
}

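// reestablishRegion runs establishRegion for a region whose client went away,
// looking the region up again, unless the gohbase client has been closed.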
func (c *client) reestablishRegion(reg hrpc.RegionInfo) {
	select {
	case <-c.done:
		return
	default:
	}
	log.WithField("region", reg).Debug("reestablishing region")
	c.establishRegion(reg, "")
}

// probeKey returns a key in the region that is unlikely to have data at it,
// in order to test whether the region is online. This prevents the Get request
// from actually fetching data from storage, which would consume resources
// of the region server.
func probeKey(reg hrpc.RegionInfo) []byte {
	// now we create a probe key: reg.StartKey() + 17 zeros
	probe := make([]byte, len(reg.StartKey())+17)
	copy(probe, reg.StartKey())
	return probe
}

// isRegionEstablished checks whether the regionserver accepts rpcs for the region.
// Returns the cause if not established.
func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
	probe, err := hrpc.NewGet(context.Background(), fullyQualifiedTable(reg), probeKey(reg),
		hrpc.SkipBatch())
	if err != nil {
		panic(fmt.Sprintf("should not happen: %s", err))
	}
	probe.ExistsOnly()

	probe.SetRegion(reg)
	res, err := sendBlocking(rc, probe)
	if err != nil {
		panic(fmt.Sprintf("should not happen: %s", err))
	}

	switch res.Error.(type) {
	case region.ServerError, region.NotServingRegionError, region.RetryableError:
		return res.Error
	default:
		return nil
	}
}

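// establishRegion looks up the region if no address is given, creates or
// reuses a region client for that address, dials it, probes that the region
// is actually being served and then marks the region available. It retries
// with backoff until it succeeds or the region's context is done.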
func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
	var backoff time.Duration
	var err error
	for {
		backoff, err = sleepAndIncreaseBackoff(reg.Context(), backoff)
		if err != nil {
			// region is dead
			reg.MarkAvailable()
			return
		}

		if addr == "" {
			// need to look up region and address of the regionserver
			originalReg := reg
			// lookup region forever until we get it or we learn that it doesn't exist
			reg, addr, err = c.lookupRegion(originalReg.Context(),
				fullyQualifiedTable(originalReg), originalReg.StartKey())
			if err == TableNotFound {
				// region doesn't exist, delete it from caches
				c.regions.del(originalReg)
				c.clients.del(originalReg)
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region does not exist anymore")

				return
			} else if originalReg.Context().Err() != nil {
				// region is dead
				originalReg.MarkAvailable()

				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Info("region became dead while establishing client for it")

				return
			} else if err == ErrClientClosed {
				// client has been closed
				return
			} else if err != nil {
				log.WithFields(log.Fields{
					"region":  originalReg.String(),
					"err":     err,
					"backoff": backoff,
				}).Fatal("unknown error occurred when looking up region")
			}

			if !bytes.Equal(reg.Name(), originalReg.Name()) {
				// put new region and remove overlapping ones.
				// Should remove the original region as well.
				reg.MarkUnavailable()
				overlaps, replaced := c.regions.put(reg)
				if !replaced {
					// a region that is the same or younger is already in cache
					reg.MarkAvailable()
					originalReg.MarkAvailable()
					return
				}

				// otherwise delete the overlapped regions in cache
				for _, r := range overlaps {
					c.clients.del(r)
				}

				// let rpcs know that they can retry and either get the newly
				// added region from cache or lookup the one they need
				originalReg.MarkAvailable()
			} else {
				// same region, discard the looked up one
				reg = originalReg
			}
		}

		var client hrpc.RegionClient
		if reg == c.adminRegionInfo {
			// admin region is used for talking to master, so it only has one connection to
			// master that we don't add to the cache
			// TODO: consider combining this case with the regular regionserver path
			client = c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval,
				c.effectiveUser, c.regionReadTimeout)
		} else {
			client = c.clients.put(addr, reg, func() hrpc.RegionClient {
				return c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval,
					c.effectiveUser, c.regionReadTimeout)
			})
		}

		// connect to the region's regionserver.
		// only the first caller to Dial gets to actually connect, other concurrent calls
		// will block until connected or an error.
		dialCtx, cancel := context.WithTimeout(reg.Context(), c.regionLookupTimeout)
		err = client.Dial(dialCtx)
		cancel()

		if err == nil {
			if reg == c.adminRegionInfo {
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			}

			if err = isRegionEstablished(client, reg); err == nil {
				// set region client so that as soon as we mark it available,
				// concurrent readers are able to find the client
				reg.SetClient(client)
				reg.MarkAvailable()
				return
			} else if _, ok := err.(region.ServerError); ok {
				// the client we got died
				c.clientDown(client)
			}
		} else if err == context.Canceled {
			// region is dead
			reg.MarkAvailable()
			return
		} else {
			// otherwise Dial failed, purge the client and retry.
			// note that it's safer to reestablish all regions for this client as well
			// because they could have ended up settling for the same client.
			c.clientDown(client)
		}

		log.WithFields(log.Fields{
			"region":  reg,
			"backoff": backoff,
			"err":     err,
		}).Debug("region was not established, retrying")

		// reset address because we weren't able to connect to it
		// or regionserver says it's still offline, should look up again
		addr = ""
	}
}

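// sleepAndIncreaseBackoff sleeps for the given backoff (returning immediately
// when it is zero) and returns the next backoff value: doubling up to 5s and
// growing by 5s increments after that, or the context error if ctx is done.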
func sleepAndIncreaseBackoff(ctx context.Context, backoff time.Duration) (time.Duration, error) {
	if backoff == 0 {
		return backoffStart, nil
	}

	select {
	case <-time.After(backoff):
	case <-ctx.Done():
		return 0, ctx.Err()
	}

	// TODO: Revisit how we back off here.
	if backoff < 5000*time.Millisecond {
		return backoff * 2, nil
	}
	return backoff + 5000*time.Millisecond, nil
}

// zkResult contains the result of a ZooKeeper lookup (when we're looking for
// the meta region or the HMaster).
type zkResult struct {
	addr string
	err  error
}

// zkLookup asynchronously looks up the meta region or HMaster in ZooKeeper.
func (c *client) zkLookup(ctx context.Context, resource zk.ResourceName) (string, error) {
	// We make this a buffered channel so that if we stop waiting due to a
	// timeout, we won't block the lookup goroutine that we start below.
	reschan := make(chan zkResult, 1)
	go func() {
		addr, err := c.zkClient.LocateResource(resource.Prepend(c.zkRoot))
		// This is guaranteed to never block as the channel is always buffered.
		reschan <- zkResult{addr, err}
	}()

	select {
	case res := <-reschan:
		return res.addr, res.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}