diff --git a/README.md b/README.md index 91440ea2..a3da688b 100644 --- a/README.md +++ b/README.md @@ -617,6 +617,18 @@ Enhance the LLM’s accuracy and domain adaptation by integrating historical dat 4. Validate improvements: Test the updated model against sample scenarios and measure key performance indicators (e.g. user satisfaction, call duration, resolution rate) to confirm that adjustments have led to meaningful enhancements. 5. Monitor, iterate, and A/B test: Regularly reassess the model’s performance, integrate newly gathered data, and apply further fine-tuning as needed. Leverage [built-in feature configurations to A/B test (App Configuration Experimentation)](https://learn.microsoft.com/en-us/azure/azure-app-configuration/concept-experimentation) different versions of the model, ensuring responsible, data-driven decisions and continuous optimization over time. +### Monitoring the application + +The application sends traces and metrics to Azure Application Insights. You can monitor the application from the Azure portal, or by using the API. + +This includes application behavior, database queries, and external service calls. It also includes LLM metrics (latency, token usage, prompt content, raw response) from [OpenLLMetry](https://github.com/traceloop/openllmetry), following the [semantic conventions for OpenAI operations](https://opentelemetry.io/docs/specs/semconv/gen-ai/openai/#openai-spans). + +Additionally, custom metrics (viewable in Application Insights > Metrics) are published, notably: + +- `call.aec.droped`, number of times the echo cancellation dropped the voice completely. +- `call.aec.missed`, number of times the echo cancellation failed to remove the echo in time. +- `call.answer.latency`, time between the end of the user voice and the start of the bot voice. + ## Q&A ### What will this cost? 
diff --git a/app/helpers/call_events.py b/app/helpers/call_events.py index 4b50ebb7..d082c207 100644 --- a/app/helpers/call_events.py +++ b/app/helpers/call_events.py @@ -35,7 +35,7 @@ from app.helpers.features import recognition_retry_max, recording_enabled from app.helpers.llm_worker import completion_sync from app.helpers.logging import logger -from app.helpers.monitoring import SpanAttributes, span_attribute, tracer +from app.helpers.monitoring import SpanAttributeEnum, tracer from app.models.call import CallStateModel from app.models.message import ( ActionEnum as MessageActionEnum, @@ -229,7 +229,7 @@ async def on_automation_recognize_error( logger.warning("Unknown context %s, no action taken", contexts) # Enrich span - span_attribute(SpanAttributes.CALL_CHANNEL, "ivr") + SpanAttributeEnum.CALL_CHANNEL.attribute("ivr") # Retry IVR recognition logger.info( @@ -348,7 +348,7 @@ async def on_play_started( logger.debug("Play started") # Enrich span - span_attribute(SpanAttributes.CALL_CHANNEL, "voice") + SpanAttributeEnum.CALL_CHANNEL.attribute("voice") # Update last interaction async with _db.call_transac( @@ -374,7 +374,7 @@ async def on_automation_play_completed( logger.debug("Play completed") # Enrich span - span_attribute(SpanAttributes.CALL_CHANNEL, "voice") + SpanAttributeEnum.CALL_CHANNEL.attribute("voice") # Update last interaction async with _db.call_transac( @@ -414,7 +414,7 @@ async def on_play_error(error_code: int) -> None: logger.debug("Play failed") # Enrich span - span_attribute(SpanAttributes.CALL_CHANNEL, "voice") + SpanAttributeEnum.CALL_CHANNEL.attribute("voice") # Suppress known errors # See: https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/communication-services/how-tos/call-automation/play-action.md @@ -452,8 +452,8 @@ async def on_ivr_recognized( logger.info("IVR recognized: %s", label) # Enrich span - span_attribute(SpanAttributes.CALL_CHANNEL, "ivr") - span_attribute(SpanAttributes.CALL_MESSAGE, label) + 
SpanAttributeEnum.CALL_CHANNEL.attribute("ivr") + SpanAttributeEnum.CALL_MESSAGE.attribute(label) # Parse language from label try: @@ -517,8 +517,8 @@ async def on_sms_received( logger.info("SMS received from %s: %s", call.initiate.phone_number, message) # Enrich span - span_attribute(SpanAttributes.CALL_CHANNEL, "sms") - span_attribute(SpanAttributes.CALL_MESSAGE, message) + SpanAttributeEnum.CALL_CHANNEL.attribute("sms") + SpanAttributeEnum.CALL_MESSAGE.attribute(message) # Add the SMS to the call history async with _db.call_transac( diff --git a/app/helpers/call_llm.py b/app/helpers/call_llm.py index 26974c67..ba41c7f2 100644 --- a/app/helpers/call_llm.py +++ b/app/helpers/call_llm.py @@ -1,4 +1,5 @@ import asyncio +import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime, timedelta from functools import wraps @@ -12,7 +13,7 @@ from openai import APIError from app.helpers.call_utils import ( - EchoCancellationStream, + AECStream, handle_media, handle_realtime_tts, tts_sentence_split, @@ -34,7 +35,12 @@ completion_stream, ) from app.helpers.logging import logger -from app.helpers.monitoring import SpanAttributes, span_attribute, tracer +from app.helpers.monitoring import ( + SpanAttributeEnum, + call_answer_latency, + gauge_set, + tracer, +) from app.models.call import CallStateModel from app.models.message import ( ActionEnum as MessageAction, @@ -63,11 +69,12 @@ async def load_llm_chat( # noqa: PLR0913, PLR0915 # Init language recognition stt_buffer: list[str] = [] # Temporary buffer for recognition stt_complete_gate = asyncio.Event() # Gate to wait for the recognition - aec = EchoCancellationStream( + aec = AECStream( sample_rate=audio_sample_rate, scheduler=scheduler, ) audio_reference: asyncio.Queue[bytes] = asyncio.Queue() + answer_start: float | None = None async def _send_in_to_aec() -> None: """ @@ -83,8 +90,21 @@ async def _send_out_to_aec() -> None: Forward the TTS to the echo cancellation and output. 
""" while True: + # Consume the audio out_chunck = await audio_reference.get() audio_reference.task_done() + + # Report the answer latency and reset the timer + nonlocal answer_start + if answer_start: + # Enrich span + gauge_set( + metric=call_answer_latency, + value=time.monotonic() - answer_start, + ) + answer_start = None + + # Forward the audio await asyncio.gather( # First, send the audio to the output audio_out.put(out_chunck), @@ -209,6 +229,9 @@ async def _response_callback(_retry: bool = False) -> None: If the recognition is empty, retry the recognition once. Otherwise, process the response. """ + nonlocal answer_start + answer_start = time.monotonic() + # Wait the complete recognition for 50ms maximum try: await asyncio.wait_for(stt_complete_gate.wait(), timeout=0.05) @@ -226,7 +249,7 @@ async def _response_callback(_retry: bool = False) -> None: await asyncio.sleep(0.2) return await _response_callback(_retry=True) - # Stop any previous response + # Stop any previous response, but keep the metrics await _stop_callback() # Add it to the call history and update last interaction @@ -302,8 +325,8 @@ async def _continue_chat( # noqa: PLR0915, PLR0913 Returns the updated call model. 
""" # Add span attributes - span_attribute(SpanAttributes.CALL_CHANNEL, "voice") - span_attribute(SpanAttributes.CALL_MESSAGE, call.messages[-1].content) + SpanAttributeEnum.CALL_CHANNEL.attribute("voice") + SpanAttributeEnum.CALL_MESSAGE.attribute(call.messages[-1].content) # Reset recognition retry counter async with _db.call_transac( @@ -665,7 +688,7 @@ async def _content_callback(buffer: str) -> None: # TODO: Refacto and simplify async def _process_audio_for_vad( # noqa: PLR0913 call: CallStateModel, - echo_cancellation: EchoCancellationStream, + echo_cancellation: AECStream, out_stream: PushAudioInputStream, response_callback: Callable[[], Awaitable[None]], scheduler: Scheduler, diff --git a/app/helpers/call_utils.py b/app/helpers/call_utils.py index 3587522e..9eb60385 100644 --- a/app/helpers/call_utils.py +++ b/app/helpers/call_utils.py @@ -43,6 +43,7 @@ from app.helpers.features import vad_threshold from app.helpers.identity import token from app.helpers.logging import logger +from app.helpers.monitoring import call_aec_droped, call_aec_missed, counter_add from app.models.call import CallStateModel from app.models.message import ( MessageModel, @@ -622,7 +623,7 @@ async def use_stt_client( client.stop_continuous_recognition_async() -class EchoCancellationStream: +class AECStream: """ Real-time audio stream with echo cancellation. @@ -773,7 +774,11 @@ async def _ensure_stream(self, input_pcm: bytes) -> None: # If the processing is delayed, return the original input except TimeoutError: - logger.warning("Echo processing timeout, returning input") + # Enrich span + counter_add( + metric=call_aec_missed, + value=1, + ) await self._output_queue.put((input_pcm, False)) async def process_stream(self) -> None: @@ -820,6 +825,7 @@ async def pull_audio(self) -> tuple[bytes, bool]: Returns a tuple with the echo-cancelled PCM audio and a boolean flag indicating if the user was speaking. 
""" + # Fetch output audio try: return await asyncio.wait_for( fut=self._output_queue.get(), @@ -827,5 +833,13 @@ async def pull_audio(self) -> tuple[bytes, bool]: / 1000 * 1.5, # Allow temporary small latency ) + + # If the processing is delayed, return an empty packet except TimeoutError: + # Enrich span + counter_add( + metric=call_aec_droped, + value=1, + ) + # Return empty packet return self._empty_packet, False diff --git a/app/helpers/llm_utils.py b/app/helpers/llm_utils.py index 9c5ba805..2a215513 100644 --- a/app/helpers/llm_utils.py +++ b/app/helpers/llm_utils.py @@ -28,7 +28,7 @@ from app.helpers.cache import async_lru_cache from app.helpers.logging import logger -from app.helpers.monitoring import SpanAttributes, span_attribute, tracer +from app.helpers.monitoring import SpanAttributeEnum, tracer from app.models.call import CallStateModel from app.models.message import ToolModel @@ -105,7 +105,7 @@ async def execute( # Update tool tool.content = res # Enrich span - span_attribute(SpanAttributes.TOOL_RESULT, tool.content) + SpanAttributeEnum.TOOL_RESULT.attribute(tool.content) return # Try to fix JSON args to catch LLM hallucinations @@ -128,12 +128,12 @@ async def execute( f"Bad arguments, available are {functions}. Please try again." 
) # Enrich span - span_attribute(SpanAttributes.TOOL_RESULT, tool.content) + SpanAttributeEnum.TOOL_RESULT.attribute(tool.content) return # Enrich span - span_attribute(SpanAttributes.TOOL_ARGS, json.dumps(args)) - span_attribute(SpanAttributes.TOOL_NAME, name) + SpanAttributeEnum.TOOL_ARGS.attribute(json.dumps(args)) + SpanAttributeEnum.TOOL_NAME.attribute(name) # Execute the function try: @@ -160,7 +160,7 @@ async def execute( # Update tool tool.content = res # Enrich span - span_attribute(SpanAttributes.TOOL_RESULT, tool.content) + SpanAttributeEnum.TOOL_RESULT.attribute(tool.content) @cache def _available_functions( diff --git a/app/helpers/monitoring.py b/app/helpers/monitoring.py index edf9a12e..1b52e312 100644 --- a/app/helpers/monitoring.py +++ b/app/helpers/monitoring.py @@ -1,50 +1,20 @@ +from enum import Enum from os import environ from azure.monitor.opentelemetry import configure_azure_monitor -from opentelemetry import trace +from opentelemetry import metrics, trace from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.metrics._internal.instrument import Counter, Gauge from opentelemetry.trace.span import INVALID_SPAN from opentelemetry.util.types import AttributeValue -from structlog.contextvars import bind_contextvars +from structlog.contextvars import bind_contextvars, get_contextvars +MODULE_NAME = "com.github.clemlesne.call-center-ai" VERSION = environ.get("VERSION", "0.0.0-unknown") -try: - configure_azure_monitor() # Configure Azure Application Insights exporter - AioHttpClientInstrumentor().instrument() # Instrument aiohttp - HTTPXClientInstrumentor().instrument() # Instrument httpx -except ValueError as e: - print( # noqa: T201 - "Azure Application Insights instrumentation failed, likely due to a missing APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.", - e, - ) - -tracer = trace.get_tracer( - 
instrumenting_library_version=VERSION, - instrumenting_module_name="com.github.clemlesne.call-center-ai", -) # Create a tracer that will be used in the app - -def span_attribute(key: str, value: AttributeValue) -> None: - """ - Set an attribute on the current span. - - Prefer using attributes from `opentelemetry.semconv.attributes` when possible. - - Returns None. - """ - # Enrich logging - bind_contextvars(**{key: value}) - - # Enrich span - span = trace.get_current_span() - if span == INVALID_SPAN: - return - span.set_attribute(key, value) - - -class SpanAttributes: +class SpanAttributeEnum(str, Enum): """ OpenTelemetry attributes. @@ -65,3 +35,106 @@ class SpanAttributes: """Tool name being used.""" TOOL_RESULT = "tool.result" """Tool result.""" + + def attribute( + self, + value: AttributeValue, + ) -> None: + """ + Set an attribute on the current span. + """ + # Enrich logging + bind_contextvars(**{self.value: value}) + + # Enrich span + span = trace.get_current_span() + if span == INVALID_SPAN: + return + span.set_attribute(self.value, value) + + +class SpanCounterEnum(str, Enum): + CALL_ANSWER_LATENCY = "call.answer.latency" + """Answer latency in seconds.""" + CALL_AEC_MISSED = "call.aec.missed" + """Echo cancellation missed frames.""" + CALL_AEC_DROPED = "call.aec.droped" + """Echo cancellation dropped frames.""" + + def counter( + self, + unit: str, + ) -> Counter: + """ + Create a counter metric to track a span counter. + """ + return meter.create_counter( + description=self.__doc__ or "", + name=self.value, + unit=unit, + ) + + def gauge( + self, + unit: str, + ) -> Gauge: + """ + Create a gauge metric to track a span counter. 
+ """ + return meter.create_gauge( + description=self.__doc__ or "", + name=self.value, + unit=unit, + ) + + +try: + configure_azure_monitor() # Configure Azure Application Insights exporter + AioHttpClientInstrumentor().instrument() # Instrument aiohttp + HTTPXClientInstrumentor().instrument() # Instrument httpx +except ValueError as e: + print( # noqa: T201 + "Azure Application Insights instrumentation failed, likely due to a missing APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.", + e, + ) + +# Create a tracer and meter that will be used across the application +tracer = trace.get_tracer( + instrumenting_library_version=VERSION, + instrumenting_module_name=MODULE_NAME, +) +meter = metrics.get_meter( + name=MODULE_NAME, + version=VERSION, +) + +# Init metrics +call_answer_latency = SpanCounterEnum.CALL_ANSWER_LATENCY.gauge("s") +call_aec_droped = SpanCounterEnum.CALL_AEC_DROPED.counter("frames") +call_aec_missed = SpanCounterEnum.CALL_AEC_MISSED.counter("frames") + + +def gauge_set( + metric: Gauge, + value: float | int, +): + """ + Set a gauge metric value with context attributes. + """ + metric.set( + amount=value, + attributes=get_contextvars(), + ) + + +def counter_add( + metric: Counter, + value: float | int, +): + """ + Add a counter metric value with context attributes. 
+ """ + metric.add( + amount=value, + attributes=get_contextvars(), + ) diff --git a/app/main.py b/app/main.py index 9d7ac5e4..d47ef296 100644 --- a/app/main.py +++ b/app/main.py @@ -59,7 +59,7 @@ from app.helpers.config import CONFIG from app.helpers.http import aiohttp_session, azure_transport from app.helpers.logging import logger -from app.helpers.monitoring import SpanAttributes, span_attribute, tracer +from app.helpers.monitoring import SpanAttributeEnum, tracer from app.helpers.pydantic_types.phone_numbers import PhoneNumber from app.helpers.resources import resources_dir from app.models.call import CallGetModel, CallInitiateModel, CallStateModel @@ -401,8 +401,8 @@ async def call_post(request: Request) -> CallGetModel: ) # Enrich span - span_attribute(SpanAttributes.CALL_ID, str(call.call_id)) - span_attribute(SpanAttributes.CALL_PHONE_NUMBER, call.initiate.phone_number) + SpanAttributeEnum.CALL_ID.attribute(str(call.call_id)) + SpanAttributeEnum.CALL_PHONE_NUMBER.attribute(call.initiate.phone_number) # Init SDK automation_client = await _use_automation_client() @@ -456,8 +456,8 @@ async def call_event( callback_url, wss_url, _call = await _communicationservices_urls(phone_number) # Enrich span - span_attribute(SpanAttributes.CALL_ID, str(_call.call_id)) - span_attribute(SpanAttributes.CALL_PHONE_NUMBER, _call.initiate.phone_number) + SpanAttributeEnum.CALL_ID.attribute(str(_call.call_id)) + SpanAttributeEnum.CALL_PHONE_NUMBER.attribute(_call.initiate.phone_number) # Execute business logic await on_new_call( @@ -494,7 +494,7 @@ async def sms_event( phone_number: str = event.data["from"] # Enrich span - span_attribute(SpanAttributes.CALL_PHONE_NUMBER, phone_number) + SpanAttributeEnum.CALL_PHONE_NUMBER.attribute(phone_number) async with get_scheduler() as scheduler: # Get call @@ -508,7 +508,7 @@ async def sms_event( return # Enrich span - span_attribute(SpanAttributes.CALL_ID, str(call.call_id)) + SpanAttributeEnum.CALL_ID.attribute(str(call.call_id)) async 
with get_scheduler() as scheduler: # Execute business logic @@ -556,7 +556,7 @@ async def _communicationservices_validate_call_id( secret: str, ) -> CallStateModel: # Enrich span - span_attribute(SpanAttributes.CALL_ID, str(call_id)) + SpanAttributeEnum.CALL_ID.attribute(str(call_id)) async with get_scheduler() as scheduler: # Validate call @@ -578,7 +578,7 @@ async def _communicationservices_validate_call_id( ) # Enrich span - span_attribute(SpanAttributes.CALL_PHONE_NUMBER, call.initiate.phone_number) + SpanAttributeEnum.CALL_PHONE_NUMBER.attribute(call.initiate.phone_number) return call @@ -869,8 +869,8 @@ async def training_event( call = CallStateModel.model_validate_json(training.content) # Enrich span - span_attribute(SpanAttributes.CALL_ID, str(call.call_id)) - span_attribute(SpanAttributes.CALL_PHONE_NUMBER, call.initiate.phone_number) + SpanAttributeEnum.CALL_ID.attribute(str(call.call_id)) + SpanAttributeEnum.CALL_PHONE_NUMBER.attribute(call.initiate.phone_number) logger.debug("Training event received") @@ -898,8 +898,8 @@ async def post_event( return # Enrich span - span_attribute(SpanAttributes.CALL_ID, str(call.call_id)) - span_attribute(SpanAttributes.CALL_PHONE_NUMBER, call.initiate.phone_number) + SpanAttributeEnum.CALL_ID.attribute(str(call.call_id)) + SpanAttributeEnum.CALL_PHONE_NUMBER.attribute(call.initiate.phone_number) # Execute business logic logger.debug("Post event received") @@ -984,7 +984,7 @@ async def twilio_sms_post( Returns a 200 OK if the SMS is properly formatted. Otherwise, returns a 400 Bad Request. 
""" # Enrich span - span_attribute(SpanAttributes.CALL_PHONE_NUMBER, From) + SpanAttributeEnum.CALL_PHONE_NUMBER.attribute(From) async with get_scheduler() as scheduler: # Get call @@ -1001,7 +1001,7 @@ async def twilio_sms_post( # Call found else: # Enrich span - span_attribute(SpanAttributes.CALL_ID, str(call.call_id)) + SpanAttributeEnum.CALL_ID.attribute(str(call.call_id)) # Execute business logic event_status = await on_sms_received( diff --git a/app/models/call.py b/app/models/call.py index 6d02e07d..182c6bfe 100644 --- a/app/models/call.py +++ b/app/models/call.py @@ -11,7 +11,6 @@ LanguageEntryModel, WorkflowInitiateModel, ) -from app.helpers.monitoring import tracer from app.helpers.pydantic_types.phone_numbers import PhoneNumber from app.models.message import ( ActionEnum as MessageActionEnum, @@ -132,6 +131,7 @@ async def trainings(self, cache_only: bool = True) -> list[TrainingModel]: Is using query expansion from last messages. Then, data is sorted by score. """ from app.helpers.config import CONFIG + from app.helpers.monitoring import tracer with tracer.start_as_current_span("call_trainings"): search = CONFIG.ai_search.instance()