new.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. // Copyright (C) 2016 The GoHBase Authors. All rights reserved.
  2. // This file is part of GoHBase.
  3. // Use of this source code is governed by the Apache License 2.0
  4. // that can be found in the COPYING file.
  5. // +build !testing
  6. package region
  7. import (
  8. "context"
  9. "fmt"
  10. "net"
  11. "time"
  12. "github.com/tsuna/gohbase/hrpc"
  13. )
  14. // NewClient creates a new RegionClient.
  15. func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.Duration,
  16. effectiveUser string, readTimeout time.Duration) hrpc.RegionClient {
  17. return &client{
  18. addr: addr,
  19. ctype: ctype,
  20. rpcQueueSize: queueSize,
  21. flushInterval: flushInterval,
  22. effectiveUser: effectiveUser,
  23. readTimeout: readTimeout,
  24. rpcs: make(chan hrpc.Call),
  25. done: make(chan struct{}),
  26. sent: make(map[uint32]hrpc.Call),
  27. }
  28. }
  29. func (c *client) Dial(ctx context.Context) error {
  30. c.dialOnce.Do(func() {
  31. var d net.Dialer
  32. var err error
  33. c.conn, err = d.DialContext(ctx, "tcp", c.addr)
  34. if err != nil {
  35. c.fail(fmt.Errorf("failed to dial RegionServer: %s", err))
  36. return
  37. }
  38. // time out send hello if it take long
  39. if deadline, ok := ctx.Deadline(); ok {
  40. c.conn.SetWriteDeadline(deadline)
  41. }
  42. if err := c.sendHello(); err != nil {
  43. c.fail(fmt.Errorf("failed to send hello to RegionServer: %s", err))
  44. return
  45. }
  46. // reset write deadline
  47. c.conn.SetWriteDeadline(time.Time{})
  48. if c.ctype == RegionClient {
  49. go c.processRPCs() // Batching goroutine
  50. }
  51. go c.receiveRPCs() // Reader goroutine
  52. })
  53. select {
  54. case <-c.done:
  55. return ErrClientClosed
  56. default:
  57. return nil
  58. }
  59. }