// wait-on.js (14 KB)
  1. 'use strict';
  2. const fs = require('fs');
  3. const { promisify } = require('util');
  4. const Joi = require('joi');
  5. const https = require('https');
  6. const net = require('net');
  7. const util = require('util');
  8. const axiosPkg = require('axios').default;
  9. const { isBoolean, isEmpty, negate, noop, once, partial, pick, zip } = require('lodash/fp');
  10. const { NEVER, combineLatest, from, merge, throwError, timer } = require('rxjs');
  11. const { distinctUntilChanged, map, mergeMap, scan, startWith, take, takeWhile } = require('rxjs/operators');
  // --- Resource string parsing -------------------------------------------
  // Splits a resource into [1] prefix including the colon, [2] bare scheme,
  // [3] everything after the prefix.
  const PREFIX_RE = /^((https?-get|https?|tcp|socket|file):)(.+)$/;
  // `[host:]port` as used by tcp resources: [2] optional host, [3] numeric port.
  const HOST_PORT_RE = /^(([^:]*):)?(\d+)$/;
  // Resources that request an HTTP(S) GET instead of the default HEAD.
  const HTTP_GET_RE = /^https?-get:/;
  // HTTP over a unix domain socket: http://unix:SOCK_PATH:URL_PATH
  const HTTP_UNIX_RE = /^http:\/\/unix:([^:]+):([^:]+)$/;
  // Leading text of the timeout error; also used to recognise that error
  // when deciding how to log it.
  const TIMEOUT_ERR_MSG = 'Timed out waiting for';

  // --- Small helpers -----------------------------------------------------
  // Promise-returning variant of fs.stat.
  const fstat = promisify(fs.stat);
  const isNotEmpty = negate(isEmpty);
  const isNotABoolean = negate(isBoolean);

  // Pin axios to its http adapter. Under jest/jsdom the xhr adapter might be
  // picked instead, and it logs every error, polluting the logs.
  const axios = axiosPkg.create({ adapter: 'http' });
  // Joi schema for the options object accepted by waitOn. Validation also
  // fills in the defaults declared here. Joi schemas are immutable (every
  // chained call returns a new schema), so the shared bases below can be
  // reused safely across keys.
  const WAIT_ON_SCHEMA = (() => {
    const nonNegativeInt = Joi.number().integer().min(0);
    const flag = Joi.boolean();
    const stringOrBinary = [Joi.string(), Joi.binary()];

    return Joi.object({
      resources: Joi.array().items(Joi.string().required()).required(),
      delay: nonNegativeInt.default(0),
      httpTimeout: nonNegativeInt,
      interval: nonNegativeInt.default(250),
      log: flag.default(false),
      reverse: flag.default(false),
      simultaneous: Joi.number().integer().min(1).default(Infinity),
      timeout: nonNegativeInt.default(Infinity),
      validateStatus: Joi.function(),
      verbose: flag.default(false),
      window: nonNegativeInt.default(750),
      tcpTimeout: nonNegativeInt.default(300),

      // http/https options
      ca: stringOrBinary,
      cert: stringOrBinary,
      key: [...stringOrBinary, Joi.object()],
      passphrase: Joi.string(),
      proxy: [flag, Joi.object()],
      auth: Joi.object({
        username: Joi.string(),
        password: Joi.string()
      }),
      strictSSL: flag.default(false),
      followRedirect: flag.default(true), // HTTP 3XX responses
      headers: Joi.object()
    });
  })();
  /**
   * Waits for resources to become available (or, in reverse mode, unavailable).
   *
   * Files, http(s) endpoints, tcp ports and unix domain sockets are polled.
   * The resource type is distinguished by its prefix; a resource without a
   * recognised prefix is treated as `file:`.
   *
   *  - file:/path/to/file        waits for the file to exist and its size to stabilize
   *  - http://foo.com:8000/bar   HTTP HEAD request
   *  - https://my.bar.com/cat    HTTPS HEAD request
   *  - http-get://m.com:90/foo   HTTP GET request
   *  - https-get://my/bar        HTTPS GET request
   *  - tcp:my.server.com:3000    a service is listening on the port
   *  - socket:/path/sock         a service is listening on the (UDS) socket
   *
   * For http over a socket use http://unix:SOCK_PATH:URL_PATH, for example
   * http://unix:/path/to/sock:/foo/bar or http-get://unix:/path/to/sock:/foo/bar
   *
   * @param opts object configuring waitOn
   * @param opts.resources array of resource strings to wait for
   * @param opts.delay integer - optional initial delay in ms, default 0
   * @param opts.httpTimeout integer - optional timeout in ms for each HTTP HEAD/GET request
   * @param opts.interval integer - optional poll interval in ms, default 250
   * @param opts.log boolean - optional flag to turn on logging to stdout
   * @param opts.reverse boolean - optional flag reversing the mode: succeed when resources are NOT available
   * @param opts.simultaneous integer - optional limit of concurrent connections to a resource, default Infinity
   * @param opts.tcpTimeout integer - maximum time in ms for a tcp connect, default 300
   * @param opts.timeout integer - optional overall timeout in ms, default Infinity; on expiry the wait fails with an error
   * @param opts.verbose boolean - optional flag to turn on debug logging (also enables opts.log)
   * @param opts.window integer - optional stabilization time in ms, default 750; raised to opts.interval when smaller
   * @param cb optional callback with signature cb(err); err is set when the resource checks did not succeed
   * @returns when cb is omitted, a promise that resolves on success and rejects
   *   with the error otherwise; with cb, whatever waitOnImpl returns
   */
  function waitOn(opts, cb) {
    // callback API: hand straight over to the implementation
    if (cb !== undefined) {
      return waitOnImpl(opts, cb);
    }

    // promise API: adapt the node-style callback to resolve/reject
    return new Promise((resolve, reject) => {
      const settle = (err) => {
        if (err) {
          reject(err);
          return;
        }
        resolve();
      };
      waitOnImpl(opts, settle);
    });
  }
  /**
   * Callback-based core of waitOn.
   *
   * Validates opts against WAIT_ON_SCHEMA, builds one observable per resource,
   * and subscribes to them merged with an optional timeout stream. cbFunc is
   * invoked at most once: with a validation/timeout/stream error, or with
   * undefined once every resource reports complete.
   *
   * @param opts raw options object (see waitOn)
   * @param cbFunc callback with signature cb(err)
   */
  function waitOnImpl(opts, cbFunc) {
    const cbOnce = once(cbFunc);

    const { error: validationError, value: optsWithDefaults } = WAIT_ON_SCHEMA.validate(opts);
    if (validationError) {
      return cbOnce(validationError);
    }

    const validatedOpts = { ...optsWithDefaults };
    if (optsWithDefaults.window < optsWithDefaults.interval) {
      // window needs to be at least interval
      validatedOpts.window = optsWithDefaults.interval;
    }
    if (optsWithDefaults.verbose) {
      // debug logging implies normal logging
      validatedOpts.log = true;
    }

    const { resources, log: shouldLog, timeout, verbose, reverse } = validatedOpts;
    const output = verbose ? console.log.bind() : noop;
    const log = shouldLog ? console.log.bind() : noop;

    const resourceDeps = { validatedOpts, output, log };
    const reportRemaining = (states) => logWaitingFor({ log, resources }, states);
    const toResource$ = (resource) => createResource$(resourceDeps, resource);

    // most recent array of completion states, read when the timeout fires
    let lastResourcesState = resources;

    let timeoutError$ = NEVER;
    if (timeout !== Infinity) {
      timeoutError$ = timer(timeout).pipe(
        mergeMap(() => {
          const stillWaiting = determineRemainingResources(resources, lastResourcesState);
          return throwError(Error(`${TIMEOUT_ERR_MSG}: ${stillWaiting.join(', ')}`));
        })
      );
    }

    // shared handler for both stream error and stream completion
    const finish = (err) => {
      if (!err) {
        // no error, we are complete
        log('wait-on(%s) complete', process.pid);
      } else if (err.message.startsWith(TIMEOUT_ERR_MSG)) {
        log('wait-on(%s) %s; exiting with error', process.pid, err.message);
      } else {
        log('wait-on(%s) exiting with error', process.pid, err);
      }
      cbOnce(err);
    };

    if (reverse) {
      log('wait-on reverse mode - waiting for resources to be unavailable');
    }
    reportRemaining(resources);

    const resourcesCompleted$ = combineLatest(resources.map(toResource$));
    const anyIncomplete = (states) => states.some((isDone) => !isDone);

    merge(timeoutError$, resourcesCompleted$)
      .pipe(takeWhile(anyIncomplete))
      .subscribe({
        next(states) {
          lastResourcesState = states;
          reportRemaining(states);
        },
        error: finish,
        complete: finish
      });
  }
  /**
   * Logs the resources that have not completed yet; logs nothing when none remain.
   *
   * @param deps.log logging function
   * @param deps.resources array of all resource strings
   * @param resourceStates array of completion states, parallel to resources
   */
  function logWaitingFor({ log, resources }, resourceStates) {
    const pending = determineRemainingResources(resources, resourceStates);
    if (!isNotEmpty(pending)) {
      return;
    }
    const names = pending.join(', ');
    log(`waiting for ${pending.length} resources: ${names}`);
  }
  /**
   * Returns the resources whose paired state is falsy, i.e. not yet complete.
   *
   * @param resources array of resource strings
   * @param resourceStates array of completed flags, parallel to resources
   * @returns array of resources still being waited for
   */
  function determineRemainingResources(resources, resourceStates) {
    const remaining = [];
    for (const [resource, isDone] of zip(resources, resourceStates)) {
      if (!isDone) {
        remaining.push(resource);
      }
    }
    return remaining;
  }
  /**
   * Builds the polling observable for one resource, choosing the factory by
   * the resource's prefix. Anything without a recognised http(s), tcp or
   * socket prefix is handled as a file.
   *
   * @param deps dependencies forwarded unchanged to the chosen factory
   * @param resource resource string
   * @returns whatever the chosen factory returns
   */
  function createResource$(deps, resource) {
    const prefix = extractPrefix(resource);

    if (prefix === 'tcp:') {
      return createTCP$(deps, resource);
    }
    if (prefix === 'socket:') {
      return createSocket$(deps, resource);
    }

    const httpPrefixes = ['https-get:', 'http-get:', 'https:', 'http:'];
    if (httpPrefixes.includes(prefix)) {
      return createHTTP$(deps, resource);
    }

    return createFileResource$(deps, resource);
  }
  /**
   * Builds the polling observable for a file resource.
   *
   * The file size is read on every tick of timer(delay, interval). In normal
   * mode the resource completes once the file exists and its size has stayed
   * the same for at least the stability window. In reverse mode it completes
   * once the size lookup reports -1 (file not found).
   *
   * The stream starts with false, drops repeated values and ends after two
   * emissions.
   *
   * @param deps.validatedOpts validated options (delay, interval, reverse, simultaneous, window)
   * @param deps.output debug logging function
   * @param resource file resource string, with or without a prefix
   */
  function createFileResource$(
    { validatedOpts: { delay, interval, reverse, simultaneous, window: stabilityWindow }, output },
    resource
  ) {
    const filePath = extractPath(resource);

    // Reducer for scan: state is { size, t } while waiting and becomes `true`
    // once the size has been stable for the whole window.
    const trackStability = (state, currentSize) => {
      if (!(currentSize > -1)) {
        return state; // file missing on this poll, keep what we have
      }

      const { size: knownSize, t: since } = state;
      const now = Date.now();
      const sizeUnchanged = knownSize !== -1 && currentSize === knownSize;

      if (!sizeUnchanged) {
        output(` file exists, checking for size changes, size:${currentSize} file:${filePath}`);
        return { size: currentSize, t: now }; // restart the window with the new size
      }

      if (now >= since + stabilityWindow) {
        // file size has stabilized
        output(` file stabilized at size:${knownSize} file:${filePath}`);
        return true;
      }

      output(` file exists, checking for size change during stability window, size:${knownSize} file:${filePath}`);
      return state; // unchanged, still inside the stability window
    };

    const checkOperator = reverse
      ? map((size) => size === -1) // check that file does not exist
      : scan(trackStability, { size: -1, t: Date.now() });

    const pollFileSize = () => {
      output(`checking file stat for file:${filePath} ...`);
      return from(getFileSize(filePath));
    };

    // intermediate scan states are objects; only real booleans pass through
    const toBoolean = (value) => (isNotABoolean(value) ? false : value);

    return timer(delay, interval).pipe(
      mergeMap(pollFileSize, simultaneous),
      checkOperator,
      map(toBoolean),
      startWith(false),
      distinctUntilChanged(),
      take(2)
    );
  }
  /**
   * Strips a recognised resource prefix (e.g. `file:`, `tcp:`) and returns
   * the rest. A resource without such a prefix is returned unchanged.
   *
   * @param resource resource string
   * @returns the part after the prefix, or the resource itself
   */
  function extractPath(resource) {
    const match = PREFIX_RE.exec(resource);
    return match ? match[3] : resource;
  }
  /**
   * Returns the recognised prefix of a resource including its trailing colon
   * (e.g. `http-get:`), or an empty string when there is none.
   *
   * @param resource resource string
   * @returns the prefix or ''
   */
  function extractPrefix(resource) {
    const match = PREFIX_RE.exec(resource);
    return match ? match[1] : '';
  }
  /**
   * Looks up the size of a file.
   *
   * @param filePath path handed to fstat
   * @returns promise for the size reported by fstat, or -1 when the lookup
   *   fails for any reason
   */
  async function getFileSize(filePath) {
    try {
      const stats = await fstat(filePath);
      return stats.size;
    } catch (statError) {
      // any failure is reported as "no file"
      return -1;
    }
  }
  /**
   * Builds the polling observable for an http(s) resource.
   *
   * `http-get:` / `https-get:` resources are requested with GET, everything
   * else with HEAD. A URL of the form http://unix:SOCK_PATH:URL_PATH is sent
   * over the unix domain socket SOCK_PATH with URL_PATH as the request url.
   *
   * A request is made on every tick of timer(delay, interval). The stream
   * starts with false, drops repeated values and ends after two emissions.
   * In reverse mode the result of each request is negated.
   *
   * @param deps.validatedOpts validated options; the http related ones (auth,
   *   headers, validateStatus, ca, cert, key, passphrase, proxy, strictSSL,
   *   followRedirect, httpTimeout) are passed on to the request
   * @param deps.output debug logging function
   * @param resource http(s) resource string
   */
  function createHTTP$({ validatedOpts, output }, resource) {
    const {
      delay,
      followRedirect,
      httpTimeout: timeout,
      interval,
      proxy,
      reverse,
      simultaneous,
      strictSSL: rejectUnauthorized
    } = validatedOpts;
    const method = HTTP_GET_RE.test(resource) ? 'get' : 'head';
    // Drop the `-get` marker from the scheme only. An unanchored replace of
    // '-get:' would also rewrite a host such as `my-get:3000` in a plain
    // http:// resource and poll the wrong server.
    const url = resource.replace(/^(https?)-get:/, '$1:');
    const matchHttpUnixSocket = HTTP_UNIX_RE.exec(url); // http://unix:/sock:/url
    const urlSocketOptions = matchHttpUnixSocket
      ? { socketPath: matchHttpUnixSocket[1], url: matchHttpUnixSocket[2] }
      : { url };
    const socketPathDesc = urlSocketOptions.socketPath ? `socketPath:${urlSocketOptions.socketPath}` : '';
    const httpOptions = {
      ...pick(['auth', 'headers', 'validateStatus'], validatedOpts),
      httpsAgent: new https.Agent({
        rejectUnauthorized,
        ...pick(['ca', 'cert', 'key', 'passphrase'], validatedOpts)
      }),
      ...(followRedirect ? {} : { maxRedirects: 0 }), // defaults to 5 (enabled)
      proxy, // can be undefined, false, or object
      ...(timeout && { timeout }), // only set when a non-zero timeout was given
      ...urlSocketOptions,
      method
      // by default it provides full response object
      // validStatus is 2xx unless followRedirect is true (default)
    };
    const checkFn = reverse ? negateAsync(httpCallSucceeds) : httpCallSucceeds;
    return timer(delay, interval).pipe(
      mergeMap(() => {
        output(`making HTTP(S) ${method} request to ${socketPathDesc} url:${urlSocketOptions.url} ...`);
        return from(checkFn(output, httpOptions));
      }, simultaneous),
      startWith(false),
      distinctUntilChanged(),
      take(2)
    );
  }
  /**
   * Performs one HTTP(S) request and reports whether it succeeded.
   *
   * @param output debug logging function; receives a summary of the response
   *   or the error text
   * @param httpOptions request options handed to axios
   * @returns promise for true when the axios call resolves, false when
   *   anything in the attempt throws
   */
  async function httpCallSucceeds(output, httpOptions) {
    try {
      const response = await axios(httpOptions);
      const summary = util.inspect(pick(['status', 'statusText', 'headers', 'data'], response));
      output(` HTTP(S) result for ${httpOptions.url}: ${summary}`);
      return true;
    } catch (err) {
      output(` HTTP(S) error for ${httpOptions.url} ${err.toString()}`);
      return false;
    }
  }
  /**
   * Builds the polling observable for a tcp resource.
   *
   * A connection attempt is made on every tick of timer(delay, interval).
   * The stream starts with false, drops repeated values and ends after two
   * emissions. In reverse mode the result of each attempt is negated.
   *
   * @param deps.validatedOpts validated options (delay, interval, tcpTimeout, reverse, simultaneous)
   * @param deps.output debug logging function
   * @param resource tcp resource string
   */
  function createTCP$({ validatedOpts: { delay, interval, tcpTimeout, reverse, simultaneous }, output }, resource) {
    const tcpPath = extractPath(resource);

    let checkFn = tcpExists;
    if (reverse) {
      checkFn = negateAsync(tcpExists);
    }

    const attemptConnection = () => {
      output(`making TCP connection to ${tcpPath} ...`);
      return from(checkFn(output, tcpPath, tcpTimeout));
    };

    const ticks$ = timer(delay, interval);
    return ticks$.pipe(
      mergeMap(attemptConnection, simultaneous),
      startWith(false),
      distinctUntilChanged(),
      take(2)
    );
  }
  /**
   * Attempts a single TCP connection and reports whether it succeeded.
   *
   * @param output debug logging function
   * @param tcpPath `[host:]port`; the host falls back to localhost when omitted
   * @param tcpTimeout socket inactivity timeout in ms applied to the attempt
   * @returns promise for true when the connection is established, false on
   *   a connection error or timeout
   * @throws TypeError (as a rejected promise) when tcpPath is not `[host:]port`
   */
  async function tcpExists(output, tcpPath, tcpTimeout) {
    const match = HOST_PORT_RE.exec(tcpPath);
    if (!match) {
      // Without this guard the destructuring below fails on null with an
      // opaque "not iterable" TypeError that never names the bad resource.
      throw new TypeError(`Invalid tcp resource "${tcpPath}", expected [host:]port`);
    }
    const [, , /* full, hostWithColon */ hostMatched, port] = match;
    const host = hostMatched || 'localhost';
    return new Promise((resolve) => {
      const conn = net
        .connect(port, host)
        .on('error', (err) => {
          output(` error connecting to TCP host:${host} port:${port} ${err.toString()}`);
          resolve(false);
        })
        .on('timeout', () => {
          output(` timed out connecting to TCP host:${host} port:${port} tcpTimeout:${tcpTimeout}ms`);
          // The socket never connected, so end() (a graceful half-close) would
          // leave the connect attempt pending; destroy() abandons it at once.
          conn.destroy();
          resolve(false);
        })
        .on('connect', () => {
          output(` TCP connection successful to host:${host} port:${port}`);
          conn.end();
          resolve(true);
        });
      conn.setTimeout(tcpTimeout);
    });
  }
  /**
   * Builds the polling observable for a unix domain socket resource.
   *
   * A connection attempt is made on every tick of timer(delay, interval).
   * The stream starts with false, drops repeated values and ends after two
   * emissions. In reverse mode the result of each attempt is negated.
   *
   * @param deps.validatedOpts validated options (delay, interval, reverse, simultaneous)
   * @param deps.output debug logging function
   * @param resource socket resource string
   */
  function createSocket$({ validatedOpts: { delay, interval, reverse, simultaneous }, output }, resource) {
    const socketPath = extractPath(resource);

    let checkFn = socketExists;
    if (reverse) {
      checkFn = negateAsync(socketExists);
    }

    const attemptConnection = () => {
      output(`making socket connection to ${socketPath} ...`);
      return from(checkFn(output, socketPath));
    };

    const ticks$ = timer(delay, interval);
    return ticks$.pipe(
      mergeMap(attemptConnection, simultaneous),
      startWith(false),
      distinctUntilChanged(),
      take(2)
    );
  }
  /**
   * Attempts a single connection to a unix domain socket.
   *
   * @param output debug logging function
   * @param socketPath path of the socket to connect to
   * @returns promise for true once connected, false on a connection error
   */
  async function socketExists(output, socketPath) {
    return new Promise((resolve) => {
      const conn = net.connect(socketPath);

      conn.on('connect', () => {
        output(` connected to socket:${socketPath}`);
        conn.end();
        resolve(true);
      });

      conn.on('error', (err) => {
        output(` error connecting to socket socket:${socketPath} ${err.toString()}`);
        resolve(false);
      });
    });
  }
  /**
   * Wraps an async predicate so that its resolved value is logically negated.
   *
   * @param asyncFn function returning a value or a promise for one
   * @returns async function that forwards its arguments to asyncFn and
   *   resolves to the boolean negation of the result; errors from asyncFn
   *   surface as a rejected promise
   */
  function negateAsync(asyncFn) {
    return async (...args) => {
      const outcome = await asyncFn(...args);
      return !outcome;
    };
  }
  367. module.exports = waitOn;