Streaming Source Ingestion and Feature Definition
Feathr supports defining features from a streaming source (for example Kafka) with transformations, and sinking the features into an online store (such as Redis). This is useful when you need up-to-date features in the online store: for example, when a user clicks on a website, that web log event is usually sent to Kafka, and data scientists might need features derived from it immediately, such as the browser used in that particular event. The steps are as below:
Define Kafka streaming input source
Currently only the Avro format is supported, so users need to specify an AvroJsonSchema as below:
from feathr import AvroJsonSchema, KafKaSource, KafkaConfig

# Define input data schema
schema = AvroJsonSchema(schemaStr="""
{
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"}
        }
    ]
}
""")
stream_source = KafKaSource(name="kafkaStreamingSource",
                            kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
                                                    topics=["feathrcieventhub"],
                                                    schema=schema)
                            )
You may need to produce data and send it to Kafka for this data source in advance; see the Kafka data source producer as a reference. You should also keep the producer running so that data keeps streaming into Kafka while you call materialize_features() below.
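Below is a minimal sketch of such a producer, assuming the kafka-python and fastavro packages and an Azure Event Hubs Kafka endpoint; the connection string placeholder and the randomly generated values are illustrative only.

import io
import random
import time
from datetime import datetime, timezone

from fastavro import parse_schema, schemaless_writer
from kafka import KafkaProducer

# Same record layout as the AvroJsonSchema defined above
parsed_schema = parse_schema({
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {"name": "datetime", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    ],
})

# Event Hubs Kafka endpoints authenticate over SASL_SSL with the connection string as the password
producer = KafkaProducer(
    bootstrap_servers=["feathrazureci.servicebus.windows.net:9093"],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="$ConnectionString",
    sasl_plain_password="<your-event-hub-connection-string>",  # placeholder
)

while True:
    # Generate a random DriverTrips record and serialize it as schemaless Avro bytes
    record = {
        "driver_id": random.randint(1, 500),
        "trips_today": random.randint(0, 30),
        "datetime": datetime.now(timezone.utc),
    }
    buffer = io.BytesIO()
    schemaless_writer(buffer, parsed_schema, record)
    producer.send("feathrcieventhub", value=buffer.getvalue())
    producer.flush()
    time.sleep(1)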
Define feature definition with the Kafka source
You can then define features. They are mostly the same as regular feature definitions. Note that for the transform part, only row-level transformations are allowed in a streaming anchor at the moment, i.e. the transformations listed in Spark SQL Built-in Functions are supported. Users can also define customized Spark SQL functions. For example, you can specify a row-level transformation like trips_today + randn() * cos(trips_today) for your input data.
from feathr import Feature, FeatureAnchor, TypedKey, ValueType, INT32

driver_id = TypedKey(key_column="driver_id",
                     key_column_type=ValueType.INT64,
                     description="driver id",
                     full_name="nyc driver id")
kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
                            source=stream_source,
                            features=[Feature(name="f_modified_streaming_count",
                                              feature_type=INT32,
                                              transform="trips_today + 1",
                                              key=driver_id),
                                      Feature(name="f_modified_streaming_count2",
                                              feature_type=INT32,
                                              transform="trips_today + randn() * cos(trips_today)",
                                              key=driver_id)]
                            )
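The streaming anchor is typically registered with a Feathr client before materialization. A minimal sketch, assuming a feathr_config.yaml with your Kafka and Redis credentials is available (the config path is illustrative):

from feathr import FeathrClient

# Create a client from a local configuration file
client = FeathrClient(config_path="./feathr_config.yaml")

# Register the streaming anchor so its features can be materialized below
client.build_features(anchor_list=[kafkaAnchor])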
Start streaming job
You can then start a streaming job to stream all the features into the online store, such as Redis:
from feathr import MaterializationSettings, RedisSink

redisSink = RedisSink(table_name="kafkaSampleDemoFeature", streaming=True, streamingTimeoutMs=10000)
# The 'streamingTimeoutMs' parameter may need to be increased (e.g. to 10 minutes or even longer) to make sure some data from Kafka is captured.
settings = MaterializationSettings(name="kafkaSampleDemo",
                                   sinks=[redisSink],
                                   feature_names=['f_modified_streaming_count']
                                   )
client.materialize_features(settings)  # Will stream for 10 seconds since streamingTimeoutMs is 10000
Fetch streaming feature values
Finally, fetch the relevant features from the online store:
# Get features for a single feature key
res = client.get_online_features('kafkaSampleDemoFeature', '1', ['f_modified_streaming_count'])
# Get features for multiple feature keys
res = client.multi_get_online_features('kafkaSampleDemoFeature', ['1', '2'], ['f_modified_streaming_count'])
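Because the streaming job writes to Redis asynchronously, feature values may take a short while to appear. A small polling sketch (the key, retry count, and sleep interval are illustrative):

import time

# Poll the online store until the streaming feature value shows up
for _ in range(30):
    res = client.get_online_features('kafkaSampleDemoFeature', '1', ['f_modified_streaming_count'])
    if res and res[0] is not None:
        print("f_modified_streaming_count for driver 1:", res[0])
        break
    time.sleep(2)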
You can also refer to the test case for more details.
Kafka configuration
Please refer to the Feathr Configuration Doc for more details on the credentials.
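For instance, the Kafka credential can be supplied through an environment variable before running the job; a minimal sketch, assuming the KAFKA_SASL_JAAS_CONFIG variable name from that doc and a placeholder value:

import os

# Assumed variable name from the Feathr Configuration Doc; check that doc for the
# exact expected format of the credential (a placeholder value is shown here).
os.environ["KAFKA_SASL_JAAS_CONFIG"] = "<your-kafka-sasl-credential-or-connection-string>"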
Event Hub monitoring
If something seems wrong, you can check the monitoring panel on your Event Hub overview page while the Feathr materialization job is running, to make sure there are both incoming and outgoing messages, as in the graph below. Otherwise, you may not get anything from get_online_features() since the source is empty.