websocket-provider.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. "use strict";
  2. import { BigNumber } from "@ethersproject/bignumber";
  3. import { Network, Networkish } from "@ethersproject/networks";
  4. import { defineReadOnly } from "@ethersproject/properties";
  5. import { Event } from "./base-provider";
  6. import { JsonRpcProvider } from "./json-rpc-provider";
  7. import { WebSocket } from "./ws";
  8. import { Logger } from "@ethersproject/logger";
  9. import { version } from "./_version";
  // Module-wide Logger, constructed with this package's version string. It is
  // used below to raise UNSUPPORTED_OPERATION errors via logger.throwError().
  10. const logger = new Logger(version);
  /**
   *  Notes:
   *
   *  This provider differs somewhat from the polling providers. One main
   *  difference is how it handles consistency. The polling providers
   *  will stall responses to ensure a consistent state, while this
   *  WebSocket provider assumes the connected backend will manage this.
   *
   *  For example, if a polling provider emits an event which indicates
   *  that the event occurred in blockhash XXX, a subsequent call to fetch
   *  that block by its hash XXX will, if the block is not yet present,
   *  retry until it is present. This can occur when querying a pool of
   *  nodes that are mildly out of sync with each other.
   */
  // Next JSON-RPC request id. WebSocketProvider.send() post-increments it for
  // every request; being module-level, the counter is shared by all provider
  // instances created from this module.
  25. let NextId = 1;
  /**
   *  Bookkeeping for a JSON-RPC request that has been issued but has not
   *  yet received its response.
   */
  export type InflightRequest = {
      /** The serialized JSON-RPC request text that is written to the socket. */
      payload: string;

      /**
       *  Settles the request: called with (null, result) when a result
       *  arrives, or with (error, undefined) when the response carries none.
       */
      callback: (error: Error, result: any) => void;
  };
  /**
   *  An active "eth_subscribe" subscription, stored by WebSocketProvider
   *  under its subscription ID.
   */
  export type Subscription = {
      /** Receives the `params.result` of each "eth_subscription" notification. */
      processFunc: (payload: any) => void;

      /** The event tag this subscription was registered under. */
      tag: string;
  };
  /**
   *  The minimal WebSocket surface that WebSocketProvider relies on, so a
   *  caller can hand in an existing socket object instead of a URL.
   */
  export interface WebSocketLike {
      /** Connection state; compared against WebSocket.CONNECTING in destroy(). */
      readyState: number;

      /** Writes a payload to the socket. */
      send(payload: any): void;

      /** Closes the socket, optionally with a status code and reason. */
      close(code?: number, reason?: string): void;

      // Event handler slots; the provider assigns its own functions to these.
      onopen: (...args: any[]) => any;
      onmessage: (...args: any[]) => any;
      onerror: (...args: any[]) => any;
  }
  42. // For more info about the Real-time Event API see:
  43. // https://geth.ethereum.org/docs/rpc/pubsub
  /**
   *  A JsonRpcProvider that exchanges JSON-RPC messages over a single
   *  WebSocket and relies on "eth_subscribe" push notifications rather than
   *  polling.
   *
   *  Requests issued before the socket has opened are held in `_requests`
   *  and written to the socket once its `onopen` handler fires.
   */
  export class WebSocketProvider extends JsonRpcProvider {
      readonly _websocket: any;

      // In-flight requests, keyed by the stringified JSON-RPC request id
      readonly _requests: { [ name: string ]: InflightRequest };

      // The network detection started once in the constructor
      readonly _detectNetwork: Promise<Network>;

      // Maps event tag to subscription ID (we dedupe identical events)
      readonly _subIds: { [ tag: string ]: Promise<string> };

      // Maps Subscription ID to Subscription
      readonly _subs: { [ name: string ]: Subscription };

      _wsReady: boolean;

      // Handle of the faux "poll" timer, kept so destroy() can stop it. It is
      // typed loosely because the handle type differs between environments.
      _fauxPoll: any;

      /**
       *  @param url - a WebSocket URL to connect to, or an already constructed
       *      WebSocket-like object to use as the transport.
       *  @param network - optional network; the value "any" is rejected
       *      through logger.throwError with UNSUPPORTED_OPERATION.
       */
      constructor(url: string | WebSocketLike, network?: Networkish) {

          // This will be added in the future; please open an issue to expedite
          if (network === "any") {
              logger.throwError("WebSocketProvider does not support 'any' network yet", Logger.errors.UNSUPPORTED_OPERATION, {
                  operation: "network:any"
              });
          }

          if (typeof(url) === "string") {
              super(url, network);
          } else {
              super("_websocket", network);
          }

          this._pollingInterval = -1;

          this._wsReady = false;

          if (typeof(url) === "string") {
              defineReadOnly(this, "_websocket", new WebSocket(this.connection.url));
          } else {
              defineReadOnly(this, "_websocket", url);
          }

          defineReadOnly(this, "_requests", { });
          defineReadOnly(this, "_subs", { });
          defineReadOnly(this, "_subIds", { });
          defineReadOnly(this, "_detectNetwork", super.detectNetwork());

          // Stall sending requests until the socket is open...
          this.websocket.onopen = () => {
              this._wsReady = true;
              Object.keys(this._requests).forEach((id) => {
                  this.websocket.send(this._requests[id].payload);
              });
          };

          this.websocket.onmessage = (messageEvent: { data: string }) => {
              const data = messageEvent.data;
              const result = JSON.parse(data);

              if (result.id != null) {
                  // A response to one of our requests
                  const id = String(result.id);
                  const request = this._requests[id];

                  // A reply whose id matches no in-flight request (for example
                  // a duplicated response) has nobody to deliver to; drop it
                  // rather than throwing a TypeError inside the socket handler.
                  if (request == null) { return; }

                  delete this._requests[id];

                  if (result.result !== undefined) {
                      request.callback(null, result.result);

                      this.emit("debug", {
                          action: "response",
                          request: JSON.parse(request.payload),
                          response: result.result,
                          provider: this
                      });

                  } else {
                      let error: Error = null;
                      if (result.error) {
                          error = new Error(result.error.message || "unknown error");
                          defineReadOnly(<any>error, "code", result.error.code || null);
                          defineReadOnly(<any>error, "response", data);
                      } else {
                          error = new Error("unknown error");
                      }

                      request.callback(error, undefined);

                      this.emit("debug", {
                          action: "response",
                          error: error,
                          request: JSON.parse(request.payload),
                          provider: this
                      });
                  }

              } else if (result.method === "eth_subscription") {
                  // Subscription...
                  const sub = this._subs[result.params.subscription];
                  if (sub) {
                      //this.emit.apply(this, );
                      sub.processFunc(result.params.result)
                  }

              } else {
                  console.warn("this should not happen");
              }
          };

          // This Provider does not actually poll, but we want to trigger
          // poll events for things that depend on them (like stalling for
          // block and transaction lookups)
          const fauxPoll = setInterval(() => {
              this.emit("poll");
          }, 1000);
          if (fauxPoll.unref) { fauxPoll.unref(); }

          // Remember the timer so that destroy() can clear it
          this._fauxPoll = fauxPoll;
      }

      // Cannot narrow the type of _websocket, as that is not backwards compatible
      // so we add a getter and let the WebSocket be a public API.
      get websocket(): WebSocketLike { return this._websocket; }

      /**
       *  Returns the network detection promise created in the constructor, so
       *  detection is not repeated on every call.
       */
      detectNetwork(): Promise<Network> {
          return this._detectNetwork;
      }

      /** Always 0; this provider does not poll. */
      get pollingInterval(): number {
          return 0;
      }

      /** Not supported: any assignment is rejected with UNSUPPORTED_OPERATION. */
      set pollingInterval(value: number) {
          logger.throwError("cannot set polling interval on WebSocketProvider", Logger.errors.UNSUPPORTED_OPERATION, {
              operation: "setPollingInterval"
          });
      }

      /** Not supported: always rejected with UNSUPPORTED_OPERATION. */
      resetEventsBlock(blockNumber: number): void {
          logger.throwError("cannot reset events block on WebSocketProvider", Logger.errors.UNSUPPORTED_OPERATION, {
              operation: "resetEventBlock"
          });
      }

      /** A no-op; there is nothing to poll. */
      async poll(): Promise<void> {
          return null;
      }

      /**
       *  Setting a falsy value is silently accepted; setting a truthy value is
       *  rejected with UNSUPPORTED_OPERATION.
       */
      set polling(value: boolean) {
          if (!value) { return; }

          logger.throwError("cannot set polling on WebSocketProvider", Logger.errors.UNSUPPORTED_OPERATION, {
              operation: "setPolling"
          });
      }

      /**
       *  Issues a JSON-RPC request over the socket.
       *
       *  The request is recorded in `_requests` under a fresh id and is written
       *  immediately if the socket is open; otherwise the `onopen` handler
       *  sends it later.
       *
       *  @param method - the JSON-RPC method name.
       *  @param params - the JSON-RPC parameters.
       *  @returns a promise settled by the `onmessage` handler: resolved with
       *      the response's `result`, or rejected with an Error built from the
       *      response's `error`.
       */
      send(method: string, params?: Array<any>): Promise<any> {
          const rid = NextId++;

          return new Promise((resolve, reject) => {
              function callback(error: Error, result: any) {
                  if (error) { return reject(error); }
                  return resolve(result);
              }

              const payload = JSON.stringify({
                  method: method,
                  params: params,
                  id: rid,
                  jsonrpc: "2.0"
              });

              this.emit("debug", {
                  action: "request",
                  request: JSON.parse(payload),
                  provider: this
              });

              this._requests[String(rid)] = { callback, payload };

              if (this._wsReady) { this.websocket.send(payload); }
          });
      }

      /** The URL used when none is supplied (ws://localhost:8546). */
      static defaultUrl(): string {
          return "ws:/\/localhost:8546";
      }

      /**
       *  Ensures an "eth_subscribe" subscription exists for `tag` and routes
       *  its notifications to `processFunc`.
       *
       *  Identical tags share one subscription: the pending subscription-ID
       *  promise is cached in `_subIds`. If the subscribe request fails, the
       *  cached entry is dropped so that a later call can try again, and the
       *  returned promise rejects with the failure.
       *
       *  @param tag - key under which the subscription is deduplicated.
       *  @param param - the "eth_subscribe" parameters; entries may be promises
       *      and are resolved with Promise.all before sending.
       *  @param processFunc - called with the result of each notification.
       */
      async _subscribe(tag: string, param: Array<any>, processFunc: (result: any) => void): Promise<void> {
          let subIdPromise = this._subIds[tag];
          if (subIdPromise == null) {
              subIdPromise = Promise.all(param).then((param) => {
                  return this.send("eth_subscribe", param);
              });
              this._subIds[tag] = subIdPromise;

              // Do not keep a rejected promise cached under this tag; otherwise
              // one failed attempt would block every later subscription for it.
              const pending = subIdPromise;
              pending.catch(() => {
                  if (this._subIds[tag] === pending) { delete this._subIds[tag]; }
              });
          }
          const subId = await subIdPromise;
          this._subs[subId] = { tag, processFunc };
      }

      /**
       *  Starts the backend subscription needed to serve a newly added event.
       */
      _startEvent(event: Event): void {
          switch (event.type) {
              case "block":
                  this._subscribe("block", [ "newHeads" ], (result: any) => {
                      const blockNumber = BigNumber.from(result.number).toNumber();
                      this._emitted.block = blockNumber;
                      this.emit("block", blockNumber);
                  });
                  break;

              case "pending":
                  this._subscribe("pending", [ "newPendingTransactions" ], (result: any) => {
                      this.emit("pending", result);
                  });
                  break;

              case "filter":
                  this._subscribe(event.tag, [ "logs", this._getFilter(event.filter) ], (result: any) => {
                      if (result.removed == null) { result.removed = false; }
                      this.emit(event.filter, this.formatter.filterLog(result));
                  });
                  break;

              case "tx": {
                  const emitReceipt = (event: Event) => {
                      const hash = event.hash;
                      this.getTransactionReceipt(hash).then((receipt) => {
                          if (!receipt) { return; }
                          this.emit(hash, receipt);
                      });
                  };

                  // In case it is already mined
                  emitReceipt(event);

                  // To keep things simple, we start up a single newHeads subscription
                  // to keep an eye out for transactions we are watching for.
                  // Starting a subscription for an event (i.e. "tx") that is already
                  // running is (basically) a nop.
                  this._subscribe("tx", [ "newHeads" ], (result: any) => {
                      this._events.filter((e) => (e.type === "tx")).forEach(emitReceipt);
                  });
                  break;
              }

              // Nothing is needed
              case "debug":
              case "poll":
              case "willPoll":
              case "didPoll":
              case "error":
                  break;

              default:
                  console.log("unhandled:", event);
                  break;
          }
      }

      /**
       *  Tears down the backend subscription behind an event once nothing is
       *  listening for it any more.
       */
      _stopEvent(event: Event): void {
          let tag = event.tag;

          if (event.type === "tx") {
              // There are remaining transaction event listeners
              if (this._events.filter((e) => (e.type === "tx")).length) {
                  return;
              }
              // All "tx" events share the single subscription tagged "tx"
              tag = "tx";
          } else if (this.listenerCount(event.event)) {
              // There are remaining event listeners
              return;
          }

          const subId = this._subIds[tag];
          if (!subId) { return; }

          delete this._subIds[tag];
          subId.then((subId) => {
              if (!this._subs[subId]) { return; }
              delete this._subs[subId];
              this.send("eth_unsubscribe", [ subId ]);
          });
      }

      /**
       *  Stops the faux "poll" timer and closes the socket with status code
       *  1000. If the socket is still connecting, this first waits for it to
       *  open or to report an error.
       */
      async destroy(): Promise<void> {
          // Stop emitting faux "poll" events; without this the interval
          // started in the constructor would keep running after destruction.
          if (this._fauxPoll != null) {
              clearInterval(this._fauxPoll);
              this._fauxPoll = null;
          }

          // Wait until we have connected before trying to disconnect
          if (this.websocket.readyState === WebSocket.CONNECTING) {
              await (new Promise((resolve) => {
                  this.websocket.onopen = function() {
                      resolve(true);
                  };

                  this.websocket.onerror = function() {
                      resolve(false);
                  };
              }));
          }

          // Hangup
          // See: https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#Status_codes
          this.websocket.close(1000);
      }
  }