QuestDB Flink connector
Apache Flink is a popular framework and distributed stream-processing engine. QuestDB ships a Flink Sink connector for fast ingestion from Apache Flink into QuestDB. The connector implements Flink's Table API and SQL interfaces.
Quick start
This section shows the steps to use the QuestDB Flink connector to ingest data from Flink into QuestDB. It uses Flink's SQL interface. The overall steps are the following:
- Create a table in Flink backed by QuestDB.
- Insert data into the table.
- Finally, query the data in QuestDB.
Prerequisites
- A local JDK version 11 installation
- Docker for running QuestDB
Connector installation
- Start the QuestDB container image:

  ```shell
  docker run -p 9000:9000 -p 9009:9009 questdb/questdb:8.1.0
  ```
- Download the Apache Flink distribution and unpack it.
- Download the QuestDB Flink connector from Maven Central and place it in the
  `lib` directory of your Flink installation.

- Go to the `bin` directory of your Flink installation and run the following to
  start a Flink cluster:

  ```shell
  ./start-cluster.sh
  ```
- While still in the `bin` directory, start a Flink SQL console by running:

  ```shell
  ./sql-client.sh
  ```

  Then, run the following SQL command in the Flink SQL console:

  ```sql
  CREATE TABLE Orders (
    order_number BIGINT,
    price BIGINT,
    buyer STRING
  ) WITH (
    'connector'='questdb',
    'host'='localhost'
  );
  ```

  Expected output:

  ```
  [INFO] Execute statement succeed.
  ```
  This command created a Flink table backed by QuestDB. The table is called
  `Orders` and has three columns: `order_number`, `price`, and `buyer`. The
  `connector` option selects the QuestDB Flink connector, and the `host` option
  specifies the host and port where QuestDB is running. The default port is
  `9009`.
- While still in the Flink SQL console, execute:

  ```sql
  INSERT INTO Orders values (0, 42, 'IBM');
  ```

  Expected output:

  ```
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: <random hexadecimal id>
  ```

  This command used Flink SQL to insert a row into the `Orders` table in Flink.
  The table is connected to QuestDB, so the row is also written to QuestDB.
- Go to the QuestDB web console at http://localhost:9000 and execute this query:

  ```sql
  SELECT * FROM Orders;
  ```

  You should see a table with one row.
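If you would like a few more rows to experiment with, you can insert additional values from the Flink SQL console in the same way; the order numbers, prices, and buyers below are made up for illustration:

```sql
INSERT INTO Orders values
  (1, 55, 'UNG'),
  (2, 13, 'NFLX');
```

Re-running `SELECT * FROM Orders;` in the QuestDB web console should then show three rows.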
Congratulations! You have successfully used the QuestDB Flink connector to ingest data from Flink into QuestDB. You can now build Flink data pipelines that use QuestDB as a sink.
See the QuestDB Flink connector GitHub repository for more examples.
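As a slightly larger sketch of such a pipeline, the example below feeds the `Orders` table from Flink's built-in `datagen` connector, which ships with the Flink distribution. The table name `OrdersGen` and all generator settings are arbitrary values chosen for this illustration:

```sql
-- Source table producing synthetic orders via Flink's built-in datagen connector
CREATE TABLE OrdersGen (
  order_number BIGINT,
  price BIGINT,
  buyer STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.order_number.kind' = 'sequence',
  'fields.order_number.start' = '1',
  'fields.order_number.end' = '1000',
  'fields.price.min' = '1',
  'fields.price.max' = '100',
  'fields.buyer.length' = '4'
);

-- Continuously copy generated rows into the QuestDB-backed table
INSERT INTO Orders SELECT * FROM OrdersGen;
```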
Configuration
The QuestDB Flink connector supports the following configuration options:
| Name | Type | Example | Default | Meaning |
|---|---|---|---|---|
| host | string | localhost:9009 | N/A | Host and port where the QuestDB server is running |
| username | string | testUser1 | admin | Username for authentication. The default is used only when `token` is also set. |
| token | string | GwBXoGG5c6NoUTLXnzMxw | N/A | Token for authentication |
| table | string | my_table | Same as the Flink table name | Target table in QuestDB |
| tls | boolean | true | false | Whether to use TLS/SSL when connecting to the QuestDB server |
| buffer.size.kb | integer | 32 | 64 | Size of the QuestDB client send buffer, in kilobytes |
| sink.parallelism | integer | 2 | Same as upstream processors | QuestDB sink parallelism |
Example configuration for connecting to QuestDB running on localhost:
```sql
CREATE TABLE Orders (
  order_number BIGINT,
  price BIGINT,
  buyer STRING
) WITH (
  'connector' = 'questdb',
  'host' = 'localhost',
  'table' = 'orders'
);
```
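If QuestDB requires authentication or runs on a different host, the options from the table above can be combined in the same way. The host name below is a placeholder, and the username and token are the example values from the configuration table; adapt all of them to your setup:

```sql
CREATE TABLE Orders (
  order_number BIGINT,
  price BIGINT,
  buyer STRING
) WITH (
  'connector' = 'questdb',
  'host' = 'questdb.example.com:9009',  -- placeholder host and port
  'table' = 'orders',
  'username' = 'testUser1',             -- example value, replace with yours
  'token' = 'GwBXoGG5c6NoUTLXnzMxw',    -- example value, replace with yours
  'tls' = 'true',
  'buffer.size.kb' = '128',
  'sink.parallelism' = '2'
);
```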
Connector distribution
The connector is distributed as a single JAR file. It is available in the Maven Central repository under the following coordinates:
```xml
<dependency>
  <groupId>org.questdb</groupId>
  <artifactId>flink-questdb-connector</artifactId>
  <version>LATEST</version>
</dependency>
```
FAQ
Q: Why is the QuestDB client not repackaged into a different Java package?

A: The QuestDB client uses native code, which makes repackaging hard.
Q: I need to use QuestDB as a Flink source, what should I do?

A: This connector is sink-only. If you want to use QuestDB as a source, your best option is to use the Flink JDBC source and rely on QuestDB Postgres compatibility.
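As a rough sketch of that approach, assuming the Flink JDBC connector and a PostgreSQL JDBC driver are on the Flink classpath, and that QuestDB's Postgres wire protocol is on its defaults (port 8812, database `qdb`, user `admin`, password `quest`), a source table could look like this; the name `OrdersSource` is made up for illustration:

```sql
-- Flink table reading from QuestDB over its Postgres wire protocol
CREATE TABLE OrdersSource (
  order_number BIGINT,
  price BIGINT,
  buyer STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:8812/qdb',
  'table-name' = 'Orders',
  'username' = 'admin',
  'password' = 'quest'
);

-- Read rows from QuestDB through the JDBC source
SELECT * FROM OrdersSource;
```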