PyFlink Series (Part 4)


Back to: Part 3 (Data Stream API Job with Local Kafka Source and Sink)

Similar to the previous Kafka example, we’re going to need external dependencies to run this one too.

The example project contains artifacts and convenience make commands to help package our dependencies.

Within the lib folder, we have a pom.xml containing several dependencies for all the example jobs in this repository.

Of interest to this particular job is the kinesis connector:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>4.3.0-1.19</version>
</dependency>

To build the dependencies for running this and other examples, you can simply run: make jar This will use a Maven+Java Docker container to fetch all required dependencies and package them into a single “fat” jar. You’ll here this fat jar sometimes called a shadowjar.

Define imports

from os import path, getenv
from typing import Dict

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import (StreamExecutionEnvironment, RuntimeExecutionMode)
from pyflink.datastream.connectors.kinesis import (FlinkKinesisConsumer, KinesisStreamsSink, PartitionKeyGenerator)

Build the DAG

Setting up the DataStream environment. These could be standardized in a production system to be configured in a more modular way.

brokers = "localhost:9092" # Local kafka running on docker-compose
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)

We’ll get into submitting jobs into a real Flink cluster later on, but for running locally we’ll need to tell Flink where our dependencies are. We set the following environment configuration to enable local running/debugging by manually adding the project’s shadow jar (location + classpath).

The shadowjar has all the connectors and additional Java dependencies built into a single jar.

if LOCAL_DEBUG:
    jar_location = str(path.join(path.dirname(path.abspath(__file__)), "../lib/bin/pyflink-services-1.0.jar"))
    env.add_jars(f"file:///{jar_location}")
    env.add_classpaths(f"file:///{jar_location}")

As we start getting more experience building jobs, we find ourselves wanting to introduce patterns that reduce duplication A mature pipeline codebase will probably have several helper functions for sorting our config, and creating sources and sinks.

Below are some simplified examples of moving in that direction.

Helper function for creating kinesis sources with reasonable defaults:

def get_source(stream_name: str, config: Dict = None) -> FlinkKinesisConsumer:
    props = config or {}
    consumer_config = {
        'aws.region': 'us-east-1',
        'aws.credentials.provider.basic.accesskeyid': 'localstack_ignored',
        'aws.credentials.provider.basic.secretkey': 'localstack_ignored',
        'flink.stream.initpos': 'LATEST',
        'aws.endpoint': 'http://localhost:4566',
        **props
    }
    return FlinkKinesisConsumer(stream_name, SimpleStringSchema(), consumer_config)

Helper function for creating kinesis sinks with reasonable defaults:

def get_sink(stream_name: str, config: Dict = None) -> KinesisStreamsSink:
    props = config or {}
    sink_properties = {
        'aws.region': 'us-east-1',
        'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
        'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
        'aws.endpoint': 'http://localhost:4566',
        **props
    }

    return (KinesisStreamsSink.builder()
            .set_kinesis_client_properties(sink_properties)
            .set_serialization_schema(SimpleStringSchema())
            .set_partition_key_generator(PartitionKeyGenerator.fixed())
            .set_stream_name(stream_name)
            .build())

Putting it all together, we connect the source into the sink and run our job:

def run():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    # To enable local running/debugging, we manually add the project's shadow jar that has all the connectors built in
    if LOCAL_DEBUG:
        jar_location = str(path.join(path.dirname(path.abspath(__file__)), "../lib/bin/pyflink-services-1.0.jar"))
        env.add_jars(f"file:///{jar_location}")
        env.add_classpaths(f"file:///{jar_location}")

    # Kinesis source definition

    # Build a Datastream from the Kinesis source
    stream = env.add_source(get_source('input_stream'))

    # Kinesis sink definition
    sink = get_sink('output_stream')

    # sink the Datastream from the Kinesis source
    stream.sink_to(sink)

    env.execute("kinesis-2-kinesis")


if __name__ == '__main__':
    run()

We’re not doing anything too fancy here, essentially relaying events from one stream to another.

To test this job out, you’ll need two kinesis streams created (real AWS or localstack), as well as some mechanism to push events into the input_stream, as well as a way to read the relayed events over on the output_stream.

You can use the example project’s docker-compose setup to start a localstack.

To start the local services defined in the docker-compose file, simply run the following command from the console: make services

Once everything is built and running, start the job:

python ./jobs/ds_app_kinesis_kinesis.py

We’ve added some makefile targets for reading and writing data to the input_stream and output_stream kinesis streams, but you’ll need to make sure you have the AWS CLI(v2) installed.

Send an event

make test_put_kinesis testdata="red alert" 

We can see the test_put_kinesis does a few nice things for us:

.PHONY: test_put_kinesis
test_put_kinesis:
	export AWS_ACCESS_KEY_ID="test"
	export AWS_SECRET_ACCESS_KEY="test"
	export AWS_DEFAULT_REGION="us-east-1"
	$(eval DATA = $(shell echo $(testdata) | base64))
	aws kinesis put-record --stream-name input_stream --partition-key 123 --data $(DATA) --endpoint-url http://localhost:4566

Here we are:

  1. exporting aws credentals (localstack ignores)
  2. base64 encoding our sample event
  3. calling PutRecord on the input_stream

Conversely, to read off the stream, we can issue the following make command:

.PHONY: test_get_kinesis
test_get_kinesis:
	$(eval SHARD_ITERATOR = $(shell aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name output_stream --query 'ShardIterator' --endpoint-url http://localhost:4566))
	$(info ${SHARD_ITERATOR})
	# read the records, use `jq` to grab the data of the first record, and base64 decode it
	aws kinesis get-records --shard-iterator $(SHARD_ITERATOR) --endpoint-url http://localhost:4566 | jq -r '.Records'

To read off a kinesis stream, you’ll need to grab a handle to a shard iterator. Above we’re getting the iterator and then reading from it in a subsequent call.

Congratulations, we’re now running PyFlink Datastream API jobs that are interfacing with external connectors!

Up next: Part 5 (Data Stream API Job with Local Kafka Source and JDBC Sink)