intelligence_layer.connectors

Module contents

class intelligence_layer.connectors.AlephAlphaClientProtocol(*args, **kwargs)[source]

Bases: Protocol

class intelligence_layer.connectors.ArgillaClient[source]

Bases: ABC

Client interface for accessing an Argilla server.

Argilla supports human in the loop evaluation. This class defines the API used by the intelligence layer to create feedback datasets or retrieve evaluation results.

abstract add_record(dataset_id: str, record: RecordData) None[source]

Adds a new record to the given dataset.

Parameters:
  • dataset_id – id of the dataset the record is added to

  • record – the actual record data (i.e. content for the dataset’s fields)

add_records(dataset_id: str, records: Sequence[RecordData]) None[source]

Adds new records to the given dataset.

Parameters:
  • dataset_id – id of the dataset the records are added to

  • records – list containing the record data (i.e. content for the dataset’s fields)

abstract create_dataset(workspace_id: str, dataset_name: str, fields: Sequence[Any], questions: Sequence[Any]) str[source]

Creates and publishes a new feedback dataset in Argilla.

Raises an error if the name exists already.

Parameters:
  • workspace_id – the id of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the created dataset.

abstract ensure_dataset_exists(workspace_id: str, dataset_name: str, fields: Sequence[Any], questions: Sequence[Any]) str[source]

Retrieves an existing dataset or creates and publishes a new feedback dataset in Argilla.

Parameters:
  • workspace_id – the id of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the retrieved or newly created dataset.

abstract evaluations(dataset_id: str) Iterable[ArgillaEvaluation][source]

Returns all human-evaluated evaluations for the given dataset.

Parameters:

dataset_id – the id of the dataset.

Returns:

An Iterable over all human-evaluated evaluations for the given dataset.

abstract split_dataset(dataset_id: str, n_splits: int) None[source]

Adds a new metadata property to the dataset and assigns a split to each record.

Parameters:
  • dataset_id – the id of the dataset

  • n_splits – the number of splits to create

class intelligence_layer.connectors.ArgillaEvaluation(*, example_id: str, record_id: str, responses: Mapping[str, Any], metadata: Mapping[str, Any])[source]

Bases: BaseModel

The evaluation result for a single rating record in an Argilla feedback-dataset.

example_id

the id of the example that was evaluated.

Type:

str

record_id

the id of the record that is evaluated.

Type:

str

responses

Maps question-names (Question.name) to response values.

Type:

collections.abc.Mapping[str, Any]

metadata

Metadata belonging to the evaluation, for example ids of completions.

Type:

collections.abc.Mapping[str, Any]

class intelligence_layer.connectors.ArgillaWrapperClient(api_url: str | None = None, api_key: str | None = None, disable_warnings: bool = True)[source]

Bases: ArgillaClient

add_record(dataset_id: str, record: RecordData) None[source]

Adds a new record to the given dataset.

Parameters:
  • dataset_id – id of the dataset the record is added to

  • record – the actual record data (i.e. content for the dataset’s fields)

add_records(dataset_id: str, records: Sequence[RecordData]) None[source]

Adds new records to the given dataset.

Parameters:
  • dataset_id – id of the dataset the records are added to

  • records – list containing the record data (i.e. content for the dataset’s fields)

create_dataset(workspace_name: str, dataset_name: str, fields: Sequence[TextField], questions: Sequence[LabelQuestion | MultiLabelQuestion | RankingQuestion | TextQuestion | RatingQuestion | SpanQuestion]) str[source]

Creates and publishes a new feedback dataset in Argilla.

Raises an error if the name exists already.

Parameters:
  • workspace_name – the name of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the created dataset.

ensure_dataset_exists(workspace_name: str, dataset_name: str, fields: Sequence[TextField], questions: Sequence[LabelQuestion | MultiLabelQuestion | RankingQuestion | TextQuestion | RatingQuestion | SpanQuestion]) str[source]

Retrieves an existing dataset or creates and publishes a new feedback dataset in Argilla.

Parameters:
  • workspace_name – the name of the workspace the feedback dataset should be created in. The user executing this request must have corresponding permissions for this workspace.

  • dataset_name – the name of the feedback-dataset to be created.

  • fields – all fields of this dataset.

  • questions – all questions for this dataset.

Returns:

The id of the retrieved or newly created dataset.

ensure_workspace_exists(workspace_name: str) str[source]

Retrieves the name of an Argilla workspace with the specified name, or creates a new workspace if necessary.

Parameters:

workspace_name – the name of the workspace to be retrieved or created.

Returns:

The name of the Argilla workspace with the given workspace_name.

evaluations(dataset_id: str) Iterable[ArgillaEvaluation][source]

Returns all human-evaluated evaluations for the given dataset.

Parameters:

dataset_id – the id of the dataset.

Returns:

An Iterable over all human-evaluated evaluations for the given dataset.

split_dataset(dataset_id: str, n_splits: int) None[source]

Adds a new metadata property to the dataset and assigns a split to each record.

Deletes the property if n_splits is equal to one.

Parameters:
  • dataset_id – the id of the dataset

  • n_splits – the number of splits to create
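
Example

The following sketch shows one way to set up a feedback dataset, submit records, and collect human ratings with the wrapper client. It assumes a reachable Argilla instance (api_url and api_key taken from the environment if not passed) and uses the argilla package's TextField and RatingQuestion; the workspace, dataset, question, and example names are placeholders.

>>> import argilla as rg
>>> from intelligence_layer.connectors import ArgillaWrapperClient, RecordData
>>> client = ArgillaWrapperClient()
>>> workspace_name = client.ensure_workspace_exists("my-workspace")
>>> dataset_id = client.ensure_dataset_exists(
...     workspace_name=workspace_name,
...     dataset_name="summary-feedback",
...     fields=[rg.TextField(name="input"), rg.TextField(name="output")],
...     questions=[
...         rg.RatingQuestion(
...             name="quality",
...             title="How good is the summary?",
...             values=[1, 2, 3, 4, 5],
...         )
...     ],
... )
>>> client.add_records(
...     dataset_id,
...     [
...         RecordData(
...             content={"input": "Long article ...", "output": "Short summary ..."},
...             example_id="example-0",
...         )
...     ],
... )
>>> # After annotators have responded in the Argilla UI:
>>> for evaluation in client.evaluations(dataset_id):
...     print(evaluation.example_id, evaluation.responses)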

class intelligence_layer.connectors.BaseRetriever[source]

Bases: ABC, Generic[ID]

General interface for any retriever.

Retrievers are used to find texts given a user query. Each Retriever implementation owns its own logic for retrieval. For comparison purposes, we assume scores in the SearchResult instances to be between 0 and 1.

class intelligence_layer.connectors.CollectionPath(*, namespace: str, collection: str)[source]

Bases: BaseModel

Path to a collection.

Parameters:
  • namespace – Holds collections.

  • collection – Holds documents. Unique within a namespace.

exception intelligence_layer.connectors.ConstraintViolation(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when the request cannot be processed as it would lead to an inconsistent state.

class intelligence_layer.connectors.DataClient(token: str | None, base_data_platform_url: str = 'http://localhost:8000', session: Session | None = None)[source]

Bases: object

Client to interact with the Data Platform API.

headers

Headers used in the request session.

create_dataset(repository_id: str, dataset: DatasetCreate) DataDataset[source]

Create a new dataset in a repository.

Parameters:
  • repository_id – Repository ID

  • dataset – DatasetCreate object

Returns:

The newly created DataDataset object.

create_repository(repository: DataRepositoryCreate) DataRepository[source]

Create a new repository.

Parameters:

repository – DataRepositoryCreate object

Returns:

The newly created DataRepository object.

create_stage(stage: DataStageCreate) DataStage[source]

Create a new stage.

Parameters:

stage – DataStageCreate object

Returns:

The newly created DataStage object.

delete_dataset(repository_id: str, dataset_id: str) None[source]

Delete a dataset by ID.

Parameters:
  • repository_id – Repository ID

  • dataset_id – DataDataset ID

get_dataset(repository_id: str, dataset_id: str) DataDataset[source]

Get a dataset by ID.

Parameters:
  • repository_id – Repository ID

  • dataset_id – DataDataset ID

Returns:

DataDataset object

get_file_from_stage(stage_id: str, file_id: str) BytesIO[source]

Get a file from a stage.

Parameters:
  • stage_id – Stage ID

  • file_id – File ID

Returns:

File bytes

get_repository(repository_id: str) DataRepository[source]

Get a repository by ID.

Parameters:

repository_id – Repository ID

Returns:

DataRepository object

get_stage(stage_id: str) DataStage[source]

Get a stage by ID.

Parameters:

stage_id – Stage ID

Returns:

DataStage object

list_datasets(repository_id: str, page: int = 0, size: int = 20) list[DataDataset][source]

List all the datasets in a repository.

Parameters:
  • repository_id – Repository ID

  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataDataset from a given repository

list_files_in_stage(stage_id: str, page: int = 0, size: int = 20) list[DataFile][source]

List all the files in a stage.

Parameters:
  • stage_id – Stage ID

  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataFile objects

list_repositories(page: int = 0, size: int = 20) list[DataRepository][source]

List all the repositories.

Parameters:
  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataRepository objects

list_stages(page: int = 0, size: int = 20) list[DataStage][source]

List all the stages.

Parameters:
  • page – Page number. Defaults to 0

  • size – Number of items per page. Defaults to 20

Returns:

List of DataStage objects

stream_dataset(repository_id: str, dataset_id: str) Iterator[Any][source]

Stream the data points of a dataset.

Parameters:
  • repository_id – Repository ID

  • dataset_id – DataDataset ID

Returns:

Iterator over the datapoints (Any) of the dataset.

upload_file_to_stage(stage_id: str, file: DataFileCreate) DataFile[source]

Upload a file to a stage.

Parameters:
  • stage_id – Stage ID

  • file – DataFileCreate object

Returns:

The newly created DataFile object.
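
Example

A minimal sketch of the Data Platform workflow: create a repository, upload a dataset, and stream its datapoints back. It assumes a Data Platform instance at the default URL and that the snake_case field names documented on the models below are accepted by their constructors; repository name, media type, and data are placeholders.

>>> import os
>>> from intelligence_layer.connectors import (
...     DataClient,
...     DataRepositoryCreate,
...     DatasetCreate,
... )
>>> data_client = DataClient(token=os.getenv("AA_TOKEN"))
>>> repository = data_client.create_repository(
...     DataRepositoryCreate(
...         name="my-repository",
...         media_type="application/json",
...         modality="text",
...     )
... )
>>> dataset = data_client.create_dataset(
...     repository.repository_id,
...     DatasetCreate(
...         source_data=b'{"text": "first datapoint"}\n{"text": "second datapoint"}',
...         labels=["demo"],
...         total_datapoints=2,
...     ),
... )
>>> for datapoint in data_client.stream_dataset(repository.repository_id, dataset.dataset_id):
...     print(datapoint)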

class intelligence_layer.connectors.DataDataset(*, repositoryId: str, datasetId: str, name: str | None = None, labels: list[str] | None = None, totalDatapoints: int, metadata: dict[str, Any] | None = None, createdAt: datetime, updatedAt: datetime)[source]

Bases: BaseDataModel

Dataset model.

Attributes:
  • repository_id – Repository ID that identifies the repository (group of datasets)

  • dataset_id – Dataset ID that identifies the dataset

  • name – Name of the dataset

  • labels – List of labels of the dataset

  • total_datapoints – Total number of units in the dataset

  • metadata – Metadata of the dataset

  • created_at – Datetime when the dataset was created

  • updated_at – Datetime when the dataset was updated

exception intelligence_layer.connectors.DataExternalServiceUnavailable(*args: object)[source]

Bases: DataError

Exception raised when an external service is unavailable.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class intelligence_layer.connectors.DataFile(*, fileId: str, stageId: str, name: str, createdAt: datetime, updatedAt: datetime, mediaType: str, size: int)[source]

Bases: BaseDataModel

class intelligence_layer.connectors.DataFileCreate(*, sourceData: BufferedReader | bytes, name: str)[source]

Bases: BaseDataModel

exception intelligence_layer.connectors.DataForbiddenError(*args: object)[source]

Bases: DataError

Exception raised when a forbidden error occurs.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception intelligence_layer.connectors.DataInternalError(*args: object)[source]

Bases: DataError

Exception raised when an internal error occurs.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception intelligence_layer.connectors.DataInvalidInput(*args: object)[source]

Bases: DataError

Exception raised when the input is invalid.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class intelligence_layer.connectors.DataRepository(*, repositoryId: str, name: str, mutable: bool, mediaType: Annotated[str, AfterValidator(func=media_type_validator)], modality: Modality, createdAt: datetime, updatedAt: datetime)[source]

Bases: BaseDataModel

Data Repository model.

Attributes:
  • repository_id – Repository ID that identifies the repository (group of datasets)

  • name – Name of the repository

  • mutable – Indicates if the datasets in the repository are mutable or not

  • media_type – Media type of the data: application/json, application/csv, etc.

  • modality – Modality of the data: image, text, etc.

  • created_at – Datetime when the repository was created

  • updated_at – Datetime when the repository was updated

class intelligence_layer.connectors.DataRepositoryCreate(*, name: str, mediaType: Annotated[str, AfterValidator(func=media_type_validator)], modality: Modality)[source]

Bases: BaseDataModel

Data Repository creation model.

Attributes:
  • name – Name of the repository

  • media_type – Media type of the data: application/json, application/csv, etc.

  • modality – Modality of the data: image, text, etc.

exception intelligence_layer.connectors.DataResourceNotFound(*args: object)[source]

Bases: DataError

Exception raised when a resource is not found.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class intelligence_layer.connectors.DataStage(*, stageId: str, name: str, createdAt: datetime, updatedAt: datetime)[source]

Bases: BaseDataModel

Stage model.

Attributes:
  • stage_id – Stage ID that identifies the stage

  • name – Name of the stage

  • created_at – Datetime when the stage was created

  • updated_at – Datetime when the stage was updated

class intelligence_layer.connectors.DataStageCreate(*, name: str)[source]

Bases: BaseDataModel

Stage creation model.

Attributes:
  • name – Name of the stage

class intelligence_layer.connectors.DatasetCreate(*, sourceData: BufferedReader | bytes, name: str | None = None, labels: list[str], totalDatapoints: int, metadata: dict[str, Any] | None = None)[source]

Bases: BaseDataModel

Dataset creation model.

Attributes:
  • source_data – Source data of the dataset in bytes (file-like object)

  • name – Name of the dataset

  • labels – List of labels of the dataset

  • total_datapoints – Total number of units in the dataset

  • metadata – Metadata of the dataset

class intelligence_layer.connectors.Document(*, text: str, metadata: Any = None)[source]

Bases: BaseModel

A document.

text

The document’s text.

Type:

str

metadata

Any metadata added to the document.

Type:

Any

class intelligence_layer.connectors.DocumentChunk(*, text: str, start: int, end: int, metadata: Any = None)[source]

Bases: BaseModel

Part of a Document, specifically for retrieval use cases.

text

Chunk of the document that matched the search query.

Type:

str

metadata

Any metadata added to the document.

Type:

Any

start

Start index of the chunk within the document.

Type:

int

end

End index of the chunk within the document.

Type:

int

class intelligence_layer.connectors.DocumentContents(*, contents: Sequence[str], metadata: JsonSerializable = None)[source]

Bases: BaseModel

Actual content of a document.

Note

Currently only supports text-only documents.

Parameters:
  • contents – List of text items.

  • metadata – Any metadata that is kept along with the document. This could contain things like author, creation date, or references to external systems. The content must be serializable using json.dumps. The document index leaves it unchanged.

class intelligence_layer.connectors.DocumentIndexClient(token: str | None, base_document_index_url: str = 'https://document-index.aleph-alpha.com')[source]

Bases: object

Client for the Document Index allowing handling documents and search.

Document Index is a tool for managing collections of documents, enabling operations such as creation, deletion, listing, and searching. Documents can be stored either in the cloud or in a local deployment.

Parameters:
  • token – A valid token for the document index API.

  • base_document_index_url – The URL of the Document Index API.

Example

>>> import os
>>> from intelligence_layer.connectors import (
...     CollectionPath,
...     DocumentContents,
...     DocumentIndexClient,
...     DocumentPath,
...     SearchQuery,
... )
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> collection_path = CollectionPath(
...     namespace="aleph-alpha", collection="wikipedia-de"
... )
>>> document_index.create_collection(collection_path)
>>> document_index.add_document(
...     document_path=DocumentPath(
...         collection_path=collection_path, document_name="Fun facts about Germany"
...     ),
...     contents=DocumentContents.from_text("Germany is a country located in ..."),
... )
>>> search_result = document_index.search(
...     collection_path=collection_path,
...     index_name="asymmetric",
...     search_query=SearchQuery(
...         query="What is the capital of Germany", max_results=4, min_score=0.5
...     ),
... )
add_document(document_path: DocumentPath, contents: DocumentContents) None[source]

Add a document to a collection.

Note

If a document with the same document_path exists, it will be updated with the new contents.

Parameters:
  • document_path – Consists of collection_path and name of document to be created.

  • contents – Actual content of the document. Currently only supports text.

assign_filter_index_to_search_index(collection_path: CollectionPath, index_name: str, filter_index_name: str) None[source]

Assign an existing filter index to an assigned search index.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to assign the filter index to.

  • filter_index_name – Name of the filter index.

assign_index_to_collection(collection_path: CollectionPath, index_name: str) None[source]

Assign an index to a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index.

create_collection(collection_path: CollectionPath) None[source]

Creates a collection at the path.

Note

A collection's name must be unique within a namespace.

Parameters:

collection_path – Path to the collection of interest.

create_filter_index_in_namespace(namespace: str, filter_index_name: str, field_name: str, field_type: Literal['string', 'integer', 'float', 'boolean', 'datetime']) None[source]

Create a filter index in a specified namespace.

Parameters:
  • namespace – The namespace in which to create the filter index.

  • filter_index_name – The name of the filter index to create.

  • field_name – The name of the field to index.

  • field_type – The type of the field to index.

create_index(index_path: IndexPath, index_configuration: IndexConfiguration) None[source]

Creates an index in a namespace.

Parameters:
  • index_path – Path to the index.

  • index_configuration – Configuration of the index to be created.

delete_collection(collection_path: CollectionPath) None[source]

Deletes the collection at the path.

Parameters:

collection_path – Path to the collection of interest.

delete_document(document_path: DocumentPath) None[source]

Delete a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be deleted.

delete_filter_index_from_namespace(namespace: str, filter_index_name: str) None[source]

Delete a filter index from a namespace.

Parameters:
  • namespace – The namespace to delete the filter index from.

  • filter_index_name – The name of the filter index to delete.

delete_index(index_path: IndexPath) None[source]

Delete an index in a namespace.

Parameters:

index_path – Path to the index.

delete_index_from_collection(collection_path: CollectionPath, index_name: str) None[source]

Delete an index from a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index.

document(document_path: DocumentPath) DocumentContents[source]

Retrieve a document from a collection.

Parameters:

document_path – Consists of collection_path and name of document to be retrieved.

Returns:

Content of the retrieved document.

documents(collection_path: CollectionPath, filter_query_params: DocumentFilterQueryParams | None = None) Sequence[DocumentInfo][source]

List all documents within a collection.

Note

Does not return each document’s content.

Parameters:
  • collection_path – Path to the collection of interest.

  • filter_query_params – Query parameters to filter the results.

Returns:

Overview of all documents within the collection.

index_configuration(index_path: IndexPath) IndexConfiguration[source]

Retrieve the configuration of an index in a namespace given its name.

Parameters:

index_path – Path to the index.

Returns:

Configuration of the index.

list_assigned_filter_index_names(collection_path: CollectionPath, index_name: str) Sequence[str][source]

List all filter-indexes assigned to a search index in a collection.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Search index to check.

Returns:

List of all filter-indexes that are assigned to the collection.

list_assigned_index_names(collection_path: CollectionPath) Sequence[str][source]

List all indexes assigned to a collection.

Parameters:

collection_path – Path to the collection of interest.

Returns:

List of all indexes that are assigned to the collection.

list_collections(namespace: str) Sequence[CollectionPath][source]

Lists all collections within a namespace.

Parameters:

namespace – The namespace holding the collections of documents. Typically corresponds to an organization.

Returns:

List of all CollectionPath instances in the given namespace.

list_filter_indexes_in_namespace(namespace: str) Sequence[str][source]

List all filter indexes in a namespace.

Parameters:

namespace – The namespace to list filter indexes in.

Returns:

List of all filter indexes in the namespace.

list_indexes(namespace: str) Sequence[IndexPath][source]

Lists all indexes within a namespace.

Parameters:

namespace – The namespace holding the indexes. Typically corresponds to an organization.

Returns:

List of all IndexPath instances in the given namespace.

list_namespaces() Sequence[str][source]

Lists all available namespaces.

Returns:

List of all available namespaces.

progress(collection_path: CollectionPath) int[source]

Get the number of unembedded documents in a collection.

Parameters:

collection_path – Path to the collection of interest.

Returns:

The number of unembedded documents in a collection.

search(collection_path: CollectionPath, index_name: str, search_query: SearchQuery) Sequence[DocumentSearchResult][source]

Search through a collection with a search_query.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to search with.

  • search_query – The query to search with.

Returns:

Result of the search operation. Will be empty if nothing was retrieved.

unassign_filter_index_from_search_index(collection_path: CollectionPath, index_name: str, filter_index_name: str) None[source]

Unassign a filter index from an assigned search index.

Parameters:
  • collection_path – Path to the collection of interest.

  • index_name – Name of the index to unassign the filter index from.

  • filter_index_name – Name of the filter index.

exception intelligence_layer.connectors.DocumentIndexError(message: str, status_code: HTTPStatus)[source]

Bases: RuntimeError

Raised in case of any DocumentIndexClient-related errors.

message

The error message as returned by the Document Index.

status_code

The http error code.
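
Example

A small sketch of handling Document Index errors, here catching ResourceNotFound when requesting a document that does not exist (or that the token may not access). Namespace, collection, and document name are placeholders.

>>> import os
>>> from intelligence_layer.connectors import (
...     CollectionPath,
...     DocumentIndexClient,
...     DocumentPath,
...     ResourceNotFound,
... )
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> document_path = DocumentPath(
...     collection_path=CollectionPath(namespace="aleph-alpha", collection="wikipedia-de"),
...     document_name="Some missing document",
... )
>>> try:
...     contents = document_index.document(document_path)
... except ResourceNotFound as error:
...     print(error.status_code, error.message)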

class intelligence_layer.connectors.DocumentIndexRetriever(document_index: DocumentIndexClient, index_name: str, namespace: str, collection: str, k: int = 1, threshold: float = 0.0)[source]

Bases: BaseRetriever[DocumentPath]

Search through documents within collections in the DocumentIndexClient.

This retriever lets you search for relevant documents in the given Document Index collection.

Example

>>> import os
>>> from intelligence_layer.connectors import DocumentIndexClient, DocumentIndexRetriever
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> retriever = DocumentIndexRetriever(document_index, "asymmetric", "aleph-alpha", "wikipedia-de", 3)
>>> documents = retriever.get_relevant_documents_with_scores("Who invented the airplane?")

class intelligence_layer.connectors.DocumentInfo(*, document_path: DocumentPath, created: datetime, version: int)[source]

Bases: BaseModel

Presents an overview of a document.

Parameters:
  • document_path – Path to a document.

  • created – When this version of the document was created. Equivalent to when it was last updated.

  • version – How many times the document was updated.

class intelligence_layer.connectors.DocumentPath(*, collection_path: CollectionPath, document_name: str)[source]

Bases: BaseModel

Path to a document.

Parameters:
  • collection_path – Path to a collection.

  • document_name – Points to a document. Unique within a collection.

class intelligence_layer.connectors.DocumentSearchResult(*, document_path: DocumentPath, section: str, score: float, chunk_position: DocumentTextPosition)[source]

Bases: BaseModel

Result of a search query for one individual section.

Parameters:
  • document_path – Path to the document that the section originates from.

  • section – Actual section of the document that was found as a match to the query.

  • score – Search score of the found section. Will be between 0 and 1. Higher scores correspond to better matches. The score depends on the index configuration, e.g. the score of a section differs for hybrid and non-hybrid indexes. For searches on hybrid indexes, the score can exceed the min_score of the query as the min_score only applies to the similarity score.

exception intelligence_layer.connectors.ExternalServiceUnavailable(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised in case external service is unavailable when the request is executed.

class intelligence_layer.connectors.FilterField(*, field_name: Annotated[str, StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=None, max_length=1000, pattern=^[\w-]+(\.\d{0,5})?[\w-]*$)], field_value: str | int | float | bool | datetime, criteria: FilterOps)[source]

Bases: BaseModel

Represents a field to filter on in the DocumentIndex metadata.

classmethod validate_and_convert_datetime(v: str | int | float | bool | datetime) str | int | float | bool[source]

Validate field_value and convert datetime to RFC3339 format with Z suffix.

Parameters:

v – The value to be validated and converted.

Returns:

The validated and converted value.

class intelligence_layer.connectors.FilterOps(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enumeration of possible filter operations.

class intelligence_layer.connectors.Filters(*, filter_type: Literal['with', 'without', 'with_one_of'], fields: list[FilterField])[source]

Bases: BaseModel

Represents a set of filters to apply to a search query.

class intelligence_layer.connectors.IndexConfiguration(*, embedding_type: Literal['symmetric', 'asymmetric'], chunk_overlap: Annotated[int, Ge(ge=0)] = 0, chunk_size: Annotated[int, Gt(gt=0), Le(le=2046)], hybrid_index: Literal['bm25'] | None = None)[source]

Bases: BaseModel

Configuration of an index.

Parameters:
  • embedding_type – “symmetric” or “asymmetric” embedding type.

  • chunk_overlap – The maximum number of tokens of overlap between consecutive chunks. Must be less than chunk_size.

  • chunk_size – The maximum size of the chunks in tokens to be used for the index.

  • hybrid_index – If set to “bm25”, combine vector search and keyword search (bm25) results.

class intelligence_layer.connectors.IndexPath(*, namespace: str, index: str)[source]

Bases: BaseModel

Path to an index.

Parameters:
  • namespace – Holds collections.

  • index – The name of the index, holds a config.
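
Example

A minimal sketch of creating an index from an IndexConfiguration and assigning it to a collection. The namespace, collection, and index name are placeholders, and the chunking values are illustrative.

>>> import os
>>> from intelligence_layer.connectors import (
...     CollectionPath,
...     DocumentIndexClient,
...     IndexConfiguration,
...     IndexPath,
... )
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> index_path = IndexPath(namespace="aleph-alpha", index="my-asymmetric-index")
>>> document_index.create_index(
...     index_path,
...     IndexConfiguration(embedding_type="asymmetric", chunk_size=512, chunk_overlap=0),
... )
>>> document_index.assign_index_to_collection(
...     CollectionPath(namespace="aleph-alpha", collection="wikipedia-de"),
...     index_name=index_path.index,
... )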

exception intelligence_layer.connectors.InternalError(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised in case of unexpected errors.

exception intelligence_layer.connectors.InvalidInput(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when the user-input could not be processed as it violates pre-conditions.

class intelligence_layer.connectors.LimitedConcurrencyClient(client: AlephAlphaClientProtocol, max_concurrency: int = 10, max_retry_time: int = 180)[source]

Bases: object

An Aleph Alpha Client wrapper that limits the number of concurrent requests.

This just delegates each call to the wrapped Aleph Alpha Client and ensures that never more than a given number of concurrent calls are executed against the API.

Parameters:
  • client – The wrapped Client.

  • max_concurrency – the maximal number of requests that may run concurrently against the API. Defaults to 10.

  • max_retry_time – the maximal time in seconds a request is retried in case a BusyError is raised.

classmethod from_env(token: str | None = None, host: str | None = None) LimitedConcurrencyClient[source]

This is a helper method to construct your client with default settings from a token and host.

Parameters:
  • token – An Aleph Alpha token to instantiate the client. If no token is provided, this method tries to fetch it from the environment under the name of “AA_TOKEN”.

  • host – The host that is used for requests. If no host is provided, this method tries to fetch it from the environment under the name of “CLIENT_URL”. If this is not present, it defaults to the Aleph Alpha API. If you have an on-premise setup, change this to your host URL.

Returns:

A LimitedConcurrencyClient.
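
Example

A minimal sketch of sending a completion request through the wrapped client. It assumes AA_TOKEN is set in the environment and uses a placeholder model name; the request itself uses the aleph_alpha_client package that this class wraps.

>>> from aleph_alpha_client import CompletionRequest, Prompt
>>> from intelligence_layer.connectors import LimitedConcurrencyClient
>>> client = LimitedConcurrencyClient.from_env()
>>> request = CompletionRequest(prompt=Prompt.from_text("An apple a day"), maximum_tokens=16)
>>> response = client.complete(request, model="luminous-base")
>>> print(response.completions[0].completion)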

class intelligence_layer.connectors.QdrantInMemoryRetriever(documents: Sequence[Document], k: int, client: AlephAlphaClientProtocol | None = None, threshold: float = 0.5, retriever_type: RetrieverType = RetrieverType.ASYMMETRIC, distance_metric: Distance = Distance.COSINE)[source]

Bases: BaseRetriever[int]

Search through documents stored in memory using semantic search.

This retriever uses a Qdrant (https://github.com/qdrant/qdrant) in-memory vector store instance to store documents and their asymmetric embeddings. When run, the given query is embedded and scored against the document embeddings to retrieve the k most similar matches by cosine similarity.

Parameters:
  • documents – The sequence of documents to be made searchable.

  • k – The (top) number of documents to be returned by search.

  • client – Aleph Alpha client instance for running model related API calls.

  • threshold – The minimum value of cosine similarity between the query vector and the document vector.

  • retriever_type – The type of retriever to be instantiated. Should be ASYMMETRIC for most query-document retrieval use cases; SYMMETRIC is optimized for similar document retrieval.

  • distance_metric – The distance metric to be used for vector comparison.

Example

>>> from intelligence_layer.connectors import LimitedConcurrencyClient, Document, QdrantInMemoryRetriever
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [Document(text=t) for t in ["I do not like rain.", "Summer is warm.", "We are so back."]]
>>> retriever = QdrantInMemoryRetriever(documents, 5, client=client)
>>> query = "Do you like summer?"
>>> documents = retriever.get_relevant_documents_with_scores(query)
get_filtered_documents_with_scores(query: str, filter: Filter) Sequence[SearchResult[int]][source]

Specific method for InMemoryRetriever to support filtering search results.

Parameters:
  • query – The text to be searched with.

  • filter – Conditions to filter by.

Returns:

All documents that correspond to the query and pass the filter.
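
Example

A sketch of filtered in-memory retrieval. The filter classes come from the qdrant-client package, but the "metadata.category" payload key is an assumption about how document metadata is stored in the in-memory collection.

>>> from qdrant_client.http.models import FieldCondition, Filter, MatchValue
>>> from intelligence_layer.connectors import Document, LimitedConcurrencyClient, QdrantInMemoryRetriever
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [
...     Document(text="Berlin is the capital of Germany.", metadata={"category": "geography"}),
...     Document(text="Python is a programming language.", metadata={"category": "tech"}),
... ]
>>> retriever = QdrantInMemoryRetriever(documents, 2, client=client)
>>> results = retriever.get_filtered_documents_with_scores(
...     query="Where is Berlin?",
...     filter=Filter(
...         must=[FieldCondition(key="metadata.category", match=MatchValue(value="geography"))]
...     ),
... )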

class intelligence_layer.connectors.Record(*, content: Mapping[str, str], example_id: str, metadata: Mapping[str, str | int] = None, id: str)[source]

Bases: RecordData

Represents an Argilla record of a feedback-dataset.

Just adds the id to a RecordData.

id

the Argilla generated id of the record.

Type:

str

class intelligence_layer.connectors.RecordData(*, content: Mapping[str, str], example_id: str, metadata: Mapping[str, str | int] = None)[source]

Bases: BaseModel

Input-data for an Argilla evaluation record.

This can be used to add a new record to an existing Argilla feedback-dataset. Once it is added, it gets an Argilla-provided id and can be retrieved as a Record.

content

Maps field-names (Field.name) to string values that can be displayed to the user.

Type:

collections.abc.Mapping[str, str]

example_id

the id of the corresponding Example from a Dataset.

Type:

str

metadata

Arbitrary metadata in form of key/value strings that can be attached to a record.

Type:

collections.abc.Mapping[str, str | int]

exception intelligence_layer.connectors.ResourceNotFound(message: str, status_code: HTTPStatus)[source]

Bases: DocumentIndexError

Raised when a resource like a namespace or a document cannot be found.

Note that this can also mean that the user executing the request does not have permission to access the resource.

class intelligence_layer.connectors.RetrieverType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Specify the type of retriever to instantiate.

ASYMMETRIC

Query is embedded as Query and each document as Document.

SYMMETRIC

Both query and documents will be embedded as Symmetric.

class intelligence_layer.connectors.SearchQuery(*, query: str, max_results: Annotated[int, Ge(ge=0)] = 1, min_score: Annotated[float, Ge(ge=0.0), Le(le=1.0)] = 0.0, filters: list[Filters] | None = None)[source]

Bases: BaseModel

Query to search through a collection with.

Parameters:
  • query – Actual text to be searched with.

  • max_results – Max number of search results to be retrieved by the query. Must be larger than 0.

  • min_score – Filter out results with a similarity score below this value. Must be between 0 and 1. For searches on hybrid indexes, the Document Index applies the min_score to the semantic results before fusion of result sets. As fusion re-scores results, returned scores may exceed this value.
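
Example

A sketch of a filtered search query. It assumes a string filter index on a metadata field named "category" has already been created and assigned to the search index, and that EQUAL_TO is among the FilterOps members; collection, index, field names, and values are placeholders.

>>> import os
>>> from intelligence_layer.connectors import (
...     CollectionPath,
...     DocumentIndexClient,
...     FilterField,
...     FilterOps,
...     Filters,
...     SearchQuery,
... )
>>> document_index = DocumentIndexClient(os.getenv("AA_TOKEN"))
>>> search_query = SearchQuery(
...     query="What is the capital of Germany?",
...     max_results=5,
...     min_score=0.3,
...     filters=[
...         Filters(
...             filter_type="with",
...             fields=[
...                 FilterField(
...                     field_name="category",
...                     field_value="geography",
...                     criteria=FilterOps.EQUAL_TO,
...                 )
...             ],
...         )
...     ],
... )
>>> results = document_index.search(
...     CollectionPath(namespace="aleph-alpha", collection="wikipedia-de"),
...     index_name="asymmetric",
...     search_query=search_query,
... )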

class intelligence_layer.connectors.SearchResult(*, id: ID, score: float, document_chunk: DocumentChunk)[source]

Bases: BaseModel, Generic[ID]

Contains a text alongside its search score.

id

Unique identifier of the document.

Type:

intelligence_layer.connectors.retrievers.base_retriever.ID

score

The similarity score between the text and the query that was searched with. Will be between 0 and 1, where 0 means no similarity and 1 perfect similarity.

Type:

float

document_chunk

The document chunk found by search.

Type:

intelligence_layer.connectors.retrievers.base_retriever.DocumentChunk

class intelligence_layer.connectors.StudioClient(project: str, studio_url: str | None = None, auth_token: str | None = None)[source]

Bases: object

Client for communicating with Studio.

project_id

The unique identifier of the project currently in use.

url

The url of your current Studio instance.

create_project(project: str, description: str | None = None) int[source]

Creates a project in Studio.

Projects are uniquely identified by the user provided name.

Parameters:
  • project – User provided name of the project.

  • description – Description explaining the usage of the project. Defaults to None.

Returns:

The ID of the newly created project.

submit_dataset(dataset: StudioDataset, examples: Iterable[StudioExample]) str[source]

Submits a dataset to Studio.

Parameters:
  • dataset – Dataset to be uploaded

  • examples – Examples of the Dataset

Returns:

ID of the created dataset

submit_from_tracer(tracer: Tracer) list[str][source]

Sends all trace data from the Tracer to Studio.

Parameters:

tracer – Tracer to extract data from.

Returns:

List of created trace IDs.

submit_trace(data: Sequence[ExportedSpan]) str[source]

Sends the provided spans to Studio as a singular trace.

The method fails if the span list is empty, if the trace has already been created, or if the spans belong to multiple traces.

Parameters:

data – Spans to create the trace from. Created by exporting from a Tracer.

Returns:

The ID of the created trace.
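
Example

A minimal sketch of submitting trace data to Studio. It assumes a reachable Studio instance (URL and authentication token taken from the environment if not passed), that a project named "my-project" already exists (it can be created with create_project), and that InMemoryTracer is available from intelligence_layer.core; all names are placeholders.

>>> from intelligence_layer.connectors import StudioClient
>>> from intelligence_layer.core import InMemoryTracer
>>> studio_client = StudioClient(project="my-project")  # project assumed to exist
>>> tracer = InMemoryTracer()
>>> with tracer.span("my-task") as span:
...     span.log("step", "doing work")
>>> trace_ids = studio_client.submit_from_tracer(tracer)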