client.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. // Copyright (C) 2015 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. package region
  6. import (
  7. "encoding/binary"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. log "github.com/sirupsen/logrus"
  17. "github.com/golang/protobuf/proto"
  18. "github.com/tsuna/gohbase/hrpc"
  19. "github.com/tsuna/gohbase/pb"
  20. )
  // ClientType identifies which HBase RPC service a client talks to; its value
// is sent as the service name in the connection header. It is a defined
// string type, not a type alias.
type ClientType string
  23. type canDeserializeCellBlocks interface {
  24. // DeserializeCellBlocks populates passed protobuf message with results
  25. // deserialized from the reader and returns number of bytes read or error.
  26. DeserializeCellBlocks(proto.Message, []byte) (uint32, error)
  27. }
  28. var (
  29. // ErrMissingCallID is used when HBase sends us a response message for a
  30. // request that we didn't send
  31. ErrMissingCallID = ServerError{errors.New("got a response with a nonsensical call ID")}
  32. // ErrClientClosed is returned to rpcs when Close() is called or when client
  33. // died because of failed send or receive
  34. ErrClientClosed = ServerError{errors.New("client is closed")}
  35. // If a Java exception listed here is returned by HBase, the client should
  36. // reestablish region and attempt to resend the RPC message, potentially via
  37. // a different region client.
  38. // The value of exception should be contained in the stack trace.
  39. javaRegionExceptions = map[string]string{
  40. "org.apache.hadoop.hbase.NotServingRegionException": "",
  41. "org.apache.hadoop.hbase.exceptions.RegionMovedException": "",
  42. "java.io.IOException": "Cannot append; log is closed",
  43. }
  44. // If a Java exception listed here is returned by HBase, the client should
  45. // backoff and resend the RPC message to the same region and region server
  46. // The value of exception should be contained in the stack trace.
  47. javaRetryableExceptions = map[string]string{
  48. "org.apache.hadoop.hbase.CallQueueTooBigException": "",
  49. "org.apache.hadoop.hbase.exceptions.RegionOpeningException": "",
  50. "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException": "",
  51. "org.apache.hadoop.hbase.quotas.RpcThrottlingException": "",
  52. "org.apache.hadoop.hbase.RetryImmediatelyException": "",
  53. "org.apache.hadoop.hbase.RegionTooBusyException": "",
  54. }
  55. // javaServerExceptions is a map where all Java exceptions that signify
  56. // the RPC should be sent again are listed (as keys). If a Java exception
  57. // listed here is returned by HBase, the RegionClient will be closed and a new
  58. // one should be established.
  59. // The value of exception should be contained in the stack trace.
  60. javaServerExceptions = map[string]string{
  61. "org.apache.hadoop.hbase.regionserver.RegionServerAbortedException": "",
  62. "org.apache.hadoop.hbase.regionserver.RegionServerStoppedException": "",
  63. }
  64. )
  65. const (
  66. //DefaultLookupTimeout is the default region lookup timeout
  67. DefaultLookupTimeout = 30 * time.Second
  68. //DefaultReadTimeout is the default region read timeout
  69. DefaultReadTimeout = 30 * time.Second
  70. // RegionClient is a ClientType that means this will be a normal client
  71. RegionClient = ClientType("ClientService")
  72. // MasterClient is a ClientType that means this client will talk to the
  73. // master server
  74. MasterClient = ClientType("MasterService")
  75. )
  // bufferPool recycles byte slices used to build outgoing request frames.
// New yields a nil slice, so newBuffer must cope with zero capacity.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return []byte(nil)
	},
}

// newBuffer returns a slice of length size. It reuses a pooled backing array
// when that array is large enough. Otherwise the pooled array is dropped and a
// new one is allocated, with twice the pooled capacity when that exceeds size,
// or exactly size otherwise.
func newBuffer(size int) []byte {
	pooled := bufferPool.Get().([]byte)
	if cap(pooled) >= size {
		return pooled[:size]
	}
	if grown := cap(pooled) * 2; grown > size {
		return make([]byte, size, grown)
	}
	return make([]byte, size)
}

// freeBuffer hands b's backing array back to bufferPool, truncated to zero
// length. b must not be used after this call.
func freeBuffer(b []byte) {
	bufferPool.Put(b[:0])
}
  // ServerError is an error this region.Client can't recover from: the
// connection to the RegionServer has to be closed and all queued and
// outstanding RPCs will be failed / retried.
type ServerError struct {
	error
}

// formatErr renders the dynamic type of e. When err is non-nil, it appends
// ": " and err's message. The error wrapper types use it so their messages
// are prefixed with the wrapper's type name.
func formatErr(e interface{}, err error) string {
	kind := fmt.Sprintf("%T", e)
	if err == nil {
		return kind
	}
	return kind + ": " + err.Error()
}

// Error implements error, producing e.g. "region.ServerError: <cause>".
func (se ServerError) Error() string {
	return formatErr(se, se.error)
}
  111. // RetryableError is an error that indicates the RPC should be retried after backoff
  112. // because the error is transient (e.g. a region being momentarily unavailable).
  113. type RetryableError struct {
  114. error
  115. }
  116. func (e RetryableError) Error() string {
  117. return formatErr(e, e.error)
  118. }
  119. // NotServingRegionError is an error that indicates the client should
  120. // reestablish the region and retry the RPC potentially via a different client
  121. type NotServingRegionError struct {
  122. error
  123. }
  124. func (e NotServingRegionError) Error() string {
  125. return formatErr(e, e.error)
  126. }
  127. // client manages a connection to a RegionServer.
  128. type client struct {
  129. conn net.Conn
  130. // Address of the RegionServer.
  131. addr string
  132. ctype ClientType
  133. // dialOnce used for concurrent calls to Dial
  134. dialOnce sync.Once
  135. // failOnce used for concurrent calls to fail
  136. failOnce sync.Once
  137. rpcs chan hrpc.Call
  138. done chan struct{}
  139. // sent contains the mapping of sent call IDs to RPC calls, so that when
  140. // a response is received it can be tied to the correct RPC
  141. sentM sync.Mutex // protects sent
  142. sent map[uint32]hrpc.Call
  143. // inFlight is number of rpcs sent to regionserver awaiting response
  144. inFlightM sync.Mutex // protects inFlight and SetReadDeadline
  145. inFlight uint32
  146. id uint32
  147. rpcQueueSize int
  148. flushInterval time.Duration
  149. effectiveUser string
  150. // readTimeout is the maximum amount of time to wait for regionserver reply
  151. readTimeout time.Duration
  152. }
  153. // QueueRPC will add an rpc call to the queue for processing by the writer goroutine
  154. func (c *client) QueueRPC(rpc hrpc.Call) {
  155. if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() {
  156. // queue up the rpc
  157. select {
  158. case <-rpc.Context().Done():
  159. // rpc timed out before being processed
  160. case <-c.done:
  161. returnResult(rpc, nil, ErrClientClosed)
  162. case c.rpcs <- rpc:
  163. }
  164. } else {
  165. if err := c.trySend(rpc); err != nil {
  166. returnResult(rpc, nil, err)
  167. }
  168. }
  169. }
  170. // Close asks this region.Client to close its connection to the RegionServer.
  171. // All queued and outstanding RPCs, if any, will be failed as if a connection
  172. // error had happened.
  173. func (c *client) Close() {
  174. c.fail(ErrClientClosed)
  175. }
  176. // Addr returns address of the region server the client is connected to
  177. func (c *client) Addr() string {
  178. return c.addr
  179. }
  180. // String returns a string represintation of the current region client
  181. func (c *client) String() string {
  182. return fmt.Sprintf("RegionClient{Addr: %s}", c.addr)
  183. }
  184. func (c *client) inFlightUp() {
  185. c.inFlightM.Lock()
  186. c.inFlight++
  187. // we expect that at least the last request can be completed within readTimeout
  188. c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
  189. c.inFlightM.Unlock()
  190. }
  191. func (c *client) inFlightDown() {
  192. c.inFlightM.Lock()
  193. c.inFlight--
  194. // reset read timeout if we are not waiting for any responses
  195. // in order to prevent from closing this client if there are no request
  196. if c.inFlight == 0 {
  197. c.conn.SetReadDeadline(time.Time{})
  198. }
  199. c.inFlightM.Unlock()
  200. }
  201. func (c *client) fail(err error) {
  202. c.failOnce.Do(func() {
  203. if err != ErrClientClosed {
  204. log.WithFields(log.Fields{
  205. "client": c,
  206. "err": err,
  207. }).Error("error occured, closing region client")
  208. }
  209. // we don't close c.rpcs channel to make it block in select of QueueRPC
  210. // and avoid dealing with synchronization of closing it while someone
  211. // might be sending to it. Go's GC will take care of it.
  212. // tell goroutines to stop
  213. close(c.done)
  214. // close connection to the regionserver
  215. // to let it know that we can't receive anymore
  216. // and fail all the rpcs being sent
  217. if c.conn != nil {
  218. c.conn.Close()
  219. }
  220. c.failSentRPCs()
  221. })
  222. }
  223. func (c *client) failSentRPCs() {
  224. // channel is closed, clean up awaiting rpcs
  225. c.sentM.Lock()
  226. sent := c.sent
  227. c.sent = make(map[uint32]hrpc.Call)
  228. c.sentM.Unlock()
  229. log.WithFields(log.Fields{
  230. "client": c,
  231. "count": len(sent),
  232. }).Debug("failing awaiting RPCs")
  233. // send error to awaiting rpcs
  234. for _, rpc := range sent {
  235. returnResult(rpc, nil, ErrClientClosed)
  236. }
  237. }
  238. func (c *client) registerRPC(rpc hrpc.Call) uint32 {
  239. currID := atomic.AddUint32(&c.id, 1)
  240. c.sentM.Lock()
  241. c.sent[currID] = rpc
  242. c.sentM.Unlock()
  243. return currID
  244. }
  245. func (c *client) unregisterRPC(id uint32) hrpc.Call {
  246. c.sentM.Lock()
  247. rpc := c.sent[id]
  248. delete(c.sent, id)
  249. c.sentM.Unlock()
  250. return rpc
  251. }
  252. func (c *client) processRPCs() {
  253. // TODO: flush when the size is too large
  254. // TODO: if multi has only one call, send that call instead
  255. m := newMulti(c.rpcQueueSize)
  256. defer func() {
  257. m.returnResults(nil, ErrClientClosed)
  258. }()
  259. flush := func() {
  260. if log.GetLevel() == log.DebugLevel {
  261. log.WithFields(log.Fields{
  262. "len": m.len(),
  263. "addr": c.Addr(),
  264. }).Debug("flushing MultiRequest")
  265. }
  266. if err := c.trySend(m); err != nil {
  267. m.returnResults(nil, err)
  268. }
  269. m = newMulti(c.rpcQueueSize)
  270. }
  271. for {
  272. // first loop is to accomodate request heavy workload
  273. // it will batch as long as conccurent writers are sending
  274. // new rpcs or until multi is filled up
  275. for {
  276. select {
  277. case <-c.done:
  278. return
  279. case rpc := <-c.rpcs:
  280. // have things queued up, batch them
  281. if !m.add(rpc) {
  282. // can still put more rpcs into batch
  283. continue
  284. }
  285. default:
  286. // no more rpcs queued up
  287. }
  288. break
  289. }
  290. if l := m.len(); l == 0 {
  291. // wait for the next batch
  292. select {
  293. case <-c.done:
  294. return
  295. case rpc := <-c.rpcs:
  296. m.add(rpc)
  297. }
  298. continue
  299. } else if l == c.rpcQueueSize || c.flushInterval == 0 {
  300. // batch is full, flush
  301. flush()
  302. continue
  303. }
  304. // second loop is to accomodate less frequent callers
  305. // that would like to maximize their batches at the expense
  306. // of waiting for flushInteval
  307. timer := time.NewTimer(c.flushInterval)
  308. for {
  309. select {
  310. case <-c.done:
  311. return
  312. case <-timer.C:
  313. // time to flush
  314. case rpc := <-c.rpcs:
  315. if !m.add(rpc) {
  316. // can still put more rpcs into batch
  317. continue
  318. }
  319. // batch is full
  320. if !timer.Stop() {
  321. <-timer.C
  322. }
  323. }
  324. break
  325. }
  326. flush()
  327. }
  328. }
  329. func returnResult(c hrpc.Call, msg proto.Message, err error) {
  330. if m, ok := c.(*multi); ok {
  331. m.returnResults(msg, err)
  332. } else {
  333. c.ResultChan() <- hrpc.RPCResult{Msg: msg, Error: err}
  334. }
  335. }
  336. func (c *client) trySend(rpc hrpc.Call) error {
  337. select {
  338. case <-c.done:
  339. // An unrecoverable error has occured,
  340. // region client has been stopped,
  341. // don't send rpcs
  342. return ErrClientClosed
  343. case <-rpc.Context().Done():
  344. // If the deadline has been exceeded, don't bother sending the
  345. // request. The function that placed the RPC in our queue should
  346. // stop waiting for a result and return an error.
  347. return nil
  348. default:
  349. if id, err := c.send(rpc); err != nil {
  350. if _, ok := err.(ServerError); ok {
  351. c.fail(err)
  352. }
  353. if r := c.unregisterRPC(id); r != nil {
  354. // we are the ones to unregister the rpc,
  355. // return err to notify client of it
  356. return err
  357. }
  358. }
  359. return nil
  360. }
  361. }
  362. func (c *client) receiveRPCs() {
  363. for {
  364. select {
  365. case <-c.done:
  366. return
  367. default:
  368. if err := c.receive(); err != nil {
  369. if _, ok := err.(ServerError); ok {
  370. // fail the client and let the callers establish a new one
  371. c.fail(err)
  372. return
  373. }
  374. // in other cases we consider that the region client is healthy
  375. // and return the error to caller to let them retry
  376. }
  377. }
  378. }
  379. }
  380. func (c *client) receive() (err error) {
  381. var (
  382. sz [4]byte
  383. header pb.ResponseHeader
  384. response proto.Message
  385. )
  386. err = c.readFully(sz[:])
  387. if err != nil {
  388. return ServerError{err}
  389. }
  390. size := binary.BigEndian.Uint32(sz[:])
  391. b := make([]byte, size)
  392. err = c.readFully(b)
  393. if err != nil {
  394. return ServerError{err}
  395. }
  396. buf := proto.NewBuffer(b)
  397. if err = buf.DecodeMessage(&header); err != nil {
  398. return ServerError{fmt.Errorf("failed to decode the response header: %s", err)}
  399. }
  400. if header.CallId == nil {
  401. return ErrMissingCallID
  402. }
  403. callID := *header.CallId
  404. rpc := c.unregisterRPC(callID)
  405. if rpc == nil {
  406. return ServerError{fmt.Errorf("got a response with an unexpected call ID: %d", callID)}
  407. }
  408. c.inFlightDown()
  409. select {
  410. case <-rpc.Context().Done():
  411. // context has expired, don't bother deserializing
  412. return
  413. default:
  414. }
  415. // Here we know for sure that we got a response for rpc we asked.
  416. // It's our responsibility to deliver the response or error to the
  417. // caller as we unregistered the rpc.
  418. defer func() { returnResult(rpc, response, err) }()
  419. if header.Exception != nil {
  420. err = exceptionToError(*header.Exception.ExceptionClassName, *header.Exception.StackTrace)
  421. return
  422. }
  423. response = rpc.NewResponse()
  424. if err = buf.DecodeMessage(response); err != nil {
  425. err = RetryableError{fmt.Errorf("failed to decode the response: %s", err)}
  426. return
  427. }
  428. var cellsLen uint32
  429. if header.CellBlockMeta != nil {
  430. cellsLen = header.CellBlockMeta.GetLength()
  431. }
  432. if d, ok := rpc.(canDeserializeCellBlocks); cellsLen > 0 && ok {
  433. b := buf.Bytes()[size-cellsLen:]
  434. var nread uint32
  435. nread, err = d.DeserializeCellBlocks(response, b)
  436. if err != nil {
  437. err = RetryableError{fmt.Errorf("failed to decode the response: %s", err)}
  438. return
  439. }
  440. if int(nread) < len(b) {
  441. err = RetryableError{fmt.Errorf("short read: buffer len %d, read %d", len(b), nread)}
  442. return
  443. }
  444. }
  445. return
  446. }
  447. func exceptionToError(class, stack string) error {
  448. err := fmt.Errorf("HBase Java exception %s:\n%s", class, stack)
  449. if s, ok := javaRetryableExceptions[class]; ok && strings.Contains(stack, s) {
  450. return RetryableError{err}
  451. } else if s, ok := javaRegionExceptions[class]; ok && strings.Contains(stack, s) {
  452. return NotServingRegionError{err}
  453. } else if s, ok := javaServerExceptions[class]; ok && strings.Contains(stack, s) {
  454. return ServerError{err}
  455. }
  456. return err
  457. }
  458. // write sends the given buffer to the RegionServer.
  459. func (c *client) write(buf []byte) error {
  460. _, err := c.conn.Write(buf)
  461. return err
  462. }
  463. // Tries to read enough data to fully fill up the given buffer.
  464. func (c *client) readFully(buf []byte) error {
  465. _, err := io.ReadFull(c.conn, buf)
  466. return err
  467. }
  468. // sendHello sends the "hello" message needed when opening a new connection.
  469. func (c *client) sendHello() error {
  470. connHeader := &pb.ConnectionHeader{
  471. UserInfo: &pb.UserInformation{
  472. EffectiveUser: proto.String(c.effectiveUser),
  473. },
  474. ServiceName: proto.String(string(c.ctype)),
  475. CellBlockCodecClass: proto.String("org.apache.hadoop.hbase.codec.KeyValueCodec"),
  476. }
  477. data, err := proto.Marshal(connHeader)
  478. if err != nil {
  479. return fmt.Errorf("failed to marshal connection header: %s", err)
  480. }
  481. const header = "HBas\x00\x50" // \x50 = Simple Auth.
  482. buf := make([]byte, 0, len(header)+4+len(data))
  483. buf = append(buf, header...)
  484. buf = buf[:len(header)+4]
  485. binary.BigEndian.PutUint32(buf[6:], uint32(len(data)))
  486. buf = append(buf, data...)
  487. return c.write(buf)
  488. }
  489. // send sends an RPC out to the wire.
  490. // Returns the response (for now, as the call is synchronous).
  491. func (c *client) send(rpc hrpc.Call) (uint32, error) {
  492. b := newBuffer(4)
  493. defer func() { freeBuffer(b) }()
  494. buf := proto.NewBuffer(b[4:])
  495. buf.Reset()
  496. request := rpc.ToProto()
  497. // we have to register rpc after we marhsal because
  498. // registered rpc can fail before it was even sent
  499. // in all the cases where c.fail() is called.
  500. // If that happens, client can retry sending the rpc
  501. // again potentially changing it's contents.
  502. id := c.registerRPC(rpc)
  503. header := &pb.RequestHeader{
  504. CallId: &id,
  505. MethodName: proto.String(rpc.Name()),
  506. RequestParam: proto.Bool(true),
  507. }
  508. if err := buf.EncodeMessage(header); err != nil {
  509. return id, fmt.Errorf("failed to marshal request header: %s", err)
  510. }
  511. if err := buf.EncodeMessage(request); err != nil {
  512. return id, fmt.Errorf("failed to marshal request: %s", err)
  513. }
  514. payload := buf.Bytes()
  515. binary.BigEndian.PutUint32(b, uint32(len(payload)))
  516. b = append(b[:4], payload...)
  517. if err := c.write(b); err != nil {
  518. return id, ServerError{err}
  519. }
  520. c.inFlightUp()
  521. return id, nil
  522. }