ReactoKinesix


Start by downloading the Nuget package.

1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29:
class Program
{
    static void Main()
    {
        var awsKey = "AKIAJTGWYUCX6G7P2FOQ";
        var awsSecret = "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";
        var region = RegionEndpoint.USEast1;
        var appName = "YC-test";
        var streamName = "YC-test";
        var workerId = "PHANTOM-cs";

        Console.WriteLine("Starting client application...");

        var factory = new MyProcessorFactory();
        var app = 
            ReactoKinesixApp.CreateNew(
                awsKey, awsSecret, region, appName, streamName, workerId, factory);

        app.OnInitialized += (_, evtArgs) => 
            Console.WriteLine("Client application started");
        app.OnBatchProcessed += (_, evtArgs) => 
            Console.WriteLine("Another batch processed...");

        Console.WriteLine("Press any key to quit...");
        Console.ReadKey();

        app.Dispose();
    }
}

The above example assumes existence of MyProcessorFactory.

1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: 40: 41: 42: 43:
open ReactoKinesix
open ReactoKinesix.Model

public class MyProcessorFactory : IRecordProcessorFactory
{
    public IRecordProcessor CreateNew(string shardId)
    {
        return new MyProcessor();
    }
}

public class MyProcessor : IRecordProcessor
{
    public ProcessRecordsResult Process(
        string shardId, 
        Record[] records)
    {
        foreach (var record in records)
        {
            var msg = System.Text.Encoding.UTF8.GetString(record.Data);
            Console.WriteLine(
                "You got message [{0}] : {1}", 
                record.SequenceNumber, 
                msg);
        }

        return new ProcessRecordsResult(Status.Success, true);
    }

    public void OnMaxRetryExceeded(
        Record[] records, 
        ErrorHandlingMode errorHandlingMode)
    {
        Console.WriteLine(
            "Batch [count:{0}] failed.", 
            records.Length);
    }

    public void Dispose()
    {
        Console.WriteLine("Processor disposed");
    }
}
Fork me on GitHub