# Using SQL databases and CosmosDb with Feathr
Feathr supports using SQL databases as a source or as the offline store, and supports using CosmosDb as the online store.
## Using SQL databases as a source
To use a SQL database as a source, create a `JdbcSource` instead of an `HdfsSource` in your project. A `JdbcSource` can be created with the following statement:

```python
source = feathr.JdbcSource(name, url, dbtable, query, auth)
```
- `name` is the source name, same as for other sources.
- `url` is the database URL in JDBC format.
- `dbtable` or `query`: `dbtable` is the name of a table in the database, and `query` is a SQL `SELECT` statement; specify exactly one of the two.
- `auth` can be `None`, `"USERPASS"`, or `"TOKEN"`.
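For example, a minimal sketch of a query-based source against a database that requires no authentication (the source name, URL, and query below are illustrative placeholders, not values from this guide):

```python
import feathr

# `query` is used here instead of `dbtable`; specify only one of the two.
# `auth` is omitted, so Feathr assumes no authentication is needed.
source = feathr.JdbcSource(
    name="my_query_source",
    url="jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb",
    query="SELECT user_id, f_amount, f_count FROM transactions",
)
```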
There are some other parameters, such as `preprocessing`; they work the same as for other source types like `HdfsSource`. After creating the `JdbcSource`, you can use it in the same way as any other kind of source.
## Auth
When the `auth` parameter is omitted or set to `None`, Feathr assumes that the database doesn't require any authentication.
When the `auth` parameter is set to `"USERPASS"`, you need to set the following environment variables before submitting the job:
- `<name>_USER`, where `name` is the source name; the value of this variable is the user name used to log in to the database.
- `<name>_PASSWORD`, where `name` is the source name; the value of this variable is the password used to log in to the database.
When the `auth` parameter is set to `"TOKEN"`, you need to set the following environment variable before submitting the job:
- `<name>_TOKEN`, where `name` is the source name; the value of this variable is the token used to log in to the database. Currently only Azure SQL Database supports this auth type.
For example, if you created a source:
src1_name="source1"
source1 = JdbcSource(name=src1_name, url="jdbc:...", dbtable="table1", auth="USERPASS")
anchor1 = FeatureAnchor(name="anchor_name",
source=source1,
features=[some_features, some_other_features])
You need to set two environment variables before submitting the job:
os.environ[f"{src1_name.upper()}_USER"] = "some_user_name"
os.environ[f"{src1_name.upper()}_PASSWORD"] = "some_magic_word"
client.build_features(anchor_list=[anchor1, ...])
client.get_offline_features(...)
These values will be automatically passed to the Feathr core when submitting the job.
If you want to use a token, the code looks like this:

Step 1: Define the `JdbcSource`:
src_name="source_name"
source = JdbcSource(name=src_name, url="jdbc:...", dbtable="table_name", auth="TOKEN")
anchor = FeatureAnchor(name="anchor_name",
source=source,
features=[some_features, some_other_features])
Step 2: Set the environment variable before submitting the job:

```python
os.environ[f"{src_name.upper()}_TOKEN"] = "some_token"
```
To enable Azure AD authentication on Azure SQL Database, please refer to this document. There are several ways to obtain an Azure AD access token; please refer to this document for more details.
If you want to leverage an existing credential in the Python client, you could try:
```python
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# get_token returns an AccessToken object; its `token` attribute (not a method)
# holds the token string.
token = credential.get_token("https://management.azure.com/.default").token
```
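You can then feed this token to Feathr through the `TOKEN` environment variable described above (assuming `src_name` is the source name defined in Step 1):

```python
os.environ[f"{src_name.upper()}_TOKEN"] = token
```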
## Using SQL database as the offline store
To use a SQL database as the offline store, pass a `JdbcSink` as the `output_path` parameter of `FeathrClient.get_offline_features`, e.g.:

```python
name = 'output'
sink = JdbcSink(name, some_jdbc_url, dbtable, "USERPASS")
```
Then you need to set the following environment variables before submitting the job:

```python
os.environ[f"{name.upper()}_USER"] = "some_user_name"
os.environ[f"{name.upper()}_PASSWORD"] = "some_magic_word"

client.get_offline_features(..., output_path=sink)
```
The `"TOKEN"` auth type is also supported by `JdbcSink`.
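A minimal sketch with token auth, reusing the placeholder names from above (how to obtain a suitable token is covered in the Auth section):

```python
sink = JdbcSink(name, some_jdbc_url, dbtable, "TOKEN")
os.environ[f"{name.upper()}_TOKEN"] = "some_token"
```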
## Using SQL database as the online store
Same as with the offline store: create a `JdbcSink`, add it to the `MaterializationSettings`, set the corresponding environment variables, then use it with `FeathrClient.materialize_features`, as in the sketch below.
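A minimal sketch, reusing the `JdbcSink` placeholders from above; the settings name and feature names here are hypothetical, and the exact `MaterializationSettings` arguments may differ between Feathr versions:

```python
name = 'online_output'
sink = JdbcSink(name, some_jdbc_url, dbtable, "USERPASS")

# Credentials are looked up by the upper-cased sink name, same as for sources.
os.environ[f"{name.upper()}_USER"] = "some_user_name"
os.environ[f"{name.upper()}_PASSWORD"] = "some_magic_word"

settings = MaterializationSettings(name="sql_online_materialization",
                                   sinks=[sink],
                                   feature_names=["feature1", "feature2"])
client.materialize_features(settings)
```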
## Using CosmosDb as the online store
To use CosmosDb as the online store, create a `CosmosDbSink`, add it to the `MaterializationSettings`, then use it with `FeathrClient.materialize_features`, e.g.:

```python
name = 'cosmosdb_output'
sink = CosmosDbSink(name, some_cosmosdb_url, some_cosmosdb_database, some_cosmosdb_collection)

# The CosmosDb API key is looked up by the upper-cased sink name.
os.environ[f"{name.upper()}_KEY"] = "some_cosmosdb_api_key"

client.materialize_features(..., materialization_settings=MaterializationSettings(..., sinks=[sink]))
```
The Feathr client doesn't support getting feature values from CosmosDb; you need to use the official CosmosDb client to read the values back:
```python
from azure.cosmos import CosmosClient

client = CosmosClient(some_cosmosdb_url, some_cosmosdb_api_key)
db_client = client.get_database_client(some_cosmosdb_database)
container_client = db_client.get_container_client(some_cosmosdb_collection)
# read_item also requires the item's partition key value.
doc = container_client.read_item(some_key, partition_key=some_partition_key)
feature_value = doc['feature_name']
```
## Note on ClassNotFound errors when using CosmosDb
There is currently a known issue with using the `azure.cosmos.spark` package on Databricks clusters: the dependency does not resolve correctly on Databricks when packaged through Gradle, which results in a `ClassNotFound` error. To mitigate this issue, please install the `azure.cosmos.spark` package directly from Maven Central, following the steps here.
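For example, on a Databricks cluster you can add a library with Maven coordinates along the lines of `com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.17.2` (the Spark/Scala suffix and version here are assumptions; pick the artifact on Maven Central that matches your cluster's Spark runtime).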