method_logger.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "net"
  21. "strings"
  22. "sync/atomic"
  23. "time"
  24. "github.com/golang/protobuf/proto"
  25. "github.com/golang/protobuf/ptypes"
  26. pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  27. "google.golang.org/grpc/metadata"
  28. "google.golang.org/grpc/status"
  29. )
  30. type callIDGenerator struct {
  31. id uint64
  32. }
  33. func (g *callIDGenerator) next() uint64 {
  34. id := atomic.AddUint64(&g.id, 1)
  35. return id
  36. }
  37. // reset is for testing only, and doesn't need to be thread safe.
  38. func (g *callIDGenerator) reset() {
  39. g.id = 0
  40. }
  41. var idGen callIDGenerator
  42. // MethodLogger is the sub-logger for each method.
  43. type MethodLogger struct {
  44. headerMaxLen, messageMaxLen uint64
  45. callID uint64
  46. idWithinCallGen *callIDGenerator
  47. sink Sink // TODO(blog): make this plugable.
  48. }
  49. func newMethodLogger(h, m uint64) *MethodLogger {
  50. return &MethodLogger{
  51. headerMaxLen: h,
  52. messageMaxLen: m,
  53. callID: idGen.next(),
  54. idWithinCallGen: &callIDGenerator{},
  55. sink: defaultSink, // TODO(blog): make it plugable.
  56. }
  57. }
  58. // Log creates a proto binary log entry, and logs it to the sink.
  59. func (ml *MethodLogger) Log(c LogEntryConfig) {
  60. m := c.toProto()
  61. timestamp, _ := ptypes.TimestampProto(time.Now())
  62. m.Timestamp = timestamp
  63. m.CallId = ml.callID
  64. m.SequenceIdWithinCall = ml.idWithinCallGen.next()
  65. switch pay := m.Payload.(type) {
  66. case *pb.GrpcLogEntry_ClientHeader:
  67. m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
  68. case *pb.GrpcLogEntry_ServerHeader:
  69. m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
  70. case *pb.GrpcLogEntry_Message:
  71. m.PayloadTruncated = ml.truncateMessage(pay.Message)
  72. }
  73. ml.sink.Write(m)
  74. }
  75. func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
  76. if ml.headerMaxLen == maxUInt {
  77. return false
  78. }
  79. var (
  80. bytesLimit = ml.headerMaxLen
  81. index int
  82. )
  83. // At the end of the loop, index will be the first entry where the total
  84. // size is greater than the limit:
  85. //
  86. // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
  87. for ; index < len(mdPb.Entry); index++ {
  88. entry := mdPb.Entry[index]
  89. if entry.Key == "grpc-trace-bin" {
  90. // "grpc-trace-bin" is a special key. It's kept in the log entry,
  91. // but not counted towards the size limit.
  92. continue
  93. }
  94. currentEntryLen := uint64(len(entry.Value))
  95. if currentEntryLen > bytesLimit {
  96. break
  97. }
  98. bytesLimit -= currentEntryLen
  99. }
  100. truncated = index < len(mdPb.Entry)
  101. mdPb.Entry = mdPb.Entry[:index]
  102. return truncated
  103. }
  104. func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
  105. if ml.messageMaxLen == maxUInt {
  106. return false
  107. }
  108. if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
  109. return false
  110. }
  111. msgPb.Data = msgPb.Data[:ml.messageMaxLen]
  112. return true
  113. }
  114. // LogEntryConfig represents the configuration for binary log entry.
  115. type LogEntryConfig interface {
  116. toProto() *pb.GrpcLogEntry
  117. }
  118. // ClientHeader configs the binary log entry to be a ClientHeader entry.
  119. type ClientHeader struct {
  120. OnClientSide bool
  121. Header metadata.MD
  122. MethodName string
  123. Authority string
  124. Timeout time.Duration
  125. // PeerAddr is required only when it's on server side.
  126. PeerAddr net.Addr
  127. }
  128. func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
  129. // This function doesn't need to set all the fields (e.g. seq ID). The Log
  130. // function will set the fields when necessary.
  131. clientHeader := &pb.ClientHeader{
  132. Metadata: mdToMetadataProto(c.Header),
  133. MethodName: c.MethodName,
  134. Authority: c.Authority,
  135. }
  136. if c.Timeout > 0 {
  137. clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
  138. }
  139. ret := &pb.GrpcLogEntry{
  140. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
  141. Payload: &pb.GrpcLogEntry_ClientHeader{
  142. ClientHeader: clientHeader,
  143. },
  144. }
  145. if c.OnClientSide {
  146. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  147. } else {
  148. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  149. }
  150. if c.PeerAddr != nil {
  151. ret.Peer = addrToProto(c.PeerAddr)
  152. }
  153. return ret
  154. }
  155. // ServerHeader configs the binary log entry to be a ServerHeader entry.
  156. type ServerHeader struct {
  157. OnClientSide bool
  158. Header metadata.MD
  159. // PeerAddr is required only when it's on client side.
  160. PeerAddr net.Addr
  161. }
  162. func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
  163. ret := &pb.GrpcLogEntry{
  164. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
  165. Payload: &pb.GrpcLogEntry_ServerHeader{
  166. ServerHeader: &pb.ServerHeader{
  167. Metadata: mdToMetadataProto(c.Header),
  168. },
  169. },
  170. }
  171. if c.OnClientSide {
  172. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  173. } else {
  174. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  175. }
  176. if c.PeerAddr != nil {
  177. ret.Peer = addrToProto(c.PeerAddr)
  178. }
  179. return ret
  180. }
  181. // ClientMessage configs the binary log entry to be a ClientMessage entry.
  182. type ClientMessage struct {
  183. OnClientSide bool
  184. // Message can be a proto.Message or []byte. Other messages formats are not
  185. // supported.
  186. Message interface{}
  187. }
  188. func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
  189. var (
  190. data []byte
  191. err error
  192. )
  193. if m, ok := c.Message.(proto.Message); ok {
  194. data, err = proto.Marshal(m)
  195. if err != nil {
  196. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  197. }
  198. } else if b, ok := c.Message.([]byte); ok {
  199. data = b
  200. } else {
  201. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  202. }
  203. ret := &pb.GrpcLogEntry{
  204. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
  205. Payload: &pb.GrpcLogEntry_Message{
  206. Message: &pb.Message{
  207. Length: uint32(len(data)),
  208. Data: data,
  209. },
  210. },
  211. }
  212. if c.OnClientSide {
  213. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  214. } else {
  215. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  216. }
  217. return ret
  218. }
  219. // ServerMessage configs the binary log entry to be a ServerMessage entry.
  220. type ServerMessage struct {
  221. OnClientSide bool
  222. // Message can be a proto.Message or []byte. Other messages formats are not
  223. // supported.
  224. Message interface{}
  225. }
  226. func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
  227. var (
  228. data []byte
  229. err error
  230. )
  231. if m, ok := c.Message.(proto.Message); ok {
  232. data, err = proto.Marshal(m)
  233. if err != nil {
  234. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  235. }
  236. } else if b, ok := c.Message.([]byte); ok {
  237. data = b
  238. } else {
  239. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  240. }
  241. ret := &pb.GrpcLogEntry{
  242. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
  243. Payload: &pb.GrpcLogEntry_Message{
  244. Message: &pb.Message{
  245. Length: uint32(len(data)),
  246. Data: data,
  247. },
  248. },
  249. }
  250. if c.OnClientSide {
  251. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  252. } else {
  253. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  254. }
  255. return ret
  256. }
  257. // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
  258. type ClientHalfClose struct {
  259. OnClientSide bool
  260. }
  261. func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
  262. ret := &pb.GrpcLogEntry{
  263. Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
  264. Payload: nil, // No payload here.
  265. }
  266. if c.OnClientSide {
  267. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  268. } else {
  269. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  270. }
  271. return ret
  272. }
  273. // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
  274. type ServerTrailer struct {
  275. OnClientSide bool
  276. Trailer metadata.MD
  277. // Err is the status error.
  278. Err error
  279. // PeerAddr is required only when it's on client side and the RPC is trailer
  280. // only.
  281. PeerAddr net.Addr
  282. }
  283. func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
  284. st, ok := status.FromError(c.Err)
  285. if !ok {
  286. grpclogLogger.Info("binarylogging: error in trailer is not a status error")
  287. }
  288. var (
  289. detailsBytes []byte
  290. err error
  291. )
  292. stProto := st.Proto()
  293. if stProto != nil && len(stProto.Details) != 0 {
  294. detailsBytes, err = proto.Marshal(stProto)
  295. if err != nil {
  296. grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
  297. }
  298. }
  299. ret := &pb.GrpcLogEntry{
  300. Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
  301. Payload: &pb.GrpcLogEntry_Trailer{
  302. Trailer: &pb.Trailer{
  303. Metadata: mdToMetadataProto(c.Trailer),
  304. StatusCode: uint32(st.Code()),
  305. StatusMessage: st.Message(),
  306. StatusDetails: detailsBytes,
  307. },
  308. },
  309. }
  310. if c.OnClientSide {
  311. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  312. } else {
  313. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  314. }
  315. if c.PeerAddr != nil {
  316. ret.Peer = addrToProto(c.PeerAddr)
  317. }
  318. return ret
  319. }
  320. // Cancel configs the binary log entry to be a Cancel entry.
  321. type Cancel struct {
  322. OnClientSide bool
  323. }
  324. func (c *Cancel) toProto() *pb.GrpcLogEntry {
  325. ret := &pb.GrpcLogEntry{
  326. Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
  327. Payload: nil,
  328. }
  329. if c.OnClientSide {
  330. ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
  331. } else {
  332. ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
  333. }
  334. return ret
  335. }
  336. // metadataKeyOmit returns whether the metadata entry with this key should be
  337. // omitted.
  338. func metadataKeyOmit(key string) bool {
  339. switch key {
  340. case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
  341. return true
  342. case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
  343. return false
  344. }
  345. return strings.HasPrefix(key, "grpc-")
  346. }
  347. func mdToMetadataProto(md metadata.MD) *pb.Metadata {
  348. ret := &pb.Metadata{}
  349. for k, vv := range md {
  350. if metadataKeyOmit(k) {
  351. continue
  352. }
  353. for _, v := range vv {
  354. ret.Entry = append(ret.Entry,
  355. &pb.MetadataEntry{
  356. Key: k,
  357. Value: []byte(v),
  358. },
  359. )
  360. }
  361. }
  362. return ret
  363. }
  364. func addrToProto(addr net.Addr) *pb.Address {
  365. ret := &pb.Address{}
  366. switch a := addr.(type) {
  367. case *net.TCPAddr:
  368. if a.IP.To4() != nil {
  369. ret.Type = pb.Address_TYPE_IPV4
  370. } else if a.IP.To16() != nil {
  371. ret.Type = pb.Address_TYPE_IPV6
  372. } else {
  373. ret.Type = pb.Address_TYPE_UNKNOWN
  374. // Do not set address and port fields.
  375. break
  376. }
  377. ret.Address = a.IP.String()
  378. ret.IpPort = uint32(a.Port)
  379. case *net.UnixAddr:
  380. ret.Type = pb.Address_TYPE_UNIX
  381. ret.Address = a.String()
  382. default:
  383. ret.Type = pb.Address_TYPE_UNKNOWN
  384. }
  385. return ret
  386. }