Core concepts

Data Source

A data source represents any form of data, including database tables, key-value stores, files, or structured folders. As a result, the system supports various infrastructures such as application databases, data warehouses, and more complex setups like data lakes.

Different data sources require varying amounts of metadata. Each source is uniquely identified by a key, which determines how its metadata should be processed. For example, a CSV source may need a separator symbol and compression format, whereas a PostgreSQL source requires a table name and, optionally, a schema name. This approach ensures flexibility while maintaining detailed control.
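The idea of a key that selects how metadata is interpreted can be sketched in plain Python. This is only an illustration, not Aligned's implementation: the registry, the class names, and the "type" field are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical registry: maps a source key to the class that understands its metadata.
SOURCE_TYPES: dict = {}


def register(key: str):
    """Class decorator that stores the class under the given key."""
    def wrap(cls):
        SOURCE_TYPES[key] = cls
        return cls
    return wrap


@register("csv")
@dataclass
class CsvSource:
    path: str
    separator: str = ","
    compression: Optional[str] = None


@register("psql")
@dataclass
class PostgresSource:
    table: str
    schema: Optional[str] = None


def decode_source(payload: dict):
    """Pick the class from the key, then hand it the remaining metadata."""
    metadata = dict(payload)
    key = metadata.pop("type")
    return SOURCE_TYPES[key](**metadata)


csv_source = decode_source(
    {"type": "csv", "path": "taxi.csv", "separator": ";", "compression": "gzip"}
)
pg_source = decode_source({"type": "psql", "table": "taxi"})
```

Each source type only declares the metadata it needs, and the key alone decides which declaration applies.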

Setting Data Sources

In most cases, a data source is attached to either a feature_view or a model_contract. A source can also be replaced at runtime with the .update_source_for(...) method, which returns an updated store.

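from aligned import ContractStore
from aligned.sources.in_memory_source import InMemorySource

# Titanic is assumed to be a feature view defined in one of the contract files.
store = await ContractStore.from_glob("**/contract.py")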
store = store.update_source_for(
    Titanic, 
    InMemorySource.from_values({
        "passenger_id": [1, 2, 3],
        "age": [1, 2, 3],
        ...
    })
)

Aligned currently supports the following data sources:

  • CSV
  • Parquet
  • Hive Partitioned Parquet
  • Schema Versioned
  • In Memory Source
  • Random Source
  • Azure Blob Storage
  • AWS Redshift
  • AWS S3
  • PostgreSQL
  • Feature View
  • Custom Source

CSV File

from aligned import FileSource

taxi_file = FileSource.csv_at("taxi_data.csv")

CSV Config

CSV files come in many variations. To accommodate these differences, you can use a CsvConfig to specify the separator and compression format.

from aligned import FileSource
from aligned.sources.local import CsvConfig

taxi_file_config = CsvConfig(
    seperator=";",
    compression="gzip"
)
compressed_taxi_data = FileSource.csv_at(
    "gzip_taxi_data.csv",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    csv_config=taxi_file_config
)
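To make the two settings concrete, here is a minimal standard-library sketch of what a separator and a compression format mean when a CSV file is read. It does not use Aligned; the read_csv_bytes helper is hypothetical and only illustrates the concept.

```python
import csv
import gzip
import io


def read_csv_bytes(raw: bytes, separator: str = ",", compression=None) -> list:
    """Decompress if needed, then split each line on the given separator."""
    if compression == "gzip":
        raw = gzip.decompress(raw)
    reader = csv.DictReader(io.StringIO(raw.decode("utf-8")), delimiter=separator)
    return [dict(row) for row in reader]


# A gzip-compressed, semicolon-separated file held in memory.
payload = gzip.compress("ID;Duration\n1;320\n2;95\n".encode("utf-8"))
rows = read_csv_bytes(payload, separator=";", compression="gzip")
```

Reading the same bytes with the default separator would produce a single column named "ID;Duration", which is the kind of mismatch the config is meant to prevent.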

Parquet File

from aligned import FileSource

taxi_file = FileSource.parquet_at("taxi_data.parquet")

Parquet Config

from aligned import FileSource
from aligned.sources.local import ParquetConfig

taxi_file_config = ParquetConfig(
    compression="gzip"
)
compressed_taxi_data = FileSource.parquet_at(
    "gzip_taxi_data.parquet",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    config=taxi_file_config
)

Hive Partitioned Parquet

When data is segmented based on certain attributes, a Hive partitioned source can significantly improve filtering performance and reduce memory usage.

from aligned import FileSource

partitioned_credits = FileSource.partitioned_parquet_at(
    directory="credit_scoring",
    partition_keys=["country"],
    mapping_keys={ ... },
    config=None, # A parquet config
    date_formatter=None
)

Example partitioned file structure:

credit_scoring/country=norway/data.parquet
credit_scoring/country=danmark/data.parquet
credit_scoring/country=uk/data.parquet
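The performance gain comes from skipping whole files based on their path. The sketch below illustrates that pruning step with the standard library only; partition_values and prune are hypothetical helpers, not part of Aligned.

```python
from pathlib import PurePosixPath


def partition_values(path: str) -> dict:
    """Extract key=value directory segments from a Hive-style path."""
    values = {}
    for segment in PurePosixPath(path).parts[:-1]:
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


def prune(paths: list, **wanted: str) -> list:
    """Keep only files whose partition values match every requested filter."""
    return [
        path
        for path in paths
        if all(partition_values(path).get(key) == value for key, value in wanted.items())
    ]


files = [
    "credit_scoring/country=norway/data.parquet",
    "credit_scoring/country=danmark/data.parquet",
    "credit_scoring/country=uk/data.parquet",
]
to_read = prune(files, country="norway")
```

Only the matching file needs to be opened, so the other partitions are never loaded into memory.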

Schema Versioned Source

Managing data schema changes is crucial, especially when running A/B tests or canary deployments. Aligned offers a schema versioned source that dynamically adapts based on data requirements, enabling backward compatibility while supporting multiple versions simultaneously.

from aligned import FileSource, Int32, String, feature_view

taxi_dir = FileSource.directory("taxi_data")

vendors_data = taxi_dir.with_schema_version(
    sub_directory="vendors"
).parquet_at("data.parquet")

@feature_view(source=vendors_data) 
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = String()

Example file path:

taxi_data/vendors/26c65192dc9a74910a845d03ae05d4d2/data.parquet

Derived features

Adding new derived features and computations will not affect the schema version.
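One way to picture a schema version is as a hash over the stored columns. The sketch below is an assumption for illustration: the documentation does not state how Aligned computes the version string, and schema_version is a hypothetical helper.

```python
import hashlib


def schema_version(columns: dict) -> str:
    """Hash the stored column names and dtypes, independent of their order."""
    canonical = ",".join(f"{name}:{dtype}" for name, dtype in sorted(columns.items()))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:32]


v1 = schema_version({"vendor_id": "int32", "number_of_passengers": "string"})
reordered = schema_version({"number_of_passengers": "string", "vendor_id": "int32"})
v2 = schema_version(
    {"vendor_id": "int32", "number_of_passengers": "string", "rating": "float"}
)

# Each version gets its own directory, so old and new readers can coexist.
path = f"taxi_data/vendors/{v1}/data.parquet"
```

Under this scheme a derived feature never enters the hashed column set, because it is computed from stored columns rather than stored itself, so the version stays the same.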

In Memory Source

An in-memory source is useful for caching or hardcoding data for specific scenarios.

import polars as pl
from aligned.sources.in_memory_source import InMemorySource

# Empty in-memory source (for future inserts, upserts, or overwrites)
empty_source = InMemorySource.empty()

# Hardcoded values (must match full schema)
with_values = InMemorySource.from_values({
    "passenger_id": [1, 2, 3],
    "age": [1, 2, 3],
    ...
})

# Using a Polars DataFrame
with_polars_df = InMemorySource(pl.DataFrame(...))

Random Source

Validating ML models can be challenging due to data dependencies. A random source allows you to test your pipeline by generating synthetic yet structured data based on constraints.

from aligned import Bool, Float, Int32, String, feature_view
from aligned.sources.random_source import RandomDataSource

@feature_view(
    source=RandomDataSource.with_values({
        "passenger_id": [1, 2, 3],
        "age": [10, 0.5, 90]
    }, seed=1)
)
class TitanicPassenger:
    passenger_id = Int32().as_entity()

    age = Float()
    sibsp = Int32().lower_bound(0).upper_bound(68).description(
        "Fun fact: the world record for most born children is 69!"
    )
    has_siblings = sibsp > 0
    
    sex = String().accepted_values(["male", "female"])
    is_male, is_female = sex.one_hot_encode(['male', 'female'])

    survived = Bool()

df = await TitanicPassenger.query().features_for({
    "passenger_id": [1, 2]
}).to_polars()
print(df)

This prints the following:

shape: (2, 8)
┌──────────────┬─────────┬───────────┬──────────────┬──────────┬──────┬───────┬────────┐
│ has_siblings ┆ is_male ┆ is_female ┆ passenger_id ┆ survived ┆ age  ┆ sibsp ┆ sex    │
│ ---          ┆ ---     ┆ ---       ┆ ---          ┆ ---      ┆ ---  ┆ ---   ┆ ---    │
│ bool         ┆ bool    ┆ bool      ┆ i32          ┆ bool     ┆ f64  ┆ i32   ┆ str    │
╞══════════════╪═════════╪═══════════╪══════════════╪══════════╪══════╪═══════╪════════╡
│ true         ┆ false   ┆ true      ┆ 1            ┆ true     ┆ 10.0 ┆ 66    ┆ female │
│ true         ┆ true    ┆ false     ┆ 2            ┆ false    ┆ 0.5  ┆ 18    ┆ male   │
└──────────────┴─────────┴───────────┴──────────────┴──────────┴──────┴───────┴────────┘

The ContractStore also provides a .dummy_store() method, which replaces every source with a RandomDataSource.

store = await ContractStore.from_glob("**/contract.py")
dummy_store = store.dummy_store()
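The principle behind a random source, generating values that respect declared constraints and repeat for a given seed, can be sketched with the standard library. This is not how RandomDataSource is implemented; random_rows and the constraint dictionary format are hypothetical.

```python
import random


def random_rows(n: int, constraints: dict, seed: int) -> list:
    """Generate n rows where every value respects its column's constraint."""
    rng = random.Random(seed)  # a local generator keeps runs reproducible
    rows = []
    for _ in range(n):
        row = {}
        for name, rule in constraints.items():
            if "accepted_values" in rule:
                row[name] = rng.choice(rule["accepted_values"])
            else:
                row[name] = rng.randint(rule["lower_bound"], rule["upper_bound"])
        rows.append(row)
    return rows


constraints = {
    "sibsp": {"lower_bound": 0, "upper_bound": 68},
    "sex": {"accepted_values": ["male", "female"]},
}
sample = random_rows(5, constraints, seed=1)
```

Calling random_rows again with the same seed yields the same rows, which keeps pipeline tests deterministic.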

Azure Blob Storage

External storage solutions like Azure Blob Storage are supported.

The Azure config works similarly to a directory, so all file sources should be available, e.g. CSV, Parquet, partitioned Parquet, and the schema versioned source.

from aligned.sources.azure_blob_storage import AzureBlobConfig
config = AzureBlobConfig(
    account_id_env="...",
    tenant_id_env="...",
    client_id_env="...",
    client_secret_env="...",
    account_name_env="..."
)
taxi_data = config.parquet_at("taxi_data.parquet")

AWS Redshift

from aligned import RedshiftSQLConfig

database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
taxi_data = database.table("taxi")

Renaming Columns in Redshift

from aligned import RedshiftSQLConfig

database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
with_renames = database.table(
    name="taxi", 
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)

AWS S3

from aligned import AwsS3Config

bucket = AwsS3Config(
    access_token_env="AWS_ACCESS_TOKEN",
    secret_token_env="AWS_SECRET_TOKEN",
    bucket_env="AWS_BUCKET",
    region_env="AWS_REGION"
)
taxi_parquet = bucket.parquet_at("taxi.parquet")
taxi_csv = bucket.csv_at("taxi.csv")

PostgreSQL

from aligned import PostgreSQLConfig

database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table("taxi")

Renaming Columns in PostgreSQL

There is often no one-to-one mapping between the stored data and the desired structure. The example below shows how to rename columns.

from aligned import PostgreSQLConfig

database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table(
    name="taxi", 
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)
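Conceptually, a rename mapping like the one above amounts to aliasing columns in the query. The sketch below shows that idea only; how Aligned actually applies the mapping is not described here, and select_with_renames is a hypothetical helper.

```python
def select_with_renames(table: str, mapping: dict) -> str:
    """Build a SELECT that aliases each source column to its wanted name."""
    columns = ", ".join(
        f'"{source}" AS {wanted}' for source, wanted in mapping.items()
    )
    return f"SELECT {columns} FROM {table}"


query = select_with_renames("taxi", {"ID": "taxi_id", "Duration": "duration"})
# query == 'SELECT "ID" AS taxi_id, "Duration" AS duration FROM taxi'
```

The helper formats identifiers directly into the string, so it is only suitable for trusted, hardcoded names.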

Feature View

A feature view can be used as a data source, allowing for lineage tracking and dependency management.

from aligned import EventTimestamp, FileSource, Int64, String, Timestamp, feature_view

@feature_view(
    name="zipcode",
    source=FileSource.parquet_at("...")
)
class Zipcode:
    zipcode = Int64().as_entity()

    event_timestamp = EventTimestamp()
    created_timestamp = Timestamp()

    location_type = String()
    population = Int64()



@feature_view(
    name="other",
    source=Zipcode
)
class Other:
    location_type = String().as_entity()

    population = Int64()
    event_timestamp = EventTimestamp()

Schema Structure

When using a feature view as a source, make sure the schema of the downstream view matches the upstream one. There is currently little support for verifying that the schema is upheld. This will be improved, since the schemas of both views are known and can be checked.
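Until such a check exists, the comparison can be done by hand. The sketch below shows one possible shape for it, using plain dictionaries of column names and dtype labels; schema_problems and the dtype strings are hypothetical and not part of Aligned.

```python
def schema_problems(upstream: dict, downstream: dict) -> list:
    """List every downstream column that the upstream view cannot provide."""
    problems = []
    for name, dtype in downstream.items():
        if name not in upstream:
            problems.append(f"{name}: missing in upstream view")
        elif upstream[name] != dtype:
            problems.append(f"{name}: expected {dtype}, upstream has {upstream[name]}")
    return problems


zipcode_schema = {
    "zipcode": "int64",
    "event_timestamp": "datetime",
    "created_timestamp": "datetime",
    "location_type": "string",
    "population": "int64",
}
other_schema = {
    "location_type": "string",
    "population": "int64",
    "event_timestamp": "datetime",
}
problems = schema_problems(zipcode_schema, other_schema)
```

An empty list means every column the downstream view reads exists upstream with the same type.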

Custom Source

For custom logic, such as fetching data from an API, you can implement a CustomMethodDataSource.

from aligned import CustomMethodDataSource
from aligned.request.retrieval_request import RetrievalRequest
import polars as pl

def load_all_data(request: RetrievalRequest, limit: int | None) -> pl.LazyFrame:
    # My custom method
    return pl.DataFrame(...).lazy()

custom_source = CustomMethodDataSource.from_methods(
    all_data=load_all_data,
    all_between_dates=None,
    features_for=None,
    docker_config="my-custom-image:latest"
)