Before you start
Please familiarize yourself with how Amazon Kinesis works by reading its online documentation, in particular the Key Concepts and Limitations sections.
Getting Started
Download and install the library from NuGet.
This library takes care of most of the plumbing involved in building a client application that consumes and processes records pushed to an Amazon Kinesis stream.
To process incoming records, you need to provide an implementation of the IRecordProcessor interface, which has the following methods:
Process | Processes a batch of records received from the Kinesis stream. This method returns an instance of ProcessRecordsResult, where you can indicate whether the batch was processed successfully (Status) and whether to checkpoint (Checkpoint).
OnMaxRetryExceeded | Last chance to deal with a failing batch of records once the number of retry attempts specified in the configuration has been reached.
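As a rough illustration of how the body of Process might report failure as well as success, here is a minimal sketch. The Status, ProcessRecordsResult and Record types below are stand-ins declared locally so the snippet runs on its own; they mirror the shapes used in the F# example on this page, and in a real application you would open ReactoKinesix.Model instead. The handle function and its "empty payload" rule are hypothetical, and not checkpointing after a failure is an assumption about what you would want, not documented library behaviour.

```fsharp
open System
open System.Text

// Stand-in types so that this sketch runs on its own. In a real application,
// delete these and open ReactoKinesix.Model instead.
type Status = Success | Failure of exn
type ProcessRecordsResult = { Status : Status; Checkpoint : bool }
type Record = { SequenceNumber : string; Data : byte[]; PartitionKey : string }

// Hypothetical per-record handler: decodes the payload as UTF-8 and
// throws if the message is empty.
let handle (record : Record) =
    let msg = Encoding.UTF8.GetString record.Data
    if String.IsNullOrWhiteSpace msg then
        failwithf "Record %s has an empty payload" record.SequenceNumber
    printfn "[%s] %s" record.SequenceNumber msg

// Candidate body for IRecordProcessor.Process: report success and ask for a
// checkpoint, or hand the exception back without checkpointing (assumption).
let processBatch (records : Record[]) : ProcessRecordsResult =
    try
        records |> Array.iter handle
        { Status = Success; Checkpoint = true }
    with ex ->
        { Status = Failure ex; Checkpoint = false }
```

Inside an IRecordProcessor implementation, Process(shardId, records) would then simply return processBatch records.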
To start, create a client application by calling the static method ReactoKinesixApp.CreateNew, which returns a running instance of IReactoKinesixApp that starts processing records from the stream straight away.
You will notice that ReactoKinesixApp.CreateNew requires an instance of IRecordProcessorFactory rather than an instance of IRecordProcessor. The rationale for this decision is that it enables
Tracking the state of your client application
To track the state of your client application (e.g. which shards it is processing and how far into the stream of records it has got), the client application uses an Amazon DynamoDB table to store the necessary state information for each client application. The position in the stream is stored as a checkpoint, so that processing can easily resume later from where it stopped.
Assigning Worker IDs
Each node (e.g. an EC2 instance running the client application) that is processing records from a stream should be given a unique worker ID to identify itself. If you're running your client application within Amazon EC2, the instance ID is a natural choice for a meaningful worker ID.
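One possible way to obtain the instance ID at start-up is to ask the EC2 instance metadata service, as in the sketch below. This is not part of the library: tryGetInstanceId is a hypothetical helper, the 2-second timeout is arbitrary, and falling back to the machine name outside EC2 is an assumption about what you would want locally. The sketch uses the simple unauthenticated (IMDSv1) request; instances that enforce IMDSv2 require a session token first, in which case this request fails and the fallback is used. On older .NET Framework script hosts you may also need to add a reference to System.Net.Http.

```fsharp
open System
open System.Net.Http

// Hypothetical helper: returns Some instanceId when running on EC2 with the
// metadata service reachable, and None on any failure (timeout, not on EC2,
// IMDSv2 enforced, and so on).
let tryGetInstanceId (timeout : TimeSpan) : string option =
    try
        use client = new HttpClient(Timeout = timeout)
        let id =
            client.GetStringAsync("http://169.254.169.254/latest/meta-data/instance-id").Result
        if String.IsNullOrWhiteSpace id then None else Some (id.Trim())
    with _ -> None

// Use the instance ID when available, otherwise the machine name (assumption).
let workerId =
    match tryGetInstanceId (TimeSpan.FromSeconds 2.0) with
    | Some id -> id
    | None    -> Environment.MachineName
```

The resulting workerId can then be passed to ReactoKinesixApp.CreateNew in place of a hard-coded value.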
F# Example
    #r "AWSSDK.dll"
    #r "ReactoKinesix.dll"

    open Amazon
    open ReactoKinesix
    open ReactoKinesix.Model

    // Replace the placeholders with your own credentials; avoid committing real keys.
    let awsKey     = "YOUR_AWS_ACCESS_KEY"
    let awsSecret  = "YOUR_AWS_SECRET_KEY"
    let region     = RegionEndpoint.USEast1
    let appName    = "TestApp"
    let streamName = "TestStream"
    let workerId   = "LocalHost"

    // Decode a record's payload as UTF-8 and print it.
    let act (record : Record) =
        let msg = System.Text.Encoding.UTF8.GetString(record.Data)
        printfn "You got message [%s] : %s" record.SequenceNumber msg

    // Process every record in the batch, then report success and request a checkpoint.
    let processBatch (records : Record[]) =
        records |> Array.iter act
        { Status = Success; Checkpoint = true }

    let maxRetryExceeded (records : Record[]) =
        printfn "Batch [count:%d] failed." records.Length

    // Create a new record processor.
    let gen () =
        { new IRecordProcessor with
            member this.Process(shardId, records) = processBatch records
            member this.OnMaxRetryExceeded(records, _) = maxRetryExceeded records
            member this.Dispose() = printfn "Processor disposed" }

    let factory = { new IRecordProcessorFactory with member this.CreateNew _ = gen() }

    printfn "Starting client application..."
    let app = ReactoKinesixApp.CreateNew (awsKey, awsSecret, region, appName, streamName, workerId, factory)

    app.OnInitialized.Add(fun _ -> printfn "Client application started")
    app.OnBatchProcessed.Add(fun _ -> printfn "Another batch processed...")

    printfn "Press any key to quit..."
    System.Console.ReadKey() |> ignore

    app.Dispose()