streaming.mjs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. var _Stream_client;
  2. import { __classPrivateFieldGet, __classPrivateFieldSet } from "../internal/tslib.mjs";
  3. import { OpenAIError } from "./error.mjs";
  4. import { makeReadableStream } from "../internal/shims.mjs";
  5. import { findDoubleNewlineIndex, LineDecoder } from "../internal/decoders/line.mjs";
  6. import { ReadableStreamToAsyncIterable } from "../internal/shims.mjs";
  7. import { isAbortError } from "../internal/errors.mjs";
  8. import { encodeUTF8 } from "../internal/utils/bytes.mjs";
  9. import { loggerFor } from "../internal/utils/log.mjs";
  10. import { APIError } from "./error.mjs";
  11. export class Stream {
  12. constructor(iterator, controller, client) {
  13. this.iterator = iterator;
  14. _Stream_client.set(this, void 0);
  15. this.controller = controller;
  16. __classPrivateFieldSet(this, _Stream_client, client, "f");
  17. }
  18. static fromSSEResponse(response, controller, client) {
  19. let consumed = false;
  20. const logger = client ? loggerFor(client) : console;
  21. async function* iterator() {
  22. if (consumed) {
  23. throw new OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  24. }
  25. consumed = true;
  26. let done = false;
  27. try {
  28. for await (const sse of _iterSSEMessages(response, controller)) {
  29. if (done)
  30. continue;
  31. if (sse.data.startsWith('[DONE]')) {
  32. done = true;
  33. continue;
  34. }
  35. if (sse.event === null || !sse.event.startsWith('thread.')) {
  36. let data;
  37. try {
  38. data = JSON.parse(sse.data);
  39. }
  40. catch (e) {
  41. logger.error(`Could not parse message into JSON:`, sse.data);
  42. logger.error(`From chunk:`, sse.raw);
  43. throw e;
  44. }
  45. if (data && data.error) {
  46. throw new APIError(undefined, data.error, undefined, response.headers);
  47. }
  48. yield data;
  49. }
  50. else {
  51. let data;
  52. try {
  53. data = JSON.parse(sse.data);
  54. }
  55. catch (e) {
  56. console.error(`Could not parse message into JSON:`, sse.data);
  57. console.error(`From chunk:`, sse.raw);
  58. throw e;
  59. }
  60. // TODO: Is this where the error should be thrown?
  61. if (sse.event == 'error') {
  62. throw new APIError(undefined, data.error, data.message, undefined);
  63. }
  64. yield { event: sse.event, data: data };
  65. }
  66. }
  67. done = true;
  68. }
  69. catch (e) {
  70. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  71. if (isAbortError(e))
  72. return;
  73. throw e;
  74. }
  75. finally {
  76. // If the user `break`s, abort the ongoing request.
  77. if (!done)
  78. controller.abort();
  79. }
  80. }
  81. return new Stream(iterator, controller, client);
  82. }
  83. /**
  84. * Generates a Stream from a newline-separated ReadableStream
  85. * where each item is a JSON value.
  86. */
  87. static fromReadableStream(readableStream, controller, client) {
  88. let consumed = false;
  89. async function* iterLines() {
  90. const lineDecoder = new LineDecoder();
  91. const iter = ReadableStreamToAsyncIterable(readableStream);
  92. for await (const chunk of iter) {
  93. for (const line of lineDecoder.decode(chunk)) {
  94. yield line;
  95. }
  96. }
  97. for (const line of lineDecoder.flush()) {
  98. yield line;
  99. }
  100. }
  101. async function* iterator() {
  102. if (consumed) {
  103. throw new OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  104. }
  105. consumed = true;
  106. let done = false;
  107. try {
  108. for await (const line of iterLines()) {
  109. if (done)
  110. continue;
  111. if (line)
  112. yield JSON.parse(line);
  113. }
  114. done = true;
  115. }
  116. catch (e) {
  117. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  118. if (isAbortError(e))
  119. return;
  120. throw e;
  121. }
  122. finally {
  123. // If the user `break`s, abort the ongoing request.
  124. if (!done)
  125. controller.abort();
  126. }
  127. }
  128. return new Stream(iterator, controller, client);
  129. }
  130. [(_Stream_client = new WeakMap(), Symbol.asyncIterator)]() {
  131. return this.iterator();
  132. }
  133. /**
  134. * Splits the stream into two streams which can be
  135. * independently read from at different speeds.
  136. */
  137. tee() {
  138. const left = [];
  139. const right = [];
  140. const iterator = this.iterator();
  141. const teeIterator = (queue) => {
  142. return {
  143. next: () => {
  144. if (queue.length === 0) {
  145. const result = iterator.next();
  146. left.push(result);
  147. right.push(result);
  148. }
  149. return queue.shift();
  150. },
  151. };
  152. };
  153. return [
  154. new Stream(() => teeIterator(left), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
  155. new Stream(() => teeIterator(right), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
  156. ];
  157. }
  158. /**
  159. * Converts this stream to a newline-separated ReadableStream of
  160. * JSON stringified values in the stream
  161. * which can be turned back into a Stream with `Stream.fromReadableStream()`.
  162. */
  163. toReadableStream() {
  164. const self = this;
  165. let iter;
  166. return makeReadableStream({
  167. async start() {
  168. iter = self[Symbol.asyncIterator]();
  169. },
  170. async pull(ctrl) {
  171. try {
  172. const { value, done } = await iter.next();
  173. if (done)
  174. return ctrl.close();
  175. const bytes = encodeUTF8(JSON.stringify(value) + '\n');
  176. ctrl.enqueue(bytes);
  177. }
  178. catch (err) {
  179. ctrl.error(err);
  180. }
  181. },
  182. async cancel() {
  183. await iter.return?.();
  184. },
  185. });
  186. }
  187. }
  188. export async function* _iterSSEMessages(response, controller) {
  189. if (!response.body) {
  190. controller.abort();
  191. if (typeof globalThis.navigator !== 'undefined' &&
  192. globalThis.navigator.product === 'ReactNative') {
  193. throw new OpenAIError(`The default react-native fetch implementation does not support streaming. Please use expo/fetch: https://docs.expo.dev/versions/latest/sdk/expo/#expofetch-api`);
  194. }
  195. throw new OpenAIError(`Attempted to iterate over a response with no body`);
  196. }
  197. const sseDecoder = new SSEDecoder();
  198. const lineDecoder = new LineDecoder();
  199. const iter = ReadableStreamToAsyncIterable(response.body);
  200. for await (const sseChunk of iterSSEChunks(iter)) {
  201. for (const line of lineDecoder.decode(sseChunk)) {
  202. const sse = sseDecoder.decode(line);
  203. if (sse)
  204. yield sse;
  205. }
  206. }
  207. for (const line of lineDecoder.flush()) {
  208. const sse = sseDecoder.decode(line);
  209. if (sse)
  210. yield sse;
  211. }
  212. }
  213. /**
  214. * Given an async iterable iterator, iterates over it and yields full
  215. * SSE chunks, i.e. yields when a double new-line is encountered.
  216. */
  217. async function* iterSSEChunks(iterator) {
  218. let data = new Uint8Array();
  219. for await (const chunk of iterator) {
  220. if (chunk == null) {
  221. continue;
  222. }
  223. const binaryChunk = chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
  224. : typeof chunk === 'string' ? encodeUTF8(chunk)
  225. : chunk;
  226. let newData = new Uint8Array(data.length + binaryChunk.length);
  227. newData.set(data);
  228. newData.set(binaryChunk, data.length);
  229. data = newData;
  230. let patternIndex;
  231. while ((patternIndex = findDoubleNewlineIndex(data)) !== -1) {
  232. yield data.slice(0, patternIndex);
  233. data = data.slice(patternIndex);
  234. }
  235. }
  236. if (data.length > 0) {
  237. yield data;
  238. }
  239. }
  240. class SSEDecoder {
  241. constructor() {
  242. this.event = null;
  243. this.data = [];
  244. this.chunks = [];
  245. }
  246. decode(line) {
  247. if (line.endsWith('\r')) {
  248. line = line.substring(0, line.length - 1);
  249. }
  250. if (!line) {
  251. // empty line and we didn't previously encounter any messages
  252. if (!this.event && !this.data.length)
  253. return null;
  254. const sse = {
  255. event: this.event,
  256. data: this.data.join('\n'),
  257. raw: this.chunks,
  258. };
  259. this.event = null;
  260. this.data = [];
  261. this.chunks = [];
  262. return sse;
  263. }
  264. this.chunks.push(line);
  265. if (line.startsWith(':')) {
  266. return null;
  267. }
  268. let [fieldname, _, value] = partition(line, ':');
  269. if (value.startsWith(' ')) {
  270. value = value.substring(1);
  271. }
  272. if (fieldname === 'event') {
  273. this.event = value;
  274. }
  275. else if (fieldname === 'data') {
  276. this.data.push(value);
  277. }
  278. return null;
  279. }
  280. }
  281. function partition(str, delimiter) {
  282. const index = str.indexOf(delimiter);
  283. if (index !== -1) {
  284. return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)];
  285. }
  286. return [str, '', ''];
  287. }
  288. //# sourceMappingURL=streaming.mjs.map