12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- package protocol
- import (
- "fmt"
- )
- // Error represents client-side protocol errors.
- type Error string
- func (e Error) Error() string { return string(e) }
- func Errorf(msg string, args ...interface{}) Error {
- return Error(fmt.Sprintf(msg, args...))
- }
- const (
- // ErrNoTopic is returned when a request needs to be sent to a specific
- ErrNoTopic Error = "topic not found"
- // ErrNoPartition is returned when a request needs to be sent to a specific
- // partition, but the client did not find it in the cluster metadata.
- ErrNoPartition Error = "topic partition not found"
- // ErrNoLeader is returned when a request needs to be sent to a partition
- // leader, but the client could not determine what the leader was at this
- // time.
- ErrNoLeader Error = "topic partition has no leader"
- // ErrNoRecord is returned when attempting to write a message containing an
- // empty record set (which kafka forbids).
- //
- // We handle this case client-side because kafka will close the connection
- // that it received an empty produce request on, causing all concurrent
- // requests to be aborted.
- ErrNoRecord Error = "record set contains no records"
- // ErrNoReset is returned by ResetRecordReader when the record reader does
- // not support being reset.
- ErrNoReset Error = "record sequence does not support reset"
- )
- type TopicError struct {
- Topic string
- Err error
- }
- func NewTopicError(topic string, err error) *TopicError {
- return &TopicError{Topic: topic, Err: err}
- }
- func NewErrNoTopic(topic string) *TopicError {
- return NewTopicError(topic, ErrNoTopic)
- }
- func (e *TopicError) Error() string {
- return fmt.Sprintf("%v (topic=%q)", e.Err, e.Topic)
- }
- func (e *TopicError) Unwrap() error {
- return e.Err
- }
- type TopicPartitionError struct {
- Topic string
- Partition int32
- Err error
- }
- func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError {
- return &TopicPartitionError{
- Topic: topic,
- Partition: partition,
- Err: err,
- }
- }
- func NewErrNoPartition(topic string, partition int32) *TopicPartitionError {
- return NewTopicPartitionError(topic, partition, ErrNoPartition)
- }
- func NewErrNoLeader(topic string, partition int32) *TopicPartitionError {
- return NewTopicPartitionError(topic, partition, ErrNoLeader)
- }
- func (e *TopicPartitionError) Error() string {
- return fmt.Sprintf("%v (topic=%q partition=%d)", e.Err, e.Topic, e.Partition)
- }
- func (e *TopicPartitionError) Unwrap() error {
- return e.Err
- }
|