From dcc5073d166de2ff4054b746a5719eb7e408bee8 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Fri, 20 Dec 2024 13:27:29 -0800 Subject: [PATCH 1/6] Fix the to_notify decider automation chat actor. Add detailed logging --- src/khoj/routers/helpers.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index 4e458651..5b6825c4 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -1651,12 +1651,15 @@ def should_notify(original_query: str, executed_query: str, ai_response: str, us with timer("Chat actor: Decide to notify user of automation response", logger): try: # TODO Replace with async call so we don't have to maintain a sync version - response = send_message_to_model_wrapper_sync(to_notify_or_not, user) + response = send_message_to_model_wrapper_sync(to_notify_or_not, user=user) should_notify_result = "no" not in response.lower() logger.info(f'Decided to {"not " if not should_notify_result else ""}notify user of automation response.') return should_notify_result - except: - logger.warning(f"Fallback to notify user of automation response as failed to infer should notify or not.") + except Exception as e: + logger.warning( + f"Fallback to notify user of automation response as failed to infer should notify or not. {e}", + exc_info=True, + ) return True From 3600a9a4f30457f6d2e0b95cb83002173df0266f Mon Sep 17 00:00:00 2001 From: Debanjum Date: Fri, 20 Dec 2024 16:05:26 -0800 Subject: [PATCH 2/6] Ask reason before decision to improve to_notify decider automation AI Previously it just gave a decision. This was hard to debug during prompt tuning. Asking for reason before decision improves models decision quality. --- src/khoj/processor/conversation/prompts.py | 30 +++++++++++----------- src/khoj/routers/helpers.py | 6 ++--- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/khoj/processor/conversation/prompts.py b/src/khoj/processor/conversation/prompts.py index 034209d4..0a021135 100644 --- a/src/khoj/processor/conversation/prompts.py +++ b/src/khoj/processor/conversation/prompts.py @@ -1101,33 +1101,33 @@ to_notify_or_not = PromptTemplate.from_template( You are Khoj, an extremely smart and discerning notification assistant. - Decide whether the user should be notified of the AI's response using the Original User Query, Executed User Query and AI Response triplet. - Notify the user only if the AI's response satisfies the user specified requirements. -- You should only respond with a "Yes" or "No". Do not say anything else. +- You should return a response with your reason and "Yes" or "No" decision in JSON format. Do not say anything else. # Examples: Original User Query: Hahah, nice! Show a new one every morning at 9am. My Current Location: Shanghai, China Executed User Query: Could you share a funny Calvin and Hobbes quote from my notes? AI Reponse: Here is one I found: "It's not denial. I'm just selective about the reality I accept." -Khoj: Yes +Khoj: {{ "reason": "The AI has shared a funny Calvin and Hobbes quote." , "decision": "Yes" }} Original User Query: Every evening check if it's going to rain tomorrow. Notify me only if I'll need an umbrella. My Current Location: Nairobi, Kenya Executed User Query: Is it going to rain tomorrow in Nairobi, Kenya AI Response: Tomorrow's forecast is sunny with a high of 28°C and a low of 18°C -Khoj: No +Khoj: {{ "reason": "It is not expected to rain tomorrow.", "decision": "No" }} -Original User Query: Tell me when version 2.0.0 is released. My Current Location: Mexico City, Mexico -Executed User Query: Check if version 2.0.0 of the Khoj python package is released -AI Response: The latest released Khoj python package version is 1.5.0. -Khoj: No - -Original User Query: Paint me a sunset every evening. My Current Location: Shanghai, China -Executed User Query: Paint me a sunset in Shanghai, China +Original User Query: Paint a sunset for me every evening. My Current Location: Shanghai, China +Executed User Query: Paint a sunset in Shanghai, China AI Response: https://khoj-generated-images.khoj.dev/user110/image78124.webp -Khoj: Yes +Khoj: {{ "reason": "The AI has created an image.", "decision": "Yes" }} -Original User Query: Share a summary of the tasks I've completed at the end of the day. My Current Location: Oslo, Norway -Executed User Query: Share a summary of the tasks I've completed today. -AI Response: I'm sorry, I couldn't find any relevant notes to respond to your message. -Khoj: No +Original User Query: Notify me when Khoj version 2.0.0 is released +Executed User Query: What is the latest released version of the Khoj python package +AI Response: The latest released Khoj python package version is 1.5.0. +Khoj: {{ "reason": "Version 2.0.0 of Khoj has not been released yet." , "decision": "No" }} + +Original User Query: Share a summary of the tasks I've completed at the end of the day. +Executed User Query: Generate a summary of the tasks I've completed today. +AI Response: You have completed the following tasks today: 1. Meeting with the team 2. Submit travel expense report +Khoj: {{ "reason": "The AI has provided a summary of completed tasks.", "decision": "Yes" }} Original User Query: {original_query} Executed User Query: {executed_query} diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index 5b6825c4..773afe25 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -553,7 +553,7 @@ async def generate_online_subqueries( async def schedule_query( q: str, conversation_history: dict, user: KhojUser, query_images: List[str] = None, tracer: dict = {} -) -> Tuple[str, ...]: +) -> Tuple[str, str, str]: """ Schedule the date, time to run the query. Assume the server timezone is UTC. """ @@ -1651,8 +1651,8 @@ def should_notify(original_query: str, executed_query: str, ai_response: str, us with timer("Chat actor: Decide to notify user of automation response", logger): try: # TODO Replace with async call so we don't have to maintain a sync version - response = send_message_to_model_wrapper_sync(to_notify_or_not, user=user) - should_notify_result = "no" not in response.lower() + response = send_message_to_model_wrapper_sync(to_notify_or_not, user=user, response_type="json_object") + should_notify_result = json.loads(response)["decision"] == "Yes" logger.info(f'Decided to {"not " if not should_notify_result else ""}notify user of automation response.') return should_notify_result except Exception as e: From 6d219dcc1d9c48e95fe88eed21f3fa626ddfcf56 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Fri, 20 Dec 2024 16:11:25 -0800 Subject: [PATCH 3/6] Switch to let Khoj infer chat query based on user automation query This tries to decouple the automation query from the chat query. So the chat model doesn't have to know it is running in an automation context or figure how to notify user or send automation response. It just has to respond to the AI generated `query_to_run' corresponding to the `scheduling_request` automation by the user. For example, a `scheduling_request' of `notify me when X happens' results in the automation calling the chat api with a `query_to_run` like `tell me about X` and deciding if to notify based on information gathered about X from the scheduled run. If these two are not decoupled, the chat model may respond with how it can notify about X instead of just asking about it. Swap query_to_run with scheduling_request on the automation web page --- src/interface/web/app/automations/page.tsx | 58 +++++++++++----------- src/khoj/database/adapters/__init__.py | 13 +++++ src/khoj/processor/conversation/prompts.py | 4 +- src/khoj/routers/api.py | 48 ++++++++++-------- 4 files changed, 71 insertions(+), 52 deletions(-) diff --git a/src/interface/web/app/automations/page.tsx b/src/interface/web/app/automations/page.tsx index a4becd1b..39448ed2 100644 --- a/src/interface/web/app/automations/page.tsx +++ b/src/interface/web/app/automations/page.tsx @@ -167,68 +167,68 @@ const timestamp = Date.now(); const suggestedAutomationsMetadata: AutomationsData[] = [ { subject: "Weekly Newsletter", - query_to_run: + scheduling_request: "/research Compile a message including: 1. A recap of news from last week 2. An at-home workout I can do before work 3. A quote to inspire me for the week ahead", schedule: "9AM every Monday", next: "Next run at 9AM on Monday", crontime: "0 9 * * 1", id: timestamp, - scheduling_request: "", - }, - { - subject: "Daily Bedtime Story", - query_to_run: - "Compose a bedtime story that a five-year-old might enjoy. It should not exceed five paragraphs. Appeal to the imagination, but weave in learnings.", - schedule: "9PM every night", - next: "Next run at 9PM today", - crontime: "0 21 * * *", - id: timestamp + 1, - scheduling_request: "", + query_to_run: "", }, { subject: "Front Page of Hacker News", - query_to_run: + scheduling_request: "/research Summarize the top 5 posts from https://news.ycombinator.com/best and share them with me, including links", schedule: "9PM on every Wednesday", next: "Next run at 9PM on Wednesday", crontime: "0 21 * * 3", id: timestamp + 2, - scheduling_request: "", + query_to_run: "", }, { subject: "Market Summary", - query_to_run: + scheduling_request: "/research Get the market summary for today and share it with me. Focus on tech stocks and the S&P 500.", schedule: "9AM on every weekday", next: "Next run at 9AM on Monday", crontime: "0 9 * * *", id: timestamp + 3, - scheduling_request: "", + query_to_run: "", }, { subject: "Market Crash Notification", - query_to_run: "Notify me if the stock market fell by more than 5% today.", + scheduling_request: "Notify me if the stock market fell by more than 5% today.", schedule: "5PM every evening", next: "Next run at 5PM today", crontime: "0 17 * * *", id: timestamp + 5, - scheduling_request: "", + query_to_run: "", }, { subject: "Round-up of research papers about AI in healthcare", - query_to_run: + scheduling_request: "/research Summarize the top 3 research papers about AI in healthcare that were published in the last week. Include links to the full papers.", schedule: "9AM every Friday", next: "Next run at 9AM on Friday", crontime: "0 9 * * 5", id: timestamp + 4, - scheduling_request: "", + query_to_run: "", + }, + { + subject: "Daily Bedtime Story", + scheduling_request: + "Compose a bedtime story that a five-year-old might enjoy. It should not exceed five paragraphs. Appeal to the imagination, but weave in learnings.", + schedule: "9PM every night", + next: "Next run at 9PM today", + crontime: "0 21 * * *", + id: timestamp + 1, + query_to_run: "", }, ]; function createShareLink(automation: AutomationsData) { const encodedSubject = encodeURIComponent(automation.subject); - const encodedQuery = encodeURIComponent(automation.query_to_run); + const encodedQuery = encodeURIComponent(automation.scheduling_request); const encodedCrontime = encodeURIComponent(automation.crontime); const shareLink = `${window.location.origin}/automations?subject=${encodedSubject}&query=${encodedQuery}&crontime=${encodedCrontime}`; @@ -391,7 +391,7 @@ function AutomationsCard(props: AutomationsCardProps) { - {updatedAutomationData?.query_to_run || automation.query_to_run} + {updatedAutomationData?.scheduling_request || automation.scheduling_request}
@@ -451,8 +451,8 @@ function SharedAutomationCard(props: SharedAutomationCardProps) { const automation: AutomationsData = { id: 0, subject: decodeURIComponent(subject), - query_to_run: decodeURIComponent(query), - scheduling_request: "", + scheduling_request: decodeURIComponent(query), + query_to_run: "", schedule: cronToHumanReadableString(decodeURIComponent(crontime)), crontime: decodeURIComponent(crontime), next: "", @@ -480,7 +480,7 @@ const EditAutomationSchema = z.object({ dayOfWeek: z.optional(z.number()), dayOfMonth: z.optional(z.string()), timeRecurrence: z.string({ required_error: "Time Recurrence is required" }), - queryToRun: z.string({ required_error: "Query to Run is required" }), + schedulingRequest: z.string({ required_error: "Query to Run is required" }), }); interface EditCardProps { @@ -507,7 +507,7 @@ function EditCard(props: EditCardProps) { ? getTimeRecurrenceFromCron(automation.crontime) : "12:00 PM", dayOfMonth: automation?.crontime ? getDayOfMonthFromCron(automation.crontime) : "1", - queryToRun: automation?.query_to_run, + schedulingRequest: automation?.scheduling_request, }, }); @@ -520,7 +520,7 @@ function EditCard(props: EditCardProps) { ); let updateQueryUrl = `/api/automation?`; - updateQueryUrl += `q=${encodeURIComponent(values.queryToRun)}`; + updateQueryUrl += `q=${encodeURIComponent(values.schedulingRequest)}`; if (automation?.id && !props.createNew) { updateQueryUrl += `&automation_id=${encodeURIComponent(automation.id)}`; } @@ -829,7 +829,7 @@ function AutomationModificationForm(props: AutomationModificationFormProps) { )} ( Instructions @@ -850,7 +850,7 @@ function AutomationModificationForm(props: AutomationModificationFormProps) { {errors.subject && ( - {errors.queryToRun?.message} + {errors.schedulingRequest?.message} )} )} diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py index c707a08f..9342b913 100644 --- a/src/khoj/database/adapters/__init__.py +++ b/src/khoj/database/adapters/__init__.py @@ -1783,6 +1783,19 @@ class AutomationAdapters: return automation + @staticmethod + async def aget_automation(user: KhojUser, automation_id: str) -> Job: + # Perform validation checks + # Check if user is allowed to delete this automation id + if not automation_id.startswith(f"automation_{user.uuid}_"): + raise ValueError("Invalid automation id") + # Check if automation with this id exist + automation: Job = await sync_to_async(state.scheduler.get_job)(job_id=automation_id) + if not automation: + raise ValueError("Invalid automation id") + + return automation + @staticmethod def delete_automation(user: KhojUser, automation_id: str): # Get valid, user-owned automation diff --git a/src/khoj/processor/conversation/prompts.py b/src/khoj/processor/conversation/prompts.py index 0a021135..357f406e 100644 --- a/src/khoj/processor/conversation/prompts.py +++ b/src/khoj/processor/conversation/prompts.py @@ -935,7 +935,7 @@ AI: Here is one I found: "It's not denial. I'm just selective about the reality User: Hahah, nice! Show a new one every morning. Khoj: {{ "crontime": "0 9 * * *", - "query": "/automated_task Share a funny Calvin and Hobbes or Bill Watterson quote from my notes", + "query": "Share a funny Calvin and Hobbes or Bill Watterson quote from my notes", "subject": "Your Calvin and Hobbes Quote for the Day" }} @@ -955,7 +955,7 @@ AI: The latest released Khoj python package version is 1.5.0. User: Notify me when version 2.0.0 is released Khoj: {{ "crontime": "0 10 * * *", - "query": "/automated_task What is the latest released version of the Khoj python package?", + "query": "/automated_task /research What is the latest released version of the Khoj python package?", "subject": "Khoj Python Package Version 2.0.0 Release" }} diff --git a/src/khoj/routers/api.py b/src/khoj/routers/api.py index 3f58ca1a..b936488f 100644 --- a/src/khoj/routers/api.py +++ b/src/khoj/routers/api.py @@ -47,6 +47,7 @@ from khoj.routers.helpers import ( acreate_title_from_query, get_user_config, schedule_automation, + schedule_query, update_telemetry_state, ) from khoj.search_filter.date_filter import DateFilter @@ -584,11 +585,15 @@ async def post_automation( if not cron_descriptor.get_description(crontime): return Response(content="Invalid crontime", status_code=400) + # Infer subject, query to run + _, query_to_run, generated_subject = await schedule_query(q, conversation_history={}, user=user) + subject = subject or generated_subject + # Normalize query parameters # Add /automated_task prefix to query if not present - q = q.strip() - if not q.startswith("/automated_task"): - query_to_run = f"/automated_task {q}" + query_to_run = query_to_run.strip() + if not query_to_run.startswith("/automated_task"): + query_to_run = f"/automated_task {query_to_run}" # Normalize crontime for AP Scheduler CronTrigger crontime = crontime.strip() @@ -603,23 +608,18 @@ async def post_automation( minute_value = crontime.split(" ")[0] if not minute_value.isdigit(): return Response( - content="Recurrence of every X minutes is unsupported. Please create a less frequent schedule.", + content="Minute level recurrence is unsupported. Please create a less frequent schedule.", status_code=400, ) - if not subject: - subject = await acreate_title_from_query(q, user) - - title = f"Automation: {subject}" - # Create new Conversation Session associated with this new task + title = f"Automation: {subject}" conversation = await ConversationAdapters.acreate_conversation_session(user, request.user.client_app, title=title) - calling_url = request.url.replace(query=f"{request.url.query}") - # Schedule automation with query_to_run, timezone, subject directly provided by user try: # Use the query to run as the scheduling request if the scheduling request is unset + calling_url = request.url.replace(query=f"{request.url.query}") automation = await schedule_automation( query_to_run, subject, crontime, timezone, q, user, calling_url, str(conversation.id) ) @@ -665,7 +665,7 @@ def trigger_manual_job( @api.put("/automation", response_class=Response) @requires(["authenticated"]) -def edit_job( +async def edit_job( request: Request, automation_id: str, q: Optional[str], @@ -686,16 +686,20 @@ def edit_job( # Check, get automation to edit try: - automation: Job = AutomationAdapters.get_automation(user, automation_id) + automation: Job = await AutomationAdapters.aget_automation(user, automation_id) except ValueError as e: logger.error(f"Error editing automation {automation_id} for {user.email}: {e}", exc_info=True) return Response(content="Invalid automation", status_code=403) + # Infer subject, query to run + _, query_to_run, _ = await schedule_query(q, conversation_history={}, user=user) + subject = subject + # Normalize query parameters # Add /automated_task prefix to query if not present - q = q.strip() - if not q.startswith("/automated_task"): - query_to_run = f"/automated_task {q}" + query_to_run = query_to_run.strip() + if not query_to_run.startswith("/automated_task"): + query_to_run = f"/automated_task {query_to_run}" # Normalize crontime for AP Scheduler CronTrigger crontime = crontime.strip() if len(crontime.split(" ")) > 5: @@ -724,13 +728,15 @@ def edit_job( title = f"Automation: {subject}" # Create new Conversation Session associated with this new task - conversation = ConversationAdapters.create_conversation_session(user, request.user.client_app, title=title) + conversation = await ConversationAdapters.acreate_conversation_session( + user, request.user.client_app, title=title + ) conversation_id = str(conversation.id) automation_metadata["conversation_id"] = conversation_id # Modify automation with updated query, subject - automation.modify( + await sync_to_async(automation.modify)( name=json.dumps(automation_metadata), kwargs={ "query_to_run": query_to_run, @@ -746,11 +752,11 @@ def edit_job( user_timezone = pytz.timezone(timezone) trigger = CronTrigger.from_crontab(crontime, user_timezone) if automation.trigger != trigger: - automation.reschedule(trigger=trigger) + await sync_to_async(automation.reschedule)(trigger=trigger) # Collate info about the updated user automation - automation = AutomationAdapters.get_automation(user, automation.id) - automation_info = AutomationAdapters.get_automation_metadata(user, automation) + automation = await AutomationAdapters.aget_automation(user, automation.id) + automation_info = await sync_to_async(AutomationAdapters.get_automation_metadata)(user, automation) # Return modified automation information as a JSON response return Response(content=json.dumps(automation_info), media_type="application/json", status_code=200) From 9674de400c06df2085f5c38349f046668da20f6b Mon Sep 17 00:00:00 2001 From: Debanjum Date: Sat, 21 Dec 2024 16:25:15 -0800 Subject: [PATCH 4/6] Format AI response to send in automation email Previously we sent the AI response directly. This change post processes the AI response with the awareness that it is to be sent to the user as an email to improve rendering and quality of the emails. --- src/khoj/processor/conversation/prompts.py | 20 +++++++++++++ src/khoj/routers/helpers.py | 35 ++++++++++++++++++---- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/src/khoj/processor/conversation/prompts.py b/src/khoj/processor/conversation/prompts.py index 357f406e..77ac4214 100644 --- a/src/khoj/processor/conversation/prompts.py +++ b/src/khoj/processor/conversation/prompts.py @@ -1137,6 +1137,26 @@ Khoj: ) +automation_format_prompt = PromptTemplate.from_template( + """ +You are Khoj, a smart and creative researcher and writer with a knack for creating engaging content. +- You *CAN REMEMBER ALL NOTES and PERSONAL INFORMATION FOREVER* that the user ever shares with you. +- You *CAN* generate look-up real-time information from the internet, send notifications and answer questions based on the user's notes. + +Convert the AI response into a clear, structured markdown report with section headings to improve readability. +Your response will be sent in the body of an email to the user. +Do not add an email subject. Never add disclaimers in your final response. + +You are provided the following details for context. + +{username} +Original User Query: {original_query} +Executed Chat Request: {executed_query} +AI Response: {response} +Khoj: +""".strip() +) + # System messages to user # -- help_message = PromptTemplate.from_template( diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index 773afe25..6dcb1b4b 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -1634,6 +1634,25 @@ class CommonQueryParamsClass: CommonQueryParams = Annotated[CommonQueryParamsClass, Depends()] +def format_automation_response(scheduling_request: str, executed_query: str, ai_response: str, user: KhojUser) -> bool: + """ + Format the AI response to send in automation email to user. + """ + name = get_user_name(user) + if name: + username = prompts.user_name.format(name=name) + + automation_format_prompt = prompts.automation_format_prompt.format( + original_query=scheduling_request, + executed_query=executed_query, + response=ai_response, + username=username, + ) + + with timer("Chat actor: Format automation response", logger): + return send_message_to_model_wrapper_sync(automation_format_prompt, user=user) + + def should_notify(original_query: str, executed_query: str, ai_response: str, user: KhojUser) -> bool: """ Decide whether to notify the user of the AI response. @@ -1651,9 +1670,13 @@ def should_notify(original_query: str, executed_query: str, ai_response: str, us with timer("Chat actor: Decide to notify user of automation response", logger): try: # TODO Replace with async call so we don't have to maintain a sync version - response = send_message_to_model_wrapper_sync(to_notify_or_not, user=user, response_type="json_object") - should_notify_result = json.loads(response)["decision"] == "Yes" - logger.info(f'Decided to {"not " if not should_notify_result else ""}notify user of automation response.') + raw_response = send_message_to_model_wrapper_sync(to_notify_or_not, user=user, response_type="json_object") + response = json.loads(raw_response) + should_notify_result = response["decision"] == "Yes" + reason = response.get("reason", "unknown") + logger.info( + f'Decided to {"not " if not should_notify_result else ""}notify user of automation response because of reason: {reason}.' + ) return should_notify_result except Exception as e: logger.warning( @@ -1754,10 +1777,12 @@ def scheduled_chat( if should_notify( original_query=scheduling_request, executed_query=cleaned_query, ai_response=ai_response, user=user ): + formatted_response = format_automation_response(scheduling_request, cleaned_query, ai_response, user) + if is_resend_enabled(): - send_task_email(user.get_short_name(), user.email, cleaned_query, ai_response, subject, is_image) + send_task_email(user.get_short_name(), user.email, cleaned_query, formatted_response, subject, is_image) else: - return raw_response + return formatted_response async def create_automation( From c4bb92076ea4bd7b7fc32f5d7abc42b9462d4ef8 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Thu, 26 Dec 2024 20:09:48 -0800 Subject: [PATCH 5/6] Convert async create automation api endpoints to sync --- src/khoj/database/adapters/__init__.py | 2 +- src/khoj/routers/api.py | 31 ++++---- src/khoj/routers/helpers.py | 97 ++++++++++++++++++++++++-- tests/test_online_chat_actors.py | 4 +- 4 files changed, 109 insertions(+), 25 deletions(-) diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py index 9342b913..a5be3086 100644 --- a/src/khoj/database/adapters/__init__.py +++ b/src/khoj/database/adapters/__init__.py @@ -1741,7 +1741,7 @@ class AutomationAdapters: return { "id": automation.id, "subject": automation_metadata["subject"], - "query_to_run": re.sub(r"^/automated_task\s*", "", automation_metadata["query_to_run"]), + "query_to_run": automation_metadata["query_to_run"], "scheduling_request": automation_metadata["scheduling_request"], "schedule": schedule, "crontime": crontime, diff --git a/src/khoj/routers/api.py b/src/khoj/routers/api.py index b936488f..a29e993a 100644 --- a/src/khoj/routers/api.py +++ b/src/khoj/routers/api.py @@ -38,13 +38,12 @@ from khoj.processor.conversation.offline.chat_model import extract_questions_off from khoj.processor.conversation.offline.whisper import transcribe_audio_offline from khoj.processor.conversation.openai.gpt import extract_questions from khoj.processor.conversation.openai.whisper import transcribe_audio -from khoj.processor.conversation.utils import defilter_query +from khoj.processor.conversation.utils import clean_json, defilter_query from khoj.routers.helpers import ( ApiUserRateLimiter, ChatEvent, CommonQueryParams, ConversationCommandRateLimiter, - acreate_title_from_query, get_user_config, schedule_automation, schedule_query, @@ -567,7 +566,7 @@ def delete_automation(request: Request, automation_id: str) -> Response: @api.post("/automation", response_class=Response) @requires(["authenticated"]) -async def post_automation( +def post_automation( request: Request, q: str, crontime: str, @@ -586,7 +585,7 @@ async def post_automation( return Response(content="Invalid crontime", status_code=400) # Infer subject, query to run - _, query_to_run, generated_subject = await schedule_query(q, conversation_history={}, user=user) + _, query_to_run, generated_subject = schedule_query(q, conversation_history={}, user=user) subject = subject or generated_subject # Normalize query parameters @@ -614,13 +613,13 @@ async def post_automation( # Create new Conversation Session associated with this new task title = f"Automation: {subject}" - conversation = await ConversationAdapters.acreate_conversation_session(user, request.user.client_app, title=title) + conversation = ConversationAdapters.create_conversation_session(user, request.user.client_app, title=title) # Schedule automation with query_to_run, timezone, subject directly provided by user try: # Use the query to run as the scheduling request if the scheduling request is unset calling_url = request.url.replace(query=f"{request.url.query}") - automation = await schedule_automation( + automation = schedule_automation( query_to_run, subject, crontime, timezone, q, user, calling_url, str(conversation.id) ) except Exception as e: @@ -665,7 +664,7 @@ def trigger_manual_job( @api.put("/automation", response_class=Response) @requires(["authenticated"]) -async def edit_job( +def edit_job( request: Request, automation_id: str, q: Optional[str], @@ -686,13 +685,13 @@ async def edit_job( # Check, get automation to edit try: - automation: Job = await AutomationAdapters.aget_automation(user, automation_id) + automation: Job = AutomationAdapters.get_automation(user, automation_id) except ValueError as e: logger.error(f"Error editing automation {automation_id} for {user.email}: {e}", exc_info=True) return Response(content="Invalid automation", status_code=403) # Infer subject, query to run - _, query_to_run, _ = await schedule_query(q, conversation_history={}, user=user) + _, query_to_run, _ = schedule_query(q, conversation_history={}, user=user) subject = subject # Normalize query parameters @@ -717,7 +716,7 @@ async def edit_job( ) # Construct updated automation metadata - automation_metadata = json.loads(automation.name) + automation_metadata: dict[str, str] = json.loads(clean_json(automation.name)) automation_metadata["scheduling_request"] = q automation_metadata["query_to_run"] = query_to_run automation_metadata["subject"] = subject.strip() @@ -728,15 +727,13 @@ async def edit_job( title = f"Automation: {subject}" # Create new Conversation Session associated with this new task - conversation = await ConversationAdapters.acreate_conversation_session( - user, request.user.client_app, title=title - ) + conversation = ConversationAdapters.create_conversation_session(user, request.user.client_app, title=title) conversation_id = str(conversation.id) automation_metadata["conversation_id"] = conversation_id # Modify automation with updated query, subject - await sync_to_async(automation.modify)( + automation.modify( name=json.dumps(automation_metadata), kwargs={ "query_to_run": query_to_run, @@ -752,11 +749,11 @@ async def edit_job( user_timezone = pytz.timezone(timezone) trigger = CronTrigger.from_crontab(crontime, user_timezone) if automation.trigger != trigger: - await sync_to_async(automation.reschedule)(trigger=trigger) + automation.reschedule(trigger=trigger) # Collate info about the updated user automation - automation = await AutomationAdapters.aget_automation(user, automation.id) - automation_info = await sync_to_async(AutomationAdapters.get_automation_metadata)(user, automation) + automation = AutomationAdapters.get_automation(user, automation.id) + automation_info = AutomationAdapters.get_automation_metadata(user, automation) # Return modified automation information as a JSON response return Response(content=json.dumps(automation_info), media_type="application/json", status_code=200) diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index 6dcb1b4b..d6afddb3 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -551,7 +551,35 @@ async def generate_online_subqueries( return {q} -async def schedule_query( +def schedule_query( + q: str, conversation_history: dict, user: KhojUser, query_images: List[str] = None, tracer: dict = {} +) -> Tuple[str, str, str]: + """ + Schedule the date, time to run the query. Assume the server timezone is UTC. + """ + chat_history = construct_chat_history(conversation_history) + + crontime_prompt = prompts.crontime_prompt.format( + query=q, + chat_history=chat_history, + ) + + raw_response = send_message_to_model_wrapper_sync( + crontime_prompt, query_images=query_images, response_type="json_object", user=user, tracer=tracer + ) + + # Validate that the response is a non-empty, JSON-serializable list + try: + raw_response = raw_response.strip() + response: Dict[str, str] = json.loads(clean_json(raw_response)) + if not response or not isinstance(response, Dict) or len(response) != 3: + raise AssertionError(f"Invalid response for scheduling query : {response}") + return response.get("crontime"), response.get("query"), response.get("subject") + except Exception: + raise AssertionError(f"Invalid response for scheduling query: {raw_response}") + + +async def aschedule_query( q: str, conversation_history: dict, user: KhojUser, query_images: List[str] = None, tracer: dict = {} ) -> Tuple[str, str, str]: """ @@ -571,7 +599,7 @@ async def schedule_query( # Validate that the response is a non-empty, JSON-serializable list try: raw_response = raw_response.strip() - response: Dict[str, str] = json.loads(raw_response) + response: Dict[str, str] = json.loads(clean_json(raw_response)) if not response or not isinstance(response, Dict) or len(response) != 3: raise AssertionError(f"Invalid response for scheduling query : {response}") return response.get("crontime"), response.get("query"), response.get("subject") @@ -1065,6 +1093,7 @@ def send_message_to_model_wrapper_sync( system_message: str = "", response_type: str = "text", user: KhojUser = None, + query_images: List[str] = None, query_files: str = "", tracer: dict = {}, ): @@ -1090,6 +1119,7 @@ def send_message_to_model_wrapper_sync( max_prompt_size=max_tokens, vision_enabled=vision_available, model_type=chat_model.model_type, + query_images=query_images, query_files=query_files, ) @@ -1112,6 +1142,7 @@ def send_message_to_model_wrapper_sync( max_prompt_size=max_tokens, vision_enabled=vision_available, model_type=chat_model.model_type, + query_images=query_images, query_files=query_files, ) @@ -1134,6 +1165,7 @@ def send_message_to_model_wrapper_sync( max_prompt_size=max_tokens, vision_enabled=vision_available, model_type=chat_model.model_type, + query_images=query_images, query_files=query_files, ) @@ -1154,6 +1186,7 @@ def send_message_to_model_wrapper_sync( max_prompt_size=max_tokens, vision_enabled=vision_available, model_type=chat_model.model_type, + query_images=query_images, query_files=query_files, ) @@ -1794,12 +1827,66 @@ async def create_automation( conversation_id: str = None, tracer: dict = {}, ): - crontime, query_to_run, subject = await schedule_query(q, meta_log, user, tracer=tracer) - job = await schedule_automation(query_to_run, subject, crontime, timezone, q, user, calling_url, conversation_id) + crontime, query_to_run, subject = await aschedule_query(q, meta_log, user, tracer=tracer) + job = await aschedule_automation(query_to_run, subject, crontime, timezone, q, user, calling_url, conversation_id) return job, crontime, query_to_run, subject -async def schedule_automation( +def schedule_automation( + query_to_run: str, + subject: str, + crontime: str, + timezone: str, + scheduling_request: str, + user: KhojUser, + calling_url: URL, + conversation_id: str, +): + # Disable minute level automation recurrence + minute_value = crontime.split(" ")[0] + if not minute_value.isdigit(): + # Run automation at some random minute (to distribute request load) instead of running every X minutes + crontime = " ".join([str(math.floor(random() * 60))] + crontime.split(" ")[1:]) + + user_timezone = pytz.timezone(timezone) + trigger = CronTrigger.from_crontab(crontime, user_timezone) + trigger.jitter = 60 + # Generate id and metadata used by task scheduler and process locks for the task runs + job_metadata = json.dumps( + { + "query_to_run": query_to_run, + "scheduling_request": scheduling_request, + "subject": subject, + "crontime": crontime, + "conversation_id": str(conversation_id), + } + ) + query_id = hashlib.md5(f"{query_to_run}_{crontime}".encode("utf-8")).hexdigest() + job_id = f"automation_{user.uuid}_{query_id}" + job = state.scheduler.add_job( + run_with_process_lock, + trigger=trigger, + args=( + scheduled_chat, + f"{ProcessLock.Operation.SCHEDULED_JOB}_{user.uuid}_{query_id}", + ), + kwargs={ + "query_to_run": query_to_run, + "scheduling_request": scheduling_request, + "subject": subject, + "user": user, + "calling_url": calling_url, + "job_id": job_id, + "conversation_id": conversation_id, + }, + id=job_id, + name=job_metadata, + max_instances=2, # Allow second instance to kill any previous instance with stale lock + ) + return job + + +async def aschedule_automation( query_to_run: str, subject: str, crontime: str, diff --git a/tests/test_online_chat_actors.py b/tests/test_online_chat_actors.py index f873be45..2b08fc74 100644 --- a/tests/test_online_chat_actors.py +++ b/tests/test_online_chat_actors.py @@ -636,11 +636,11 @@ async def test_infer_webpage_urls_actor_extracts_correct_links(chat_client, defa ), ], ) -async def test_infer_task_scheduling_request( +def test_infer_task_scheduling_request( chat_client, user_query, expected_crontime, expected_qs, unexpected_qs, default_user2 ): # Act - crontime, inferred_query, _ = await schedule_query(user_query, {}, default_user2) + crontime, inferred_query, _ = schedule_query(user_query, {}, default_user2) inferred_query = inferred_query.lower() # Assert From 90685ccbb0a14738c4e95ecfadf8783eba17b19e Mon Sep 17 00:00:00 2001 From: Debanjum Date: Thu, 26 Dec 2024 20:54:08 -0800 Subject: [PATCH 6/6] Show error message if update automation via web app fails --- src/interface/web/app/automations/page.tsx | 57 ++++++++++++++++------ 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/src/interface/web/app/automations/page.tsx b/src/interface/web/app/automations/page.tsx index 39448ed2..0241db6d 100644 --- a/src/interface/web/app/automations/page.tsx +++ b/src/interface/web/app/automations/page.tsx @@ -269,6 +269,7 @@ interface AutomationsCardProps { isLoggedIn: boolean; setShowLoginPrompt: (showLoginPrompt: boolean) => void; authenticatedData: UserProfile | null; + setToastMessage: (toastMessage: string) => void; } function AutomationsCard(props: AutomationsCardProps) { @@ -277,8 +278,6 @@ function AutomationsCard(props: AutomationsCardProps) { null, ); const [isDeleted, setIsDeleted] = useState(false); - const [toastMessage, setToastMessage] = useState(""); - const { toast } = useToast(); const automation = props.automation; @@ -306,18 +305,6 @@ function AutomationsCard(props: AutomationsCardProps) { } }, [updatedAutomationData, automation]); - useEffect(() => { - const toastTitle = `Automation: ${updatedAutomationData?.subject || automation.subject}`; - if (toastMessage) { - toast({ - title: toastTitle, - description: toastMessage, - action: Ok, - }); - setToastMessage(""); - } - }, [toastMessage, updatedAutomationData, automation, toast]); - if (isDeleted) { return null; } @@ -346,6 +333,7 @@ function AutomationsCard(props: AutomationsCardProps) { isCreating={isEditing} automation={updatedAutomationData || automation} ipLocationData={props.locationData} + setToastMessage={props.setToastMessage} /> )} { - sendAPreview(automation.id.toString(), setToastMessage); + sendAPreview( + automation.id.toString(), + props.setToastMessage, + ); }} > @@ -420,6 +411,7 @@ function AutomationsCard(props: AutomationsCardProps) { isCreating={isEditing} automation={automation} ipLocationData={props.locationData} + setToastMessage={props.setToastMessage} /> )} @@ -434,6 +426,7 @@ interface SharedAutomationCardProps { setShowLoginPrompt: (showLoginPrompt: boolean) => void; authenticatedData: UserProfile | null; isMobileWidth: boolean; + setToastMessage: (toastMessage: string) => void; } function SharedAutomationCard(props: SharedAutomationCardProps) { @@ -470,6 +463,7 @@ function SharedAutomationCard(props: SharedAutomationCardProps) { isCreating={isCreating} automation={automation} ipLocationData={props.locationData} + setToastMessage={props.setToastMessage} /> ) : null; } @@ -492,6 +486,7 @@ interface EditCardProps { isLoggedIn: boolean; setShowLoginPrompt: (showLoginPrompt: boolean) => void; authenticatedData: UserProfile | null; + setToastMessage: (toastMessage: string) => void; } function EditCard(props: EditCardProps) { @@ -552,6 +547,15 @@ function EditCard(props: EditCardProps) { crontime: data.crontime, next: data.next, }); + }) + .catch((error) => { + console.error("Error saving automation:", error); + // Reset saving state + props.setIsEditing(false); + // Show error message + props.setToastMessage( + "Sorry, something went wrong. Try again or contact team@khoj.dev.", + ); }); }; @@ -919,6 +923,7 @@ interface AutomationComponentWrapperProps { isCreating: boolean; ipLocationData: LocationData | null | undefined; automation?: AutomationsData; + setToastMessage: (toastMessage: string) => void; } function AutomationComponentWrapper(props: AutomationComponentWrapperProps) { @@ -946,6 +951,7 @@ function AutomationComponentWrapper(props: AutomationComponentWrapperProps) { setShowLoginPrompt={props.setShowLoginPrompt} setUpdatedAutomationData={props.setNewAutomationData} locationData={props.ipLocationData} + setToastMessage={props.setToastMessage} /> @@ -973,6 +979,7 @@ function AutomationComponentWrapper(props: AutomationComponentWrapperProps) { setShowLoginPrompt={props.setShowLoginPrompt} setUpdatedAutomationData={props.setNewAutomationData} locationData={props.ipLocationData} + setToastMessage={props.setToastMessage} /> @@ -1000,6 +1007,8 @@ export default function Automations() { const [showLoginPrompt, setShowLoginPrompt] = useState(false); const isMobileWidth = useIsMobileWidth(); const { locationData, locationDataError, locationDataLoading } = useIPLocationData(); + const [toastMessage, setToastMessage] = useState(""); + const { toast } = useToast(); useEffect(() => { if (newAutomationData) { @@ -1026,6 +1035,19 @@ export default function Automations() { } }, [personalAutomations, allNewAutomations]); + useEffect(() => { + const toastTitle = `Automation`; + if (toastMessage) { + toast({ + title: toastTitle, + description: toastMessage, + action: Ok, + variant: toastMessage.includes("Sorry") ? "destructive" : "default", + }); + setToastMessage(""); + } + }, [toastMessage]); + if (error) return ; @@ -1100,6 +1122,7 @@ export default function Automations() { authenticatedData={authenticatedData} isCreating={isCreating} ipLocationData={locationData} + setToastMessage={setToastMessage} /> ) : (
@@ -1163,6 +1189,7 @@ export default function Automations() { isLoggedIn={authenticatedData ? true : false} setShowLoginPrompt={setShowLoginPrompt} suggestedCard={true} + setToastMessage={setToastMessage} /> ))}