Core concepts
Data Source
A data source describes any form of data, whether a database table, a key-value store, a file, or a structured folder. Consequently, the system can support existing infrastructure such as application databases and data warehouses, as well as more complex setups like data lakes.
Furthermore, data sources vary in how much metadata they need. Therefore, each data source is represented with a unique key, which tells the system which metadata to decode. For instance, a CSV data source can contain a separator symbol and a compression format, while a PostgreSQL source needs a table name and potentially a schema name. This keeps the configuration flexible, yet detailed.
Setting Data Sources
Most often, a source will be associated with either a feature_view or a model_contract. However, it is also possible to update sources on the fly with the .update_source_for(...) function.
from aligned import ContractStore
from aligned.sources.in_memory_source import InMemorySource

store: ContractStore = await ContractStore.from_glob("**/contract.py")

store = store.update_source_for(
    "feature_view:titanic",
    InMemorySource.from_values({
        "passenger_id": [1, 2, 3],
        "age": [1, 2, 3],
        ...
    })
)
There is currently support for the following data sources:
- CSV
- Parquet
- Hive Partitioned Parquet
- Schema Versioned
- In Memory Source
- Random Source
- Azure Blob Storage
- AWS Redshift
- AWS S3
- PostgreSQL
- Feature View
- Custom Source
CSV File
from aligned import FileSource
taxi_file = FileSource.csv_at("taxi_data.csv")
CSV Config
There are a lot of different variations of CSV files. That is why it is possible to define a CsvConfig that describes which separator and compression to use.
from aligned import FileSource
from aligned.sources.local import CsvConfig

taxi_file_config = CsvConfig(
    seperator=";",
    compression="gzip"
)

compressed_taxi_data = FileSource.csv_at(
    "gzip_taxi_data.csv",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    csv_config=taxi_file_config
)
Parquet File
Parquet files can also come in different variations, for instance in which compression they use. That is why it is possible to define a ParquetConfig that describes how the file should be read and written.
from aligned import FileSource
taxi_file = FileSource.parquet_at("taxi_data.parquet")
Parquet Config
from aligned import FileSource
from aligned.sources.local import ParquetConfig

taxi_file_config = ParquetConfig(
    compression="gzip"
)

compressed_taxi_data = FileSource.parquet_at(
    "gzip_taxi_data.parquet",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    config=taxi_file_config
)
Hive Partitioned Parquet
Often we have data that can be split into segments based on the value of a column. This is why aligned offers a Hive partitioned data source, which can significantly improve filtering performance and reduce memory usage in certain scenarios.
from aligned import FileSource

partitioned_credits = FileSource.partitioned_parquet_at(
    directory="credit_scoring",
    partition_keys=["country"],
    mapping_keys={ ... },
    config=None,  # A parquet config
    date_formatter=None
)
This could partition the data into a file structure like:
credit_scoring/country=norway/data.parquet
credit_scoring/country=danmark/data.parquet
credit_scoring/country=uk/data.parquet
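When a view reads from this source, the partition key can be used like any other column. Below is a minimal sketch; the CreditScore class and every column except country are assumptions for illustration:

from aligned import feature_view, Float, Int32, String

# Hypothetical view on top of the partitioned source above
@feature_view(source=partitioned_credits)
class CreditScore:
    customer_id = Int32().as_entity()
    country = String()  # The partition key, encoded in the folder structure
    score = Float()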
Schema Versioned Source
When deploying new versions of our programs, it is highly likely that there will be a conflict in data sources at some point.
This can especially become a problem when deploying A/B tests or canaries, as we might need to run conflicting versions in production at the same time.
That is why aligned offers a data source that changes the underlying source based on the data requirements. This makes it possible to reuse backwards compatible sources, while still maintaining multiple versions at the same time with ease.
from aligned import feature_view, FileSource, Int32, String

taxi_dir = FileSource.directory("taxi_data")

vendors_data = taxi_dir.with_schema_version(
    sub_directory="vendors"
).parquet_at("data.parquet")

@feature_view(source=vendors_data)
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = String()
This would read from the following file path:
taxi_data/vendors/26c65192dc9a74910a845d03ae05d4d2/data.parquet
However, if we want to change the data type from a string to an int, or maybe remove a column, it would break.
So if a new code version looked like the following:
# Change the data type to an Int
@feature_view(source=vendors_data)
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = Int32()
Then it would read from the path:
taxi_data/vendors/2d05c032e9a7e6854f42b84cd6764092/data.parquet
Derived features
Adding new derived features and computations will not affect the schema version.
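As a minimal sketch based on the view above, adding a derived feature computed from number_of_passengers keeps the same schema version, and therefore the same file path:

@feature_view(source=vendors_data)
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = Int32()

    # Derived from the raw column, so the schema version is unchanged
    has_passengers = number_of_passengers > 0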
In Memory Source
Sometimes it is nice to have an in-memory cache, or to hard-code some data for a specific scenario. This is where the in memory source comes into play.
import polars as pl

from aligned.sources.in_memory_source import InMemorySource

# Assumes that there will be an insert, upsert or overwrite in the future
empty_source = InMemorySource.empty()

# Hard code values, but needs to contain the full schema
with_values = InMemorySource.from_values({
    "passenger_id": [1, 2, 3],
    "age": [1, 2, 3],
    ...
})

# Or send in a raw polars df
with_polars_df = InMemorySource(pl.DataFrame(...))
Random Source
Quality controlling our ML systems can be very hard, as they depend so much on our data. However, sometimes we just want to know that our code runs and does what we expect given a few data scenarios. Thankfully, the random source makes it easy to fabricate scenarios that our systems should get right.
The random source will leverage the data schema and its constraints to generate data points that should be roughly within the expected distribution, while still enabling you to hard-code scenarios if needed.
The source will also make sure to keep all computed features consistent with the raw data.
from aligned import feature_view, Bool, Float, Int32, String
from aligned.sources.random_source import RandomDataSource

@feature_view(
    source=RandomDataSource.with_values({
        "passenger_id": [1, 2, 3],
        "age": [10, 0.5, 90]
    }, seed=1)
)
class TitanicPassenger:
    passenger_id = Int32().as_entity()

    age = Float()

    sibsp = Int32().lower_bound(0).upper_bound(68).description(
        "Fun fact: the world record for most born children is 69!"
    )
    has_siblings = sibsp > 0

    sex = String().accepted_values(["male", "female"])
    is_male, is_female = sex.one_hot_encode(['male', 'female'])

    survived = Bool()

df = await TitanicPassenger.query().features_for({
    "passenger_id": [1, 2]
}).to_polars()

print(df)
Which returns the following:
shape: (2, 8)
┌──────────────┬─────────┬───────────┬──────────────┬──────────┬──────┬───────┬────────┐
│ has_siblings ┆ is_male ┆ is_female ┆ passenger_id ┆ survived ┆ age ┆ sibsp ┆ sex │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ bool ┆ bool ┆ bool ┆ i32 ┆ bool ┆ f64 ┆ i32 ┆ str │
╞══════════════╪═════════╪═══════════╪══════════════╪══════════╪══════╪═══════╪════════╡
│ true ┆ false ┆ true ┆ 1 ┆ true ┆ 10.0 ┆ 66 ┆ female │
│ true ┆ true ┆ false ┆ 2 ┆ false ┆ 0.5 ┆ 18 ┆ male │
└──────────────┴─────────┴───────────┴──────────────┴──────────┴──────┴───────┴────────┘
You also have a .dummy_store() method on the ContractStore, which will update all sources to a RandomDataSource.
from aligned import ContractStore

store = await ContractStore.from_glob("**/contract.py")
dummy_store = store.dummy_store()
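The dummy store can then be queried like a normal store and will return generated data instead of reading from the real sources. A minimal sketch, assuming a registered view named titanic and the feature_view accessor on the store:

# Assumes a feature view named "titanic" is registered in the store
df = await dummy_store.feature_view("titanic").all(limit=10).to_polars()
print(df)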
Azure Blob Storage
There is also support for using external storage solutions like Azure Blob Storage.
The Azure config works similarly to a Directory, so all file sources should be available: CSV, Parquet, partitioned Parquet, and the schema versioned source.
from aligned.sources.azure_blob_storage import AzureBlobConfig

config = AzureBlobConfig(
    account_id_env="...",
    tenant_id_env="...",
    client_id_env="...",
    client_secret_env="...",
    account_name_env="..."
)

taxi_data = config.parquet_at("taxi_data.parquet")
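Since the config behaves like a directory, the other file sources shown earlier should be available through the same object. A sketch, assuming the same Directory-style methods as FileSource:

# Assuming the same Directory-style methods as FileSource
taxi_csv = config.csv_at("taxi_data.csv")

partitioned_credits = config.partitioned_parquet_at(
    directory="credit_scoring",
    partition_keys=["country"]
)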
AWS Redshift
from aligned import RedshiftSQLConfig
database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
taxi_data = database.table("taxi")
Redshift Renaming
There is often not a one-to-one mapping between the data and the wanted structure. Therefore, below is a way to rename columns.
from aligned import RedshiftSQLConfig

database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')

with_renames = database.table(
    name="taxi",
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
AWS S3
from aligned import AwsS3Config

bucket = AwsS3Config(
    access_token_env="AWS_ACCESS_TOKEN",
    secret_token_env="AWS_SECRET_TOKEN",
    bucket_env="AWS_BUCKET",
    region_env="AWS_REGION"
)

taxi_parquet = bucket.parquet_at("taxi.parquet")
taxi_csv = bucket.csv_at("taxi.csv")
PostgreSQL
from aligned import PostgreSQLConfig
database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table("taxi")
PostgreSQL Renaming
There is often not a one-to-one mapping between the data and the wanted structure. Therefore, below is a way to rename columns.
from aligned import PostgreSQLConfig

database = PostgreSQLConfig(env_var='PSQL_DB_URL')

taxi_data = database.table(
    name="taxi",
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
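The renamed columns are what a feature view on top of this source will see, so the view references taxi_id and duration rather than the original column names. A minimal sketch; the TaxiTrip class and its data types are assumptions:

from aligned import feature_view, Float, Int32

# Hypothetical view reading from the renamed table
@feature_view(source=taxi_data)
class TaxiTrip:
    taxi_id = Int32().as_entity()
    duration = Float()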
Feature View
You can also use a feature view as a data source.
Feature View Lineage
Using a feature view as the source for another will collect data lineage information. As a result, aligned knows which sources depend on each other, which can be useful for keeping materialized sources up to date.
from aligned import feature_view, EventTimestamp, FileSource, Int64, String, Timestamp

@feature_view(
    name="zipcode",
    source=FileSource.parquet_at("...")
)
class Zipcode:
    zipcode = Int64().as_entity()

    event_timestamp = EventTimestamp()
    created_timestamp = Timestamp()

    location_type = String()
    population = Int64()

@feature_view(
    name="other",
    source=Zipcode
)
class Other:
    location_type = String().as_entity()
    population = Int64()
    event_timestamp = EventTimestamp()
Schema Structure
When using a feature view as a source, make sure that the schema matches in the downstream view. There is currently poor support for making sure you uphold the schema. However, this will be improved, as we can check that the schema would be upheld.
Custom Source
In some cases it can be nice to integrate with custom logic. Maybe you want to load data from an API or simulate data.
For this scenario, aligned supports a custom data source where you provide the functionality yourself.
Your methods will always receive a RetrivalRequest structure, which describes which entities and features are expected, along with other information such as event timestamps.
import polars as pl

from aligned import CustomMethodDataSource
from aligned.request.retrival_request import RetrivalRequest

def load_all_data(request: RetrivalRequest, limit: int | None) -> pl.LazyFrame:
    # My custom method for loading all available data
    return pl.DataFrame(...).lazy()

custom_source = CustomMethodDataSource.from_methods(
    all_data=load_all_data,
    all_between_dates=None,
    features_for=None
)