With Flink Forward Berlin 2024 coming up fast, Ververica is spotlighting some of the interesting learnings from prior years and the engineering techniques behind them. This guest blog from Erik de Nooij, current Program Committee Member and Flink Forward Speaker, focuses on data enrichment by calling API endpoints from Flink SQL, made possible by a novel HTTP connector for Flink SQL built at ING Bank.
This blog expands on what ING shared in its presentation at Flink Forward Seattle 2023. You can watch the recording of that session by signing into Ververica Academy. ING is a long-time user of Apache Flink and Ververica, and in this piece Erik shows how ING enables its business users to make API calls, including model inferencing, using Flink SQL.
Introduction
Data enrichment is a common task in stream processing, and Flink offers various capabilities to connect to external sources like lookup tables, databases, or web services that can be used to fetch the required data during the processing of events in a stream.
Although calling an API from Flink SQL for data enrichment was our first and main objective, the same connector can be used without any code changes for model inferencing when an ML model is wrapped in an API, as may be the case when you serve models with, for example, MLflow.
Background
ING Bank has been an early adopter of Apache Flink since 2016. Our first implementation was a custom Scala implementation in which we implemented a DSL (Domain Specific Language) enabling business users to configure their use cases. Note that back then Flink SQL did not exist yet, but we already adopted the same philosophy that use cases could be described in a declarative way.
In this initial implementation, API calls were made with custom code. However, when ING adopted Flink SQL and moved to a SQL-first approach, this revealed a gap in the out-of-the-box capabilities of Flink SQL, which required an alternative way to do data enrichment: the HTTP connector.
Requirements for the HTTP connector
The requirements are listed below, followed by a paragraph per requirement with additional details.
- Data enrichment must be performed asynchronously via API calls using plain SQL.
- The specifications of the API as defined in the Open-API definition must be adhered to.
- The runtime characteristics of the API endpoint should be considered.
- Life cycle management of libraries that implement an HTTP client.
Ad 1: Data enrichment must be performed asynchronously via API calls using plain SQL.
In Flink SQL, data sources and sinks are abstracted as tables. This allows you to use standard SQL to declaratively define the transformations you want to perform on the data. The logical consequence is that, to support data enrichment via API calls as part of these transformations, API endpoints also need to be abstracted as tables.
By abstracting an API endpoint as a reference table, a lookup join can be used to enrich data when processing events.
A lookup join in Flink SQL is expressed using the processing-time temporal join syntax (FOR SYSTEM_TIME AS OF), which is characterised by a common key (like any other join) and by the time attributes of the participating events. Asynchronicity is achieved by extending the AsyncLookupFunction class.
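To make the asynchronous contract concrete, here is a minimal plain-Java sketch without Flink dependencies. In Flink, AsyncLookupFunction#asyncLookup takes the lookup key as RowData and returns a CompletableFuture<Collection<RowData>>; in this sketch a String key and a Map stand in for RowData, and an in-memory map stands in for the remote API. Both are simplifications for illustration, not ING's code.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of the async lookup contract, without Flink dependencies.
// Flink's AsyncLookupFunction#asyncLookup(RowData) returns
// CompletableFuture<Collection<RowData>>; here a String key and a
// Map<String, String> row stand in for RowData (simplification).
class AsyncLookupSketch {
    // In-memory stand-in for the remote API (assumption for illustration).
    private final Map<String, Map<String, String>> fakeApi;

    AsyncLookupSketch(Map<String, Map<String, String>> fakeApi) {
        this.fakeApi = fakeApi;
    }

    // Returns immediately; the result is produced later on another thread,
    // so many lookups can be in flight at the same time.
    CompletableFuture<Collection<Map<String, String>>> asyncLookup(String key) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, String> row = fakeApi.get(key);
            // An empty collection means "no match" for this key.
            Collection<Map<String, String>> rows = row == null ? List.of() : List.of(row);
            return rows;
        });
    }
}
```

Because asyncLookup never blocks the caller, the operator can issue the next lookup while earlier API calls are still pending, which is the point of extending AsyncLookupFunction rather than the synchronous LookupFunction.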
Ad 2: The specifications of the API as defined in the Open-API definition must be adhered to
The API endpoint being called is often developed and operated by another team, or even another organisation, than the team using the HTTP connector to call it, so the specification of the endpoint is a given.
This specification is typically captured in a Swagger file or OpenAPI document, which describes the following items:
- API Information: General information about the API, including title, version, description, and contact details.
- API Endpoints: Available endpoints of the API, including their paths, HTTP methods (GET, POST, PUT, DELETE, etc.), and parameters.
- Request and Response Formats: Data formats and models used for request payloads (inputs) and response payloads (outputs) of the API, including data types, validation rules, and example data.
- Authentication and Authorization: Description of the authentication mechanisms used by the API, such as API keys, OAuth, or JWT tokens.
- Error Handling: Documentation of the possible error responses that the API can return, including the HTTP status codes, error messages, and error code conventions.
- API Documentation: Additional descriptive information about the API, such as usage guidelines, examples, and explanations for each endpoint.
- API Schema: Definition of a schema or data model for the API, describing the structure and properties of the data objects used in the API.
Ad 3: The runtime characteristics of the API endpoint should be considered
The HTTP connector should be able to deal with the following characteristics of a specific API:
- Scalability
  - Although APIs are typically designed to scale, there is a limit to the number of transactions per second (TPS) they can handle. To avoid flooding APIs with requests, caching has been implemented at two levels:
    - Result caching. If the result of the API call for a given key has been cached, it is returned immediately.
    - Request caching. If a request with the same key is already pending, the new request is coupled to the pending one instead of calling the API again.
- Security
  - APIs typically require authentication, often implemented by interpreting a credential passed in an HTTP header. The table definition has columns that can be populated so that the HTTP connector sets the corresponding HTTP headers.
- Versioning
  - APIs often undergo updates and changes over time. When the definition of the API changes, this is handled by generating a new table definition with a new, unique table name.
- Data format
  - APIs can support various data formats, such as JSON, XML, or custom formats.
- Response handling
  - A well-designed API returns specific HTTP codes with corresponding response messages, including error codes. Each HTTP code results in a dedicated column in the table definition, containing the field(s) that store that response (or error) message.
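The two caching levels can be sketched as follows. This is an illustration under stated assumptions, not ING's implementation: the API call is modelled as a function returning a CompletableFuture, only successful results are cached, and eviction and time-to-live are deliberately left out (a production cache needs a size or time bound).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Sketch of the two caching levels (not ING's implementation):
//  1. result cache: a completed result for a key is returned immediately;
//  2. request cache: a lookup for a key whose call is still pending is
//     attached to the in-flight future instead of calling the API again.
// Eviction and TTL are omitted.
class TwoLevelApiCache<K, V> {
    private final Map<K, V> results = new ConcurrentHashMap<>();
    private final Map<K, CompletableFuture<V>> pending = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> apiCall;
    final AtomicInteger apiCalls = new AtomicInteger(); // real API calls, for illustration

    TwoLevelApiCache(Function<K, CompletableFuture<V>> apiCall) {
        this.apiCall = apiCall;
    }

    CompletableFuture<V> lookup(K key) {
        V cached = results.get(key);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached); // level 1: result cache
        }
        CompletableFuture<V> promise = new CompletableFuture<>();
        CompletableFuture<V> inFlight = pending.putIfAbsent(key, promise);
        if (inFlight != null) {
            return inFlight; // level 2: couple with the pending request
        }
        apiCalls.incrementAndGet();
        apiCall.apply(key).whenComplete((value, error) -> {
            if (error == null && value != null) {
                results.put(key, value); // only successful results are cached
            }
            pending.remove(key, promise);
            if (error == null) {
                promise.complete(value);
            } else {
                promise.completeExceptionally(error);
            }
        });
        return promise;
    }
}
```

Using putIfAbsent with a pre-created promise, rather than computeIfAbsent, avoids mutating the map from inside its own mapping function when the API future happens to complete immediately.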
Ad 4: Life cycle management of libraries that implement an HTTP client
For the HTTP connector to perform an API call, an HTTP client is required. Many libraries implement an HTTP client; some are compatible with the libraries used by Flink, but others are not. To avoid dependency conflicts, an HTTP client of our choosing is deployed exclusively in a sidecar container that runs in the same pod as the Flink TaskManager. The TaskManager calls the sidecar using an HTTP client from a library that is already shipped as part of Flink.
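To illustrate the sidecar set-up, the sketch below builds (but does not send) a request that the TaskManager side could hand to the sidecar. It uses the JDK's java.net.http API purely for illustration, since the article does not name the Flink-shipped client; the sidecar address http://localhost:8081/forward and the X-Target-Url forwarding header are hypothetical conventions, not ING's actual interface.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Sketch: forwarding an API call to a sidecar in the same pod.
// The sidecar URL and the X-Target-Url header are hypothetical.
class SidecarRequests {
    static final URI SIDECAR = URI.create("http://localhost:8081/forward"); // hypothetical

    static HttpRequest forward(String targetUrl, String jsonBody, Duration timeout) {
        return HttpRequest.newBuilder(SIDECAR)
                .timeout(timeout)                                  // per-request timeout
                .header("Content-Type", "application/json")
                .header("X-Target-Url", targetUrl)                 // hypothetical: real API URL for the sidecar
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

Sending it would then be a matter of, for example, HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()). The design point is that only this thin, dependency-free call lives in the TaskManager, while the sidecar owns the real HTTP client and its dependencies.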
Using the HTTP connector from a user perspective
Now that the requirements are clear, let’s look at using the HTTP connector from the perspective of a user that wants to do an API call in a use case built using Flink SQL.
Step 1: Convert the open-api file to a Flink SQL table definition
To convert an open-api file to a Flink SQL table definition, the conversion script needs the following configuration:
- The path to the open-api file.
- Which endpoint to use from the open-api file, since an open-api file typically defines multiple endpoints.
- Which method to use: get or post. Note that the post method is not used to perform an update but to do a functional get, passing the required parameters in the request payload instead of as query parameters in the URL.
- The path that the HTTP client uses to construct the URL.
- The primary key column(s): one or more fields used in the lookup join.
- Etc.; the full list is given in the section on implementation details.
Based on the configuration the following is generated:
- The Flink SQL table definition: especially for large open-api definition files, generating the table definition rather than writing it by hand is the only way to make this an effortless and error-free exercise.
- Technical information needed to execute the API call, stored as table options: several parameters that the HTTP connector needs to make the API call are retrieved from the open-api file and added to the table definition as table options, e.g. the host, the method and the path template.
The table definition that is generated contains both the key-fields that are used in the request as well as the fields that are used in the response. In other words, the table is being used as an abstraction to pass information back and forth between the Flink job and the API. In the appendix there is a sample open-api file and the corresponding generated Flink SQL table definition.
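The generation step can be sketched as follows. This is not ING's conversion tool: parsing the open-api file is skipped and its output is assumed to be already extracted into plain maps, and the column layout (success fields as regular columns, a status column, and one `<prefix><code>` STRING column per documented non-200 code) is a hypothetical simplification of the generated apiTable.sql referenced in the appendix.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Sketch of generating a Flink SQL CREATE TABLE statement for an API table.
// Inputs are assumed to be already extracted from the open-api file.
class ApiTableDdlGenerator {
    static String generate(String tableName,
                           Map<String, String> columns,   // column name -> Flink SQL type
                           List<String> primaryKey,       // key column(s) used in the lookup join
                           List<Integer> errorCodes,      // documented non-200 HTTP codes
                           Map<String, String> options) { // table options
        StringJoiner body = new StringJoiner(",\n", "", "\n");
        columns.forEach((name, type) -> body.add("  `" + name + "` " + type));
        body.add("  `" + options.get("status-code-column") + "` INT");
        for (int code : errorCodes) {
            // One column per documented HTTP code, e.g. httpcode404 (hypothetical STRING type)
            body.add("  `" + options.get("response-column-prefix") + code + "` STRING");
        }
        StringJoiner key = new StringJoiner(", ");
        primaryKey.forEach(k -> key.add("`" + k + "`"));
        body.add("  PRIMARY KEY (" + key + ") NOT ENFORCED");

        StringJoiner with = new StringJoiner(",\n", "", "\n");
        // Single quotes inside values are escaped by doubling, as in SQL string literals
        options.forEach((k, v) -> with.add("  '" + k + "' = '" + v.replace("'", "''") + "'"));
        return "CREATE TABLE `" + tableName + "` (\n" + body + ") WITH (\n" + with + ")";
    }
}
```

Passing LinkedHashMaps keeps the generated columns and options in a stable order, which makes the output easy to diff when a new API version produces a new table definition.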
Step 2: Create the table definition for the input table and output table
A lookup join enriches information based on a (compound) key field from an input table and uses the enriched information to populate the fields of an output table.
In the example that we will be using below the input table only contains a customer identifier which is used to do the API call. For the given customer identifier, the API will return firstName, lastName and date_of_birth.
Note: The table definitions of the input and output tables can be found in the appendix.
Step 3: Creating the lookup join
Now that we have the definitions of all tables, we can write our query. The query is a standard lookup join that shields all the technical details of the API call made under the hood. This enables every end user with standard SQL knowledge to do data enrichment without technical knowledge of the API. In other words, something inherently complex has been made very easy to use.
INSERT INTO `outputTable` (
`id`,
`firstName`,
`lastName`,
`date_of_birth`
)
SELECT
inputTable.`id`,
apiTable.`firstName`,
apiTable.`lastName`,
apiTable.`date_of_birth`
FROM inputTable
INNER JOIN apiTable FOR SYSTEM_TIME AS OF inputTable.procTime
ON inputTable.id = apiTable.id
WHERE apiTable.httpStatusCode = 200
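To make the semantics of the query explicit, the loop below restates in plain Java what it computes per input event. It is an illustration, not how Flink executes the join: rows are modelled as maps and the API table as a function from key to row. It relies on the behaviour described under Error handling, where a lookup also returns a row for error responses, which is why the WHERE clause on httpStatusCode, and not the INNER JOIN itself, filters out unsuccessful calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java restatement of the lookup join above (illustration only).
class LookupJoinSemantics {
    static List<Map<String, Object>> enrich(List<String> inputIds,
                                            Function<String, Map<String, Object>> apiTable) {
        List<Map<String, Object>> outputTable = new ArrayList<>();
        for (String id : inputIds) {
            // FOR SYSTEM_TIME AS OF procTime: use the API's answer at processing time
            Map<String, Object> api = apiTable.apply(id);
            if (api == null) {
                continue; // INNER JOIN: no API row, no output row
            }
            if (!Integer.valueOf(200).equals(api.get("httpStatusCode"))) {
                continue; // WHERE apiTable.httpStatusCode = 200
            }
            Map<String, Object> out = new LinkedHashMap<>();
            out.put("id", id);
            out.put("firstName", api.get("firstName"));
            out.put("lastName", api.get("lastName"));
            out.put("date_of_birth", api.get("date_of_birth"));
            outputTable.add(out);
        }
        return outputTable;
    }
}
```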
Step 4: Deploy the Flink job and test it
Once the Flink job has been deployed, an event on the input topic triggers the API call. In this example the customer ID is found, and the fields returned by the API are written to the output topic.
Note: The screenshot above has been taken from the outputTable in the Flink SQL editor. The field values shown are those returned by the API.
Error handling
Proper error handling is important for any IT system, but perhaps even more so when making asynchronous API calls from a stream to an API that you have no control over.
When calling an API, errors can be divided into two categories: the “known” errors that are documented in the open-api file for the various HTTP return codes, and the ones you have no control over, such as network errors.
The known documented errors
If the API returns an HTTP code other than 200, the specific field for that code is populated. For example, calling the API with a non-existing key populates the httpcode404 field and leaves the other fields empty. The same mechanism is in place for all HTTP codes defined in the Swagger file. Additionally, a fallback is implemented for HTTP codes that are not defined in the open-api file, since documentation tends not to be flawless.
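The mapping from status code to column can be sketched as below. The names are assumptions for illustration: the 200 body is represented by a single responseBody entry (in the real table the configured format decodes it into columns such as firstName), documented codes go to `<prefix><code>` (e.g. httpcode404), and undocumented codes go to a hypothetical `<prefix>Other` fallback column, since the article does not say how its fallback is named.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch: spreading an HTTP response over API-table columns.
// "responseBody" and the "<prefix>Other" fallback column are hypothetical names.
class ResponseColumnMapper {
    static Map<String, Object> toColumns(int status, String body, Set<Integer> documentedCodes,
                                         String prefix, String statusColumn) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put(statusColumn, status); // e.g. httpStatusCode
        if (status == 200) {
            // In the real table the configured format decodes this into firstName, lastName, ...
            row.put("responseBody", body);
        } else if (documentedCodes.contains(status)) {
            row.put(prefix + status, body); // e.g. httpcode404
        } else {
            row.put(prefix + "Other", body); // fallback for codes missing from the open-api file
        }
        return row;
    }
}
```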
The unknown errors
Errors in this category are timeouts, network issues and similar problems that prevent the request from reaching the API or prevent the API from returning a response. Timeouts are detected by the HTTP client in the sidecar, which then returns an HTTP 408 status code.
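The timeout-to-408 behaviour can be sketched with the JDK's CompletableFuture#orTimeout (Java 9+). In the article this happens inside the sidecar's HTTP client; here the pending API call is simply a future holding a status code, and every failure other than a timeout is propagated unchanged. Both are simplifications for illustration. Note that orTimeout completes the given future itself exceptionally when the timeout fires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: turn a timed-out call into HTTP 408, propagate any other failure.
class TimeoutToStatus {
    static CompletableFuture<Integer> withTimeout(CompletableFuture<Integer> call, long millis) {
        return call.orTimeout(millis, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    // Depending on the stage, the exception may arrive wrapped
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    if (cause instanceof TimeoutException) {
                        return 408; // Request Timeout
                    }
                    throw new CompletionException(cause); // other failures propagate unchanged
                });
    }
}
```

Because the timeout is turned into an ordinary status code, it ends up in the status column like any documented error, so the SQL query can handle it with the same WHERE-clause logic.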
Model inferencing and updating and deleting data via API calls
The original purpose behind the HTTP connector was to do data enrichment using plain SQL. Technically, this is done with a get when the parameters are passed via the URL, or with a post when they are passed via the request body.
Over time additional requirements were met with only slight additions to the HTTP connector.
- Model inferencing: Model inferencing did not require a change in the HTTP connector; it was implemented by wrapping the model in an API and describing that API in an open-api file.
- Updating, creating and deleting information via an API call: Since the post method was already implemented, only a small change was needed to use it for calling an API that updates or creates information: disabling the caching that is in place when the post method is used to perform a functional get. An argument could be made that changing information should be done via a sink at the end of the execution plan. The reason it was not implemented that way is that the lookup join has additional advantages in error handling: depending on the HTTP code returned, each event can be handled differently. With a sink, this would be a send-and-pray scenario.
Implementation details
This section peeks under the hood and explains several implementation details.
The behaviour of an API table is configured via table options, which are part of the table definition and are generated by converting an open-api file. Below is an example table definition; the various options are detailed further down.
CREATE TABLE apiTable (
-- See the Appendix for an example of the fields that are generated for a given open-api file
) WITH (
-- connector
'connector' = 'api-endpoint-ing',
-- mandatory table options
'host' = 'api.mycompany.com',
'method' = 'post',
'path-template' = '/lookup',
'status-code-column' = 'httpStatusCode',
'response-column-prefix' = 'httpcode',
-- optional table options
'format' = 'json',
'base-uri' = '${api_base_uri}',
'http-connect-timeout' = '5 seconds',
'http-socket-timeout' = '10 seconds'
-- etc., see further down
)
The connector 'api-endpoint-ing'
The API table is a dynamic table, meaning Flink does not own the data itself: its content comes from an external system, in this case an API response. The connector is implemented by hooking into Flink's existing connector stack. More specifically, the custom 'api-endpoint-ing' connector is implemented by a class 'ApiDynamicTableFactory' that implements the Java interface DynamicTableSourceFactory, whose methods are documented below.
Source: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#overview
The method `factoryIdentifier` registers the connector `api-endpoint-ing` with Flink; this is the value used in the 'connector' table option.
@Override
public String factoryIdentifier() {
return "api-endpoint-ing";
}
The method `requiredOptions` declares the mandatory table options, which the connector needs to construct the request and to know in which columns to store the result. The actual values are taken from the mandatory table options in the table definition.
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOST); // FQDN of the host that serves the API
options.add(METHOD); // post or get
options.add(PATH_TEMPLATE); // Path of the endpoint, e.g. /lookup
options.add(STATUS_CODE_COLUMN); // Field used to store the httpcode
options.add(RESPONSE_COLUMN_PREFIX); // Prefix of the per-HTTP-code response columns, e.g. httpcode
return options;
}
The method `optionalOptions` is similar to `requiredOptions`, with the difference that these options are optional; they are used for more specific APIs and to tune the runtime behaviour of the connector to the runtime characteristics of the API.
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(BASE_URI); // base-uri used to call the api
options.add(REQUEST_BODY_COLUMN); // Column name for the request body
options.add(QUERY_COLUMNS); // Column names mapped to query arguments
options.add(HEADER_COLUMNS); // Column names mapped to header values
options.add(HTTP_CONNECT_TIMEOUT); // Timeout for establishing the HTTP connection
options.add(HTTP_SOCKET_TIMEOUT); // Timeout on the socket while waiting for data
options.add(HTTP_VALIDATE_AFTER_INACTIVITY); // Idle period after which connections are re-validated
options.add(HTTP_TIME_TO_LIVE); // Total time-to-live of connections
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// …
// Helper functions implementing the functionality of the DynamicTableSource
// …
return new ApiDynamicTableSource(
apiEndpointDefinition,
decodingFormat,
encodingFormat,
producedDataType,
cache);
}
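What validating these options amounts to can be sketched in plain Java. In a real Flink connector this is normally delegated to FactoryUtil's table factory helper rather than hand-written, so treat this as an illustration only. The required keys follow the CREATE TABLE example above; the optional keys other than format, base-uri and the two timeouts are hypothetical, derived from the constant names, and parseDuration supports only a small subset of the units Flink accepts.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Illustration of table-option validation; real connectors typically use
// FactoryUtil's table factory helper instead of hand-written checks.
class ApiTableOptions {
    // Mandatory options, as in the CREATE TABLE example
    static final Set<String> REQUIRED = Set.of(
            "host", "method", "path-template", "status-code-column", "response-column-prefix");
    // Optional options; keys not shown in the example are hypothetical
    static final Set<String> OPTIONAL = Set.of(
            "format", "base-uri", "request-body-column", "query-columns", "header-columns",
            "http-connect-timeout", "http-socket-timeout",
            "http-validate-after-inactivity", "http-time-to-live");

    static void validate(Map<String, String> options) {
        for (String key : REQUIRED) {
            if (!options.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        for (String key : options.keySet()) {
            // 'connector' selects the factory itself, so it is always allowed
            boolean known = key.equals("connector") || REQUIRED.contains(key) || OPTIONAL.contains(key);
            if (!known) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
        String method = options.get("method").toLowerCase(Locale.ROOT);
        if (!method.equals("get") && !method.equals("post")) {
            throw new IllegalArgumentException("method must be get or post, got: " + method);
        }
    }

    // Parses "<amount> <unit>", e.g. "5 seconds"; only a small subset of units is supported.
    static Duration parseDuration(String text) {
        String[] parts = text.trim().split("\\s+");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected '<amount> <unit>': " + text);
        }
        long amount = Long.parseLong(parts[0]);
        switch (parts[1]) {
            case "ms": case "milliseconds": return Duration.ofMillis(amount);
            case "s": case "second": case "seconds": return Duration.ofSeconds(amount);
            case "min": case "minute": case "minutes": return Duration.ofMinutes(amount);
            default: throw new IllegalArgumentException("Unknown unit: " + parts[1]);
        }
    }
}
```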
Finally, the core of the runtime behaviour is in the `ApiRowDataAsyncLookupFunction` class, which has all parameters at its disposal, constructs the HTTP request and handles the response.
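One part of that request construction, building the URL, can be sketched as follows. The article does not describe the path-template syntax, so the `{column}` placeholder syntax, the mapping of query columns to query arguments and the class name RequestUrlBuilder are assumptions for illustration. Values are percent-encoded with URLEncoder, with '+' replaced by %20 so that spaces are also encoded correctly in path segments.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: building the request URL from base-uri, path-template and key values.
// The {column} placeholder syntax is a hypothetical convention.
class RequestUrlBuilder {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^}]+)\\}");

    static String build(String baseUri, String pathTemplate,
                        Map<String, String> pathValues, Map<String, String> queryValues) {
        Matcher m = PLACEHOLDER.matcher(pathTemplate);
        StringBuilder path = new StringBuilder();
        while (m.find()) {
            String value = pathValues.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for path placeholder " + m.group(1));
            }
            m.appendReplacement(path, Matcher.quoteReplacement(encode(value)));
        }
        m.appendTail(path);
        StringJoiner query = new StringJoiner("&", "?", "");
        query.setEmptyValue(""); // no '?' when there are no query arguments
        queryValues.forEach((k, v) -> query.add(encode(k) + "=" + encode(v)));
        return baseUri + path + query;
    }

    private static String encode(String s) {
        // URLEncoder does form encoding (space -> '+'); switch to %20 for URL use
        return URLEncoder.encode(s, StandardCharsets.UTF_8).replace("+", "%20");
    }
}
```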
Discussions & open questions
Is the HTTP Connector open source?
No, it has not been open-sourced at this moment. The reason is that several implementation details are ING-specific. Then again, certain parts could benefit other users of Flink SQL and could be open-sourced, e.g. the tool that generates a table definition from an open-api file, which is very similar to the tool we built to convert an Avro schema to a Flink SQL table definition.
Both the HTTP connector and the ING specific Kafka connector are based on standard connectors with additional ING specific security functionality which is the reason why the source code of these connectors cannot be open sourced in its entirety.
How does the HTTP connector relate to FLIP-437?
The HTTP connector can also be used for model inferencing, provided that the ML model is wrapped in an API. If that condition is met, an end user can do model inferencing using plain SQL. The approach outlined in FLIP-437 is a programmatic approach that requires implementing a Java interface.
Future outlook
ING has adopted a SQL first approach for its Streaming Data Analytics (SDA) platform and where possible a SQL only approach. Paramount to its success is being able to abstract all sources and sinks using tables and where needed develop the relevant connectors.
Moving forward, SDA is expected to play a bigger role in the orchestration of model inferencing via integration with a feature store. Last but not least, we are investigating if and how SDA can play a role in the RAG architecture of generative AI and LLMs. Developments in these areas will be shared in future blogs and possibly at Flink Forward (subject to abstract approval).
More resources
If you'd like to learn more about Apache Flink and network with the Flink Community, including Erik, tickets for Flink Forward 2024 in Berlin are available here.
Ready to try Apache Flink for yourself? Spin up Ververica's cloud offering with $400 free credits to get started.
Appendix
This appendix contains the files that were used in the example described earlier in this blog.
- The swagger file: individual.yaml
- The table definition of the apiTable: apiTable.sql
- The table definition of the inputTable: inputTable.sql
- The table definition of the outputTable: outputTable.sql