
Kafka Publication

Publishing data to Kafka during Cask ingestion


If you wish, you can get Cask to write the data it ingests to Kafka. This feature is disabled by default, so you need to make two changes to get Cask to publish ingested messages to Kafka:

  1. Update the ingested Taxi model by annotating it with @ObserveChanges

Example:

Given an existing model:

 model Order {
   id: OrderId
 }

annotate it with @ObserveChanges:

 @ObserveChanges(writeToConnectionName = "Orders")
 model Order {
    id: OrderId
 }

The @ObserveChanges annotation has a single named argument, writeToConnectionName. Its value is used to look up the Kafka connection details that you'll define in step 2.

  2. Add the required configuration to the corresponding Cask application.yaml / properties file, defining the Kafka connection details for the 'writeToConnectionName' value that you declared in step 1.

In this particular case, let's assume you define a Cask configuration YAML file, application-kafka-publish.yaml:

vyne:
   connections:
      kafka:
         - connection-name: Orders
           bootstrap-servers: prod1:9022,prod2:9022,prod3:9022
           topic: Orders

In the above YAML we have defined the Kafka configuration settings for the 'writeToConnectionName' value, i.e. 'Orders'; hence the 'connection-name' value matches the value of 'writeToConnectionName'.

There is no limit on the number of ingestion models that you can annotate with @ObserveChanges, but don't forget to add the corresponding Kafka connection configuration details to the Cask configuration file.

Here is a configuration example for two ingestion models annotated with @ObserveChanges:

vyne:
   connections:
      kafka:
         - connection-name: Customers
           bootstrap-servers: localhost:9022
           topic: customer
         - connection-name: Orders
           bootstrap-servers: prod1:9022,prod2:9022,prod3:9022
           topic: retailorders

Please note that what is written to Kafka depends on whether the @PrimaryKey annotation is present in the ingestion model.

  • Publication content when the @PrimaryKey annotation is not present in the ingestion model

Assume we are using the following ingestion model:

      @ObserveChanges(writeToConnectionName = "OrderWindowSummary")
      model OrderWindowSummaryCsv {
         orderDate : Date(@format = "dd/MM/yyyy") by column(1)
         symbol : Symbol by column(2)
         open : Price by column(3)
         close : Price by column(4)
      }

When we upload the following csv contents to Cask:

Date,Symbol,Open,High,Low,Close
19/03/2019,BTCUSD,6300,6330,6186.08,6235.2

Cask will publish the following JSON message to the specified Kafka topic:

{
  "ids": {
    "cask_raw_id": "6cf5edf4-da56-47a7-bc4d-b06082234e61"
  },
  "current": {
    "orderDate": "19/03/2019",
    "symbol": "BTCUSD",
    "open": 6300,
    "close": "6330"
  },
  "old": null
}

This message has three fields: 'ids' contains the unique identifier values associated with the ingested entry, 'current' contains the data that was ingested, and 'old' contains the data associated with the previous version of the ingested message. For models without the @PrimaryKey annotation, the 'old' attribute is always null, as Cask has no notion of 'the previous value' for an ingested message.
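
To make this structure concrete, here is a minimal sketch of a downstream consumer that reads these change messages and unpacks the three fields. This is not part of Cask: it assumes the standard kafka-clients and Jackson libraries, subscribes to the 'Orders' topic from the configuration example above, and uses an illustrative broker address and group id.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
   val props = Properties().apply {
      put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9022")   // illustrative broker address
      put(ConsumerConfig.GROUP_ID_CONFIG, "cask-change-listener")      // illustrative group id
      put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
      put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
      put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   }
   val mapper = ObjectMapper()

   KafkaConsumer<String, String>(props).use { consumer ->
      consumer.subscribe(listOf("Orders"))                             // topic declared in the YAML above
      while (true) {
         for (record in consumer.poll(Duration.ofSeconds(1))) {
            val message = mapper.readTree(record.value())
            val ids = message.get("ids")          // unique identifiers (primary key values, or cask_raw_id)
            val current = message.get("current")  // the newly ingested values
            val old = message.get("old")          // the previous values, or null for models without @PrimaryKey
            println("Change observed for $ids: old=$old, current=$current")
         }
      }
   }
}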

  • Publication content when the @PrimaryKey annotation is present in the ingestion model

Let's consider the following model, where the 'symbol' field is annotated with @PrimaryKey:

@ObserveChanges(writeToConnectionName = "OrderWindowSummary")
model OrderWindowSummaryCsv {
         orderDate : Date(@format = "dd/MM/yyyy") by column(1)
         @PrimaryKey
         symbol : Symbol by column(2)
         open : Price by column(3)
         close : Price by column(4)
}

Let's assume we upload the following CSV file for the very first time (i.e. there is no cask for OrderWindowSummaryCsv yet):

Date,Symbol,Open,High,Low,Close
19/03/2019,BTCUSD,6300,6330,6186.08,6235.2
19/03/2019,ETHUSD,6300,6330,6186.08,6235.2
20/03/2019,BTCUSD,6301,6331,6186.08,6235.2
20/03/2019,ETHUSD,6200,6230,6186.08,6235.2

If you check the CSV contents carefully, you'll see that there are only two distinct 'symbol' values, 'BTCUSD' and 'ETHUSD', each of which appears in two rows.

During ingestion, Cask will write the following JSON messages to Kafka:

First Message:

{
  "ids": {
    "symbol": "'BTCUSD'"
  },
  "current": {
    "orderDate": "19/03/2019",
    "symbol": "BTCUSD",
    "open": 6300,
    "close": 6330
  },
  "old": {
    "close": null,
    "open": null,
    "orderDate": null,
    "symbol": null
  }
}

Second Message:

{
  "ids": {
    "symbol": "'ETHUSD'"
  },
  "current": {
    "orderDate": "19/03/2019",
    "symbol": "ETHUSD",
    "open": 6300,
    "close": 6330
  },
  "old": {
    "close": null,
    "open": null,
    "orderDate": null,
    "symbol": null
  }
}

Third Message:

{
  "ids": {
    "symbol": "'BTCUSD'"
  },
  "current": {
    "orderDate": "20/03/2019",
    "symbol": "BTCUSD",
    "open": 6301,
    "close": 6331
  },
  "old": {
    "close": 6330,
    "open": 6300,
    "orderDate": "19/03/2019",
    "symbol": "BTCUSD"
  }
}

Fourth Message:

{
  "ids": {
    "symbol": "'ETHUSD'"
  },
  "current": {
    "orderDate": "20/03/2019",
    "symbol": "ETHUSD",
    "open": 6200,
    "close": 6230
  },
  "old": {
    "close": 6330,
    "open": 6300,
    "orderDate": "19/03/2019",
    "symbol": "ETHUSD"
  }
}
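
Because each message carries both the 'old' and 'current' objects, a consumer can work out exactly which attributes changed between two versions of a record. The helper below is a small sketch of that idea, not a Cask API; it reuses Jackson's JsonNode from the consumer sketch above.

import com.fasterxml.jackson.databind.JsonNode

// Returns the names of the attributes whose value differs between the
// previous ('old') and the newly ingested ('current') version of a record.
// For a first-time insert the 'old' fields are all null, so every non-null
// current field is reported as changed.
fun changedFields(old: JsonNode?, current: JsonNode): List<String> =
   current.fieldNames().asSequence()
      .filter { name ->
         val previous = old?.get(name)
         previous == null || previous.isNull || previous != current.get(name)
      }
      .toList()

Applied to the third message above, this would report orderDate, open and close as changed, while symbol stays the same.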