Testing your Apache Flink SQL code is a critical step in ensuring that your application runs smoothly and produces the expected results. Flink SQL applications are used for a wide range of data processing tasks, from complex analytics to simple SQL jobs. A comprehensive testing process helps you identify potential issues early in development and confirm that your application works as expected.
This post will go through several testing possibilities for your Flink SQL application.
Manual vs Automated Testing
Using a SQL client is an effective way to quickly and easily test your Flink SQL code. SQL clients are designed to provide an interactive environment where you can run SQL queries and view the results. This makes it easy to test your code and make changes quickly.
However, SQL clients mostly lend themselves to manual testing. For more comprehensive testing, you should use automated testing tools. Automated tests let you run your code against multiple data sets and scenarios, which helps identify issues that are not immediately visible in manual testing.
Automated tests include unit and integration testing. Unit tests are used to test individual components of the application, while integration tests are used to test the integration between different components. Both of them help identify issues related to data processing, such as incorrect SQL syntax or incorrect data transformation.
Testing via Unit & Integration Tests
This testing method involves using unit and integration tests to validate the behavior of your code. It is easy to automate, which makes it a convenient option for testing smaller units, such as parts of a query or function. Additionally, it is highly customizable: you can define individual test scenarios with specific input data, with or without changelog streams, and control rowtime attributes.
Another benefit of this approach is that it allows for quick turnaround times when developing user-defined functions. This can help you identify type inference problems and other issues early in the development process.
However, this testing method involves non-SQL code, so you need some knowledge of Java or Scala to use it effectively. That includes the syntax and basic concepts of these languages, as well as the libraries and frameworks used with them. While this requires some upfront investment in learning, it pays off for anyone who is already familiar with these languages and wants to automate their tests.
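As a minimal sketch of that quick turnaround, the core logic of a scalar function can be exercised as plain Java before it is wired into a query. The NormalizeName class and its behavior are hypothetical and only serve as an illustration. In a Flink project the class would be public and extend org.apache.flink.table.functions.ScalarFunction; the superclass is left out here so that the sketch compiles without Flink on the classpath.

```java
import java.util.Locale;

/**
 * Hypothetical scalar function body (illustration only).
 * Assumption: in a real project this class would be public and extend
 * org.apache.flink.table.functions.ScalarFunction.
 */
final class NormalizeName {

    /** Trims and lower-cases a name; a null input yields a null result. */
    public String eval(String name) {
        return name == null ? null : name.trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        NormalizeName fn = new NormalizeName();
        System.out.println(fn.eval("  John ")); // prints "john"
    }
}
```

Calling eval() directly only checks the function's logic. Type inference problems show up once the function is registered with the table environment and used in a query, so a direct call like this complements the integration tests rather than replacing them.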
Unit & Integration Tests in Flink 1.12 or below
If you need to supply input data for testing purposes, one option is to use the TableEnvironment.fromValues() method. However, it is important to note that this method only supports inserts and does not allow for changelog streams. Additionally, it does not provide any control over the order of the data, which may be necessary in some cases. Furthermore, there are no rowtime or watermarking options available.
An alternative approach is to use the values connector with the TestValuesTableFactory. This allows you to define input tables using the full range of options available in the table DDL, including rowtime and watermark attributes as well as changelog modes. However, it is important to note that this is a non-public API: it is meant for Flink's own tests, is not intended for use in production environments, and may change between releases without notice.
If you want to collect the results of a query or operation in a table, one option is to use the TableResult.collect() method. This method retrieves the results as a CloseableIterator<Row>, which is a specialized iterator that allows you to iterate over the rows in the result set.
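Because the iterator is closeable, it should be closed once you have read what you need, and a query over an unbounded source would otherwise keep producing rows. The following is a minimal plain-Java sketch of a helper that reads a limited number of rows and always closes the iterator. IteratorDrain and TrackingIterator are hypothetical names; TrackingIterator is an in-memory stand-in for the iterator returned by collect(), so that the sketch runs without Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper for reading a bounded number of rows from a closeable iterator. */
final class IteratorDrain {

    /** Reads at most maxRows elements and closes the iterator afterwards, even if reading fails. */
    static <T, I extends Iterator<T> & AutoCloseable> List<T> drain(I rows, int maxRows) {
        List<T> collected = new ArrayList<>();
        try (I closing = rows) {
            while (collected.size() < maxRows && closing.hasNext()) {
                collected.add(closing.next());
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // close() may throw a checked exception; wrap it so callers need no throws clause
            throw new IllegalStateException("Failed to close the result iterator", e);
        }
        return collected;
    }

    /** In-memory stand-in for a result iterator that records whether it was closed. */
    static final class TrackingIterator<T> implements Iterator<T>, AutoCloseable {
        private final Iterator<T> delegate;
        boolean closed;

        TrackingIterator(List<T> data) {
            this.delegate = data.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        TrackingIterator<String> rows = new TrackingIterator<>(Arrays.asList("a", "b", "c"));
        System.out.println(drain(rows, 2)); // prints [a, b]
        System.out.println(rows.closed);    // prints true
    }
}
```

The generic bound accepts any type that is both an Iterator and AutoCloseable, which is the shape of Flink's CloseableIterator, so the same helper should be usable with the result of collect().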
A test that runs against a shared Flink mini cluster typically follows these steps:
- Initialize the stream environment: This involves creating an instance of the StreamExecutionEnvironment class and configuring it with the appropriate settings, such as the parallelism and checkpointing parameters.
- Initialize the table environment: This involves creating an instance of the StreamTableEnvironment class and linking it to the stream environment, which allows you to use the Table API and SQL to define and manipulate tables.
- Define the input table: This involves creating a table from in-memory test data or from an external source, such as a file or a database. You can use the Table API or SQL to define the schema and transform the data as needed.
- Run the query and retrieve the results: Once you have defined the input table, you can run a query or perform some other operation on it. Executing the query returns a TableResult object, and its collect() method gives you access to the result rows.
- Test and verification: Finally, you will want to verify that the results are correct and meet your expectations. You can do this by manually inspecting the results or, in an automated test, by asserting that they match the expected values.
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;

/** Integration test for the built-in LISTAGG function. */
public class ListAggITCase112 {

    // one mini cluster shared by all tests in this class
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(4)
                            .setNumberTaskManagers(1)
                            .build());

    protected StreamExecutionEnvironment env;
    protected StreamTableEnvironment tEnv;

    @Before
    public void setUp() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure environment as needed
        env.setParallelism(4);
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.setStateBackend(new RocksDBStateBackend((StateBackend) new MemoryStateBackend()));
        // create table environment
        tEnv =
                StreamTableEnvironment.create(
                        env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // remove data registered by earlier tests
        TestValuesTableFactory.clearAllData();
        // initialize and register any UDFs you need, e.g.
        // tEnv.createTemporaryFunction("MyUDF", MyUDF.class);
    }

    private void createSource(Row... inputData) {
        // can use this instead if there are only inserts and no need for row times, watermarks,...:
        // tEnv.createTemporaryView("input", tEnv.fromValues((Object[]) inputData));
        final String createSource =
                String.format(
                        "CREATE TABLE input ( \n"
                                + " `name` STRING,\n"
                                + " `age` INT\n"
                                + ") WITH (\n"
                                + " 'connector' = 'values',\n"
                                + " 'data-id' = '%s',\n"
                                + " 'changelog-mode' = 'I,UA,UB,D'\n"
                                + ")",
                        TestValuesTableFactory.registerData(Arrays.asList(inputData)));
        tEnv.executeSql(createSource);
    }

    private List<Row> getResult() throws Exception {
        TableResult resultTable =
                tEnv.executeSql("SELECT age, LISTAGG(DISTINCT name) FROM input GROUP BY age");
        return getRowsFromTable(resultTable);
    }

    private static List<Row> getRowsFromTable(TableResult resultTable) throws Exception {
        // try-with-resources closes the iterator once all rows have been read
        try (CloseableIterator<Row> rowCloseableIterator = resultTable.collect()) {
            List<Row> results = new ArrayList<>();
            rowCloseableIterator.forEachRemaining(results::add);
            return results;
        }
    }

    @Test
    public void testListAgg() throws Exception {
        // input changelog: john is inserted with age 32, then updated to age 33
        createSource(
                Row.ofKind(RowKind.INSERT, "john", 32),
                Row.ofKind(RowKind.UPDATE_BEFORE, "john", 32),
                Row.ofKind(RowKind.UPDATE_AFTER, "john", 33));
        // expected output changelog of the aggregation, in any order
        assertThat(
                getResult(),
                containsInAnyOrder(
                        Row.ofKind(RowKind.INSERT, 32, "john"),
                        Row.ofKind(RowKind.DELETE, 32, "john"),
                        Row.ofKind(RowKind.INSERT, 33, "john")));
    }
}
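The three rows expected by testListAgg() follow from how a grouped aggregation reacts to retractions in its input. The sketch below is a simplified plain-Java model of that behavior, written only to make the expected changelog easier to follow. It is not Flink code and does not reflect how Flink implements the aggregation; the class name ListAggChangelogModel, the string-based change kinds, and the output format are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of "SELECT age, LISTAGG(DISTINCT name) ... GROUP BY age" (not Flink code). */
final class ListAggChangelogModel {

    // age -> (name -> number of live input rows carrying that name)
    private final Map<Integer, Map<String, Integer>> groups = new LinkedHashMap<>();

    /** Applies one input change ("+I", "-U", "+U" or "-D") and returns the resulting output changes. */
    List<String> apply(String kind, String name, int age) {
        String before = render(age);

        Map<String, Integer> names = groups.computeIfAbsent(age, k -> new LinkedHashMap<>());
        boolean adds = kind.equals("+I") || kind.equals("+U");
        int count = names.getOrDefault(name, 0) + (adds ? 1 : -1);
        if (count > 0) {
            names.put(name, count);
        } else {
            names.remove(name);
        }
        if (names.isEmpty()) {
            groups.remove(age); // the group has no rows left
        }

        String after = render(age);
        List<String> output = new ArrayList<>();
        if (before == null && after != null) {
            output.add("+I(" + age + ", " + after + ")"); // group appears
        } else if (before != null && after == null) {
            output.add("-D(" + age + ", " + before + ")"); // group disappears
        } else if (before != null && !before.equals(after)) {
            output.add("-U(" + age + ", " + before + ")"); // group changes
            output.add("+U(" + age + ", " + after + ")");
        }
        return output;
    }

    /** Returns the comma-separated distinct names of a group, or null if the group does not exist. */
    private String render(int age) {
        Map<String, Integer> names = groups.get(age);
        return names == null ? null : String.join(",", names.keySet());
    }

    public static void main(String[] args) {
        ListAggChangelogModel model = new ListAggChangelogModel();
        List<String> output = new ArrayList<>();
        output.addAll(model.apply("+I", "john", 32));
        output.addAll(model.apply("-U", "john", 32));
        output.addAll(model.apply("+U", "john", 33));
        System.out.println(output); // prints [+I(32, john), -D(32, john), +I(33, john)]
    }
}
```

In this model, the UPDATE_BEFORE row empties the group for age 32, so the aggregation emits a delete for that group rather than an update, and the UPDATE_AFTER row creates a new group for age 33.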
Unit & Integration Tests in Flink 1.13 or above
If you need to supply input data for testing purposes in Flink 1.13 (or later), one option is to use the TableEnvironment.fromValues() method. However, it is important to note that this method has the same limitations as in previous versions of Flink (1.12 or earlier).
An alternative approach is to use the StreamTableEnvironment.fromChangelogStream() method, which allows you to define the input as a DataStream<T> or DataStream<Row> with RowKind attributes. This method offers automatic type conversion and preserves event-time and watermarks across operations. Furthermore, it allows you to define custom schema definitions just like in a table DDL, providing greater flexibility and control over the input data. Overall, using the fromChangelogStream() method can be a more powerful and versatile way to supply input data for testing in Flink 1.13 (or later).
To collect the results of a query, you can again use the TableResult.collect() method, which returns a CloseableIterator<Row>. Alternatively, as in the example that follows, you can convert the result table into a DataStream<Row> with toChangelogStream() and call executeAndCollect() on it, which also returns a CloseableIterator<Row>.
One of the advantages of collecting the results this way is that it allows for fine-grained comparisons, including the RowKind attributes. RowKinds indicate the type of change that occurred to a row in a changelog stream, and can have the values "+I" (insert), "-U" (update before), "+U" (update after), and "-D" (delete). By comparing the RowKinds of the actual results to the expected values, you can more easily identify any discrepancies or issues with the query or operation.
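When a test cares only about the final result rather than the exact sequence of changes, the collected changelog can be folded into the rows that remain at the end. The following is a minimal plain-Java sketch of that idea. ChangelogMaterializer, Kind, and Change are hypothetical stand-ins for Flink's Row and RowKind so that the sketch runs without Flink; a real test would read the kind and the fields from each collected Row instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that folds a changelog into the rows remaining at the end. */
final class ChangelogMaterializer {

    /** Stand-in for the four change kinds: +I, -U, +U and -D. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Stand-in for a row with a change kind. */
    static final class Change {
        final Kind kind;
        final List<Object> fields;

        Change(Kind kind, Object... fields) {
            this.kind = kind;
            this.fields = Arrays.asList(fields);
        }
    }

    /** Applies all changes in order and returns the surviving rows (duplicates are kept). */
    static List<List<Object>> materialize(List<Change> changelog) {
        Map<List<Object>, Integer> counts = new LinkedHashMap<>();
        for (Change change : changelog) {
            boolean adds = change.kind == Kind.INSERT || change.kind == Kind.UPDATE_AFTER;
            int updated = counts.getOrDefault(change.fields, 0) + (adds ? 1 : -1);
            if (updated < 0) {
                throw new IllegalStateException("Retraction without a matching row: " + change.fields);
            }
            if (updated == 0) {
                counts.remove(change.fields);
            } else {
                counts.put(change.fields, updated);
            }
        }
        List<List<Object>> rows = new ArrayList<>();
        for (Map.Entry<List<Object>, Integer> entry : counts.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                rows.add(entry.getKey());
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
                new Change(Kind.INSERT, 32, "john"),
                new Change(Kind.DELETE, 32, "john"),
                new Change(Kind.INSERT, 33, "john"));
        System.out.println(materialize(changelog)); // prints [[33, john]]
    }
}
```

Asserting on the exact changelog is stricter and catches unexpected intermediate updates, while asserting on the materialized rows is more robust when the intermediate changes are not what the test is about.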
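import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;

/** Integration test for the built-in LISTAGG function. */
public class ListAggITCase113 {

    // one mini cluster shared by all tests in this class
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(4)
                            .setNumberTaskManagers(1)
                            .build());

    protected StreamExecutionEnvironment env;
    protected StreamTableEnvironment tEnv;

    @Before
    public void setUp() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure environment as needed
        env.setParallelism(4);
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
        // create table environment
        // use EnvironmentSettings.newInstance().useBlinkPlanner() to get the planner if using Flink 1.13 or 1.14
        tEnv =
                StreamTableEnvironment.create(
                        env, EnvironmentSettings.newInstance().inStreamingMode().build());
        // initialize and register any UDFs you need, e.g.
        // tEnv.createTemporaryFunction("MyUDF", MyUDF.class);
    }

    private void createSource(Row... inputData) {
        // the RowKind of each Row determines the change type in the changelog stream
        DataStreamSource<Row> changelogStream = env.fromElements(inputData);
        tEnv.createTemporaryView("input",
                tEnv.fromChangelogStream(changelogStream).as("name", "age"));
    }

    private List<Row> getResult() throws Exception {
        Table resultTable = tEnv.sqlQuery("SELECT age, LISTAGG(DISTINCT name) FROM input GROUP BY age");
        DataStream<Row> resultStream = tEnv.toChangelogStream(resultTable);
        return getRowsFromDataStream(resultStream);
    }

    private static List<Row> getRowsFromDataStream(DataStream<Row> resultStream) throws Exception {
        // try-with-resources closes the iterator once all rows have been read
        try (CloseableIterator<Row> rowCloseableIterator = resultStream.executeAndCollect()) {
            List<Row> results = new ArrayList<>();
            rowCloseableIterator.forEachRemaining(results::add);
            return results;
        }
    }

    @Test
    public void testListAgg() throws Exception {
        // input changelog: john is inserted with age 32, then updated to age 33
        createSource(
                Row.ofKind(RowKind.INSERT, "john", 32),
                Row.ofKind(RowKind.UPDATE_BEFORE, "john", 32),
                Row.ofKind(RowKind.UPDATE_AFTER, "john", 33));
        // expected output changelog of the aggregation, in any order
        assertThat(
                getResult(),
                containsInAnyOrder(
                        Row.ofKind(RowKind.INSERT, 32, "john"),
                        Row.ofKind(RowKind.DELETE, 32, "john"),
                        Row.ofKind(RowKind.INSERT, 33, "john")));
    }
}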
Note: Use EnvironmentSettings.newInstance().useBlinkPlanner() to get the planner if you are using Flink 1.13 or 1.14.
Conclusion
There are several testing possibilities for SQL queries and Table API constructs in Flink. One option is to manually verify the results by executing the queries and inspecting the output. This can be a quick and easy way to test small changes or simple queries, but it may not be practical for larger or more complex cases.
Another option is to use automated tests written in Java or Scala. This can be more efficient and allow you to test a wider range of scenarios, including edge cases and error conditions. Automated tests can be run as part of a continuous integration or delivery pipeline, helping to ensure that your code is reliable and behaves as expected.
The specific APIs and tools available for testing vary with the version of Flink that you are using. For example, fromChangelogStream() is only available in Flink 1.13 (or later), so in Flink 1.12 (or earlier) you have to fall back on the non-public values connector to supply changelog input. It is important to familiarize yourself with the testing capabilities of your specific version of Flink in order to choose the best approach for your needs.
Resources and further reading:
- DataStreamJavaITCase is an elaborate example of unit-testing with fromChangelogStream()/toChangelogStream() and similar DataStream interoperability tools
- SQL/Table API ↔ DataStream conversion
- DataStream API Testing