intelligence_layer.connectors

Module contents

class intelligence_layer.connectors.AlephAlphaClientProtocol(*args, **kwargs)[source]

Bases: Protocol

class intelligence_layer.connectors.ArgillaClient[source]

Bases: ABC

Client interface for accessing an Argilla server.

Argilla supports human-in-the-loop evaluation. This class defines the API used by the intelligence layer to create feedback datasets or retrieve evaluation results.

abstract add_record(dataset_id: str, record: RecordData) None[source]

Adds a new record to the given dataset.

Parameters:
  • dataset_id – id of the dataset the record is added to

  • record – the actual record data (i.e. content for the dataset’s fields)

add_records(dataset_id: str, records: Sequence[RecordData]) None[source]

Adds new records to the given dataset.

Parameters:
  • dataset_id – id of the dataset the records are added to

  • records – list containing the record data (i.e. content for the dataset’s fields)

abstract create_dataset(workspace_id: str, dataset_name: str, fields: Sequence[Any], questions: Sequence[Any]) str[source]

Creates and publishes a new feedback dataset in Argilla.

Raises an error if the name exists already.

Parameters:
  • workspace_id – the id of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the created dataset.

abstract ensure_dataset_exists(workspace_id: str, dataset_name: str, fields: Sequence[Any], questions: Sequence[Any]) str[source]

Retrieves an existing dataset or creates and publishes a new feedback dataset in Argilla.

Parameters:
  • workspace_id – the id of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the retrieved or newly created dataset.

abstract evaluations(dataset_id: str) Iterable[ArgillaEvaluation][source]

Returns all human-evaluated evaluations for the given dataset.

Parameters:

dataset_id – the id of the dataset.

Returns:

An Iterable over all human-evaluated evaluations for the given dataset.

abstract split_dataset(dataset_id: str, n_splits: int) None[source]

Adds a new metadata property to the dataset and assigns a split to each record.

Parameters:
  • dataset_id – the id of the dataset

  • n_splits – the number of splits to create
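
A typical workflow against this interface, sketched here with the concrete DefaultArgillaClient (the workspace name, field, question, and record values are illustrative assumptions, not part of the API):

>>> from intelligence_layer.connectors import (
...     DefaultArgillaClient,
...     Field,
...     Question,
...     RecordData,
... )
>>> client = DefaultArgillaClient()
>>> workspace_id = client.ensure_workspace_exists("my-workspace")
>>> dataset_id = client.ensure_dataset_exists(
...     workspace_id,
...     "my-feedback-dataset",
...     fields=[Field(name="output", title="Model Output")],
...     questions=[
...         Question(
...             name="quality",
...             title="Quality",
...             description="Rate the overall quality of the output.",
...             options=[1, 2, 3],
...         )
...     ],
... )
>>> client.add_record(
...     dataset_id,
...     RecordData(
...         content={"output": "The capital of Germany is Berlin."},
...         example_id="example-1",
...     ),
... )
>>> evaluations = list(client.evaluations(dataset_id))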

class intelligence_layer.connectors.ArgillaEvaluation(*, example_id: str, record_id: str, responses: Mapping[str, Any], metadata: Mapping[str, Any])[source]

Bases: BaseModel

The evaluation result for a single rating record in an Argilla feedback-dataset.

example_id

the id of the example that was evaluated.

Type:

str

record_id

the id of the record that is evaluated.

Type:

str

responses

Maps question-names (Question.name) to response values.

Type:

collections.abc.Mapping[str, Any]

metadata

Metadata belonging to the evaluation, for example ids of completions.

Type:

collections.abc.Mapping[str, Any]

class intelligence_layer.connectors.ArgillaWrapperClient(api_url: str | None = None, api_key: str | None = None, disable_warnings: bool = True)[source]

Bases: ArgillaClient

add_record(dataset_id: str, record: RecordData) None[source]

Adds a new record to the given dataset.

Parameters:
  • dataset_id – id of the dataset the record is added to

  • record – the actual record data (i.e. content for the dataset’s fields)

add_records(dataset_id: str, records: Sequence[RecordData]) None[source]

Adds new records to the given dataset.

Parameters:
  • dataset_id – id of the dataset the records are added to

  • records – list containing the record data (i.e. content for the dataset’s fields)

create_dataset(workspace_id: str, dataset_name: str, fields: Sequence[TextField], questions: Sequence[TextQuestion | RatingQuestion | LabelQuestion | MultiLabelQuestion | RankingQuestion | SpanQuestion]) str[source]

Creates and publishes a new feedback dataset in Argilla.

Raises an error if the name exists already.

Parameters:
  • workspace_id – the name of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the created dataset.

ensure_dataset_exists(workspace_id: str, dataset_name: str, fields: Sequence[TextField], questions: Sequence[TextQuestion | RatingQuestion | LabelQuestion | MultiLabelQuestion | RankingQuestion | SpanQuestion]) str[source]

Retrieves an existing dataset or creates and publishes a new feedback dataset in Argilla.

Parameters:
  • workspace_id – the name of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the retrieved or newly created dataset.

ensure_workspace_exists(workspace_name: str) str[source]

Retrieves the name of an Argilla workspace with the specified name or creates a new workspace if necessary.

Parameters:

workspace_name – the name of the workspace to be retrieved or created.

Returns:

The name of an Argilla workspace with the given workspace_name.

evaluations(dataset_id: str) Iterable[ArgillaEvaluation][source]

Returns all human-evaluated evaluations for the given dataset.

Parameters:

dataset_id – the id of the dataset.

Returns:

An Iterable over all human-evaluated evaluations for the given dataset.

split_dataset(dataset_id: str, n_splits: int) None[source]

Adds a new metadata property to the dataset and assigns a split to each record.

Deletes the property if n_splits is equal to one.

Parameters:
  • dataset_id – the id of the dataset

  • n_splits – the number of splits to create

class intelligence_layer.connectors.BaseRetriever[source]

Bases: ABC, Generic[ID]

General interface for any retriever.

Retrievers are used to find texts given a user query. Each Retriever implementation owns its own logic for retrieval. For comparison purposes, we assume scores in the SearchResult instances to be between 0 and 1.
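
A minimal custom retriever might be sketched as follows (this assumes the method to implement is get_relevant_documents_with_scores, the method the built-in retrievers expose in their examples; check the BaseRetriever source for the exact abstract interface):

>>> from intelligence_layer.connectors import BaseRetriever, DocumentChunk, SearchResult
>>> class KeywordRetriever(BaseRetriever[int]):
...     """Toy retriever that scores by exact keyword containment."""
...     def __init__(self, texts: list[str]) -> None:
...         self._texts = texts
...     def get_relevant_documents_with_scores(self, query: str) -> list[SearchResult[int]]:
...         return [
...             SearchResult(
...                 id=i,
...                 score=1.0,
...                 document_chunk=DocumentChunk(text=t, start=0, end=len(t)),
...             )
...             for i, t in enumerate(self._texts)
...             if query.lower() in t.lower()
...         ]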

class intelligence_layer.connectors.CollectionPath(*, namespace: str, collection: str)[source]

Bases: BaseModel

Path to a collection.

Parameters:
  • namespace – Holds collections.

  • collection – Holds documents. Unique within a namespace.

exception intelligence_layer.connectors.ConstraintViolation(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when the request cannot be processed as it would lead to an inconsistent state.

class intelligence_layer.connectors.DefaultArgillaClient(api_url: str | None = None, api_key: str | None = None, total_retries: int = 5)[source]

Bases: ArgillaClient

add_record(dataset_id: str, record: RecordData) None[source]

Adds a new record to the given dataset.

Parameters:
  • dataset_id – id of the dataset the record is added to

  • record – the actual record data (i.e. content for the dataset’s fields)

add_records(dataset_id: str, records: Sequence[RecordData]) None[source]

Adds new records to the given dataset.

Parameters:
  • dataset_id – id of the dataset the records are added to

  • records – list containing the record data (i.e. content for the dataset’s fields)

create_dataset(workspace_id: str, dataset_name: str, fields: Sequence[Field], questions: Sequence[Question]) str[source]

Creates and publishes a new feedback dataset in Argilla.

Raises an error if the name exists already.

Parameters:
  • workspace_id – the id of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the created dataset.

ensure_dataset_exists(workspace_id: str, dataset_name: str, fields: Sequence[Field], questions: Sequence[Question]) str[source]

Retrieves an existing dataset or creates and publishes a new feedback dataset in Argilla.

Parameters:
  • workspace_id – the id of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the retrieved or newly created dataset.

ensure_workspace_exists(workspace_name: str) str[source]

Retrieves the id of an Argilla workspace with the specified name or creates a new workspace if necessary.

Parameters:

workspace_name – the name of the workspace to be retrieved or created.

Returns:

The id of an Argilla workspace with the given workspace_name.

evaluations(dataset_id: str) Iterable[ArgillaEvaluation][source]

Returns all human-evaluated evaluations for the given dataset.

Parameters:

dataset_id – the id of the dataset.

Returns:

An Iterable over all human-evaluated evaluations for the given dataset.

split_dataset(dataset_id: str, n_splits: int) None[source]

Adds a new metadata property to the dataset and assigns a split to each record.

Parameters:
  • dataset_id – the id of the dataset

  • n_splits – the number of splits to create

class intelligence_layer.connectors.Document(*, text: str, metadata: Any = None)[source]

Bases: BaseModel

A document.

text

The document’s text.

Type:

str

metadata

Any metadata added to the document.

Type:

Any

class intelligence_layer.connectors.DocumentChunk(*, text: str, start: int, end: int, metadata: Any = None)[source]

Bases: BaseModel

Part of a Document, specifically for retrieval use cases.

text

Chunk of the document that matched the search query.

Type:

str

metadata

Any metadata added to the document.

Type:

Any

start

Start index of the chunk within the document

Type:

int

end

End index of the chunk within the document

Type:

int

class intelligence_layer.connectors.DocumentContents(*, contents: Sequence[str], metadata: JsonSerializable = None)[source]

Bases: BaseModel

Actual content of a document.

Note

Currently only supports text-only documents.

Parameters:
  • contents – List of text items.

  • metadata – Any metadata that is kept along with the document. This could contain things like author, creation-date, or references to external systems. The content must be serializable using json.dumps. The document-index leaves it unchanged.

class intelligence_layer.connectors.DocumentIndexClient(token: str | None, base_document_index_url: str = 'https://document-index.aleph-alpha.com')[source]

Bases: object

Client for the Document Index allowing handling documents and search.

Document Index is a tool for managing collections of documents, enabling operations such as creation, deletion, listing, and searching. Documents can be stored either in the cloud or in a local deployment.

Parameters:
  • token – A valid token for the document index API.

  • base_document_index_url – The URL of the Document Index API.

Example

>>> import os
>>> from intelligence_layer.connectors import (
...     CollectionPath,
...     DocumentContents,
...     DocumentIndexClient,
...     DocumentPath,
...     SearchQuery,
... )
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> collection_path = CollectionPath(
...     namespace="aleph-alpha", collection="wikipedia-de"
... )
>>> document_index.create_collection(collection_path)
>>> document_index.add_document(
...     document_path=DocumentPath(
...         collection_path=collection_path, document_name="Fun facts about Germany"
...     ),
...     contents=DocumentContents.from_text("Germany is a country located in ..."),
... )
>>> search_result = document_index.search(
...     collection_path=collection_path,
...     index_name="asymmetric",
...     search_query=SearchQuery(
...         query="What is the capital of Germany", max_results=4, min_score=0.5
...     ),
... )
add_document(document_path: DocumentPath, contents: DocumentContents) None[source]

Add a document to a collection.

Note

If a document with the same document_path exists, it will be updated with the new contents.

Parameters:
  • document_path – Consists of collection_path and name of document to be created.

  • contents – Actual content of the document. Currently only supports text.

assign_index_to_collection(collection_path: CollectionPath, index_name: str) None[source]

Assign an index to a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index.

create_collection(collection_path: CollectionPath) None[source]

Creates a collection at the path.

Note

Collection’s name must be unique within a namespace.

Parameters:

collection_path – Path to the collection of interest.

create_index(index_path: IndexPath, index_configuration: IndexConfiguration) None[source]

Creates an index in a namespace.

Parameters:
  • index_path – Path to the index.

  • index_configuration – Configuration of the index to be created.
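
Creating an index and assigning it to a collection typically go together; a sketch (the namespace, index, and collection names are illustrative):

>>> import os
>>> from intelligence_layer.connectors import (
...     CollectionPath,
...     DocumentIndexClient,
...     IndexConfiguration,
...     IndexPath,
... )
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> index_path = IndexPath(namespace="aleph-alpha", index="my-asymmetric-index")
>>> document_index.create_index(
...     index_path, IndexConfiguration(embedding_type="asymmetric", chunk_size=512)
... )
>>> document_index.assign_index_to_collection(
...     CollectionPath(namespace="aleph-alpha", collection="wikipedia-de"),
...     "my-asymmetric-index",
... )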

delete_collection(collection_path: CollectionPath) None[source]

Deletes the collection at the path.

Parameters:

collection_path – Path to the collection of interest.

delete_document(document_path: DocumentPath) None[source]

Delete a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be deleted.

delete_index_from_collection(collection_path: CollectionPath, index_name: str) None[source]

Delete an index from a collection.

Parameters:
  • index_name – Name of the index.

  • collection_path – Path to the collection of interest.

document(document_path: DocumentPath) DocumentContents[source]

Retrieve a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be retrieved.

Returns:

Content of the retrieved document.

documents(collection_path: CollectionPath, filter_query_params: DocumentFilterQueryParams | None = None) Sequence[DocumentInfo][source]

List all documents within a collection.

Note

Does not return each document’s content.

Parameters:
  • collection_path – Path to the collection of interest.

  • filter_query_params – Query parameters to filter the results.

Returns:

Overview of all documents within the collection.

index_configuration(index_path: IndexPath) IndexConfiguration[source]

Retrieve the configuration of an index in a namespace given its name.

Parameters:

index_path – Path to the index.

Returns:

Configuration of the index.

list_assigned_index_names(collection_path: CollectionPath) Sequence[str][source]

List all indexes assigned to a collection.

Parameters:

collection_path – Path to the collection of interest.

Returns:

List of all indexes that are assigned to the collection.

list_collections(namespace: str) Sequence[CollectionPath][source]

Lists all collections within a namespace.

Parameters:

namespace – For a collection of documents. Typically corresponds to an organization.

Returns:

List of all CollectionPath instances in the given namespace.

list_namespaces() Sequence[str][source]

Lists all available namespaces.

Returns:

List of all available namespaces.

search(collection_path: CollectionPath, index_name: str, search_query: SearchQuery) Sequence[DocumentSearchResult][source]

Search through a collection with a search_query.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to search with.

  • search_query – The query to search with.

Returns:

Result of the search operation. Will be empty if nothing was retrieved.

exception intelligence_layer.connectors.DocumentIndexError(message: str, status_code: HTTPStatus)[source]

Bases: RuntimeError

Raised in case of any DocumentIndexClient-related errors.

message

The error message as returned by the Document Index.

status_code

The http error code.

class intelligence_layer.connectors.DocumentIndexRetriever(document_index: DocumentIndexClient, index_name: str, namespace: str, collection: str, k: int, threshold: float = 0.5)[source]

Bases: BaseRetriever[DocumentPath]

Search through documents within collections in the DocumentIndexClient.

This retriever is initialized with a namespace and a collection name; it finds the documents in the collection that are most semantically similar to the query.

Parameters:
  • document_index – Client offering functionality for search.

  • index_name – The name of the index to be used.

  • namespace – The namespace within the DocumentIndexClient where all collections are stored.

  • collection – The collection within the namespace that holds the desired documents.

  • k – The (top) number of documents to be returned by search.

  • threshold – The minimum value of cosine similarity between the query vector and the document vector.

Example

>>> import os
>>> from intelligence_layer.connectors import DocumentIndexClient, DocumentIndexRetriever
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> retriever = DocumentIndexRetriever(document_index, "asymmetric", "aleph-alpha", "wikipedia-de", 3)
>>> documents = retriever.get_relevant_documents_with_scores("Who invented the airplane?")

class intelligence_layer.connectors.DocumentInfo(*, document_path: DocumentPath, created: datetime, version: int)[source]

Bases: BaseModel

Presents an overview of a document.

Parameters:
  • document_path – Path to a document.

  • created – When this version of the document was created. Equivalent to when it was last updated.

  • version – How many times the document was updated.

class intelligence_layer.connectors.DocumentPath(*, collection_path: CollectionPath, document_name: str)[source]

Bases: BaseModel

Path to a document.

Parameters:
  • collection_path – Path to a collection.

  • document_name – Points to a document. Unique within a collection.

class intelligence_layer.connectors.DocumentSearchResult(*, document_path: DocumentPath, section: str, score: float, chunk_position: DocumentTextPosition)[source]

Bases: BaseModel

Result of a search query for one individual section.

Parameters:
  • document_path – Path to the document that the section originates from.

  • section – Actual section of the document that was found as a match to the query.

  • score – Actual search score of the section found. Generally, higher scores correspond to better matches. Will be between 0 and 1.

exception intelligence_layer.connectors.ExternalServiceUnavailable(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised in case external service is unavailable when the request is executed.

class intelligence_layer.connectors.Field(*, name: str, title: str)[source]

Bases: BaseModel

Definition of an Argilla feedback-dataset field.

name

The name of the field. This is used to reference the field in json-documents

Type:

str

title

The title of the field. This is displayed in the Argilla UI to users that perform the manual evaluations.

Type:

str

class intelligence_layer.connectors.IndexConfiguration(*, embedding_type: Literal['symmetric', 'asymmetric'], chunk_size: int)[source]

Bases: BaseModel

Configuration of an index.

Parameters:
  • embedding_type – “symmetric” or “asymmetric” embedding type.

  • chunk_size – The maximum size of the chunks in tokens to be used for the index.

class intelligence_layer.connectors.IndexPath(*, namespace: str, index: str)[source]

Bases: BaseModel

Path to an index.

Parameters:
  • namespace – Holds collections.

  • index – The name of the index, holds a config.

exception intelligence_layer.connectors.InternalError(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised in case of unexpected errors.

exception intelligence_layer.connectors.InvalidInput(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when the user-input could not be processed as it violates pre-conditions.

class intelligence_layer.connectors.LimitedConcurrencyClient(client: AlephAlphaClientProtocol, max_concurrency: int = 10, max_retry_time: int = 86400)[source]

Bases: object

An Aleph Alpha Client wrapper that limits the number of concurrent requests.

This just delegates each call to the wrapped Aleph Alpha Client and ensures that no more than a given number of concurrent calls are executed against the API.

Parameters:
  • client – The wrapped Client.

  • max_concurrency – the maximal number of requests that may run concurrently against the API. Defaults to 10, which is also the maximum.

  • max_retry_time – the maximal time in seconds a complete call is retried in case a BusyError is raised.

classmethod from_env(token: str | None = None, host: str | None = None) LimitedConcurrencyClient[source]

This is a helper method to construct your client with default settings from a token and host.

Parameters:
  • token – An Aleph Alpha token to instantiate the client. If no token is provided, this method tries to fetch it from the environment under the name of “AA_TOKEN”.

  • host – The host that is used for requests. If no host is provided, this method tries to fetch it from the environment under the name of “CLIENT_URL”. If this is not present, it defaults to the Aleph Alpha API. If you have an on-premise setup, change this to your host URL.
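
Example (constructing the wrapped client explicitly instead of using from_env; the Client import from the aleph-alpha-client package is assumed):

>>> import os
>>> from aleph_alpha_client import Client
>>> from intelligence_layer.connectors import LimitedConcurrencyClient
>>> client = LimitedConcurrencyClient(Client(os.getenv("AA_TOKEN")), max_concurrency=5)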

class intelligence_layer.connectors.QdrantInMemoryRetriever(documents: Sequence[Document], k: int, client: AlephAlphaClientProtocol | None = None, threshold: float = 0.5, retriever_type: RetrieverType = RetrieverType.ASYMMETRIC, distance_metric: Distance = Distance.COSINE)[source]

Bases: BaseRetriever[int]

Search through documents stored in memory using semantic search.

This retriever uses a Qdrant (https://github.com/qdrant/qdrant) in-memory vector store instance to store documents and their asymmetric embeddings. When run, the given query is embedded and scored against the document embeddings to retrieve the k most similar matches by cosine similarity.

Parameters:
  • client – Aleph Alpha client instance for running model related API calls.

  • documents – The sequence of documents to be made searchable.

  • k – The (top) number of documents to be returned by search.

  • threshold – The minimum value of cosine similarity between the query vector and the document vector.

  • retriever_type – The type of retriever to be instantiated. Should be ASYMMETRIC for most query-document retrieval use cases; SYMMETRIC is optimized for similar-document retrieval.

  • distance_metric – The distance metric to be used for vector comparison.

Example

>>> from intelligence_layer.connectors import LimitedConcurrencyClient, Document, QdrantInMemoryRetriever
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [Document(text=t) for t in ["I do not like rain.", "Summer is warm.", "We are so back."]]
>>> retriever = QdrantInMemoryRetriever(documents, 5, client=client)
>>> query = "Do you like summer?"
>>> documents = retriever.get_relevant_documents_with_scores(query)
get_filtered_documents_with_scores(query: str, filter: Filter) Sequence[SearchResult[int]][source]

Specific method for InMemoryRetriever to support filtering search results.

Parameters:
  • query – The text to be searched with.

  • filter – Conditions to filter by.
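
A sketch of a filtered search (the Filter types come from the qdrant_client package; the metadata key path "metadata.topic" is an assumption about how this retriever stores document metadata in Qdrant payloads):

>>> from qdrant_client.http.models import FieldCondition, Filter, MatchValue
>>> from intelligence_layer.connectors import Document, LimitedConcurrencyClient, QdrantInMemoryRetriever
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [Document(text="Rain is wet.", metadata={"topic": "weather"})]
>>> retriever = QdrantInMemoryRetriever(documents, 1, client=client)
>>> results = retriever.get_filtered_documents_with_scores(
...     "Is rain wet?",
...     Filter(must=[FieldCondition(key="metadata.topic", match=MatchValue(value="weather"))]),
... )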

class intelligence_layer.connectors.Question(*, name: str, title: str, description: str, options: Sequence[int])[source]

Bases: BaseModel

Definition of an evaluation-question for an Argilla feedback dataset.

name

The name of the question. This is used to reference the questions in json-documents

Type:

str

title

The title of the field. This is displayed in the Argilla UI to users that perform the manual evaluations.

Type:

str

description

A more verbose description of the question. This is displayed in the Argilla UI to users that perform the manual evaluations.

Type:

str

options

All integer options to answer this question

Type:

collections.abc.Sequence[int]

class intelligence_layer.connectors.Record(*, content: Mapping[str, str], example_id: str, metadata: Mapping[str, str | int] = None, id: str)[source]

Bases: RecordData

Represents an Argilla record of a feedback-dataset.

Adds the Argilla-generated id to a RecordData.

id

the Argilla generated id of the record.

Type:

str

class intelligence_layer.connectors.RecordData(*, content: Mapping[str, str], example_id: str, metadata: Mapping[str, str | int] = None)[source]

Bases: BaseModel

Input-data for an Argilla evaluation record.

This can be used to add a new record to an existing Argilla feedback-dataset. Once added, the record gets an Argilla-provided id and can be retrieved as a Record.

content

Maps field-names (Field.name) to string values that can be displayed to the user.

Type:

collections.abc.Mapping[str, str]

example_id

the id of the corresponding Example from a Dataset.

Type:

str

metadata

Arbitrary metadata in form of key/value strings that can be attached to a record.

Type:

collections.abc.Mapping[str, str | int]
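
Example (the field names and metadata values here are illustrative):

>>> from intelligence_layer.connectors import RecordData
>>> record = RecordData(
...     content={"question": "What is the capital of Germany?", "answer": "Berlin"},
...     example_id="example-1",
...     metadata={"run": "run-1"},
... )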

exception intelligence_layer.connectors.ResourceNotFound(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when a resource like a namespace or a document cannot be found.

Note that this can also mean that the user executing the request does not have permission to access the resource.

class intelligence_layer.connectors.RetrieverType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Specify the type of retriever to instantiate.

ASYMMETRIC

Query is embedded as Query and each document as Document.

SYMMETRIC

Both query and documents will be embedded as Symmetric.

class intelligence_layer.connectors.SearchQuery(*, query: str, max_results: int, min_score: float)[source]

Bases: BaseModel

Query to search through a collection with.

Parameters:
  • query – Actual text to be searched with.

  • max_results – Max number of search results to be retrieved by the query. Must be larger than 0.

  • min_score – Min score needed for a search result to be returned. Must be between 0 and 1.

class intelligence_layer.connectors.SearchResult(*, id: ID, score: float, document_chunk: DocumentChunk)[source]

Bases: BaseModel, Generic[ID]

Contains a text alongside its search score.

id

Unique identifier of the document

Type:

intelligence_layer.connectors.retrievers.base_retriever.ID

score

The similarity score between the text and the query that was searched with. Will be between 0 and 1, where 0 means no similarity and 1 perfect similarity.

Type:

float

document_chunk

The document chunk found by search.

Type:

intelligence_layer.connectors.retrievers.base_retriever.DocumentChunk