# Configuration

We recommend configuring the client with an `openlineage.yml` file that contains all the details of how to connect to your OpenLineage backend.

You can make this file available to the client in three ways (listed in order of precedence):

1. Set the `OPENLINEAGE_CONFIG` environment variable to a file path: `OPENLINEAGE_CONFIG=path/to/openlineage.yml`.
2. Place an `openlineage.yml` file in the user's current working directory.
3. Place an `openlineage.yml` file under `.openlineage/` in the user's home directory (`~/.openlineage/openlineage.yml`).
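For illustration, a minimal `openlineage.yml` may contain nothing more than a transport definition (the URL here is a placeholder; the available transport types are described in the Transports section below):

```yaml
# Minimal openlineage.yml: send all events to an HTTP backend.
transport:
  type: http
  url: http://localhost:5000
```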
## Environment Variables

The following environment variables are available:

| Name | Description | Since |
|---|---|---|
| `OPENLINEAGE_CONFIG` | The path to the YAML configuration file. Example: `path/to/openlineage.yml` | |
| `OPENLINEAGE_DISABLED` | When `true`, OpenLineage will not emit events. | 0.9.0 |
## Facets Configuration

In the YAML configuration file you can also specify a list of disabled facets that will not be included in the OpenLineage event.

Yaml Config:

```yaml
transport:
  type: console
facets:
  disabled:
    - spark_unknown
    - spark_logicalPlan
```
## Transports

Tip: See the current list of all supported transports.
### HTTP

Allows sending events to an HTTP endpoint (with optional authorization headers).

Yaml Config:

```yaml
transport:
  type: http
  url: http://localhost:5000
```

With authorization:

```yaml
transport:
  type: http
  url: http://localhost:5000
  endpoint: api/v1/lineage
  auth:
    type: api_key
    api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5
```
#### Overriding the default configuration of the HttpTransport within the Java client

You can override the default configuration of the `HttpTransport` by specifying the URL and API key when creating a new client:
```java
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        HttpTransport.builder()
            .url("http://localhost:5000")
            .apiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5")
            .build())
    .build();
```
To configure the client with query params appended on each HTTP request, use:

```java
Map<String, String> queryParamsToAppend = Map.of(
    "param0", "value0",
    "param1", "value1"
);

// Connect to http://localhost:5000
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        HttpTransport.builder()
            .url("http://localhost:5000", queryParamsToAppend)
            .apiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5")
            .build())
    .build();

// Define a simple OpenLineage START or COMPLETE event
OpenLineage.RunEvent startOrCompleteRun = ...

// Emit OpenLineage event to http://localhost:5000/api/v1/lineage?param0=value0&param1=value1
client.emit(startOrCompleteRun);
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.endpoint | Path to resource | /api/v1/lineage |
| spark.openlineage.transport.auth.type | The type of authentication method to use | api_key |
| spark.openlineage.transport.auth.apiKey | An API key to be used when sending events to the OpenLineage server | abcdefghijk |
| spark.openlineage.transport.timeoutInMillis | Timeout for sending OpenLineage info in milliseconds | 5000 |
| spark.openlineage.transport.urlParams.xyz | A URL parameter (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
| spark.openlineage.transport.url | The URL of the OpenLineage API server where events should be reported; it can have other properties embedded | http://localhost:5000 |
| spark.openlineage.transport.headers.xyz | A request header (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
#### URL parsing within the Spark integration

You can supply HTTP parameters using values embedded in the URL; the parsed `spark.openlineage.*` properties are located in the URL as follows:

`{transport.url}/{transport.endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}?app_name={appName}&api_key={transport.apiKey}&timeout={transport.timeout}&xxx={transport.urlParams.xxx}`

Example:

`http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx`
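The structure above can be illustrated with plain `java.net.URI` (a sketch only: the real parsing happens inside the Spark integration, and the `queryParams` helper below is a hypothetical name, not part of the OpenLineage API):

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class UrlParsingDemo {
    // Splits the query portion of a composite OpenLineage URL into key/value pairs.
    static Map<String, String> queryParams(String url) {
        Map<String, String> params = new HashMap<>();
        String query = URI.create(url).getQuery();
        if (query != null) {
            for (String pair : query.split("&")) {
                String[] kv = pair.split("=", 2);
                params.put(kv[0], kv.length > 1 ? kv[1] : "");
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String url = "http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name"
            + "/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
            + "?app_name=app&api_key=abc&timeout=5000&xxx=xxx";
        // app_name maps to appName, api_key to transport.apiKey, and so on.
        System.out.println(queryParams(url).get("api_key")); // abc
    }
}
```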
Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.endpoint | Path to resource | /api/v1/lineage |
| openlineage.transport.auth.type | The type of authentication method to use | api_key |
| openlineage.transport.auth.apiKey | An API key to be used when sending events to the OpenLineage server | abcdefghijk |
| openlineage.transport.timeout | Timeout for sending OpenLineage info in milliseconds | 5000 |
| openlineage.transport.urlParams.xyz | A URL parameter (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
| openlineage.transport.url | The URL of the OpenLineage API server where events should be reported; it can have other properties embedded | http://localhost:5000 |
| openlineage.transport.headers.xyz | A request header (replace xyz) and value to be included in requests to the OpenLineage API server | abcdefghijk |
### Kafka

If the transport type is set to `kafka`, the parameters below are read and used when building a `KafkaProducer`. This transport requires the artifact `org.apache.kafka:kafka-clients:3.1.0` (or compatible) on your classpath.

Yaml Config:

```yaml
transport:
  type: kafka
  topicName: openlineage.events
  # Kafka properties (see: http://kafka.apache.org/0100/documentation.html#producerconfigs)
  properties:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
```
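The `properties` section above is passed through to the producer configuration; as a sketch, it corresponds to assembling a `java.util.Properties` object like this (kafka-clients itself is not needed just to build the properties, and the class name below is illustrative):

```java
import java.util.Properties;

public class KafkaProps {
    // Builds the producer properties corresponding to the YAML `properties` block above.
    static Properties producerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,another.host:9092");
        props.put("acks", "all");
        props.put("retries", "3");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // These properties would then be handed to `new KafkaProducer<>(props)`.
        System.out.println(producerProperties().getProperty("acks")); // all
    }
}
```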
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.topicName | Required; name of the topic | topic-name |
| spark.openlineage.transport.localServerId | Required; ID of the local server | xxxxxxxx |
| spark.openlineage.transport.properties.[xxx] | Optional; `[xxx]` is a Kafka client property | 1 |

Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.topicName | Required; name of the topic | topic-name |
| openlineage.transport.localServerId | Required; ID of the local server | xxxxxxxx |
| openlineage.transport.properties.[xxx] | Optional; `[xxx]` is a Kafka client property | 1 |
### Kinesis

If the transport type is set to `kinesis`, the parameters below are read and used when building a `KinesisProducer`. `KinesisTransport` also requires the artifact `com.amazonaws:amazon-kinesis-producer:0.14.0` (or compatible) on your classpath.

Yaml Config:

```yaml
transport:
  type: kinesis
  streamName: your_kinesis_stream_name
  topicName: openlineage.events
  region: your_aws_region
  roleArn: arn:aws:iam::account-id:role/role-name # optional
  properties: # Refer to amazon-kinesis-producer's default configuration.md for the available properties
    property_name_1: value_1
    property_name_2: value_2
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.streamName | Required; the name of the Kinesis stream | some-stream-name |
| spark.openlineage.transport.region | Required; the region of the stream | us-east-2 |
| spark.openlineage.transport.roleArn | Optional; the role ARN that is allowed to read/write to the Kinesis stream | some-role-arn |
| spark.openlineage.transport.properties.[xxx] | Optional; `[xxx]` is one of the allowed Kinesis properties | 1 |

Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.streamName | Required; the name of the Kinesis stream | some-stream-name |
| openlineage.transport.region | Required; the region of the stream | us-east-2 |
| openlineage.transport.roleArn | Optional; the role ARN that is allowed to read/write to the Kinesis stream | some-role-arn |
| openlineage.transport.properties.[xxx] | Optional; `[xxx]` is one of the allowed Kinesis properties | 1 |
Behavior:

- Events are serialized to JSON upon the `emit()` call and dispatched to the Kinesis stream.
- The partition key is generated by combining the job's namespace and name.
- Two constructors are available: one accepting both `KinesisProducer` and `KinesisConfig`, and another solely accepting `KinesisConfig`.
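The partition-key rule above can be sketched as follows. Note that the `":"` separator is an assumption made for illustration; the documentation only states that the namespace and job name are combined:

```java
public class PartitionKeyDemo {
    // Hypothetical sketch: combine the job's namespace and name into a partition key.
    // The ":" separator is an assumption, not confirmed by the documentation.
    static String partitionKey(String namespace, String jobName) {
        return namespace + ":" + jobName;
    }

    public static void main(String[] args) {
        // Events for the same job land on the same Kinesis partition.
        System.out.println(partitionKey("my_namespace", "my_job")); // my_namespace:my_job
    }
}
```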
### Console

This straightforward transport emits OpenLineage events directly to the console through a logger. Be cautious when using the DEBUG log level, as it might result in double-logging due to the `OpenLineageClient` also logging. No additional configuration is required.

Yaml Config:

```yaml
transport:
  type: console
```
### File

Designed mainly for integration testing, the `FileTransport` appends OpenLineage events to a given file. Events are newline-separated, with all pre-existing newline characters within the event JSON removed.

Yaml Config:

```yaml
transport:
  type: file
  location: /path/to/your/file.txt
```

Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.transport.location | File path | /path/to/your/file.txt |

Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.transport.location | File path | /path/to/your/file.txt |

Notes:

- If the target file is absent, it's created.
- Events are added to the file, separated by newlines.
- Intrinsic newline characters within the event JSON are eliminated to ensure one-line events.
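Because every event occupies exactly one line, consuming such a file (for example in an integration test) is straightforward. A sketch with made-up payloads and a hypothetical `roundTrip` helper:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class FileTransportDemo {
    // Writes newline-separated one-line events (as FileTransport does) and reads them back.
    static List<String> roundTrip(String... eventJsons) throws IOException {
        Path events = Files.createTempFile("openlineage-events", ".txt");
        Files.writeString(events, String.join("\n", eventJsons) + "\n");
        return Files.readAllLines(events);
    }

    public static void main(String[] args) throws Exception {
        // Each line can be parsed independently as a full OpenLineage event.
        List<String> lines = roundTrip("{\"eventType\":\"START\"}", "{\"eventType\":\"COMPLETE\"}");
        System.out.println(lines.size()); // 2
    }
}
```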
## Error Handling via Transport

```java
// Connect to http://localhost:5000
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        HttpTransport.builder()
            .uri("http://localhost:5000")
            .apiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5")
            .build())
    .registerErrorHandler(new EmitErrorHandler() {
        @Override
        public void handleError(Throwable throwable) {
            // Handle emit error here
        }
    }).build();
```
## Defining Your Own Transport

```java
OpenLineageClient client = OpenLineageClient.builder()
    .transport(
        new MyTransport() {
            @Override
            public void emit(OpenLineage.RunEvent runEvent) {
                // Add emit logic here
            }
        }).build();
```
## Circuit Breakers

This feature is available in OpenLineage versions >= 1.9.0.

To prevent over-instrumentation, the OpenLineage integration provides a circuit breaker mechanism that stops OpenLineage from creating, serializing, and sending events.

### Simple Memory Circuit Breaker

A simple circuit breaker that works based only on free memory within the JVM. The configuration should contain a free-memory threshold limit as a percentage; the default value is `20%`. The circuit breaker will close within the first call if free memory is low. The `circuitCheckIntervalInMillis` parameter configures how frequently the circuit breaker is called; the default value is `1000` ms when there is no entry in the config.
Yaml Config:

```yaml
circuitBreaker:
  type: simpleMemory
  memoryThreshold: 20
  circuitCheckIntervalInMillis: 1000
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.circuitBreaker.type | Circuit breaker type selected | simpleMemory |
| spark.openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| spark.openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |

Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.circuitBreaker.type | Circuit breaker type selected | simpleMemory |
| openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |
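The free-memory condition can be sketched with plain `java.lang.Runtime` (illustrative only; the actual `SimpleMemoryCircuitBreaker` implementation may compute this differently):

```java
public class MemoryCheckDemo {
    // Percentage of memory still available to the JVM: currently-free heap
    // plus heap not yet claimed from the OS, relative to the maximum heap.
    static double freeMemoryPercent() {
        Runtime rt = Runtime.getRuntime();
        long free = rt.freeMemory() + (rt.maxMemory() - rt.totalMemory());
        return 100.0 * free / rt.maxMemory();
    }

    public static void main(String[] args) {
        double memoryThreshold = 20.0; // matches `memoryThreshold: 20` above
        // When free memory falls below the threshold, the breaker closes
        // and event creation/serialization/sending is skipped.
        boolean breakerClosed = freeMemoryPercent() < memoryThreshold;
        System.out.println(breakerClosed);
    }
}
```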
### Java Runtime Circuit Breaker

A more complex version of the circuit breaker: the amount of free memory can be low as long as the amount of time spent on garbage collection is acceptable. `JavaRuntimeCircuitBreaker` closes when free memory drops below the threshold and the amount of time spent on garbage collection exceeds a given threshold (`10%` by default). The circuit breaker is always open when checked for the first time, as the GC threshold is computed since the previous circuit breaker call. The `circuitCheckIntervalInMillis` parameter configures how frequently the circuit breaker is called; the default value is `1000` ms when there is no entry in the config.
Yaml Config:

```yaml
circuitBreaker:
  type: javaRuntime
  memoryThreshold: 20
  gcCpuThreshold: 10
  circuitCheckIntervalInMillis: 1000
```
Spark Config:

| Parameter | Definition | Example |
|---|---|---|
| spark.openlineage.circuitBreaker.type | Circuit breaker type selected | javaRuntime |
| spark.openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| spark.openlineage.circuitBreaker.gcCpuThreshold | Garbage collection CPU threshold | 10 |
| spark.openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |

Flink Config:

| Parameter | Definition | Example |
|---|---|---|
| openlineage.circuitBreaker.type | Circuit breaker type selected | javaRuntime |
| openlineage.circuitBreaker.memoryThreshold | Memory threshold | 20 |
| openlineage.circuitBreaker.gcCpuThreshold | Garbage collection CPU threshold | 10 |
| openlineage.circuitBreaker.circuitCheckIntervalInMillis | Frequency of checking the circuit breaker | 1000 |
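The GC-time condition can likewise be sketched with the standard management beans. This is illustrative only: the real `JavaRuntimeCircuitBreaker` measures GC time between successive checks, whereas this sketch measures it since JVM start:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcCpuDemo {
    // Percentage of JVM uptime spent in garbage collection so far.
    static double gcCpuPercent() {
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if undefined for this collector
            if (t > 0) {
                gcMillis += t;
            }
        }
        long uptimeMillis = Math.max(ManagementFactory.getRuntimeMXBean().getUptime(), 1);
        return 100.0 * gcMillis / uptimeMillis;
    }

    public static void main(String[] args) {
        double gcCpuThreshold = 10.0; // matches `gcCpuThreshold: 10` above
        // Both conditions must hold for the breaker to close: low free memory
        // AND excessive GC time.
        System.out.println(gcCpuPercent() > gcCpuThreshold);
    }
}
```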
### Custom Circuit Breaker

The list of available circuit breakers can be extended with a custom one, loaded via `ServiceLoader`, with your own implementation of `io.openlineage.client.circuitBreaker.CircuitBreakerBuilder`.