restclient.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package datahub
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/hmac"
  6. "crypto/sha1"
  7. "encoding/base64"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "net"
  12. "net/http"
  13. "os"
  14. "sort"
  15. "strconv"
  16. "strings"
  17. "time"
  18. log "github.com/sirupsen/logrus"
  19. )
  20. const (
  21. httpHeaderAcceptEncoding = "Accept-Encoding"
  22. httpHeaderAuthorization = "Authorization"
  23. httpHeadercacheControl = "Cache-Control"
  24. httpHeaderChunked = "chunked"
  25. httpHeaderClientVersion = "x-datahub-client-version"
  26. httpHeaderContentDisposition = "Content-Disposition"
  27. httpHeaderContentEncoding = "Content-Encoding"
  28. httpHeaderContentLength = "Content-Length"
  29. httpHeaderContentMD5 = "Content-MD5"
  30. httpHeaderContentType = "Content-Type"
  31. httpHeaderDate = "Date"
  32. httpHeaderETAG = "ETag"
  33. httpHeaderEXPIRES = "Expires"
  34. httpHeaderHost = "Host"
  35. httpHeaderlastModified = "Last-Modified"
  36. httpHeaderLocation = "Location"
  37. httpHeaderRange = "Range"
  38. httpHeaderRawSize = "x-datahub-content-raw-size"
  39. httpHeaderRequestAction = "x-datahub-request-action"
  40. httpHeaderRequestId = "x-datahub-request-id"
  41. httpHeaderSecurityToken = "x-datahub-security-token"
  42. httpHeaderTransferEncoding = "Transfer-Encoding"
  43. httpHeaderUserAgent = "User-Agent"
  44. httpHeaderConnectorMode = "mode"
  45. )
  46. const (
  47. httpFilterQuery = "filter"
  48. httpJsonContent = "application/json"
  49. httpProtoContent = "application/x-protobuf"
  50. httpProtoBatchContent = "application/x-binary"
  51. httpPublistContent = "pub"
  52. httpSubscribeContent = "sub"
  53. )
  54. const (
  55. datahubHeadersPrefix = "x-datahub-"
  56. )
  57. func init() {
  58. // Log as JSON instead of the default ASCII formatter.
  59. log.SetFormatter(&log.TextFormatter{})
  60. // Output to stdout instead of the default stderr
  61. // Can be any io.Writer, see below for File examples
  62. log.SetOutput(os.Stdout)
  63. // Only log the level severity or above.
  64. dev := strings.ToLower(os.Getenv("GODATAHUB_DEV"))
  65. switch dev {
  66. case "true":
  67. log.SetLevel(log.DebugLevel)
  68. default:
  69. log.SetLevel(log.WarnLevel)
  70. }
  71. }
  72. // DialContextFn was defined to make code more readable.
  73. type DialContextFn func(ctx context.Context, network, address string) (net.Conn, error)
  74. // TraceDialContext implements our own dialer in order to trace conn info.
  75. func TraceDialContext(ctimeout time.Duration) DialContextFn {
  76. dialer := &net.Dialer{
  77. Timeout: ctimeout,
  78. KeepAlive: ctimeout,
  79. }
  80. return func(ctx context.Context, network, addr string) (net.Conn, error) {
  81. conn, err := dialer.DialContext(ctx, network, addr)
  82. if err != nil {
  83. return nil, err
  84. }
  85. log.Debug("connect done, use", conn.LocalAddr().String())
  86. return conn, nil
  87. }
  88. }
  89. // RestClient rest客户端
  90. type RestClient struct {
  91. // Endpoint datahub服务的endpint
  92. Endpoint string
  93. // Useragent user agent
  94. Useragent string
  95. // HttpClient http client
  96. HttpClient *http.Client
  97. // Account
  98. Account Account
  99. CompressorType CompressorType
  100. }
  101. // NewRestClient create a new rest client
  102. func NewRestClient(endpoint string, useragent string, httpClient *http.Client, account Account, cType CompressorType) *RestClient {
  103. if strings.HasSuffix(endpoint, "/") {
  104. endpoint = endpoint[0 : len(endpoint)-1]
  105. }
  106. return &RestClient{
  107. Endpoint: endpoint,
  108. Useragent: useragent,
  109. HttpClient: httpClient,
  110. Account: account,
  111. CompressorType: cType,
  112. }
  113. }
  114. type RequestParameter struct {
  115. Header map[string]string
  116. Query map[string]string
  117. }
  118. // Get send HTTP Get method request
  119. func (client *RestClient) Get(resource string, para *RequestParameter) ([]byte, *CommonResponseResult, error) {
  120. return client.request(http.MethodGet, resource, &EmptyRequest{}, para)
  121. }
  122. // Post send HTTP Post method request
  123. func (client *RestClient) Post(resource string, model RequestModel, para *RequestParameter) ([]byte, *CommonResponseResult, error) {
  124. return client.request(http.MethodPost, resource, model, para)
  125. }
  126. // Put send HTTP Put method request
  127. func (client *RestClient) Put(resource string, model RequestModel, para *RequestParameter) (interface{}, *CommonResponseResult, error) {
  128. return client.request(http.MethodPut, resource, model, para)
  129. }
  130. // Delete send HTTP Delete method request
  131. func (client *RestClient) Delete(resource string, para *RequestParameter) (interface{}, *CommonResponseResult, error) {
  132. return client.request(http.MethodDelete, resource, &EmptyRequest{}, para)
  133. }
  134. func (client *RestClient) request(method, resource string, requestModel RequestModel, para *RequestParameter) ([]byte, *CommonResponseResult, error) {
  135. url := fmt.Sprintf("%s%s", client.Endpoint, resource)
  136. header := map[string]string{
  137. httpHeaderClientVersion: DATAHUB_CLIENT_VERSION,
  138. httpHeaderDate: time.Now().UTC().Format(http.TimeFormat),
  139. httpHeaderUserAgent: client.Useragent,
  140. }
  141. //serialization
  142. reqBody, err := requestModel.requestBodyEncode()
  143. if err != nil {
  144. return nil, nil, err
  145. }
  146. //compress
  147. client.compressIfNeed(header, &reqBody)
  148. if client.Account.GetSecurityToken() != "" {
  149. header[httpHeaderSecurityToken] = client.Account.GetSecurityToken()
  150. }
  151. req, err := http.NewRequest(method, url, bytes.NewBuffer(reqBody))
  152. if err != nil {
  153. return nil, nil, err
  154. }
  155. if para != nil {
  156. for k, v := range para.Header {
  157. header[k] = v
  158. }
  159. query := req.URL.Query()
  160. for k, v := range para.Query {
  161. query.Add(k, v)
  162. }
  163. req.URL.RawQuery = query.Encode()
  164. }
  165. for k, v := range header {
  166. req.Header.Add(k, v)
  167. }
  168. client.buildSignature(&req.Header, method, resource)
  169. resp, err := client.HttpClient.Do(req)
  170. if err != nil {
  171. if strings.Contains(err.Error(), "EOF") {
  172. return nil, nil, NewServiceTemporaryUnavailableError(err.Error());
  173. }
  174. return nil, nil, err
  175. }
  176. defer resp.Body.Close()
  177. respBody, err := ioutil.ReadAll(resp.Body)
  178. if err != nil {
  179. return nil, nil, err
  180. }
  181. //decompress
  182. if err := client.decompress(&respBody, &resp.Header); err != nil {
  183. return nil, nil, err
  184. }
  185. //detect error
  186. respResult, err := newCommonResponseResult(resp.StatusCode, &resp.Header, respBody)
  187. log.Debug(fmt.Sprintf("request id: %s\nrequest url: %s\nrequest headers: %v\nrequest body: %s\nresponse headers: %v\nresponse body: %s",
  188. respResult.RequestId, url, req.Header, string(reqBody), resp.Header, string(respBody)))
  189. if err != nil {
  190. return nil, nil, err
  191. }
  192. return respBody, respResult, nil
  193. }
  194. func (client *RestClient) buildSignature(header *http.Header, method, resource string) {
  195. builder := make([]string, 0, 5)
  196. builder = append(builder, method)
  197. builder = append(builder, header.Get(httpHeaderContentType))
  198. builder = append(builder, header.Get(httpHeaderDate))
  199. headersToSign := make(map[string][]string)
  200. for k, v := range *header {
  201. lower := strings.ToLower(k)
  202. if strings.HasPrefix(lower, datahubHeadersPrefix) {
  203. headersToSign[lower] = v
  204. }
  205. }
  206. keys := make([]string, len(headersToSign))
  207. for k := range headersToSign {
  208. keys = append(keys, k)
  209. }
  210. sort.Strings(keys)
  211. for _, k := range keys {
  212. for _, v := range headersToSign[k] {
  213. builder = append(builder, fmt.Sprintf("%s:%s", k, v))
  214. }
  215. }
  216. builder = append(builder, resource)
  217. canonString := strings.Join(builder, "\n")
  218. log.Debug(fmt.Sprintf("canonString: %s, accesskey: %s", canonString, client.Account.GetAccountKey()))
  219. hash := hmac.New(sha1.New, []byte(client.Account.GetAccountKey()))
  220. hash.Write([]byte(canonString))
  221. crypto := hash.Sum(nil)
  222. signature := base64.StdEncoding.EncodeToString(crypto)
  223. authorization := fmt.Sprintf("DATAHUB %s:%s", client.Account.GetAccountId(), signature)
  224. header.Add(httpHeaderAuthorization, authorization)
  225. }
  226. func (client *RestClient) compressIfNeed(header map[string]string, reqBody *[]byte) {
  227. if client.CompressorType == NOCOMPRESS {
  228. return
  229. }
  230. compressor := getCompressor(client.CompressorType)
  231. if compressor != nil {
  232. compressedReqBody, err := compressor.Compress(*reqBody)
  233. header[httpHeaderAcceptEncoding] = client.CompressorType.String()
  234. //compress is valid
  235. if err == nil && len(compressedReqBody) < len(*reqBody) {
  236. header[httpHeaderContentEncoding] = client.CompressorType.String()
  237. //header[httpHeaderAcceptEncoding] = client.CompressorType.String()
  238. header[httpHeaderRawSize] = strconv.Itoa(len(*reqBody))
  239. *reqBody = compressedReqBody
  240. } else {
  241. //print warning and give up compress when compress failed
  242. log.Warning("compress failed or compress invalid, give up compression, ", err)
  243. }
  244. }
  245. header[httpHeaderContentLength] = strconv.Itoa(len(*reqBody))
  246. return
  247. }
  248. func (client *RestClient) decompress(respBody *[]byte, header *http.Header) error {
  249. encoding := header.Get(httpHeaderContentEncoding)
  250. if encoding == "" {
  251. return nil
  252. }
  253. compressor := getCompressor(CompressorType(encoding))
  254. if compressor == nil {
  255. return errors.New(fmt.Sprintf("not support the compress mode %s ", encoding))
  256. }
  257. rawSize := header.Get(httpHeaderRawSize)
  258. //str convert to int64
  259. size, err := strconv.ParseInt(rawSize, 10, 64)
  260. if err != nil {
  261. return err
  262. }
  263. buf, err := compressor.DeCompress(*respBody, size)
  264. if err != nil {
  265. return err
  266. }
  267. *respBody = buf
  268. return nil
  269. }