Start by downloading the NuGet package.
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: 40: 41: 42: 43: 44: 45: 46: 47: |
#r "AWSSDK.dll"
#r "ReactoKinesix.dll"

open Amazon
open ReactoKinesix
open ReactoKinesix.Model

/// Reads a required environment variable, failing fast with a clear message
/// when it is missing or empty.
let requireEnv (name : string) =
    match System.Environment.GetEnvironmentVariable name with
    | null | "" -> failwithf "Environment variable %s is not set" name
    | value     -> value

// SECURITY: never commit AWS credentials to source. They are read from the
// environment instead of being embedded as string literals in the script.
let awsKey     = requireEnv "AWS_ACCESS_KEY_ID"
let awsSecret  = requireEnv "AWS_SECRET_ACCESS_KEY"
let region     = RegionEndpoint.USEast1
let appName    = "TestApp"
let streamName = "TestStream"
let workerId   = "LocalHost"

/// Decodes the record's payload as UTF-8 text and prints it together with
/// the record's sequence number.
let act (record : Record) =
    let msg = System.Text.Encoding.UTF8.GetString(record.Data)
    printfn "You got message [%s] : %s" record.SequenceNumber msg

/// Applies `act` to every record in the batch, then reports success.
// NOTE(review): `Checkpoint = true` presumably asks the library to checkpoint
// after this batch - confirm against the ReactoKinesix documentation.
let processBatch (records : Record[]) =
    records |> Array.iter act
    { Status = Success; Checkpoint = true }

/// Logs how many records were in a batch handed to the max-retry callback.
let maxRetryExceeded (records : Record[]) =
    printfn "Batch [count:%d] failed." records.Length

/// Builds a record processor that delegates to the functions above.
let gen () =
    { new IRecordProcessor with
        member this.Process(shardId, records) = processBatch records
        member this.OnMaxRetryExceeded(records, _) = maxRetryExceeded records
        member this.Dispose() = printfn "Processor disposed" }

/// Factory handed to the app; returns a fresh processor on every call and
/// ignores the shard id argument.
let factory =
    { new IRecordProcessorFactory with
        member this.CreateNew _ = gen() }

printfn "Starting client application..."
let app =
    ReactoKinesixApp.CreateNew
        (awsKey, awsSecret, region, appName, streamName, workerId, factory)

// try/finally guarantees the app is disposed even if registering a handler
// or reading the key press throws.
try
    app.OnInitialized.Add(fun _ -> printfn "Client application started")
    app.OnBatchProcessed.Add(fun _ -> printfn "Another batch processed...")

    printfn "Press any key to quit..."
    System.Console.ReadKey() |> ignore
finally
    app.Dispose()
namespace Amazon
namespace ReactoKinesix
namespace ReactoKinesix.Model
val awsKey : string
Full name: Example-fsharp.awsKey
Full name: Example-fsharp.awsKey
val awsSecret : string
Full name: Example-fsharp.awsSecret
Full name: Example-fsharp.awsSecret
val region : RegionEndpoint
Full name: Example-fsharp.region
Full name: Example-fsharp.region
type RegionEndpoint =
member DisplayName : string with get, set
member GetEndpointForService : serviceName:string -> Endpoint
member GuessEndpointForService : serviceName:string -> Endpoint
member SystemName : string with get, set
member ToString : unit -> string
static val USEast1 : RegionEndpoint
static val USWest1 : RegionEndpoint
static val USWest2 : RegionEndpoint
static val EUWest1 : RegionEndpoint
static val EUCentral1 : RegionEndpoint
...
nested type Endpoint
Full name: Amazon.RegionEndpoint
member DisplayName : string with get, set
member GetEndpointForService : serviceName:string -> Endpoint
member GuessEndpointForService : serviceName:string -> Endpoint
member SystemName : string with get, set
member ToString : unit -> string
static val USEast1 : RegionEndpoint
static val USWest1 : RegionEndpoint
static val USWest2 : RegionEndpoint
static val EUWest1 : RegionEndpoint
static val EUCentral1 : RegionEndpoint
...
nested type Endpoint
Full name: Amazon.RegionEndpoint
field RegionEndpoint.USEast1
val appName : string
Full name: Example-fsharp.appName
Full name: Example-fsharp.appName
val streamName : string
Full name: Example-fsharp.streamName
Full name: Example-fsharp.streamName
val workerId : string
Full name: Example-fsharp.workerId
Full name: Example-fsharp.workerId
val act : record:Record -> unit
Full name: Example-fsharp.act
Full name: Example-fsharp.act
val record : Record
type Record =
{SequenceNumber: string;
Data: byte [];
PartitionKey: string;}
static member op_Explicit : record:Record -> Record
Full name: ReactoKinesix.Model.Record
{SequenceNumber: string;
Data: byte [];
PartitionKey: string;}
static member op_Explicit : record:Record -> Record
Full name: ReactoKinesix.Model.Record
val msg : string
namespace System
namespace System.Text
type Encoding =
member BodyName : string
member Clone : unit -> obj
member CodePage : int
member DecoderFallback : DecoderFallback with get, set
member EncoderFallback : EncoderFallback with get, set
member EncodingName : string
member Equals : value:obj -> bool
member GetByteCount : chars:char[] -> int + 3 overloads
member GetBytes : chars:char[] -> byte[] + 5 overloads
member GetCharCount : bytes:byte[] -> int + 2 overloads
...
Full name: System.Text.Encoding
member BodyName : string
member Clone : unit -> obj
member CodePage : int
member DecoderFallback : DecoderFallback with get, set
member EncoderFallback : EncoderFallback with get, set
member EncodingName : string
member Equals : value:obj -> bool
member GetByteCount : chars:char[] -> int + 3 overloads
member GetBytes : chars:char[] -> byte[] + 5 overloads
member GetCharCount : bytes:byte[] -> int + 2 overloads
...
Full name: System.Text.Encoding
property System.Text.Encoding.UTF8: System.Text.Encoding
System.Text.Encoding.GetString(bytes: byte []) : string
System.Text.Encoding.GetString(bytes: byte [], index: int, count: int) : string
System.Text.Encoding.GetString(bytes: byte [], index: int, count: int) : string
Record.Data: byte []
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Record.SequenceNumber: string
val processBatch : records:Record [] -> ProcessRecordsResult
Full name: Example-fsharp.processBatch
Full name: Example-fsharp.processBatch
val records : Record []
module Array
from Microsoft.FSharp.Collections
from Microsoft.FSharp.Collections
val iter : action:('T -> unit) -> array:'T [] -> unit
Full name: Microsoft.FSharp.Collections.Array.iter
Full name: Microsoft.FSharp.Collections.Array.iter
type Status =
| Success
| Failure of Exception
Full name: ReactoKinesix.Model.Status
| Success
| Failure of Exception
Full name: ReactoKinesix.Model.Status
union case Status.Success: Status
val maxRetryExceeded : records:Record [] -> unit
Full name: Example-fsharp.maxRetryExceeded
Full name: Example-fsharp.maxRetryExceeded
property System.Array.Length: int
val gen : unit -> IRecordProcessor
Full name: Example-fsharp.gen
Full name: Example-fsharp.gen
type IRecordProcessor =
interface
inherit IDisposable
abstract member OnMaxRetryExceeded : records:Record [] * errorHandlingMode:ErrorHandlingMode -> unit
abstract member Process : shardId:string * records:Record [] -> ProcessRecordsResult
end
Full name: ReactoKinesix.IRecordProcessor
interface
inherit IDisposable
abstract member OnMaxRetryExceeded : records:Record [] * errorHandlingMode:ErrorHandlingMode -> unit
abstract member Process : shardId:string * records:Record [] -> ProcessRecordsResult
end
Full name: ReactoKinesix.IRecordProcessor
val this : IRecordProcessor
abstract member IRecordProcessor.Process : shardId:string * records:Record [] -> ProcessRecordsResult
val shardId : string
abstract member IRecordProcessor.OnMaxRetryExceeded : records:Record [] * errorHandlingMode:ErrorHandlingMode -> unit
System.IDisposable.Dispose() : unit
val factory : IRecordProcessorFactory
Full name: Example-fsharp.factory
Full name: Example-fsharp.factory
type IRecordProcessorFactory =
interface
abstract member CreateNew : shardId:string -> IRecordProcessor
end
Full name: ReactoKinesix.IRecordProcessorFactory
interface
abstract member CreateNew : shardId:string -> IRecordProcessor
end
Full name: ReactoKinesix.IRecordProcessorFactory
val this : IRecordProcessorFactory
abstract member IRecordProcessorFactory.CreateNew : shardId:string -> IRecordProcessor
val app : IReactoKinesixApp
Full name: Example-fsharp.app
Full name: Example-fsharp.app
type ReactoKinesixApp =
interface IDisposable
interface IReactoKinesixApp
private new : kinesis:IAmazonKinesis * dynamoDB:IAmazonDynamoDB * cloudWatch:IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> ReactoKinesixApp
override Finalize : unit -> unit
member private MarkAsClosed : shardId:ShardId -> unit
member private StopProcessing : shardId:ShardId -> unit
member private Config : ReactoKinesixConfig
member private DynamoDB : IAmazonDynamoDB
member private Kinesis : IAmazonKinesis
member private MetricsAgent : MetricsAgent
...
Full name: ReactoKinesix.ReactoKinesixApp
interface IDisposable
interface IReactoKinesixApp
private new : kinesis:IAmazonKinesis * dynamoDB:IAmazonDynamoDB * cloudWatch:IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> ReactoKinesixApp
override Finalize : unit -> unit
member private MarkAsClosed : shardId:ShardId -> unit
member private StopProcessing : shardId:ShardId -> unit
member private Config : ReactoKinesixConfig
member private DynamoDB : IAmazonDynamoDB
member private Kinesis : IAmazonKinesis
member private MetricsAgent : MetricsAgent
...
Full name: ReactoKinesix.ReactoKinesixApp
static member ReactoKinesixApp.CreateNew : kinesis:Kinesis.IAmazonKinesis * dynamoDB:DynamoDBv2.IAmazonDynamoDB * cloudWatch:CloudWatch.IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : awsKey:string * awsSecret:string * region:RegionEndpoint * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : kinesis:Kinesis.IAmazonKinesis * dynamoDB:DynamoDBv2.IAmazonDynamoDB * cloudWatch:CloudWatch.IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : awsKey:string * awsSecret:string * region:RegionEndpoint * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : awsKey:string * awsSecret:string * region:RegionEndpoint * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : kinesis:Kinesis.IAmazonKinesis * dynamoDB:DynamoDBv2.IAmazonDynamoDB * cloudWatch:CloudWatch.IAmazonCloudWatch * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> IReactoKinesixApp
static member ReactoKinesixApp.CreateNew : awsKey:string * awsSecret:string * region:RegionEndpoint * appName:string * streamName:string * workerId:string * processorFactory:IRecordProcessorFactory * config:ReactoKinesixConfig -> IReactoKinesixApp
event IReactoKinesixApp.OnInitialized: IEvent<OnInitializedDelegate,System.EventArgs>
member System.IObservable.Add : callback:('T -> unit) -> unit
event IReactoKinesixApp.OnBatchProcessed: IEvent<OnBatchProcessedDelegate,System.EventArgs>
type Console =
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
static member BackgroundColor : ConsoleColor with get, set
static member Beep : unit -> unit + 1 overload
static member BufferHeight : int with get, set
static member BufferWidth : int with get, set
static member CapsLock : bool
static member Clear : unit -> unit
static member CursorLeft : int with get, set
static member CursorSize : int with get, set
static member CursorTop : int with get, set
static member CursorVisible : bool with get, set
...
Full name: System.Console
System.Console.ReadKey() : System.ConsoleKeyInfo
System.Console.ReadKey(intercept: bool) : System.ConsoleKeyInfo
System.Console.ReadKey(intercept: bool) : System.ConsoleKeyInfo
val ignore : value:'T -> unit
Full name: Microsoft.FSharp.Core.Operators.ignore
Full name: Microsoft.FSharp.Core.Operators.ignore