sink.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "bufio"
  21. "encoding/binary"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "sync"
  26. "time"
  27. "github.com/golang/protobuf/proto"
  28. pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  29. )
  30. var (
  31. defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
  32. )
  33. // SetDefaultSink sets the sink where binary logs will be written to.
  34. //
  35. // Not thread safe. Only set during initialization.
  36. func SetDefaultSink(s Sink) {
  37. if defaultSink != nil {
  38. defaultSink.Close()
  39. }
  40. defaultSink = s
  41. }
  42. // Sink writes log entry into the binary log sink.
  43. type Sink interface {
  44. // Write will be called to write the log entry into the sink.
  45. //
  46. // It should be thread-safe so it can be called in parallel.
  47. Write(*pb.GrpcLogEntry) error
  48. // Close will be called when the Sink is replaced by a new Sink.
  49. Close() error
  50. }
  51. type noopSink struct{}
  52. func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
  53. func (ns *noopSink) Close() error { return nil }
  54. // newWriterSink creates a binary log sink with the given writer.
  55. //
  56. // Write() marshals the proto message and writes it to the given writer. Each
  57. // message is prefixed with a 4 byte big endian unsigned integer as the length.
  58. //
  59. // No buffer is done, Close() doesn't try to close the writer.
  60. func newWriterSink(w io.Writer) *writerSink {
  61. return &writerSink{out: w}
  62. }
  63. type writerSink struct {
  64. out io.Writer
  65. }
  66. func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
  67. b, err := proto.Marshal(e)
  68. if err != nil {
  69. grpclogLogger.Infof("binary logging: failed to marshal proto message: %v", err)
  70. }
  71. hdr := make([]byte, 4)
  72. binary.BigEndian.PutUint32(hdr, uint32(len(b)))
  73. if _, err := ws.out.Write(hdr); err != nil {
  74. return err
  75. }
  76. if _, err := ws.out.Write(b); err != nil {
  77. return err
  78. }
  79. return nil
  80. }
  81. func (ws *writerSink) Close() error { return nil }
  82. type bufWriteCloserSink struct {
  83. mu sync.Mutex
  84. closer io.Closer
  85. out *writerSink // out is built on buf.
  86. buf *bufio.Writer // buf is kept for flush.
  87. writeStartOnce sync.Once
  88. writeTicker *time.Ticker
  89. }
  90. func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
  91. // Start the write loop when Write is called.
  92. fs.writeStartOnce.Do(fs.startFlushGoroutine)
  93. fs.mu.Lock()
  94. if err := fs.out.Write(e); err != nil {
  95. fs.mu.Unlock()
  96. return err
  97. }
  98. fs.mu.Unlock()
  99. return nil
  100. }
  101. const (
  102. bufFlushDuration = 60 * time.Second
  103. )
  104. func (fs *bufWriteCloserSink) startFlushGoroutine() {
  105. fs.writeTicker = time.NewTicker(bufFlushDuration)
  106. go func() {
  107. for range fs.writeTicker.C {
  108. fs.mu.Lock()
  109. fs.buf.Flush()
  110. fs.mu.Unlock()
  111. }
  112. }()
  113. }
  114. func (fs *bufWriteCloserSink) Close() error {
  115. if fs.writeTicker != nil {
  116. fs.writeTicker.Stop()
  117. }
  118. fs.mu.Lock()
  119. fs.buf.Flush()
  120. fs.closer.Close()
  121. fs.out.Close()
  122. fs.mu.Unlock()
  123. return nil
  124. }
  125. func newBufWriteCloserSink(o io.WriteCloser) Sink {
  126. bufW := bufio.NewWriter(o)
  127. return &bufWriteCloserSink{
  128. closer: o,
  129. out: newWriterSink(bufW),
  130. buf: bufW,
  131. }
  132. }
  133. // NewTempFileSink creates a temp file and returns a Sink that writes to this
  134. // file.
  135. func NewTempFileSink() (Sink, error) {
  136. tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
  137. if err != nil {
  138. return nil, fmt.Errorf("failed to create temp file: %v", err)
  139. }
  140. return newBufWriteCloserSink(tempFile), nil
  141. }