commit.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. package kafka
  2. // A commit represents the instruction of publishing an update of the last
  3. // offset read by a program for a topic and partition.
  4. type commit struct {
  5. topic string
  6. partition int
  7. offset int64
  8. }
  9. // makeCommit builds a commit value from a message, the resulting commit takes
  10. // its topic, partition, and offset from the message.
  11. func makeCommit(msg Message) commit {
  12. return commit{
  13. topic: msg.Topic,
  14. partition: msg.Partition,
  15. offset: msg.Offset + 1,
  16. }
  17. }
  18. // makeCommits generates a slice of commits from a list of messages, it extracts
  19. // the topic, partition, and offset of each message and builds the corresponding
  20. // commit slice.
  21. func makeCommits(msgs ...Message) []commit {
  22. commits := make([]commit, len(msgs))
  23. for i, m := range msgs {
  24. commits[i] = makeCommit(m)
  25. }
  26. return commits
  27. }
  28. // commitRequest is the data type exchanged between the CommitMessages method
  29. // and internals of the reader's implementation.
  30. type commitRequest struct {
  31. commits []commit
  32. errch chan<- error
  33. }