intelligence_layer.examples

Module contents

class intelligence_layer.examples.AggregatedLabelInfo(*, expected_count: int, predicted_count: int)[source]

Bases: BaseModel

class intelligence_layer.examples.AggregatedMultiLabelClassifyEvaluation(*, class_metrics: Mapping[str, MultiLabelClassifyMetrics], micro_avg: MultiLabelClassifyMetrics, macro_avg: MultiLabelClassifyMetrics)[source]

Bases: BaseModel

The aggregated evaluation of a multi-label classify dataset.

class_metrics

Mapping of all labels to their aggregated metrics.

Type:

collections.abc.Mapping[str, intelligence_layer.examples.classify.classify.MultiLabelClassifyMetrics]

micro_avg

Calculated by summing the tp, tn, fp and fn counts across all classes and computing the metrics from these totals.

Type:

intelligence_layer.examples.classify.classify.MultiLabelClassifyMetrics

macro_avg

The metrics’ mean across all classes.

Type:

intelligence_layer.examples.classify.classify.MultiLabelClassifyMetrics
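
As an illustration (plain Python with hypothetical counts, not part of this class), micro averaging first sums the raw counts over all classes and then computes the metric, whereas macro averaging computes the metric per class and then takes the mean:

>>> per_class = {"sports": {"tp": 8, "fp": 2}, "politics": {"tp": 1, "fp": 4}}
>>> micro_precision = sum(c["tp"] for c in per_class.values()) / sum(
...     c["tp"] + c["fp"] for c in per_class.values()
... )
>>> macro_precision = sum(
...     c["tp"] / (c["tp"] + c["fp"]) for c in per_class.values()
... ) / len(per_class)
>>> (round(micro_precision, 2), round(macro_precision, 2))
(0.6, 0.5)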

class intelligence_layer.examples.AggregatedSearchEvaluation(*, mean_score: float, mean_reciprocal_rank: float, mean_top_ks: Mapping[int, float], chunk_found: ChunkFound)[source]

Bases: BaseModel

class intelligence_layer.examples.AggregatedSingleLabelClassifyEvaluation(*, percentage_correct: float, precision_by_class: dict[str, float | None], recall_by_class: dict[str, float | None], f1_by_class: dict[str, float], confusion_matrix: dict[str, dict[str, int]], by_label: dict[str, AggregatedLabelInfo], missing_labels: dict[str, int])[source]

Bases: BaseModel

The aggregated evaluation of a single label classify implementation against a dataset.

percentage_correct

Percentage of answers that were considered to be correct.

Type:

float

precision_by_class

Precision for each class

Type:

dict[str, float | None]

recall_by_class

Recall for each class

Type:

dict[str, float | None]

f1_by_class

f1-score for each class

Type:

dict[str, float]

confusion_matrix

A matrix showing the predicted classifications vs. the expected classifications. The first key refers to the rows of the confusion matrix (the actual prediction), the second key to the columns of the matrix (the expected value); see the illustrative sketch at the end of this attribute list.

Type:

dict[str, dict[str, int]]

by_label

Each label alongside counts of how often it was expected or predicted.

Type:

dict[str, intelligence_layer.examples.classify.classify.AggregatedLabelInfo]

missing_labels

Each expected label that is missing from the set of possible labels in the task input, along with the number of its occurrences.

Type:

dict[str, int]
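
For illustration, a small sketch of how to read the confusion matrix with hypothetical values (the first key is the predicted label, the second the expected label):

>>> confusion_matrix = {
...     "positive": {"positive": 9, "negative": 2},
...     "negative": {"positive": 1, "negative": 8},
... }
>>> confusion_matrix["negative"]["positive"]  # predicted "negative" where "positive" was expected
1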

class intelligence_layer.examples.AggregatedSummarizeEvaluation(*, aggregate_bleu: float, aggregate_rouge: float)[source]

Bases: BaseModel

The aggregated evaluation of a summarization implementation against a dataset.

aggregate_bleu

Average of the BLEU scores.

Type:

float

aggregate_rouge

Average of the ROUGE scores.

Type:

float

class intelligence_layer.examples.ChunkFound(*, found_count: int, expected_count: int, percentage: float)[source]

Bases: BaseModel

class intelligence_layer.examples.ClassifyInput(*, chunk: TextChunk, labels: frozenset[str])[source]

Bases: BaseModel

Input for a classification task.

chunk

Text to be classified.

Type:

intelligence_layer.core.chunk.TextChunk

labels

Possible labels from which the model will choose one.

Type:

frozenset[str]

class intelligence_layer.examples.EloQaEvaluationLogic(model: ControlModel, tracer: Tracer | None = None)[source]

Bases: EloEvaluationLogic[SingleChunkQaInput, SingleChunkQaOutput, SingleChunkQaOutput]

do_evaluate(example: Example, *output: SuccessfulExampleOutput) Evaluation

Executes the evaluation for this specific example.

Responsible for comparing the input and expected output of a task to the actually generated output. The difference from the standard EvaluationLogic’s do_evaluate is that this method will separate already processed evaluations from new ones before handing them over to do_incremental_evaluate.

Parameters:
  • example – Input data of Task to produce the output.

  • *output – Outputs of the Task.

Returns:

The metrics that come from the evaluated Task.

Return type:

Evaluation

grade(first: SuccessfulExampleOutput[SingleChunkQaOutput], second: SuccessfulExampleOutput[SingleChunkQaOutput], example: Example[SingleChunkQaInput, SingleChunkQaOutput]) MatchOutcome[source]

Returns a MatchOutcome for the two provided contestants on the given example.

Defines the use-case-specific logic for determining the winner of the two provided outputs.

Parameters:
  • first – Instance of SuccessfulExampleOutput[Output] of the first contestant in the comparison.

  • second – Instance of SuccessfulExampleOutput[Output] of the second contestant in the comparison.

  • example – Datapoint of Example on which the two outputs were generated.

Returns:

The outcome of the comparison between the two contestants.

Return type:

MatchOutcome

class intelligence_layer.examples.EmbeddingBasedClassify(labels_with_examples: Sequence[LabelWithExamples], client: AlephAlphaClientProtocol | None = None, top_k_per_label: int = 5)[source]

Bases: Task[ClassifyInput, MultiLabelClassifyOutput]

Task that classifies a given input text based on examples.

The input contains a complete set of all possible labels. The output will return a score for each possible label. Scores will be between 0 and 1 but do not have to add up to one. On initialization, provide a list of examples for each label.

This methodology works best with a larger number of examples per label and with labels that consist of easily definable semantic clusters.

METADATA_LABEL_NAME

The metadata field name for ‘label’ in the retriever.

Example

>>> from intelligence_layer.connectors import (
...     LimitedConcurrencyClient,
... )
>>> from intelligence_layer.core import TextChunk, InMemoryTracer
>>> from intelligence_layer.examples.classify.classify import ClassifyInput
>>> from intelligence_layer.examples.classify.embedding_based_classify import (
...     EmbeddingBasedClassify,
...     LabelWithExamples,
... )
>>> labels_with_examples = [
...     LabelWithExamples(
...         name="positive",
...         examples=[
...             "I really like this.",
...         ],
...     ),
...     LabelWithExamples(
...         name="negative",
...         examples=[
...             "I really dislike this.",
...         ],
...     ),
... ]
>>> client = LimitedConcurrencyClient.from_env()
>>> task = EmbeddingBasedClassify(labels_with_examples, client=client)
>>> input = ClassifyInput(chunk=TextChunk("This is a happy text."), labels=frozenset({"positive", "negative"}))
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
do_run(input: ClassifyInput, task_span: TaskSpan) MultiLabelClassifyOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.
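
For illustration, continuing the EmbeddingBasedClassify example above, a sketch of classifying several chunks concurrently (the texts are illustrative):

>>> inputs = [
...     ClassifyInput(chunk=TextChunk(text), labels=frozenset({"positive", "negative"}))
...     for text in ["I enjoyed it.", "It was disappointing."]
... ]
>>> outputs = task.run_concurrently(inputs, tracer, concurrency_limit=2)
>>> len(outputs)
2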

class intelligence_layer.examples.EnrichedSubanswer(*, answer: str | None, chunk: TextChunk, highlights: Sequence[ScoredTextHighlight], id: ID)[source]

Bases: Subanswer, Generic[ID]

Individual answer for a chunk that also contains the origin of the chunk.

answer

The answer generated by the task. Can be a string or None (if no answer was found).

Type:

str | None

chunk

Piece of the original text that the answer is based on.

Type:

intelligence_layer.core.chunk.TextChunk

highlights

The specific sentences that explain the answer the most. These are generated by the TextHighlight Task.

Type:

collections.abc.Sequence[intelligence_layer.core.text_highlight.ScoredTextHighlight]

id

The id of the document where the chunk came from.

Type:

intelligence_layer.connectors.retrievers.base_retriever.ID

class intelligence_layer.examples.ExpandChunks(retriever: BaseRetriever[ID], model: AlephAlphaModel, max_chunk_size: int = 512)[source]

Bases: Generic[ID], Task[ExpandChunksInput, ExpandChunksOutput]

Expand chunks found during search with the chunks directly before and after the chunk of interest.

A Task class that expands specific text chunks identified during a document search using a retriever to access the original document. It expands the found chunks to the specified maximum size, and ensures overlapping and unique chunk coverage. This process ensures that the expanded chunks cover the chunks_found completely and include immediate context, which is often valuable for downstream tasks.

Parameters:
  • retriever – Used to access and return a set of texts.

  • model – The model’s tokenizer is relevant to calculate the correct size of the returned chunks.

  • max_chunk_size – The maximum size of each returned chunk, in tokens.

do_run(input: ExpandChunksInput, task_span: TaskSpan) ExpandChunksOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.ExpandChunksInput(*, document_id: ID, chunks_found: Sequence[DocumentChunk])[source]

Bases: BaseModel, Generic[ID]

class intelligence_layer.examples.ExpandChunksOutput(*, chunks: Sequence[ChunkWithStartEndIndices])[source]

Bases: BaseModel

class intelligence_layer.examples.ExpectedSearchOutput(*, document_id: ID, start_idx: int, end_idx: int)[source]

Bases: BaseModel, Generic[ID]

class intelligence_layer.examples.KeywordExtract(model: ControlModel | None = None, instruct_configs: Mapping[Language, str] = {Language(iso_639_1='de'): 'Worum geht es in dem Text? Extrahiere ein paar Stichwörter in Form einer Komma-separierten Liste.', Language(iso_639_1='en'): 'What is the text about? Extract a few keywords in form of a comma-separated list.', Language(iso_639_1='es'): '¿De qué trata el texto? Extrae algunas palabras clave en forma de una lista separada por comas.', Language(iso_639_1='fr'): "De quoi parle le texte? Extraire quelques mots-clés sous forme d'une liste séparée par des virgules.", Language(iso_639_1='it'): 'Di cosa tratta il testo? Estrai alcune parole chiave sotto forma di una lista separata da virgole.'}, maximum_tokens: int = 32)[source]

Bases: Task[KeywordExtractInput, KeywordExtractOutput]
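
Example

A minimal usage sketch following the pattern of the other tasks in this module (assumes a model backend configured via environment variables; the text is illustrative):

>>> from intelligence_layer.core import Language, TextChunk, InMemoryTracer
>>> from intelligence_layer.examples import KeywordExtract, KeywordExtractInput
>>> task = KeywordExtract()
>>> input = KeywordExtractInput(
...     chunk=TextChunk("The hardware store sells hammers, nails, saws and paint."),
...     language=Language("en"),
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)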

do_run(input: KeywordExtractInput, task_span: TaskSpan) KeywordExtractOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.KeywordExtractInput(*, chunk: TextChunk, language: Language)[source]

Bases: BaseModel

class intelligence_layer.examples.KeywordExtractOutput(*, keywords: frozenset[str])[source]

Bases: BaseModel

class intelligence_layer.examples.LabelWithDefinition(*, name: str, definition: str)[source]

Bases: BaseModel

Defines a label with a definition.

name

Name of the label.

Type:

str

definition

A definition or description of the label.

Type:

str

class intelligence_layer.examples.LabelWithExamples(*, name: str, examples: Sequence[str])[source]

Bases: BaseModel

Defines a label and the list of examples making it up.

name

Name of the label.

Type:

str

examples

The example texts that define the label. They should be similar in structure and semantics to the texts to be classified at inference time.

Type:

collections.abc.Sequence[str]

class intelligence_layer.examples.LongContextQa(multi_chunk_qa: Task[MultipleChunkQaInput, MultipleChunkQaOutput] | None = None, chunk: Task[ChunkInput, ChunkOutput] | None = None, k: int = 4, model: ControlModel | None = None)[source]

Bases: Task[LongContextQaInput, MultipleChunkQaOutput]

Answer a question on the basis of a (lengthy) document.

Best for answering a question on the basis of a long document, where the length of text exceeds the context length of a model (e.g. 2048 tokens for the luminous models).

Note

  • Creates instance of InMemoryRetriever on the fly.

  • model provided should be a control-type model.

Parameters:
  • multi_chunk_qa – Task used to produce answers for each relevant chunk generated by the chunk-task for the given input. Defaults to MultipleChunkQa.

  • chunk – Task used to chunk the input. Defaults to Chunk.

  • k – The number of top relevant chunks to retrieve.

  • model – The model used in the task.

Example

>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.examples import LongContextQa, LongContextQaInput
>>> task = LongContextQa()
>>> input = LongContextQaInput(text="Lengthy text goes here...",
...                             question="Where does the text go?")
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
do_run(input: LongContextQaInput, task_span: TaskSpan) MultipleChunkQaOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.LongContextQaInput(*, text: str, question: str, language: Language = Language(iso_639_1='en'))[source]

Bases: BaseModel

The input for a LongContextQa task.

text

Text of arbitrary length on the basis of which the question is to be answered.

Type:

str

question

The question for the text.

Type:

str

language

The desired language of the answer. ISO 639-1 str of the language, e.g. en, fr, etc.

Type:

intelligence_layer.core.detect_language.Language

class intelligence_layer.examples.LongContextSummarizeAggregationLogic[source]

Bases: AggregationLogic[SummarizeEvaluation, AggregatedSummarizeEvaluation]

aggregate(evaluations: Iterable[SummarizeEvaluation]) AggregatedSummarizeEvaluation[source]

Evaluator-specific method for aggregating individual Evaluations into a report-like Aggregated Evaluation.

This method is responsible for taking the results of an evaluation run and aggregating all of them. It should create an AggregatedEvaluation instance and return it at the end.

Parameters:

evaluations – The results from running eval_and_aggregate_runs with a Task.

Returns:

The aggregated results of an evaluation run with a Dataset.

class intelligence_layer.examples.LongContextSummarizeEvaluationLogic[source]

Bases: SingleOutputEvaluationLogic[LongContextSummarizeInput, LongContextSummarizeOutput, str, SummarizeEvaluation]

do_evaluate(example: Example, *output: SuccessfulExampleOutput) Evaluation

Executes the evaluation for this specific example.

Responsible for comparing the input & expected output of a task to the actually generated output.

Parameters:
  • example – Input data of Task to produce the output.

  • *output – Output of the Task.

Returns:

The metrics that come from the evaluated Task.

class intelligence_layer.examples.LongContextSummarizeInput(*, text: str, language: Language = Language(iso_639_1='en'))[source]

Bases: BaseModel

The input for a summarize-task for a text of any length.

text

A text of any length.

Type:

str

language

The desired language of the summary. ISO 639-1 str of the language, e.g. en, fr, etc.

Type:

intelligence_layer.core.detect_language.Language

class intelligence_layer.examples.LongContextSummarizeOutput(*, partial_summaries: Sequence[PartialSummary])[source]

Bases: BaseModel

The output of a summarize-task for a text of any length.

partial_summaries

Chunk-wise summaries.

Type:

collections.abc.Sequence[intelligence_layer.examples.summarize.summarize.PartialSummary]

class intelligence_layer.examples.MultiLabelClassifyAggregationLogic[source]

Bases: AggregationLogic[MultiLabelClassifyEvaluation, AggregatedMultiLabelClassifyEvaluation]

aggregate(evaluations: Iterable[MultiLabelClassifyEvaluation]) AggregatedMultiLabelClassifyEvaluation[source]

Evaluator-specific method for aggregating individual Evaluations into a report-like Aggregated Evaluation.

This method is responsible for taking the results of an evaluation run and aggregating all of them. It should create an AggregatedEvaluation instance and return it at the end.

Parameters:

evaluations – The results from running eval_and_aggregate_runs with a Task.

Returns:

The aggregated results of an evaluation run with a Dataset.

class intelligence_layer.examples.MultiLabelClassifyEvaluation(*, tp: frozenset[str], tn: frozenset[str], fp: frozenset[str], fn: frozenset[str])[source]

Bases: BaseModel

The evaluation of a single multi-label classification example.

tp

The classes that were expected and correctly predicted (true positives).

Type:

frozenset[str]

tn

The classes that were not expected and correctly not predicted (true negatives).

Type:

frozenset[str]

fp

The classes that were not expected and falsely predicted (false positives).

Type:

frozenset[str]

fn

The classes that were expected and falsely not predicted (false negatives).

Type:

frozenset[str]
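
For illustration, these four sets partition the full label set for one example; a plain-Python sketch with hypothetical values (not produced by the library):

>>> expected = {"sports", "politics"}
>>> predicted = {"sports", "economy"}
>>> all_labels = {"sports", "politics", "economy", "culture"}
>>> tp, fp = predicted & expected, predicted - expected
>>> fn, tn = expected - predicted, all_labels - (predicted | expected)
>>> sorted(tp), sorted(fp), sorted(fn), sorted(tn)
(['sports'], ['economy'], ['politics'], ['culture'])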

class intelligence_layer.examples.MultiLabelClassifyEvaluationLogic(threshold: float = 0.55)[source]

Bases: SingleOutputEvaluationLogic[ClassifyInput, MultiLabelClassifyOutput, Sequence[str], MultiLabelClassifyEvaluation]
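
For illustration, a plain-Python sketch of the thresholding idea with hypothetical scores, under the assumption that a label counts as predicted once its score reaches the threshold:

>>> threshold = 0.55
>>> scores = {"sports": 0.8, "politics": 0.3, "economy": 0.6}
>>> predicted = {label for label, score in scores.items() if score >= threshold}
>>> sorted(predicted)
['economy', 'sports']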

do_evaluate(example: Example, *output: SuccessfulExampleOutput) Evaluation

Executes the evaluation for this specific example.

Responsible for comparing the input & expected output of a task to the actually generated output.

Parameters:
  • example – Input data of Task to produce the output.

  • *output – Output of the Task.

Returns:

The metrics that come from the evaluated Task.

class intelligence_layer.examples.MultiLabelClassifyOutput(*, scores: Mapping[str, Probability])[source]

Bases: BaseModel

Output for a multi label classification task.

scores

Mapping of each provided label (key) to its corresponding score (value). The score represents how sure the model is that this is the correct label. This will be a value between 0 and 1. There is no constraint on the sum of the individual probabilities.

Type:

collections.abc.Mapping[str, intelligence_layer.examples.classify.classify.Probability]

class intelligence_layer.examples.MultipleChunkQa(single_chunk_qa: Task[SingleChunkQaInput, SingleChunkQaOutput] | None = None, merge_answers_model: ControlModel | None = None, merge_answers_instruct_configs: Mapping[Language, MergeAnswersInstructConfig] = {Language(iso_639_1='de'): MergeAnswersInstructConfig(instruction='Fasse alle Antworten zu einer einzigen Antwort zusammen. Falls es Widersprüche gibt, präsentiere diese. Deine Antwort sollte nicht länger als 5 Sätze sein.', question_label='Frage', answers_label='Antworten', final_answer_label='Endgültige Antwort:', maximum_tokens=128), Language(iso_639_1='en'): MergeAnswersInstructConfig(instruction='You are tasked with combining multiple answers into a single answer. If conflicting answers arise, acknowledge the discrepancies by presenting them collectively. Your answer should not be lomnger than 5 sentences.', question_label='Question', answers_label='Answers', final_answer_label='Final answer:', maximum_tokens=128), Language(iso_639_1='es'): MergeAnswersInstructConfig(instruction='Su tarea consiste en combinar varias respuestas en una sola. Si surgen respuestas contradictorias, reconozca las discrepancias presentándolas colectivamente. Su respuesta no debe superar las 5 frases.', question_label='Pregunta', answers_label='Respuestas', final_answer_label='Respuesta final:', maximum_tokens=128), Language(iso_639_1='fr'): MergeAnswersInstructConfig(instruction='Vous devez combiner plusieurs réponses en une seule. Si des réponses contradictoires apparaissent, reconnaissez les divergences en les présentant collectivement. Votre réponse ne doit pas dépasser 5 phrases.', question_label='Question', answers_label='Réponses', final_answer_label='Réponse finale:', maximum_tokens=128), Language(iso_639_1='it'): MergeAnswersInstructConfig(instruction="Il compito è quello di combinare più risposte in un'unica risposta. Se emergono risposte contrastanti, riconoscete le discrepanze presentandole collettivamente. La risposta non deve essere più lunga di 5 frasi.", question_label='Domanda', answers_label='Risposte', final_answer_label='Risposta finale:', maximum_tokens=128)})[source]

Bases: Task[MultipleChunkQaInput, MultipleChunkQaOutput]

Answer a question on the basis of a list of text chunks.

Uses Aleph Alpha models to generate a natural language answer based on multiple text chunks. Best for longer texts that are already split into smaller units (chunks). Relies on SingleChunkQa to generate answers for each chunk and then merges the answers into a single final answer. Includes logic to return ‘answer = None’ if the language model determines that the question cannot be reliably answered on the basis of the chunks.

Note

model provided should be a control-type model.

Parameters:
  • single_chunk_qa – The task that is used to generate an answer based on a single chunk. Defaults to SingleChunkQa .

  • merge_answers_model – The model used throughout the task for model related API calls. Defaults to luminous-supreme-control.

  • merge_answers_instruct_configs – Mapping from language to the prompt parameters used for merging answers.

Example

>>> import os
>>> from intelligence_layer.connectors import (
...     LimitedConcurrencyClient,
... )
>>> from intelligence_layer.core import Language, InMemoryTracer
>>> from intelligence_layer.core.chunk import TextChunk
>>> from intelligence_layer.examples import (
...     MultipleChunkQa,
...     MultipleChunkQaInput,
... )
>>> task = MultipleChunkQa()
>>> input = MultipleChunkQaInput(
...     chunks=[TextChunk("Tina does not like pizza."), TextChunk("Mike is a big fan of pizza.")],
...     question="Who likes pizza?",
...     language=Language("en"),
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
>>> print(output.answer)
Mike likes pizza.
do_run(input: MultipleChunkQaInput, task_span: TaskSpan) MultipleChunkQaOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.MultipleChunkQaInput(*, chunks: Sequence[TextChunk], question: str, language: Language = Language(iso_639_1='en'), generate_highlights: bool = True)[source]

Bases: BaseModel

The input for a MultipleChunkQa task.

chunks

The list of chunks that will be used to answer the question. Can be arbitrarily long list of chunks.

Type:

collections.abc.Sequence[intelligence_layer.core.chunk.TextChunk]

question

The question that will be answered based on the chunks.

Type:

str

language

The desired language of the answer. ISO 639-1 str of the language, e.g. en, fr, etc.

Type:

intelligence_layer.core.detect_language.Language

generate_highlights

Whether to generate highlights (using the explainability feature) for the answer. Defaults to True.

Type:

bool

class intelligence_layer.examples.MultipleChunkQaOutput(*, answer: str | None, subanswers: Sequence[Subanswer])[source]

Bases: BaseModel

The output of a MultipleChunkQa task.

answer

The answer generated by the task. Can be a string or None (if no answer was found).

Type:

str | None

subanswers

All the subanswers used to generate the answer.

Type:

collections.abc.Sequence[intelligence_layer.examples.qa.multiple_chunk_qa.Subanswer]

class intelligence_layer.examples.MultipleChunkRetrieverQa(retriever: BaseRetriever[ID], insert_chunk_number: int = 5, model: ControlModel | None = None, expand_chunks: Task[ExpandChunksInput, ExpandChunksOutput] | None = None, single_chunk_qa: Task[SingleChunkQaInput, SingleChunkQaOutput] | None = None, source_prefix_config: Mapping[Language, str] = {Language(iso_639_1='de'): 'Quelle {i}:\n', Language(iso_639_1='en'): 'Source {i}:\n', Language(iso_639_1='es'): 'Fuente {i}:\n', Language(iso_639_1='fr'): 'Source {i}:\n', Language(iso_639_1='it'): 'Fonte {i}:\n'})[source]

Bases: Task[RetrieverBasedQaInput, MultipleChunkRetrieverQaOutput], Generic[ID]

Answer a question based on documents found by a retriever.

MultipleChunkRetrieverQa is a task that answers a question based on a set of documents. It relies on some retriever of type BaseRetriever that has the ability to access texts. In contrast to the regular RetrieverBasedQa, this task injects multiple chunks into one SingleChunkQa task run.

We recommend using this task instead of RetrieverBasedQa.

Note

model provided should be a control-type model.

Parameters:
  • retriever – Used to access and return a set of texts.

  • insert_chunk_number – number of top chunks to inject into SingleChunkQa-task.

  • model – The model used throughout the task for model related API calls.

  • expand_chunks – The task used to fetch adjacent chunks to the search results. These “expanded” chunks will be injected into the prompt.

  • single_chunk_qa – The task used to generate an answer for a single chunk (retrieved through the retriever). Defaults to SingleChunkQa.

  • source_prefix_config – A mapping that describes the source section string for different languages. Defaults to the equivalent of “Source {i}:” for each supported language.
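
Example

A minimal usage sketch (following the retriever setup used in the QdrantSearch example below; the document text and question are illustrative):

>>> from intelligence_layer.connectors import (
...     Document,
...     LimitedConcurrencyClient,
...     QdrantInMemoryRetriever,
... )
>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.examples import (
...     MultipleChunkRetrieverQa,
...     RetrieverBasedQaInput,
... )
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [Document(text="West and East Germany reunited in 1990.")]
>>> retriever = QdrantInMemoryRetriever(documents, 3, client=client)
>>> task = MultipleChunkRetrieverQa(retriever)
>>> input = RetrieverBasedQaInput(question="When did East and West Germany reunite?")
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)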

do_run(input: RetrieverBasedQaInput, task_span: TaskSpan) MultipleChunkRetrieverQaOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.MultipleChunkRetrieverQaOutput(*, answer: str | None, sources: Sequence[AnswerSource], search_results: Sequence[SearchResult])[source]

Bases: BaseModel, Generic[ID]

Returns the answer of a QA task together with the sources which support the answer.

Note that the answer is generated based on multiple chunks of text. Furthermore, a single search result may be associated with multiple sources, because the search result may have been expanded to include adjacent chunks of text.

answer

The answer generated by the QA task. May be None if no answer was found in the text.

Type:

str | None

sources

A list of source chunks or passages that support or are relevant to the provided answer.

Type:

collections.abc.Sequence[intelligence_layer.examples.qa.multiple_chunk_retriever_qa.AnswerSource]

search_results

A list of search results from the retriever, providing additional references.

Type:

collections.abc.Sequence[intelligence_layer.connectors.retrievers.base_retriever.SearchResult]

class intelligence_layer.examples.PartialSummary(*, summary: str, chunk: TextChunk, generated_tokens: int)[source]

Bases: BaseModel

The summary of a single chunk.

summary

The summary generated by the task.

Type:

str

chunk

The source chunk.

Type:

intelligence_layer.core.chunk.TextChunk

generated_tokens

The number of tokens generated for the summary.

Type:

int

class intelligence_layer.examples.PromptBasedClassify(model: LuminousControlModel | Llama3InstructModel | None = None, echo: Task[EchoInput, EchoOutput] | None = None, instruction: str = 'Identify a class that describes the text adequately.\nReply with only the class label.')[source]

Bases: Task[ClassifyInput, SingleLabelClassifyOutput]

Task that classifies a given input text with one of the given classes.

The input contains a complete set of all possible labels. The output will return a score for each possible label. All scores will add up to 1 and are relative to each other. The highest score is given to the most likely class.

This methodology works best for classes that are easily understood, and don’t require an explanation or examples.

Parameters:
  • model – The model used throughout the task for model related API calls. Defaults to luminous-base-control.

  • echo – echo-task used to compute the score for each label. Defaults to Echo.

  • instruction – The prompt to use. Check the class for the default.

PROMPT_TEMPLATE_STR

The prompt template used for answering the question. ‘text’ and ‘labels’ will be inserted here.

MODEL

A valid Aleph Alpha model name.

Example

>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.core import TextChunk
>>> from intelligence_layer.examples import ClassifyInput
>>> from intelligence_layer.examples import PromptBasedClassify
>>> task = PromptBasedClassify()
>>> input = ClassifyInput(
...     chunk=TextChunk("This is a happy text."), labels=frozenset({"positive", "negative"})
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
do_run(input: ClassifyInput, task_span: TaskSpan) SingleLabelClassifyOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.PromptBasedClassifyWithDefinitions(labels_with_definitions: Sequence[LabelWithDefinition], model: ControlModel | None = None, instruction: str = 'Identify a class that describes the text adequately.\nReply with only the class label.')[source]

Bases: Task[ClassifyInput, SingleLabelClassifyOutput]
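
Example

A minimal usage sketch, analogous to the PromptBasedClassify example above but with labels specified via LabelWithDefinition (texts and definitions are illustrative):

>>> from intelligence_layer.core import TextChunk, InMemoryTracer
>>> from intelligence_layer.examples import (
...     ClassifyInput,
...     LabelWithDefinition,
...     PromptBasedClassifyWithDefinitions,
... )
>>> labels = [
...     LabelWithDefinition(name="positive", definition="A text with a positive sentiment."),
...     LabelWithDefinition(name="negative", definition="A text with a negative sentiment."),
... ]
>>> task = PromptBasedClassifyWithDefinitions(labels)
>>> input = ClassifyInput(
...     chunk=TextChunk("This is a happy text."), labels=frozenset({"positive", "negative"})
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)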

do_run(input: ClassifyInput, task_span: TaskSpan) SingleLabelClassifyOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.QdrantSearch(in_memory_retriever: QdrantInMemoryRetriever)[source]

Bases: Task[QdrantSearchInput, SearchOutput[int]]

Performs a search to find documents using Qdrant filtering methods.

Given a query, this task will utilize a retriever to fetch relevant text search results. In contrast to Search, this task offers the option to filter.

Parameters:

in_memory_retriever – Implements logic to retrieve matching texts to the query.

Example

>>> import os
>>> from intelligence_layer.connectors import (
...     LimitedConcurrencyClient,
... )
>>> from intelligence_layer.connectors import Document
>>> from intelligence_layer.connectors import (
...     QdrantInMemoryRetriever,
... )
>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.examples import (
...     QdrantSearch,
...     QdrantSearchInput,
... )
>>> from qdrant_client.http.models import models
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [
...     Document(
...         text="West and East Germany reunited in 1990.", metadata={"title": "Germany"}
...     )
... ]
>>> retriever = QdrantInMemoryRetriever(documents, 3, client=client)
>>> task = QdrantSearch(retriever)
>>> input = QdrantSearchInput(
...     query="When did East and West Germany reunite?",
...     filter=models.Filter(
...         must=[
...             models.FieldCondition(
...                 key="metadata.title",
...                 match=models.MatchValue(value="Germany"),
...             ),
...         ]
...     ),
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
do_run(input: QdrantSearchInput, task_span: TaskSpan) SearchOutput[int][source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.QdrantSearchInput(*, query: str, filter: Filter)[source]

Bases: BaseModel

The input for a QdrantSearch task.

query

The text to be searched with.

Type:

str

filter

Conditions to filter by as offered by Qdrant.

Type:

qdrant_client.http.models.models.Filter

class intelligence_layer.examples.RecursiveSummarize(long_context_summarize_task: Task[LongContextSummarizeInput, LongContextSummarizeOutput] | None = None)[source]

Bases: Task[RecursiveSummarizeInput, SummarizeOutput]

This task will summarize the input text recursively until the desired length is reached.

It uses any long-context summarize task to go over text recursively and condense it even further.

Parameters:

long_context_summarize_task – Any task that satisfies the interface Input: LongContextSummarizeInput and Output: LongContextSummarizeOutput. Defaults to SteerableLongContextSummarize.
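
Example

A minimal usage sketch (the text is illustrative; the default SteerableLongContextSummarize is used under the hood):

>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.examples import RecursiveSummarize, RecursiveSummarizeInput
>>> task = RecursiveSummarize()
>>> input = RecursiveSummarizeInput(text="Lengthy text goes here...", max_tokens=128)
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)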

do_run(input: RecursiveSummarizeInput, task_span: TaskSpan) SummarizeOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.RecursiveSummarizeInput(*, text: str, language: Language = Language(iso_639_1='en'), max_tokens: int = 512)[source]

Bases: BaseModel

The input for a recursive summarize-task for a text of any length.

text

A text of any length.

Type:

str

language

The desired language of the summary. ISO 639-1 str of the language, e.g. en, fr, etc.

Type:

intelligence_layer.core.detect_language.Language

max_tokens

The maximum desired length of the summary in tokens.

Type:

int

class intelligence_layer.examples.RetrieverBasedQa(retriever: BaseRetriever[ID], multi_chunk_qa: Task[MultipleChunkQaInput, MultipleChunkQaOutput] | None = None)[source]

Bases: Task[RetrieverBasedQaInput, RetrieverBasedQaOutput], Generic[ID]

Answer a question based on documents found by a retriever.

RetrieverBasedQa is a task that answers a question based on a set of documents. Relies on some retriever of type BaseRetriever that has the ability to access texts.

Note

model provided should be a control-type model.

Parameters:
  • retriever – Used to access and return a set of texts.

  • multi_chunk_qa – The task that is used to generate an answer for a single chunk (retrieved through the retriever). Defaults to MultipleChunkQa .

Example

>>> import os
>>> from intelligence_layer.connectors import DocumentIndexClient
>>> from intelligence_layer.connectors import DocumentIndexRetriever
>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.examples import RetrieverBasedQa, RetrieverBasedQaInput
>>> token = os.getenv("AA_TOKEN")
>>> document_index = DocumentIndexClient(token)
>>> retriever = DocumentIndexRetriever(document_index, "asymmetric", "aleph-alpha", "wikipedia-de", 3)
>>> task = RetrieverBasedQa(retriever)
>>> input_data = RetrieverBasedQaInput(question="When was Rome founded?")
>>> tracer = InMemoryTracer()
>>> output = task.run(input_data, tracer)
do_run(input: RetrieverBasedQaInput, task_span: TaskSpan) RetrieverBasedQaOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.RetrieverBasedQaInput(*, question: str, language: Language = Language(iso_639_1='en'), generate_highlights: bool = True)[source]

Bases: BaseModel

The input for a RetrieverBasedQa task.

question

The question to be answered based on the documents accessed by the retriever.

Type:

str

language

The desired language of the answer. ISO 639-1 str of the language, e.g. en, fr, etc.

Type:

intelligence_layer.core.detect_language.Language

generate_highlights

Whether to generate highlights (using the explainability feature) for the answer. Defaults to True.

Type:

bool

class intelligence_layer.examples.RetrieverBasedQaOutput(*, answer: str | None, subanswers: Sequence[EnrichedSubanswer])[source]

Bases: BaseModel, Generic[ID]

The output of a RetrieverBasedQa task.

answer

The answer generated by the task. Can be a string or None (if no answer was found).

Type:

str | None

subanswers

All the subanswers used to generate the answer.

Type:

collections.abc.Sequence[intelligence_layer.examples.qa.retriever_based_qa.EnrichedSubanswer]

class intelligence_layer.examples.Search(retriever: BaseRetriever[ID])[source]

Bases: Generic[ID], Task[SearchInput, SearchOutput]

Performs search to find documents.

Given a query, this task will utilize a retriever to fetch relevant text search results. Each result consists of a string representation of the content and an associated score indicating its relevance to the provided query.

Parameters:

retriever – Implements logic to retrieve matching texts to the query.

Example

>>> from os import getenv
>>> from intelligence_layer.connectors import (
...     DocumentIndexClient,
... )
>>> from intelligence_layer.connectors import (
...     DocumentIndexRetriever,
... )
>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.examples import Search, SearchInput
>>> document_index = DocumentIndexClient(getenv("AA_TOKEN"))
>>> retriever = DocumentIndexRetriever(document_index, "asymmetric", "aleph-alpha", "wikipedia-de", 3)
>>> task = Search(retriever)
>>> input = SearchInput(query="When did East and West Germany reunite?")
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
do_run(input: SearchInput, task_span: TaskSpan) SearchOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.SearchAggregationLogic(top_ks_to_evaluate: Sequence[int])[source]

Bases: AggregationLogic[SearchEvaluation, AggregatedSearchEvaluation]

aggregate(evaluations: Iterable[SearchEvaluation]) AggregatedSearchEvaluation[source]

Evaluator-specific method for aggregating individual Evaluations into a report-like Aggregated Evaluation.

This method is responsible for taking the results of an evaluation run and aggregating all of them. It should create an AggregatedEvaluation instance and return it at the end.

Parameters:

evaluations – The results from running eval_and_aggregate_runs with a Task.

Returns:

The aggregated results of an evaluation run with a Dataset.

class intelligence_layer.examples.SearchEvaluation(*, rank: int | None, similarity_score: float | None)[source]

Bases: BaseModel

class intelligence_layer.examples.SearchEvaluationLogic[source]

Bases: Generic[ID], SingleOutputEvaluationLogic[SearchInput, SearchOutput, ExpectedSearchOutput, SearchEvaluation]

do_evaluate(example: Example, *output: SuccessfulExampleOutput) Evaluation

Executes the evaluation for this specific example.

Responsible for comparing the input & expected output of a task to the actually generated output.

Parameters:
  • example – Input data of Task to produce the output.

  • *output – Output of the Task.

Returns:

The metrics that come from the evaluated Task.

class intelligence_layer.examples.SearchInput(*, query: str)[source]

Bases: BaseModel

The input for a Search task.

query

The text to be searched with.

Type:

str

class intelligence_layer.examples.SearchOutput(*, results: Sequence[SearchResult])[source]

Bases: BaseModel, Generic[ID]

The output of a Search task.

results

Each result contains a text and corresponding score.

Type:

collections.abc.Sequence[intelligence_layer.connectors.retrievers.base_retriever.SearchResult]
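
Continuing the Search example above, a hedged sketch of reading the results; the document_chunk access path is an assumption about SearchResult and may differ.

>>> scored_texts = [
...     (result.score, result.document_chunk.text)  # document_chunk.text is assumed, score is documented
...     for result in output.results
... ]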

class intelligence_layer.examples.SingleChunkQa(model: ControlModel | None = None, text_highlight: Task[TextHighlightInput, TextHighlightOutput] | None = None, instruction_config: Mapping[Language, QaSetup] = {Language(iso_639_1='de'): QaSetup(unformatted_instruction='Beantworte die Frage anhand des Textes. Wenn sich die Frage nicht mit dem Text beantworten lässt, antworte "{{no_answer_text}}".\nFrage: {{question}}', no_answer_str='Unbeantwortbar', no_answer_logit_bias=0.5), Language(iso_639_1='en'): QaSetup(unformatted_instruction='Question: {{question}}\nAnswer the question on the basis of the text. If there is no answer within the text, respond "{{no_answer_text}}".', no_answer_str='no answer in text', no_answer_logit_bias=1.0), Language(iso_639_1='es'): QaSetup(unformatted_instruction='{{question}}\nSi no hay respuesta, di "{{no_answer_text}}". Responde sólo a la pregunta basándote en el texto.', no_answer_str='no hay respuesta en el texto', no_answer_logit_bias=None), Language(iso_639_1='fr'): QaSetup(unformatted_instruction='{{question}}\nS\'il n\'y a pas de réponse, dites "{{no_answer_text}}". Ne répondez à la question qu\'en vous basant sur le texte.', no_answer_str='pas de réponse dans le texte', no_answer_logit_bias=None), Language(iso_639_1='it'): QaSetup(unformatted_instruction='{{question}}\nSe non c\'è risposta, dire "{{no_answer_text}}". Rispondere alla domanda solo in base al testo.', no_answer_str='nessuna risposta nel testo', no_answer_logit_bias=None)}, maximum_tokens: int = 256)[source]

Bases: Task[SingleChunkQaInput, SingleChunkQaOutput]

Answer a question on the basis of one chunk.

Uses Aleph Alpha models to generate a natural language answer for a text chunk given a question. Returns None as the answer if the language model determines that the question cannot be answered on the basis of the text.

Parameters:
  • model – The model used throughout the task for model-related API calls.

  • text_highlight – The task that is used for highlighting the parts of the input that are relevant for the answer. Defaults to TextHighlight.

  • instruction_config – Defines instructions for different languages.

  • maximum_tokens – The maximum number of tokens to be generated for an answer.

NO_ANSWER_STR

The string to be generated by the model in case no answer can be found.

Example

>>> import os
>>> from intelligence_layer.core import Language, InMemoryTracer
>>> from intelligence_layer.core import TextChunk
>>> from intelligence_layer.examples import SingleChunkQa, SingleChunkQaInput
>>>
>>> task = SingleChunkQa()
>>> input = SingleChunkQaInput(
...     chunk=TextChunk("Tina does not like pizza. However, Mike does."),
...     question="Who likes pizza?",
...     language=Language("en"),
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)

do_run(input: SingleChunkQaInput, task_span: TaskSpan) SingleChunkQaOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.

class intelligence_layer.examples.SingleChunkQaInput(*, chunk: TextChunk, question: str, language: Language = Language(iso_639_1='en'), generate_highlights: bool = True)[source]

Bases: BaseModel

The input for a SingleChunkQa task.

chunk

The (short) text to be asked about. Usually measures one or a few paragraph(s). Can’t be longer than the context length of the model used minus the size of the system prompt.

Type:

intelligence_layer.core.chunk.TextChunk

question

The question to be asked about the chunk.

Type:

str

language

The desired language of the answer. ISO 639-1 str with the language code, e.g. en, fr, etc.

Type:

intelligence_layer.core.detect_language.Language

generate_highlights

Whether to generate highlights (using the explainability feature) for the answer. Defaults to True.

Type:

bool
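
A minimal sketch of a German-language input with highlighting disabled; the chunk and question texts are illustrative only.

>>> from intelligence_layer.core import Language, TextChunk
>>> from intelligence_layer.examples import SingleChunkQaInput
>>> qa_input = SingleChunkQaInput(
...     chunk=TextChunk("Tina mag keine Pizza. Mike hingegen schon."),
...     question="Wer mag Pizza?",
...     language=Language("de"),
...     generate_highlights=False,
... )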

class intelligence_layer.examples.SingleChunkQaOutput(*, answer: str | None, highlights: Sequence[ScoredTextHighlight])[source]

Bases: BaseModel

The output of a SingleChunkQa task.

answer

The answer generated by the task. Can be a string or None (if no answer was found).

Type:

str | None

highlights

Highlights indicating which parts of the chunk contributed to the answer. Each highlight is a quote from the text.

Type:

collections.abc.Sequence[intelligence_layer.core.text_highlight.ScoredTextHighlight]
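
Continuing the SingleChunkQa example above, a minimal sketch of reading the output; an answer of None signals that no answer was found.

>>> answer = output.answer if output.answer is not None else "no answer found"
>>> number_of_highlights = len(output.highlights)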

class intelligence_layer.examples.SingleChunkSummarizeAggregationLogic[source]

Bases: AggregationLogic[SummarizeEvaluation, AggregatedSummarizeEvaluation]

aggregate(evaluations: Iterable[SummarizeEvaluation]) AggregatedSummarizeEvaluation[source]

Evaluator-specific method for aggregating individual Evaluations into a report-like Aggregated Evaluation.

This method is responsible for taking the results of an evaluation run and aggregating all the results. It should create an AggregatedEvaluation object and return it at the end.

Parameters:

evaluations – The results from running eval_and_aggregate_runs with a Task.

Returns:

The aggregated results of an evaluation run with a Dataset.

class intelligence_layer.examples.SingleChunkSummarizeEvaluationLogic[source]

Bases: SingleOutputEvaluationLogic[SingleChunkSummarizeInput, SummarizeOutput, str, SummarizeEvaluation]

do_evaluate(example: Example, *output: SuccessfulExampleOutput) Evaluation

Executes the evaluation for this specific example.

Responsible for comparing the input & expected output of a task to the actually generated output.

Parameters:
  • example – Input data of Task to produce the output.

  • *output – Output of the Task.

Returns:

The metrics that come from the evaluated Task.

class intelligence_layer.examples.SingleChunkSummarizeInput(*, chunk: TextChunk, language: Language = Language(iso_639_1='en'))[source]

Bases: BaseModel

The input for a summarize-task that only deals with a single chunk.

chunk

The text chunk to be summarized.

Type:

intelligence_layer.core.chunk.TextChunk

language

The desired language of the summary. ISO 639-1 str with the language code, e.g. en, fr, etc.

Type:

intelligence_layer.core.detect_language.Language

class intelligence_layer.examples.SingleLabelClassifyAggregationLogic[source]

Bases: AggregationLogic[SingleLabelClassifyEvaluation, AggregatedSingleLabelClassifyEvaluation]

aggregate(evaluations: Iterable[SingleLabelClassifyEvaluation]) AggregatedSingleLabelClassifyEvaluation[source]

Evaluator-specific method for aggregating individual Evaluations into a report-like Aggregated Evaluation.

This method is responsible for taking the results of an evaluation run and aggregating all the results. It should create an AggregatedEvaluation object and return it at the end.

Parameters:

evaluations – The results from running eval_and_aggregate_runs with a Task.

Returns:

The aggregated results of an evaluation run with a Dataset.
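
A hedged sketch with hand-built evaluations; in practice the evaluations come from a SingleLabelClassifyEvaluationLogic run, and the label values shown are illustrative only.

>>> from intelligence_layer.examples import (
...     SingleLabelClassifyAggregationLogic,
...     SingleLabelClassifyEvaluation,
... )
>>> logic = SingleLabelClassifyAggregationLogic()
>>> evaluations = [
...     SingleLabelClassifyEvaluation(
...         correct=True, predicted="positive", expected="positive", expected_label_missing=False
...     ),
...     SingleLabelClassifyEvaluation(
...         correct=False, predicted="negative", expected="positive", expected_label_missing=False
...     ),
... ]
>>> aggregated = logic.aggregate(evaluations)
>>> percentage = aggregated.percentage_correct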

class intelligence_layer.examples.SingleLabelClassifyEvaluation(*, correct: bool, predicted: str, expected: str, expected_label_missing: bool)[source]

Bases: BaseModel

The evaluation of a single label classification run.

correct

Whether the highest-scoring class from the output was in the set of “correct classes”.

Type:

bool

predicted

The predicted label.

Type:

str

expected

The expected label.

Type:

str

expected_label_missing

Whether the expected label was missing from the possible set of labels in the task’s input.

Type:

bool

class intelligence_layer.examples.SingleLabelClassifyEvaluationLogic[source]

Bases: SingleOutputEvaluationLogic[ClassifyInput, SingleLabelClassifyOutput, str, SingleLabelClassifyEvaluation]

do_evaluate(example: Example, *output: SuccessfulExampleOutput) Evaluation

Executes the evaluation for this specific example.

Responsible for comparing the input & expected output of a task to the actually generated output.

Parameters:
  • example – Input data of Task to produce the output.

  • *output – Output of the Task.

Returns:

The metrics that come from the evaluated Task.

class intelligence_layer.examples.SingleLabelClassifyOutput(*, scores: Mapping[str, Probability])[source]

Bases: BaseModel

Output for a single label classification task.

scores

Mapping of each provided label (key) to its corresponding score (value). The score represents how sure the model is that this is the correct label. This will be a value between 0 and 1. The sum of all probabilities will be 1.

Type:

collections.abc.Mapping[str, intelligence_layer.examples.classify.classify.Probability]
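
Given a SingleLabelClassifyOutput (here called output), a minimal sketch for picking the highest-scoring label from the documented scores mapping.

>>> best_label, best_score = max(output.scores.items(), key=lambda item: item[1])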

class intelligence_layer.examples.SteerableLongContextSummarize(summarize: Task[SingleChunkSummarizeInput, SummarizeOutput] | None = None, chunk: Task[ChunkInput, ChunkOutput] | None = None, model: ControlModel | None = None)[source]

Bases: Task[LongContextSummarizeInput, LongContextSummarizeOutput]

Condenses a long text into a summary.

Generate a summary given an instruction setup.

Parameters:
  • summarize – The summarize task that is used to summarize a single chunk. Make sure that this and the chunk task use the same model. Defaults to SteerableSingleChunkSummarize.

  • chunk – The chunk task that is used to chunk the long text into smaller pieces such that a single chunk fits into the context of the model. Make sure that this and the summarize task use the same model. Defaults to Chunk.

  • model – A valid Aleph Alpha control model. This is passed on to the default summarize and chunk tasks, so it is ignored when the defaults for both tasks are overridden. Defaults to luminous-base-control.

do_run(input: LongContextSummarizeInput, task_span: TaskSpan) LongContextSummarizeOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.
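
A hedged sketch of running the task; LongContextSummarizeInput is documented elsewhere in this module, and the assumption that it accepts the full document as a text field (with the placeholder string used here) is illustrative only.

>>> from intelligence_layer.core import InMemoryTracer
>>> from intelligence_layer.examples import (
...     LongContextSummarizeInput,
...     SteerableLongContextSummarize,
... )
>>> task = SteerableLongContextSummarize()
>>> input = LongContextSummarizeInput(text="<a long document that does not fit into one chunk>")
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)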

class intelligence_layer.examples.SteerableSingleChunkSummarize(model: ControlModel | None = None, max_generated_tokens: int = 256, instruction_configs: Mapping[Language, str] = {Language(iso_639_1='de'): 'Fasse den Text in einem Paragraphen zusammen.', Language(iso_639_1='en'): 'Summarize the text in a single paragraph.'})[source]

Bases: Task[SingleChunkSummarizeInput, SummarizeOutput]

Summarizes a text given an instruction.

Parameters:
  • model – A valid Aleph Alpha control model.

  • max_generated_tokens – The maximum number of tokens to be generated by the model. This is not intended to steer the generation length, but instead will cut off the generation at the specified limit. Note that maximum tokens + chunk size + prompt length should not exceed the context size of the model.

  • instruction_configs – A mapping of valid Language to str for each supported language.

do_run(input: SingleChunkSummarizeInput, task_span: TaskSpan) SummarizeOutput[source]

The implementation for this use case.

This takes an input and runs the implementation to generate an output. It takes a Span for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • task_span – The Span used for tracing.

Returns:

Generic output defined by the task implementation.

run(input: Input, tracer: Tracer) Output

Executes the implementation of do_run for this use case.

This takes an input and runs the implementation to generate an output. It takes a Tracer for tracing of the process. The Input and Output are logged by default.

Parameters:
  • input – Generic input defined by the task implementation

  • tracer – The Tracer used for tracing.

Returns:

Generic output defined by the task implementation.

run_concurrently(inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = 20) Sequence[Output]

Executes multiple processes of this task concurrently.

Each provided input is potentially executed concurrently to the others. There is a global limit on the number of concurrently executed tasks that is shared by all tasks of all types.

Parameters:
  • inputs – The inputs that are potentially processed concurrently.

  • tracer – The tracer passed on the run method when executing a task.

  • concurrency_limit – An optional additional limit for the number of concurrently executed tasks for this method call. This can be used to prevent queue-full or similar errors of downstream APIs when the global concurrency limit is too high for a certain task.

Returns:

The Outputs generated by calling run for each given Input. The order of Outputs corresponds to the order of the Inputs.
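
A minimal sketch modeled on the SingleChunkQa example above; the chunk text is illustrative only.

>>> from intelligence_layer.core import InMemoryTracer, Language, TextChunk
>>> from intelligence_layer.examples import (
...     SingleChunkSummarizeInput,
...     SteerableSingleChunkSummarize,
... )
>>> task = SteerableSingleChunkSummarize()
>>> input = SingleChunkSummarizeInput(
...     chunk=TextChunk("Tina does not like pizza. However, Mike does."),
...     language=Language("en"),
... )
>>> tracer = InMemoryTracer()
>>> output = task.run(input, tracer)
>>> summary = output.summary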

class intelligence_layer.examples.Subanswer(*, answer: str | None, chunk: TextChunk, highlights: Sequence[ScoredTextHighlight])[source]

Bases: BaseModel

Individual answer based on just one of the multiple chunks.

answer

The answer generated by the task. Can be a string or None (if no answer was found).

Type:

str | None

chunk

Piece of the original text that the answer is based on.

Type:

intelligence_layer.core.chunk.TextChunk

highlights

The specific sentences that explain the answer the most. These are generated by the TextHighlight Task.

Type:

collections.abc.Sequence[intelligence_layer.core.text_highlight.ScoredTextHighlight]

class intelligence_layer.examples.SummarizeEvaluation(*, bleu: float, rouge: float, output: SummarizeOutput | LongContextSummarizeOutput)[source]

Bases: BaseModel

The evaluation of a summarization run.

bleu

roughly corresponds to precision

Type:

float

rouge

roughly corresponds to recall

Type:

float

output

The actual output from the task run

Type:

intelligence_layer.examples.summarize.summarize.SummarizeOutput | intelligence_layer.examples.summarize.summarize.LongContextSummarizeOutput

class intelligence_layer.examples.SummarizeOutput(*, summary: str, generated_tokens: int)[source]

Bases: BaseModel

The output of a summarize-task.

summary

The summary generated by the task.

Type:

str

generated_tokens

The number of tokens generated for the summary.

Type:

int