Flink's Test Harnesses Uncovered

Written by Ververica | 17 May 2023

When working with Apache Flink, developers often struggle to test user-defined functions (UDFs) that use state and timers. This article answers the question: how do you test such UDFs with Flink's test harnesses?

Using Flink’s test harnesses

Testing user-defined functions (UDFs) that use Flink state and timers can be complex, especially for functions such as KeyedProcessFunction. Flink includes a set of test harnesses designed to make this easier. They were created to verify the correctness of Flink's built-in operators, but they work just as well for custom UDFs that use state and timers. With a harness you can feed records, watermarks, and processing-time updates into a single operator and then inspect its output, state, and timers, which catches errors early and without starting a cluster.

The diagram shows that the KeyedOneInputStreamOperatorTestHarness wraps a KeyedProcessOperator, which in turn wraps the KeyedProcessFunction under test.

Configurations for the Test Harnesses

You need to add some dependencies to your project in order to leverage the test harnesses.

To test DataStream jobs, you may add the following in the dependencies block of pom.xml for your Maven project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>

To test Table/SQL jobs, you may add the following in the dependencies block of pom.xml for your Maven project, in addition to the aforementioned flink-test-utils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>

Note: The module flink-table-test-utils was introduced in Flink 1.15 and is considered experimental.

Testing user functions through an operator

Testing user functions through an operator can be a useful approach, as it allows you to test the function in the context of the operator and see how it behaves when used in a Flink pipeline.

Flink only provides test harnesses for operators, so you need to instantiate the appropriate operator yourself, wrap your user function in it, and pass the operator to the harness. This takes more setup than calling the function directly, but it is the only way to exercise state and timers the way the runtime does.

@Test
public void testMyProcessFunction() throws Exception {
    KeyedProcessOperator<String, String, String> operator =
        new KeyedProcessOperator<>(new MyKeyedProcessFunction());

    // setup test harness
    // push in data
    // verify results
}

To test a user function, first create the appropriate TestHarness around the operator. You can then use it to check different aspects of the function and its operator. For example, you can verify that processing an element creates state and that this state is later cleared: send a record through the operator, check the number of keyed state entries, advance time, and check again.

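import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;

public class MyKeyedProcessFunctionTest {
   private KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> testHarness;

   @Before
   public void setupTestHarness() throws Exception {
       // wrap the function under test in the operator that runs it
       KeyedProcessOperator<Long, Long, Long> operator =
               new KeyedProcessOperator<>(new MyKeyedProcessFunction());

       // key each record by its own value (an assumption for this example;
       // a Long has no "key" field, so use whatever key selector your job uses)
       testHarness =
               new KeyedOneInputStreamOperatorTestHarness<>(operator, e -> e, Types.LONG);

       testHarness.open();
   }

   @Test
   public void testingStatefulFunction() throws Exception {
       // no state exists before any element is processed
       assertThat(testHarness.numKeyedStateEntries(), is(0));
       testHarness.setProcessingTime(0L);

       // processing an element (value 2, timestamp 100) creates state
       testHarness.processElement(2L, 100L);
       assertThat(testHarness.numKeyedStateEntries(), is(not(0)));

       // one hour later the function is expected to have cleared its state
       testHarness.setProcessingTime(3600000L);
       assertThat(testHarness.numKeyedStateEntries(), is(0));
   }
}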
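The article does not show MyKeyedProcessFunction itself. As a rough sketch only, one hypothetical implementation that would satisfy the stateful test above stores the latest value per key and clears it with a processing-time timer one hour later. The state name, the one-hour TTL constant, and the decision to forward every element are assumptions made for illustration, and the code targets the Flink 1.17 API used elsewhere in this article.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Hypothetical function under test: remembers the last value per key for one hour. */
public class MyKeyedProcessFunction extends KeyedProcessFunction<Long, Long, Long> {

    // Assumed time-to-live; matches the 3600000L used in the test.
    private static final long TTL_MS = 3_600_000L;

    private transient ValueState<Long> lastValue;

    @Override
    public void open(Configuration parameters) {
        // Register keyed state; "lastValue" is an illustrative name.
        lastValue = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("lastValue", Types.LONG));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        // Creating state is what makes numKeyedStateEntries() non-zero.
        lastValue.update(value);

        // Schedule cleanup one hour after the current processing time.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);

        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
        // Clearing the state brings numKeyedStateEntries() back to zero.
        lastValue.clear();
    }
}
```

With this sketch, setProcessingTime(0L) followed by processElement registers a timer for 3600000, and setProcessingTime(3600000L) fires it, which is why the final assertion in the test sees no remaining state.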

Testing timer behavior

When testing a user function in Flink, it is essential to test various aspects of its behavior. One common thing to test is the creation and firing of timers. You can use the TestHarness to send test data through the operator and verify that a timer is created. You can then advance the watermark and verify that the timer has fired. In addition to testing timers, you may also want to test that processing an element creates a state and verify the results of this processing.

Suppose we expect a timer to be fired for our test target within 20 milliseconds. The example code below demonstrates how we can test it with a test harness.

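@Test
public void testTimelyOperator() throws Exception {

    // setup initial conditions
    testHarness.processWatermark(0L);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // send in some data (value 3, timestamp 100)
    testHarness.processElement(3L, 100L);

    // verify that the function registered a timer
    assertThat(testHarness.numEventTimeTimers(), is(1));

    // advance the watermark to 20 in order to fire the timer
    testHarness.processWatermark(20L);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // verify results: extractOutputValues() returns the emitted values in order
    // (org.hamcrest.Matchers.contains checks both content and order)
    assertThat(testHarness.extractOutputValues(), contains(3L));

    // OutputTag must be created as an anonymous subclass so Flink can infer its type.
    // Depending on the Flink version, getSideOutput may return null instead of an
    // empty queue when nothing was emitted; adjust the assertion if so.
    assertThat(testHarness.getSideOutput(new OutputTag<Long>("invalidRecords") {}), hasSize(0));
}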
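The assertions above rely on one rule: an event-time timer fires once the watermark reaches or passes the timer's timestamp. The plain-Java toy model below sketches that rule for a single key. It is not Flink code; ToyTimerService and its methods are hypothetical names chosen to mirror the harness calls, and the model ignores keys, namespaces, and callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy single-key model of event-time timers; illustrative only, not Flink's implementation. */
class ToyTimerService {

    // A sorted set gives timestamp order and deduplicates repeated registrations.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    int numEventTimeTimers() {
        return timers.size();
    }

    /** Advances the watermark and returns the timestamps of the timers that fired. */
    List<Long> processWatermark(long watermark) {
        // Watermarks never move backwards.
        currentWatermark = Math.max(currentWatermark, watermark);

        List<Long> fired = new ArrayList<>();
        // Fire every timer whose timestamp is at or before the watermark.
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.registerEventTimeTimer(20L);

        System.out.println("pending timers: " + service.numEventTimeTimers());
        System.out.println("fired at watermark 19: " + service.processWatermark(19L));
        System.out.println("fired at watermark 20: " + service.processWatermark(20L));
        System.out.println("pending timers: " + service.numEventTimeTimers());
    }
}
```

In this model a watermark of 19 leaves the timer pending, while a watermark of 20 fires it and empties the queue, which is the same sequence the harness test checks with numEventTimeTimers() before and after processWatermark(20).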

Limits

There are a few caveats to keep in mind when using Flink's test harnesses. First, a harness tests a single operator, not a whole pipeline. If your user functions are spread across several operators, you need to set up and test each operator separately.

Second, the test harnesses are considered internal, so their API can change between minor versions of Flink. Such changes are usually small, but you may need to update your tests when you upgrade. Check the Flink documentation and release notes for changes to the test harnesses and other internal APIs before moving to a new version.

Conclusion

Testing is a crucial part of making sure a Flink application works correctly. Flink's test harnesses let you test user functions through an operator, including the state and timers they use. They cover only one level, though: test the application at multiple levels, combining unit tests for stateful and timer-based functions with integration tests and performance tests, so that you know it behaves as expected before it goes to production.