Test update agents with large knowledge bases

This commit is contained in:
Debanjum
2025-07-02 15:18:02 -07:00
parent 5fe2ea8a55
commit e6cc9b1182
2 changed files with 326 additions and 2 deletions

View File

@@ -14,6 +14,7 @@ from khoj.configure import (
from khoj.database.models import (
Agent,
ChatModel,
FileObject,
GithubConfig,
GithubRepoConfig,
KhojApiUser,
@@ -299,6 +300,15 @@ def chat_client_no_background(search_config: SearchConfig, default_user2: KhojUs
return chat_client_builder(search_config, default_user2, index_content=False, require_auth=False)
@pytest.fixture(scope="function")
def chat_client_with_large_kb(search_config: SearchConfig, default_user2: KhojUser):
"""
Chat client fixture that creates a large knowledge base with many files
for stress testing atomic agent updates.
"""
return large_kb_chat_client_builder(search_config, default_user2)
@pytest.mark.django_db
def chat_client_builder(search_config, user, index_content=True, require_auth=False):
# Initialize app state
@@ -339,6 +349,127 @@ def chat_client_builder(search_config, user, index_content=True, require_auth=Fa
return TestClient(app)
@pytest.mark.django_db
def large_kb_chat_client_builder(search_config, user):
"""
Build a chat client with a large knowledge base for stress testing.
Creates 200+ markdown files with substantial content.
"""
import os
import shutil
import tempfile
# Initialize app state
state.config.search_type = search_config
state.SearchType = configure_search_types()
# Create temporary directory for large number of test files
temp_dir = tempfile.mkdtemp(prefix="khoj_test_large_kb_")
large_file_list = []
try:
# Generate 200 test files with substantial content
for i in range(300):
file_path = os.path.join(temp_dir, f"test_file_{i:03d}.markdown")
content = f"""
# Test File {i}
This is test file {i} with substantial content for stress testing agent knowledge base updates.
## Section 1: Introduction
This section introduces the topic of file {i}. It contains enough text to create meaningful
embeddings and entries in the database for realistic testing.
## Section 2: Technical Details
Technical content for file {i}:
- Implementation details
- Best practices
- Code examples
- Architecture notes
## Section 3: Code Examples
```python
def example_function_{i}():
'''Example function from file {i}'''
return f"Result from file {i}"
class TestClass{i}:
def __init__(self):
self.value = {i}
self.data = [f"item_{{j}}" for j in range(10)]
def process(self):
return f"Processing {{len(self.data)}} items from file {i}"
```
## Section 4: Additional Content
More substantial content to make the files realistic and ensure proper
database entry creation during content processing.
File statistics:
- File number: {i}
- Content sections: 4
- Code examples: Yes
- Purpose: Stress testing atomic agent updates
{'Additional padding content. ' * 20}
End of file {i}.
"""
with open(file_path, "w") as f:
f.write(content)
large_file_list.append(file_path)
# Create LocalMarkdownConfig with all the generated files
LocalMarkdownConfig.objects.create(
input_files=large_file_list,
input_filter=None,
user=user,
)
# Index all the files into the user's knowledge base
all_files = fs_syncer.collect_files(user=user)
configure_content(user, all_files)
# Verify we have a substantial knowledge base
file_count = FileObject.objects.filter(user=user, agent=None).count()
if file_count < 150:
raise RuntimeError(f"Large KB fixture failed: only {file_count} files indexed, expected at least 150")
except Exception as e:
# Cleanup on error
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
raise e
# Initialize chat processor
chat_provider = get_chat_provider()
online_chat_model = None
if chat_provider == ChatModel.ModelType.OPENAI:
online_chat_model = ChatModelFactory(name="gpt-4o-mini", model_type="openai")
elif chat_provider == ChatModel.ModelType.GOOGLE:
online_chat_model = ChatModelFactory(name="gemini-2.0-flash", model_type="google")
elif chat_provider == ChatModel.ModelType.ANTHROPIC:
online_chat_model = ChatModelFactory(name="claude-3-5-haiku-20241022", model_type="anthropic")
if online_chat_model:
online_chat_model.ai_model_api = AiModelApiFactory(api_key=get_chat_api_key(chat_provider))
UserConversationProcessorConfigFactory(user=user, setting=online_chat_model)
state.anonymous_mode = False
app = FastAPI()
configure_routes(app)
configure_middleware(app)
app.mount("/static", StaticFiles(directory=web_directory), name="static")
# Store temp_dir for cleanup (though Django test cleanup should handle it)
client = TestClient(app)
client._temp_dir = temp_dir # Store for potential cleanup
return client
@pytest.fixture(scope="function")
def fastapi_app():
app = FastAPI()

View File

@@ -1,11 +1,12 @@
# tests/test_agents.py
import os
import asyncio
from collections import Counter
import pytest
from asgiref.sync import sync_to_async
from khoj.database.adapters import AgentAdapters
from khoj.database.models import Agent, ChatModel, Entry, KhojUser
from khoj.database.models import Agent, ChatModel, Entry, FileObject, KhojUser
from khoj.routers.helpers import execute_search
from khoj.utils.helpers import get_absolute_path
from tests.helpers import ChatModelFactory
@@ -209,3 +210,195 @@ async def test_multiple_agents_with_knowledge_base_and_users(
assert len(search_result) == 0
assert len(search_result2) == 1
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_large_knowledge_base_atomic_update(
default_user2: KhojUser, default_openai_chat_model_option: ChatModel, chat_client_with_large_kb
):
"""
The test simulates the scenario where lots of files are synced to an agent's knowledge base,
and verifies that all files are properly added atomically.
"""
# The chat_client_with_large_kb fixture has already created and indexed 200 files
# Get the files that are already in the user's knowledge base
user_file_objects = await sync_to_async(list)(FileObject.objects.filter(user=default_user2, agent=None))
# Verify we have the expected large knowledge base from the fixture
assert len(user_file_objects) >= 150, f"Expected at least 150 files from fixture, got {len(user_file_objects)}"
# Extract file paths for agent creation
available_files = [fo.file_name for fo in user_file_objects]
files_to_test = available_files # Use all available files for the stress test
# Create initial agent with smaller set
initial_files = files_to_test[:20]
agent = await AgentAdapters.aupdate_agent(
default_user2,
"Large KB Agent",
"Test agent with large knowledge base",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.name,
initial_files,
[],
[],
)
# Verify initial state
initial_entries = await sync_to_async(list)(Entry.objects.filter(agent=agent))
initial_entries_count = len(initial_entries)
# Now perform the stress test: update with ALL 180 files at once
# This is where partial sync issues would occur without atomic transactions
updated_agent = await AgentAdapters.aupdate_agent(
default_user2,
"Large KB Agent Updated", # Change name to trigger update
"Test agent with large knowledge base - updated",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.name,
files_to_test, # ALL files at once
[],
[],
)
# Verify atomic update completed successfully
final_entries = await sync_to_async(list)(Entry.objects.filter(agent=updated_agent))
final_file_objects = await sync_to_async(list)(FileObject.objects.filter(agent=updated_agent))
# Critical assertions that would fail with partial sync issues
expected_file_count = len(files_to_test)
actual_file_count = len(final_file_objects)
assert actual_file_count == expected_file_count, (
f"Partial sync detected! Expected {expected_file_count} files, "
f"but got {actual_file_count}. This indicates non-atomic update."
)
# Verify all files are properly represented
file_paths_in_db = {fo.file_name for fo in final_file_objects}
expected_file_paths = set(files_to_test)
missing_files = expected_file_paths - file_paths_in_db
assert not missing_files, f"Missing files in knowledge base: {missing_files}"
# Verify entries were created (should have significantly more than initial)
assert len(final_entries) > initial_entries_count, "Should have more entries after update"
# With 180 files, we should have many entries (each file creates multiple entries)
assert (
len(final_entries) >= expected_file_count
), f"Expected at least {expected_file_count} entries, got {len(final_entries)}"
# Verify no partial state - all entries should correspond to the final file set
entry_file_paths = {entry.file_path for entry in final_entries}
# All file objects should have corresponding entries
assert file_paths_in_db.issubset(
entry_file_paths
), "All file objects should have corresponding entries - atomic update verification"
# Additional stress test: verify referential integrity
# Count entries per file to ensure no partial file processing
entries_per_file = Counter(entry.file_path for entry in final_entries)
# Ensure every file has at least one entry (no files were partially processed and lost)
files_without_entries = set(files_to_test) - set(entries_per_file.keys())
assert not files_without_entries, f"Files with no entries (partial sync): {files_without_entries}"
# Test that search works with the updated knowledge base
search_result = await execute_search(user=default_user2, q="test", agent=updated_agent)
assert len(search_result) > 0, "Should be able to search the updated knowledge base"
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_concurrent_agent_updates_atomicity(
default_user2: KhojUser, default_openai_chat_model_option: ChatModel, chat_client_with_large_kb
):
"""
Test that concurrent updates to the same agent don't result in partial syncs.
This simulates the race condition that could occur with non-atomic updates.
"""
# The chat_client_with_large_kb fixture has already created and indexed 200 files
# Get the files that are already in the user's knowledge base
user_file_objects = await sync_to_async(list)(FileObject.objects.filter(user=default_user2, agent=None))
# Extract file paths for agent creation
available_files = [fo.file_name for fo in user_file_objects]
test_files = available_files # Use all available files for the stress test
# Create initial agent
agent = await AgentAdapters.aupdate_agent(
default_user2,
"Concurrent Test Agent",
"Test concurrent updates",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.name,
test_files[:10],
[],
[],
)
# Create two concurrent update tasks with different file sets
async def update_agent_with_files(file_subset, name_suffix, delay=0):
if delay > 0:
await asyncio.sleep(delay)
return await AgentAdapters.aupdate_agent(
default_user2,
f"Concurrent Test Agent {name_suffix}",
f"Test concurrent updates {name_suffix}",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.name,
file_subset,
[],
[],
)
# Run concurrent updates
initial_split_size = 20
large_split_size = 80
try:
results = await asyncio.gather(
update_agent_with_files(test_files[initial_split_size : initial_split_size + large_split_size], "Second"),
update_agent_with_files(test_files[:initial_split_size], "First"),
return_exceptions=True,
)
# At least one should succeed with atomic updates
successful_updates = [r for r in results if not isinstance(r, Exception)]
assert len(successful_updates) >= 1, "At least one concurrent update should succeed"
# Verify the final state is consistent
final_agent = successful_updates[0]
final_file_objects = await sync_to_async(list)(FileObject.objects.filter(agent=final_agent))
final_entries = await sync_to_async(list)(Entry.objects.filter(agent=final_agent))
# The agent should have a consistent state (all files from the successful update)
assert len(final_file_objects) == large_split_size, "Should have file objects after concurrent update"
assert len(final_entries) >= large_split_size, "Should have entries after concurrent update"
# Verify referential integrity
entry_file_paths = {entry.file_path for entry in final_entries}
file_object_paths = {fo.file_name for fo in final_file_objects}
# All entries should have corresponding file objects
assert entry_file_paths.issubset(
file_object_paths
), "All entries should have corresponding file objects - indicates atomic update worked"
except Exception as e:
# If we get database integrity errors, that's actually expected behavior
# with proper atomic transactions - they should fail cleanly rather than
# allowing partial updates
assert (
"database" in str(e).lower() or "integrity" in str(e).lower()
), f"Expected database/integrity error with concurrent updates, got: {e}"