diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index 11e94bde..5b277978 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -45,6 +45,8 @@ FIRECRAWL_USE_LLM_EXTRACT = is_env_var_true("FIRECRAWL_USE_LLM_EXTRACT") SEARXNG_URL = os.getenv("KHOJ_SEARXNG_URL") +EXA_API_KEY = os.getenv("EXA_API_KEY") + # Timeout for web search and webpage read HTTP requests WEBPAGE_REQUEST_TIMEOUT = 60 # seconds @@ -115,6 +117,9 @@ async def search_online( if FIRECRAWL_API_KEY: search_engine = "Firecrawl" search_engines.append((search_engine, search_with_firecrawl)) + if EXA_API_KEY: + search_engine = "Exa" + search_engines.append((search_engine, search_with_exa)) if SEARXNG_URL: search_engine = "Searxng" search_engines.append((search_engine, search_with_searxng)) @@ -182,6 +187,73 @@ async def search_online( yield response_dict +async def search_with_exa(query: str, location: LocationData) -> Tuple[str, Dict[str, List[Dict]]]: + """ + Search using Exa API. + + Args: + query: The search query string + location: Location data for geolocation-based search + + Returns: + Tuple containing the original query and a dictionary of search results + """ + # Set up API endpoint and headers + exa_search_api_endpoint = "https://api.exa.ai/search" + headers = {"Content-Type": "application/json", "x-api-key": EXA_API_KEY} + + # Prepare request payload + country_code = location.country_code.upper() if location and location.country_code else "US" + payload = { + "query": query, + "type": "auto", + "userLocation": country_code, + "numResults": 10, + "contents": { + "text": False, + "highlights": { + "numSentences": 5, + "highlightsPerUrl": 1, + }, + }, + } + + async with aiohttp.ClientSession() as session: + try: + async with session.post( + exa_search_api_endpoint, headers=headers, json=payload, timeout=WEBPAGE_REQUEST_TIMEOUT + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Exa search failed: {error_text}") + return query, {} + + response_json = await response.json() + results = response_json.get("results", []) + + if is_none_or_empty(results): + logger.error(f"Exa search failed: {response_json.get('warning', 'Unknown error')}") + return query, {} + + # Transform Exa response to match the expected format + organic_results = [] + for item in results: + organic_results.append( + { + "title": item["title"], + "link": item["url"], + "snippet": item["highlights"][0] if "highlights" in item else None, + "content": item.get("text", None), + } + ) + + return query, {"organic": organic_results} + + except Exception as e: + logger.error(f"Error searching with Exa: {str(e)}") + return query, {} + + async def search_with_firecrawl(query: str, location: LocationData) -> Tuple[str, Dict[str, List[Dict]]]: """ Search using Firecrawl API.