Introduction#
In streaming data processing, one of the most common requirements is enriching a stream with reference data: adding customer information to transactions, product details to orders, or feature flags to application logs.
Recently, I faced a scenario where I needed to enrich a Flink stream with reference data served by a web API endpoint. The data was small and changed infrequently, making it a good fit for caching rather than performing a lookup for each record.
Flink and Lookup Caching#
While researching Apache Flink's documentation, I stumbled upon FLIP-221, which introduces an abstraction for lookup source caching. It outlines two main caching strategies:
Partial Caching: Data is loaded into the cache on-demand. When a key is looked up, if it's not in the cache, the system fetches it from the external source and stores it for future use. This approach uses an eviction policy to manage the cache size.
Full Caching: When the reference data is small enough to fit in memory and doesn't change frequently, it's more efficient to load all entries into the cache at once. This approach completely eliminates network I/O during stream processing and refreshes the entire cache periodically.
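To make the two strategies concrete, here is a sketch of how FLIP-221-style cache options surface in Flink SQL, using the option keys exposed by Flink's built-in JDBC connector (1.16+). The table name, schema, and connection details are illustrative assumptions, and exact option keys vary by connector.

```sql
-- Sketch: partial caching with the FLIP-221 options of the JDBC connector.
-- Schema and connection details below are assumptions for illustration.
CREATE TABLE customer_dim (
    id INT,
    name STRING,
    segment STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://db.example.com:3306/crm',
    'table-name' = 'customers',
    -- PARTIAL: entries are fetched on demand and evicted by policy
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '10000',
    'lookup.partial-cache.expire-after-write' = '1h'
);
```

With full caching there is no per-key fetch or eviction policy to tune; the whole dataset is reloaded on a schedule, which is the approach the rest of this post builds on.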
My requirements aligned perfectly with the Full Caching approach:
- The stream could work with slightly stale data until the next cache refresh
- Processing needed to continue uninterrupted during cache refresh retries
- The lookup data was small (feature flags, configuration settings)
- The data changed infrequently
While I found several connectors implementing the partial caching strategy (like GetInData's HTTP connector), I couldn't find a proper implementation of the full caching approach for HTTP APIs. So I decided to build one.
Implementing the HTTP Full Cache Connector#
The connector I created has three main components:
- HttpLookupTableSource: The main entry point that implements Flink's LookupTableSource interface and uses FullCachingLookupProvider to handle the caching strategy.
- HttpLookupTableSourceFactory: Handles configuration options and creates instances of the table source with appropriate parameters.
- HttpRowDataInputFormat: Handles the actual HTTP requests, JSON parsing, and data transformation needed to fetch and process the reference data.
The connector provides several key features:
- Periodic Cache Refresh: Automatically refresh the cache at configurable intervals
- Resilient HTTP Handling: Built-in retry mechanism with configurable delays and timeouts
- JSON Processing: Extract specific data using JSON pointers from complex API responses
- Full Type Support: Compatible with Flink's row data types for seamless integration
- Atomic Updates: Cache is updated atomically to prevent inconsistent data
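The features above map onto connector options. The `connector`, `url`, `xpath`, and `cache.refresh-interval` keys are the ones used later in this post; the retry- and timeout-related keys below are hypothetical placeholders to show where such settings would go — check the project README for the actual option names.

```sql
-- Sketch of a table definition exercising the connector's features.
-- The three 'http.*' keys are hypothetical, not confirmed option names.
CREATE TABLE feature_flags (
    id INT,
    is_enabled BOOLEAN
) WITH (
    'connector' = 'http-lookup-full-cache',
    'url' = 'http://api.example.com/flags',
    'xpath' = 'flags_list',
    'cache.refresh-interval' = 'PT5M',   -- reload the full cache every 5 minutes
    'http.retry.max-attempts' = '3',     -- hypothetical
    'http.retry.delay' = 'PT10S',        -- hypothetical
    'http.timeout' = 'PT30S'             -- hypothetical
);
```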
Using the Connector#
To use the connector in a Flink SQL job, you can define a table like this:
CREATE TABLE user_category_flags (
    id INT,
    is_cost_price_mode_v2 BOOLEAN,
    is_free_shipping BOOLEAN
) WITH (
    'connector' = 'http-lookup-full-cache',
    'url' = 'http://api.example.com/users_categories_flag',
    'xpath' = 'flags_list',
    'cache.refresh-interval' = 'PT1M'
);
Here's an example of using the HTTP Full Cache table to enrich a transaction stream and calculate different amounts based on feature flags:
SELECT
    t.transaction_id,
    t.user_id,
    t.user_category,
    t.product_id,
    t.amount,
    -- Apply different shipping calculation based on the feature flag
    CASE
        WHEN f.is_free_shipping THEN t.amount
        ELSE t.amount + t.shipping_cost
    END AS total_amount,
    -- Apply different pricing model based on the feature flag
    CASE
        WHEN f.is_cost_price_mode_v2 THEN
            t.amount * 0.9 -- 10% discount for users with cost_price_mode_v2 enabled
        ELSE
            t.amount
    END AS discounted_amount,
    -- Final amount calculation
    CASE
        WHEN f.is_free_shipping THEN
            CASE
                WHEN f.is_cost_price_mode_v2 THEN t.amount * 0.9
                ELSE t.amount
            END
        ELSE
            CASE
                WHEN f.is_cost_price_mode_v2 THEN (t.amount * 0.9) + t.shipping_cost
                ELSE t.amount + t.shipping_cost
            END
    END AS final_amount
FROM transactions t
JOIN user_category_flags FOR SYSTEM_TIME AS OF t.proc_time AS f
    ON t.user_category = f.id;
This example shows how feature flags help with:
- Free shipping for certain user categories
- Special pricing (10% discount) for qualifying users
- Final amount calculation based on active flags
Note: You must include FOR SYSTEM_TIME AS OF t.proc_time in your query. This clause makes the join a processing-time temporal join, so Flink joins each transaction record against the version of the cached data that is current when the record is processed.
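This requires a processing-time attribute on the probe side, declared with the PROCTIME() computed column. Here is a minimal sketch of the transactions table; the Kafka connection details and column types are assumptions for illustration.

```sql
-- Minimal sketch of the probe-side table. Connector and format settings
-- are assumptions; only the proc_time computed column is essential.
CREATE TABLE transactions (
    transaction_id STRING,
    user_id STRING,
    user_category INT,
    product_id STRING,
    amount DECIMAL(10, 2),
    shipping_cost DECIMAL(10, 2),
    proc_time AS PROCTIME()  -- processing-time attribute used by the temporal join
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'broker:9092',
    'format' = 'json'
);
```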
Full Cache Usage Guidelines#
Scenario | When to Use Full Cache | When Not to Use Full Cache
---|---|---
Data Size | Reference data is small enough to fit in memory (up to a few MB) | Reference data is large (exceeding several megabytes)
Update Frequency | Data changes infrequently (hourly, daily, or weekly updates) | Data changes frequently (every few seconds)
Access Pattern | Read operations far outnumber updates | Only a small subset of the reference data is accessed
Data Freshness | Application can tolerate slightly stale data between refresh intervals | Zero data staleness is required (absolute real-time lookups)
When your workload matches the right-hand column, consider a partial cache implementation or querying your data source directly for each lookup.
Alternatives#
- Consider using partial caching with asynchronous lookups for more granular control
- Consider using Flink's Async I/O API for non-cached asynchronous lookups
Conclusion#
Whether you're working with feature flags, configuration settings, product catalogs, or any other type of reference data that changes infrequently, this connector provides a robust and efficient solution for stream enrichment. This implementation can significantly improve the performance of your Flink applications while reducing operational complexity.
The full source code, comprehensive documentation, and working examples are available on GitHub at https://github.com/datanutshell/flink-http-full-cache-connector. I welcome contributions, feedback, and discussions to enhance its features and capabilities.