Examples
Code Chatbot with RAG
This page shows how to set up a chatbot that answers questions about a Python codebase by leveraging RAG.
The Codebase Documents
Before we start, we need some documents to query. For this project I wanted a chatbot that knows all the documentation that exists in Markdown files, but that also has an overview of the different Python functions and classes. I did not want the LLM to know the inner workings of the functions, only their high-level functionality and usage.
As a result, I created a function that finds all Markdown and Python files. It then parses them into different sections and adds a type indicating what kind of document each chunk is.
Each chunk uses the following schema.
class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()
    source_type = String().accepted_values([
        "docs",
        "import",
        "class",
        "function",
        "attribute",
    ])
    content = String().is_optional()
    loaded_at = EventTimestamp()
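The exact implementation of parse_code_chunks is not the focus here, but a minimal sketch could look like the following. It assumes the loader is an async function returning a polars DataFrame that matches the schema above, and it only emits "docs", "class" and "function" chunks with deliberately simplified chunking; the real parser would also split Markdown into sections and cover imports and attributes.

from pathlib import Path
import ast
import polars as pl

async def parse_code_chunks() -> pl.DataFrame:
    chunks: list[dict] = []

    # Markdown files: treat each file as one "docs" chunk.
    for path in Path(".").glob("**/*.md"):
        chunks.append({
            "id": f"{path}:0",
            "source": str(path),
            "lineno": 0,
            "name": path.stem,
            "source_type": "docs",
            "content": path.read_text(),
        })

    # Python files: keep only the name and docstring of each top-level
    # function and class, not the full implementation.
    for path in Path(".").glob("**/*.py"):
        tree = ast.parse(path.read_text())
        for node in tree.body:
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                chunks.append({
                    "id": f"{path}:{node.lineno}",
                    "source": str(path),
                    "lineno": node.lineno,
                    "name": node.name,
                    "source_type": "class" if isinstance(node, ast.ClassDef) else "function",
                    "content": ast.get_docstring(node),
                })

    return pl.DataFrame(chunks)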
Furthermore, to indicate which function generates the output, we add the function as the source. I will also add a .with_loaded_at() to automatically add a freshness timestamp.
@feature_view(
    source=CustomMethodDataSource.from_load(parse_code_chunks).with_loaded_at(),
)
class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()
    source_type = String().accepted_values([
        "docs",
        "import",
        "class",
        "function",
        "attribute",
    ])
    content = String().is_optional()
    loaded_at = EventTimestamp()
I also want to store all of the chunks in a CSV file, with the timestamp formatted as a Unix timestamp. This leads to the added materialized_source.
@feature_view(
    source=CustomMethodDataSource.from_load(parse_code_chunks).with_loaded_at(),
    materialized_source=FileSource.csv_at(
        "data/module_chunks.csv", date_formatter=DateFormatter.unix_timestamp()
    ),
)
class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()
    source_type = String().accepted_values([
        "docs",
        "import",
        "class",
        "function",
        "attribute",
    ])
    content = String().is_optional()
    loaded_at = EventTimestamp()
With this we have our documents, and we can now start inserting them into our vector database.
Chunk Embedding
To set up a RAG system, we need some embeddings to inject.
Prompt Construction
However, we currently have multiple properties, not one string to embed.
Thankfully, aligned contains a format_string function that constructs a prompt for you, given a set of properties.
@feature_view(...)
class ModuleChunk:
    id = String().as_entity()
    source = String()
    lineno = Int32()
    name = String()
    source_type = String().accepted_values(...)
    content = String().is_optional()
    loaded_at = EventTimestamp()

    summary = String().format_string(
        features=[source_type, name, lineno, source, content],
        format="Type: '{source_type}': named: '{name}' at line nr. '{lineno}' in '{source}'\n\nBehavior: {content}"
    )
We can now preview our summary by running the following code.
df = await store.feature_view(ModuleChunk).process_input({
    "id": ["some file id"],
    "source": ["docs"],
    "lineno": [10],
    "name": ["Some name"],
    "source_type": ["docs"],
    "content": ["Some content"],
}).to_polars()
print(df)
Which prints something like
shape: (1, 8)
┌────────────────┬────────┬────────┬───────────┬─────────────┬─────────┬───────────────┐
│ id ┆ source ┆ lineno ┆ name ┆ source_type ┆ content ┆ summary │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ i64 ┆ str ┆ str ┆ str ┆ str │
╞════════════════╪════════╪════════╪═══════════╪═════════════╪═════════╪═══════════════╡
│ some file id ┆ docs ┆ 10 ┆ Some name ┆ docs ┆ Some ┆ Type: 'docs': │
│ ┆ ┆ ┆ ┆ ┆ content ┆ named: 'Some │
│ ┆ ┆ ┆ ┆ ┆ ┆ nam… │
└────────────────┴────────┴────────┴───────────┴─────────────┴─────────┴───────────────┘
An additional benefit of using this transformation is that the embedding model can detect it and use the format as the embedding version. This means we can support multiple versions at the same time, and we can also add warnings for data drift in our CI / CD setup, since we can detect changes to our prompt template.
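To make the CI / CD idea concrete, a check could compare the current prompt template against the one the stored embeddings were built with. The sketch below is not aligned's own mechanism (aligned derives the version from the transformation itself); the hashed template and the data/prompt_version.txt file are assumptions for illustration.

import hashlib
from pathlib import Path

# The same format string used for ModuleChunk.summary above.
TEMPLATE = (
    "Type: '{source_type}': named: '{name}' at line nr. '{lineno}' "
    "in '{source}'\n\nBehavior: {content}"
)

# Stores the hash of the template the current embeddings were built with.
KNOWN_VERSION_FILE = Path("data/prompt_version.txt")

def check_prompt_drift() -> None:
    current = hashlib.sha256(TEMPLATE.encode()).hexdigest()

    if not KNOWN_VERSION_FILE.exists():
        KNOWN_VERSION_FILE.write_text(current)
        return

    if KNOWN_VERSION_FILE.read_text().strip() != current:
        raise SystemExit(
            "Prompt template changed: re-embed the chunks before deploying."
        )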
With this we can create the embeddings, using the summary as the input to the model.
docs = ModuleChunk()

@model_contract(
    input_features=[docs.summary],
    exposed_model=openai_embedding(
        model="text-embedding-3-small"
    ),
)
class ChunkEmbedding:
    id = String().as_entity()
    embedding = Embedding(1536)
    prompt_version = String().as_model_version()
    full_prompt = String()
With this we can run the following code.
prompt = "What is in the ml kickstarter project?"

embedding = await store.model(ChunkEmbedding).predict_over({
    "summary": [prompt]
}).to_polars()

print(embedding)
Which will return something like the following.
shape: (1, 5)
┌─────────────────────┬────────────────────┬────────────────────┬────────────────────┬─────────────┐
│ summary ┆ full_prompt ┆ prompt_version ┆ updated_at ┆ embedding │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ datetime[μs, UTC] ┆ list[f64] │
╞═════════════════════╪════════════════════╪════════════════════╪════════════════════╪═════════════╡
│ What is in the ml ┆ What is in the ml ┆ d35065742e14eb5f95 ┆ 2024-10-27 ┆ [0.038037, │
│ kickstarter … ┆ kickstarter … ┆ f6c8137ef038… ┆ 11:45:59.065352 ┆ 0.998796, … │
│ ┆ ┆ ┆ UTC ┆ -0.1742… │
└─────────────────────┴────────────────────┴────────────────────┴────────────────────┴─────────────┘
The Vector Database
Now that our embeddings are created, it would be nice to store them somewhere. For this project I will use lancedb, which is a fast and lightweight vector database.
from aligned.sources.lancedb import LanceDBConfig

vector_db = LanceDBConfig(path="data/lancedb")

@model_contract(
    input_features=[...],
    exposed_model=openai_embedding(...),
    output_source=vector_db.table("chunk")
)
class ChunkEmbedding:
    id = String().as_entity()
    embedding = Embedding(1536)
    prompt_version = String().as_model_version()
    full_prompt = String()
We can now populate all the chunks by running the following
store = await ContractStore.from_glob("**/*.py")

await store.model(ChunkEmbedding).predict_over(
    store.feature_view(ModuleChunk).all()
).upsert_into_output_source()
Q&A Model
Now that we have a vector store we can look up in, we need a way to construct our LLM prompt.
Prompt Input
Again we need to construct a prompt. However, this time it will not be a single document; it will contain the relevant context and the question to answer.
Therefore, a vector lookup and some custom string formatting are needed.
We will start by defining the input needed to format our prompt, by adding a query parameter, which should be a string.
@feature_view(source=InMemorySource.empty())
class QuestionAnswerPrompt:
    query = String()
Nearest N Lookup
To find the chunks that are relevant for our query, we can do a nearest neighbour lookup.
This is currently done through a custom transform_row transformation, so we can get as many related document ids as we like.
@feature_view(...)
class QuestionAnswerPrompt:
    query = String()

    @transform_row(
        using_features=[query],
        return_type=List(String())
    )
    async def related_chunk_ids(self, input: dict, store: ContractStore) -> list[str]:
        docs = await store.vector_index("chunk").nearest_n_to(
            entities={"summary": [input["query"]]},
            number_of_records=15
        ).to_polars()

        return docs["id"].unique().to_list()
However, we are not interested in the ids of the chunks; we want their content. This is easily done with the features_for lookup, so we define that we want the summary feature for a set of entity ids.
@feature_view(...)
class QuestionAnswerPrompt:
    query = String()

    @transform_row(...)
    async def related_chunk_ids(self, input: dict, store: ContractStore) -> list[str]:
        ...
        return docs["id"].unique().to_list()

    relevant_chunks = ModuleChunk().summary.for_entities({
        "id": related_chunk_ids
    })
Prompt Construction
Now that we have our relevant context, we can construct the final prompt that will be passed to the LLM.
Again, we can use a transform_row transformation, as this requires some more custom logic.
@feature_view(...)
class QuestionAnswerPrompt:
    query = String()

    @transform_row(...)
    async def related_chunk_ids(self, input: dict, store: ContractStore) -> list[str]:
        ...
        return docs["id"].unique().to_list()

    relevant_chunks = ModuleChunk().summary.for_entities({
        "id": related_chunk_ids
    })

    @transform_row(
        using_features=[relevant_chunks, query],
        return_type=String()
    )
    def formatted_prompt(self, input: dict[str, Any], store: ContractStore) -> str:
        prompt = "Related code chunks:\n\n"

        for chunk in input["relevant_chunks"]:
            prompt += f"{chunk}\n\n"

        return prompt + f"Answer the following question with the information above: {input['query']}"
We can now run the following code to get a formatted prompt.
prompts = await store.feature_view(QuestionAnswerPrompt).process_input({
    "query": ["What is in the ml kickstarter project?"]
}).to_polars()
Which could return a prompt similar to the following
Related code chunks:
Type: 'docs': named: 'AI / ML Capabilities' at line nr. '0' in 'README.md/# ML Kickstarter/AI / ML Capabilities'
Behavior: - LLM / embedding server using Ollama
- Model experiment tracking using MLFLow
- Model regristry using MLFLow
- Model serving using MLFlow
- Model evaluation in production using Aligned
- Job orchestration using Prefect
- Data catalog using Aligned
- Data management using Aligned
- Data quality management using Aligned
- Data annotation using Aligned
Type: 'docs': named: '`make clean`' at line nr. '0' in 'README.md/# ML Kickstarter/Other make commands/`make clean`'
Behavior: Removes all unused docker images.
Answer the following question with the information above: What is in the ml kickstarter project?
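This page stops at the formatted prompt, but closing the loop only requires passing it to a chat model. The snippet below is a minimal sketch that calls OpenAI's chat API directly instead of wrapping it in another model contract; the model name and the formatted_prompt column access are assumptions for illustration.

from openai import OpenAI

client = OpenAI()

# `prompts` is the polars DataFrame returned by process_input above;
# grab the formatted prompt for the first (and only) query.
prompt = prompts["formatted_prompt"][0]

response = client.chat.completions.create(
    model="gpt-4o-mini",  # any chat-capable model will do
    messages=[{"role": "user", "content": prompt}],
)

print(response.choices[0].message.content)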