Pipeline spec reference
Reference documentation on how to configure inputs and outputs of pipelines
Introduction
Pipeline configs are JSON files, one per pipeline, with a name ending in .pipeline.json.
The high-level format is as follows:
{
"id" : "pipeline-lnuYHU", // A unique ID. Will be assigned if one isn't already present
"description" : "My first pipeline", // A human-readable description
"name" : "Pipeline One", // The name of the pipeline
"input" : {
"type" : "Generic", // The type of the source
... // other config here, as defined by the source type
},
"output" : {
"type" : "Generic", // The type of the output
... // other config here, as defined by the output type
}
}
Most of the details of a pipeline are defined by the config for the selected input and output.
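As a rough illustration of the layout above, the following Python sketch finds spec files by their .pipeline.json suffix and checks that input and output are objects carrying a type. The function names are hypothetical and not part of Vyne, and this page does not state which top-level fields are mandatory, so the check covers only the two sections shown in the listing.

```python
import json
from pathlib import Path


def find_pipeline_specs(directory: Path) -> list[Path]:
    """Return every file in `directory` whose name ends in .pipeline.json."""
    return sorted(p for p in directory.iterdir() if p.name.endswith(".pipeline.json"))


def load_spec(path: Path) -> dict:
    # Python's json module rejects // comments, so this sketch only loads
    # files that omit the explanatory comments used in the listing above.
    return json.loads(path.read_text(encoding="utf-8"))


def check_top_level(spec: dict) -> list[str]:
    """Return a list of problems with the top-level shape of a pipeline spec."""
    problems = []
    for section in ("input", "output"):
        value = spec.get(section)
        if not isinstance(value, dict):
            problems.append(f"'{section}' must be an object")
        elif not isinstance(value.get("type"), str):
            problems.append(f"'{section}.type' must be a string")
    return problems
```

An empty list from check_top_level means the spec has the expected outline; it says nothing about whether the type-specific parameters are valid.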
Inputs
AWS S3
Pipeline Type Key | Direction | Maturity |
---|---|---|
awsS3 | INPUT | Beta |
A source that consumes a single file/object from an S3 bucket.
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
connectionName | The name of the connection, as registered in Vyne's connection manager | true | |
bucket | The bucket name | true | |
objectKey | The name of the object in the S3 bucket - generally a file name | true | |
targetTypeName | The name of the type that content from the S3 bucket should be consumed as | true | |
Example
{
"input" : {
"type" : "awsS3",
"direction" : "INPUT",
"connectionName" : "my-aws-connection",
"bucket" : "my-bucket",
"objectKey" : "customers.csv",
"targetTypeName" : "com.demo.customers.Customer",
"endPointOverride" : null,
"targetType" : "com.demo.customers.Customer"
}
}
AWS S3 via SQS
Pipeline Type Key | Direction | Maturity |
---|---|---|
awsS3 | INPUT | Beta |
A source that consumes a stream of S3 events via a preconfigured SQS queue.
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
connection | The name of the connection, as registered in Vyne's connection manager | true | |
targetTypeName | The name of the type that content from the S3 bucket should be consumed as | true | |
queueName | The name of the SQS queue | true | |
pollSchedule | A cron expression that defines how frequently to check for new messages. Defaults to every second | false | |
Example
{
"input" : {
"type" : "awsS3",
"direction" : "INPUT",
"connection" : "my-aws-connection",
"queueName" : "my-queue",
"pollSchedule" : "* * * * * *",
"targetTypeName" : "com.demo.customers.Customer",
"endPointOverride" : null,
"targetType" : "com.demo.customers.Customer"
}
}
Polling operation input
Pipeline Type Key | Direction | Maturity |
---|---|---|
taxiOperation | INPUT | Beta |
Invokes an operation (as defined or published to Vyne) on a periodic basis.
Accepts inputs defined in the configuration, which are passed to the service on invocation. The result of the operation is published downstream on the pipeline, where it can be transformed to another type and published to an output.
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
operationName | The name of the operation, as defined in the schema. Should be in the format of a fully qualified operation name. See the example below | true | |
pollSchedule | A cron expression defining how often this operation should be invoked | true | |
parameterMap | An optional map of parameters to pass to the operation | false | |
Example
{
"input" : {
"type" : "taxiOperation",
"direction" : "INPUT",
"operationName" : "com.demo.customers.CustomerService@@listCustomers",
"pollSchedule" : "* * * * * *",
"parameterMap" : {
"customerStatus" : "ACTIVE"
}
}
}
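The operationName in the example joins two parts with @@. A minimal Python sketch of splitting such a name is shown below; it assumes the part before @@ is the qualified service name and the part after it is the operation, which is how the example reads but is not spelled out on this page. The function name is hypothetical.

```python
def split_operation_name(qualified: str) -> tuple[str, str]:
    """Split '<service>@@<operation>' into its two halves."""
    service, separator, operation = qualified.partition("@@")
    # partition() returns an empty separator when '@@' is absent.
    if not separator or not service or not operation:
        raise ValueError(f"expected '<service>@@<operation>', got {qualified!r}")
    return service, operation


service, operation = split_operation_name(
    "com.demo.customers.CustomerService@@listCustomers"
)
# service   == "com.demo.customers.CustomerService"
# operation == "listCustomers"
```

A check like this can catch a name that was written without the @@ separator before the pipeline is submitted.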
Kafka topic
Pipeline Type Key | Direction | Maturity |
---|---|---|
kafka | INPUT | Beta |
Defines an input that reads from a Kafka topic.
The Kafka broker is configured using Vyne's connection manager, along with a topic defined for this pipeline input.
Controlling deserialization (protobuf / avro etc)
Deserialization is controlled using annotations declared on the configured type (targetTypeName).
If not specified, Vyne attempts to read the content as JSON, using a StringDecoder.
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
connectionName | The name of the connection, as registered in Vyne's connection manager | true | |
topic | The name of the topic to consume from | true | |
targetTypeName | The fully qualified name of the type that content should be read as | true | |
Example
{
"input" : {
"type" : "kafka",
"direction" : "INPUT",
"connectionName" : "my-kafka-connection",
"topic" : "customerNotifications",
"targetTypeName" : "com.demo.CustomerEvent",
"targetType" : "com.demo.CustomerEvent"
}
}
Outputs
Cask
Pipeline Type Key | Direction | Maturity |
---|---|---|
cask | OUTPUT | Beta |
An output that writes directly to a Vyne Cask.
Casks provide a way to store content in a database, and expose an auto-generated RESTful service over the top, with all the correct Vyne operation schemas generated.
You may wish to consider using a JDBC database output instead.
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
targetType | The type that defines the cask containing this data | true | |
Example
{
"output" : {
"type" : "cask",
"direction" : "OUTPUT",
"targetType" : "com.demo.Customer"
}
}
Operation output
Pipeline Type Key | Direction | Maturity |
---|---|---|
taxiOperation | OUTPUT | Beta |
Invokes an operation (as defined or published to Vyne), using the data provided upstream in the pipeline.
If the provided data does not satisfy the contract of the operation, Vyne will use the provided input as the basis of a discovery search, to find additional data.
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
operationName | The name of the operation, as defined in the schema. Should be in the format of a fully qualified operation name. See the example below | true | |
Example
{
"output" : {
"type" : "taxiOperation",
"direction" : "OUTPUT",
"operationName" : "com.demo.customers.CustomerService@@DisableCustomerAccounts"
}
}
Kafka topic
Pipeline Type Key | Direction | Maturity |
---|---|---|
kafka | OUTPUT | Beta |
Defines an output that writes to a Kafka topic.
The Kafka broker is configured using Vyne's connection manager, along with a topic defined for this pipeline output.
Controlling serialization (protobuf / avro etc)
Serialization is controlled using annotations declared on the configured type (targetTypeName).
If not specified, Vyne attempts to write the content as JSON, using a StringDecoder.
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
connectionName | The name of the connection, as registered in Vyne's connection manager | true | |
topic | The name of the topic to write to | true | |
targetTypeName | The fully qualified name of the type that content should be written as | true | |
Example
{
"output" : {
"type" : "kafka",
"direction" : "OUTPUT",
"connectionName" : "my-kafka-connection",
"topic" : "CustomerEvents",
"targetTypeName" : "com.demo.customers.CustomerEvent",
"targetType" : "com.demo.customers.CustomerEvent"
}
}
Database Output
Pipeline Type Key | Direction | Maturity |
---|---|---|
jdbc | OUTPUT | Beta |
A pipeline output that writes to a database.
The pipeline uses a connection that has been defined using Vyne's connection manager. Most database types are supported, provided they expose a JDBC driver.
Table definition
A table is created, if it doesn't already exist, using the config defined on the target type.
If the type carries an io.vyne.jdbc.Table annotation, the annotation defines the table name. Otherwise, the table name is derived from the name of the type.
Similarly, columns are created for all attributes annotated with an io.vyne.jdbc.Column annotation.
Table creation runs once, when the pipeline is first initiated.
It uses a CREATE TABLE IF NOT EXISTS statement, so if the type has changed since the table was first created, the changes are not propagated to the database.
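The effect of creating the table only when it doesn't already exist can be seen in a small Python sketch. SQLite is used here purely as a stand-in database, and the customer table and its columns are hypothetical; the statements Vyne actually generates are not shown on this page.

```python
import sqlite3


def column_names(conn: sqlite3.Connection, table: str) -> list[str]:
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


conn = sqlite3.connect(":memory:")

# First pipeline start: the table does not exist yet, so it is created.
conn.execute("CREATE TABLE IF NOT EXISTS customer (id TEXT, name TEXT)")
before = column_names(conn, "customer")  # ['id', 'name']

# The type later gains an 'email' attribute. The statement is now a no-op
# because the table already exists, so the new column is never added.
conn.execute("CREATE TABLE IF NOT EXISTS customer (id TEXT, name TEXT, email TEXT)")
after = column_names(conn, "customer")  # still ['id', 'name']

conn.close()
```

In practice this means a change to the type needs a manual schema migration (or a fresh table) before the new attributes can be stored.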
Batching inserts
In order to reduce load on the database, inserts are batched in windows of 500ms.
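To illustrate what batching in fixed time windows means, here is a minimal Python sketch that groups timestamped rows into 500 ms buckets. How Vyne implements its windows is not described on this page, so this assumes simple non-overlapping windows aligned to multiples of the window size; the function name and the sample rows are hypothetical.

```python
from collections import defaultdict


def batch_by_window(records, window_ms=500):
    """Group (timestamp_ms, row) pairs into fixed windows of `window_ms`."""
    windows = defaultdict(list)
    for timestamp_ms, row in records:
        # Integer division maps every timestamp in the same window to one key.
        windows[timestamp_ms // window_ms].append(row)
    # One batch per non-empty window, in time order.
    return [windows[key] for key in sorted(windows)]


records = [(0, "a"), (120, "b"), (499, "c"), (500, "d"), (1700, "e")]
batches = batch_by_window(records)
# batches == [["a", "b", "c"], ["d"], ["e"]]
```

Each inner list would correspond to one batched insert, so five rows here result in three round trips to the database instead of five.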
Parameters
The following configuration parameters are available:
Parameter | Description | Required | Default Value |
---|---|---|---|
connection | The name of a connection, configured in Vyne's connection manager | true | |
targetTypeName | The fully qualified name of the type that content pushed to the database should be read as | true | |
Example
{
"output" : {
"type" : "jdbc",
"direction" : "OUTPUT",
"connection" : "my-connection",
"targetTypeName" : "com.demo.Customer",
"targetType" : "com.demo.Customer",
"windowDurationMs" : 500
}
}
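The Required columns of the parameter tables on this page can be collected into a single lookup for a quick pre-flight check of an input or output block. The Python sketch below is a hypothetical helper, not part of Vyne, and only reflects the tables as written here. The SQS variant is left out because this page lists it under the same type key as the plain S3 input.

```python
# Required parameters per (type key, direction), copied from the tables above.
REQUIRED = {
    ("awsS3", "INPUT"): {"connectionName", "bucket", "objectKey", "targetTypeName"},
    ("taxiOperation", "INPUT"): {"operationName", "pollSchedule"},
    ("kafka", "INPUT"): {"connectionName", "topic", "targetTypeName"},
    ("cask", "OUTPUT"): {"targetType"},
    ("taxiOperation", "OUTPUT"): {"operationName"},
    ("kafka", "OUTPUT"): {"connectionName", "topic", "targetTypeName"},
    ("jdbc", "OUTPUT"): {"connection", "targetTypeName"},
}


def missing_parameters(section: dict, direction: str) -> list[str]:
    """Return the required parameters absent (or null) in an input/output block."""
    key = (section.get("type"), direction)
    if key not in REQUIRED:
        raise KeyError(f"no parameter table for {key}")
    return sorted(name for name in REQUIRED[key] if section.get(name) is None)


jdbc_output = {
    "type": "jdbc",
    "direction": "OUTPUT",
    "connection": "my-connection",
    "targetTypeName": "com.demo.Customer",
}
problems = missing_parameters(jdbc_output, "OUTPUT")  # [] when nothing is missing
```

Keying the lookup on both type and direction matters because kafka and taxiOperation appear as both inputs and outputs with different required parameters.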