From 15c61181423b027e9ac2ca67a220b850c0f3cc33 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Tue, 22 Jul 2025 18:27:28 -0500 Subject: [PATCH] Store event delimiter in chat event enum for reuse --- src/khoj/processor/conversation/utils.py | 1 + src/khoj/processor/operator/__init__.py | 2 +- src/khoj/routers/api_chat.py | 10 ++++------ src/khoj/routers/helpers.py | 5 ++--- src/khoj/routers/research.py | 2 +- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/khoj/processor/conversation/utils.py b/src/khoj/processor/conversation/utils.py index 6edd84b2..3bd17856 100644 --- a/src/khoj/processor/conversation/utils.py +++ b/src/khoj/processor/conversation/utils.py @@ -385,6 +385,7 @@ class ChatEvent(Enum): USAGE = "usage" END_RESPONSE = "end_response" INTERRUPT = "interrupt" + END_EVENT = "␃🔚␗" def message_to_log( diff --git a/src/khoj/processor/operator/__init__.py b/src/khoj/processor/operator/__init__.py index d7068bff..979205a0 100644 --- a/src/khoj/processor/operator/__init__.py +++ b/src/khoj/processor/operator/__init__.py @@ -44,7 +44,7 @@ async def operate_environment( query_files: str = None, # TODO: Handle query files cancellation_event: Optional[asyncio.Event] = None, interrupt_queue: Optional[asyncio.Queue] = None, - abort_message: Optional[str] = "␃🔚␗", + abort_message: Optional[str] = ChatEvent.END_EVENT.value, tracer: dict = {}, ): response, user_input_message = None, None diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index 94aaebcf..f438ec61 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -704,7 +704,6 @@ async def event_generator( train_of_thought = [] cancellation_event = asyncio.Event() child_interrupt_queue: asyncio.Queue = asyncio.Queue(maxsize=10) - event_delimiter = "␃🔚␗" tracer: dict = { "mid": turn_id, @@ -791,7 +790,7 @@ async def event_generator( # Check if any interrupt query is received if interrupt_query := get_message_from_queue(parent_interrupt_queue): - if interrupt_query == event_delimiter: + if interrupt_query == ChatEvent.END_EVENT.value: cancellation_event.set() logger.debug(f"Chat cancelled by user {user} via interrupt queue.") else: @@ -872,7 +871,7 @@ async def event_generator( ) finally: if not cancellation_event.is_set(): - yield event_delimiter + yield ChatEvent.END_EVENT.value # Cancel the disconnect monitor task if it is still running if cancellation_event.is_set() or event_type == ChatEvent.END_RESPONSE: await cancel_disconnect_monitor() @@ -1044,7 +1043,7 @@ async def event_generator( tracer=tracer, cancellation_event=cancellation_event, interrupt_queue=child_interrupt_queue, - abort_message=event_delimiter, + abort_message=ChatEvent.END_EVENT.value, ): if isinstance(research_result, ResearchIteration): if research_result.summarizedResult: @@ -1510,8 +1509,7 @@ async def chat_ws( if data.get("type") == "interrupt": if current_task and not current_task.done(): # Send interrupt signal to the ongoing task - abort_message = "␃🔚␗" - await interrupt_queue.put(data.get("query") or abort_message) + await interrupt_queue.put(data.get("query") or ChatEvent.END_EVENT.value) logger.info( f"Interrupt signal sent to ongoing task for user {websocket.scope['user'].object.id} with query: {data.get('query')}" ) diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index 40b30c23..d3554f7f 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -2617,7 +2617,6 @@ class MessageProcessor: async def read_chat_stream(response_iterator: AsyncGenerator[str, None]) -> Dict[str, Any]: processor = MessageProcessor() - event_delimiter = "␃🔚␗" buffer = "" async for chunk in response_iterator: @@ -2625,9 +2624,9 @@ async def read_chat_stream(response_iterator: AsyncGenerator[str, None]) -> Dict buffer += chunk # Once the buffer contains a complete event - while event_delimiter in buffer: + while ChatEvent.END_EVENT.value in buffer: # Extract the event from the buffer - event, buffer = buffer.split(event_delimiter, 1) + event, buffer = buffer.split(ChatEvent.END_EVENT.value, 1) # Process the event if event: processor.process_message_chunk(event) diff --git a/src/khoj/routers/research.py b/src/khoj/routers/research.py index 9391e266..d54a147f 100644 --- a/src/khoj/routers/research.py +++ b/src/khoj/routers/research.py @@ -224,7 +224,7 @@ async def research( query_files: str = None, cancellation_event: Optional[asyncio.Event] = None, interrupt_queue: Optional[asyncio.Queue] = None, - abort_message: str = "␃🔚␗", + abort_message: str = ChatEvent.END_EVENT.value, ): max_document_searches = 7 max_online_searches = 3