error.go 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package protocol
  2. import (
  3. "fmt"
  4. )
  5. // Error represents client-side protocol errors.
  6. type Error string
  7. func (e Error) Error() string { return string(e) }
  8. func Errorf(msg string, args ...interface{}) Error {
  9. return Error(fmt.Sprintf(msg, args...))
  10. }
  11. const (
  12. // ErrNoTopic is returned when a request needs to be sent to a specific
  13. ErrNoTopic Error = "topic not found"
  14. // ErrNoPartition is returned when a request needs to be sent to a specific
  15. // partition, but the client did not find it in the cluster metadata.
  16. ErrNoPartition Error = "topic partition not found"
  17. // ErrNoLeader is returned when a request needs to be sent to a partition
  18. // leader, but the client could not determine what the leader was at this
  19. // time.
  20. ErrNoLeader Error = "topic partition has no leader"
  21. // ErrNoRecord is returned when attempting to write a message containing an
  22. // empty record set (which kafka forbids).
  23. //
  24. // We handle this case client-side because kafka will close the connection
  25. // that it received an empty produce request on, causing all concurrent
  26. // requests to be aborted.
  27. ErrNoRecord Error = "record set contains no records"
  28. // ErrNoReset is returned by ResetRecordReader when the record reader does
  29. // not support being reset.
  30. ErrNoReset Error = "record sequence does not support reset"
  31. )
  32. type TopicError struct {
  33. Topic string
  34. Err error
  35. }
  36. func NewTopicError(topic string, err error) *TopicError {
  37. return &TopicError{Topic: topic, Err: err}
  38. }
  39. func NewErrNoTopic(topic string) *TopicError {
  40. return NewTopicError(topic, ErrNoTopic)
  41. }
  42. func (e *TopicError) Error() string {
  43. return fmt.Sprintf("%v (topic=%q)", e.Err, e.Topic)
  44. }
  45. func (e *TopicError) Unwrap() error {
  46. return e.Err
  47. }
  48. type TopicPartitionError struct {
  49. Topic string
  50. Partition int32
  51. Err error
  52. }
  53. func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError {
  54. return &TopicPartitionError{
  55. Topic: topic,
  56. Partition: partition,
  57. Err: err,
  58. }
  59. }
  60. func NewErrNoPartition(topic string, partition int32) *TopicPartitionError {
  61. return NewTopicPartitionError(topic, partition, ErrNoPartition)
  62. }
  63. func NewErrNoLeader(topic string, partition int32) *TopicPartitionError {
  64. return NewTopicPartitionError(topic, partition, ErrNoLeader)
  65. }
  66. func (e *TopicPartitionError) Error() string {
  67. return fmt.Sprintf("%v (topic=%q partition=%d)", e.Err, e.Topic, e.Partition)
  68. }
  69. func (e *TopicPartitionError) Unwrap() error {
  70. return e.Err
  71. }