Core concepts

Data Source

A data source describes any form of data, whether a database table, a key-value store, a file, or a structured folder. Consequently, the system can support existing infrastructure such as application databases and data warehouses, as well as more complex setups like data lakes.

Furthermore, data sources vary in how much metadata they need. Each data source is therefore represented with a unique key, which tells the system which metadata to decode. For instance, a CSV data source can define a separator symbol and a compression format, while a PostgreSQL source needs a table name and potentially a schema name. This keeps the configuration flexible, yet detailed.
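As a small illustration, using the FileSource and PostgreSQLConfig helpers shown later on this page, the two sources below carry quite different metadata:

from aligned import FileSource, PostgreSQLConfig
from aligned.sources.local import CsvConfig

# A CSV source describes file-level details such as separator and compression
csv_source = FileSource.csv_at(
    "taxi_data.csv",
    csv_config=CsvConfig(seperator=";", compression="gzip")
)

# A PostgreSQL source only needs a connection string and a table name
psql_source = PostgreSQLConfig(env_var="PSQL_DB_URL").table("taxi")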

Setting Data Sources

Most often, a source will be associated with either a feature_view or a model_contract. However, it is also possible to update sources on the fly with the .update_source_for(...) function.

from aligned import ContractStore
from aligned.sources.in_memory_source import InMemorySource

store: ContractStore = await ContractStore.from_glob("**/contract.py")
store = store.update_source_for(
    "feature_view:titanic", 
    InMemorySource.from_values({
        "passenger_id": [1, 2, 3],
        "age": [1, 2, 3],
        ...
    })
)

There is currently support for the following data sources:

  • CSV
  • Parquet
  • Hive Partitioned Parquet
  • Schema Versioned
  • In Memory Source
  • Random Source
  • Azure Blob Storage
  • AWS Redshift
  • AWS S3
  • PostgreSQL
  • Feature View
  • Custom Source

CSV File

from aligned import FileSource

taxi_file = FileSource.csv_at("taxi_data.csv")

CSV Config

CSV files come in many different variations. That is why it is possible to define a CsvConfig describing which separator and compression to use.

from aligned import FileSource
from aligned.sources.local import CsvConfig

taxi_file_config = CsvConfig(
    seperator=";",
    compression="gzip"
)
compressed_taxi_data = FileSource.csv_at(
    "gzip_taxi_data.csv",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    csv_config=taxi_file_config
)

Parquet File

Parquet files also come in different variations. That is why it is possible to define a ParquetConfig describing, for instance, which compression to use.

from aligned import FileSource

taxi_file = FileSource.parquet_at("taxi_data.parquet")

Parquet Config

from aligned import FileSource
from aligned.sources.local import ParquetConfig

taxi_file_config = ParquetConfig(
    compression="gzip"
)
compressed_taxi_data = FileSource.parquet_at(
    "gzip_taxi_data.parquet",
    mapping_keys={
        "ID": "taxi_id",
        "Duration": "duration",
    },
    config=taxi_file_config
)

Hive Partitioned Parquet

Often our data can be split into segments based on one or more of its columns. This is why aligned offers a hive partitioned data source, which can significantly improve filtering performance and reduce memory usage in certain scenarios.

from aligned import FileSource

partitioned_credits = FileSource.partitioned_parquet_at(
    directory="credit_scoring",
    partition_keys=["country"],
    mapping_keys={ ... },
    config=None, # A parquet config
    date_formatter=None
)

This could partition the data into a file structure like:

credit_scoring/country=norway/data.parquet
credit_scoring/country=danmark/data.parquet
credit_scoring/country=uk/data.parquet
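A feature view reading from this source could then declare the partition key as an ordinary column. This is a minimal sketch with hypothetical field names, assuming the partition key is part of the view's schema:

from aligned import feature_view, Int32, String

@feature_view(source=partitioned_credits)
class CreditScoring:
    customer_id = Int32().as_entity()

    # The partition key shows up as a regular feature
    country = String()
    credit_score = Int32()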

Schema Versioned Source

When deploying new versions of our programs, it is highly likely that the data sources will conflict at some point.

This can especially become a problem when deploying A/B tests or canary releases, as we might need to run conflicting versions in production at the same time.

That is why aligned offers a data source that switches the underlying source based on the data requirements. This makes it possible to reuse backwards compatible sources, while still maintaining multiple versions at the same time.

taxi_dir = FileSource.directory("taxi_data")

vendors_data = taxi_dir.with_schema_version(
    sub_directory="vendors"
).parquet_at("data.parquet")

@feature_view(source=vendors_data) 
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = String()

This would read from the following file path:

taxi_data/vendors/26c65192dc9a74910a845d03ae05d4d2/data.parquet

However, if we change the data type from a string to an int, or remove a column, the existing data would no longer match.

So if a new code version looked like the following:

# Change the data type to an Int
@feature_view(source=vendors_data) 
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = Int32()

then it would read from the path:

taxi_data/vendors/2d05c032e9a7e6854f42b84cd6764092/data.parquet

Derived features

Adding new derived features and computations will not affect the schema version.
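For instance, adding a derived flag to the view above would keep the same schema version and file path, since derived features are computed from the stored columns (the added feature is only for illustration):

@feature_view(source=vendors_data)
class VendorSchema:
    vendor_id = Int32().as_entity()
    number_of_passengers = Int32()

    # Derived from an existing column, so the schema version stays the same
    has_passengers = number_of_passengers > 0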

In Memory Source

Sometimes it is nice to have an in-memory cache, or to hard-code some data for a specific scenario. This is where the in-memory source comes into play.

import polars as pl

from aligned.sources.in_memory_source import InMemorySource

# Assumes that there will be an insert, upsert or overwrite in the future
empty_source = InMemorySource.empty()

# Hard code values, but needs to contain the full schema
with_values = InMemorySource.from_values({
    "passenger_id": [1, 2, 3],
    "age": [1, 2, 3],
    ...
})

# Or send in a raw polars df
with_polars_df = InMemorySource(pl.DataFrame(...))
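The empty source is typically filled by writing to it later, through the store. The snippet below is a hedged sketch: it assumes a ContractStore named store, a view named "titanic", and that the view store exposes an insert method taking a dataframe.

# Assumed write API: insert new rows into the in-memory source
await store.feature_view("titanic").insert(pl.DataFrame({
    "passenger_id": [4, 5],
    "age": [32, 54],
}))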

Random Source

Quality controlling our ML systems can be very hard, as they depend so heavily on our data. However, sometimes we just want to know that our code runs and does what we expect, given a few data scenarios. Thankfully, the random source makes it easy to fabricate scenarios that our systems should get right.

The random source will leverage the data schema and its constraints to generate data points that should be roughly in the expected distribution, while still enabling you to hard-code scenarios if needed.

The source will also keep all computed features consistent with the raw data.

from aligned.sources.random_source import RandomDataSource

@feature_view(
    source=RandomDataSource.with_values({
        "passenger_id": [1, 2, 3],
        "age": [10, 0.5, 90]
    }, seed=1)
)
class TitanicPassenger:
    passenger_id = Int32().as_entity()

    age = Float()
    sibsp = Int32().lower_bound(0).upper_bound(68).description(
        "Fun fact: the world record for most born children is 69!"
    )
    has_siblings = sibsp > 0
    
    sex = String().accepted_values(["male", "female"])
    is_male, is_female = sex.one_hot_encode(['male', 'female'])

    survived = Bool()

df = await TitanicPassenger.query().features_for({
    "passenger_id": [1, 2]
}).to_polars()
print(df)

This returns the following:

shape: (2, 8)
┌──────────────┬─────────┬───────────┬──────────────┬──────────┬──────┬───────┬────────┐
│ has_siblings ┆ is_male ┆ is_female ┆ passenger_id ┆ survived ┆ age  ┆ sibsp ┆ sex    │
│ ---          ┆ ---     ┆ ---       ┆ ---          ┆ ---      ┆ ---  ┆ ---   ┆ ---    │
│ bool         ┆ bool    ┆ bool      ┆ i32          ┆ bool     ┆ f64  ┆ i32   ┆ str    │
╞══════════════╪═════════╪═══════════╪══════════════╪══════════╪══════╪═══════╪════════╡
│ true         ┆ false   ┆ true      ┆ 1            ┆ true     ┆ 10.0 ┆ 66    ┆ female │
│ true         ┆ true    ┆ false     ┆ 2            ┆ false    ┆ 0.5  ┆ 18    ┆ male   │
└──────────────┴─────────┴───────────┴──────────────┴──────────┴──────┴───────┴────────┘

The ContractStore also has a .dummy_store() method, which replaces all sources with a RandomDataSource.

store = await ContractStore.from_glob("**/contract.py")
dummy_store = store.dummy_store()
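The dummy store can be queried like any other store. A hedged sketch, assuming a feature view named "titanic" and the store's feature_view(...) accessor:

# Every read now returns randomly generated, schema-consistent data
df = await dummy_store.feature_view("titanic").all(limit=10).to_polars()
print(df)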

Azure Blob Storage

There is also support for using external storage solutions like Azure Blob Storage.

The Azure config works similarly to a Directory, so all file sources are available: CSV, Parquet, partitioned Parquet, and the schema versioned source.

from aligned.sources.azure_blob_storage import AzureBlobConfig
config = AzureBlobConfig(
    account_id_env="...",
    tenant_id_env="...",
    client_id_env="...",
    client_secret_env="...",
    account_name_env="..."
)
taxi_data = config.parquet_at("taxi_data.parquet")

AWS Redshift

from aligned import RedshiftSQLConfig

database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
taxi_data = database.table("taxi")

Redshift Renaming

There is often not a one-to-one mapping between the stored data and the wanted structure. Therefore, below is a way to rename columns.

from aligned import RedshiftSQLConfig

database = RedshiftSQLConfig(env_var='REDSHIFT_DB_URL')
with_renames = database.table(
    name="taxi", 
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)

AWS S3

from aligned import AwsS3Config

bucket = AwsS3Config(
    access_token_env="AWS_ACCESS_TOKEN",
    secret_token_env="AWS_SECRET_TOKEN",
    bucket_env="AWS_BUCKET",
    region_env="AWS_REGION"
)
taxi_parquet = bucket.parquet_at("taxi.parquet")
taxi_csv = bucket.csv_at("taxi.csv")

PostgreSQL

from aligned import PostgreSQLConfig

database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table("taxi")

PostgreSQL Renaming

There is often not a one-to-one mapping between the stored data and the wanted structure. Therefore, below is a way to rename columns.

from aligned import PostgreSQLConfig

database = PostgreSQLConfig(env_var='PSQL_DB_URL')
taxi_data = database.table(
    name="taxi", 
    mapping_values={
        "ID": "taxi_id",
        "Duration": "duration"
    }
)

Feature View

You can also use a feature view as a data source.

Feature View Lineage

Using a feature view as the source for another will collect data lineage information. Therefore, aligned knows which sources depend on each other, which can be useful for keeping materialized sources up to date.

@feature_view(
    name="zipcode",
    source=FileSource.parquet_at("...")
)
class Zipcode:
    zipcode = Int64().as_entity()

    event_timestamp = EventTimestamp()
    created_timestamp = Timestamp()

    location_type = String()
    population = Int64()



@feature_view(
    name="other",
    source=Zipcode
)
class Other:
    location_type = String().as_entity()

    population = Int64()
    event_timestamp = EventTimestamp()

Schema Structure

When using a feature view as a source, make sure the schema matches in the downstream view. There is currently limited support for verifying that the schema is upheld, but this will be improved, since such a check is possible.

Custom Source

In some cases it can be nice to integrate with custom logic, for example to load data from an API or to simulate data.

For this scenario, aligned supports a custom data source where you provide the functionality yourself.

Each method will receive a RetrivalRequest structure, which describes the expected entities and features, along with other information such as event timestamps.

from aligned import CustomMethodDataSource
from aligned.request.retrival_request import RetrivalRequest
import polars as pl

def load_all_data(request: RetrivalRequest, limit: int | None) -> pl.LazyFrame:
    # My custom method
    return pl.DataFrame(...).lazy()

custom_source = CustomMethodDataSource.from_methods(
    all_data=load_all_data,
    all_between_dates=None,
    features_for=None
)
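The custom source can then be attached to a feature view like any other source; the view below is only for illustration:

from aligned import feature_view, Int32, Float

@feature_view(source=custom_source)
class ApiData:
    record_id = Int32().as_entity()
    value = Float()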