Core concepts
Model contract
A model contract defines the expected behaviour of an AI / ML model, making it easy for external applications to know what to expect.
It also lets us simplify many common use cases for our models, leading to less code to maintain and a system that is easier to understand.
In this section, we will go through how you can define the following:
- The output structure of a model
- The input features to a model
- Where we store our output
- How to load the output from our models
- Where our models are accessible
- Who owns the model
- How we create training datasets
- Where we store model datasets
Output Structure
Similar to a feature_view, we define the structure of the model output. However, rather than just defining the prediction value, the contract contains additional metadata that applications can use: anything from the related entities, model version, and prediction time to potential input features.
So if we wanted to define the classic taxi regression model, we could describe it with the following contract.
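from aligned import EventTimestamp, String, model_contract
from examples.taxi.arrival import TaxiArrivals

arrivals = TaxiArrivals()

@model_contract(
    input_features=[...],
)
class TaxiModel:
    # trip_id is a string entity, matching the UUID values used later in this section
    trip_id = String().as_entity()
    predicted_duration = arrivals.duration.as_regression_target()
    predicted_at = EventTimestamp()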
Thereby we define that our model should output a predicted_duration for a given trip_id.
Regression Target
Notice how we define the target feature by referring to the duration in our TaxiArrivals view. This makes code completion and data lineage possible, and lets us type check our code with tools like pyright.
Furthermore, thanks to the data lineage, it is possible to join our ground truths to our training dataset automatically. We can also add static checks for feature leakage.
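To make the idea of a static leakage check more concrete, here is a minimal sketch in plain Python. It does not use Aligned and is not how Aligned implements the check. The lineage graph, the helper names, and the features dropoff_time, pickup_time and avg_speed are all hypothetical and exist only for illustration. The idea is that an input feature leaks if it is the target itself or is derived from the target, directly or transitively.

```python
from collections import deque

# Hypothetical lineage graph: each feature maps to the features it is derived from.
# The names loosely mirror the taxi example, but the graph itself is an assumption.
LINEAGE = {
    "arrivals.duration": {"arrivals.dropoff_time", "departures.pickup_time"},
    "departures.day_of_week": {"departures.pickup_time"},
    "departures.travel_distance": set(),
    "arrivals.avg_speed": {"arrivals.duration", "departures.travel_distance"},
}


def depends_on(feature: str, target: str, lineage: dict) -> bool:
    """Return True if `feature` is the target or is derived from it."""
    queue = deque([feature])
    seen = set()
    while queue:
        current = queue.popleft()
        if current == target:
            return True
        if current in seen:
            continue
        seen.add(current)
        # Features without an entry are treated as raw columns with no parents.
        queue.extend(lineage.get(current, set()))
    return False


def leaking_features(inputs: list, target: str, lineage: dict) -> list:
    """Return the input features that would leak the target."""
    return [name for name in inputs if depends_on(name, target, lineage)]


leaks = leaking_features(
    ["departures.day_of_week", "departures.travel_distance", "arrivals.avg_speed"],
    "arrivals.duration",
    LINEAGE,
)
print(leaks)  # ['arrivals.avg_speed']
```

Because the check only needs the lineage graph, it can run before any data is loaded, which is what makes it a static check.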
Input Features
However, our models will always need some input, which is why we can define the input features. An input can be either a feature from a feature view or the output of another model.
from aligned import EventTimestamp, String, model_contract
from examples.taxi.arrival import TaxiArrivals
from examples.taxi.departure import TaxiDepartures, TaxiVendor

departures = TaxiDepartures()
vendor = TaxiVendor()
arrivals = TaxiArrivals()

@model_contract(
    input_features=[
        departures.day_of_week,
        departures.travel_distance,
        vendor.passenger_hour_mean,
        vendor.passenger_20_min_mean,
    ],
)
class TaxiModel:
    # trip_id is a string entity, matching the UUID values used below
    trip_id = String().as_entity()
    predicted_duration = arrivals.duration.as_regression_target()
    predicted_at = EventTimestamp()
You can also refer to a whole view, but then all features in that view will be used, which can lead to unexpected behavior if the upstream view changes.
With this, we can already load input features given the entities in the upstream views.
input_features = await store.model(TaxiModel).features_for({
    "vendor_id": [2, 1, 4],
    "trip_id": [
        "ea6b8d5d-62fd-4664-a112-4889ebfcdf2b",
        "64c4c94f-2a85-406f-86e6-082f1f7aadc8",
        "3258461f-6113-4c5e-864b-75a0dee808d3",
    ],
}).to_polars()

print(input_features)
This will return something like:
shape: (3, 6)
┌───────────┬─────────────────────────────────┬─────────────┬─────────────────┬─────────────────────┬───────────────────────┐
│ vendor_id ┆ trip_id ┆ day_of_week ┆ travel_distance ┆ passenger_hour_mean ┆ passenger_20_min_mean │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i32 ┆ str ┆ i8 ┆ f64 ┆ f64 ┆ f64 │
╞═══════════╪═════════════════════════════════╪═════════════╪═════════════════╪═════════════════════╪═══════════════════════╡
│ 2 ┆ ea6b8d5d-62fd-4664-a112-4889eb… ┆ 1 ┆ 0.022726 ┆ 2.031903 ┆ 2.031903 │
│ 1 ┆ 64c4c94f-2a85-406f-86e6-082f1f… ┆ 1 ┆ 0.009103 ┆ 1.292352 ┆ 1.292352 │
│ 4 ┆ 3258461f-6113-4c5e-864b-75a0de… ┆ 1 ┆ 0.088376 ┆ null ┆ null │
└───────────┴─────────────────────────────────┴─────────────┴─────────────────┴─────────────────────┴───────────────────────┘
Freshness
If you want to see when the input sources were last updated, for example as a guardrail or for debugging, you can use the input_freshness() function.
freshness = await store.model(TaxiModel).input_freshness()
print(freshness)
This will return something like:
{
    "feature_view:vendor": "2024-10-20 20:21:50",
    "feature_view:departures": "2024-11-20 10:11:32",
}
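As a sketch of the guardrail use case, the snippet below flags input sources that have not been updated recently. It is plain Python and does not call Aligned. It assumes the freshness result behaves like a mapping from source name to a datetime, which is an assumption based on the printed output above. The helper name stale_inputs, the seven-day limit, and the fixed "now" are hypothetical.

```python
from datetime import datetime, timedelta


def stale_inputs(freshness: dict, max_age: timedelta, now: datetime) -> list:
    """Return the sources whose last update is older than `max_age`.

    A source with no known update time (None) is also reported as stale.
    """
    return sorted(
        name
        for name, updated_at in freshness.items()
        if updated_at is None or now - updated_at > max_age
    )


# Values copied from the example output above, written as datetimes (assumed type).
freshness = {
    "feature_view:vendor": datetime(2024, 10, 20, 20, 21, 50),
    "feature_view:departures": datetime(2024, 11, 20, 10, 11, 32),
}

# A fixed "now" keeps the example deterministic; real code would use the current time.
now = datetime(2024, 11, 21, 0, 0, 0)

stale = stale_inputs(freshness, max_age=timedelta(days=7), now=now)
print(stale)  # ['feature_view:vendor']
```

A batch job could run a check like this before predicting and skip or alert when the list is not empty.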
Output Storage
We might run this model as a batch model, in which case we also need to store the output somewhere. Similar to how a feature_view can define a source, we can also define an output_source.
from aligned import FileSource, Int32, String, model_contract

@model_contract(
    input_features=[...],
    output_source=FileSource.parquet_at("predicted_trips.parquet"),
)
class TaxiModel:
    vendor_id = Int32().as_entity()
    trip_id = String().as_entity()
    predicted_duration = arrivals.duration.as_regression_target()
    model_version = Int32().as_model_version()
This also means that we have all the information needed to load our output for a given trip_id.
preds = await store.model(TaxiModel).predictions_for({
    "vendor_id": [2, 1, 4],
    "trip_id": [
        "ea6b8d5d-62fd-4664-a112-4889ebfcdf2b",
        "64c4c94f-2a85-406f-86e6-082f1f7aadc8",
        "3258461f-6113-4c5e-864b-75a0dee808d3",
    ],
}).to_polars()

print(preds)
Which could return something like the following
shape: (3, 4)
┌───────────┬────────────────────┬───────────────┬─────────────────────────────────┐
│ vendor_id ┆ predicted_duration ┆ model_version ┆ trip_id │
│ --- ┆ --- ┆ --- ┆ --- │
│ i32 ┆ i64 ┆ i32 ┆ str │
╞═══════════╪════════════════════╪═══════════════╪═════════════════════════════════╡
│ 2 ┆ 500 ┆ 1 ┆ ea6b8d5d-62fd-4664-a112-4889eb… │
│ 1 ┆ 292 ┆ 1 ┆ 64c4c94f-2a85-406f-86e6-082f1f… │
│ 4 ┆ 689 ┆ 1 ┆ 3258461f-6113-4c5e-864b-75a0de… │
└───────────┴────────────────────┴───────────────┴─────────────────────────────────┘
Freshness
If you want to see when the output source was last updated with any predictions, you can use the .prediction_freshness() call.
freshness = await store.model(TaxiModel).prediction_freshness()
This will return a timestamp, something like 2024-10-24 23:21:50.
All data products come with assumptions about how frequently they will be updated. That is why Aligned makes it possible to add freshness thresholds.
This makes the "expected" delay of a prediction explicit, also referred to as the acceptable_freshness. It also defines an unacceptable_freshness, which is a delay that should not occur.
Combined with the Aligned UI, these thresholds can be used to send different types of alerts that monitor data quality.
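One way to read the two thresholds is as three zones: fresh, late but tolerated, and too late. The sketch below shows that reading in plain Python. It does not use Aligned, and the three-level status, the function name classify_freshness, and the threshold values are assumptions made for illustration, not Aligned's alerting logic.

```python
from datetime import datetime, timedelta
from enum import Enum


class FreshnessStatus(Enum):
    OK = "ok"              # within the acceptable freshness
    WARNING = "warning"    # later than expected, but not yet unacceptable
    CRITICAL = "critical"  # at or beyond the unacceptable freshness


def classify_freshness(
    last_updated: datetime,
    now: datetime,
    acceptable: timedelta,
    unacceptable: timedelta,
) -> FreshnessStatus:
    """Compare the age of the latest prediction against both thresholds."""
    age = now - last_updated
    if age <= acceptable:
        return FreshnessStatus.OK
    if age < unacceptable:
        return FreshnessStatus.WARNING
    return FreshnessStatus.CRITICAL


# Timestamp taken from the prediction_freshness() example; thresholds are hypothetical.
last_updated = datetime(2024, 10, 24, 23, 21, 50)
acceptable = timedelta(hours=1)
unacceptable = timedelta(hours=6)

status = classify_freshness(
    last_updated,
    now=datetime(2024, 10, 25, 1, 0, 0),
    acceptable=acceptable,
    unacceptable=unacceptable,
)
print(status)  # FreshnessStatus.WARNING
```

Separating the two thresholds lets a monitoring setup send a low-priority notice in the warning zone and reserve urgent alerts for the critical zone.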
Exposed Models
A model has little value if it is hard to use. Therefore, Aligned makes it possible to define where our models are exposed, and thereby how to use them.
from aligned.exposed_model.mlflow import mlflow_server
...

@model_contract(
    exposed_model=mlflow_server(
        host="https://my-taxi-model-endpoint:8000",
        model_name="taxi_model",
        model_alias="champion",
    )
)
class TaxiModel:
    ...
Adding an exposed_model unlocks the predict_over functionality.
preds = await store.model(TaxiModel).predict_over({
    "trip_id": [...],
    "vendor_id": [...]
}).to_polars()
This will load the relevant features, format the HTTP request to the mlflow server, and add relevant metadata such as model versions and timestamps.
Furthermore, it is also possible to insert, upsert, or overwrite directly into the output source.
await store.model(TaxiModel).predict_over({
    "trip_id": [...],
    "vendor_id": [...]
}).upsert_into_output_source()
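The three write modes differ in how they treat rows that already exist in the output source. The sketch below illustrates the usual meaning of insert, upsert, and overwrite on plain lists of dictionaries. It does not use Aligned, the function names are hypothetical, and it assumes rows are matched on their entity columns, which is an assumption about how an upsert would identify a row.

```python
def insert(existing: list, new: list) -> list:
    """Append the new rows and keep everything that was already stored."""
    return existing + new


def upsert(existing: list, new: list, keys: list) -> list:
    """Replace rows that share the same key values and append the rest."""
    merged = {tuple(row[k] for k in keys): row for row in existing}
    for row in new:
        merged[tuple(row[k] for k in keys)] = row
    return list(merged.values())


def overwrite(existing: list, new: list) -> list:
    """Discard what was stored and keep only the new rows."""
    return list(new)


stored = [
    {"trip_id": "a", "predicted_duration": 500},
    {"trip_id": "b", "predicted_duration": 292},
]
fresh = [
    {"trip_id": "b", "predicted_duration": 300},
    {"trip_id": "c", "predicted_duration": 689},
]

inserted = insert(stored, fresh)                  # 4 rows, trip "b" appears twice
upserted = upsert(stored, fresh, ["trip_id"])     # 3 rows, trip "b" is updated
overwritten = overwrite(stored, fresh)            # 2 rows, only the new predictions

print(len(inserted), len(upserted), len(overwritten))  # 4 3 2
```

Upsert is often the natural choice for batch predictions that are recomputed, since re-running the job does not create duplicate rows for the same entity.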
Available models
The models that currently exist are:
- Custom Python function
- MLFlow Server
- In Memory MLFlow model
- Langchain Model
- Ollama Complete
- Ollama Embedding
- OpenAI Embedding
- Sentence Transformer
- Shadow Models
- A/B Test Models
Dataset Store
Working with data often leads to very specific datasets, like train, test, and validation, but you may also want to store other specific datasets related to a model. Therefore, Aligned lets the user register where these datasets exist.
The example below creates a JSON file called taxi_datasets.json that contains the locations of the dataset files, along with some associated metadata.
@model_contract(
    dataset_store=FileSource.json_at("taxi_datasets.json"),
    ...
)
class TaxiModel:
    ...
You can manually load or modify the DatasetStore object, or you can use built-in functions, as in the following training pipeline.
from aligned import ContractStore, FileSource

store = await ContractStore.from_dir(".")
entities = ...

# Where the dataset locations and metadata are registered
dataset_store = store.model("taxi").dataset_store

datasets = await (
    store.model("taxi")
    .with_labels()
    .features_for(entities)
    .train_test(train_size=0.7)
    .store_dataset_at_directory(
        FileSource.directory("datasets/taxi"),
        dataset_store=dataset_store,
    )
)

train = await datasets.train.to_polars()
print(train.input)
print(train.labels)
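For readers unfamiliar with the train_size argument, the sketch below shows what a 0.7 split means in plain Python. It does not use Aligned. It assumes a simple split by row position, and whether Aligned splits by position, by time, or at random is not stated here, so treat the helper train_test_split as a hypothetical illustration only.

```python
def train_test_split(rows: list, train_size: float) -> tuple:
    """Split rows by position: the first share goes to train, the rest to test."""
    if not 0 < train_size < 1:
        raise ValueError("train_size must be between 0 and 1")
    # round() avoids an off-by-one caused by floating point error in the product.
    cut = round(len(rows) * train_size)
    return rows[:cut], rows[cut:]


rows = list(range(10))  # stand-in for ten feature rows
train_rows, test_rows = train_test_split(rows, train_size=0.7)

print(len(train_rows), len(test_rows))  # 7 3
```

The two parts never overlap and together contain every row, which is the property that keeps the test set a fair estimate of performance on unseen data.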