from collections.abc import Mapping, Sequence
from datetime import datetime
from http import HTTPStatus
from json import dumps
from typing import Annotated, Any, Literal, Optional
from urllib.parse import quote
import requests
from pydantic import BaseModel, Field
from requests import HTTPError
from intelligence_layer.connectors.base.json_serializable import JsonSerializable
[docs]
class IndexPath(BaseModel, frozen=True):
    """Location of an index inside the Document Index.

    Instances are immutable (``frozen=True``) and therefore hashable.

    Args:
        namespace: Holds collections.
        index: The name of the index, holds a config.
    """

    namespace: str
    index: str
[docs]
class IndexConfiguration(BaseModel):
    """Settings an index is created with.

    Args:
        embedding_type: Either ``"symmetric"`` or ``"asymmetric"``; any other
            value is rejected by validation.
        chunk_size: The maximum size of the chunks in tokens to be used for the index.
    """

    embedding_type: Literal["symmetric", "asymmetric"]
    chunk_size: int
[docs]
class DocumentContents(BaseModel):
    """Actual content of a document.

    Note:
        Currently only supports text-only documents.

    Args:
        contents: List of text items.
        metadata: Any metadata that is kept along with the document. This could contain things like
            author, creation-date, references to external systems. The content must be serializable using
            `json.dumps`. The document-index leaves it unchanged.
    """

    contents: Sequence[str]
    metadata: JsonSerializable = None

    @classmethod
    def from_text(cls, text: str) -> "DocumentContents":
        """Wrap a single piece of text as document contents (no metadata)."""
        return cls(contents=[text])

    @classmethod
    def _from_modalities_json(
        cls, modalities_json: Mapping[str, Any]
    ) -> "DocumentContents":
        """Build contents from the Document Index "modalities" JSON payload.

        Items whose ``"modality"`` is not ``"text"`` are skipped; a missing
        ``"contents"`` key yields an empty list.
        """
        raw_items = modalities_json.get("contents", [])
        texts = [item["text"] for item in raw_items if item["modality"] == "text"]
        return cls(contents=texts, metadata=modalities_json.get("metadata"))

    def _to_modalities_json(self) -> Mapping[str, Any]:
        """Serialize into the Document Index "modalities" JSON payload (schema ``V1``)."""
        text_items = [{"modality": "text", "text": text} for text in self.contents]
        return {
            "schema_version": "V1",
            "contents": text_items,
            "metadata": self.metadata,
        }
[docs]
class CollectionPath(BaseModel, frozen=True):
    """Location of a collection inside the Document Index.

    Instances are immutable (``frozen=True``) and therefore hashable.

    Args:
        namespace: Holds collections.
        collection: Holds documents.
            Unique within a namespace.
    """

    namespace: str
    collection: str
[docs]
class DocumentPath(BaseModel, frozen=True):
    """Path to a document.

    Args:
        collection_path: Path to a collection.
        document_name: Points to a document.
            Unique within a collection.
    """

    collection_path: CollectionPath
    document_name: str

    def encoded_document_name(self) -> str:
        """Percent-encode the document name so it forms a single URL path segment.

        ``safe=""`` makes ``quote`` encode ``/`` as well, so names containing
        slashes do not split into several path segments.
        """
        return quote(self.document_name, safe="")

    @classmethod
    def from_json(cls, document_path_json: Mapping[str, str]) -> "DocumentPath":
        """Build a path from a mapping with ``namespace``, ``collection`` and ``name`` keys."""
        return cls(
            collection_path=CollectionPath(
                namespace=document_path_json["namespace"],
                collection=document_path_json["collection"],
            ),
            document_name=document_path_json["name"],
        )

    def to_slash_separated_str(self) -> str:
        """Render the path as ``"<namespace>/<collection>/<document_name>"``.

        No escaping is applied, so the result only round-trips through
        `from_slash_separated_str` if namespace and collection contain no ``/``.
        """
        return f"{self.collection_path.namespace}/{self.collection_path.collection}/{self.document_name}"

    @classmethod
    def from_slash_separated_str(cls, path: str) -> "DocumentPath":
        """Parse a path of the form ``"<namespace>/<collection>/<document_name>"``.

        Only the first two slashes separate components, so the document name
        may itself contain slashes.

        Raises:
            ValueError: If `path` contains fewer than two slashes.
        """
        parts = path.split("/", 2)
        # Explicit check instead of `assert`: asserts are stripped under `python -O`,
        # which would turn malformed input into an opaque IndexError.
        if len(parts) != 3:
            raise ValueError(
                "Expected a path of the form '<namespace>/<collection>/<document_name>', "
                f"got {path!r}"
            )
        namespace, collection, document_name = parts
        return cls(
            collection_path=CollectionPath(
                namespace=namespace,
                collection=collection,
            ),
            document_name=document_name,
        )
[docs]
def _parse_created_timestamp(timestamp: str) -> datetime:
    """Parse a UTC timestamp of the form ``YYYY-MM-DDTHH:MM:SS[.fraction]Z``.

    The result is a naive `datetime`, exactly as produced by
    ``strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")`` for every input that format
    accepts. In addition, timestamps without a fractional part are accepted, and
    fractions longer than six digits are truncated to microsecond precision
    (``%f`` alone rejects both).

    Raises:
        ValueError: If `timestamp` does not have the expected shape.
    """
    head, dot, fraction = timestamp.partition(".")
    if dot and fraction.endswith("Z") and fraction[:-1].isdigit():
        # `%f` accepts at most six digits; drop sub-microsecond precision.
        timestamp = f"{head}.{fraction[:-1][:6]}Z"
    elif not dot and timestamp.endswith("Z"):
        # No fractional seconds at all: supply ".0" so the fixed format applies.
        timestamp = f"{timestamp[:-1]}.0Z"
    return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")


class DocumentInfo(BaseModel):
    """Presents an overview of a document.

    Args:
        document_path: Path to a document.
        created: When this version of the document was created.
            Equivalent to when it was last updated. Parsed from a timestamp
            ending in ``Z`` and stored as a naive `datetime`.
        version: How many times the document was updated.
    """

    document_path: DocumentPath
    created: datetime
    version: int

    @classmethod
    def from_list_documents_response(
        cls, list_documents_response: Mapping[str, Any]
    ) -> "DocumentInfo":
        """Build a `DocumentInfo` from one entry of the list-documents response.

        Reads the ``path``, ``created_timestamp`` and ``version`` keys.
        """
        return cls(
            document_path=DocumentPath.from_json(list_documents_response["path"]),
            created=_parse_created_timestamp(
                list_documents_response["created_timestamp"]
            ),
            version=list_documents_response["version"],
        )
[docs]
class SearchQuery(BaseModel):
    """Query to search through a collection with.

    Args:
        query: Actual text to be searched with.
        max_results: Max number of search results to be retrieved by the query.
            Must be 0 or greater (enforced by validation).
        min_score: Min score needed for a search result to be returned.
            Must be between 0 and 1 (inclusive).
    """

    query: str
    max_results: int = Field(..., ge=0)
    min_score: float = Field(..., ge=0.0, le=1.0)
class DocumentFilterQueryParams(BaseModel):
    """Query to filter documents by when listing a collection.

    Args:
        max_documents: Maximum number of documents to display. Must be 0 or
            greater when given; `None` means no limit is requested.
        starts_with: Document title prefix/substring to search by; `None`
            means no filter is requested.
    """

    # NOTE(review): `default=25` is nested inside `Optional[...]`; pydantic may not
    # apply a default declared this way, in which case the field is effectively
    # required (though it may be `None`). Confirm against the pydantic version in use.
    max_documents: Optional[Annotated[int, Field(default=25, ge=0)]]
    starts_with: Optional[str]
class DocumentTextPosition(BaseModel):
    """Where a search-result chunk sits inside a document.

    Args:
        item: Index of the document item the chunk belongs to.
        start_position: Start index of the chunk within that item.
        end_position: End index of the chunk within that item.
    """

    item: int
    start_position: int
    end_position: int
[docs]
class DocumentSearchResult(BaseModel):
    """Result of a search query for one individual section.

    Args:
        document_path: Path to the document that the section originates from.
        section: Actual section of the document that was found as a match to the query.
        score: Actual search score of the section found.
            Generally, higher scores correspond to better matches.
            Will be between 0 and 1.
        chunk_position: Item index and start/end positions of the section
            within its document.
    """

    document_path: DocumentPath
    section: str
    score: float
    chunk_position: DocumentTextPosition

    @classmethod
    def _from_search_response(
        cls, search_response: Mapping[str, Any]
    ) -> "DocumentSearchResult":
        """Build a result from one entry of the search endpoint's JSON response.

        Only the text of the first element of ``section`` is kept.
        """
        start = search_response["start"]
        end = search_response["end"]
        # The conversion assumes a section never spans more than one document item.
        assert start["item"] == end["item"]
        return cls(
            document_path=DocumentPath.from_json(search_response["document_path"]),
            section=search_response["section"][0]["text"],
            score=search_response["score"],
            chunk_position=DocumentTextPosition(
                item=start["item"],
                start_position=start["position"],
                end_position=end["position"],
            ),
        )
[docs]
class DocumentIndexError(RuntimeError):
    """Raised in case of any `DocumentIndexClient`-related errors.

    Attributes:
        message: The error message as returned by the Document Index.
        status_code: The HTTP status code of the failed response.
    """

    def __init__(self, message: str, status_code: HTTPStatus) -> None:
        self.message = message
        self.status_code = status_code
        # The message doubles as the exception's single positional argument,
        # so `str(error)` shows it.
        super().__init__(message)
[docs]
class ExternalServiceUnavailable(DocumentIndexError):
    """Raised in case external service is unavailable when the request is executed."""
[docs]
class ResourceNotFound(DocumentIndexError):
    """Raised when a resource like a namespace or a document cannot be found.

    Note that this can also mean that the user executing the request does not have
    permission to access the resource.
    """
[docs]
class ConstraintViolation(DocumentIndexError):
    """Raised when the request cannot be processed as it would lead to an inconsistent state."""
class InvalidInput(DocumentIndexError):
    """Raised when the Document Index rejects a request as unprocessable (HTTP 422)."""


# Maps HTTP error statuses returned by the Document Index to the exception type
# `DocumentIndexClient` raises; statuses not listed here fall back to `InternalError`.
# `InvalidInput` must be defined above: this dict is built at import time.
_status_code_to_exception = {
    HTTPStatus.SERVICE_UNAVAILABLE: ExternalServiceUnavailable,
    HTTPStatus.NOT_FOUND: ResourceNotFound,
    HTTPStatus.UNPROCESSABLE_ENTITY: InvalidInput,
    HTTPStatus.CONFLICT: ConstraintViolation,
}
[docs]
class InternalError(DocumentIndexError):
    """Raised in case of unexpected errors."""
[docs]
class DocumentIndexClient:
    """Client for the Document Index allowing handling documents and search.

    Document Index is a tool for managing collections of documents, enabling operations such as creation, deletion, listing, and searching.
    Documents can be stored either in the cloud or in a local deployment.

    Namespace, collection, index and document names are percent-encoded before
    being placed into request URLs, so names containing characters such as ``/``,
    ``?`` or ``#`` address the intended resource. Every method raises a
    `DocumentIndexError` subclass when the Document Index answers with an HTTP
    error status.

    Args:
        token: A valid token for the document index API. If ``None``, requests are
            sent without an ``Authorization`` header.
        base_document_index_url: The url of the document index' API.

    Example:
        >>> import os
        >>> from intelligence_layer.connectors import (
        ...     CollectionPath,
        ...     DocumentContents,
        ...     DocumentIndexClient,
        ...     DocumentPath,
        ...     SearchQuery,
        ... )
        >>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
        >>> collection_path = CollectionPath(
        ...     namespace="aleph-alpha", collection="wikipedia-de"
        ... )
        >>> document_index.create_collection(collection_path)
        >>> document_index.add_document(
        ...     document_path=DocumentPath(
        ...         collection_path=collection_path, document_name="Fun facts about Germany"
        ...     ),
        ...     contents=DocumentContents.from_text("Germany is a country located in ..."),
        ... )
        >>> search_result = document_index.search(
        ...     collection_path=collection_path,
        ...     index_name="asymmetric",
        ...     search_query=SearchQuery(
        ...         query="What is the capital of Germany", max_results=4, min_score=0.5
        ...     ),
        ... )
    """

    def __init__(
        self,
        token: str | None,
        base_document_index_url: str = "https://document-index.aleph-alpha.com",
    ) -> None:
        self._base_document_index_url = base_document_index_url
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            **({"Authorization": f"Bearer {token}"} if token is not None else {}),
        }

    @staticmethod
    def _path_segment(value: str) -> str:
        """Percent-encode `value` so it forms exactly one URL path segment (``/`` included)."""
        return quote(value, safe="")

    def _collection_url(self, collection_path: CollectionPath) -> str:
        """URL of a collection resource."""
        namespace = self._path_segment(collection_path.namespace)
        collection = self._path_segment(collection_path.collection)
        return f"{self._base_document_index_url}/collections/{namespace}/{collection}"

    def _collection_index_url(
        self, collection_path: CollectionPath, index_name: str
    ) -> str:
        """URL of an index assignment on a collection."""
        return f"{self._collection_url(collection_path)}/indexes/{self._path_segment(index_name)}"

    def _index_url(self, index_path: IndexPath) -> str:
        """URL of an index resource within a namespace."""
        namespace = self._path_segment(index_path.namespace)
        index = self._path_segment(index_path.index)
        return f"{self._base_document_index_url}/indexes/{namespace}/{index}"

    def _document_url(self, document_path: DocumentPath) -> str:
        """URL of a single document within its collection."""
        return f"{self._collection_url(document_path.collection_path)}/docs/{document_path.encoded_document_name()}"

    def list_namespaces(self) -> Sequence[str]:
        """Lists all available namespaces.

        Returns:
            List of all available namespaces.
        """
        url = f"{self._base_document_index_url}/namespaces"
        response = requests.get(url, headers=self.headers)
        self._raise_for_status(response)
        return [str(namespace) for namespace in response.json()]

    def create_collection(self, collection_path: CollectionPath) -> None:
        """Creates a collection at the path.

        Note:
            Collection's name must be unique within a namespace.

        Args:
            collection_path: Path to the collection of interest.
        """
        response = requests.put(
            self._collection_url(collection_path), headers=self.headers
        )
        self._raise_for_status(response)

    def delete_collection(self, collection_path: CollectionPath) -> None:
        """Deletes the collection at the path.

        Args:
            collection_path: Path to the collection of interest.
        """
        response = requests.delete(
            self._collection_url(collection_path), headers=self.headers
        )
        self._raise_for_status(response)

    def list_collections(self, namespace: str) -> Sequence[CollectionPath]:
        """Lists all collections within a namespace.

        Args:
            namespace: For a collection of documents.
                Typically corresponds to an organization.

        Returns:
            List of all `CollectionPath` instances in the given namespace.
        """
        url = f"{self._base_document_index_url}/collections/{self._path_segment(namespace)}"
        response = requests.get(url, headers=self.headers)
        self._raise_for_status(response)
        return [
            CollectionPath(namespace=namespace, collection=collection)
            for collection in response.json()
        ]

    def create_index(
        self, index_path: IndexPath, index_configuration: IndexConfiguration
    ) -> None:
        """Creates an index in a namespace.

        Args:
            index_path: Path to the index.
            index_configuration: Configuration of the index to be created.
        """
        data = {
            "chunk_size": index_configuration.chunk_size,
            "embedding_type": index_configuration.embedding_type,
        }
        response = requests.put(
            self._index_url(index_path), data=dumps(data), headers=self.headers
        )
        self._raise_for_status(response)

    def index_configuration(self, index_path: IndexPath) -> IndexConfiguration:
        """Retrieve the configuration of an index in a namespace given its name.

        Args:
            index_path: Path to the index.

        Returns:
            Configuration of the index.
        """
        response = requests.get(self._index_url(index_path), headers=self.headers)
        self._raise_for_status(response)
        response_json: Mapping[str, Any] = response.json()
        return IndexConfiguration(
            embedding_type=response_json["embedding_type"],
            chunk_size=response_json["chunk_size"],
        )

    def assign_index_to_collection(
        self, collection_path: CollectionPath, index_name: str
    ) -> None:
        """Assign an index to a collection.

        Args:
            collection_path: Path to the collection of interest.
            index_name: Name of the index.
        """
        response = requests.put(
            self._collection_index_url(collection_path, index_name),
            headers=self.headers,
        )
        self._raise_for_status(response)

    def delete_index_from_collection(
        self, collection_path: CollectionPath, index_name: str
    ) -> None:
        """Delete an index from a collection.

        Args:
            collection_path: Path to the collection of interest.
            index_name: Name of the index.
        """
        response = requests.delete(
            self._collection_index_url(collection_path, index_name),
            headers=self.headers,
        )
        self._raise_for_status(response)

    def list_assigned_index_names(
        self, collection_path: CollectionPath
    ) -> Sequence[str]:
        """List all indexes assigned to a collection.

        Args:
            collection_path: Path to the collection of interest.

        Returns:
            List of all indexes that are assigned to the collection.
        """
        url = f"{self._collection_url(collection_path)}/indexes"
        response = requests.get(url, headers=self.headers)
        self._raise_for_status(response)
        return [str(index_name) for index_name in response.json()]

    def add_document(
        self,
        document_path: DocumentPath,
        contents: DocumentContents,
    ) -> None:
        """Add a document to a collection.

        Note:
            If a document with the same `document_path` exists, it will be updated with the new `contents`.

        Args:
            document_path: Consists of `collection_path` and name of document to be created.
            contents: Actual content of the document.
                Currently only supports text.
        """
        response = requests.put(
            self._document_url(document_path),
            data=dumps(contents._to_modalities_json()),
            headers=self.headers,
        )
        self._raise_for_status(response)

    def delete_document(self, document_path: DocumentPath) -> None:
        """Delete a document from a collection.

        Args:
            document_path: Consists of `collection_path` and name of document to be deleted.
        """
        response = requests.delete(
            self._document_url(document_path), headers=self.headers
        )
        self._raise_for_status(response)

    def document(self, document_path: DocumentPath) -> DocumentContents:
        """Retrieve a document from a collection.

        Args:
            document_path: Consists of `collection_path` and name of document to be retrieved.

        Returns:
            Content of the retrieved document.
        """
        response = requests.get(self._document_url(document_path), headers=self.headers)
        self._raise_for_status(response)
        return DocumentContents._from_modalities_json(response.json())

    def documents(
        self,
        collection_path: CollectionPath,
        filter_query_params: Optional[DocumentFilterQueryParams] = None,
    ) -> Sequence[DocumentInfo]:
        """List all documents within a collection.

        Note:
            Does not return each document's content.

        Args:
            collection_path: Path to the collection of interest.
            filter_query_params: Query parameters to filter the results. Fields that
                are `None` (and an empty `starts_with`) are not sent.

        Returns:
            Overview of all documents within the collection.
        """
        if filter_query_params is None:
            filter_query_params = DocumentFilterQueryParams(
                max_documents=None, starts_with=None
            )
        url = f"{self._collection_url(collection_path)}/docs"
        query_params: dict[str, str] = {}
        # `is not None` rather than truthiness: an explicit limit of 0 is a valid
        # value and must not be silently dropped.
        if filter_query_params.max_documents is not None:
            query_params["max_documents"] = str(filter_query_params.max_documents)
        if filter_query_params.starts_with:
            query_params["starts_with"] = filter_query_params.starts_with
        response = requests.get(url=url, params=query_params, headers=self.headers)
        self._raise_for_status(response)
        return [DocumentInfo.from_list_documents_response(r) for r in response.json()]

    def search(
        self,
        collection_path: CollectionPath,
        index_name: str,
        search_query: SearchQuery,
    ) -> Sequence[DocumentSearchResult]:
        """Search through a collection with a `search_query`.

        Args:
            collection_path: Path to the collection of interest.
            index_name: Name of the index to search with.
            search_query: The query to search with.

        Returns:
            Result of the search operation. Will be empty if nothing was retrieved.
        """
        url = f"{self._collection_index_url(collection_path, index_name)}/search"
        data = {
            "query": [{"modality": "text", "text": search_query.query}],
            "max_results": search_query.max_results,
            "min_score": search_query.min_score,
            # Restrict results to text sections.
            "filter": [{"with": [{"modality": "text"}]}],
        }
        response = requests.post(url, data=dumps(data), headers=self.headers)
        self._raise_for_status(response)
        return [DocumentSearchResult._from_search_response(r) for r in response.json()]

    def _raise_for_status(self, response: requests.Response) -> None:
        """Raise a `DocumentIndexError` subclass if `response` has an HTTP error status.

        The type is looked up in `_status_code_to_exception`; statuses without a
        dedicated type raise `InternalError`. The response body becomes the message.
        """
        try:
            response.raise_for_status()
        except HTTPError as e:
            try:
                status = HTTPStatus(response.status_code)
            except ValueError:
                # Non-standard codes (e.g. 499 or 520 emitted by proxies) are not
                # `HTTPStatus` members; still raise a DocumentIndexError and keep the
                # raw numeric code so callers can inspect it.
                raise InternalError(response.text, response.status_code) from e  # type: ignore[arg-type]
            exception_factory = _status_code_to_exception.get(status, InternalError)
            raise exception_factory(response.text, status) from e