ReactoKinesix


Start by downloading the NuGet package.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
#r "AWSSDK.dll"
#r "ReactoKinesix.dll"

open Amazon
open ReactoKinesix
open ReactoKinesix.Model

/// Reads a required setting from the process environment.
/// Fails immediately with a descriptive message when the variable is missing
/// or empty, so the script never starts with blank credentials.
let readRequiredEnv (name : string) =
    match System.Environment.GetEnvironmentVariable name with
    | null | "" -> failwithf "Environment variable %s is not set" name
    | value -> value

// Credentials must never be committed to source control: they are taken from
// the conventional AWS environment variables instead of string literals.
let awsKey      = readRequiredEnv "AWS_ACCESS_KEY_ID"
let awsSecret   = readRequiredEnv "AWS_SECRET_ACCESS_KEY"
let region      = RegionEndpoint.USEast1
let appName     = "TestApp"
let streamName  = "TestStream"
let workerId    = "LocalHost"

/// Handles a single record: decodes its `Data` bytes as UTF-8 text and prints
/// the text together with the record's sequence number.
let act (record : Record) =
    let payload = record.Data |> System.Text.Encoding.UTF8.GetString
    printfn "You got message [%s] : %s" record.SequenceNumber payload

/// Runs `act` over every record of the batch, in array order, and then
/// reports the batch as successful with `Checkpoint` set to true.
let processBatch (records : Record[]) =
    Array.iter act records
    let outcome = { Status = Success; Checkpoint = true }
    outcome

/// Prints a line stating how many records were in the batch that failed.
let maxRetryExceeded (records : Record[]) =
    let failedCount = records.Length
    printfn "Batch [count:%d] failed." failedCount
    
/// Creates a new `IRecordProcessor` via an object expression:
///  - `Process` ignores the shard id and forwards the records to `processBatch`;
///  - `OnMaxRetryExceeded` ignores its second argument and forwards the
///    records to `maxRetryExceeded`;
///  - `Dispose` only prints a message.
let gen () =
    { new IRecordProcessor with
        member __.Process(_, records) =
            processBatch records

        member __.OnMaxRetryExceeded(records, _) =
            maxRetryExceeded records

        member __.Dispose() =
            printfn "Processor disposed" }

/// Processor factory handed to the application: every `CreateNew` call
/// ignores its argument and returns a new processor built by `gen`.
let factory =
    { new IRecordProcessorFactory with
        member __.CreateNew(_) = gen () }

// Entry point of the script: create the application, subscribe to its events,
// wait for a key press, then dispose the application.
printfn "Starting client application..."
let app = 
    ReactoKinesixApp.CreateNew
        (awsKey, awsSecret, region, appName, streamName, workerId, factory)

// Everything after creation runs inside try/finally so the application is
// disposed even when a later step throws (for instance Console.ReadKey raises
// InvalidOperationException when standard input is redirected).
try
    app.OnInitialized.Add(fun _ -> printfn "Client application started")
    app.OnBatchProcessed.Add(fun _ -> printfn "Another batch processed...")

    printfn "Press any key to quit..."
    System.Console.ReadKey() |> ignore
finally
    app.Dispose()
namespace Amazon
namespace ReactoKinesix
namespace ReactoKinesix.Model
val awsKey : string

Full name: Example-fsharp.awsKey
val awsSecret : string

Full name: Example-fsharp.awsSecret
val region : RegionEndpoint

Full name: Example-fsharp.region
type RegionEndpoint =
  member DisplayName : string with get, set
  member GetEndpointForService : serviceName:string -> Endpoint
  member GuessEndpointForService : serviceName:string -> Endpoint
  member SystemName : string with get, set
  member ToString : unit -> string
  static val USEast1 : RegionEndpoint
  static val USWest1 : RegionEndpoint
  static val USWest2 : RegionEndpoint
  static val EUWest1 : RegionEndpoint
  static val EUCentral1 : RegionEndpoint
  ...
  nested type Endpoint

Full name: Amazon.RegionEndpoint
field RegionEndpoint.USEast1
val appName : string

Full name: Example-fsharp.appName
val streamName : string

Full name: Example-fsharp.streamName
val workerId : string

Full name: Example-fsharp.workerId
val act : record:Record -> unit

Full name: Example-fsharp.act
val record : Record
type Record =
  {SequenceNumber: string;
   Data: byte [];
   PartitionKey: string;}
  static member op_Explicit : record:Record -> Record

Full name: ReactoKinesix.Model.Record
val msg : string
namespace System
namespace System.Text
type Encoding =
  member BodyName : string
  member Clone : unit -> obj
  member CodePage : int
  member DecoderFallback : DecoderFallback with get, set
  member EncoderFallback : EncoderFallback with get, set
  member EncodingName : string
  member Equals : value:obj -> bool
  member GetByteCount : chars:char[] -> int + 3 overloads
  member GetBytes : chars:char[] -> byte[] + 5 overloads
  member GetCharCount : bytes:byte[] -> int + 2 overloads
  ...

Full name: System.Text.Encoding
property System.Text.Encoding.UTF8: System.Text.Encoding
System.Text.Encoding.GetString(bytes: byte []) : string
System.Text.Encoding.GetString(bytes: byte [], index: int, count: int) : string
Record.Data: byte []
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Record.SequenceNumber: string
val processBatch : records:Record [] -> ProcessRecordsResult

Full name: Example-fsharp.processBatch
val records : Record []
module Array

from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> array:'T [] -> unit

Full name: Microsoft.FSharp.Collections.Array.iter
type Status =
  | Success
  | Failure of Exception

Full name: ReactoKinesix.Model.Status
union case Status.Success: Status
val maxRetryExceeded : records:Record [] -> unit

Full name: Example-fsharp.maxRetryExceeded
property System.Array.Length: int
val gen : unit -> IRecordProcessor

Full name: Example-fsharp.gen
type IRecordProcessor =
  interface
    inherit IDisposable
    abstract member OnMaxRetryExceeded : records:Record [] * errorHandlingMode:ErrorHandlingMode -> unit
    abstract member Process : shardId:string * records:Record [] -> ProcessRecordsResult
  end

Full name: ReactoKinesix.IRecordProcessor
val this : IRecordProcessor
abstract member IRecordProcessor.Process : shardId:string * records:Record [] -> ProcessRecordsResult
val shardId : string
abstract member IRecordProcessor.OnMaxRetryExceeded : records:Record [] * errorHandlingMode:ErrorHandlingMode -> unit
System.IDisposable.Dispose() : unit
val factory : IRecordProcessorFactory

Full name: Example-fsharp.factory
type IRecordProcessorFactory =
  interface
    abstract member CreateNew : shardId:string -> IRecordProcessor
  end

Full name: ReactoKinesix.IRecordProcessorFactory
val this : IRecordProcessorFactory
abstract member IRecordProcessorFactory.CreateNew : shardId:string -> IRecordProcessor
val app : IReactoKinesixApp

Full name: Example-fsharp.app
type ReactoKinesixApp =
  interface IDisposable
  interface IReactoKinesixApp
  private new : kinesis:IAmazonKinesis * dynamoDB:IAmazonDynamoDB * cloudWatch:IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> ReactoKinesixApp
  override Finalize : unit -> unit
  member private MarkAsClosed : shardId:ShardId -> unit
  member private StopProcessing : shardId:ShardId -> unit
  member private Config : ReactoKinesixConfig
  member private DynamoDB : IAmazonDynamoDB
  member private Kinesis : IAmazonKinesis
  member private MetricsAgent : MetricsAgent
  ...

Full name: ReactoKinesix.ReactoKinesixApp
static member ReactoKinesixApp.CreateNew : kinesis:Kinesis.IAmazonKinesis * dynamoDB:DynamoDBv2.IAmazonDynamoDB * cloudWatch:CloudWatch.IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : awsKey:string * awsSecret:string * region:RegionEndpoint * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : kinesis:Kinesis.IAmazonKinesis * dynamoDB:DynamoDBv2.IAmazonDynamoDB * cloudWatch:CloudWatch.IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : awsKey:string * awsSecret:string * region:RegionEndpoint * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> IReactoKinesixApp
event IReactoKinesixApp.OnInitialized: IEvent<OnInitializedDelegate,System.EventArgs>
member System.IObservable.Add : callback:('T -> unit) -> unit
event IReactoKinesixApp.OnBatchProcessed: IEvent<OnBatchProcessedDelegate,System.EventArgs>
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
System.Console.ReadKey() : System.ConsoleKeyInfo
System.Console.ReadKey(intercept: bool) : System.ConsoleKeyInfo
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
Fork me on GitHub