PyFlink Series (Part 7)
Back to: Part 6 (Data Stream API Job with Local Kinesis Source and DynamoDB Sink)
In Parts 3 and 5 we built Kafka-to-Kafka and Kafka-to-Postgres pipelines using the DataStream API. Those jobs required builder patterns, manual serialization schemas, and (in the Postgres case) a custom Java escape hatch just to get data into JDBC.
Flink offers two higher-level APIs that can dramatically simplify these same jobs: the Table API and the SQL API. Both run on a TableEnvironment and use Flink's table connectors to define sources and sinks declaratively. The connector and its format handle serialization, schema mapping, and sink mechanics for you.
The tradeoff is control. The DataStream API gives you per-event access, custom operators, managed state, and process functions. If your pipeline is primarily moving structured data between systems, with filtering and transformations that can be expressed as projections or predicates, the Table and SQL APIs are the better tool for the job.
Choosing the Right API
Before we look at the code, it’s worth understanding when you’d reach for each API.
DataStream API is your choice when you need fine-grained, per-event processing. Custom operators, managed state, complex windowing, side outputs, and process functions all live here. You’re writing imperative Python code that handles each record individually. The cost is more boilerplate, more manual wiring, and (as we saw in Parts 5 and 6) the occasional escape hatch when Python bindings don’t exist for a connector.
Table API offers a middle ground. You define sources and sinks with typed Python descriptor objects (TableDescriptor, Schema, DataTypes) and compose pipelines using relational-style operations like filter, select, and join. You get IDE autocompletion, schemas that Flink validates when it plans the job, and an optimizer that plans the execution for you. Reach for this when your data is structured, your transformations are relational, and you want Pythonic code without writing raw SQL.
SQL API is the simplest option. DDL strings define your sources and sinks, DML strings define your pipeline. If the logic maps naturally to SQL, this is the fastest way to get a job running. It’s also the easiest to hand off to someone who knows SQL but isn’t a Python developer.
The Table and SQL APIs share the same planner and optimizer, so equivalent Table API and SQL pipelines compile down to the same execution plan. The choice between them is largely about ergonomics and team preference.
Kafka to Kafka
In Part 3 we built this pipeline with KafkaSource, KafkaSink, serialization schemas, and builder chains. Here's the same job using the SQL API and the Table API.
SQL API
```python
import os
from pathlib import Path

from pyflink.table import EnvironmentSettings, TableEnvironment

# os.getenv returns a string, so parse it explicitly ("false" would otherwise be truthy)
LOCAL_DEBUG = os.getenv('LOCAL_DEBUG', 'false').lower() in ('1', 'true', 'yes')


def run():
    brokers = "localhost:9092"

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    table_env.get_config().set("parallelism.default", "1")

    if LOCAL_DEBUG:
        # Resolve the project JAR to an absolute, normalized file:// URI for Flink
        jar_uri = (Path(__file__).resolve().parent / "../lib/bin/pyflink-services-1.0.jar").resolve().as_uri()
        table_env.get_config().set("pipeline.jars", jar_uri)
        table_env.get_config().set("pipeline.classpaths", jar_uri)

    # Kafka source table via SQL DDL
    table_env.execute_sql(f"""
        CREATE TABLE kafka_source (
            `message` STRING
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = '{brokers}',
            'topic' = 'input_topic',
            'properties.group.id' = 'stream_example',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'raw'
        )
    """)

    # Kafka sink table via SQL DDL
    table_env.execute_sql(f"""
        CREATE TABLE kafka_sink (
            `message` STRING
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = '{brokers}',
            'topic' = 'output_topic',
            'sink.delivery-guarantee' = 'at-least-once',
            'format' = 'raw'
        )
    """)

    # Pipeline via SQL DML; wait() blocks for as long as the streaming job runs
    table_env.execute_sql("""
        INSERT INTO kafka_sink
        SELECT * FROM kafka_source
    """).wait()


if __name__ == '__main__':
    run()
```
The entire job is three SQL statements: one to create the source table, one to create the sink table, and one to move data between them. No serialization classes, no builder chains, no watermark strategies.
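Because every connector setting is a plain key/value pair in the WITH clause, these DDL strings are easy to generate from Python data. The sketch below assumes you want to do exactly that; `build_create_table`, `quote_identifier`, and `quote_literal` are hypothetical helpers, not PyFlink APIs. Quoting every identifier with backticks also covers column names such as `value` and `timestamp`, which are reserved words in Flink SQL. Doubling single quotes keeps a value that contains `'` (a password, for example) from breaking the statement.

```python
def quote_identifier(name):
    """Quote a Flink SQL identifier with backticks, doubling embedded backticks (hypothetical helper)."""
    return "`" + name.replace("`", "``") + "`"


def quote_literal(value):
    """Render a SQL string literal, doubling embedded single quotes (hypothetical helper)."""
    return "'" + value.replace("'", "''") + "'"


def build_create_table(name, columns, options):
    """Build a CREATE TABLE statement (hypothetical helper).

    columns: list of (column_name, sql_type) pairs, e.g. [("message", "STRING")]
    options: dict of connector options rendered into the WITH clause
    """
    column_lines = ",\n".join(
        f"  {quote_identifier(col)} {sql_type}" for col, sql_type in columns
    )
    option_lines = ",\n".join(
        f"  {quote_literal(key)} = {quote_literal(val)}" for key, val in options.items()
    )
    return (
        f"CREATE TABLE {quote_identifier(name)} (\n{column_lines}\n) "
        f"WITH (\n{option_lines}\n)"
    )


# Rebuild the Kafka source DDL from the job above
source_ddl = build_create_table(
    "kafka_source",
    [("message", "STRING")],
    {
        "connector": "kafka",
        "properties.bootstrap.servers": "localhost:9092",
        "topic": "input_topic",
        "properties.group.id": "stream_example",
        "scan.startup.mode": "earliest-offset",
        "format": "raw",
    },
)
print(source_ddl)
```

The resulting string can be passed straight to `table_env.execute_sql(...)`, just like the hand-written DDL.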
Table API
```python
import os
from pathlib import Path

from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor,
                           Schema, DataTypes, FormatDescriptor)

# os.getenv returns a string, so parse it explicitly ("false" would otherwise be truthy)
LOCAL_DEBUG = os.getenv('LOCAL_DEBUG', 'false').lower() in ('1', 'true', 'yes')


def run():
    brokers = "localhost:9092"

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    table_env.get_config().set("parallelism.default", "1")

    if LOCAL_DEBUG:
        # Resolve the project JAR to an absolute, normalized file:// URI for Flink
        jar_uri = (Path(__file__).resolve().parent / "../lib/bin/pyflink-services-1.0.jar").resolve().as_uri()
        table_env.get_config().set("pipeline.jars", jar_uri)
        table_env.get_config().set("pipeline.classpaths", jar_uri)

    # Kafka source table definition
    table_env.create_temporary_table(
        "kafka_source",
        TableDescriptor.for_connector("kafka")
        .schema(Schema.new_builder()
                .column("message", DataTypes.STRING())
                .build())
        .option("properties.bootstrap.servers", brokers)
        .option("topic", "input_topic")
        .option("properties.group.id", "stream_example")
        .option("scan.startup.mode", "earliest-offset")
        .format(FormatDescriptor.for_format("raw").build())
        .build()
    )

    # Kafka sink table definition
    table_env.create_temporary_table(
        "kafka_sink",
        TableDescriptor.for_connector("kafka")
        .schema(Schema.new_builder()
                .column("message", DataTypes.STRING())
                .build())
        .option("properties.bootstrap.servers", brokers)
        .option("topic", "output_topic")
        .option("sink.delivery-guarantee", "at-least-once")
        .format(FormatDescriptor.for_format("raw").build())
        .build()
    )

    # Read from source and insert into sink using the Table API
    source_table = table_env.from_path("kafka_source")
    source_table.execute_insert("kafka_sink").wait()


if __name__ == '__main__':
    run()
```
The Table API version uses Python objects instead of SQL strings. The table definitions are more verbose than raw SQL, but you get typed column definitions and IDE autocompletion. The pipeline itself is two lines: read from the source table and insert into the sink.
Kafka to Postgres
This is where the difference is most striking. In Part 5 this pipeline required a custom Java JsonJdbcSink class, Py4J gateway calls, java.util.Properties objects, typed Java arrays, and _j_data_stream.addSink(). All of that goes away with the Table and SQL APIs, because Flink's JDBC table connector handles the writes for you, as long as the JDBC connector and PostgreSQL driver JARs are on the job's classpath.
SQL API
```python
import os
from pathlib import Path

from pyflink.table import EnvironmentSettings, TableEnvironment

# os.getenv returns a string, so parse it explicitly ("false" would otherwise be truthy)
LOCAL_DEBUG = os.getenv('LOCAL_DEBUG', 'false').lower() in ('1', 'true', 'yes')
KAFKA_BROKERS = os.getenv('KAFKA_BROKERS', 'kafka0:29092')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'postgres_container')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
POSTGRES_DB = os.getenv('POSTGRES_DB', 'postgres')
POSTGRES_USER = os.getenv('POSTGRES_USER', 'postgres')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'changeme')


def run():
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    table_env.get_config().set("parallelism.default", "1")

    if LOCAL_DEBUG:
        # Resolve the project JAR to an absolute, normalized file:// URI for Flink
        jar_uri = (Path(__file__).resolve().parent / "../lib/bin/pyflink-services-1.0.jar").resolve().as_uri()
        table_env.get_config().set("pipeline.jars", jar_uri)
        table_env.get_config().set("pipeline.classpaths", jar_uri)

    jdbc_url = f"jdbc:postgresql://{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

    # Kafka source table with JSON format via SQL DDL
    table_env.execute_sql(f"""
        CREATE TABLE kafka_source (
            `id` STRING,
            `kind` STRING,
            `value` STRING,
            `timestamp` STRING
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = '{KAFKA_BROKERS}',
            'topic' = 'input_topic',
            'properties.group.id' = 'kafka_postgres_group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false'
        )
    """)

    # JDBC/Postgres sink table with upsert via primary key
    table_env.execute_sql(f"""
        CREATE TABLE postgres_sink (
            `id` STRING,
            `kind` STRING,
            `value` STRING,
            `timestamp` STRING,
            PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{jdbc_url}',
            'table-name' = 'sensor_readings',
            'driver' = 'org.postgresql.Driver',
            'username' = '{POSTGRES_USER}',
            'password' = '{POSTGRES_PASSWORD}'
        )
    """)

    # Pipeline via SQL DML. Flink treats primary-key columns as NOT NULL, so drop
    # records missing either key column instead of failing the sink at runtime.
    table_env.execute_sql("""
        INSERT INTO postgres_sink
        SELECT * FROM kafka_source
        WHERE `id` IS NOT NULL AND `timestamp` IS NOT NULL
    """).wait()


if __name__ == '__main__':
    run()
```
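The JDBC connector is defined inline with the sink DDL. Declaring a PRIMARY KEY switches the sink into upsert mode; for PostgreSQL the connector writes with INSERT ... ON CONFLICT ... DO UPDATE, so the sensor_readings table needs a primary key or unique constraint on the same columns (id, timestamp). No custom Java code, no escape hatch, no get_gateway() calls. Compare this to the DataStream version in Part 5, which needed a full Java class (JsonJdbcSink) to accomplish the same thing.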
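To make the filtering and upsert behavior concrete, here is a pure-Python model of what happens to each record. It is a simplified sketch under stated assumptions, not how Flink executes the job. JSON parsing with `fail-on-missing-field` disabled is modeled as missing keys becoming `None`. The filter is assumed to cover both primary-key columns, because Flink treats primary-key columns as NOT NULL and a null `timestamp` would otherwise fail at the sink. The upsert is modeled as a dict keyed by `(id, timestamp)`, so a second record with the same key overwrites the first instead of adding a row. The sample messages are made up for illustration.

```python
import json

# Columns of kafka_source / postgres_sink and the declared primary key
COLUMNS = ("id", "kind", "value", "timestamp")
PRIMARY_KEY = ("id", "timestamp")


def parse_json_row(message):
    """Model 'json.fail-on-missing-field' = 'false': absent fields become None."""
    payload = json.loads(message)
    return {col: payload.get(col) for col in COLUMNS}


def passes_filter(row):
    """Model the WHERE clause: drop rows with a null primary-key column."""
    return all(row[col] is not None for col in PRIMARY_KEY)


def run_pipeline(messages):
    """Return the final 'table' as a dict keyed by the primary key."""
    table = {}
    for message in messages:
        row = parse_json_row(message)
        if not passes_filter(row):
            continue  # filtered out before it reaches the sink
        key = tuple(row[col] for col in PRIMARY_KEY)
        table[key] = row  # upsert: insert a new key or overwrite an existing one
    return table


# Made-up sample messages for illustration only
messages = [
    '{"id": "s1", "kind": "temp", "value": "20.5", "timestamp": "2024-01-01T00:00:00"}',
    '{"id": "s1", "kind": "temp", "value": "21.0", "timestamp": "2024-01-01T00:00:00"}',
    '{"id": "s2", "kind": "humidity", "timestamp": "2024-01-01T00:00:00"}',
    '{"kind": "temp", "value": "19.0", "timestamp": "2024-01-01T00:00:00"}',
]

result = run_pipeline(messages)
for key, row in sorted(result.items()):
    print(key, row)
```

The two `s1` messages collapse into one row holding the later value, the `s2` message lands with a null `value`, and the message without an `id` never reaches the sink. Without a primary key, the JDBC sink would run in append mode and both `s1` messages would become separate rows.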
Table API
```python
import os
from pathlib import Path

from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor,
                           Schema, DataTypes, FormatDescriptor)

# os.getenv returns a string, so parse it explicitly ("false" would otherwise be truthy)
LOCAL_DEBUG = os.getenv('LOCAL_DEBUG', 'false').lower() in ('1', 'true', 'yes')
KAFKA_BROKERS = os.getenv('KAFKA_BROKERS', 'kafka0:29092')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'postgres_container')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
POSTGRES_DB = os.getenv('POSTGRES_DB', 'postgres')
POSTGRES_USER = os.getenv('POSTGRES_USER', 'postgres')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'changeme')


def run():
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    table_env.get_config().set("parallelism.default", "1")

    if LOCAL_DEBUG:
        # Resolve the project JAR to an absolute, normalized file:// URI for Flink
        jar_uri = (Path(__file__).resolve().parent / "../lib/bin/pyflink-services-1.0.jar").resolve().as_uri()
        table_env.get_config().set("pipeline.jars", jar_uri)
        table_env.get_config().set("pipeline.classpaths", jar_uri)

    # Kafka source table with JSON format matching sensor_readings schema
    table_env.create_temporary_table(
        "kafka_source",
        TableDescriptor.for_connector("kafka")
        .schema(Schema.new_builder()
                .column("id", DataTypes.STRING())
                .column("kind", DataTypes.STRING())
                .column("value", DataTypes.STRING())
                .column("timestamp", DataTypes.STRING())
                .build())
        .option("properties.bootstrap.servers", KAFKA_BROKERS)
        .option("topic", "input_topic")
        .option("properties.group.id", "kafka_postgres_group")
        .option("scan.startup.mode", "latest-offset")
        .format(FormatDescriptor.for_format("json")
                .option("fail-on-missing-field", "false")
                .build())
        .build()
    )

    # JDBC/Postgres sink table with upsert support via primary key
    jdbc_url = f"jdbc:postgresql://{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
    table_env.create_temporary_table(
        "postgres_sink",
        TableDescriptor.for_connector("jdbc")
        .schema(Schema.new_builder()
                .column("id", DataTypes.STRING())
                .column("kind", DataTypes.STRING())
                .column("value", DataTypes.STRING())
                .column("timestamp", DataTypes.STRING())
                .primary_key("id", "timestamp")
                .build())
        .option("url", jdbc_url)
        .option("table-name", "sensor_readings")
        .option("driver", "org.postgresql.Driver")
        .option("username", POSTGRES_USER)
        .option("password", POSTGRES_PASSWORD)
        .build()
    )

    # Read from Kafka, drop rows with a null key column (primary-key columns are
    # NOT NULL in Flink), and upsert into Postgres
    source_table = table_env.from_path("kafka_source")
    filtered = source_table.filter(
        source_table.id.is_not_null & source_table.timestamp.is_not_null
    )
    filtered.execute_insert("postgres_sink").wait()


if __name__ == '__main__':
    run()
```
Same idea, but with Python descriptor objects. Note the primary_key("id", "timestamp") call on the schema builder, which enables upsert mode automatically. The filter uses typed column references (source_table.id.is_not_null) instead of a SQL WHERE clause.
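The Table API descriptors end up as the same flat key/value options that the SQL WITH clause spells out. One detail worth knowing: options passed to FormatDescriptor are written without a prefix, and TableDescriptor stores them under the format identifier. That is why the Table API job sets `fail-on-missing-field` while the SQL job sets `json.fail-on-missing-field`. The sketch below models that flattening in plain Python; `flatten_options` is a hypothetical helper, not the PyFlink implementation.

```python
def flatten_options(connector, options, format_id=None, format_options=None):
    """Model how a connector + format description becomes flat WITH-clause options.

    Format options are given without a prefix and stored as '<format_id>.<key>',
    matching the spelling used in the SQL DDL.
    """
    flat = {"connector": connector}
    flat.update(options)
    if format_id is not None:
        flat["format"] = format_id
        for key, value in (format_options or {}).items():
            flat[f"{format_id}.{key}"] = value
    return flat


# The Kafka source from the Table API job, flattened
source_options = flatten_options(
    "kafka",
    {
        "properties.bootstrap.servers": "kafka0:29092",
        "topic": "input_topic",
        "properties.group.id": "kafka_postgres_group",
        "scan.startup.mode": "latest-offset",
    },
    format_id="json",
    format_options={"fail-on-missing-field": "false"},
)
for key, value in source_options.items():
    print(f"'{key}' = '{value}'")
```

Printed this way, the options line up one-to-one with the WITH clause of the SQL API version's kafka_source table.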
Running Locally
The infrastructure is identical to Parts 3 and 5. The same docker-compose stack provides Kafka and Postgres, and the same make services target starts everything up.
Start the services:
make services
Submit any of the jobs. For example, the SQL API Kafka-to-Postgres job:
make run app=jobs/sql_app_kafka_postgres
Or the Table API Kafka-to-Kafka job:
make run app=jobs/tbl_app_kafka_kafka
Generate test data using the same Kafka producer from Part 5:
make generate_kafka loops=3 delay_ms=200
Verify the Postgres results with the same query:
docker compose exec -T postgres psql -U postgres -c "SELECT * FROM sensor_readings ORDER BY id, timestamp;"
For the Kafka-to-Kafka jobs, check the output_topic in the Kafka UI at http://localhost:8080 as described in Part 3.
Up next: Part 8 (Table & SQL API with Kinesis and DynamoDB)
Series:
- Intro
- Part 1 (Basic Data Stream API Job)
- Part 2 (Basic Data Stream API Job with Custom Operators)
- Part 3 (Data Stream API Job with Local Kafka Source and Sink)
- Part 4 (Data Stream API Job with Local AWS Kinesis Source and Sink)
- Part 5 (Data Stream API Job with Local Kafka Source and JDBC<postgres> Sink)
- Part 6 (Data Stream API Job with Local Kinesis Source and DynamoDB Sink)
- Part 7 (Table & SQL API with Kafka and Postgres)
- Part 8 (Table & SQL API with Kinesis and DynamoDB)