From 60e9a616470dd8e6e0c043e50d3185eb278a8681 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Wed, 11 Oct 2023 17:14:15 -0700 Subject: [PATCH] Use multi-part form to receive files to index on server - This uses an existing HTTP affordance to process files - Better handling of binary file formats, as it removes the need to URL encode/decode - Less memory utilization than streaming JSON, as files get automatically written to disk once memory utilization exceeds preset limits - No manual parsing of raw file streams required --- pyproject.toml | 1 + src/khoj/routers/indexer.py | 31 ++++++------------------------- src/khoj/utils/helpers.py | 24 ++++++++++++++---------- 3 files changed, 21 insertions(+), 35 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f352a83d..afd78848 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ dependencies = [ "dateparser >= 1.1.1", "defusedxml == 0.7.1", "fastapi == 0.77.1", + "python-multipart >= 0.0.5", "jinja2 == 3.1.2", "openai >= 0.27.0, < 1.0.0", "tiktoken >= 0.3.2", diff --git a/src/khoj/routers/indexer.py b/src/khoj/routers/indexer.py index 94fc392d..86cd847f 100644 --- a/src/khoj/routers/indexer.py +++ b/src/khoj/routers/indexer.py @@ -1,10 +1,9 @@ # Standard Packages import logging -import sys from typing import Optional, Union, Dict # External Packages -from fastapi import APIRouter, HTTPException, Header, Request, Body, Response +from fastapi import APIRouter, HTTPException, Header, Response, UploadFile from pydantic import BaseModel # Internal Packages @@ -58,7 +57,7 @@ class IndexerInput(BaseModel): @indexer.post("/batch") async def index_batch( - request: Request, + files: list[UploadFile], x_api_key: str = Header(None), regenerate: bool = False, search_type: Optional[Union[state.SearchType, str]] = None, @@ -67,32 +66,14 @@ async def index_batch( raise HTTPException(status_code=401, detail="Invalid API Key") state.config_lock.acquire() try: - logger.info(f"Received batch indexing request") - 
index_batch_request_acc = b"" - async for chunk in request.stream(): - index_batch_request_acc += chunk - data_bytes = sys.getsizeof(index_batch_request_acc) - unit = "KB" - data_size = data_bytes / 1024 - if data_size > 1000: - unit = "MB" - data_size = data_size / 1024 - if data_size > 1000: - unit = "GB" - data_size = data_size / 1024 - data_size_metric = f"{data_size:.2f} {unit}" - logger.info(f"Received {data_size_metric} of data") - index_batch_request = IndexBatchRequest.parse_raw(index_batch_request_acc) - logger.info(f"Received {len(index_batch_request.files)} files") - logger.info("📬 Updating content index via API") org_files: Dict[str, str] = {} markdown_files: Dict[str, str] = {} pdf_files: Dict[str, str] = {} plaintext_files: Dict[str, str] = {} - for file in index_batch_request.files: - file_type = get_file_type(file.path) + for file in files: + file_type = get_file_type(file.content_type) dict_to_update = None if file_type == "org": dict_to_update = org_files @@ -104,9 +85,9 @@ async def index_batch( dict_to_update = plaintext_files if dict_to_update is not None: - dict_to_update[file.path] = file.content + dict_to_update[file.filename] = file.file.read().decode("utf-8") else: - logger.info(f"Skipping unsupported streamed file: {file.path}") + logger.warning(f"Skipped indexing unsupported file type sent by client: {file.filename}") indexer_input = IndexerInput( org=org_files, diff --git a/src/khoj/utils/helpers.py b/src/khoj/utils/helpers.py index f8977043..3391a55d 100644 --- a/src/khoj/utils/helpers.py +++ b/src/khoj/utils/helpers.py @@ -66,20 +66,24 @@ def merge_dicts(priority_dict: dict, default_dict: dict): return merged_dict -def get_file_type(filepath: str) -> str: - "Get file type from file path" - file_type = Path(filepath).suffix[1:] +def get_file_type(file_type: str) -> str: + "Get file type from file mime type" - if file_type in ["md", "markdown"]: + file_type = file_type.split(";")[0].strip() if ";" in file_type else file_type + if 
file_type in ["text/markdown"]: return "markdown" - elif file_type in ["org", "orgmode"]: + elif file_type in ["text/org"]: return "org" - elif file_type in ["txt", "text", "html", "xml", "htm", "rst"]: - return "plaintext" - elif file_type in ["pdf"]: + elif file_type in ["application/pdf"]: return "pdf" - - return file_type + elif file_type in ["image/jpeg"]: + return "jpeg" + elif file_type in ["image/png"]: + return "png" + elif file_type in ["text/plain", "text/html", "application/xml", "text/x-rst"]: + return "plaintext" + else: + return "other" def load_model(