Core concepts
Data Source
A data source represents any form of data, including database tables, key-value stores, files, or structured folders. As a result, the system supports various infrastructures such as application databases, data warehouses, and more complex setups like data lakes.
Different data sources require varying amounts of metadata. Each source is uniquely identified by a key, which determines how its metadata should be processed. For example, a CSV source may need a separator symbol and compression format, whereas a PostgreSQL source requires a table name and, optionally, a schema name. This approach ensures flexibility while maintaining detailed control.
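As a small illustration (a sketch reusing the sources documented later on this page):
from aligned import FileSource, PostgreSQLConfig
from aligned.sources.local import CsvConfig

# A CSV source needs file-format metadata
csv_source = FileSource.csv_at(
    "taxi_data.csv",
    csv_config=CsvConfig(seperator=";", compression="gzip")
)

# A PostgreSQL source needs a table name
psql_source = PostgreSQLConfig(env_var="PSQL_DB_URL").table("taxi")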
Setting Data Sources
In most cases, a data source is associated with either a feature_view or a model_contract. However, these sources can also be updated dynamically using the .update_source_for(...) function.
from aligned import ContractStore
from aligned.sources.in_memory_source import InMemorySource

store = await ContractStore.from_glob("**/contract.py")
store = store.update_source_for(
    Titanic,
    InMemorySource.from_values({
        "passenger_id": [1, 2, 3],
        "age": [1, 2, 3],
        ...
    })
)
Aligned currently supports the following data sources:
- CSV
- Parquet
- Hive Partitioned Parquet
- Schema Versioned
- In Memory Source
- Random Source
- Azure Blob Storage
- AWS Redshift
- AWS S3
- PostgreSQL
- Feature View
- Custom Source
CSV File
from aligned import FileSource
taxi_file = FileSource.csv_at("taxi_data.csv")
CSV Config
CSV files come in many variations. To accommodate these differences, you can use a CsvConfig
to specify the separator and compression format.
from aligned import FileSource
from aligned.sources.local import CsvConfig
taxi_file_config = CsvConfig(
    seperator=";",
    compression="gzip"
)

compressed_taxi_data = FileSource.csv_at(
    "gzip_taxi_data.csv",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    csv_config=taxi_file_config
)
Parquet File
from aligned import FileSource
taxi_file = FileSource.parquet_at("taxi_data.parquet")
Parquet Config
from aligned import FileSource
from aligned.sources.local import ParquetConfig
taxi_file_config = ParquetConfig(
    compression="gzip"
)

compressed_taxi_data = FileSource.parquet_at(
    "gzip_taxi_data.parquet",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    config=taxi_file_config
)
Hive Partitioned Parquet
When data is segmented based on certain attributes, a Hive partitioned source can significantly improve filtering performance and reduce memory usage.
from aligned import FileSource
partitioned_credits = FileSource.partitioned_parquet_at(
    directory="credit_scoring",
    partition_keys=["country"],
    mapping_keys={ ... },
    config=None,  # A parquet config
    date_formatter=None
)
Example partitioned file structure:
credit_scoring/country=norway/data.parquet
credit_scoring/country=danmark/data.parquet
credit_scoring/country=uk/data.parquet
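As a minimal sketch (the CreditScore view and its fields are hypothetical), attaching the partitioned source to a view means a query filtered on the partition key only has to read the matching directory:
from aligned import feature_view, Int32, String, Float

@feature_view(
    name="credit_score",
    source=partitioned_credits
)
class CreditScore:
    customer_id = Int32().as_entity()
    country = String()
    score = Float()

# A request filtered on country == "norway" can be served by
# scanning only credit_scoring/country=norway/data.parquet.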
Schema Versioned Source
Managing data schema changes is crucial, especially when running A/B tests or canary deployments. Aligned offers a schema versioned source that dynamically adapts based on data requirements, enabling backward compatibility while supporting multiple versions simultaneously.
taxi_dir = FileSource.directory("taxi_data")
vendors_data = taxi_dir.with_schema_version(
    sub_directory="vendors"
).parquet_at("data.parquet")

@feature_view(source=vendors_data)
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = String()
Example file path:
taxi_data/vendors/26c65192dc9a74910a845d03ae05d4d2/data.parquet
Derived features
Adding new derived features and computations will not affect the schema version.
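For instance, the view above could gain a derived feature without changing its version directory (a sketch reusing the one_hot_encode transformation shown in the Random Source example below):
@feature_view(source=vendors_data)
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = String()

    # Derived at read time from an existing column, so the
    # schema version hash (and file path) stays the same.
    is_solo, is_empty = number_of_passengers.one_hot_encode(["1", "0"])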
In Memory Source
An in-memory source is useful for caching or hardcoding data for specific scenarios.
from aligned.sources.in_memory_source import InMemorySource
import polars as pl

# Empty in-memory source (for future inserts, upserts, or overwrites)
empty_source = InMemorySource.empty()

# Hardcoded values (must match full schema)
with_values = InMemorySource.from_values({
    "passenger_id": [1, 2, 3],
    "age": [1, 2, 3],
    ...
})

# Using a Polars DataFrame
with_polars_df = InMemorySource(pl.DataFrame(...))
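As a sketch of how a hardcoded source can be used (the Passenger view below is illustrative), a view reads straight from the in-memory data:
from aligned import feature_view, Int32, Float

@feature_view(source=with_values)
class Passenger:
    passenger_id = Int32().as_entity()
    age = Float()

# Served directly from the hardcoded values above
df = await Passenger.query().features_for({
    "passenger_id": [1, 2]
}).to_polars()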
Random Source
Validating ML models can be challenging due to data dependencies. A random source allows you to test your pipeline by generating synthetic yet structured data based on constraints.
from aligned.sources.random_source import RandomDataSource
@feature_view(
    source=RandomDataSource.with_values({
        "passenger_id": [1, 2, 3],
        "age": [10, 0.5, 90]
    }, seed=1)
)
class TitanicPassenger:
    passenger_id = Int32().as_entity()
    age = Float()
    sibsp = Int32().lower_bound(0).upper_bound(68).description(
        "Fun fact: the world record for most born children is 69!"
    )
    has_siblings = sibsp > 0
    sex = String().accepted_values(["male", "female"])
    is_male, is_female = sex.one_hot_encode(['male', 'female'])
    survived = Bool()

df = await TitanicPassenger.query().features_for({
    "passenger_id": [1, 2]
}).to_polars()
print(df)
Which returns the following:
shape: (2, 8)
┌──────────────┬─────────┬───────────┬──────────────┬──────────┬──────┬───────┬────────┐
│ has_siblings ┆ is_male ┆ is_female ┆ passenger_id ┆ survived ┆ age ┆ sibsp ┆ sex │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ bool ┆ bool ┆ bool ┆ i32 ┆ bool ┆ f64 ┆ i32 ┆ str │
╞══════════════╪═════════╪═══════════╪══════════════╪══════════╪══════╪═══════╪════════╡
│ true ┆ false ┆ true ┆ 1 ┆ true ┆ 10.0 ┆ 66 ┆ female │
│ true ┆ true ┆ false ┆ 2 ┆ false ┆ 0.5 ┆ 18 ┆ male │
└──────────────┴─────────┴───────────┴──────────────┴──────────┴──────┴───────┴────────┘
You also have a .dummy_store() on the ContractStore which will update all sources to the RandomDataSource.
store = await ContractStore.from_glob("**/contract.py")
dummy_store = store.dummy_store()
Azure Blob Storage
External storage solutions like Azure Blob Storage are supported.
The Azure config works similarly to a Directory, so all file sources should be available, e.g. CSV, Parquet, partitioned Parquet, and the schema versioned source.
from aligned.sources.azure_blob_storage import AzureBlobConfig
config = AzureBlobConfig(
    account_id_env="...",
    tenant_id_env="...",
    client_id_env="...",
    client_secret_env="...",
    account_name_env="..."
)
taxi_data = config.parquet_at("taxi_data.parquet")
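Since the config acts like a directory, the other file-source helpers shown above can be used on it in the same way (a sketch, assuming the same method names as on FileSource and its directories):
taxi_csv = config.csv_at("taxi_data.csv")

partitioned = config.partitioned_parquet_at(
    directory="credit_scoring",
    partition_keys=["country"]
)

versioned = config.with_schema_version(
    sub_directory="vendors"
).parquet_at("data.parquet")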
AWS Redshift
from aligned import RedshiftSQLConfig
database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
taxi_data = database.table("taxi")
Renaming Columns in Redshift
from aligned import RedshiftSQLConfig
database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
with_renames = database.table(
    name="taxi",
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
AWS S3
from aligned import AwsS3Config

bucket = AwsS3Config(
    access_token_env="AWS_ACCESS_TOKEN",
    secret_token_env="AWS_SECRET_TOKEN",
    bucket_env="AWS_BUCKET",
    region_env="AWS_REGION"
)
taxi_parquet = bucket.parquet_at("taxi.parquet")
taxi_csv = bucket.csv_at("taxi.csv")
PostgreSQL
from aligned import PostgreSQLConfig
database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table("taxi")
Renaming Columns in PostgreSQL
There is often not a one-to-one mapping between the stored data and the desired structure. Therefore, below is a way to rename columns.
from aligned import PostgreSQLConfig
database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table(
    name="taxi",
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
Feature View
A feature view can be used as a data source, allowing for lineage tracking and dependency management.
@feature_view(
    name="zipcode",
    source=FileSource.parquet_at("...")
)
class Zipcode:
    zipcode = Int64().as_entity()
    event_timestamp = EventTimestamp()
    created_timestamp = Timestamp()
    location_type = String()
    population = Int64()

@feature_view(
    name="other",
    source=Zipcode
)
class Other:
    location_type = String().as_entity()
    population = Int64()
    event_timestamp = EventTimestamp()
Schema Structure
When using a feature view as a source, make sure the schema matches in the downstream view. There is currently limited support for verifying that the schema is upheld. However, this will be improved, since it is possible to check that the schema would be upheld.
Custom Source
For custom logic, such as fetching data from an API, you can implement a CustomMethodDataSource.
from aligned import CustomMethodDataSource
from aligned.request.retrieval_request import RetrievalRequest
import polars as pl

def load_all_data(request: RetrievalRequest, limit: int | None) -> pl.LazyFrame:
    # My custom method, e.g. a call to an external API.
    # `request` describes which features are being asked for.
    return pl.DataFrame(...).lazy()

custom_source = CustomMethodDataSource.from_methods(
    all_data=load_all_data,
    all_between_dates=None,
    features_for=None,
    docker_config="my-custom-image:latest"
)