12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- package kafka
- import (
- "math"
- "time"
- )
- const (
- maxTimeout = time.Duration(math.MaxInt32) * time.Millisecond
- minTimeout = time.Duration(math.MinInt32) * time.Millisecond
- defaultRTT = 1 * time.Second
- )
- func makeTime(t int64) time.Time {
- if t <= 0 {
- return time.Time{}
- }
- return time.Unix(t/1000, (t%1000)*int64(time.Millisecond)).UTC()
- }
- func timestamp(t time.Time) int64 {
- if t.IsZero() {
- return 0
- }
- return t.UnixNano() / int64(time.Millisecond)
- }
- func makeDuration(ms int32) time.Duration {
- return time.Duration(ms) * time.Millisecond
- }
- func milliseconds(d time.Duration) int32 {
- switch {
- case d > maxTimeout:
- d = maxTimeout
- case d < minTimeout:
- d = minTimeout
- }
- return int32(d / time.Millisecond)
- }
- func deadlineToTimeout(deadline time.Time, now time.Time) time.Duration {
- if deadline.IsZero() {
- return maxTimeout
- }
- return deadline.Sub(now)
- }
- func adjustDeadlineForRTT(deadline time.Time, now time.Time, rtt time.Duration) time.Time {
- if !deadline.IsZero() {
- timeout := deadline.Sub(now)
- if timeout < rtt {
- rtt = timeout / 4
- }
- deadline = deadline.Add(-rtt)
- }
- return deadline
- }
|