structs.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. package zk
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "log"
  6. "reflect"
  7. "runtime"
  8. "strings"
  9. "time"
  10. )
// Sentinel errors returned by the packet encoding/decoding helpers in this
// file.
var (
	// ErrUnhandledFieldType is returned by encodePacketValue and
	// decodePacketValue when a value's kind is not one they handle (struct,
	// bool, int32, int64, string or slice).
	ErrUnhandledFieldType = errors.New("zk: unhandled field type")
	// ErrPtrExpected is returned by encodePacket and decodePacket when the
	// value handed to them is not a non-nil pointer.
	ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct")
	// ErrShortBuffer is returned by encodePacket and decodePacket when
	// encoding or decoding ran past the end of the supplied buffer.
	ErrShortBuffer = errors.New("zk: buffer too small")
)
  16. type defaultLogger struct{}
  17. func (defaultLogger) Printf(format string, a ...interface{}) {
  18. log.Printf(format, a...)
  19. }
// ACL is a single access-control entry. It appears as a list element in
// CreateRequest, setAclRequest and getAclResponse.
type ACL struct {
	Perms  int32  // permission value; NOTE(review): presumably a bitmask of Perm* constants defined elsewhere — confirm
	Scheme string // scheme the ID belongs to
	ID     string // identity within Scheme
}
// Stat holds the metadata of a znode. It is embedded in several of the
// response structs declared in this file (for example getDataResponse and
// statResponse).
type Stat struct {
	Czxid          int64 // The zxid of the change that caused this znode to be created.
	Mzxid          int64 // The zxid of the change that last modified this znode.
	Ctime          int64 // The time in milliseconds from epoch when this znode was created.
	Mtime          int64 // The time in milliseconds from epoch when this znode was last modified.
	Version        int32 // The number of changes to the data of this znode.
	Cversion       int32 // The number of changes to the children of this znode.
	Aversion       int32 // The number of changes to the ACL of this znode.
	EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
	DataLength     int32 // The length of the data field of this znode.
	NumChildren    int32 // The number of children of this znode.
	Pzxid          int64 // last modified children
}
// ServerClient is the information for a single Zookeeper client and its session.
// This is used to parse/extract the output of the `cons` command.
type ServerClient struct {
	Queued        int64
	Received      int64
	Sent          int64
	SessionID     int64
	Lcxid         int64
	Lzxid         int64
	Timeout       int32
	LastLatency   int32
	MinLatency    int32
	AvgLatency    int32
	MaxLatency    int32
	Established   time.Time
	LastResponse  time.Time
	Addr          string
	LastOperation string // maybe?
	Error         error  // NOTE(review): presumably the error hit while parsing this client's entry — confirm against the parser
}
// ServerClients is a struct for the FLWCons() function. It's used to provide
// the list of Clients.
//
// This is needed because FLWCons() takes multiple servers.
type ServerClients struct {
	Clients []*ServerClient // one entry per client connection reported
	Error   error           // NOTE(review): presumably the per-server failure, if any — confirm against FLWCons()
}
// ServerStats is the information pulled from the Zookeeper `stat` command.
type ServerStats struct {
	Sent        int64
	Received    int64
	NodeCount   int64
	MinLatency  int64
	AvgLatency  int64
	MaxLatency  int64
	Connections int64
	Outstanding int64
	Epoch       int32
	Counter     int32
	BuildTime   time.Time
	Mode        Mode // Mode is declared elsewhere in the package
	Version     string
	Error       error // NOTE(review): presumably set when the `stat` query or its parsing failed — confirm
}
// requestHeader carries the transaction id and operation code of a request.
// NOTE(review): presumably written in front of every request body — confirm
// against the connection code.
type requestHeader struct {
	Xid    int32
	Opcode int32
}

// responseHeader carries the transaction id, zxid and error code of a
// response.
type responseHeader struct {
	Xid  int32
	Zxid int64
	Err  ErrCode
}

// multiHeader precedes each operation inside a multi request/response.
// multiRequest.Encode writes it with Done=false before every op and with
// Done=true as the terminating header; the Decode methods stop at the first
// header whose Done flag is set.
type multiHeader struct {
	Type int32
	Done bool
	Err  ErrCode
}

// auth is the payload layout shared with setAuthRequest.
type auth struct {
	Type   int32
	Scheme string
	Auth   []byte
}
// Generic request structs. The concrete per-operation request and response
// types below are mostly defined in terms of these layouts.

// pathRequest is a request consisting of a path only.
type pathRequest struct {
	Path string
}

// PathVersionRequest is a request consisting of a path and a version.
// CheckVersionRequest and DeleteRequest share this layout.
type PathVersionRequest struct {
	Path    string
	Version int32
}

// pathWatchRequest is a request consisting of a path and a watch flag.
type pathWatchRequest struct {
	Path  string
	Watch bool
}

// pathResponse is a response consisting of a path only.
type pathResponse struct {
	Path string
}

// statResponse is a response consisting of a Stat only.
type statResponse struct {
	Stat Stat
}
// Per-operation request and response structs.

// CheckVersionRequest has the PathVersionRequest layout; requestStructForOp
// returns it for opCheck.
type CheckVersionRequest PathVersionRequest

// closeRequest has no fields; requestStructForOp returns it for opClose.
type closeRequest struct{}

// closeResponse has no fields.
type closeResponse struct{}

// connectRequest holds the fields of a connect request.
// NOTE(review): presumably the first packet sent when (re)establishing a
// session — confirm against the connection code.
type connectRequest struct {
	ProtocolVersion int32
	LastZxidSeen    int64
	TimeOut         int32
	SessionID       int64
	Passwd          []byte
}

// connectResponse holds the fields of a connect response.
type connectResponse struct {
	ProtocolVersion int32
	TimeOut         int32
	SessionID       int64
	Passwd          []byte
}
// CreateRequest holds the path, data, ACL list and flags of a create
// operation; requestStructForOp returns it for opCreate.
type CreateRequest struct {
	Path  string
	Data  []byte
	Acl   []ACL
	Flags int32
}

// createResponse has the pathResponse layout.
type createResponse pathResponse

// DeleteRequest has the PathVersionRequest layout; requestStructForOp returns
// it for opDelete.
type DeleteRequest PathVersionRequest

// deleteResponse has no fields.
type deleteResponse struct{}

// errorResponse holds a single int32 error code.
type errorResponse struct {
	Err int32
}
// existsRequest has the pathWatchRequest layout (opExists).
type existsRequest pathWatchRequest

// existsResponse has the statResponse layout.
type existsResponse statResponse

// getAclRequest has the pathRequest layout (opGetAcl).
type getAclRequest pathRequest

// getAclResponse holds the ACL list of a znode followed by its Stat.
type getAclResponse struct {
	Acl  []ACL
	Stat Stat
}

// getChildrenRequest has the pathRequest layout (opGetChildren).
type getChildrenRequest pathRequest

// getChildrenResponse holds a list of child names.
type getChildrenResponse struct {
	Children []string
}

// getChildren2Request has the pathWatchRequest layout (opGetChildren2).
type getChildren2Request pathWatchRequest

// getChildren2Response holds a list of child names followed by a Stat.
type getChildren2Response struct {
	Children []string
	Stat     Stat
}

// getDataRequest has the pathWatchRequest layout (opGetData).
type getDataRequest pathWatchRequest

// getDataResponse holds a znode's data followed by its Stat.
type getDataResponse struct {
	Data []byte
	Stat Stat
}

// getMaxChildrenRequest has the pathRequest layout.
type getMaxChildrenRequest pathRequest

// getMaxChildrenResponse holds a single int32 maximum.
type getMaxChildrenResponse struct {
	Max int32
}

// getSaslRequest holds a SASL token as raw bytes.
type getSaslRequest struct {
	Token []byte
}

// pingRequest has no fields; requestStructForOp returns it for opPing.
type pingRequest struct{}

// pingResponse has no fields.
type pingResponse struct{}
// setAclRequest holds the path, new ACL list and version of a set-ACL
// operation (opSetAcl).
type setAclRequest struct {
	Path    string
	Acl     []ACL
	Version int32
}

// setAclResponse has the statResponse layout.
type setAclResponse statResponse

// SetDataRequest holds the path, new data and version of a set-data
// operation; requestStructForOp returns it for opSetData.
type SetDataRequest struct {
	Path    string
	Data    []byte
	Version int32
}

// setDataResponse has the statResponse layout.
type setDataResponse statResponse

// setMaxChildren holds a path and an int32 maximum.
type setMaxChildren struct {
	Path string
	Max  int32
}

// setSaslRequest holds a SASL token as a string.
// NOTE(review): getSaslRequest uses []byte for its token while this type
// uses string — confirm the difference is intentional.
type setSaslRequest struct {
	Token string
}

// setSaslResponse holds a SASL token as a string.
type setSaslResponse struct {
	Token string
}

// setWatchesRequest holds a zxid and three lists of watch paths (opSetWatches).
type setWatchesRequest struct {
	RelativeZxid int64
	DataWatches  []string
	ExistWatches []string
	ChildWatches []string
}

// setWatchesResponse has no fields.
type setWatchesResponse struct{}

// syncRequest has the pathRequest layout (opSync).
type syncRequest pathRequest

// syncResponse has the pathResponse layout.
type syncResponse pathResponse

// setAuthRequest has the auth layout (opSetAuth).
type setAuthRequest auth

// setAuthResponse has no fields.
type setAuthResponse struct{}
// multiRequestOp is one operation of a multi request: its header followed by
// the operation's request struct. multiRequest.Decode fills Op with the value
// returned by requestStructForOp for Header.Type.
type multiRequestOp struct {
	Header multiHeader
	Op     interface{}
}

// multiRequest is a sequence of operations terminated by DoneHeader. It has
// custom Encode and Decode methods, so the reflection-based codec delegates
// to them instead of walking the fields.
type multiRequest struct {
	Ops        []multiRequestOp
	DoneHeader multiHeader
}

// multiResponseOp is the result of one operation of a multi request.
// multiResponse.Decode fills String for opCreate, Stat for opSetData and Err
// for opError; opCheck and opDelete carry no payload.
type multiResponseOp struct {
	Header multiHeader
	String string
	Stat   *Stat
	Err    ErrCode
}

// multiResponse is a sequence of operation results terminated by DoneHeader.
// It is decoded by its own Decode method.
type multiResponse struct {
	Ops        []multiResponseOp
	DoneHeader multiHeader
}
// zk version 3.5 reconfig API

// reconfigRequest holds the fields of a reconfig operation; requestStructForOp
// returns it for opReconfig.
type reconfigRequest struct {
	JoiningServers []byte
	LeavingServers []byte
	NewMembers     []byte
	// curConfigId version of the current configuration
	// optional - causes reconfiguration to return an error if configuration is no longer current
	CurConfigId int64
}

// reconfigReponse has the getDataResponse layout.
// NOTE(review): the identifier is misspelled ("Reponse"); it is left as is
// because other files may refer to it.
type reconfigReponse getDataResponse
  240. func (r *multiRequest) Encode(buf []byte) (int, error) {
  241. total := 0
  242. for _, op := range r.Ops {
  243. op.Header.Done = false
  244. n, err := encodePacketValue(buf[total:], reflect.ValueOf(op))
  245. if err != nil {
  246. return total, err
  247. }
  248. total += n
  249. }
  250. r.DoneHeader.Done = true
  251. n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader))
  252. if err != nil {
  253. return total, err
  254. }
  255. total += n
  256. return total, nil
  257. }
  258. func (r *multiRequest) Decode(buf []byte) (int, error) {
  259. r.Ops = make([]multiRequestOp, 0)
  260. r.DoneHeader = multiHeader{-1, true, -1}
  261. total := 0
  262. for {
  263. header := &multiHeader{}
  264. n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
  265. if err != nil {
  266. return total, err
  267. }
  268. total += n
  269. if header.Done {
  270. r.DoneHeader = *header
  271. break
  272. }
  273. req := requestStructForOp(header.Type)
  274. if req == nil {
  275. return total, ErrAPIError
  276. }
  277. n, err = decodePacketValue(buf[total:], reflect.ValueOf(req))
  278. if err != nil {
  279. return total, err
  280. }
  281. total += n
  282. r.Ops = append(r.Ops, multiRequestOp{*header, req})
  283. }
  284. return total, nil
  285. }
  286. func (r *multiResponse) Decode(buf []byte) (int, error) {
  287. var multiErr error
  288. r.Ops = make([]multiResponseOp, 0)
  289. r.DoneHeader = multiHeader{-1, true, -1}
  290. total := 0
  291. for {
  292. header := &multiHeader{}
  293. n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
  294. if err != nil {
  295. return total, err
  296. }
  297. total += n
  298. if header.Done {
  299. r.DoneHeader = *header
  300. break
  301. }
  302. res := multiResponseOp{Header: *header}
  303. var w reflect.Value
  304. switch header.Type {
  305. default:
  306. return total, ErrAPIError
  307. case opError:
  308. w = reflect.ValueOf(&res.Err)
  309. case opCreate:
  310. w = reflect.ValueOf(&res.String)
  311. case opSetData:
  312. res.Stat = new(Stat)
  313. w = reflect.ValueOf(res.Stat)
  314. case opCheck, opDelete:
  315. }
  316. if w.IsValid() {
  317. n, err := decodePacketValue(buf[total:], w)
  318. if err != nil {
  319. return total, err
  320. }
  321. total += n
  322. }
  323. r.Ops = append(r.Ops, res)
  324. if multiErr == nil && res.Err != errOk {
  325. // Use the first error as the error returned from Multi().
  326. multiErr = res.Err.toError()
  327. }
  328. }
  329. return total, multiErr
  330. }
// watcherEvent holds an event type, a state and a path. EventType and State
// are declared elsewhere in the package.
// NOTE(review): presumably the payload of a server-sent watch notification —
// confirm against the receive loop.
type watcherEvent struct {
	Type  EventType
	State State
	Path  string
}
// decoder is implemented by types that decode themselves from a buffer.
// decodePacketValue calls Decode instead of walking the struct's fields when
// a struct (or the pointer to it) implements this interface. Decode returns
// the number of bytes consumed.
type decoder interface {
	Decode(buf []byte) (int, error)
}

// encoder is implemented by types that encode themselves into a buffer.
// encodePacketValue calls Encode instead of walking the struct's fields when
// a struct (or the pointer to it) implements this interface. Encode returns
// the number of bytes written.
type encoder interface {
	Encode(buf []byte) (int, error)
}
  342. func decodePacket(buf []byte, st interface{}) (n int, err error) {
  343. defer func() {
  344. if r := recover(); r != nil {
  345. if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") {
  346. err = ErrShortBuffer
  347. } else {
  348. panic(r)
  349. }
  350. }
  351. }()
  352. v := reflect.ValueOf(st)
  353. if v.Kind() != reflect.Ptr || v.IsNil() {
  354. return 0, ErrPtrExpected
  355. }
  356. return decodePacketValue(buf, v)
  357. }
  358. func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
  359. rv := v
  360. kind := v.Kind()
  361. if kind == reflect.Ptr {
  362. if v.IsNil() {
  363. v.Set(reflect.New(v.Type().Elem()))
  364. }
  365. v = v.Elem()
  366. kind = v.Kind()
  367. }
  368. n := 0
  369. switch kind {
  370. default:
  371. return n, ErrUnhandledFieldType
  372. case reflect.Struct:
  373. if de, ok := rv.Interface().(decoder); ok {
  374. return de.Decode(buf)
  375. } else if de, ok := v.Interface().(decoder); ok {
  376. return de.Decode(buf)
  377. } else {
  378. for i := 0; i < v.NumField(); i++ {
  379. field := v.Field(i)
  380. n2, err := decodePacketValue(buf[n:], field)
  381. n += n2
  382. if err != nil {
  383. return n, err
  384. }
  385. }
  386. }
  387. case reflect.Bool:
  388. v.SetBool(buf[n] != 0)
  389. n++
  390. case reflect.Int32:
  391. v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4])))
  392. n += 4
  393. case reflect.Int64:
  394. v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8])))
  395. n += 8
  396. case reflect.String:
  397. ln := int(binary.BigEndian.Uint32(buf[n : n+4]))
  398. v.SetString(string(buf[n+4 : n+4+ln]))
  399. n += 4 + ln
  400. case reflect.Slice:
  401. switch v.Type().Elem().Kind() {
  402. default:
  403. count := int(binary.BigEndian.Uint32(buf[n : n+4]))
  404. n += 4
  405. values := reflect.MakeSlice(v.Type(), count, count)
  406. v.Set(values)
  407. for i := 0; i < count; i++ {
  408. n2, err := decodePacketValue(buf[n:], values.Index(i))
  409. n += n2
  410. if err != nil {
  411. return n, err
  412. }
  413. }
  414. case reflect.Uint8:
  415. ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4])))
  416. if ln < 0 {
  417. n += 4
  418. v.SetBytes(nil)
  419. } else {
  420. bytes := make([]byte, ln)
  421. copy(bytes, buf[n+4:n+4+ln])
  422. v.SetBytes(bytes)
  423. n += 4 + ln
  424. }
  425. }
  426. }
  427. return n, nil
  428. }
  429. func encodePacket(buf []byte, st interface{}) (n int, err error) {
  430. defer func() {
  431. if r := recover(); r != nil {
  432. if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") {
  433. err = ErrShortBuffer
  434. } else {
  435. panic(r)
  436. }
  437. }
  438. }()
  439. v := reflect.ValueOf(st)
  440. if v.Kind() != reflect.Ptr || v.IsNil() {
  441. return 0, ErrPtrExpected
  442. }
  443. return encodePacketValue(buf, v)
  444. }
  445. func encodePacketValue(buf []byte, v reflect.Value) (int, error) {
  446. rv := v
  447. for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
  448. v = v.Elem()
  449. }
  450. n := 0
  451. switch v.Kind() {
  452. default:
  453. return n, ErrUnhandledFieldType
  454. case reflect.Struct:
  455. if en, ok := rv.Interface().(encoder); ok {
  456. return en.Encode(buf)
  457. } else if en, ok := v.Interface().(encoder); ok {
  458. return en.Encode(buf)
  459. } else {
  460. for i := 0; i < v.NumField(); i++ {
  461. field := v.Field(i)
  462. n2, err := encodePacketValue(buf[n:], field)
  463. n += n2
  464. if err != nil {
  465. return n, err
  466. }
  467. }
  468. }
  469. case reflect.Bool:
  470. if v.Bool() {
  471. buf[n] = 1
  472. } else {
  473. buf[n] = 0
  474. }
  475. n++
  476. case reflect.Int32:
  477. binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int()))
  478. n += 4
  479. case reflect.Int64:
  480. binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int()))
  481. n += 8
  482. case reflect.String:
  483. str := v.String()
  484. binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str)))
  485. copy(buf[n+4:n+4+len(str)], []byte(str))
  486. n += 4 + len(str)
  487. case reflect.Slice:
  488. switch v.Type().Elem().Kind() {
  489. default:
  490. count := v.Len()
  491. startN := n
  492. n += 4
  493. for i := 0; i < count; i++ {
  494. n2, err := encodePacketValue(buf[n:], v.Index(i))
  495. n += n2
  496. if err != nil {
  497. return n, err
  498. }
  499. }
  500. binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count))
  501. case reflect.Uint8:
  502. if v.IsNil() {
  503. binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff))
  504. n += 4
  505. } else {
  506. bytes := v.Bytes()
  507. binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes)))
  508. copy(buf[n+4:n+4+len(bytes)], bytes)
  509. n += 4 + len(bytes)
  510. }
  511. }
  512. }
  513. return n, nil
  514. }
  515. func requestStructForOp(op int32) interface{} {
  516. switch op {
  517. case opClose:
  518. return &closeRequest{}
  519. case opCreate:
  520. return &CreateRequest{}
  521. case opDelete:
  522. return &DeleteRequest{}
  523. case opExists:
  524. return &existsRequest{}
  525. case opGetAcl:
  526. return &getAclRequest{}
  527. case opGetChildren:
  528. return &getChildrenRequest{}
  529. case opGetChildren2:
  530. return &getChildren2Request{}
  531. case opGetData:
  532. return &getDataRequest{}
  533. case opPing:
  534. return &pingRequest{}
  535. case opSetAcl:
  536. return &setAclRequest{}
  537. case opSetData:
  538. return &SetDataRequest{}
  539. case opSetWatches:
  540. return &setWatchesRequest{}
  541. case opSync:
  542. return &syncRequest{}
  543. case opSetAuth:
  544. return &setAuthRequest{}
  545. case opCheck:
  546. return &CheckVersionRequest{}
  547. case opMulti:
  548. return &multiRequest{}
  549. case opReconfig:
  550. return &reconfigRequest{}
  551. }
  552. return nil
  553. }