diff --git a/tests/conftest.py b/tests/conftest.py index d1135c0c..25876d61 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ from khoj.configure import ( from khoj.database.models import ( Agent, ChatModel, + FileObject, GithubConfig, GithubRepoConfig, KhojApiUser, @@ -299,6 +300,15 @@ def chat_client_no_background(search_config: SearchConfig, default_user2: KhojUs return chat_client_builder(search_config, default_user2, index_content=False, require_auth=False) +@pytest.fixture(scope="function") +def chat_client_with_large_kb(search_config: SearchConfig, default_user2: KhojUser): + """ + Chat client fixture that creates a large knowledge base with many files + for stress testing atomic agent updates. + """ + return large_kb_chat_client_builder(search_config, default_user2) + + @pytest.mark.django_db def chat_client_builder(search_config, user, index_content=True, require_auth=False): # Initialize app state @@ -339,6 +349,127 @@ def chat_client_builder(search_config, user, index_content=True, require_auth=Fa return TestClient(app) +@pytest.mark.django_db +def large_kb_chat_client_builder(search_config, user): + """ + Build a chat client with a large knowledge base for stress testing. + Creates 300 markdown files with substantial content. + """ + import os + import shutil + import tempfile + + # Initialize app state + state.config.search_type = search_config + state.SearchType = configure_search_types() + + # Create temporary directory for large number of test files + temp_dir = tempfile.mkdtemp(prefix="khoj_test_large_kb_") + large_file_list = [] + + try: + # Generate 300 test files with substantial content + for i in range(300): + file_path = os.path.join(temp_dir, f"test_file_{i:03d}.markdown") + content = f""" +# Test File {i} + +This is test file {i} with substantial content for stress testing agent knowledge base updates. + +## Section 1: Introduction +This section introduces the topic of file {i}. 
It contains enough text to create meaningful +embeddings and entries in the database for realistic testing. + +## Section 2: Technical Details +Technical content for file {i}: +- Implementation details +- Best practices +- Code examples +- Architecture notes + +## Section 3: Code Examples +```python +def example_function_{i}(): + '''Example function from file {i}''' + return f"Result from file {i}" + +class TestClass{i}: + def __init__(self): + self.value = {i} + self.data = [f"item_{{j}}" for j in range(10)] + + def process(self): + return f"Processing {{len(self.data)}} items from file {i}" +``` + +## Section 4: Additional Content +More substantial content to make the files realistic and ensure proper +database entry creation during content processing. + +File statistics: +- File number: {i} +- Content sections: 4 +- Code examples: Yes +- Purpose: Stress testing atomic agent updates + +{'Additional padding content. ' * 20} + +End of file {i}. +""" + with open(file_path, "w") as f: + f.write(content) + large_file_list.append(file_path) + + # Create LocalMarkdownConfig with all the generated files + LocalMarkdownConfig.objects.create( + input_files=large_file_list, + input_filter=None, + user=user, + ) + + # Index all the files into the user's knowledge base + all_files = fs_syncer.collect_files(user=user) + configure_content(user, all_files) + + # Verify we have a substantial knowledge base + file_count = FileObject.objects.filter(user=user, agent=None).count() + if file_count < 150: + raise RuntimeError(f"Large KB fixture failed: only {file_count} files indexed, expected at least 150") + + except Exception as e: + # Cleanup on error + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + raise e + + # Initialize chat processor + chat_provider = get_chat_provider() + online_chat_model = None + if chat_provider == ChatModel.ModelType.OPENAI: + online_chat_model = ChatModelFactory(name="gpt-4o-mini", model_type="openai") + elif chat_provider == 
ChatModel.ModelType.GOOGLE: + online_chat_model = ChatModelFactory(name="gemini-2.0-flash", model_type="google") + elif chat_provider == ChatModel.ModelType.ANTHROPIC: + online_chat_model = ChatModelFactory(name="claude-3-5-haiku-20241022", model_type="anthropic") + + if online_chat_model: + online_chat_model.ai_model_api = AiModelApiFactory(api_key=get_chat_api_key(chat_provider)) + UserConversationProcessorConfigFactory(user=user, setting=online_chat_model) + + state.anonymous_mode = False + + app = FastAPI() + configure_routes(app) + configure_middleware(app) + app.mount("/static", StaticFiles(directory=web_directory), name="static") + + # Store temp_dir for cleanup (though Django test cleanup should handle it) + client = TestClient(app) + client._temp_dir = temp_dir # Store for potential cleanup + + return client + + @pytest.fixture(scope="function") def fastapi_app(): app = FastAPI() diff --git a/tests/test_agents.py b/tests/test_agents.py index f573bed9..21a242ef 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -1,11 +1,12 @@ # tests/test_agents.py -import os +import asyncio +from collections import Counter import pytest from asgiref.sync import sync_to_async from khoj.database.adapters import AgentAdapters -from khoj.database.models import Agent, ChatModel, Entry, KhojUser +from khoj.database.models import Agent, ChatModel, Entry, FileObject, KhojUser from khoj.routers.helpers import execute_search from khoj.utils.helpers import get_absolute_path from tests.helpers import ChatModelFactory @@ -209,3 +210,195 @@ async def test_multiple_agents_with_knowledge_base_and_users( assert len(search_result) == 0 assert len(search_result2) == 1 + + +@pytest.mark.anyio +@pytest.mark.django_db(transaction=True) +async def test_large_knowledge_base_atomic_update( + default_user2: KhojUser, default_openai_chat_model_option: ChatModel, chat_client_with_large_kb +): + """ + The test simulates the scenario where lots of files are synced to an agent's knowledge 
base, + and verifies that all files are properly added atomically. + """ + # The chat_client_with_large_kb fixture has already created and indexed 300 files + # Get the files that are already in the user's knowledge base + user_file_objects = await sync_to_async(list)(FileObject.objects.filter(user=default_user2, agent=None)) + + # Verify we have the expected large knowledge base from the fixture + assert len(user_file_objects) >= 150, f"Expected at least 150 files from fixture, got {len(user_file_objects)}" + + # Extract file paths for agent creation + available_files = [fo.file_name for fo in user_file_objects] + files_to_test = available_files # Use all available files for the stress test + + # Create initial agent with smaller set + initial_files = files_to_test[:20] + agent = await AgentAdapters.aupdate_agent( + default_user2, + "Large KB Agent", + "Test agent with large knowledge base", + Agent.PrivacyLevel.PRIVATE, + "icon", + "color", + default_openai_chat_model_option.name, + initial_files, + [], + [], + ) + + # Verify initial state + initial_entries = await sync_to_async(list)(Entry.objects.filter(agent=agent)) + initial_entries_count = len(initial_entries) + + # Now perform the stress test: update with ALL available files at once + # This is where partial sync issues would occur without atomic transactions + updated_agent = await AgentAdapters.aupdate_agent( + default_user2, + "Large KB Agent Updated", # Change name to trigger update + "Test agent with large knowledge base - updated", + Agent.PrivacyLevel.PRIVATE, + "icon", + "color", + default_openai_chat_model_option.name, + files_to_test, # ALL files at once + [], + [], + ) + + # Verify atomic update completed successfully + final_entries = await sync_to_async(list)(Entry.objects.filter(agent=updated_agent)) + final_file_objects = await sync_to_async(list)(FileObject.objects.filter(agent=updated_agent)) + + # Critical assertions that would fail with partial sync issues + expected_file_count = 
len(files_to_test) + actual_file_count = len(final_file_objects) + + assert actual_file_count == expected_file_count, ( + f"Partial sync detected! Expected {expected_file_count} files, " + f"but got {actual_file_count}. This indicates non-atomic update." + ) + + # Verify all files are properly represented + file_paths_in_db = {fo.file_name for fo in final_file_objects} + expected_file_paths = set(files_to_test) + + missing_files = expected_file_paths - file_paths_in_db + assert not missing_files, f"Missing files in knowledge base: {missing_files}" + + # Verify entries were created (should have significantly more than initial) + assert len(final_entries) > initial_entries_count, "Should have more entries after update" + + # With hundreds of files, we should have many entries (each file creates multiple entries) + assert ( + len(final_entries) >= expected_file_count + ), f"Expected at least {expected_file_count} entries, got {len(final_entries)}" + + # Verify no partial state - all entries should correspond to the final file set + entry_file_paths = {entry.file_path for entry in final_entries} + + # All file objects should have corresponding entries + assert file_paths_in_db.issubset( + entry_file_paths + ), "All file objects should have corresponding entries - atomic update verification" + + # Additional stress test: verify referential integrity + # Count entries per file to ensure no partial file processing + entries_per_file = Counter(entry.file_path for entry in final_entries) + + # Ensure every file has at least one entry (no files were partially processed and lost) + files_without_entries = set(files_to_test) - set(entries_per_file.keys()) + assert not files_without_entries, f"Files with no entries (partial sync): {files_without_entries}" + + # Test that search works with the updated knowledge base + search_result = await execute_search(user=default_user2, q="test", agent=updated_agent) + assert len(search_result) > 0, "Should be able to search the updated knowledge 
base" + + +@pytest.mark.anyio +@pytest.mark.django_db(transaction=True) +async def test_concurrent_agent_updates_atomicity( + default_user2: KhojUser, default_openai_chat_model_option: ChatModel, chat_client_with_large_kb +): + """ + Test that concurrent updates to the same agent don't result in partial syncs. + This simulates the race condition that could occur with non-atomic updates. + """ + # The chat_client_with_large_kb fixture has already created and indexed 300 files + # Get the files that are already in the user's knowledge base + user_file_objects = await sync_to_async(list)(FileObject.objects.filter(user=default_user2, agent=None)) + + # Extract file paths for agent creation + available_files = [fo.file_name for fo in user_file_objects] + test_files = available_files # Use all available files for the stress test + + # Create initial agent + agent = await AgentAdapters.aupdate_agent( + default_user2, + "Concurrent Test Agent", + "Test concurrent updates", + Agent.PrivacyLevel.PRIVATE, + "icon", + "color", + default_openai_chat_model_option.name, + test_files[:10], + [], + [], + ) + + # Create two concurrent update tasks with different file sets + async def update_agent_with_files(file_subset, name_suffix, delay=0): + if delay > 0: + await asyncio.sleep(delay) + return await AgentAdapters.aupdate_agent( + default_user2, + f"Concurrent Test Agent {name_suffix}", + f"Test concurrent updates {name_suffix}", + Agent.PrivacyLevel.PRIVATE, + "icon", + "color", + default_openai_chat_model_option.name, + file_subset, + [], + [], + ) + + # Run concurrent updates + initial_split_size = 20 + large_split_size = 80 + try: + results = await asyncio.gather( + update_agent_with_files(test_files[initial_split_size : initial_split_size + large_split_size], "Second"), + update_agent_with_files(test_files[:initial_split_size], "First"), + return_exceptions=True, + ) + + # At least one should succeed with atomic updates + successful_updates = [r for r in results if not 
isinstance(r, Exception)] + assert len(successful_updates) >= 1, "At least one concurrent update should succeed" + + # Verify the final state is consistent + final_agent = successful_updates[0] + final_file_objects = await sync_to_async(list)(FileObject.objects.filter(agent=final_agent)) + final_entries = await sync_to_async(list)(Entry.objects.filter(agent=final_agent)) + + # The agent should have a consistent state (all files from the successful update) + assert len(final_file_objects) == large_split_size, "Should have file objects after concurrent update" + assert len(final_entries) >= large_split_size, "Should have entries after concurrent update" + + # Verify referential integrity + entry_file_paths = {entry.file_path for entry in final_entries} + file_object_paths = {fo.file_name for fo in final_file_objects} + + # All entries should have corresponding file objects + assert entry_file_paths.issubset( + file_object_paths + ), "All entries should have corresponding file objects - indicates atomic update worked" + + except Exception as e: + # If we get database integrity errors, that's actually expected behavior + # with proper atomic transactions - they should fail cleanly rather than + # allowing partial updates + assert ( + "database" in str(e).lower() or "integrity" in str(e).lower() + ), f"Expected database/integrity error with concurrent updates, got: {e}"