// streaming.js
  1. "use strict";
  2. var _Stream_client;
  3. Object.defineProperty(exports, "__esModule", { value: true });
  4. exports.Stream = void 0;
  5. exports._iterSSEMessages = _iterSSEMessages;
  6. const tslib_1 = require("../internal/tslib.js");
  7. const error_1 = require("./error.js");
  8. const shims_1 = require("../internal/shims.js");
  9. const line_1 = require("../internal/decoders/line.js");
  10. const shims_2 = require("../internal/shims.js");
  11. const errors_1 = require("../internal/errors.js");
  12. const bytes_1 = require("../internal/utils/bytes.js");
  13. const log_1 = require("../internal/utils/log.js");
  14. const error_2 = require("./error.js");
  15. class Stream {
  16. constructor(iterator, controller, client) {
  17. this.iterator = iterator;
  18. _Stream_client.set(this, void 0);
  19. this.controller = controller;
  20. tslib_1.__classPrivateFieldSet(this, _Stream_client, client, "f");
  21. }
  22. static fromSSEResponse(response, controller, client) {
  23. let consumed = false;
  24. const logger = client ? (0, log_1.loggerFor)(client) : console;
  25. async function* iterator() {
  26. if (consumed) {
  27. throw new error_1.OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  28. }
  29. consumed = true;
  30. let done = false;
  31. try {
  32. for await (const sse of _iterSSEMessages(response, controller)) {
  33. if (done)
  34. continue;
  35. if (sse.data.startsWith('[DONE]')) {
  36. done = true;
  37. continue;
  38. }
  39. if (sse.event === null || !sse.event.startsWith('thread.')) {
  40. let data;
  41. try {
  42. data = JSON.parse(sse.data);
  43. }
  44. catch (e) {
  45. logger.error(`Could not parse message into JSON:`, sse.data);
  46. logger.error(`From chunk:`, sse.raw);
  47. throw e;
  48. }
  49. if (data && data.error) {
  50. throw new error_2.APIError(undefined, data.error, undefined, response.headers);
  51. }
  52. yield data;
  53. }
  54. else {
  55. let data;
  56. try {
  57. data = JSON.parse(sse.data);
  58. }
  59. catch (e) {
  60. console.error(`Could not parse message into JSON:`, sse.data);
  61. console.error(`From chunk:`, sse.raw);
  62. throw e;
  63. }
  64. // TODO: Is this where the error should be thrown?
  65. if (sse.event == 'error') {
  66. throw new error_2.APIError(undefined, data.error, data.message, undefined);
  67. }
  68. yield { event: sse.event, data: data };
  69. }
  70. }
  71. done = true;
  72. }
  73. catch (e) {
  74. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  75. if ((0, errors_1.isAbortError)(e))
  76. return;
  77. throw e;
  78. }
  79. finally {
  80. // If the user `break`s, abort the ongoing request.
  81. if (!done)
  82. controller.abort();
  83. }
  84. }
  85. return new Stream(iterator, controller, client);
  86. }
  87. /**
  88. * Generates a Stream from a newline-separated ReadableStream
  89. * where each item is a JSON value.
  90. */
  91. static fromReadableStream(readableStream, controller, client) {
  92. let consumed = false;
  93. async function* iterLines() {
  94. const lineDecoder = new line_1.LineDecoder();
  95. const iter = (0, shims_2.ReadableStreamToAsyncIterable)(readableStream);
  96. for await (const chunk of iter) {
  97. for (const line of lineDecoder.decode(chunk)) {
  98. yield line;
  99. }
  100. }
  101. for (const line of lineDecoder.flush()) {
  102. yield line;
  103. }
  104. }
  105. async function* iterator() {
  106. if (consumed) {
  107. throw new error_1.OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  108. }
  109. consumed = true;
  110. let done = false;
  111. try {
  112. for await (const line of iterLines()) {
  113. if (done)
  114. continue;
  115. if (line)
  116. yield JSON.parse(line);
  117. }
  118. done = true;
  119. }
  120. catch (e) {
  121. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  122. if ((0, errors_1.isAbortError)(e))
  123. return;
  124. throw e;
  125. }
  126. finally {
  127. // If the user `break`s, abort the ongoing request.
  128. if (!done)
  129. controller.abort();
  130. }
  131. }
  132. return new Stream(iterator, controller, client);
  133. }
  134. [(_Stream_client = new WeakMap(), Symbol.asyncIterator)]() {
  135. return this.iterator();
  136. }
  137. /**
  138. * Splits the stream into two streams which can be
  139. * independently read from at different speeds.
  140. */
  141. tee() {
  142. const left = [];
  143. const right = [];
  144. const iterator = this.iterator();
  145. const teeIterator = (queue) => {
  146. return {
  147. next: () => {
  148. if (queue.length === 0) {
  149. const result = iterator.next();
  150. left.push(result);
  151. right.push(result);
  152. }
  153. return queue.shift();
  154. },
  155. };
  156. };
  157. return [
  158. new Stream(() => teeIterator(left), this.controller, tslib_1.__classPrivateFieldGet(this, _Stream_client, "f")),
  159. new Stream(() => teeIterator(right), this.controller, tslib_1.__classPrivateFieldGet(this, _Stream_client, "f")),
  160. ];
  161. }
  162. /**
  163. * Converts this stream to a newline-separated ReadableStream of
  164. * JSON stringified values in the stream
  165. * which can be turned back into a Stream with `Stream.fromReadableStream()`.
  166. */
  167. toReadableStream() {
  168. const self = this;
  169. let iter;
  170. return (0, shims_1.makeReadableStream)({
  171. async start() {
  172. iter = self[Symbol.asyncIterator]();
  173. },
  174. async pull(ctrl) {
  175. try {
  176. const { value, done } = await iter.next();
  177. if (done)
  178. return ctrl.close();
  179. const bytes = (0, bytes_1.encodeUTF8)(JSON.stringify(value) + '\n');
  180. ctrl.enqueue(bytes);
  181. }
  182. catch (err) {
  183. ctrl.error(err);
  184. }
  185. },
  186. async cancel() {
  187. await iter.return?.();
  188. },
  189. });
  190. }
  191. }
  192. exports.Stream = Stream;
  193. async function* _iterSSEMessages(response, controller) {
  194. if (!response.body) {
  195. controller.abort();
  196. if (typeof globalThis.navigator !== 'undefined' &&
  197. globalThis.navigator.product === 'ReactNative') {
  198. throw new error_1.OpenAIError(`The default react-native fetch implementation does not support streaming. Please use expo/fetch: https://docs.expo.dev/versions/latest/sdk/expo/#expofetch-api`);
  199. }
  200. throw new error_1.OpenAIError(`Attempted to iterate over a response with no body`);
  201. }
  202. const sseDecoder = new SSEDecoder();
  203. const lineDecoder = new line_1.LineDecoder();
  204. const iter = (0, shims_2.ReadableStreamToAsyncIterable)(response.body);
  205. for await (const sseChunk of iterSSEChunks(iter)) {
  206. for (const line of lineDecoder.decode(sseChunk)) {
  207. const sse = sseDecoder.decode(line);
  208. if (sse)
  209. yield sse;
  210. }
  211. }
  212. for (const line of lineDecoder.flush()) {
  213. const sse = sseDecoder.decode(line);
  214. if (sse)
  215. yield sse;
  216. }
  217. }
  218. /**
  219. * Given an async iterable iterator, iterates over it and yields full
  220. * SSE chunks, i.e. yields when a double new-line is encountered.
  221. */
  222. async function* iterSSEChunks(iterator) {
  223. let data = new Uint8Array();
  224. for await (const chunk of iterator) {
  225. if (chunk == null) {
  226. continue;
  227. }
  228. const binaryChunk = chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
  229. : typeof chunk === 'string' ? (0, bytes_1.encodeUTF8)(chunk)
  230. : chunk;
  231. let newData = new Uint8Array(data.length + binaryChunk.length);
  232. newData.set(data);
  233. newData.set(binaryChunk, data.length);
  234. data = newData;
  235. let patternIndex;
  236. while ((patternIndex = (0, line_1.findDoubleNewlineIndex)(data)) !== -1) {
  237. yield data.slice(0, patternIndex);
  238. data = data.slice(patternIndex);
  239. }
  240. }
  241. if (data.length > 0) {
  242. yield data;
  243. }
  244. }
  245. class SSEDecoder {
  246. constructor() {
  247. this.event = null;
  248. this.data = [];
  249. this.chunks = [];
  250. }
  251. decode(line) {
  252. if (line.endsWith('\r')) {
  253. line = line.substring(0, line.length - 1);
  254. }
  255. if (!line) {
  256. // empty line and we didn't previously encounter any messages
  257. if (!this.event && !this.data.length)
  258. return null;
  259. const sse = {
  260. event: this.event,
  261. data: this.data.join('\n'),
  262. raw: this.chunks,
  263. };
  264. this.event = null;
  265. this.data = [];
  266. this.chunks = [];
  267. return sse;
  268. }
  269. this.chunks.push(line);
  270. if (line.startsWith(':')) {
  271. return null;
  272. }
  273. let [fieldname, _, value] = partition(line, ':');
  274. if (value.startsWith(' ')) {
  275. value = value.substring(1);
  276. }
  277. if (fieldname === 'event') {
  278. this.event = value;
  279. }
  280. else if (fieldname === 'data') {
  281. this.data.push(value);
  282. }
  283. return null;
  284. }
  285. }
  286. function partition(str, delimiter) {
  287. const index = str.indexOf(delimiter);
  288. if (index !== -1) {
  289. return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)];
  290. }
  291. return [str, '', ''];
  292. }
//# sourceMappingURL=streaming.js.map