ReactoKinesix


Before you start

Please familiarize yourself with how Amazon Kinesis works by reading through its online documentation, in particular the Key Concepts and Limitations sections.

Getting Started

Download and install the library from NuGet.

This library enables you to create a client application that consumes and processes records pushed to an Amazon Kinesis stream, taking care of most of the plumbing involved.

To process incoming records, you need to provide an implementation of the IRecordProcessor interface, which has the following methods (it also inherits IDisposable, so you implement Dispose as well):

Process

Process a batch of records received from the Kinesis stream. This method returns an instance of ProcessRecordsResult, in which you indicate:

  • whether the processing was successful (if an unhandled exception is thrown from this method, the status defaults to Status.Failure), and
  • whether a checkpoint should be placed on the shard immediately after the last record in the batch. If the current worker terminates prematurely (due to a hardware failure, for instance), another worker can then resume processing the shard from the last checkpoint.
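An unhandled exception already maps to Status.Failure. Catching the exception yourself lets you return Status.Failure explicitly and decline the checkpoint for that batch. Below is a minimal sketch that uses the Record, Status and ProcessRecordsResult types from the F# Example below. The handle function and its empty-payload rule are hypothetical stand-ins for your own logic. The sketch also assumes the library retries a failed batch as described under OnMaxRetryExceeded.

```fsharp
#r "ReactoKinesix.dll"

open System
open System.Text
open ReactoKinesix
open ReactoKinesix.Model

// Hypothetical per-record handler: decodes the payload as UTF-8 and rejects
// empty payloads so that the failure path below can be exercised.
let handle (record : Record) =
    let msg = Encoding.UTF8.GetString(record.Data)
    if String.IsNullOrWhiteSpace msg then
        invalidOp (sprintf "Empty payload in record %s" record.SequenceNumber)
    printfn "[%s] %s" record.PartitionKey msg

// Handle the whole batch; request a checkpoint only if every record succeeded,
// and report any exception explicitly as Status.Failure.
let processBatch (records : Record[]) : ProcessRecordsResult =
    try
        records |> Array.iter handle
        { Status = Status.Success; Checkpoint = true }
    with ex ->
        { Status = Status.Failure ex; Checkpoint = false }
```

Skipping the checkpoint on failure means that, if the worker dies mid-retry, whichever worker takes over the shard resumes from before the failing batch rather than after it.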

OnMaxRetryExceeded

Last chance to deal with a failing batch of records when the number of retry attempts specified in the configuration has been reached.

For example, you might choose to:

  • push the data in the records to Amazon SQS for processing later
  • send out a notification via Amazon SNS
  • ...
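Here is a sketch of the "process later" idea: OnMaxRetryExceeded persists the failed records somewhere durable so they are not lost. To stay self-contained, it writes them to a local file instead of Amazon SQS. The folder, the file format and saveFailedBatch are all hypothetical. The sketch ignores the ErrorHandlingMode argument that the method also receives.

```fsharp
#r "ReactoKinesix.dll"

open System
open System.IO
open ReactoKinesix.Model

// Hypothetical local "dead letter" folder, standing in for an SQS queue.
let deadLetterDir = Path.Combine(Path.GetTempPath(), "reactokinesix-dead-letters")

// Write one line per failed record: sequence number, partition key and the
// Base64-encoded payload, tab-separated (assumes keys contain no tabs).
// Returns the path of the file written so the batch can be replayed later.
let saveFailedBatch (records : Record[]) : string =
    Directory.CreateDirectory(deadLetterDir) |> ignore
    let lines =
        records
        |> Array.map (fun r ->
            sprintf "%s\t%s\t%s" r.SequenceNumber r.PartitionKey (Convert.ToBase64String r.Data))
    let file = Path.Combine(deadLetterDir, sprintf "batch-%s.tsv" (Guid.NewGuid().ToString("N")))
    File.WriteAllLines(file, lines)
    file
```

Inside your processor, you would wire this up as member this.OnMaxRetryExceeded(records, _) = saveFailedBatch records |> ignore.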

To start, create a client application by calling the static method ReactoKinesixApp.CreateNew, which returns a running instance of IReactoKinesixApp that starts processing records from the stream straight away.
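ReactoKinesixApp.CreateNew also has an overload that takes IAmazonKinesis, IAmazonDynamoDB and IAmazonCloudWatch clients in place of an access key, secret and region. Using it keeps raw keys out of your script. Below is a minimal sketch. It assumes that AWS SDK for .NET clients constructed from a RegionEndpoint alone resolve credentials from the SDK's default sources. The start function and the trivial factory are illustrative, not part of the library.

```fsharp
#r "AWSSDK.dll"
#r "ReactoKinesix.dll"

open Amazon
open Amazon.Kinesis
open Amazon.DynamoDBv2
open Amazon.CloudWatch
open ReactoKinesix
open ReactoKinesix.Model

// Minimal processor factory: prints each sequence number and checkpoints.
let factory =
    { new IRecordProcessorFactory with
        member this.CreateNew _ =
            { new IRecordProcessor with
                member this.Process(_, records) =
                    records |> Array.iter (fun r -> printfn "%s" r.SequenceNumber)
                    { Status = Success; Checkpoint = true }
                member this.OnMaxRetryExceeded(_, _) = ()
                member this.Dispose() = () } }

// Uses the CreateNew overload that takes service clients. A client built from
// a region alone resolves credentials from the SDK's default sources (for
// example an EC2 instance role) rather than keys embedded in the script.
let start (region : RegionEndpoint) (appName : string) (streamName : string) (workerId : string) =
    let kinesis    = new AmazonKinesisClient(region)
    let dynamoDB   = new AmazonDynamoDBClient(region)
    let cloudWatch = new AmazonCloudWatchClient(region)
    ReactoKinesixApp.CreateNew
        (kinesis, dynamoDB, cloudWatch, appName, streamName, workerId, factory)
```

Usage would look like let app = start RegionEndpoint.USEast1 "TestApp" "TestStream" "LocalHost", followed by app.Dispose() when you are done.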

You will notice that ReactoKinesixApp.CreateNew requires an instance of IRecordProcessorFactory rather than an instance of IRecordProcessor. The rationale for this decision is that it enables
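IRecordProcessorFactory.CreateNew receives a shard ID, so a factory can hand out a separate IRecordProcessor for each shard, each with its own private state. The following sketch assumes you want to use it that way. createProcessor and its report callback are hypothetical names. The count lives in a ref cell because F# closures cannot capture a let mutable local.

```fsharp
#r "ReactoKinesix.dll"

open ReactoKinesix
open ReactoKinesix.Model

// Builds a processor that owns its per-shard state. `report` is a
// hypothetical hook called with the shard ID and record count on Dispose.
let createProcessor (report : string -> int -> unit) (shardId : string) =
    let processed = ref 0   // private to this shard's processor, no locking needed
    { new IRecordProcessor with
        member this.Process(_, records) =
            processed.Value <- processed.Value + records.Length
            { Status = Success; Checkpoint = true }
        member this.OnMaxRetryExceeded(records, _) =
            printfn "Shard %s: giving up on %d records" shardId records.Length
        member this.Dispose() = report shardId processed.Value }

// Every call to CreateNew gets a fresh processor for the given shard ID.
let factory =
    { new IRecordProcessorFactory with
        member this.CreateNew shardId =
            createProcessor (fun id n -> printfn "Shard %s processed %d records" id n) shardId }
```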

Tracking the state of your client application

To enable us to track the state of your client application (e.g. which shards we are processing and how far into the stream of records we got, so that a checkpoint lets us easily return to where we stopped later), the client application uses an Amazon DynamoDB table, such as the one below, to store the necessary state information for each client application.

[Figure: Example state table]

Assigning Worker IDs

Each node (e.g. an EC2 instance running the client application) that processes records from a stream should be given a unique worker ID to identify itself. If you are running your client application on Amazon EC2, the instance ID is a natural choice for a meaningful worker ID.
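You can read the instance ID from the EC2 instance metadata service at 169.254.169.254. Below is a minimal sketch that uses the session-token (IMDSv2) flow with System.Net.Http and falls back to the machine name when the service cannot be reached. resolveWorkerId and tryGetInstanceId are hypothetical helpers, and the one-second timeout is an arbitrary choice.

```fsharp
// On .NET Framework F# Interactive you may also need: #r "System.Net.Http"
open System
open System.Net.Http

let metadataBase = "http://169.254.169.254/latest"

// Send a request and return the body, throwing on a non-success status code.
let readString (client : HttpClient) (request : HttpRequestMessage) =
    use response = client.SendAsync(request).Result
    response.EnsureSuccessStatusCode() |> ignore
    response.Content.ReadAsStringAsync().Result

// Try to read this instance's ID via IMDSv2: fetch a session token with PUT,
// then send it on the GET. Returns None off EC2 or if the service is unreachable.
let tryGetInstanceId () : string option =
    use client = new HttpClient(Timeout = TimeSpan.FromSeconds 1.0)
    try
        use tokenRequest = new HttpRequestMessage(HttpMethod.Put, metadataBase + "/api/token")
        tokenRequest.Headers.Add("X-aws-ec2-metadata-token-ttl-seconds", "60")
        let token = readString client tokenRequest
        use idRequest = new HttpRequestMessage(HttpMethod.Get, metadataBase + "/meta-data/instance-id")
        idRequest.Headers.Add("X-aws-ec2-metadata-token", token)
        Some (readString client idRequest)
    with _ -> None

// Prefer the EC2 instance ID; otherwise fall back to the machine name, which
// is only unique if every host running the application has a distinct name.
let resolveWorkerId () =
    tryGetInstanceId () |> Option.defaultValue Environment.MachineName
```

The result can be passed as the workerId argument to ReactoKinesixApp.CreateNew.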

F# Example

#r "AWSSDK.dll"
#r "ReactoKinesix.dll"

open Amazon
open ReactoKinesix
open ReactoKinesix.Model

// Replace with your own credentials (never commit real keys to source control)
let awsKey      = "<your-aws-access-key-id>"
let awsSecret   = "<your-aws-secret-access-key>"
let region      = RegionEndpoint.USEast1
let appName     = "TestApp"
let streamName  = "TestStream"
let workerId    = "LocalHost"

let act (record : Record) =
    let msg = System.Text.Encoding.UTF8.GetString(record.Data)
    printfn "You got message [%s] : %s" record.SequenceNumber msg  

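// Print every record in the batch, then report success and ask for a
// checkpoint to be placed after the last record
let processBatch (records : Record[]) = 
    records |> Array.iter act
    { Status = Success; Checkpoint = true }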

let maxRetryExceeded (records : Record[]) =
    printfn "Batch [count:%d] failed." records.Length
    
let gen () = 
    { new IRecordProcessor with 
        member this.Process(shardId, records) = processBatch records
        member this.OnMaxRetryExceeded(records, _) = maxRetryExceeded records
        member this.Dispose() = printfn "Processor disposed" }

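// CreateNew receives a shard ID and must return a processor for it;
// here every call simply returns a fresh processor from gen()
let factory = 
    { new IRecordProcessorFactory with
        member this.CreateNew _ = gen() }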

printfn "Starting client application..."
let app = 
    ReactoKinesixApp.CreateNew
        (awsKey, awsSecret, region, appName, streamName, workerId, factory)

app.OnInitialized.Add(fun _ -> printfn "Client application started")
app.OnBatchProcessed.Add(fun _ -> printfn "Another batch processed...")

printfn "Press any key to quit..."
System.Console.ReadKey() |> ignore

app.Dispose()