api.go 17 KB


  1. // Copyright 2011 Google Inc. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. // +build !appengine
  5. package internal
  6. import (
  7. "bytes"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "net"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "runtime"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/golang/protobuf/proto"
  23. netcontext "golang.org/x/net/context"
  24. basepb "google.golang.org/appengine/internal/base"
  25. logpb "google.golang.org/appengine/internal/log"
  26. remotepb "google.golang.org/appengine/internal/remote_api"
  27. )
  // apiPath is the URL path of the API server's RPC endpoint; apiURL uses it
  // as the path component of the URL it builds.
  const apiPath = "/rpc_http"

  // defaultTicketSuffix is appended to an application ID to form a default
  // ticket (see DefaultTicket and the test-ticket fallback in Call).
  const defaultTicketSuffix = "/default.20150612t184001.0"
  32. var (
  33. // Incoming headers.
  34. ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
  35. dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
  36. traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
  37. curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
  38. userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
  39. remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
  40. devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id")
  41. // Outgoing headers.
  42. apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
  43. apiEndpointHeaderValue = []string{"app-engine-apis"}
  44. apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
  45. apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
  46. apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
  47. apiContentType = http.CanonicalHeaderKey("Content-Type")
  48. apiContentTypeValue = []string{"application/octet-stream"}
  49. logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
  50. apiHTTPClient = &http.Client{
  51. Transport: &http.Transport{
  52. Proxy: http.ProxyFromEnvironment,
  53. Dial: limitDial,
  54. MaxIdleConns: 1000,
  55. MaxIdleConnsPerHost: 10000,
  56. IdleConnTimeout: 90 * time.Second,
  57. },
  58. }
  59. defaultTicketOnce sync.Once
  60. defaultTicket string
  61. backgroundContextOnce sync.Once
  62. backgroundContext netcontext.Context
  63. )
  64. func apiURL() *url.URL {
  65. host, port := "appengine.googleapis.internal", "10001"
  66. if h := os.Getenv("API_HOST"); h != "" {
  67. host = h
  68. }
  69. if p := os.Getenv("API_PORT"); p != "" {
  70. port = p
  71. }
  72. return &url.URL{
  73. Scheme: "http",
  74. Host: host + ":" + port,
  75. Path: apiPath,
  76. }
  77. }
  78. func handleHTTP(w http.ResponseWriter, r *http.Request) {
  79. c := &context{
  80. req: r,
  81. outHeader: w.Header(),
  82. apiURL: apiURL(),
  83. }
  84. r = r.WithContext(withContext(r.Context(), c))
  85. c.req = r
  86. stopFlushing := make(chan int)
  87. // Patch up RemoteAddr so it looks reasonable.
  88. if addr := r.Header.Get(userIPHeader); addr != "" {
  89. r.RemoteAddr = addr
  90. } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
  91. r.RemoteAddr = addr
  92. } else {
  93. // Should not normally reach here, but pick a sensible default anyway.
  94. r.RemoteAddr = "127.0.0.1"
  95. }
  96. // The address in the headers will most likely be of these forms:
  97. // 123.123.123.123
  98. // 2001:db8::1
  99. // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
  100. if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
  101. // Assume the remote address is only a host; add a default port.
  102. r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
  103. }
  104. // Start goroutine responsible for flushing app logs.
  105. // This is done after adding c to ctx.m (and stopped before removing it)
  106. // because flushing logs requires making an API call.
  107. go c.logFlusher(stopFlushing)
  108. executeRequestSafely(c, r)
  109. c.outHeader = nil // make sure header changes aren't respected any more
  110. stopFlushing <- 1 // any logging beyond this point will be dropped
  111. // Flush any pending logs asynchronously.
  112. c.pendingLogs.Lock()
  113. flushes := c.pendingLogs.flushes
  114. if len(c.pendingLogs.lines) > 0 {
  115. flushes++
  116. }
  117. c.pendingLogs.Unlock()
  118. flushed := make(chan struct{})
  119. go func() {
  120. defer close(flushed)
  121. // Force a log flush, because with very short requests we
  122. // may not ever flush logs.
  123. c.flushLog(true)
  124. }()
  125. w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
  126. // Avoid nil Write call if c.Write is never called.
  127. if c.outCode != 0 {
  128. w.WriteHeader(c.outCode)
  129. }
  130. if c.outBody != nil {
  131. w.Write(c.outBody)
  132. }
  133. // Wait for the last flush to complete before returning,
  134. // otherwise the security ticket will not be valid.
  135. <-flushed
  136. }
  137. func executeRequestSafely(c *context, r *http.Request) {
  138. defer func() {
  139. if x := recover(); x != nil {
  140. logf(c, 4, "%s", renderPanic(x)) // 4 == critical
  141. c.outCode = 500
  142. }
  143. }()
  144. http.DefaultServeMux.ServeHTTP(c, r)
  145. }
  // renderPanic formats the recovered panic value x as a "panic: ..." heading
  // followed by the current goroutine's stack trace.
  //
  // It is meant to be called from the deferred recover closure of the
  // function that ran the panicking code. The frames of renderPanic itself
  // and of that closure are removed from the trace, so the trace is rooted at
  // the site of the panic. If the marker for renderPanic's own frame cannot
  // be found in the trace, the trace is returned unmodified rather than
  // sliced with an invalid index.
  func renderPanic(x interface{}) string {
  	buf := make([]byte, 16<<10) // 16 KB should be plenty
  	buf = buf[:runtime.Stack(buf, false)]

  	// Remove the first few stack frames:
  	//   this func
  	//   the recover closure in the caller
  	const (
  		skipStart  = "internal.renderPanic"
  		skipFrames = 2
  		// Each frame is printed as two lines: the function, then file:line.
  		linesPerFrame = 2
  	)
  	if start := bytes.Index(buf, []byte(skipStart)); start >= 0 {
  		// The marker sits in the middle of a line (after the import path);
  		// rewind to the start of that line so whole frames are removed.
  		start = bytes.LastIndexByte(buf[:start], '\n') + 1

  		end := start
  		for i := 0; i < skipFrames*linesPerFrame; i++ {
  			nl := bytes.IndexByte(buf[end:], '\n')
  			if nl < 0 {
  				// Trace was cut short; drop everything that is left.
  				end = len(buf)
  				break
  			}
  			end += nl + 1
  		}
  		// Close the gap left by buf[start:end].
  		buf = append(buf[:start], buf[end:]...)
  	}

  	// Prepend the heading by concatenation so that no part of the trace is
  	// overwritten or cut off.
  	return fmt.Sprintf("panic: %v\n\n", x) + string(buf)
  }
  181. // context represents the context of an in-flight HTTP request.
  182. // It implements the appengine.Context and http.ResponseWriter interfaces.
  183. type context struct {
  184. req *http.Request
  185. outCode int
  186. outHeader http.Header
  187. outBody []byte
  188. pendingLogs struct {
  189. sync.Mutex
  190. lines []*logpb.UserAppLogLine
  191. flushes int
  192. }
  193. apiURL *url.URL
  194. }
  195. var contextKey = "holds a *context"
  196. // jointContext joins two contexts in a superficial way.
  197. // It takes values and timeouts from a base context, and only values from another context.
  198. type jointContext struct {
  199. base netcontext.Context
  200. valuesOnly netcontext.Context
  201. }
  202. func (c jointContext) Deadline() (time.Time, bool) {
  203. return c.base.Deadline()
  204. }
  205. func (c jointContext) Done() <-chan struct{} {
  206. return c.base.Done()
  207. }
  208. func (c jointContext) Err() error {
  209. return c.base.Err()
  210. }
  211. func (c jointContext) Value(key interface{}) interface{} {
  212. if val := c.base.Value(key); val != nil {
  213. return val
  214. }
  215. return c.valuesOnly.Value(key)
  216. }
  217. // fromContext returns the App Engine context or nil if ctx is not
  218. // derived from an App Engine context.
  219. func fromContext(ctx netcontext.Context) *context {
  220. c, _ := ctx.Value(&contextKey).(*context)
  221. return c
  222. }
  223. func withContext(parent netcontext.Context, c *context) netcontext.Context {
  224. ctx := netcontext.WithValue(parent, &contextKey, c)
  225. if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
  226. ctx = withNamespace(ctx, ns)
  227. }
  228. return ctx
  229. }
  230. func toContext(c *context) netcontext.Context {
  231. return withContext(netcontext.Background(), c)
  232. }
  233. func IncomingHeaders(ctx netcontext.Context) http.Header {
  234. if c := fromContext(ctx); c != nil {
  235. return c.req.Header
  236. }
  237. return nil
  238. }
  239. func ReqContext(req *http.Request) netcontext.Context {
  240. return req.Context()
  241. }
  242. func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
  243. return jointContext{
  244. base: parent,
  245. valuesOnly: req.Context(),
  246. }
  247. }
  248. // DefaultTicket returns a ticket used for background context or dev_appserver.
  249. func DefaultTicket() string {
  250. defaultTicketOnce.Do(func() {
  251. if IsDevAppServer() {
  252. defaultTicket = "testapp" + defaultTicketSuffix
  253. return
  254. }
  255. appID := partitionlessAppID()
  256. escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
  257. majVersion := VersionID(nil)
  258. if i := strings.Index(majVersion, "."); i > 0 {
  259. majVersion = majVersion[:i]
  260. }
  261. defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
  262. })
  263. return defaultTicket
  264. }
  265. func BackgroundContext() netcontext.Context {
  266. backgroundContextOnce.Do(func() {
  267. // Compute background security ticket.
  268. ticket := DefaultTicket()
  269. c := &context{
  270. req: &http.Request{
  271. Header: http.Header{
  272. ticketHeader: []string{ticket},
  273. },
  274. },
  275. apiURL: apiURL(),
  276. }
  277. backgroundContext = toContext(c)
  278. // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
  279. go c.logFlusher(make(chan int))
  280. })
  281. return backgroundContext
  282. }
  283. // RegisterTestRequest registers the HTTP request req for testing, such that
  284. // any API calls are sent to the provided URL. It returns a closure to delete
  285. // the registration.
  286. // It should only be used by aetest package.
  287. func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
  288. c := &context{
  289. req: req,
  290. apiURL: apiURL,
  291. }
  292. ctx := withContext(decorate(req.Context()), c)
  293. req = req.WithContext(ctx)
  294. c.req = req
  295. return req, func() {}
  296. }
  297. var errTimeout = &CallError{
  298. Detail: "Deadline exceeded",
  299. Code: int32(remotepb.RpcError_CANCELLED),
  300. Timeout: true,
  301. }
  302. func (c *context) Header() http.Header { return c.outHeader }
  // bodyAllowedForStatus reports whether a response with the given status
  // code may carry a body. Informational (1xx), 204 and 304 responses may
  // not (nor may they carry response entity headers such as Content-Length
  // or Content-Type).
  //
  // The logic is copied from $GOROOT/src/pkg/net/http/transfer.go.
  func bodyAllowedForStatus(status int) bool {
  	informational := status >= 100 && status <= 199
  	if informational || status == 204 || status == 304 {
  		return false
  	}
  	return true
  }
  317. func (c *context) Write(b []byte) (int, error) {
  318. if c.outCode == 0 {
  319. c.WriteHeader(http.StatusOK)
  320. }
  321. if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
  322. return 0, http.ErrBodyNotAllowed
  323. }
  324. c.outBody = append(c.outBody, b...)
  325. return len(b), nil
  326. }
  327. func (c *context) WriteHeader(code int) {
  328. if c.outCode != 0 {
  329. logf(c, 3, "WriteHeader called multiple times on request.") // error level
  330. return
  331. }
  332. c.outCode = code
  333. }
  334. func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
  335. hreq := &http.Request{
  336. Method: "POST",
  337. URL: c.apiURL,
  338. Header: http.Header{
  339. apiEndpointHeader: apiEndpointHeaderValue,
  340. apiMethodHeader: apiMethodHeaderValue,
  341. apiContentType: apiContentTypeValue,
  342. apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
  343. },
  344. Body: ioutil.NopCloser(bytes.NewReader(body)),
  345. ContentLength: int64(len(body)),
  346. Host: c.apiURL.Host,
  347. }
  348. if info := c.req.Header.Get(dapperHeader); info != "" {
  349. hreq.Header.Set(dapperHeader, info)
  350. }
  351. if info := c.req.Header.Get(traceHeader); info != "" {
  352. hreq.Header.Set(traceHeader, info)
  353. }
  354. tr := apiHTTPClient.Transport.(*http.Transport)
  355. var timedOut int32 // atomic; set to 1 if timed out
  356. t := time.AfterFunc(timeout, func() {
  357. atomic.StoreInt32(&timedOut, 1)
  358. tr.CancelRequest(hreq)
  359. })
  360. defer t.Stop()
  361. defer func() {
  362. // Check if timeout was exceeded.
  363. if atomic.LoadInt32(&timedOut) != 0 {
  364. err = errTimeout
  365. }
  366. }()
  367. hresp, err := apiHTTPClient.Do(hreq)
  368. if err != nil {
  369. return nil, &CallError{
  370. Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
  371. Code: int32(remotepb.RpcError_UNKNOWN),
  372. }
  373. }
  374. defer hresp.Body.Close()
  375. hrespBody, err := ioutil.ReadAll(hresp.Body)
  376. if hresp.StatusCode != 200 {
  377. return nil, &CallError{
  378. Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
  379. Code: int32(remotepb.RpcError_UNKNOWN),
  380. }
  381. }
  382. if err != nil {
  383. return nil, &CallError{
  384. Detail: fmt.Sprintf("service bridge response bad: %v", err),
  385. Code: int32(remotepb.RpcError_UNKNOWN),
  386. }
  387. }
  388. return hrespBody, nil
  389. }
  390. func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
  391. if ns := NamespaceFromContext(ctx); ns != "" {
  392. if fn, ok := NamespaceMods[service]; ok {
  393. fn(in, ns)
  394. }
  395. }
  396. if f, ctx, ok := callOverrideFromContext(ctx); ok {
  397. return f(ctx, service, method, in, out)
  398. }
  399. // Handle already-done contexts quickly.
  400. select {
  401. case <-ctx.Done():
  402. return ctx.Err()
  403. default:
  404. }
  405. c := fromContext(ctx)
  406. if c == nil {
  407. // Give a good error message rather than a panic lower down.
  408. return errNotAppEngineContext
  409. }
  410. // Apply transaction modifications if we're in a transaction.
  411. if t := transactionFromContext(ctx); t != nil {
  412. if t.finished {
  413. return errors.New("transaction context has expired")
  414. }
  415. applyTransaction(in, &t.transaction)
  416. }
  417. // Default RPC timeout is 60s.
  418. timeout := 60 * time.Second
  419. if deadline, ok := ctx.Deadline(); ok {
  420. timeout = deadline.Sub(time.Now())
  421. }
  422. data, err := proto.Marshal(in)
  423. if err != nil {
  424. return err
  425. }
  426. ticket := c.req.Header.Get(ticketHeader)
  427. // Use a test ticket under test environment.
  428. if ticket == "" {
  429. if appid := ctx.Value(&appIDOverrideKey); appid != nil {
  430. ticket = appid.(string) + defaultTicketSuffix
  431. }
  432. }
  433. // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
  434. if ticket == "" {
  435. ticket = DefaultTicket()
  436. }
  437. if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" {
  438. ticket = dri
  439. }
  440. req := &remotepb.Request{
  441. ServiceName: &service,
  442. Method: &method,
  443. Request: data,
  444. RequestId: &ticket,
  445. }
  446. hreqBody, err := proto.Marshal(req)
  447. if err != nil {
  448. return err
  449. }
  450. hrespBody, err := c.post(hreqBody, timeout)
  451. if err != nil {
  452. return err
  453. }
  454. res := &remotepb.Response{}
  455. if err := proto.Unmarshal(hrespBody, res); err != nil {
  456. return err
  457. }
  458. if res.RpcError != nil {
  459. ce := &CallError{
  460. Detail: res.RpcError.GetDetail(),
  461. Code: *res.RpcError.Code,
  462. }
  463. switch remotepb.RpcError_ErrorCode(ce.Code) {
  464. case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
  465. ce.Timeout = true
  466. }
  467. return ce
  468. }
  469. if res.ApplicationError != nil {
  470. return &APIError{
  471. Service: *req.ServiceName,
  472. Detail: res.ApplicationError.GetDetail(),
  473. Code: *res.ApplicationError.Code,
  474. }
  475. }
  476. if res.Exception != nil || res.JavaException != nil {
  477. // This shouldn't happen, but let's be defensive.
  478. return &CallError{
  479. Detail: "service bridge returned exception",
  480. Code: int32(remotepb.RpcError_UNKNOWN),
  481. }
  482. }
  483. return proto.Unmarshal(res.Response, out)
  484. }
  485. func (c *context) Request() *http.Request {
  486. return c.req
  487. }
  488. func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
  489. // Truncate long log lines.
  490. // TODO(dsymonds): Check if this is still necessary.
  491. const lim = 8 << 10
  492. if len(*ll.Message) > lim {
  493. suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
  494. ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
  495. }
  496. c.pendingLogs.Lock()
  497. c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
  498. c.pendingLogs.Unlock()
  499. }
  // logLevelName maps a numeric log level, as passed to logf, to the name
  // printed in front of the message when the log line is copied to stderr.
  var logLevelName = map[int64]string{
  	0: "DEBUG",
  	1: "INFO",
  	2: "WARNING",
  	3: "ERROR",
  	4: "CRITICAL",
  }
  507. func logf(c *context, level int64, format string, args ...interface{}) {
  508. if c == nil {
  509. panic("not an App Engine context")
  510. }
  511. s := fmt.Sprintf(format, args...)
  512. s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
  513. c.addLogLine(&logpb.UserAppLogLine{
  514. TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
  515. Level: &level,
  516. Message: &s,
  517. })
  518. // Only duplicate log to stderr if not running on App Engine second generation
  519. if !IsSecondGen() {
  520. log.Print(logLevelName[level] + ": " + s)
  521. }
  522. }
  523. // flushLog attempts to flush any pending logs to the appserver.
  524. // It should not be called concurrently.
  525. func (c *context) flushLog(force bool) (flushed bool) {
  526. c.pendingLogs.Lock()
  527. // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
  528. n, rem := 0, 30<<20
  529. for ; n < len(c.pendingLogs.lines); n++ {
  530. ll := c.pendingLogs.lines[n]
  531. // Each log line will require about 3 bytes of overhead.
  532. nb := proto.Size(ll) + 3
  533. if nb > rem {
  534. break
  535. }
  536. rem -= nb
  537. }
  538. lines := c.pendingLogs.lines[:n]
  539. c.pendingLogs.lines = c.pendingLogs.lines[n:]
  540. c.pendingLogs.Unlock()
  541. if len(lines) == 0 && !force {
  542. // Nothing to flush.
  543. return false
  544. }
  545. rescueLogs := false
  546. defer func() {
  547. if rescueLogs {
  548. c.pendingLogs.Lock()
  549. c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
  550. c.pendingLogs.Unlock()
  551. }
  552. }()
  553. buf, err := proto.Marshal(&logpb.UserAppLogGroup{
  554. LogLine: lines,
  555. })
  556. if err != nil {
  557. log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
  558. rescueLogs = true
  559. return false
  560. }
  561. req := &logpb.FlushRequest{
  562. Logs: buf,
  563. }
  564. res := &basepb.VoidProto{}
  565. c.pendingLogs.Lock()
  566. c.pendingLogs.flushes++
  567. c.pendingLogs.Unlock()
  568. if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
  569. log.Printf("internal.flushLog: Flush RPC: %v", err)
  570. rescueLogs = true
  571. return false
  572. }
  573. return true
  574. }
  // Log flushing parameters.

  // flushInterval is how often logFlusher attempts a flush.
  const flushInterval = 1 * time.Second

  // forceFlushInterval is the time since the last successful flush after
  // which logFlusher forces a flush even if no log lines are pending.
  const forceFlushInterval = 60 * time.Second
  580. func (c *context) logFlusher(stop <-chan int) {
  581. lastFlush := time.Now()
  582. tick := time.NewTicker(flushInterval)
  583. for {
  584. select {
  585. case <-stop:
  586. // Request finished.
  587. tick.Stop()
  588. return
  589. case <-tick.C:
  590. force := time.Now().Sub(lastFlush) > forceFlushInterval
  591. if c.flushLog(force) {
  592. lastFlush = time.Now()
  593. }
  594. }
  595. }
  596. }
  597. func ContextForTesting(req *http.Request) netcontext.Context {
  598. return toContext(&context{req: req})
  599. }