Add grep files tool to enable researcher to find documents by regex

Earlier khoj could technically only answer existential questions,
i.e. questions for which the search can terminate once any note relevant
to answering the question is found.

This change enables khoj to answer universal questions, i.e. questions
that require searching through all notes or finding all instances.

It enables more thorough retrieval from user's knowledge base by
combining semantic search, regex search, view and list files tools.

For more development details including motivation, see live coding
session 1.1 at https://www.youtube.com/live/-2s_qi4hd2k
This commit is contained in:
Debanjum
2025-06-16 21:19:21 -07:00
parent 59f5648dbd
commit 9c38326608
4 changed files with 146 additions and 19 deletions

View File

@@ -1756,6 +1756,18 @@ class FileObjectAdapters:
async def adelete_all_file_objects(user: KhojUser):
return await FileObject.objects.filter(user=user).adelete()
@staticmethod
@arequire_valid_user
async def aget_file_objects_by_regex(user: KhojUser, regex_pattern: str, path_prefix: Optional[str] = None):
    """
    Fetch the user's file objects whose raw text matches a regex pattern.

    Only file objects owned by the user and not attached to any agent are considered.
    When a path prefix is given, results are further limited to file objects
    whose file name starts with that prefix.

    Returns the matching file objects as a list.
    """
    # Base filters: user's own (non-agent) files whose content matches the pattern
    lookup = {"user": user, "agent": None, "raw_text__regex": regex_pattern}
    # Optionally narrow down to files under the given path prefix
    if path_prefix:
        lookup["file_name__startswith"] = path_prefix
    matching_files = FileObject.objects.filter(**lookup)
    # Evaluate the queryset off the event loop
    return await sync_to_async(list)(matching_files)
class EntryAdapters:
word_filter = WordFilter()

View File

@@ -2881,6 +2881,65 @@ async def view_file_content(
yield [{"query": query, "file": path, "compiled": error_msg}]
async def grep_files(
    regex_pattern: str,
    path_prefix: Optional[str] = None,
    user: KhojUser = None,
):
    """
    Search for a regex pattern in files with an optional path prefix.

    Async generator that yields exactly one result dict with the keys:
    - "query": summary of how many lines matched in how many documents
    - "file": the path prefix searched ("" when none was given)
    - "compiled": grep style `file_name:line_number:line` matches joined by newlines,
      or a message when the regex is invalid, nothing matched or the search failed

    Args:
        regex_pattern: The regex to match against each line of the user's files.
        path_prefix: Optional prefix that file names must start with.
        user: The user whose files are searched.
    """

    # Construct the query string based on provided parameters
    def _generate_query(line_count, doc_count, path, pattern):
        query = f"**Found {line_count} matches for '{pattern}' in {doc_count} documents**"
        if path:
            query += f" in {path}"
        return query

    path_prefix = path_prefix or ""
    # Set a default summary up front so every error path has a query to report
    query = _generate_query(0, 0, path_prefix, regex_pattern)

    # Validate regex pattern
    try:
        regex = re.compile(regex_pattern)
    except re.error as e:
        yield {"query": query, "file": path_prefix, "compiled": f"Invalid regex pattern: {e}"}
        return

    try:
        # NOTE(review): candidate files are presumably matched by the database's regex engine,
        # while lines are matched below with Python's `re`. Confirm the two agree for expected patterns.
        file_matches = await FileObjectAdapters.aget_file_objects_by_regex(user, regex_pattern, path_prefix)

        # Collect matching lines in grep format, with 1-based line numbers
        line_matches = []
        for file_object in file_matches:
            lines = file_object.raw_text.split("\n")
            for i, line in enumerate(lines, 1):
                if regex.search(line):
                    line_matches.append(f"{file_object.file_name}:{i}:{line}")

        query = _generate_query(len(line_matches), len(file_matches), path_prefix, regex_pattern)

        # Check if no results found
        if not line_matches:
            compiled = "No matches found."
        else:
            # Truncate matched lines list if too long
            max_results = 1000
            if len(line_matches) > max_results:
                line_matches = line_matches[:max_results] + [
                    f"... {len(line_matches) - max_results} more results found. Use stricter regex or path to narrow down results."
                ]
            compiled = "\n".join(line_matches)
        result = {"query": query, "file": path_prefix, "compiled": compiled}
    except Exception as e:
        error_msg = f"Error using grep files tool: {str(e)}"
        logger.error(error_msg, exc_info=True)
        # Report the failure as a dict, the same shape as every other result
        result = {"query": query, "file": path_prefix, "compiled": error_msg}

    # Yield outside the try block so errors raised by the consumer are not reported as tool errors
    yield result
async def list_files(
path: Optional[str] = None,
pattern: Optional[str] = None,

View File

@@ -24,6 +24,7 @@ from khoj.processor.tools.run_code import run_code
from khoj.routers.helpers import (
ChatEvent,
generate_summary_from_files,
grep_files,
list_files,
search_documents,
send_message_to_model_wrapper,
@@ -93,13 +94,13 @@ async def apick_next_tool(
continue
# Skip showing document related tools if user has no documents
if (
tool == ConversationCommand.Notes
tool == ConversationCommand.SemanticSearchFiles
or tool == ConversationCommand.RegexSearchFiles
or tool == ConversationCommand.ViewFile
or tool == ConversationCommand.ListFiles
) and not user_has_entries:
continue
# Skip showing Notes tool as an option if user has no entries
if tool == ConversationCommand.Notes:
if tool == ConversationCommand.SemanticSearchFiles:
description = tool_data.description.format(max_search_queries=max_document_searches)
elif tool == ConversationCommand.Webpage:
description = tool_data.description.format(max_webpages_to_read=max_webpages_to_read)
@@ -269,7 +270,7 @@ async def research(
):
current_iteration = MAX_ITERATIONS
elif this_iteration.query.name == ConversationCommand.Notes:
elif this_iteration.query.name == ConversationCommand.SemanticSearchFiles:
this_iteration.context = []
document_results = []
previous_inferred_queries = {
@@ -280,7 +281,7 @@ async def research(
n=max_document_searches,
d=None,
user=user,
chat_history=construct_tool_chat_history(previous_iterations, ConversationCommand.Notes),
chat_history=construct_tool_chat_history(previous_iterations, ConversationCommand.SemanticSearchFiles),
conversation_id=conversation_id,
conversation_commands=[ConversationCommand.Default],
location_data=location,
@@ -471,6 +472,25 @@ async def research(
this_iteration.warning = f"Error listing files: {e}"
logger.error(this_iteration.warning, exc_info=True)
elif this_iteration.query.name == ConversationCommand.RegexSearchFiles:
try:
async for result in grep_files(
**this_iteration.query.args,
user=user,
):
if isinstance(result, dict) and ChatEvent.STATUS in result:
yield result[ChatEvent.STATUS]
else:
if this_iteration.context is None:
this_iteration.context = []
document_results: List[Dict[str, str]] = [result] # type: ignore
this_iteration.context += document_results
async for result in send_status_func(result["query"]):
yield result
except Exception as e:
this_iteration.warning = f"Error searching with regex: {e}"
logger.error(this_iteration.warning, exc_info=True)
else:
# No valid tools. This is our exit condition.
current_iteration = MAX_ITERATIONS

View File

@@ -431,6 +431,8 @@ class ConversationCommand(str, Enum):
Operator = "operator"
ViewFile = "view_file"
ListFiles = "list_files"
RegexSearchFiles = "regex_search_files"
SemanticSearchFiles = "semantic_search_files"
command_descriptions = {
@@ -446,6 +448,7 @@ command_descriptions = {
ConversationCommand.Operator: "Operate and perform tasks using a computer.",
ConversationCommand.ViewFile: "View the contents of a file with optional line range specification.",
ConversationCommand.ListFiles: "List files under a given path with optional glob pattern.",
ConversationCommand.RegexSearchFiles: "Search for lines in files matching regex pattern with an optional path prefix.",
}
command_descriptions_for_agent = {
@@ -472,20 +475,6 @@ tool_descriptions_for_llm = {
}
tools_for_research_llm = {
ConversationCommand.Notes: ToolDefinition(
name="notes",
description="To search the user's personal knowledge base. Especially helpful if the question expects context from the user's notes or documents. Max {max_search_queries} search queries allowed per iteration.",
schema={
"type": "object",
"properties": {
"q": {
"type": "string",
"description": "The query to search in the user's personal knowledge base.",
},
},
"required": ["q"],
},
),
ConversationCommand.Online: ToolDefinition(
name="online",
description="To search the internet for information. Useful to get a quick, broad overview from the internet. Provide all relevant context to ensure new searches, not in previous iterations, are performed. Max {max_search_queries} search queries allowed per iteration.",
@@ -595,6 +584,53 @@ tools_for_research_llm = {
},
},
),
ConversationCommand.SemanticSearchFiles: ToolDefinition(
name="semantic_search_files",
description=dedent(
"""
To have the tool AI semantic search through the user's personal knowledge base.
Helpful to answer questions for which finding some relevant notes or documents can complete the search. Example: "When was Tom born?"
This tool AI cannot find all relevant notes or documents, only a subset of them.
It is a good starting point to find keywords, discover similar topics or related concepts and some relevant notes or documents.
The tool AI can perform a maximum of {max_search_queries} semantic search queries per iteration.
"""
).strip(),
schema={
"type": "object",
"properties": {
"q": {
"type": "string",
"description": "Your natural language query for the tool to search in the user's personal knowledge base.",
},
},
"required": ["q"],
},
),
ConversationCommand.RegexSearchFiles: ToolDefinition(
name="regex_search_files",
description=dedent(
"""
To regex search through the user's personal knowledge base. It returns all lines matching the regex pattern in the user's files.
Helpful to answer questions for which all relevant notes or documents are needed to complete the search. Example: "Notes that mention Tom".
You need to know all the correct keywords or regex patterns for this tool to be useful.
An optional path prefix can restrict file(s) to search in.
"""
).strip(),
schema={
"type": "object",
"properties": {
"regex_pattern": {
"type": "string",
"description": "The regex pattern to search for content in the user's files.",
},
"path_prefix": {
"type": "string",
"description": "Optional path prefix to limit the search to files under a specified path.",
},
},
"required": ["regex_pattern"],
},
),
}
mode_descriptions_for_llm = {