Flink HTTP Full Cache Connector

Efficient Enrichment Strategy for Infrequently Changing Reference Data in Apache Flink Streaming Applications

André Santos

Published on June 25, 2025

Introduction

In streaming data processing, one of the most common requirements is enriching events with reference data: for instance, adding customer information to transactions, product details to orders, or feature flags to application logs.

Recently, I faced a scenario where I needed to enrich a Flink stream with infrequently changing reference data served by a web API endpoint. The data set was small and rarely changed, which made it a better fit for caching all of it than for calling the API once per record.

After researching Apache Flink's documentation, I came across FLIP-221, which proposes an abstraction for lookup source caching. It outlines two main caching strategies:

  1. Partial Caching: Entries are loaded into the cache on demand. When a key is looked up and is not in the cache, the system fetches it from the external source and stores it for future lookups. An eviction policy keeps the cache size bounded.

  2. Full Caching: When the reference data is small enough to fit in memory and changes infrequently, it is more efficient to load all entries into the cache at once. Lookups are then served entirely from memory, so no network I/O happens on the per-record path; instead, the entire cache is reloaded periodically.
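The difference between the two strategies is easiest to see side by side. The plain-Java sketch below has no Flink dependency, and PartialCache and FullCache are hypothetical names chosen for illustration, not Flink classes. The partial cache makes one remote call per missing key and evicts the least recently used entry once it is full; the full cache makes one bulk call and then answers every lookup from memory.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Partial caching: each missing key triggers one call to the external source,
// and the least recently used entry is evicted once maxEntries is exceeded.
// Not thread-safe; it only illustrates the access pattern.
class PartialCache<K, V> {
    private final Function<K, V> fetchOne;
    private final Map<K, V> entries;

    PartialCache(Function<K, V> fetchOne, int maxEntries) {
        this.fetchOne = fetchOne;
        // accessOrder = true keeps the least recently used entry first in iteration order
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    V get(K key) {
        return entries.computeIfAbsent(key, fetchOne);
    }
}

// Full caching: one bulk call loads every entry; lookups never leave memory.
class FullCache<K, V> {
    private final Supplier<Map<K, V>> loadAll;
    private volatile Map<K, V> entries = Map.of();

    FullCache(Supplier<Map<K, V>> loadAll) {
        this.loadAll = loadAll;
    }

    // Replaces the whole cache with a fresh immutable copy of the source data.
    void reload() {
        entries = Map.copyOf(loadAll.get());
    }

    Optional<V> get(K key) {
        return Optional.ofNullable(entries.get(key));
    }
}
```

With a mixed access pattern the partial cache keeps calling the source as keys are evicted and requested again, while the full cache pays for exactly one call per reload no matter how many lookups follow.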

My requirements aligned perfectly with the Full Caching approach:

  • The stream could work with slightly stale data until the next cache refresh
  • Processing needed to continue uninterrupted during cache refresh retries
  • The lookup data was small (feature flags, configuration settings)
  • The data changed infrequently

While I found several connectors implementing the partial caching strategy (like GetInData's HTTP connector), I couldn't find a proper implementation of the full caching approach for HTTP APIs. So I decided to build one.

Implementing the HTTP Full Cache Connector

The connector I created has three main components:

  1. HttpLookupTableSource: The main entry point that implements Flink's LookupTableSource interface and uses FullCachingLookupProvider to handle the caching strategy.
  2. HttpLookupTableSourceFactory: Handles configuration options and creates instances of the table source with appropriate parameters.
  3. HttpRowDataInputFormat: Handles the actual HTTP requests, JSON parsing, and data transformation needed to fetch and process the reference data.
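HttpRowDataInputFormat has to pull the reference rows out of what may be a larger JSON document, and the feature list below mentions JSON pointers for this. As a rough illustration of that extraction step, here is a minimal evaluator for JSON Pointer (RFC 6901) paths over an already-parsed JSON tree, assuming objects arrive as Map and arrays as List (a JSON library would produce this; parsing itself is out of scope). The JsonPointer class is hypothetical and is not the connector's code.

```java
import java.util.List;
import java.util.Map;

// Minimal JSON Pointer (RFC 6901) evaluation over an already-parsed JSON tree in which
// objects are Maps and arrays are Lists. Throws IllegalArgumentException if the path does not exist.
final class JsonPointer {
    private JsonPointer() {}

    static Object resolve(Object root, String pointer) {
        if (pointer.isEmpty()) {
            return root; // the empty pointer refers to the whole document
        }
        if (!pointer.startsWith("/")) {
            throw new IllegalArgumentException("pointer must start with '/': " + pointer);
        }
        Object current = root;
        // limit -1 keeps empty tokens, which are valid member names in RFC 6901
        for (String raw : pointer.substring(1).split("/", -1)) {
            // RFC 6901 order: decode "~1" to "/" first, then "~0" to "~"
            String token = raw.replace("~1", "/").replace("~0", "~");
            if (current instanceof Map) {
                Map<?, ?> object = (Map<?, ?>) current;
                if (!object.containsKey(token)) {
                    throw new IllegalArgumentException("no member named '" + token + "'");
                }
                current = object.get(token);
            } else if (current instanceof List) {
                List<?> array = (List<?>) current;
                // array indexes are "0" or digits without a leading zero
                if (!token.matches("0|[1-9][0-9]*")) {
                    throw new IllegalArgumentException("invalid array index '" + token + "'");
                }
                int index = Integer.parseInt(token);
                if (index >= array.size()) {
                    throw new IllegalArgumentException("index " + index + " out of range");
                }
                current = array.get(index);
            } else {
                throw new IllegalArgumentException("cannot descend into a scalar with '" + token + "'");
            }
        }
        return current;
    }
}
```

Under this sketch, resolving /flags_list against a response shaped like {"flags_list": [...]} returns the array of rows. Whether the connector's 'xpath' value (flags_list in the DDL below) maps to that pointer in exactly this way is an assumption.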

The connector provides several key features:

  • Periodic Cache Refresh: Automatically reloads the full cache at a configurable interval
  • Resilient HTTP Handling: Built-in retry mechanism with configurable delays and timeouts
  • JSON Processing: Extracts the relevant data from complex API responses using JSON pointers
  • Full Type Support: Compatible with Flink's row data types for seamless integration
  • Atomic Updates: The cache is swapped atomically, so lookups never see a partially loaded data set
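The refresh, retry, and atomic-update features fit together roughly as follows. In the connector, scheduling is handled through Flink's FullCachingLookupProvider; the sketch below only illustrates the idea in plain Java, and RefreshingFullCache with its parameters (maxAttempts, retryDelay) is hypothetical. Each reload builds a complete new map off to the side and publishes it with a single AtomicReference swap, so lookups see either the old or the new snapshot. If every attempt fails, the previous snapshot keeps serving lookups, which matches the requirement that processing continue, on slightly stale data, while refreshes are retried.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical full cache with periodic reload, bounded retries, and atomic publication.
// A reload builds a complete new map before swapping it in, so lookups never see a
// half-loaded mix. If every attempt fails, the old snapshot stays in place.
class RefreshingFullCache<K, V> implements AutoCloseable {
    private final Supplier<Map<K, V>> loadAll; // assumption: one HTTP call returning all rows
    private final int maxAttempts;
    private final Duration retryDelay;
    private final AtomicReference<Map<K, V>> snapshot = new AtomicReference<>(Map.of());
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    RefreshingFullCache(Supplier<Map<K, V>> loadAll, int maxAttempts, Duration retryDelay) {
        this.loadAll = loadAll;
        this.maxAttempts = maxAttempts;
        this.retryDelay = retryDelay;
    }

    // Loads once on the caller's thread, then reloads in the background.
    void start(Duration refreshInterval) {
        refresh();
        long millis = refreshInterval.toMillis();
        scheduler.scheduleWithFixedDelay(this::refresh, millis, millis, TimeUnit.MILLISECONDS);
    }

    // Returns true if a new snapshot was published, false if the old one was kept.
    boolean refresh() {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Map<K, V> fresh = Map.copyOf(loadAll.get()); // built off to the side
                snapshot.set(fresh);                          // single atomic swap
                return true;
            } catch (RuntimeException e) {
                if (attempt == maxAttempts || !pause(retryDelay)) {
                    break;
                }
            }
        }
        return false;
    }

    Optional<V> lookup(K key) {
        return Optional.ofNullable(snapshot.get().get(key));
    }

    // Sleeps between attempts; returns false if the thread was interrupted.
    private static boolean pause(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

scheduleWithFixedDelay measures the interval from the end of one reload to the start of the next, so a slow or retrying reload pushes the next one back instead of triggering it immediately afterwards.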

Using the Connector

To use the connector in a Flink SQL job, you can define a table like this:

```sql
CREATE TABLE user_category_flags (
  id INT,                           -- user category id (the lookup key)
  is_cost_price_mode_v2 BOOLEAN,
  is_free_shipping BOOLEAN
) WITH (
  'connector' = 'http-lookup-full-cache',
  'url' = 'http://api.example.com/users_categories_flag',
  'xpath' = 'flags_list',           -- part of the JSON response that holds the rows
  'cache.refresh-interval' = 'PT1M' -- reload the whole cache every minute
);
```
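The refresh interval PT1M is written in ISO-8601 duration notation and means one minute. java.time.Duration.parse reads the same notation, so it is a handy way to check what a candidate value means. The helper class below is hypothetical, and the assumption is that the connector accepts these ISO-8601 forms in the same way.

```java
import java.time.Duration;

// Hypothetical helper: converts an ISO-8601 duration such as the DDL's 'PT1M'
// into seconds using the standard java.time parser.
final class RefreshInterval {
    private RefreshInterval() {}

    static long toSeconds(String isoDuration) {
        return Duration.parse(isoDuration).getSeconds();
    }
}
```

Since the cache is only replaced on each reload, the interval roughly bounds how stale a lookup can be: with data that changes hourly, PT1H or a shorter value keeps lookups within about an hour of the source.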

Here's an example of using the HTTP Full Cache table to enrich a transaction stream and calculate different amounts based on feature flags:

```sql
SELECT
  t.transaction_id,
  t.user_id,
  t.user_category,
  t.product_id,
  t.amount,
  -- Apply different shipping calculation based on the feature flag
  CASE
    WHEN f.is_free_shipping THEN t.amount
    ELSE t.amount + t.shipping_cost
  END AS total_amount,
  -- Apply different pricing model based on the feature flag
  CASE
    WHEN f.is_cost_price_mode_v2 THEN
      t.amount * 0.9  -- 10% discount for user categories with is_cost_price_mode_v2 enabled
    ELSE
      t.amount
  END AS discounted_amount,
  -- Final amount: discounted price plus shipping, unless shipping is free
  CASE
    WHEN f.is_free_shipping THEN
      CASE
        WHEN f.is_cost_price_mode_v2 THEN t.amount * 0.9
        ELSE t.amount
      END
    ELSE
      CASE
        WHEN f.is_cost_price_mode_v2 THEN (t.amount * 0.9) + t.shipping_cost
        ELSE t.amount + t.shipping_cost
      END
  END AS final_amount
FROM transactions t
JOIN user_category_flags f
  FOR SYSTEM_TIME AS OF t.proc_time  -- lookup join against the cached flags
  ON t.user_category = f.id;
```

This example uses the feature flags to control:

  1. Free shipping for certain user categories
  2. Special pricing (a 10% discount) for user categories with is_cost_price_mode_v2 enabled
  3. The final amount, which combines both flags
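The nested CASE for final_amount has a simpler structure than it looks: it is the discounted amount plus the shipping cost, where shipping is zero when free shipping applies. In SQL it could equally be written as the sum of two expressions, CASE WHEN f.is_cost_price_mode_v2 THEN t.amount * 0.9 ELSE t.amount END plus CASE WHEN f.is_free_shipping THEN 0 ELSE t.shipping_cost END. The Java sketch below (the Pricing class is hypothetical) mirrors that decomposition so it can be checked against all four flag combinations; it uses BigDecimal to avoid binary floating-point rounding in money values.

```java
import java.math.BigDecimal;

// Hypothetical mirror of the query's pricing logic:
// final_amount = discounted_amount + shipping, where shipping is zero when free.
final class Pricing {
    private static final BigDecimal COST_PRICE_V2_FACTOR = new BigDecimal("0.9"); // 10% discount

    private Pricing() {}

    // Same as the discounted_amount CASE expression.
    static BigDecimal discountedAmount(BigDecimal amount, boolean costPriceModeV2) {
        return costPriceModeV2 ? amount.multiply(COST_PRICE_V2_FACTOR) : amount;
    }

    // Shipping term: waived when free shipping applies.
    static BigDecimal shipping(BigDecimal shippingCost, boolean freeShipping) {
        return freeShipping ? BigDecimal.ZERO : shippingCost;
    }

    // Equivalent to the nested final_amount CASE expression.
    static BigDecimal finalAmount(BigDecimal amount, BigDecimal shippingCost,
                                  boolean costPriceModeV2, boolean freeShipping) {
        return discountedAmount(amount, costPriceModeV2).add(shipping(shippingCost, freeShipping));
    }
}
```

For an amount of 100.00 and shipping of 5.00, the four flag combinations give 105, 95, 100, and 90, exactly the values the nested CASE produces branch by branch.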

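Note: You must include FOR SYSTEM_TIME AS OF t.proc_time in your query. It makes this a lookup join, in which each transaction is joined against the cached data as it stands at the moment the record is processed. For this to work, proc_time must be a processing-time attribute of the transactions table, for example declared as proc_time AS PROCTIME() in its DDL.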

Full Cache Usage Guidelines

| Scenario | When to Use Full Cache | When Not to Use Full Cache |
|---|---|---|
| Data Size | Reference data is small enough to fit in memory (up to a few MB) | Reference data is large (more than several megabytes) |
| Update Frequency | Data changes infrequently (hourly, daily, or weekly updates) | Data changes frequently (every few seconds) |
| Access Pattern | Read operations far outnumber updates | Only a small subset of the reference data is accessed |
| Data Freshness | The application can tolerate slightly stale data between refresh intervals | Zero staleness is required (strict real-time lookups) |

If your workload matches the "When Not to Use Full Cache" column, consider a partial cache implementation or querying your data source directly for each lookup.

Alternatives

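  • Consider partial caching with asynchronous lookups when the reference data is too large to hold in memory or only a small subset of keys is accessed

  • Consider Flink's Async I/O API for non-cached asynchronous lookups when every record must see the latest data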

Conclusion

Whether you're working with feature flags, configuration settings, product catalogs, or any other reference data that changes infrequently, this connector offers a robust and efficient way to enrich streams. Because every lookup is served from memory, the job avoids one HTTP call per record, which lowers lookup latency and makes the load on the upstream API independent of the stream's record rate.

The full source code, comprehensive documentation, and working examples are available on GitHub at https://github.com/datanutshell/flink-http-full-cache-connector. I welcome contributions, feedback, and discussions to enhance its features and capabilities.

Resource: