Apache Flink
Apache Flink is one of the most popular stream processing frameworks. Apache Flink jobs run on clusters, which are composed of two types of nodes: `TaskManagers` and `JobManagers`. While clusters typically consist of multiple `TaskManagers`, the only reason to run multiple `JobManagers` is high availability. Jobs are submitted to the `JobManager` by the `JobClient`, which compiles the user application into a dataflow graph that the `JobManager` can understand. The `JobManager` then coordinates job execution: it distributes the parallel units of a job to the `TaskManagers`, manages heartbeats, triggers checkpoints, reacts to failures, and much more.
Apache Flink has multiple deployment modes: Session Mode, Application Mode, and Per-Job Mode. The most popular are Session Mode and Application Mode. In Session Mode, a single `JobManager` manages multiple jobs sharing a single Flink cluster. In this mode, the `JobClient` is executed on the machine that submits the job to the cluster. Application Mode is used where the cluster is utilized for a single job. In this mode, the `JobClient`, where the main method runs, is executed on the `JobManager`.
Flink jobs read data from `Sources` and write data to `Sinks`. In contrast to systems like Apache Spark, Flink jobs can write data to multiple places: they can have multiple `Sinks`.
Getting lineage from Flink
OpenLineage utilizes Flink's `JobListener` interface. Flink uses this interface to notify the user of job submission, successful job completion, or job failure. Implementations of this interface are executed on the `JobClient`.

When the OpenLineage listener receives notice that a job was submitted, it extracts `Transformations` from the job's `ExecutionEnvironment`. The `Transformations` represent logical operations in the dataflow graph; they are composed of Flink's built-in operators as well as user-provided `Sources`, `Sinks`, and functions. To get the lineage, the OpenLineage integration processes the dataflow graph. Currently, OpenLineage is interested only in the information contained in `Sources` and `Sinks`, as these are the places where Flink interacts with external systems.

After job submission, the OpenLineage integration starts actively listening to checkpoints; this gives insight into whether the job is running properly.
Limitations
Currently, OpenLineage's Flink integration is limited to getting information from jobs running in Application Mode.

The OpenLineage integration extracts lineage only from the following `Sources` and `Sinks`:
| Sources | Sinks |
|---|---|
| `KafkaSource` | `KafkaSink` (1) |
| `FlinkKafkaConsumer` | `FlinkKafkaProducer` |
| `IcebergFlinkSource` | `IcebergFlinkSink` |
We expect this list to grow as we add support for more connectors.
(1) `KafkaSink` supports sinks that write to a single topic as well as multi-topic sinks. The limitation for multi-topic sinks is that the topics need to have the same schema, and the implementation of `KafkaRecordSerializationSchema` must extend `KafkaTopicsDescriptor`. The methods `isFixedTopics` and `getFixedTopics` from `KafkaTopicsDescriptor` are used to extract multiple topics from a sink.
Usage
In your job, you need to set up the `OpenLineageFlinkJobListener`. For example:

```java
JobListener listener = OpenLineageFlinkJobListener.builder()
    .executionEnvironment(streamExecutionEnvironment)
    .build();
streamExecutionEnvironment.registerJobListener(listener);
```
OpenLineage also needs certain parameters to be set in `flink-conf.yaml`:

| Configuration Key | Description | Expected Value | Default |
|---|---|---|---|
| execution.attached | This setting needs to be `true` for OpenLineage to detect job start and failure | true | false |
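As a sketch, the corresponding `flink-conf.yaml` entry would be:

```yaml
# Required for OpenLineage to detect job start and failure
execution.attached: true
```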
The OpenLineage jar needs to be present on the `JobManager`.

When the `JobListener` is configured, you need to point the OpenLineage integration to where the events should end up. If you're using Marquez, the simplest way to do that is to set the `OPENLINEAGE_URL` environment variable to the Marquez URL. More advanced settings are in the client documentation.
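For example, pointing the integration at a Marquez instance could look like this (the URL below is illustrative):

```shell
# Point the OpenLineage client at a Marquez instance (URL is illustrative)
export OPENLINEAGE_URL=http://localhost:5000
```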
Configuring the OpenLineage connector

The Flink OpenLineage connector utilizes the standard Java client for OpenLineage and allows all the configuration features present there to be used. The configuration can be passed with:

- an `openlineage.yml` file, with the environment variable `OPENLINEAGE_CONFIG` set and pointing to the configuration file. File structure and allowed options are described here.
- standard Flink configuration with the parameters defined below.
Flink Configuration parameters
The following parameters can be specified:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.type | The transport type used for event emission; the default type is `console` | http |
| openlineage.facets.disabled | List of facets to disable, enclosed in `[]` (required from 0.21.x) and separated by `;` | [some_facet1;some_facet2] |
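For example, a Flink configuration fragment using these parameters might look like the following (the URL and facet names are placeholders):

```yaml
openlineage.transport.type: http
openlineage.transport.url: http://localhost:5000
openlineage.facets.disabled: [some_facet1;some_facet2]
```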
Transports
Tip: See current list of all supported transports.
HTTP
Allows sending events to an HTTP endpoint (with optional authorization headers).
- Yaml Config
- Spark Config
- Flink Config
```yaml
transport:
  type: http
  url: http://localhost:5000
```
With authorization:

```yaml
transport:
  type: http
  url: http://localhost:5000
  endpoint: api/v1/lineage
  auth:
    type: api_key
    api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5
```
Override the default configuration of the `HttpTransport` within the Java client

You can override the default configuration of the `HttpTransport` by specifying the URL and API key when creating a new client:
```java
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        HttpTransport.builder()
            .url("http://localhost:5000")
            .apiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5")
            .build())
    .build();
```
To configure the client with query params appended on each HTTP request, use:

```java
Map<String, String> queryParamsToAppend = Map.of(
    "param0", "value0",
    "param1", "value1"
);

// Connect to http://localhost:5000
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        HttpTransport.builder()
            .url("http://localhost:5000", queryParamsToAppend)
            .apiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5")
            .build())
    .build();

// Define a simple OpenLineage START or COMPLETE event
OpenLineage.RunEvent startOrCompleteRun = ...

// Emit OpenLineage event to http://localhost:5000/api/v1/lineage?param0=value0&param1=value1
client.emit(startOrCompleteRun);
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.endpoint | Path to resource | /api/v1/lineage |
| spark.openlineage.transport.auth.type | The type of authentication method to use | api_key |
| spark.openlineage.transport.auth.apiKey | An API key to be used when sending events to the OpenLineage server | abcdefghijk |
| spark.openlineage.transport.timeoutInMillis | Timeout for sending OpenLineage info in milliseconds | 5000 |
| spark.openlineage.transport.urlParams.xyz | A URL parameter (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
| spark.openlineage.transport.url | The hostname of the OpenLineage API server where events should be reported; it can have other properties embedded | http://localhost:5000 |
| spark.openlineage.transport.headers.xyz | Request headers (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
URL parsing within the Spark integration

You can supply HTTP parameters using values in the URL; the parsed `spark.openlineage.*` properties are located in the URL as follows:

```
{transport.url}/{transport.endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}?app_name={appName}&api_key={transport.apiKey}&timeout={transport.timeout}&xxx={transport.urlParams.xxx}
```

Example:

```
http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx
```
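To make the mapping concrete, here is a small stdlib-only sketch that assembles the template above from its parts; the class and method names are illustrative, not part of the integration:

```java
public class LineageUrlExample {
    // Mirrors the template:
    // {url}/{endpoint}/namespaces/{ns}/jobs/{job}/runs/{run}?app_name=...&api_key=...&timeout=...
    public static String buildUrl(String baseUrl, String endpoint, String namespace,
                                  String jobName, String runId, String appName,
                                  String apiKey, String timeout) {
        return String.format(
            "%s/%s/namespaces/%s/jobs/%s/runs/%s?app_name=%s&api_key=%s&timeout=%s",
            baseUrl, endpoint, namespace, jobName, runId, appName, apiKey, timeout);
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("http://localhost:5000", "api/v1", "ns_name",
            "job_name", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "app", "abc", "5000"));
    }
}
```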
Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.endpoint | Path to resource | /api/v1/lineage |
| openlineage.transport.auth.type | The type of authentication method to use | api_key |
| openlineage.transport.auth.apiKey | An API key to be used when sending events to the OpenLineage server | abcdefghijk |
| openlineage.transport.timeout | Timeout for sending OpenLineage info in milliseconds | 5000 |
| openlineage.transport.urlParams.xyz | A URL parameter (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
| openlineage.transport.url | The hostname of the OpenLineage API server where events should be reported; it can have other properties embedded | http://localhost:5000 |
| openlineage.transport.headers.xyz | Request headers (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
Kafka

If the transport type is set to `kafka`, then the parameters below are read and used when building the KafkaProducer. This transport requires the artifact `org.apache.kafka:kafka-clients:3.1.0` (or compatible) on your classpath.
- Yaml Config
- Spark Config
- Flink Config
```yaml
transport:
  type: kafka
  topicName: openlineage.events
  # Kafka properties (see: http://kafka.apache.org/0100/documentation.html#producerconfigs)
  properties:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.topicName | Required; name of the topic | topic-name |
| spark.openlineage.transport.localServerId | Required; id of the local server | xxxxxxxx |
| spark.openlineage.transport.properties.[xxx] | Optional; `[xxx]` is a property of the Kafka client | 1 |
Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.topicName | Required; name of the topic | topic-name |
| openlineage.transport.localServerId | Required; id of the local server | xxxxxxxx |
| openlineage.transport.properties.[xxx] | Optional; `[xxx]` is a property of the Kafka client | 1 |
Kinesis

If the transport type is set to `kinesis`, then the parameters below are read and used when building the KinesisProducer. The KinesisTransport also requires you to provide the artifact `com.amazonaws:amazon-kinesis-producer:0.14.0` (or compatible) on your classpath.
- Yaml Config
- Spark Config
- Flink Config
```yaml
transport:
  type: kinesis
  streamName: your_kinesis_stream_name
  topicName: openlineage.events
  region: your_aws_region
  roleArn: arn:aws:iam::account-id:role/role-name # optional
  properties: # Refer to amazon-kinesis-producer's default configuration.md for the available properties
    property_name_1: value_1
    property_name_2: value_2
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.streamName | Required; the streamName of the Kinesis Stream | some-stream-name |
| spark.openlineage.transport.region | Required; the region of the stream | us-east-2 |
| spark.openlineage.transport.roleArn | Optional; the roleArn which is allowed to read/write to the Kinesis stream | some-role-arn |
| spark.openlineage.transport.properties.[xxx] | Optional; `[xxx]` is one of the allowed Kinesis properties | 1 |
Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.streamName | Required; the streamName of the Kinesis Stream | some-stream-name |
| openlineage.transport.region | Required; the region of the stream | us-east-2 |
| openlineage.transport.roleArn | Optional; the roleArn which is allowed to read/write to the Kinesis stream | some-role-arn |
| openlineage.transport.properties.[xxx] | Optional; `[xxx]` is one of the allowed Kinesis properties | 1 |
Behavior:

- Events are serialized to JSON upon the `emit()` call and dispatched to the Kinesis stream.
- The partition key is generated by combining the job's namespace and name.
- Two constructors are available: one accepting both `KinesisProducer` and `KinesisConfig`, and another solely accepting `KinesisConfig`.
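The partition-key scheme described above can be sketched as follows; the `":"` separator is an illustrative assumption, as the exact format used by the transport is not specified here:

```java
public class PartitionKeyExample {
    // The docs state the partition key combines the job's namespace and name;
    // the ":" separator is an assumption for illustration, not the verified format.
    public static String partitionKey(String namespace, String jobName) {
        return namespace + ":" + jobName;
    }

    public static void main(String[] args) {
        System.out.println(partitionKey("my_namespace", "my_job"));
    }
}
```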
Console
This straightforward transport emits OpenLineage events directly to the console through a logger. Be cautious when using the DEBUG log level, as it might result in double-logging due to the `OpenLineageClient` also logging. No additional configuration is required.
- Yaml Config
- Spark Config
- Flink Config
```yaml
transport:
  type: console
```
File
Designed mainly for integration testing, the `FileTransport` appends OpenLineage events to a given file. Events are newline-separated, with all pre-existing newline characters within the event JSON removed.
- Yaml Config
- Spark Config
- Flink Config
```yaml
transport:
  type: file
  location: /path/to/your/file.txt
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.location | File path | /path/to/your/file.txt |
Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.location | File path | /path/to/your/file.txt |
Notes:
- If the target file is absent, it's created.
- Events are added to the file, separated by newlines.
- Intrinsic newline characters within the event JSON are eliminated to ensure one-line events.
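The newline handling described in the notes can be sketched with plain string operations; this is an illustration of the behavior, not the transport's actual implementation:

```java
public class FileTransportExample {
    // Sketch of the behavior described above: multi-line event JSON is
    // flattened to a single line before being appended to the file,
    // so each line of the file holds exactly one event.
    public static String toSingleLine(String eventJson) {
        return eventJson.replace("\r", "").replace("\n", "");
    }

    public static void main(String[] args) {
        String pretty = "{\n  \"eventType\": \"START\"\n}";
        System.out.println(toSingleLine(pretty));
    }
}
```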
Circuit Breakers
This feature is available in OpenLineage versions >= 1.9.0.
To prevent over-instrumentation, the OpenLineage integration provides a circuit breaker mechanism that stops OpenLineage from creating, serializing, and sending OpenLineage events.
Simple Memory Circuit Breaker
A simple circuit breaker that works based only on free memory within the JVM. The configuration should contain a free-memory threshold limit (percentage); the default value is `20%`. The circuit breaker will close on the first call if free memory is low. The `circuitCheckIntervalInMillis` parameter configures how frequently the circuit breaker is called; the default value is `1000ms` when there is no entry in the config.
- Yaml Config
- Spark Config
- Flink Config
```yaml
circuitBreaker:
  type: simpleMemory
  memoryThreshold: 20
  circuitCheckIntervalInMillis: 1000
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.circuitBreaker.type | Circuit breaker type selected | simpleMemory |
| spark.openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| spark.openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |
Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.circuitBreaker.type | Circuit breaker type selected | simpleMemory |
| openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |
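The `simpleMemory` policy described above can be sketched as a pure function; the class and method names are illustrative and not the integration's actual implementation:

```java
public class MemoryCircuitBreakerSketch {
    // Illustrative sketch of the simpleMemory policy: the breaker closes
    // (stops emitting events) when the percentage of free JVM memory
    // drops below the configured threshold.
    public static boolean shouldClose(long freeMemoryBytes, long maxMemoryBytes,
                                      double thresholdPercent) {
        double freePercent = 100.0 * freeMemoryBytes / maxMemoryBytes;
        return freePercent < thresholdPercent;
    }

    public static void main(String[] args) {
        // E.g. 128 MB free out of 1024 MB is 12.5% free, below a 20% threshold.
        System.out.println(shouldClose(128L << 20, 1024L << 20, 20.0));
    }
}
```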
Java Runtime Circuit Breaker
A more complex version of the circuit breaker. The amount of free memory can be low as long as the amount of time spent on garbage collection is acceptable. `JavaRuntimeCircuitBreaker` closes when free memory drops below the threshold and the amount of time spent on garbage collection exceeds a given threshold (`10%` by default). The circuit breaker is always open when checked for the first time, as the GC threshold is computed since the previous circuit breaker call. The `circuitCheckIntervalInMillis` parameter configures how frequently the circuit breaker is called; the default value is `1000ms` when there is no entry in the config.
- Yaml Config
- Spark Config
- Flink Config
```yaml
circuitBreaker:
  type: javaRuntime
  memoryThreshold: 20
  gcCpuThreshold: 10
  circuitCheckIntervalInMillis: 1000
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.circuitBreaker.type | Circuit breaker type selected | javaRuntime |
| spark.openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| spark.openlineage.circuitBreaker.gcCpuThreshold | Garbage collection CPU threshold | 10 |
| spark.openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |
Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.circuitBreaker.type | Circuit breaker type selected | javaRuntime |
| openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| openlineage.circuitBreaker.gcCpuThreshold | Garbage collection CPU threshold | 10 |
| openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |
Custom Circuit Breaker
The list of available circuit breakers can be extended with a custom one, loaded via `ServiceLoader` with your own implementation of `io.openlineage.client.circuitBreaker.CircuitBreakerBuilder`.