query.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. 'use strict';
  2. const process = require('process');
  3. const Timers = require('timers');
  4. const Readable = require('stream').Readable;
  5. const Command = require('./command.js');
  6. const Packets = require('../packets/index.js');
  7. const getTextParser = require('../parsers/text_parser.js');
  8. const staticParser = require('../parsers/static_text_parser.js');
  9. const ServerStatus = require('../constants/server_status.js');
  // Shared packet built over a 4-byte buffer (offset 0, length 4). _streamLocalInfile
  // writes it to the connection when the local stream ends, when it errors, or when
  // no stream factory is configured.
  // NOTE(review): the 4 bytes match the header room onData reserves in front of its
  // payload, so this is presumably a packet with an empty payload; confirm against Packets.Packet.
  10. const EmptyPacket = new Packets.Packet(0, Buffer.allocUnsafe(4), 0, 4);
  11. // http://dev.mysql.com/doc/internals/en/com-query.html
  /**
   * Command that sends a SQL string to the server and collects the result
   * set(s) it answers with.
   *
   * Each packet-handling method returns the handler for the next packet
   * (or `null` once the command is finished). Results are delivered either
   * through the `callback` given to the constructor or, when no callback is
   * set, through the `fields`, `result` and `error` events.
   */
  class Query extends Command {
    /**
     * @param {object} options - Query options; `sql`, `values`,
     *   `namedPlaceholders`, `timeout` and `infileStreamFactory` are read here,
     *   the whole object is merged over the connection config in `start()`.
     * @param {Function} [callback] - `(err, rows, fields)` result callback.
     */
    constructor(options, callback) {
      super();
      this.sql = options.sql;
      this.values = options.values;
      this._queryOptions = options;
      this.namedPlaceholders = options.namedPlaceholders || false;
      this.onResult = callback;
      this.timeout = options.timeout;
      this.queryTimeout = null;
      this._fieldCount = 0;
      this._rowParser = null;
      this._fields = [];
      this._rows = [];
      this._receivedFieldsCount = 0;
      this._resultIndex = 0;
      this._localStream = null;
      // Set when a LOCAL INFILE stream or a row parser fails; checked by doneInsert().
      this._localStreamError = null;
      this._unpipeStream = function () {};
      this._streamFactory = options.infileStreamFactory;
      this._connection = null;
    }

    /**
     * Guard against treating this command as a promise: logs an explanation
     * and throws. `catch` is aliased to this method after the class body.
     *
     * @throws {Error} always.
     */
    then() {
      const err =
        "You have tried to call .then(), .catch(), or invoked await on the result of query that is not a promise, which is a programming error. Try calling con.promise().query(), or require('mysql2/promise') instead of 'mysql2' for a promise-compatible version of the query interface. To learn how to use async/await or Promises check out documentation at https://sidorares.github.io/node-mysql2/docs#using-promise-wrapper, or the mysql2 documentation at https://sidorares.github.io/node-mysql2/docs/documentation/promise-wrapper";
      // eslint-disable-next-line
      console.log(err);
      throw new Error(err);
    }

    /**
     * Writes the query packet to the connection and arms the inactivity timer.
     *
     * @param {*} _packet - Unused.
     * @param {object} connection - Connection the command runs on.
     * @returns {Function} Handler for the result set header packet.
     */
    /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
    start(_packet, connection) {
      if (connection.config.debug) {
        // eslint-disable-next-line
        console.log(' Sending query command: %s', this.sql);
      }
      this._connection = connection;
      // Per-query options take precedence over the connection-wide config.
      this.options = Object.assign({}, connection.config, this._queryOptions);
      this._setTimeout();
      const cmdPacket = new Packets.Query(
        this.sql,
        connection.config.charsetNumber
      );
      connection.writePacket(cmdPacket.toPacket(1));
      return Query.prototype.resultsetHeader;
    }

    /**
     * Finishes the command successfully: detaches any infile stream, stops the
     * timer and hands the collected rows/fields to the callback on the next tick.
     *
     * @returns {null} Always `null` (no further packets expected).
     */
    done() {
      this._unpipeStream();
      // A timeout was configured but the timer is gone: the timeout error has
      // already been reported, so the result must not be delivered as well.
      if (this.timeout && !this.queryTimeout) {
        return null;
      }
      // Otherwise stop the pending timer.
      if (this.queryTimeout) {
        Timers.clearTimeout(this.queryTimeout);
        this.queryTimeout = null;
      }
      if (this.onResult) {
        let rows, fields;
        // A single result set is unwrapped; several are returned as arrays.
        if (this._resultIndex === 0) {
          rows = this._rows[0];
          fields = this._fields[0];
        } else {
          rows = this._rows;
          fields = this._fields;
        }
        if (fields) {
          process.nextTick(() => {
            this.onResult(null, rows, fields);
          });
        } else {
          process.nextTick(() => {
            this.onResult(null, rows);
          });
        }
      }
      return null;
    }

    /**
     * Handles a result that carries no rows (a result set header with zero
     * fields), or reports a previously recorded stream/parse error.
     *
     * @param {object|null} rs - Parsed result set header (`null` on parse errors).
     * @returns {Function|null} Next packet handler, or `null` when finished.
     */
    doneInsert(rs) {
      if (this._localStreamError) {
        // The command ends here without going through done(), so release the
        // infile listeners and the inactivity timer first; otherwise the timer
        // would fire later and report a second (timeout) error.
        this._unpipeStream();
        if (this.queryTimeout) {
          Timers.clearTimeout(this.queryTimeout);
          this.queryTimeout = null;
        }
        if (this.onResult) {
          this.onResult(this._localStreamError, rs);
        } else {
          this.emit('error', this._localStreamError);
        }
        return null;
      }
      this._rows.push(rs);
      this._fields.push(void 0);
      this.emit('fields', void 0);
      this.emit('result', rs);
      if (rs.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
        this._resultIndex++;
        return this.resultsetHeader;
      }
      return this.done();
    }

    /**
     * Parses a result set header and decides how the response continues.
     *
     * @param {object} packet - Incoming packet.
     * @param {object} connection - Connection the command runs on.
     * @returns {Function|null} Next packet handler, or `null` when finished.
     */
    resultsetHeader(packet, connection) {
      const rs = new Packets.ResultSetHeader(packet, connection);
      this._fieldCount = rs.fieldCount;
      if (connection.config.debug) {
        // eslint-disable-next-line
        console.log(
          ` Resultset header received, expecting ${rs.fieldCount} column definition packets`
        );
      }
      // No columns: nothing to read but the header itself.
      if (this._fieldCount === 0) {
        return this.doneInsert(rs);
      }
      // A null field count is treated as a LOCAL INFILE request for rs.infileName.
      if (this._fieldCount === null) {
        return this._streamLocalInfile(connection, rs.infileName);
      }
      this._receivedFieldsCount = 0;
      this._rows.push([]);
      this._fields.push([]);
      return this.readField;
    }

    /**
     * Streams the file requested by the server over the connection, using the
     * stream returned by the `infileStreamFactory` option.
     *
     * @param {object} connection - Connection the command runs on.
     * @param {string} path - File name requested by the server.
     * @returns {Function} Handler for the packet that follows the upload.
     */
    _streamLocalInfile(connection, path) {
      if (this._streamFactory) {
        this._localStream = this._streamFactory(path);
      } else {
        // No factory: record the error, send the empty packet and let
        // doneInsert() report the error once the server has answered.
        this._localStreamError = new Error(
          `As a result of LOCAL INFILE command server wants to read ${path} file, but as of v2.0 you must provide streamFactory option returning ReadStream.`
        );
        connection.writePacket(EmptyPacket);
        return this.infileOk;
      }
      const onConnectionError = () => {
        this._unpipeStream();
      };
      // Follow the connection stream's pause/drain events to throttle the file stream.
      const onDrain = () => {
        this._localStream.resume();
      };
      const onPause = () => {
        this._localStream.pause();
      };
      const onData = function (data) {
        // Reserve 4 bytes in front of the chunk before wrapping it in a packet.
        const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
        data.copy(dataWithHeader, 4);
        connection.writePacket(
          new Packets.Packet(0, dataWithHeader, 0, dataWithHeader.length)
        );
      };
      const onEnd = () => {
        connection.removeListener('error', onConnectionError);
        connection.writePacket(EmptyPacket);
      };
      const onError = (err) => {
        this._localStreamError = err;
        connection.removeListener('error', onConnectionError);
        connection.writePacket(EmptyPacket);
      };
      this._unpipeStream = () => {
        connection.stream.removeListener('pause', onPause);
        connection.stream.removeListener('drain', onDrain);
        this._localStream.removeListener('data', onData);
        this._localStream.removeListener('end', onEnd);
        this._localStream.removeListener('error', onError);
      };
      connection.stream.on('pause', onPause);
      connection.stream.on('drain', onDrain);
      this._localStream.on('data', onData);
      this._localStream.on('end', onEnd);
      this._localStream.on('error', onError);
      connection.once('error', onConnectionError);
      return this.infileOk;
    }

    /**
     * Consumes one column definition packet; after the last one, builds the
     * row parser for the current result set.
     *
     * @param {object} packet - Incoming packet.
     * @param {object} connection - Connection the command runs on.
     * @returns {Function} Next packet handler.
     */
    readField(packet, connection) {
      this._receivedFieldsCount++;
      // Often there is much more data in the column definition than in the row itself
      // If you set manually _fields[0] to array of ColumnDefinition's (from previous call)
      // you can 'cache' result of parsing. Field packets still received, but ignored in that case
      // this is the reason _receivedFieldsCount exist (otherwise we could just use current length of fields array)
      if (this._fields[this._resultIndex].length !== this._fieldCount) {
        const field = new Packets.ColumnDefinition(
          packet,
          connection.clientEncoding
        );
        this._fields[this._resultIndex].push(field);
        if (connection.config.debug) {
          /* eslint-disable no-console */
          console.log(' Column definition:');
          console.log(` name: ${field.name}`);
          console.log(` type: ${field.columnType}`);
          console.log(` flags: ${field.flags}`);
          /* eslint-enable no-console */
        }
      }
      // last field received
      if (this._receivedFieldsCount === this._fieldCount) {
        const fields = this._fields[this._resultIndex];
        this.emit('fields', fields);
        if (this.options.disableEval) {
          this._rowParser = staticParser(fields, this.options, connection.config);
        } else {
          this._rowParser = new (getTextParser(
            fields,
            this.options,
            connection.config
          ))(fields);
        }
        return Query.prototype.fieldsEOF;
      }
      return Query.prototype.readField;
    }

    /**
     * Expects the EOF packet that terminates the column definitions.
     *
     * @param {object} packet - Incoming packet.
     * @param {object} connection - Connection the command runs on.
     * @returns {*} The row handler, or the result of `connection.protocolError`.
     */
    fieldsEOF(packet, connection) {
      // check EOF
      if (!packet.isEOF()) {
        return connection.protocolError('Expected EOF packet');
      }
      return this.row;
    }

    /**
     * Consumes one row packet, or the EOF packet that ends the result set.
     *
     * @param {object} packet - Incoming packet.
     * @param {*} _connection - Unused.
     * @returns {Function|null} Next packet handler, or `null` when finished.
     */
    /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
    row(packet, _connection) {
      if (packet.isEOF()) {
        const status = packet.eofStatusFlags();
        const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
        if (moreResults) {
          this._resultIndex++;
          return Query.prototype.resultsetHeader;
        }
        return this.done();
      }
      let row;
      try {
        row = this._rowParser.next(
          packet,
          this._fields[this._resultIndex],
          this.options
        );
      } catch (err) {
        // Reuse the doneInsert() error path to report the parser failure.
        this._localStreamError = err;
        return this.doneInsert(null);
      }
      // Rows are buffered only when a callback will receive them; otherwise
      // they are emitted one by one.
      if (this.onResult) {
        this._rows[this._resultIndex].push(row);
      } else {
        this.emit('result', row, this._resultIndex);
      }
      return Query.prototype.row;
    }

    /**
     * Handles the packet the server sends after a LOCAL INFILE upload.
     *
     * @param {object} packet - Incoming packet.
     * @param {object} connection - Connection the command runs on.
     * @returns {Function|null} Next packet handler, or `null` when finished.
     */
    infileOk(packet, connection) {
      const rs = new Packets.ResultSetHeader(packet, connection);
      return this.doneInsert(rs);
    }

    /**
     * Exposes the emitted rows as an object-mode Readable stream. The
     * connection is paused when the stream's buffer is full and resumed when
     * the consumer reads again.
     *
     * @param {object} [options] - Extra Readable options. The object is not
     *   modified; `objectMode`, `emitClose`, `autoDestroy` and `read` are
     *   always set by this method.
     * @returns {Readable} Stream of rows.
     */
    stream(options) {
      const stream = new Readable({
        // Copy the caller's options instead of writing `objectMode` into them.
        ...options,
        objectMode: true,
        emitClose: true,
        autoDestroy: true,
        read: () => {
          this._connection && this._connection.resume();
        },
      });
      // Prevent a breaking change for users that rely on `end` event
      stream.once('close', () => {
        if (!stream.readableEnded) {
          stream.emit('end');
        }
      });
      const onResult = (row, index) => {
        if (stream.destroyed) return;
        if (!stream.push(row)) {
          this._connection && this._connection.pause();
        }
        stream.emit('result', row, index); // replicate old emitter
      };
      const onFields = (fields) => {
        if (stream.destroyed) return;
        stream.emit('fields', fields); // replicate old emitter
      };
      const onEnd = () => {
        if (stream.destroyed) return;
        stream.push(null); // pushing null, indicating EOF
      };
      const onError = (err) => {
        stream.destroy(err);
      };
      stream._destroy = (err, cb) => {
        // Resume the connection in case it was paused for this stream.
        this._connection && this._connection.resume();
        this.removeListener('result', onResult);
        this.removeListener('fields', onFields);
        this.removeListener('end', onEnd);
        this.removeListener('error', onError);
        cb(err); // Pass on any errors
      };
      this.on('result', onResult);
      this.on('fields', onFields);
      this.on('end', onEnd);
      this.on('error', onError);
      return stream;
    }

    /**
     * Arms the inactivity timer when a `timeout` option was given.
     */
    _setTimeout() {
      if (this.timeout) {
        const timeoutHandler = this._handleTimeoutError.bind(this);
        this.queryTimeout = Timers.setTimeout(timeoutHandler, this.timeout);
      }
    }

    /**
     * Reports a timeout error through the callback or the `error` event.
     * Leaves `queryTimeout` null, which done() reads as "already timed out".
     */
    _handleTimeoutError() {
      if (this.queryTimeout) {
        Timers.clearTimeout(this.queryTimeout);
        this.queryTimeout = null;
      }
      const err = new Error('Query inactivity timeout');
      err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT';
      err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
      err.syscall = 'query';
      if (this.onResult) {
        this.onResult(err);
      } else {
        this.emit('error', err);
      }
    }
  }
  // `.catch()` shares the implementation of `.then()`, so calling it on a
  // query logs and throws the same "not a promise" error.
  326. Query.prototype.catch = Query.prototype.then;
  // The Query class is this module's export.
  327. module.exports = Query;