Hello Datou!
Thank you for posting on MS Learn Q&A.
I think you had an issue posting the question on the platform, so I will try to answer your questions here.
So for your first question: Azure Speech does use a WebSocket v2 endpoint internally, but input text streaming for TTS is not exposed as a supported raw WebSocket protocol.
The feature is only available through the Speech SDK, and there is no official WebSocket protocol documentation for implementing it directly without the SDK.
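For reference, the SDK path looks roughly like this. Treat it as a sketch: the class names match the current Speech SDK text-streaming docs, but verify them against your installed azure-cognitiveservices-speech version, and note that the voice name and environment variable names are placeholders.

```python
import os
import azure.cognitiveservices.speech as speechsdk

# Text streaming requires the websocket v2 endpoint to be set explicitly.
speech_config = speechsdk.SpeechConfig(
    endpoint=f"wss://{os.environ['SPEECH_REGION']}.tts.speech.microsoft.com/"
             "cognitiveservices/websocket/v2",
    subscription=os.environ["SPEECH_KEY"],
)
speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config)

# Open a text-stream request and start synthesis before all text is known.
tts_request = speechsdk.SpeechSynthesisRequest(
    input_type=speechsdk.SpeechSynthesisRequestInputType.TextStream
)
tts_task = synthesizer.speak_async(tts_request)

for chunk in ["Hello ", "from ", "text ", "streaming."]:
    tts_request.input_stream.write(chunk)  # feed text as it arrives
tts_request.input_stream.close()  # signal end of input

result = tts_task.get()  # block until synthesis finishes
```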
If you want a non-SDK path for TTS, Azure does publish a REST TTS API, but that is a different interface and does not give you the same SDK-managed text-streaming behavior.
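If the REST path is enough for your scenario, a minimal sketch looks like this, assuming the standard regional TTS endpoint, a key and region in the SPEECH_KEY/SPEECH_REGION environment variables, and the requests package; the voice and output format are just examples.

```python
import os
import requests

region = os.environ["SPEECH_REGION"]
url = f"https://{region}.tts.speech.microsoft.com/cognitiveservices/v1"

# The body is SSML; the whole text is sent up front (no input streaming).
ssml = (
    "<speak version='1.0' xml:lang='en-US'>"
    "<voice name='en-US-JennyNeural'>Hello from the REST API.</voice>"
    "</speak>"
)

resp = requests.post(
    url,
    headers={
        "Ocp-Apim-Subscription-Key": os.environ["SPEECH_KEY"],
        "Content-Type": "application/ssml+xml",
        "X-Microsoft-OutputFormat": "audio-16khz-32kbitrate-mono-mp3",
    },
    data=ssml.encode("utf-8"),
)
resp.raise_for_status()

with open("out.mp3", "wb") as f:
    f.write(resp.content)
```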
Azure also has a newer Voice Live API in preview, which is a documented WebSocket API for real-time voice scenarios, including TTS and bidirectional communication, but that is not the same thing as using the Speech SDK text-streaming feature directly over the old Speech WebSocket protocol. It is a separate preview API surface.
For your Python 3.13 question, what I can say from the current release notes is that the Speech SDK recently shipped Python-specific fixes:
https://learn.microsoft.com/en-us/azure/ai-services/speech-service/releasenotes
So the most practical path is to upgrade azure-cognitiveservices-speech to the latest available SDK first, because there were recent Python speech-synthesis leak fixes, and then test on Python 3.12 or 3.11.
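For example, after running pip install --upgrade azure-cognitiveservices-speech, you can confirm which version your app is actually loading:

```python
import azure.cognitiveservices.speech as speechsdk

# Confirm the upgraded SDK version is the one your environment imports.
print(speechsdk.__version__)
```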
I noticed that in your code, you create long-lived native SDK objects and let GC eventually clean them up. With C-backed SDKs, that is exactly where hangs often appear.
In your code, I would change the lifecycle like this:
- Create the synthesizer once.
- On shutdown, explicitly: close the input stream, wait for tts_task.get() to finish or time out, disconnect callbacks if possible, cancel listen_completed_task, and drop references (self.tts_request = None, self.tts_task = None, self.speech_synthesizer = None) before the request scope ends.
- Run the Speech SDK work in a dedicated worker thread or separate process, not mixed into the main FastAPI event loop lifecycle (see the sketch after this list).
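On that last point, one simple way to keep blocking native SDK calls off the event loop is a single dedicated executor. This is a hypothetical helper of my own, not part of the SDK:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# One dedicated thread for all blocking Speech SDK calls, so they never
# run on (or block) the FastAPI event loop.
_speech_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="speech-sdk")

async def run_speech(fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_speech_executor, fn, *args)
```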
The most important issue I see in your current code is that close() does not guarantee a deterministic teardown of the native SDK objects. It calls stop_speaking_async, but it does not clearly wait for all callbacks to drain before those objects become collectible, and that makes the GC path much more likely to bite you.
I did a simple test and it is working.
Would love to see your feedback.
```python
# Assumes module-level imports:
#   import asyncio, logging
#   from fastapi.concurrency import run_in_threadpool
#   logger = logging.getLogger(__name__)
async def close(self):
    self.listening = False

    # 1. Close the SDK input stream so the synthesizer knows no more text is coming.
    async with self._azure_input_stream_ops_lock:
        if not self._azure_input_stream_closed and self.tts_request:
            await run_in_threadpool(self.tts_request.input_stream.close)
            self._azure_input_stream_closed = True

    # 2. Wait (with a bound) for the synthesis task so native callbacks can drain.
    if self.tts_task:
        try:
            await asyncio.wait_for(run_in_threadpool(self.tts_task.get), timeout=10)
        except asyncio.TimeoutError:
            logger.warning("tts_task.get timed out")
        except Exception:
            logger.exception("tts_task.get failed")

    # 3. Stop the synthesizer after stream/task completion. stop_speaking_async
    #    returns a future, so wait on it with .get() to make the stop deterministic.
    if self.speech_synthesizer:
        try:
            await run_in_threadpool(
                lambda: self.speech_synthesizer.stop_speaking_async().get()
            )
        except Exception:
            logger.exception("stop_speaking_async failed")

    # 4. Cancel the asyncio task that waits for completion events.
    if self.listen_completed_task:
        self.listen_completed_task.cancel()
        try:
            await self.listen_completed_task
        except asyncio.CancelledError:
            pass
        except Exception:
            logger.exception("listen_completed_task failed")

    # 5. Drop references explicitly so teardown does not depend on GC timing.
    self.tts_request = None
    self.tts_task = None
    self.speech_synthesizer = None
```