123456789101112131415161718192021222324252627282930313233343536373839 |
- package kafka
// commit describes a single offset update to publish: it identifies a topic
// and partition, and carries the offset value to record for them.
type commit struct {
	// topic is the name of the topic the offset belongs to.
	topic string
	// partition is the partition number within the topic.
	partition int
	// offset is the offset value to publish for the topic and partition.
	offset int64
}
- // makeCommit builds a commit value from a message, the resulting commit takes
- // its topic, partition, and offset from the message.
- func makeCommit(msg Message) commit {
- return commit{
- topic: msg.Topic,
- partition: msg.Partition,
- offset: msg.Offset + 1,
- }
- }
- // makeCommits generates a slice of commits from a list of messages, it extracts
- // the topic, partition, and offset of each message and builds the corresponding
- // commit slice.
- func makeCommits(msgs ...Message) []commit {
- commits := make([]commit, len(msgs))
- for i, m := range msgs {
- commits[i] = makeCommit(m)
- }
- return commits
- }
- // commitRequest is the data type exchanged between the CommitMessages method
- // and internals of the reader's implementation.
- type commitRequest struct {
- commits []commit
- errch chan<- error
- }
|