| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094 |
- # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
- from __future__ import annotations
- import json
- import logging
- from types import TracebackType
- from typing import TYPE_CHECKING, Any, Iterator, cast
- from typing_extensions import AsyncIterator
- import httpx
- from pydantic import BaseModel
- from .sessions import (
- Sessions,
- AsyncSessions,
- SessionsWithRawResponse,
- AsyncSessionsWithRawResponse,
- SessionsWithStreamingResponse,
- AsyncSessionsWithStreamingResponse,
- )
- from ...._types import NOT_GIVEN, Query, Headers, NotGiven
- from ...._utils import (
- is_azure_client,
- maybe_transform,
- strip_not_given,
- async_maybe_transform,
- is_async_azure_client,
- )
- from ...._compat import cached_property
- from ...._models import construct_type_unchecked
- from ...._resource import SyncAPIResource, AsyncAPIResource
- from ...._exceptions import OpenAIError
- from ...._base_client import _merge_mappings
- from ....types.beta.realtime import (
- session_update_event_param,
- response_create_event_param,
- transcription_session_update_param,
- )
- from .transcription_sessions import (
- TranscriptionSessions,
- AsyncTranscriptionSessions,
- TranscriptionSessionsWithRawResponse,
- AsyncTranscriptionSessionsWithRawResponse,
- TranscriptionSessionsWithStreamingResponse,
- AsyncTranscriptionSessionsWithStreamingResponse,
- )
- from ....types.websocket_connection_options import WebsocketConnectionOptions
- from ....types.beta.realtime.realtime_client_event import RealtimeClientEvent
- from ....types.beta.realtime.realtime_server_event import RealtimeServerEvent
- from ....types.beta.realtime.conversation_item_param import ConversationItemParam
- from ....types.beta.realtime.realtime_client_event_param import RealtimeClientEventParam
- if TYPE_CHECKING:
- from websockets.sync.client import ClientConnection as WebsocketConnection
- from websockets.asyncio.client import ClientConnection as AsyncWebsocketConnection
- from ...._client import OpenAI, AsyncOpenAI
- __all__ = ["Realtime", "AsyncRealtime"]
- log: logging.Logger = logging.getLogger(__name__)
- class Realtime(SyncAPIResource):
- @cached_property
- def sessions(self) -> Sessions:
- return Sessions(self._client)
- @cached_property
- def transcription_sessions(self) -> TranscriptionSessions:
- return TranscriptionSessions(self._client)
- @cached_property
- def with_raw_response(self) -> RealtimeWithRawResponse:
- """
- This property can be used as a prefix for any HTTP method call to return
- the raw response object instead of the parsed content.
- For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
- """
- return RealtimeWithRawResponse(self)
- @cached_property
- def with_streaming_response(self) -> RealtimeWithStreamingResponse:
- """
- An alternative to `.with_raw_response` that doesn't eagerly read the response body.
- For more information, see https://www.github.com/openai/openai-python#with_streaming_response
- """
- return RealtimeWithStreamingResponse(self)
- def connect(
- self,
- *,
- model: str,
- extra_query: Query = {},
- extra_headers: Headers = {},
- websocket_connection_options: WebsocketConnectionOptions = {},
- ) -> RealtimeConnectionManager:
- """
- The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as function calling.
- Some notable benefits of the API include:
- - Native speech-to-speech: Skipping an intermediate text format means low latency and nuanced output.
- - Natural, steerable voices: The models have natural inflection and can laugh, whisper, and adhere to tone direction.
- - Simultaneous multimodal output: Text is useful for moderation; faster-than-realtime audio ensures stable playback.
- The Realtime API is a stateful, event-based API that communicates over a WebSocket.
- """
- return RealtimeConnectionManager(
- client=self._client,
- extra_query=extra_query,
- extra_headers=extra_headers,
- websocket_connection_options=websocket_connection_options,
- model=model,
- )
- class AsyncRealtime(AsyncAPIResource):
- @cached_property
- def sessions(self) -> AsyncSessions:
- return AsyncSessions(self._client)
- @cached_property
- def transcription_sessions(self) -> AsyncTranscriptionSessions:
- return AsyncTranscriptionSessions(self._client)
- @cached_property
- def with_raw_response(self) -> AsyncRealtimeWithRawResponse:
- """
- This property can be used as a prefix for any HTTP method call to return
- the raw response object instead of the parsed content.
- For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
- """
- return AsyncRealtimeWithRawResponse(self)
- @cached_property
- def with_streaming_response(self) -> AsyncRealtimeWithStreamingResponse:
- """
- An alternative to `.with_raw_response` that doesn't eagerly read the response body.
- For more information, see https://www.github.com/openai/openai-python#with_streaming_response
- """
- return AsyncRealtimeWithStreamingResponse(self)
- def connect(
- self,
- *,
- model: str,
- extra_query: Query = {},
- extra_headers: Headers = {},
- websocket_connection_options: WebsocketConnectionOptions = {},
- ) -> AsyncRealtimeConnectionManager:
- """
- The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as function calling.
- Some notable benefits of the API include:
- - Native speech-to-speech: Skipping an intermediate text format means low latency and nuanced output.
- - Natural, steerable voices: The models have natural inflection and can laugh, whisper, and adhere to tone direction.
- - Simultaneous multimodal output: Text is useful for moderation; faster-than-realtime audio ensures stable playback.
- The Realtime API is a stateful, event-based API that communicates over a WebSocket.
- """
- return AsyncRealtimeConnectionManager(
- client=self._client,
- extra_query=extra_query,
- extra_headers=extra_headers,
- websocket_connection_options=websocket_connection_options,
- model=model,
- )
- class RealtimeWithRawResponse:
- def __init__(self, realtime: Realtime) -> None:
- self._realtime = realtime
- @cached_property
- def sessions(self) -> SessionsWithRawResponse:
- return SessionsWithRawResponse(self._realtime.sessions)
- @cached_property
- def transcription_sessions(self) -> TranscriptionSessionsWithRawResponse:
- return TranscriptionSessionsWithRawResponse(self._realtime.transcription_sessions)
- class AsyncRealtimeWithRawResponse:
- def __init__(self, realtime: AsyncRealtime) -> None:
- self._realtime = realtime
- @cached_property
- def sessions(self) -> AsyncSessionsWithRawResponse:
- return AsyncSessionsWithRawResponse(self._realtime.sessions)
- @cached_property
- def transcription_sessions(self) -> AsyncTranscriptionSessionsWithRawResponse:
- return AsyncTranscriptionSessionsWithRawResponse(self._realtime.transcription_sessions)
- class RealtimeWithStreamingResponse:
- def __init__(self, realtime: Realtime) -> None:
- self._realtime = realtime
- @cached_property
- def sessions(self) -> SessionsWithStreamingResponse:
- return SessionsWithStreamingResponse(self._realtime.sessions)
- @cached_property
- def transcription_sessions(self) -> TranscriptionSessionsWithStreamingResponse:
- return TranscriptionSessionsWithStreamingResponse(self._realtime.transcription_sessions)
- class AsyncRealtimeWithStreamingResponse:
- def __init__(self, realtime: AsyncRealtime) -> None:
- self._realtime = realtime
- @cached_property
- def sessions(self) -> AsyncSessionsWithStreamingResponse:
- return AsyncSessionsWithStreamingResponse(self._realtime.sessions)
- @cached_property
- def transcription_sessions(self) -> AsyncTranscriptionSessionsWithStreamingResponse:
- return AsyncTranscriptionSessionsWithStreamingResponse(self._realtime.transcription_sessions)
- class AsyncRealtimeConnection:
- """Represents a live websocket connection to the Realtime API"""
- session: AsyncRealtimeSessionResource
- response: AsyncRealtimeResponseResource
- input_audio_buffer: AsyncRealtimeInputAudioBufferResource
- conversation: AsyncRealtimeConversationResource
- output_audio_buffer: AsyncRealtimeOutputAudioBufferResource
- transcription_session: AsyncRealtimeTranscriptionSessionResource
- _connection: AsyncWebsocketConnection
- def __init__(self, connection: AsyncWebsocketConnection) -> None:
- self._connection = connection
- self.session = AsyncRealtimeSessionResource(self)
- self.response = AsyncRealtimeResponseResource(self)
- self.input_audio_buffer = AsyncRealtimeInputAudioBufferResource(self)
- self.conversation = AsyncRealtimeConversationResource(self)
- self.output_audio_buffer = AsyncRealtimeOutputAudioBufferResource(self)
- self.transcription_session = AsyncRealtimeTranscriptionSessionResource(self)
- async def __aiter__(self) -> AsyncIterator[RealtimeServerEvent]:
- """
- An infinite-iterator that will continue to yield events until
- the connection is closed.
- """
- from websockets.exceptions import ConnectionClosedOK
- try:
- while True:
- yield await self.recv()
- except ConnectionClosedOK:
- return
- async def recv(self) -> RealtimeServerEvent:
- """
- Receive the next message from the connection and parses it into a `RealtimeServerEvent` object.
- Canceling this method is safe. There's no risk of losing data.
- """
- return self.parse_event(await self.recv_bytes())
- async def recv_bytes(self) -> bytes:
- """Receive the next message from the connection as raw bytes.
- Canceling this method is safe. There's no risk of losing data.
- If you want to parse the message into a `RealtimeServerEvent` object like `.recv()` does,
- then you can call `.parse_event(data)`.
- """
- message = await self._connection.recv(decode=False)
- log.debug(f"Received websocket message: %s", message)
- return message
- async def send(self, event: RealtimeClientEvent | RealtimeClientEventParam) -> None:
- data = (
- event.to_json(use_api_names=True, exclude_defaults=True, exclude_unset=True)
- if isinstance(event, BaseModel)
- else json.dumps(await async_maybe_transform(event, RealtimeClientEventParam))
- )
- await self._connection.send(data)
- async def close(self, *, code: int = 1000, reason: str = "") -> None:
- await self._connection.close(code=code, reason=reason)
- def parse_event(self, data: str | bytes) -> RealtimeServerEvent:
- """
- Converts a raw `str` or `bytes` message into a `RealtimeServerEvent` object.
- This is helpful if you're using `.recv_bytes()`.
- """
- return cast(
- RealtimeServerEvent, construct_type_unchecked(value=json.loads(data), type_=cast(Any, RealtimeServerEvent))
- )
- class AsyncRealtimeConnectionManager:
- """
- Context manager over a `AsyncRealtimeConnection` that is returned by `beta.realtime.connect()`
- This context manager ensures that the connection will be closed when it exits.
- ---
- Note that if your application doesn't work well with the context manager approach then you
- can call the `.enter()` method directly to initiate a connection.
- **Warning**: You must remember to close the connection with `.close()`.
- ```py
- connection = await client.beta.realtime.connect(...).enter()
- # ...
- await connection.close()
- ```
- """
- def __init__(
- self,
- *,
- client: AsyncOpenAI,
- model: str,
- extra_query: Query,
- extra_headers: Headers,
- websocket_connection_options: WebsocketConnectionOptions,
- ) -> None:
- self.__client = client
- self.__model = model
- self.__connection: AsyncRealtimeConnection | None = None
- self.__extra_query = extra_query
- self.__extra_headers = extra_headers
- self.__websocket_connection_options = websocket_connection_options
- async def __aenter__(self) -> AsyncRealtimeConnection:
- """
- 👋 If your application doesn't work well with the context manager approach then you
- can call this method directly to initiate a connection.
- **Warning**: You must remember to close the connection with `.close()`.
- ```py
- connection = await client.beta.realtime.connect(...).enter()
- # ...
- await connection.close()
- ```
- """
- try:
- from websockets.asyncio.client import connect
- except ImportError as exc:
- raise OpenAIError("You need to install `openai[realtime]` to use this method") from exc
- extra_query = self.__extra_query
- await self.__client._refresh_api_key()
- auth_headers = self.__client.auth_headers
- if is_async_azure_client(self.__client):
- url, auth_headers = await self.__client._configure_realtime(self.__model, extra_query)
- else:
- url = self._prepare_url().copy_with(
- params={
- **self.__client.base_url.params,
- "model": self.__model,
- **extra_query,
- },
- )
- log.debug("Connecting to %s", url)
- if self.__websocket_connection_options:
- log.debug("Connection options: %s", self.__websocket_connection_options)
- self.__connection = AsyncRealtimeConnection(
- await connect(
- str(url),
- user_agent_header=self.__client.user_agent,
- additional_headers=_merge_mappings(
- {
- **auth_headers,
- "OpenAI-Beta": "realtime=v1",
- },
- self.__extra_headers,
- ),
- **self.__websocket_connection_options,
- )
- )
- return self.__connection
- enter = __aenter__
- def _prepare_url(self) -> httpx.URL:
- if self.__client.websocket_base_url is not None:
- base_url = httpx.URL(self.__client.websocket_base_url)
- else:
- base_url = self.__client._base_url.copy_with(scheme="wss")
- merge_raw_path = base_url.raw_path.rstrip(b"/") + b"/realtime"
- return base_url.copy_with(raw_path=merge_raw_path)
- async def __aexit__(
- self, exc_type: type[BaseException] | None, exc: BaseException | None, exc_tb: TracebackType | None
- ) -> None:
- if self.__connection is not None:
- await self.__connection.close()
- class RealtimeConnection:
- """Represents a live websocket connection to the Realtime API"""
- session: RealtimeSessionResource
- response: RealtimeResponseResource
- input_audio_buffer: RealtimeInputAudioBufferResource
- conversation: RealtimeConversationResource
- output_audio_buffer: RealtimeOutputAudioBufferResource
- transcription_session: RealtimeTranscriptionSessionResource
- _connection: WebsocketConnection
- def __init__(self, connection: WebsocketConnection) -> None:
- self._connection = connection
- self.session = RealtimeSessionResource(self)
- self.response = RealtimeResponseResource(self)
- self.input_audio_buffer = RealtimeInputAudioBufferResource(self)
- self.conversation = RealtimeConversationResource(self)
- self.output_audio_buffer = RealtimeOutputAudioBufferResource(self)
- self.transcription_session = RealtimeTranscriptionSessionResource(self)
- def __iter__(self) -> Iterator[RealtimeServerEvent]:
- """
- An infinite-iterator that will continue to yield events until
- the connection is closed.
- """
- from websockets.exceptions import ConnectionClosedOK
- try:
- while True:
- yield self.recv()
- except ConnectionClosedOK:
- return
- def recv(self) -> RealtimeServerEvent:
- """
- Receive the next message from the connection and parses it into a `RealtimeServerEvent` object.
- Canceling this method is safe. There's no risk of losing data.
- """
- return self.parse_event(self.recv_bytes())
- def recv_bytes(self) -> bytes:
- """Receive the next message from the connection as raw bytes.
- Canceling this method is safe. There's no risk of losing data.
- If you want to parse the message into a `RealtimeServerEvent` object like `.recv()` does,
- then you can call `.parse_event(data)`.
- """
- message = self._connection.recv(decode=False)
- log.debug(f"Received websocket message: %s", message)
- return message
- def send(self, event: RealtimeClientEvent | RealtimeClientEventParam) -> None:
- data = (
- event.to_json(use_api_names=True, exclude_defaults=True, exclude_unset=True)
- if isinstance(event, BaseModel)
- else json.dumps(maybe_transform(event, RealtimeClientEventParam))
- )
- self._connection.send(data)
- def close(self, *, code: int = 1000, reason: str = "") -> None:
- self._connection.close(code=code, reason=reason)
- def parse_event(self, data: str | bytes) -> RealtimeServerEvent:
- """
- Converts a raw `str` or `bytes` message into a `RealtimeServerEvent` object.
- This is helpful if you're using `.recv_bytes()`.
- """
- return cast(
- RealtimeServerEvent, construct_type_unchecked(value=json.loads(data), type_=cast(Any, RealtimeServerEvent))
- )
- class RealtimeConnectionManager:
- """
- Context manager over a `RealtimeConnection` that is returned by `beta.realtime.connect()`
- This context manager ensures that the connection will be closed when it exits.
- ---
- Note that if your application doesn't work well with the context manager approach then you
- can call the `.enter()` method directly to initiate a connection.
- **Warning**: You must remember to close the connection with `.close()`.
- ```py
- connection = client.beta.realtime.connect(...).enter()
- # ...
- connection.close()
- ```
- """
- def __init__(
- self,
- *,
- client: OpenAI,
- model: str,
- extra_query: Query,
- extra_headers: Headers,
- websocket_connection_options: WebsocketConnectionOptions,
- ) -> None:
- self.__client = client
- self.__model = model
- self.__connection: RealtimeConnection | None = None
- self.__extra_query = extra_query
- self.__extra_headers = extra_headers
- self.__websocket_connection_options = websocket_connection_options
- def __enter__(self) -> RealtimeConnection:
- """
- 👋 If your application doesn't work well with the context manager approach then you
- can call this method directly to initiate a connection.
- **Warning**: You must remember to close the connection with `.close()`.
- ```py
- connection = client.beta.realtime.connect(...).enter()
- # ...
- connection.close()
- ```
- """
- try:
- from websockets.sync.client import connect
- except ImportError as exc:
- raise OpenAIError("You need to install `openai[realtime]` to use this method") from exc
- extra_query = self.__extra_query
- self.__client._refresh_api_key()
- auth_headers = self.__client.auth_headers
- if is_azure_client(self.__client):
- url, auth_headers = self.__client._configure_realtime(self.__model, extra_query)
- else:
- url = self._prepare_url().copy_with(
- params={
- **self.__client.base_url.params,
- "model": self.__model,
- **extra_query,
- },
- )
- log.debug("Connecting to %s", url)
- if self.__websocket_connection_options:
- log.debug("Connection options: %s", self.__websocket_connection_options)
- self.__connection = RealtimeConnection(
- connect(
- str(url),
- user_agent_header=self.__client.user_agent,
- additional_headers=_merge_mappings(
- {
- **auth_headers,
- "OpenAI-Beta": "realtime=v1",
- },
- self.__extra_headers,
- ),
- **self.__websocket_connection_options,
- )
- )
- return self.__connection
- enter = __enter__
- def _prepare_url(self) -> httpx.URL:
- if self.__client.websocket_base_url is not None:
- base_url = httpx.URL(self.__client.websocket_base_url)
- else:
- base_url = self.__client._base_url.copy_with(scheme="wss")
- merge_raw_path = base_url.raw_path.rstrip(b"/") + b"/realtime"
- return base_url.copy_with(raw_path=merge_raw_path)
- def __exit__(
- self, exc_type: type[BaseException] | None, exc: BaseException | None, exc_tb: TracebackType | None
- ) -> None:
- if self.__connection is not None:
- self.__connection.close()
- class BaseRealtimeConnectionResource:
- def __init__(self, connection: RealtimeConnection) -> None:
- self._connection = connection
- class RealtimeSessionResource(BaseRealtimeConnectionResource):
- def update(self, *, session: session_update_event_param.Session, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """
- Send this event to update the session’s default configuration.
- The client may send this event at any time to update any field,
- except for `voice`. However, note that once a session has been
- initialized with a particular `model`, it can’t be changed to
- another model using `session.update`.
- When the server receives a `session.update`, it will respond
- with a `session.updated` event showing the full, effective configuration.
- Only the fields that are present are updated. To clear a field like
- `instructions`, pass an empty string.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "session.update", "session": session, "event_id": event_id}),
- )
- )
- class RealtimeResponseResource(BaseRealtimeConnectionResource):
- def create(
- self,
- *,
- event_id: str | NotGiven = NOT_GIVEN,
- response: response_create_event_param.Response | NotGiven = NOT_GIVEN,
- ) -> None:
- """
- This event instructs the server to create a Response, which means triggering
- model inference. When in Server VAD mode, the server will create Responses
- automatically.
- A Response will include at least one Item, and may have two, in which case
- the second will be a function call. These Items will be appended to the
- conversation history.
- The server will respond with a `response.created` event, events for Items
- and content created, and finally a `response.done` event to indicate the
- Response is complete.
- The `response.create` event includes inference configuration like
- `instructions`, and `temperature`. These fields will override the Session's
- configuration for this Response only.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "response.create", "event_id": event_id, "response": response}),
- )
- )
- def cancel(self, *, event_id: str | NotGiven = NOT_GIVEN, response_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event to cancel an in-progress response.
- The server will respond
- with a `response.done` event with a status of `response.status=cancelled`. If
- there is no response to cancel, the server will respond with an error.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "response.cancel", "event_id": event_id, "response_id": response_id}),
- )
- )
- class RealtimeInputAudioBufferResource(BaseRealtimeConnectionResource):
- def clear(self, *, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event to clear the audio bytes in the buffer.
- The server will
- respond with an `input_audio_buffer.cleared` event.
- """
- self._connection.send(
- cast(RealtimeClientEventParam, strip_not_given({"type": "input_audio_buffer.clear", "event_id": event_id}))
- )
- def commit(self, *, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """
- Send this event to commit the user input audio buffer, which will create a
- new user message item in the conversation. This event will produce an error
- if the input audio buffer is empty. When in Server VAD mode, the client does
- not need to send this event, the server will commit the audio buffer
- automatically.
- Committing the input audio buffer will trigger input audio transcription
- (if enabled in session configuration), but it will not create a response
- from the model. The server will respond with an `input_audio_buffer.committed`
- event.
- """
- self._connection.send(
- cast(RealtimeClientEventParam, strip_not_given({"type": "input_audio_buffer.commit", "event_id": event_id}))
- )
- def append(self, *, audio: str, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event to append audio bytes to the input audio buffer.
- The audio
- buffer is temporary storage you can write to and later commit. In Server VAD
- mode, the audio buffer is used to detect speech and the server will decide
- when to commit. When Server VAD is disabled, you must commit the audio buffer
- manually.
- The client may choose how much audio to place in each event up to a maximum
- of 15 MiB, for example streaming smaller chunks from the client may allow the
- VAD to be more responsive. Unlike made other client events, the server will
- not send a confirmation response to this event.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "input_audio_buffer.append", "audio": audio, "event_id": event_id}),
- )
- )
- class RealtimeConversationResource(BaseRealtimeConnectionResource):
- @cached_property
- def item(self) -> RealtimeConversationItemResource:
- return RealtimeConversationItemResource(self._connection)
- class RealtimeConversationItemResource(BaseRealtimeConnectionResource):
- def delete(self, *, item_id: str, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event when you want to remove any item from the conversation
- history.
- The server will respond with a `conversation.item.deleted` event,
- unless the item does not exist in the conversation history, in which case the
- server will respond with an error.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "conversation.item.delete", "item_id": item_id, "event_id": event_id}),
- )
- )
- def create(
- self,
- *,
- item: ConversationItemParam,
- event_id: str | NotGiven = NOT_GIVEN,
- previous_item_id: str | NotGiven = NOT_GIVEN,
- ) -> None:
- """
- Add a new Item to the Conversation's context, including messages, function
- calls, and function call responses. This event can be used both to populate a
- "history" of the conversation and to add new items mid-stream, but has the
- current limitation that it cannot populate assistant audio messages.
- If successful, the server will respond with a `conversation.item.created`
- event, otherwise an `error` event will be sent.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given(
- {
- "type": "conversation.item.create",
- "item": item,
- "event_id": event_id,
- "previous_item_id": previous_item_id,
- }
- ),
- )
- )
- def truncate(
- self, *, audio_end_ms: int, content_index: int, item_id: str, event_id: str | NotGiven = NOT_GIVEN
- ) -> None:
- """Send this event to truncate a previous assistant message’s audio.
- The server
- will produce audio faster than realtime, so this event is useful when the user
- interrupts to truncate audio that has already been sent to the client but not
- yet played. This will synchronize the server's understanding of the audio with
- the client's playback.
- Truncating audio will delete the server-side text transcript to ensure there
- is not text in the context that hasn't been heard by the user.
- If successful, the server will respond with a `conversation.item.truncated`
- event.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given(
- {
- "type": "conversation.item.truncate",
- "audio_end_ms": audio_end_ms,
- "content_index": content_index,
- "item_id": item_id,
- "event_id": event_id,
- }
- ),
- )
- )
- def retrieve(self, *, item_id: str, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """
- Send this event when you want to retrieve the server's representation of a specific item in the conversation history. This is useful, for example, to inspect user audio after noise cancellation and VAD.
- The server will respond with a `conversation.item.retrieved` event,
- unless the item does not exist in the conversation history, in which case the
- server will respond with an error.
- """
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "conversation.item.retrieve", "item_id": item_id, "event_id": event_id}),
- )
- )
- class RealtimeOutputAudioBufferResource(BaseRealtimeConnectionResource):
- def clear(self, *, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """**WebRTC Only:** Emit to cut off the current audio response.
- This will trigger the server to
- stop generating audio and emit a `output_audio_buffer.cleared` event. This
- event should be preceded by a `response.cancel` client event to stop the
- generation of the current response.
- [Learn more](https://platform.openai.com/docs/guides/realtime-conversations#client-and-server-events-for-audio-in-webrtc).
- """
- self._connection.send(
- cast(RealtimeClientEventParam, strip_not_given({"type": "output_audio_buffer.clear", "event_id": event_id}))
- )
- class RealtimeTranscriptionSessionResource(BaseRealtimeConnectionResource):
- def update(
- self, *, session: transcription_session_update_param.Session, event_id: str | NotGiven = NOT_GIVEN
- ) -> None:
- """Send this event to update a transcription session."""
- self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "transcription_session.update", "session": session, "event_id": event_id}),
- )
- )
- class BaseAsyncRealtimeConnectionResource:
- def __init__(self, connection: AsyncRealtimeConnection) -> None:
- self._connection = connection
- class AsyncRealtimeSessionResource(BaseAsyncRealtimeConnectionResource):
- async def update(
- self, *, session: session_update_event_param.Session, event_id: str | NotGiven = NOT_GIVEN
- ) -> None:
- """
- Send this event to update the session’s default configuration.
- The client may send this event at any time to update any field,
- except for `voice`. However, note that once a session has been
- initialized with a particular `model`, it can’t be changed to
- another model using `session.update`.
- When the server receives a `session.update`, it will respond
- with a `session.updated` event showing the full, effective configuration.
- Only the fields that are present are updated. To clear a field like
- `instructions`, pass an empty string.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "session.update", "session": session, "event_id": event_id}),
- )
- )
- class AsyncRealtimeResponseResource(BaseAsyncRealtimeConnectionResource):
- async def create(
- self,
- *,
- event_id: str | NotGiven = NOT_GIVEN,
- response: response_create_event_param.Response | NotGiven = NOT_GIVEN,
- ) -> None:
- """
- This event instructs the server to create a Response, which means triggering
- model inference. When in Server VAD mode, the server will create Responses
- automatically.
- A Response will include at least one Item, and may have two, in which case
- the second will be a function call. These Items will be appended to the
- conversation history.
- The server will respond with a `response.created` event, events for Items
- and content created, and finally a `response.done` event to indicate the
- Response is complete.
- The `response.create` event includes inference configuration like
- `instructions`, and `temperature`. These fields will override the Session's
- configuration for this Response only.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "response.create", "event_id": event_id, "response": response}),
- )
- )
- async def cancel(self, *, event_id: str | NotGiven = NOT_GIVEN, response_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event to cancel an in-progress response.
- The server will respond
- with a `response.done` event with a status of `response.status=cancelled`. If
- there is no response to cancel, the server will respond with an error.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "response.cancel", "event_id": event_id, "response_id": response_id}),
- )
- )
- class AsyncRealtimeInputAudioBufferResource(BaseAsyncRealtimeConnectionResource):
- async def clear(self, *, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event to clear the audio bytes in the buffer.
- The server will
- respond with an `input_audio_buffer.cleared` event.
- """
- await self._connection.send(
- cast(RealtimeClientEventParam, strip_not_given({"type": "input_audio_buffer.clear", "event_id": event_id}))
- )
- async def commit(self, *, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """
- Send this event to commit the user input audio buffer, which will create a
- new user message item in the conversation. This event will produce an error
- if the input audio buffer is empty. When in Server VAD mode, the client does
- not need to send this event, the server will commit the audio buffer
- automatically.
- Committing the input audio buffer will trigger input audio transcription
- (if enabled in session configuration), but it will not create a response
- from the model. The server will respond with an `input_audio_buffer.committed`
- event.
- """
- await self._connection.send(
- cast(RealtimeClientEventParam, strip_not_given({"type": "input_audio_buffer.commit", "event_id": event_id}))
- )
- async def append(self, *, audio: str, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event to append audio bytes to the input audio buffer.
- The audio
- buffer is temporary storage you can write to and later commit. In Server VAD
- mode, the audio buffer is used to detect speech and the server will decide
- when to commit. When Server VAD is disabled, you must commit the audio buffer
- manually.
- The client may choose how much audio to place in each event up to a maximum
- of 15 MiB, for example streaming smaller chunks from the client may allow the
- VAD to be more responsive. Unlike made other client events, the server will
- not send a confirmation response to this event.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "input_audio_buffer.append", "audio": audio, "event_id": event_id}),
- )
- )
- class AsyncRealtimeConversationResource(BaseAsyncRealtimeConnectionResource):
- @cached_property
- def item(self) -> AsyncRealtimeConversationItemResource:
- return AsyncRealtimeConversationItemResource(self._connection)
- class AsyncRealtimeConversationItemResource(BaseAsyncRealtimeConnectionResource):
- async def delete(self, *, item_id: str, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """Send this event when you want to remove any item from the conversation
- history.
- The server will respond with a `conversation.item.deleted` event,
- unless the item does not exist in the conversation history, in which case the
- server will respond with an error.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "conversation.item.delete", "item_id": item_id, "event_id": event_id}),
- )
- )
- async def create(
- self,
- *,
- item: ConversationItemParam,
- event_id: str | NotGiven = NOT_GIVEN,
- previous_item_id: str | NotGiven = NOT_GIVEN,
- ) -> None:
- """
- Add a new Item to the Conversation's context, including messages, function
- calls, and function call responses. This event can be used both to populate a
- "history" of the conversation and to add new items mid-stream, but has the
- current limitation that it cannot populate assistant audio messages.
- If successful, the server will respond with a `conversation.item.created`
- event, otherwise an `error` event will be sent.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given(
- {
- "type": "conversation.item.create",
- "item": item,
- "event_id": event_id,
- "previous_item_id": previous_item_id,
- }
- ),
- )
- )
- async def truncate(
- self, *, audio_end_ms: int, content_index: int, item_id: str, event_id: str | NotGiven = NOT_GIVEN
- ) -> None:
- """Send this event to truncate a previous assistant message’s audio.
- The server
- will produce audio faster than realtime, so this event is useful when the user
- interrupts to truncate audio that has already been sent to the client but not
- yet played. This will synchronize the server's understanding of the audio with
- the client's playback.
- Truncating audio will delete the server-side text transcript to ensure there
- is not text in the context that hasn't been heard by the user.
- If successful, the server will respond with a `conversation.item.truncated`
- event.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given(
- {
- "type": "conversation.item.truncate",
- "audio_end_ms": audio_end_ms,
- "content_index": content_index,
- "item_id": item_id,
- "event_id": event_id,
- }
- ),
- )
- )
- async def retrieve(self, *, item_id: str, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """
- Send this event when you want to retrieve the server's representation of a specific item in the conversation history. This is useful, for example, to inspect user audio after noise cancellation and VAD.
- The server will respond with a `conversation.item.retrieved` event,
- unless the item does not exist in the conversation history, in which case the
- server will respond with an error.
- """
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "conversation.item.retrieve", "item_id": item_id, "event_id": event_id}),
- )
- )
- class AsyncRealtimeOutputAudioBufferResource(BaseAsyncRealtimeConnectionResource):
- async def clear(self, *, event_id: str | NotGiven = NOT_GIVEN) -> None:
- """**WebRTC Only:** Emit to cut off the current audio response.
- This will trigger the server to
- stop generating audio and emit a `output_audio_buffer.cleared` event. This
- event should be preceded by a `response.cancel` client event to stop the
- generation of the current response.
- [Learn more](https://platform.openai.com/docs/guides/realtime-conversations#client-and-server-events-for-audio-in-webrtc).
- """
- await self._connection.send(
- cast(RealtimeClientEventParam, strip_not_given({"type": "output_audio_buffer.clear", "event_id": event_id}))
- )
- class AsyncRealtimeTranscriptionSessionResource(BaseAsyncRealtimeConnectionResource):
- async def update(
- self, *, session: transcription_session_update_param.Session, event_id: str | NotGiven = NOT_GIVEN
- ) -> None:
- """Send this event to update a transcription session."""
- await self._connection.send(
- cast(
- RealtimeClientEventParam,
- strip_not_given({"type": "transcription_session.update", "session": session, "event_id": event_id}),
- )
- )
|