Pipeline spec reference

Reference documentation for configuring the inputs and outputs of pipelines.


Introduction

Pipeline configs are JSON files whose names end in .pipeline.json, with one file per pipeline.

The high-level format is as follows (the comments and ... placeholders are annotations only, since JSON does not allow comments):

{
  "id" : "pipeline-lnuYHU", // A unique ID. Assigned automatically if not already present
  "description" : "My first pipeline", // A human-readable description
  "name" : "Pipeline One", // The name of the pipeline
  "input" : {
    "type" : "Generic", // The type of the source
    ... // Other config, as defined by the source type
  },
  "output" : {
    "type" : "Generic", // The type of the output
    ... // Other config, as defined by the output type
  }
}

Most of the details of a pipeline are defined by the config for the selected input and output.
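As a minimal sketch of the one-file-per-pipeline layout, the following Python assembles a complete pipeline from the Kafka topic input and Database Output documented below. It writes that pipeline to its own .pipeline.json file and loads it back. The helper names (write_pipeline, load_pipelines), the file name and the pipeline name are hypothetical. The id is omitted on the assumption that Vyne assigns one, as noted above, and only parameters listed in the parameter tables are set.

```python
import json
from pathlib import Path

# A complete pipeline: the Kafka topic input feeding the Database (jdbc) output.
# Connection, topic and type names are placeholders taken from the examples.
pipeline = {
    "name": "Customer events to database",
    "description": "Writes customer events from Kafka to a database table",
    "input": {
        "type": "kafka",
        "direction": "INPUT",
        "connectionName": "my-kafka-connection",
        "topic": "customerNotifications",
        "targetTypeName": "com.demo.CustomerEvent",
    },
    "output": {
        "type": "jdbc",
        "direction": "OUTPUT",
        "connection": "my-connection",
        "targetTypeName": "com.demo.Customer",
    },
}


def write_pipeline(spec: dict, directory: Path, file_stem: str) -> Path:
    """Write a single pipeline to its own <file_stem>.pipeline.json file."""
    path = directory / f"{file_stem}.pipeline.json"
    path.write_text(json.dumps(spec, indent=2))
    return path


def load_pipelines(directory: Path) -> list:
    """Load every *.pipeline.json file in a directory (not recursive)."""
    specs = []
    for path in sorted(directory.glob("*.pipeline.json")):
        spec = json.loads(path.read_text())
        # Each pipeline needs an input and an output, each declaring a type
        for side in ("input", "output"):
            if "type" not in spec.get(side, {}):
                raise ValueError(f"{path.name}: missing {side}.type")
        specs.append(spec)
    return specs
```

The type check mirrors the high-level format above. Everything else inside input and output depends on the selected type.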

Inputs

AWS S3

Pipeline Type Key | Direction | Maturity
awsS3 | INPUT | Beta

A source that consumes a single file/object from an S3 bucket.

Parameters

The following configuration parameters are available:

Parameter | Description | Required | Default Value
connectionName | The name of the connection, as registered in Vyne's connection manager | true | -
bucket | The bucket name | true | -
objectKey | The name of the object in the S3 bucket (generally a file name) | true | -
targetTypeName | The fully qualified name of the type that content from the S3 bucket should be read as | true | -

Example

{
  "input" : {
    "type" : "awsS3",
    "direction" : "INPUT",
    "connectionName" : "my-aws-connection",
    "bucket" : "my-bucket",
    "objectKey" : "customers.csv",
    "targetTypeName" : "com.demo.customers.Customer",
    "endPointOverride" : null,
    "targetType" : "com.demo.customers.Customer"
  }
}
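The Required column can be checked mechanically before a pipeline is deployed. The following minimal Python sketch is a hypothetical pre-flight check, not part of Vyne, which may perform its own validation. It reports which required awsS3 parameters are missing from an input spec. The same helper works with the required set from any other parameter table in this reference.

```python
# Required parameters of the awsS3 input, copied from the parameter table above.
AWS_S3_INPUT_REQUIRED = {"connectionName", "bucket", "objectKey", "targetTypeName"}


def missing_parameters(spec: dict, required: set) -> set:
    """Return the required parameters that are absent (or null) in a spec."""
    return {name for name in required if spec.get(name) is None}


# The input section of the example above
s3_input = {
    "type": "awsS3",
    "direction": "INPUT",
    "connectionName": "my-aws-connection",
    "bucket": "my-bucket",
    "objectKey": "customers.csv",
    "targetTypeName": "com.demo.customers.Customer",
}

print(missing_parameters(s3_input, AWS_S3_INPUT_REQUIRED))  # set()
```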

AWS S3 via SQS

Pipeline Type Key | Direction | Maturity
awsS3 | INPUT | Beta

A source that consumes a stream of S3 events via a preconfigured SQS queue.

Parameters

The following configuration parameters are available:

Parameter | Description | Required | Default Value
connection | The name of the connection, as registered in Vyne's connection manager | true | -
targetTypeName | The fully qualified name of the type that content from the S3 bucket should be read as | true | -
queueName | The name of the SQS queue | true | -
pollSchedule | A cron expression that defines how frequently to check for new messages | false | Every second

Example

{
  "input" : {
    "type" : "awsS3",
    "direction" : "INPUT",
    "connection" : "my-aws-connection",
    "queueName" : "my-queue",
    "pollSchedule" : "* * * * * *",
    "targetTypeName" : "com.demo.customers.Customer",
    "endPointOverride" : null,
    "targetType" : "com.demo.customers.Customer"
  }
}

Polling operation input

Pipeline Type Key | Direction | Maturity
taxiOperation | INPUT | Beta

Invokes an operation (as defined in, or published to, Vyne) on a periodic basis.

Accepts inputs defined in the configuration, which are passed to the operation on invocation. The result of the operation is published downstream in the pipeline, where it can be transformed to another type and published to an output.

Parameters

The following configuration parameters are available:

Parameter | Description | Required | Default Value
operationName | The fully qualified name of the operation, as defined in the schema. See the example below | true | -
pollSchedule | A cron expression defining how frequently the operation is invoked | true | -
parameterMap | An optional map of parameters to pass to the operation | false | -
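The example below uses a six-field cron expression (* * * * * *). This sketch assumes the Spring-style six-field format, in the order second, minute, hour, day of month, month, day of week. The Python below checks whether an expression would fire at a given moment. It is a simplified illustration, not Vyne's scheduler. It supports only *, ?, */n, single numbers and comma-separated lists.

```python
from datetime import datetime

# Assumed field order (Spring-style six-field cron): second, minute, hour,
# day-of-month, month, day-of-week. Each field's minimum anchors "*/n" steps.
FIELD_MINIMUMS = (0, 0, 0, 1, 1, 0)


def _field_matches(expr: str, value: int, minimum: int) -> bool:
    """Match one field. Supports '*', '?', '*/n', 'a' and lists such as 'a,b'."""
    for part in expr.split(","):
        if part in ("*", "?"):
            return True
        if part.startswith("*/"):
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if a six-field cron expression fires at the given second.

    Ranges (a-b), names (MON, JAN) and 7-as-Sunday are not handled.
    """
    fields = expression.split()
    if len(fields) != 6:
        raise ValueError(f"expected 6 fields, got {len(fields)}: {expression!r}")
    day_of_week = moment.isoweekday() % 7  # Sunday=0, Monday=1 ... Saturday=6
    values = (moment.second, moment.minute, moment.hour,
              moment.day, moment.month, day_of_week)
    return all(_field_matches(f, v, m)
               for f, v, m in zip(fields, values, FIELD_MINIMUMS))
```

Under this assumption, * * * * * * fires every second. An expression such as 0 */5 * * * * would invoke the operation every five minutes, at second zero.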

Example

{
  "input" : {
    "type" : "taxiOperation",
    "direction" : "INPUT",
    "operationName" : "com.demo.customers.CustomerService@@listCustomers",
    "pollSchedule" : "* * * * * *",
    "parameterMap" : {
      "customerStatus" : "ACTIVE"
    }
  }
}
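In the examples, a fully qualified operation name joins the fully qualified service name and the operation name with @@ (for instance com.demo.customers.CustomerService@@listCustomers). The following hypothetical Python helpers split and build names in that format. They are not part of Vyne, and they assume only the single @@ separator seen in the examples.

```python
def split_operation_name(qualified_name: str) -> tuple:
    """Split 'package.Service@@operation' into (service, operation)."""
    service, separator, operation = qualified_name.partition("@@")
    if not separator or not service or not operation:
        raise ValueError(f"not a fully qualified operation name: {qualified_name!r}")
    return service, operation


def operation_name(service: str, operation: str) -> str:
    """Join a fully qualified service name and an operation name with '@@'."""
    return f"{service}@@{operation}"


print(split_operation_name("com.demo.customers.CustomerService@@listCustomers"))
# ('com.demo.customers.CustomerService', 'listCustomers')
```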

Kafka topic

Pipeline Type Key | Direction | Maturity
kafka | INPUT | Beta

Defines an input that reads from a Kafka topic.

The Kafka broker is configured using Vyne's connection manager, along with a topic defined for this pipeline input.

Controlling deserialization (Protobuf, Avro, etc.)

Deserialization is controlled using annotations declared on the configured type (targetTypeName).

If no format is specified, Vyne attempts to read the content as JSON, using a StringDecoder.
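Conceptually, the default path treats each message value as text and parses that text as JSON. The following Python analogue is not Vyne's implementation, and the UTF-8 charset is an assumption. It also suggests why a binary format such as Protobuf needs the type annotations described above: binary payloads typically fail this default decoding.

```python
import json
from typing import Any


def decode_as_json(message_value: bytes) -> Any:
    """Read a Kafka message value as a UTF-8 string, then parse it as JSON."""
    text = message_value.decode("utf-8")  # string decoding step (charset assumed)
    return json.loads(text)


print(decode_as_json(b'{"customerId": 42, "status": "ACTIVE"}'))
# {'customerId': 42, 'status': 'ACTIVE'}
```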

Parameters

The following configuration parameters are available:

Parameter | Description | Required | Default Value
connectionName | The name of the connection, as registered in Vyne's connection manager | true | -
topic | The name of the topic to consume from | true | -
targetTypeName | The fully qualified name of the type that content should be read as | true | -

Example

{
  "input" : {
    "type" : "kafka",
    "direction" : "INPUT",
    "connectionName" : "my-kafka-connection",
    "topic" : "customerNotifications",
    "targetTypeName" : "com.demo.CustomerEvent",
    "targetType" : "com.demo.CustomerEvent"
  }
}

Outputs

Cask

Pipeline Type Key | Direction | Maturity
cask | OUTPUT | Beta

An output that writes directly to a Vyne Cask.

Casks store content in a database and expose an auto-generated RESTful service over it, along with the corresponding Vyne operation schemas.

You may wish to consider using a JDBC Database Output instead.

Parameters

The following configuration parameters are available:

Parameter | Description | Required | Default Value
targetType | The type that defines the cask containing this data | true | -

Example

{
  "output" : {
    "type" : "cask",
    "direction" : "OUTPUT",
    "targetType" : "com.demo.Customer"
  }
}

Operation output

Pipeline Type Key | Direction | Maturity
taxiOperation | OUTPUT | Beta

Invokes an operation (as defined in, or published to, Vyne), using the data provided upstream in the pipeline.

If the provided data does not satisfy the contract of the operation, Vyne will use the provided input as the basis of a discovery search, to find additional data.

Parameters

The following configuration parameters are available:

Parameter | Description | Required | Default Value
operationName | The fully qualified name of the operation, as defined in the schema. See the example below | true | -

Example

{
  "output" : {
    "type" : "taxiOperation",
    "direction" : "OUTPUT",
    "operationName" : "com.demo.customers.CustomerService@@DisableCustomerAccounts"
  }
}

Kafka topic

Pipeline Type Key | Direction | Maturity
kafka | OUTPUT | Beta

Defines an output that writes to a Kafka topic.

The Kafka broker is configured using Vyne's connection manager, along with a topic defined for this pipeline output.

Controlling serialization (Protobuf, Avro, etc.)

Serialization is controlled using annotations declared on the configured type (targetTypeName).

If no format is specified, Vyne attempts to write the content as JSON.

Parameters

The following configuration parameters are available:

Parameter | Description | Required | Default Value
connectionName | The name of the connection, as registered in Vyne's connection manager | true | -
topic | The name of the topic to write to | true | -
targetTypeName | The fully qualified name of the type that content should be written as | true | -

Example

{
  "output" : {
    "type" : "kafka",
    "direction" : "OUTPUT",
    "connectionName" : "my-kafka-connection",
    "topic" : "CustomerEvents",
    "targetTypeName" : "com.demo.customers.CustomerEvent",
    "targetType" : "com.demo.customers.CustomerEvent"
  }
}

Database Output

Pipeline Type Key | Direction | Maturity
jdbc | OUTPUT | Beta

A pipeline output that writes to a database.

The pipeline uses a connection that has been defined using Vyne's connection manager. Most database types are supported, provided they expose a JDBC driver.

Table definition

A table is created, if it doesn't already exist, using the config defined on the target type.

If the target type has an io.vyne.jdbc.Table annotation, it is used to define the table name. Otherwise, the table name is derived from the name of the type.

Similarly, columns are created for all attributes annotated with an io.vyne.jdbc.Column annotation.

Table creation runs once, when the pipeline is first initiated. It uses a CREATE TABLE IF NOT EXISTS statement, so if the type has changed since the table was first created, those changes are not propagated to the database.
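The consequence of CREATE TABLE IF NOT EXISTS can be demonstrated with Python's built-in sqlite3 module. Here an in-memory SQLite database stands in for whichever JDBC database the connection points to. The table name customer and its columns are hypothetical. The second statement simulates restarting the pipeline after an email attribute has been added to the type.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# First run: the table is created from the type's original columns.
conn.execute("CREATE TABLE IF NOT EXISTS customer (id INTEGER, name TEXT)")

# The type later gains an email attribute. The statement is issued again,
# but because the table already exists it is a no-op.
conn.execute("CREATE TABLE IF NOT EXISTS customer (id INTEGER, name TEXT, email TEXT)")

# PRAGMA table_info returns one row per column; index 1 is the column name.
columns = [row[1] for row in conn.execute("PRAGMA table_info(customer)")]
print(columns)  # ['id', 'name']
```

The new column has to be added to the existing table separately, for example with an ALTER TABLE statement or by dropping and recreating the table.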

Batching inserts

To reduce load on the database, inserts are batched in windows of 500ms.
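Windowed batching can be pictured with the following minimal Python sketch. It groups timestamped rows into tumbling windows, and each window becomes one multi-row insert. It assumes windows are aligned to multiples of the window length. Vyne's actual window alignment and flushing behaviour may differ, and the function name is hypothetical.

```python
from itertools import groupby
from typing import Any, Iterable


def batch_by_window(events: Iterable[tuple], window_ms: int = 500) -> list:
    """Group (timestamp_ms, row) pairs into tumbling windows of window_ms.

    Window k covers [k * window_ms, (k + 1) * window_ms). Rows in the same
    window form one batch; windows with no rows produce no batch.
    """
    ordered = sorted(events, key=lambda event: event[0])
    return [
        [row for _, row in group]
        for _, group in groupby(ordered, key=lambda event: event[0] // window_ms)
    ]


events = [(0, "a"), (120, "b"), (499, "c"), (500, "d"), (1730, "e")]
print(batch_by_window(events))  # [['a', 'b', 'c'], ['d'], ['e']]
```

Longer windows mean fewer, larger inserts at the cost of added latency. The windowDurationMs value of 500 in the example below matches the 500ms window described here.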

Parameters

The following configuration parameters are available:

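Parameter | Description | Required | Default Value
connection | The name of a connection, configured in Vyne's connection manager | true | -
targetTypeName | The fully qualified name of the type that content pushed to the database should be read as | true | -
windowDurationMs | The length of the window, in milliseconds, over which inserts are batched | false | 500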

Example

{
  "output" : {
    "type" : "jdbc",
    "direction" : "OUTPUT",
    "connection" : "my-connection",
    "targetTypeName" : "com.demo.Customer",
    "targetType" : "com.demo.Customer",
    "windowDurationMs" : 500
  }
}