roundtrip.go 740 B

12345678910111213141516171819202122232425262728
  1. package protocol
  2. import (
  3. "io"
  4. )
  5. // RoundTrip sends a request to a kafka broker and returns the response.
  6. func RoundTrip(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, req Message) (Message, error) {
  7. if err := WriteRequest(rw, apiVersion, correlationID, clientID, req); err != nil {
  8. return nil, err
  9. }
  10. if !hasResponse(req) {
  11. return nil, nil
  12. }
  13. id, res, err := ReadResponse(rw, req.ApiKey(), apiVersion)
  14. if err != nil {
  15. return nil, err
  16. }
  17. if id != correlationID {
  18. return nil, Errorf("correlation id mismatch (expected=%d, found=%d)", correlationID, id)
  19. }
  20. return res, nil
  21. }
  22. func hasResponse(msg Message) bool {
  23. x, _ := msg.(interface{ HasResponse() bool })
  24. return x == nil || x.HasResponse()
  25. }