funcs.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package channelz defines APIs for enabling channelz service, entry
  19. // registration/deletion, and accessing channelz data. It also defines channelz
  20. // metric struct formats.
  21. //
  22. // All APIs in this package are experimental.
  23. package channelz
  24. import (
  25. "fmt"
  26. "sort"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "google.golang.org/grpc/grpclog"
  31. )
  32. const (
  33. defaultMaxTraceEntry int32 = 30
  34. )
  35. var (
  36. db dbWrapper
  37. idGen idGenerator
  38. // EntryPerPage defines the number of channelz entries to be shown on a web page.
  39. EntryPerPage = int64(50)
  40. curState int32
  41. maxTraceEntry = defaultMaxTraceEntry
  42. )
  43. // TurnOn turns on channelz data collection.
  44. func TurnOn() {
  45. if !IsOn() {
  46. NewChannelzStorage()
  47. atomic.StoreInt32(&curState, 1)
  48. }
  49. }
  50. // IsOn returns whether channelz data collection is on.
  51. func IsOn() bool {
  52. return atomic.CompareAndSwapInt32(&curState, 1, 1)
  53. }
  54. // SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
  55. // Setting it to 0 will disable channel tracing.
  56. func SetMaxTraceEntry(i int32) {
  57. atomic.StoreInt32(&maxTraceEntry, i)
  58. }
  59. // ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
  60. func ResetMaxTraceEntryToDefault() {
  61. atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
  62. }
  63. func getMaxTraceEntry() int {
  64. i := atomic.LoadInt32(&maxTraceEntry)
  65. return int(i)
  66. }
  67. // dbWarpper wraps around a reference to internal channelz data storage, and
  68. // provide synchronized functionality to set and get the reference.
  69. type dbWrapper struct {
  70. mu sync.RWMutex
  71. DB *channelMap
  72. }
  73. func (d *dbWrapper) set(db *channelMap) {
  74. d.mu.Lock()
  75. d.DB = db
  76. d.mu.Unlock()
  77. }
  78. func (d *dbWrapper) get() *channelMap {
  79. d.mu.RLock()
  80. defer d.mu.RUnlock()
  81. return d.DB
  82. }
  83. // NewChannelzStorage initializes channelz data storage and id generator.
  84. //
  85. // This function returns a cleanup function to wait for all channelz state to be reset by the
  86. // grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
  87. // don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
  88. // to remove some entity just register by the new test, since the id space is the same.
  89. //
  90. // Note: This function is exported for testing purpose only. User should not call
  91. // it in most cases.
  92. func NewChannelzStorage() (cleanup func() error) {
  93. db.set(&channelMap{
  94. topLevelChannels: make(map[int64]struct{}),
  95. channels: make(map[int64]*channel),
  96. listenSockets: make(map[int64]*listenSocket),
  97. normalSockets: make(map[int64]*normalSocket),
  98. servers: make(map[int64]*server),
  99. subChannels: make(map[int64]*subChannel),
  100. })
  101. idGen.reset()
  102. return func() error {
  103. var err error
  104. cm := db.get()
  105. if cm == nil {
  106. return nil
  107. }
  108. for i := 0; i < 1000; i++ {
  109. cm.mu.Lock()
  110. if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
  111. cm.mu.Unlock()
  112. // all things stored in the channelz map have been cleared.
  113. return nil
  114. }
  115. cm.mu.Unlock()
  116. time.Sleep(10 * time.Millisecond)
  117. }
  118. cm.mu.Lock()
  119. err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
  120. cm.mu.Unlock()
  121. return err
  122. }
  123. }
  124. // GetTopChannels returns a slice of top channel's ChannelMetric, along with a
  125. // boolean indicating whether there's more top channels to be queried for.
  126. //
  127. // The arg id specifies that only top channel with id at or above it will be included
  128. // in the result. The returned slice is up to a length of the arg maxResults or
  129. // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
  130. func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
  131. return db.get().GetTopChannels(id, maxResults)
  132. }
  133. // GetServers returns a slice of server's ServerMetric, along with a
  134. // boolean indicating whether there's more servers to be queried for.
  135. //
  136. // The arg id specifies that only server with id at or above it will be included
  137. // in the result. The returned slice is up to a length of the arg maxResults or
  138. // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
  139. func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
  140. return db.get().GetServers(id, maxResults)
  141. }
  142. // GetServerSockets returns a slice of server's (identified by id) normal socket's
  143. // SocketMetric, along with a boolean indicating whether there's more sockets to
  144. // be queried for.
  145. //
  146. // The arg startID specifies that only sockets with id at or above it will be
  147. // included in the result. The returned slice is up to a length of the arg maxResults
  148. // or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
  149. func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
  150. return db.get().GetServerSockets(id, startID, maxResults)
  151. }
  152. // GetChannel returns the ChannelMetric for the channel (identified by id).
  153. func GetChannel(id int64) *ChannelMetric {
  154. return db.get().GetChannel(id)
  155. }
  156. // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
  157. func GetSubChannel(id int64) *SubChannelMetric {
  158. return db.get().GetSubChannel(id)
  159. }
  160. // GetSocket returns the SocketInternalMetric for the socket (identified by id).
  161. func GetSocket(id int64) *SocketMetric {
  162. return db.get().GetSocket(id)
  163. }
  164. // GetServer returns the ServerMetric for the server (identified by id).
  165. func GetServer(id int64) *ServerMetric {
  166. return db.get().GetServer(id)
  167. }
  168. // RegisterChannel registers the given channel c in channelz database with ref
  169. // as its reference name, and add it to the child list of its parent (identified
  170. // by pid). pid = 0 means no parent. It returns the unique channelz tracking id
  171. // assigned to this channel.
  172. func RegisterChannel(c Channel, pid int64, ref string) int64 {
  173. id := idGen.genID()
  174. cn := &channel{
  175. refName: ref,
  176. c: c,
  177. subChans: make(map[int64]string),
  178. nestedChans: make(map[int64]string),
  179. id: id,
  180. pid: pid,
  181. trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
  182. }
  183. if pid == 0 {
  184. db.get().addChannel(id, cn, true, pid, ref)
  185. } else {
  186. db.get().addChannel(id, cn, false, pid, ref)
  187. }
  188. return id
  189. }
  190. // RegisterSubChannel registers the given channel c in channelz database with ref
  191. // as its reference name, and add it to the child list of its parent (identified
  192. // by pid). It returns the unique channelz tracking id assigned to this subchannel.
  193. func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
  194. if pid == 0 {
  195. logger.Error("a SubChannel's parent id cannot be 0")
  196. return 0
  197. }
  198. id := idGen.genID()
  199. sc := &subChannel{
  200. refName: ref,
  201. c: c,
  202. sockets: make(map[int64]string),
  203. id: id,
  204. pid: pid,
  205. trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
  206. }
  207. db.get().addSubChannel(id, sc, pid, ref)
  208. return id
  209. }
  210. // RegisterServer registers the given server s in channelz database. It returns
  211. // the unique channelz tracking id assigned to this server.
  212. func RegisterServer(s Server, ref string) int64 {
  213. id := idGen.genID()
  214. svr := &server{
  215. refName: ref,
  216. s: s,
  217. sockets: make(map[int64]string),
  218. listenSockets: make(map[int64]string),
  219. id: id,
  220. }
  221. db.get().addServer(id, svr)
  222. return id
  223. }
  224. // RegisterListenSocket registers the given listen socket s in channelz database
  225. // with ref as its reference name, and add it to the child list of its parent
  226. // (identified by pid). It returns the unique channelz tracking id assigned to
  227. // this listen socket.
  228. func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
  229. if pid == 0 {
  230. logger.Error("a ListenSocket's parent id cannot be 0")
  231. return 0
  232. }
  233. id := idGen.genID()
  234. ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
  235. db.get().addListenSocket(id, ls, pid, ref)
  236. return id
  237. }
  238. // RegisterNormalSocket registers the given normal socket s in channelz database
  239. // with ref as its reference name, and add it to the child list of its parent
  240. // (identified by pid). It returns the unique channelz tracking id assigned to
  241. // this normal socket.
  242. func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
  243. if pid == 0 {
  244. logger.Error("a NormalSocket's parent id cannot be 0")
  245. return 0
  246. }
  247. id := idGen.genID()
  248. ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
  249. db.get().addNormalSocket(id, ns, pid, ref)
  250. return id
  251. }
  252. // RemoveEntry removes an entry with unique channelz trakcing id to be id from
  253. // channelz database.
  254. func RemoveEntry(id int64) {
  255. db.get().removeEntry(id)
  256. }
  257. // TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
  258. // to the channel trace.
  259. // The Parent field is optional. It is used for event that will be recorded in the entity's parent
  260. // trace also.
  261. type TraceEventDesc struct {
  262. Desc string
  263. Severity Severity
  264. Parent *TraceEventDesc
  265. }
  266. // AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
  267. func AddTraceEvent(l grpclog.DepthLoggerV2, id int64, depth int, desc *TraceEventDesc) {
  268. for d := desc; d != nil; d = d.Parent {
  269. switch d.Severity {
  270. case CtUNKNOWN:
  271. l.InfoDepth(depth+1, d.Desc)
  272. case CtINFO:
  273. l.InfoDepth(depth+1, d.Desc)
  274. case CtWarning:
  275. l.WarningDepth(depth+1, d.Desc)
  276. case CtError:
  277. l.ErrorDepth(depth+1, d.Desc)
  278. }
  279. }
  280. if getMaxTraceEntry() == 0 {
  281. return
  282. }
  283. db.get().traceEvent(id, desc)
  284. }
  285. // channelMap is the storage data structure for channelz.
  286. // Methods of channelMap can be divided in two two categories with respect to locking.
  287. // 1. Methods acquire the global lock.
  288. // 2. Methods that can only be called when global lock is held.
  289. // A second type of method need always to be called inside a first type of method.
  290. type channelMap struct {
  291. mu sync.RWMutex
  292. topLevelChannels map[int64]struct{}
  293. servers map[int64]*server
  294. channels map[int64]*channel
  295. subChannels map[int64]*subChannel
  296. listenSockets map[int64]*listenSocket
  297. normalSockets map[int64]*normalSocket
  298. }
  299. func (c *channelMap) addServer(id int64, s *server) {
  300. c.mu.Lock()
  301. s.cm = c
  302. c.servers[id] = s
  303. c.mu.Unlock()
  304. }
  305. func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
  306. c.mu.Lock()
  307. cn.cm = c
  308. cn.trace.cm = c
  309. c.channels[id] = cn
  310. if isTopChannel {
  311. c.topLevelChannels[id] = struct{}{}
  312. } else {
  313. c.findEntry(pid).addChild(id, cn)
  314. }
  315. c.mu.Unlock()
  316. }
  317. func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
  318. c.mu.Lock()
  319. sc.cm = c
  320. sc.trace.cm = c
  321. c.subChannels[id] = sc
  322. c.findEntry(pid).addChild(id, sc)
  323. c.mu.Unlock()
  324. }
  325. func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
  326. c.mu.Lock()
  327. ls.cm = c
  328. c.listenSockets[id] = ls
  329. c.findEntry(pid).addChild(id, ls)
  330. c.mu.Unlock()
  331. }
  332. func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
  333. c.mu.Lock()
  334. ns.cm = c
  335. c.normalSockets[id] = ns
  336. c.findEntry(pid).addChild(id, ns)
  337. c.mu.Unlock()
  338. }
  339. // removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
  340. // wait on the deletion of its children and until no other entity's channel trace references it.
  341. // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
  342. // shutting down server will lead to the server being also deleted.
  343. func (c *channelMap) removeEntry(id int64) {
  344. c.mu.Lock()
  345. c.findEntry(id).triggerDelete()
  346. c.mu.Unlock()
  347. }
  348. // c.mu must be held by the caller
  349. func (c *channelMap) decrTraceRefCount(id int64) {
  350. e := c.findEntry(id)
  351. if v, ok := e.(tracedChannel); ok {
  352. v.decrTraceRefCount()
  353. e.deleteSelfIfReady()
  354. }
  355. }
  356. // c.mu must be held by the caller.
  357. func (c *channelMap) findEntry(id int64) entry {
  358. var v entry
  359. var ok bool
  360. if v, ok = c.channels[id]; ok {
  361. return v
  362. }
  363. if v, ok = c.subChannels[id]; ok {
  364. return v
  365. }
  366. if v, ok = c.servers[id]; ok {
  367. return v
  368. }
  369. if v, ok = c.listenSockets[id]; ok {
  370. return v
  371. }
  372. if v, ok = c.normalSockets[id]; ok {
  373. return v
  374. }
  375. return &dummyEntry{idNotFound: id}
  376. }
  377. // c.mu must be held by the caller
  378. // deleteEntry simply deletes an entry from the channelMap. Before calling this
  379. // method, caller must check this entry is ready to be deleted, i.e removeEntry()
  380. // has been called on it, and no children still exist.
  381. // Conditionals are ordered by the expected frequency of deletion of each entity
  382. // type, in order to optimize performance.
  383. func (c *channelMap) deleteEntry(id int64) {
  384. var ok bool
  385. if _, ok = c.normalSockets[id]; ok {
  386. delete(c.normalSockets, id)
  387. return
  388. }
  389. if _, ok = c.subChannels[id]; ok {
  390. delete(c.subChannels, id)
  391. return
  392. }
  393. if _, ok = c.channels[id]; ok {
  394. delete(c.channels, id)
  395. delete(c.topLevelChannels, id)
  396. return
  397. }
  398. if _, ok = c.listenSockets[id]; ok {
  399. delete(c.listenSockets, id)
  400. return
  401. }
  402. if _, ok = c.servers[id]; ok {
  403. delete(c.servers, id)
  404. return
  405. }
  406. }
  407. func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
  408. c.mu.Lock()
  409. child := c.findEntry(id)
  410. childTC, ok := child.(tracedChannel)
  411. if !ok {
  412. c.mu.Unlock()
  413. return
  414. }
  415. childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
  416. if desc.Parent != nil {
  417. parent := c.findEntry(child.getParentID())
  418. var chanType RefChannelType
  419. switch child.(type) {
  420. case *channel:
  421. chanType = RefChannel
  422. case *subChannel:
  423. chanType = RefSubChannel
  424. }
  425. if parentTC, ok := parent.(tracedChannel); ok {
  426. parentTC.getChannelTrace().append(&TraceEvent{
  427. Desc: desc.Parent.Desc,
  428. Severity: desc.Parent.Severity,
  429. Timestamp: time.Now(),
  430. RefID: id,
  431. RefName: childTC.getRefName(),
  432. RefType: chanType,
  433. })
  434. childTC.incrTraceRefCount()
  435. }
  436. }
  437. c.mu.Unlock()
  438. }
  439. type int64Slice []int64
  440. func (s int64Slice) Len() int { return len(s) }
  441. func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  442. func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
  443. func copyMap(m map[int64]string) map[int64]string {
  444. n := make(map[int64]string)
  445. for k, v := range m {
  446. n[k] = v
  447. }
  448. return n
  449. }
  450. func min(a, b int64) int64 {
  451. if a < b {
  452. return a
  453. }
  454. return b
  455. }
  456. func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
  457. if maxResults <= 0 {
  458. maxResults = EntryPerPage
  459. }
  460. c.mu.RLock()
  461. l := int64(len(c.topLevelChannels))
  462. ids := make([]int64, 0, l)
  463. cns := make([]*channel, 0, min(l, maxResults))
  464. for k := range c.topLevelChannels {
  465. ids = append(ids, k)
  466. }
  467. sort.Sort(int64Slice(ids))
  468. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
  469. count := int64(0)
  470. var end bool
  471. var t []*ChannelMetric
  472. for i, v := range ids[idx:] {
  473. if count == maxResults {
  474. break
  475. }
  476. if cn, ok := c.channels[v]; ok {
  477. cns = append(cns, cn)
  478. t = append(t, &ChannelMetric{
  479. NestedChans: copyMap(cn.nestedChans),
  480. SubChans: copyMap(cn.subChans),
  481. })
  482. count++
  483. }
  484. if i == len(ids[idx:])-1 {
  485. end = true
  486. break
  487. }
  488. }
  489. c.mu.RUnlock()
  490. if count == 0 {
  491. end = true
  492. }
  493. for i, cn := range cns {
  494. t[i].ChannelData = cn.c.ChannelzMetric()
  495. t[i].ID = cn.id
  496. t[i].RefName = cn.refName
  497. t[i].Trace = cn.trace.dumpData()
  498. }
  499. return t, end
  500. }
  501. func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
  502. if maxResults <= 0 {
  503. maxResults = EntryPerPage
  504. }
  505. c.mu.RLock()
  506. l := int64(len(c.servers))
  507. ids := make([]int64, 0, l)
  508. ss := make([]*server, 0, min(l, maxResults))
  509. for k := range c.servers {
  510. ids = append(ids, k)
  511. }
  512. sort.Sort(int64Slice(ids))
  513. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
  514. count := int64(0)
  515. var end bool
  516. var s []*ServerMetric
  517. for i, v := range ids[idx:] {
  518. if count == maxResults {
  519. break
  520. }
  521. if svr, ok := c.servers[v]; ok {
  522. ss = append(ss, svr)
  523. s = append(s, &ServerMetric{
  524. ListenSockets: copyMap(svr.listenSockets),
  525. })
  526. count++
  527. }
  528. if i == len(ids[idx:])-1 {
  529. end = true
  530. break
  531. }
  532. }
  533. c.mu.RUnlock()
  534. if count == 0 {
  535. end = true
  536. }
  537. for i, svr := range ss {
  538. s[i].ServerData = svr.s.ChannelzMetric()
  539. s[i].ID = svr.id
  540. s[i].RefName = svr.refName
  541. }
  542. return s, end
  543. }
  544. func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
  545. if maxResults <= 0 {
  546. maxResults = EntryPerPage
  547. }
  548. var svr *server
  549. var ok bool
  550. c.mu.RLock()
  551. if svr, ok = c.servers[id]; !ok {
  552. // server with id doesn't exist.
  553. c.mu.RUnlock()
  554. return nil, true
  555. }
  556. svrskts := svr.sockets
  557. l := int64(len(svrskts))
  558. ids := make([]int64, 0, l)
  559. sks := make([]*normalSocket, 0, min(l, maxResults))
  560. for k := range svrskts {
  561. ids = append(ids, k)
  562. }
  563. sort.Sort(int64Slice(ids))
  564. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
  565. count := int64(0)
  566. var end bool
  567. for i, v := range ids[idx:] {
  568. if count == maxResults {
  569. break
  570. }
  571. if ns, ok := c.normalSockets[v]; ok {
  572. sks = append(sks, ns)
  573. count++
  574. }
  575. if i == len(ids[idx:])-1 {
  576. end = true
  577. break
  578. }
  579. }
  580. c.mu.RUnlock()
  581. if count == 0 {
  582. end = true
  583. }
  584. var s []*SocketMetric
  585. for _, ns := range sks {
  586. sm := &SocketMetric{}
  587. sm.SocketData = ns.s.ChannelzMetric()
  588. sm.ID = ns.id
  589. sm.RefName = ns.refName
  590. s = append(s, sm)
  591. }
  592. return s, end
  593. }
  594. func (c *channelMap) GetChannel(id int64) *ChannelMetric {
  595. cm := &ChannelMetric{}
  596. var cn *channel
  597. var ok bool
  598. c.mu.RLock()
  599. if cn, ok = c.channels[id]; !ok {
  600. // channel with id doesn't exist.
  601. c.mu.RUnlock()
  602. return nil
  603. }
  604. cm.NestedChans = copyMap(cn.nestedChans)
  605. cm.SubChans = copyMap(cn.subChans)
  606. // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
  607. // holding the lock to prevent potential data race.
  608. chanCopy := cn.c
  609. c.mu.RUnlock()
  610. cm.ChannelData = chanCopy.ChannelzMetric()
  611. cm.ID = cn.id
  612. cm.RefName = cn.refName
  613. cm.Trace = cn.trace.dumpData()
  614. return cm
  615. }
  616. func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
  617. cm := &SubChannelMetric{}
  618. var sc *subChannel
  619. var ok bool
  620. c.mu.RLock()
  621. if sc, ok = c.subChannels[id]; !ok {
  622. // subchannel with id doesn't exist.
  623. c.mu.RUnlock()
  624. return nil
  625. }
  626. cm.Sockets = copyMap(sc.sockets)
  627. // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
  628. // holding the lock to prevent potential data race.
  629. chanCopy := sc.c
  630. c.mu.RUnlock()
  631. cm.ChannelData = chanCopy.ChannelzMetric()
  632. cm.ID = sc.id
  633. cm.RefName = sc.refName
  634. cm.Trace = sc.trace.dumpData()
  635. return cm
  636. }
  637. func (c *channelMap) GetSocket(id int64) *SocketMetric {
  638. sm := &SocketMetric{}
  639. c.mu.RLock()
  640. if ls, ok := c.listenSockets[id]; ok {
  641. c.mu.RUnlock()
  642. sm.SocketData = ls.s.ChannelzMetric()
  643. sm.ID = ls.id
  644. sm.RefName = ls.refName
  645. return sm
  646. }
  647. if ns, ok := c.normalSockets[id]; ok {
  648. c.mu.RUnlock()
  649. sm.SocketData = ns.s.ChannelzMetric()
  650. sm.ID = ns.id
  651. sm.RefName = ns.refName
  652. return sm
  653. }
  654. c.mu.RUnlock()
  655. return nil
  656. }
  657. func (c *channelMap) GetServer(id int64) *ServerMetric {
  658. sm := &ServerMetric{}
  659. var svr *server
  660. var ok bool
  661. c.mu.RLock()
  662. if svr, ok = c.servers[id]; !ok {
  663. c.mu.RUnlock()
  664. return nil
  665. }
  666. sm.ListenSockets = copyMap(svr.listenSockets)
  667. c.mu.RUnlock()
  668. sm.ID = svr.id
  669. sm.RefName = svr.refName
  670. sm.ServerData = svr.s.ChannelzMetric()
  671. return sm
  672. }
  673. type idGenerator struct {
  674. id int64
  675. }
  676. func (i *idGenerator) reset() {
  677. atomic.StoreInt64(&i.id, 0)
  678. }
  679. func (i *idGenerator) genID() int64 {
  680. return atomic.AddInt64(&i.id, 1)
  681. }