Examples

Code Chatbot with RAG

This page will showcase how you can setup a chatbot that answers questions about a Python codebase, by leveraging RAG.

The Codebase Documents

Before we start do we need some documents that we can query. In this project did I want a chatbot that knows all the documentation that exists in Markdown files, but also have an overview over the different Python functions and classes that exists. I did not want the LLM to know the inner workings of the functions, only the high level functionality and usage.

As a result did I create a function that finds all Markdown and Python files. It will the parse all of them into different sections, and add a type indicationg what type of document it is.

The schema of each chunk uses the following schema

class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()

    source_type = String().accepted_values([
        "docs",
        "import",
        "class",
        "function",
        "attribute",
    ])
    content = String().is_optional()
    loaded_at = EventTimestamp()

Futhermore, to indicate which function will generate the output can we add the function as the source. However, I will also add a .with_loaded_at() to automatically add a freshness timestamp.

@feature_view(
    source=CustomMethodDataSource.from_load(parse_code_chunks).with_loaded_at(),
)
class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()

    source_type = String().accepted_values([
        "docs",
        "import",
        "class",
        "function",
        "attribute",
    ])
    content = String().is_optional()
    loaded_at = EventTimestamp()

I also want to store all of the chunks in a csv file, but I want the timestamp to be formatted as an Unix timestamp. Leading to the added materialized_source.

@feature_view(
    source=CustomMethodDataSource.from_load(parse_code_chunks).with_loaded_at(),
    materialized_source=FileSource.csv_at(
        "data/module_chunks.csv", date_formatter=DateFormatter.unix_timestamp()
    ),
)
class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()

    source_type = String().accepted_values( [
        "docs",
        "import",
        "class",
        "function",
        "attribute",
    ])
    content = String().is_optional()
    loaded_at = EventTimestamp()

With this do we have our documents, and we can now start inserting them into our vector database.

Chunk Embedding

In order to setup a RAG system will we need some embeddings to inject.

Prompt Construction

However, currently do we have multiple properties, and not one string to embed.

Thankfully do aligned contain a format_string function which will construct a prompt for you, given a set of properties.

@feature_view(...)
class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()

    source_type = String().accepted_values(...)
    content = String().is_optional()
    loaded_at = EventTimestamp()

    summary = String().format_string(
        features=[source_type, name, lineno, source],
        format="Type: '{source_type}': named: '{name}' at line nr. '{lineno}' in '{source}'\n\nBehavior: {content}"
    )

We can now preview our summary by running the following code.

df = await store.feature_view(ModuleChunk).process_input({
    "id": ["some file id"],
    "source": ["docs"],
    "lineno": [10],
    "name": ["Some name"],
    "source_type": ["docs"],
    "content": ["Some content"],
}).to_polars()
print(df)

Which prints something like

shape: (1, 8)
┌────────────────┬────────┬────────┬───────────┬─────────────┬─────────┬───────────────┐
│ id             ┆ source ┆ lineno ┆ name      ┆ source_type ┆ content ┆ summary       │
│ ---            ┆ ---    ┆ ---    ┆ ---       ┆ ---         ┆ ---     ┆ ---           │
│ str            ┆ str    ┆ i64    ┆ str       ┆ str         ┆ str     ┆ str           │
╞════════════════╪════════╪════════╪═══════════╪═════════════╪═════════╪═══════════════╡
│ some file id   ┆ docs   ┆ 10     ┆ Some name ┆ docs        ┆ Some    ┆ Type: 'docs': │
│                ┆        ┆        ┆           ┆             ┆ content ┆ named: 'Some  │
│                ┆        ┆        ┆           ┆             ┆         ┆ nam…          │
└────────────────┴────────┴────────┴───────────┴─────────────┴─────────┴───────────────┘

The additional pro of using this transformation, is that the embedding models can detect the transformation, and use the format as the embedding version. This means we can support multiple versions at the same time, but we can also add warnings for data drift in our CI / CD setup, as we can detect changes to our prompt template.

So with this can we create the embeddings and use the summary as the input to the model.

@model_contract(
    input_features=[docs.summary],
    exposed_model=openai_embedding(
        model="text-embedding-3-small"
    ),
)
class ChunkEmbedding:
    id = String().as_entity()
    embedding = Embedding(1536)
    prompt_version = String().as_model_version()
    full_prompt = String()

With this can we run the following code

embedding = await store.model(ChunkEmbedding).predict_over({
    "summary": [prompt]
}).to_polars()
print(embedding)

Which will return something like the following.

shape: (1, 5)
┌─────────────────────┬────────────────────┬────────────────────┬────────────────────┬─────────────┐
│ summary             ┆ full_prompt        ┆ prompt_version     ┆ updated_at         ┆ embedding   │
│ ---                 ┆ ---                ┆ ---                ┆ ---                ┆ ---         │
│ str                 ┆ str                ┆ str                ┆ datetime[μs, UTC]  ┆ list[f64]   │
╞═════════════════════╪════════════════════╪════════════════════╪════════════════════╪═════════════╡
│ What is in the ml   ┆ What is in the ml  ┆ d35065742e14eb5f95 ┆ 2024-10-27         ┆ [0.038037,  │
│ kickstarter …       ┆ kickstarter …      ┆ f6c8137ef038…      ┆ 11:45:59.065352    ┆ 0.998796, … │
│                     ┆                    ┆                    ┆ UTC                ┆ -0.1742…    │
└─────────────────────┴────────────────────┴────────────────────┴────────────────────┴─────────────┘

The Vector Database

Now with our embeddings created would it be nice to store them somewhere. For this project will I use lancedb which is a fast and lightweight vector database.

from aligned.sources.lancedb import LanceDBConfig

vector_db = LanceDBConfig(path="data/lancedb")

@model_contract(
    input_features=[...],
    exposed_model=openai_embedding(...),
    output_source=vector_db.table("chunk")
)
class ChunkEmbedding:
    id = String().as_entity()
    embedding = Embedding(1536)
    prompt_version = String().as_model_version()
    full_prompt = String()

We can now populate all the chunks by running the following

store = await ContractStore.from_glob("**/*.py")

await store.model(ChunkEmbedding).predict_over(
    store.feature_view(ModuleChunk).all()
).upsert_into_output_source()

Q&A Model

Now that we have a vector store that we can look up will we need a way to construct our LLM prompt.

Prompt Input

Again we need to construct a prompt. However, this time it will not be one single document, but rather contain relevant context, and the question to answer.

Therefore, a vector lookup, and some custom string formatting is needed.

However, we will start by defining the input needed to formate our prompt, by adding a query parameter which should be a string.

@feature_view(source=InMemorySource.empty())
class QuestionAnswerPrompt:
    query = String()

Nearest N Lookup

To find the cunks that are relevant for our query can we do a nearest neighbour lookup.

This is currently done through a custom transform_row transformation. So we can get as many related document ids as we like.

@feature_view(...)
class QuestionAnswerPrompt:
    query = String()

    @transform_row(
        using_features=[query],
        return_type=List(String())
    )
    async def related_chunk_ids(self, input: dict, store: ContractStore) -> list[str]:
        docs = await store.vector_index("chunk").nearest_n_to(
            entities={"summary": [input["query"]]},
            number_of_records=15
        ).to_polars()
        return docs["id"].unique().to_list()

However, we are not interested in the ids of the chunks. We want the content, which can be easily done with the features_for lookup. Therefore, we define that we want the summay feature for a set of entity ids.

@feature_view(...)
class QuestionAnswerPrompt:
    query = String()

    @transform_row(...)
    async def related_chunk_ids(self, input: dict, store: ContractStore) -> list[str]:
        ...
        return docs["id"].unique().to_list()

    relevant_chunks = ModuleChunk().summary.for_entities({ 
        "id": related_chunk_ids
    })

Prompt Construction

Now we have our relevant context. We can therefore construct the final prompt that will be passed ot the LLM.

Again can we use a transform_row transformation, as this requires some more custom logic.

@feature_view(...)
class QuestionAnswerPrompt:
    query = String()

    @transform_row(...)
    async def related_chunk_ids(self, input: dict, store: ContractStore) -> list[str]:
        ...
        return docs["id"].unique().to_list()

    relevant_chunks = ModuleChunk().summary.for_entities({ 
        "id": related_chunk_ids
    })

    @transform_row(
        using_features=[relevant_chunks, query], 
        return_type=String()
    )
    def formatted_prompt(self, input: dict[str, Any], store: ContractStore) -> str:
        prompt += "Related code chunks:\n\n"
        for chunk in input["relevant_chunks"]:
            prompt += f"{chunk}\n\n"

        return prompt + f"Answer the following question with the information above: {input['query']}"

We can now run the following code to get a formatted prompt.

prompts = await store.feature_view(QuestionAnswerPrompt).process_input({
    "query": ["What is in the ml kickstarter project?"]
}).to_polars()

Which could return a prompt similar to the following

Related code chunks:

Type: 'docs': named: 'AI / ML Capabilities' at line nr. '0' in 'README.md/# ML Kickstarter/AI / ML Capabilities'

Behavior: - LLM / embedding server using Ollama
- Model experiment tracking using MLFLow
- Model regristry using MLFLow
- Model serving using MLFlow
- Model evaluation in production using Aligned
- Job orchestration using Prefect
- Data catalog using Aligned
- Data management using Aligned
- Data quality management using Aligned
- Data annotation using Aligned

Type: 'docs': named: '`make clean`' at line nr. '0' in 'README.md/# ML Kickstarter/Other make commands/`make clean`'

Behavior: Removes all unused docker images.

Answer the following question with the information above: What is in the ml kickstarter project?
Previous
Text Sentiment