ReactoKinesix


Stopping and starting processing of a shard

If you need to stop processing a shard and restart it later, call the IReactoKinesixApp.StopProcessing and IReactoKinesixApp.StartProcessing methods with the shard's ID.

Note: stopping a shard is not immediate. To avoid losing progress, and to avoid processing the same records more than once when processing resumes, the shard stops only after the current batch of received records has been fully processed and its checkpoint has been successfully updated in Amazon DynamoDB.
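The stop-at-a-batch-boundary behaviour described in the note can be modelled in a few lines. This is a minimal Python sketch under assumed names: ShardWorker, stop_processing and the in-memory checkpoint field are hypothetical stand-ins, not the ReactoKinesix API. A stop request only raises a flag, and the flag is honoured after the in-flight batch has been handled and checkpointed.

```python
import threading
from typing import Callable, Iterable, List, Optional, Tuple

# (sequence number, record payload) pairs received in one pull from a shard
Batch = List[Tuple[int, str]]


class ShardWorker:
    """Hypothetical model of graceful shard stopping; not the ReactoKinesix API."""

    def __init__(self, shard_id: str, handle: Callable[[str], None]) -> None:
        self.shard_id = shard_id
        self.handle = handle
        self.checkpoint: Optional[int] = None  # stands in for the DynamoDB checkpoint
        self._stop = threading.Event()

    def stop_processing(self) -> None:
        # Only raises a flag; the batch in flight is never abandoned halfway.
        self._stop.set()

    def run(self, batches: Iterable[Batch]) -> None:
        for batch in batches:
            if self._stop.is_set():
                break  # stop at a batch boundary, right after the last checkpoint
            for _seq, record in batch:
                self.handle(record)
            if batch:
                # Persist progress for the whole batch before a stop can take effect.
                self.checkpoint = batch[-1][0]


seen: List[str] = []


def handle(record: str) -> None:
    seen.append(record)
    if record == "b":
        worker.stop_processing()  # stop requested in the middle of a batch


worker = ShardWorker("shardId-000000000000", handle)
worker.run([[(1, "a"), (2, "b"), (3, "c")], [(4, "d")]])
print(seen, worker.checkpoint)  # ['a', 'b', 'c'] 3
```

Although the stop was requested while record "b" was being handled, record "c" is still processed and checkpointed. The second batch is never started.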
Note: if you are running the client application on multiple nodes, you need to call the stop/start processing methods on every node. Otherwise another node will simply take over processing of the shard once the heartbeat timeout has elapsed.
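The takeover rule behind this note can be sketched as a simple timeout check. The following Python is a hypothetical model, not the library's actual state-table logic. It assumes that each shard's owner records a last-heartbeat time, and that any shard whose last heartbeat is older than HeartbeatTimeout (3 minutes by default, per the configuration table) is up for grabs. A shard stopped on only one node stops heartbeating there, so it looks exactly like a shard whose worker has died.

```python
from datetime import datetime, timedelta
from typing import Dict, List

# Default HeartbeatTimeout from the configuration table.
HEARTBEAT_TIMEOUT = timedelta(minutes=3)


def shards_up_for_takeover(
    last_heartbeats: Dict[str, datetime],
    now: datetime,
    timeout: timedelta = HEARTBEAT_TIMEOUT,
) -> List[str]:
    """Return shard IDs whose owner has not sent a heartbeat within `timeout`.

    Hypothetical model: the real DynamoDB state table layout is not described here.
    """
    return sorted(
        shard_id
        for shard_id, last_seen in last_heartbeats.items()
        if now - last_seen > timeout
    )


now = datetime(2014, 1, 1, 12, 0, 0)
last_heartbeats = {
    "shardId-000000000000": now - timedelta(seconds=20),  # owner alive (heartbeat every 30 s)
    "shardId-000000000001": now - timedelta(minutes=4),   # stopped on its node, so no heartbeats
}
takeover = shards_up_for_takeover(last_heartbeats, now)
print(takeover)  # ['shardId-000000000001']
```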

Changing the processor on the fly

You can also change the IRecordProcessorFactory implementation used by the client application at runtime by calling the IReactoKinesixApp.ChangeProcessorFactory method.

Note: if you are running the client application on multiple nodes, you need to call the IReactoKinesixApp.ChangeProcessorFactory method on every node.
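Swapping a processor factory at runtime boils down to replacing a shared reference safely. The Python below is a hypothetical sketch and does not reproduce the real IReactoKinesixApp: App, change_processor_factory and Processor are illustrative names. It also assumes, without confirmation from this page, that a new factory takes effect from the next batch onward. The lock makes the swap atomic with respect to readers.

```python
import threading
from typing import Callable, List


class Processor:
    """Toy record processor that tags each record with its version name."""

    def __init__(self, name: str) -> None:
        self.name = name

    def process(self, records: List[str]) -> List[str]:
        return [f"{self.name}:{r}" for r in records]


ProcessorFactory = Callable[[], Processor]


class App:
    """Hypothetical stand-in for a client application; not IReactoKinesixApp."""

    def __init__(self, factory: ProcessorFactory) -> None:
        self._lock = threading.Lock()
        self._factory = factory

    def change_processor_factory(self, factory: ProcessorFactory) -> None:
        # Swap the factory atomically; a batch that already fetched the old one is unaffected.
        with self._lock:
            self._factory = factory

    def process_batch(self, records: List[str]) -> List[str]:
        # Read the current factory once per batch, so a swap applies from the next batch on.
        with self._lock:
            factory = self._factory
        return factory().process(records)


app = App(lambda: Processor("v1"))
first = app.process_batch(["a"])
app.change_processor_factory(lambda: Processor("v2"))
second = app.process_batch(["b"])
print(first, second)  # ['v1:a'] ['v2:b']
```

Because each node holds its own reference, the swap on one node has no effect on the others, which is why the call has to be repeated on every node.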

Stopping the client application

To stop the client application completely and release all the resources it uses, dispose of the running IReactoKinesixApp instance. This stops processing of all shards while keeping the application in a consistent state, so that it can later resume from where it left off without the risk of processing the same records again.
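In .NET, disposing means calling Dispose (or leaving an F# `use` / C# `using` scope). The closest Python analogue is leaving a `with` block. This hypothetical sketch (ClientApp, receive and the in-memory checkpoints are illustrative, not the library's API) shows the consistency guarantee: on exit, every shard's in-flight batch is finished and checkpointed before the application reports itself stopped.

```python
from typing import Dict, List


class ClientApp:
    """Hypothetical model of disposing a client application; not the ReactoKinesix API."""

    def __init__(self, shard_ids: List[str]) -> None:
        # shard ID -> last checkpointed sequence number (stand-in for the DynamoDB state table)
        self.checkpoints: Dict[str, int] = {s: 0 for s in shard_ids}
        # shard ID -> sequence numbers of the batch currently being processed
        self.in_flight: Dict[str, List[int]] = {s: [] for s in shard_ids}
        self.running = True

    def receive(self, shard_id: str, seqs: List[int]) -> None:
        self.in_flight[shard_id].extend(seqs)

    def close(self) -> None:
        # Finish and checkpoint every shard's in-flight batch, then release resources.
        for shard_id, seqs in self.in_flight.items():
            if seqs:
                self.checkpoints[shard_id] = max(seqs)
                seqs.clear()
        self.running = False

    def __enter__(self) -> "ClientApp":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()
        return False  # do not swallow exceptions


with ClientApp(["shardId-000000000000", "shardId-000000000001"]) as app:
    app.receive("shardId-000000000000", [1, 2, 3])
    app.receive("shardId-000000000001", [7])

print(app.running, app.checkpoints)
```

A later run would start each shard from the checkpoint saved at exit, so no record is processed twice.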

Handling shard merge/split

When you merge or split shards in Amazon Kinesis, Kinesis creates one or more new shards and closes the old ones. Records can still be pulled from a closed shard for 1 day. The client application detects the new shards and starts processing them as soon as they become available, while the old shards continue to be processed until all of their records have been consumed. This library handles all of this automatically.
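The rule for which shards need a worker after a merge or split can be stated as a small filter. This Python sketch is a hypothetical model: the Shard record and its fully_processed flag are assumptions standing in for whatever state the library actually keeps. Open shards, including newly created children, always need a worker. A closed parent needs one only until its remaining records have been drained.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass(frozen=True)
class Shard:
    shard_id: str
    closed: bool = False           # True once a merge/split has closed this shard
    fully_processed: bool = False  # True once every record in a closed shard is processed


def shards_needing_workers(shards: List[Shard], running: Set[str]) -> List[str]:
    """Hypothetical: shards that should get a worker after a stream-change check."""
    return [
        s.shard_id
        for s in shards
        if s.shard_id not in running and not (s.closed and s.fully_processed)
    ]


# shardId-000000000000 was split into ...001 and ...002; ...003 is an old, drained shard.
shards = [
    Shard("shardId-000000000000", closed=True),
    Shard("shardId-000000000001"),
    Shard("shardId-000000000002"),
    Shard("shardId-000000000003", closed=True, fully_processed=True),
]
to_start = shards_needing_workers(shards, running={"shardId-000000000000"})
print(to_start)  # ['shardId-000000000001', 'shardId-000000000002']
```

The closed parent keeps its existing worker while the two children start in parallel, matching the behaviour described above.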

Configuring the client application

You don't need to specify a configuration when creating a new client application with the ReactoKinesixApp.CreateNew static method. If you don't, a default configuration with the following settings is used:

| Configuration | Default value | Description |
| --- | --- | --- |
| DynamoDBReadThroughput | 10 | Provisioned read throughput for the DynamoDB state table. |
| DynamoDBWriteThroughput | 10 | Provisioned write throughput for the DynamoDB state table. |
| DynamoDBTableSuffix | KinesisState | Suffix used to name your application's state table in DynamoDB. |
| Heartbeat | 30 seconds | Heartbeat frequency. |
| HeartbeatTimeout | 3 minutes | Time without a heartbeat after which a shard's worker is considered dead and another node may take over the shard. |
| EmptyReceiveDelay | 3 seconds | Delay before pulling from the stream again when the last pull returned no records. |
| MaxDynamoDBRetries | 3 | Maximum number of retries for DynamoDB operations. |
| MaxKinesisRetries | 3 | Maximum number of retries for Kinesis operations. |
| CheckStreamChangesFrequency | 1 minute | How often to check the stream for shard merges/splits. |
| CheckUnprocessedShardsFrequency | 1 minute | How often to check for shards whose worker has died. |
| LoadBalanceFrequency | 3 minutes | How often to try to balance the load across workers. |
| HandoverRequestExpiry | 10 minutes | How long a handover request is allowed to take to complete. |
| CheckPendingHandoverRequestFrequency | 1 minute | How often to check for pending handover requests for a shard. |
| ErrorHandlingMode | Retry twice, then skip | How to handle errors. |
| MaxBatchSize | 10,000 | Maximum number of records per batch. |
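The default ErrorHandlingMode, "retry twice, then skip", can be read as one initial attempt plus two retries, after which the work is abandoned and processing moves on. That reading is an assumption, and so is applying the policy per batch (it could equally apply per record). The Python below is a hypothetical sketch of the policy, not the library's implementation.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def process_with_retry_then_skip(
    batch: List[T], handle: Callable[[List[T]], None], retries: int = 2
) -> bool:
    """Try `handle` once, retry up to `retries` times, then skip the batch.

    Returns True if the batch was processed and False if it was skipped.
    Hypothetical model of "retry twice, then skip"; not the ReactoKinesix implementation.
    """
    for _ in range(1 + retries):
        try:
            handle(batch)
            return True
        except Exception:
            continue  # swallow the error and try again
    return False  # every attempt failed: skip this batch


attempts: List[int] = []


def flaky(batch: List[int]) -> None:
    attempts.append(len(batch))
    if len(attempts) < 3:
        raise RuntimeError("transient failure")


def broken(batch: List[int]) -> None:
    raise RuntimeError("permanent failure")


ok = process_with_retry_then_skip([1, 2], flaky)       # succeeds on the third attempt
skipped = process_with_retry_then_skip([1, 2], broken)  # fails three times, then skips
print(ok, len(attempts), skipped)  # True 3 False
```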

To use a different configuration, create an instance of ReactoKinesixConfig with the settings you want and pass it to ReactoKinesixApp.CreateNew when creating your client application.
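The "defaults plus selective overrides" pattern can be illustrated with an immutable record. This is a hypothetical Python mirror of the defaults in the table above, not ReactoKinesixConfig itself: the field names are translated to snake_case and ErrorHandlingMode is omitted because its type isn't described here. dataclasses.replace creates a modified copy and leaves the defaults untouched.

```python
from dataclasses import dataclass, replace
from datetime import timedelta


@dataclass(frozen=True)
class Config:
    """Hypothetical Python mirror of the documented defaults; not ReactoKinesixConfig."""

    dynamodb_read_throughput: int = 10
    dynamodb_write_throughput: int = 10
    dynamodb_table_suffix: str = "KinesisState"
    heartbeat: timedelta = timedelta(seconds=30)
    heartbeat_timeout: timedelta = timedelta(minutes=3)
    empty_receive_delay: timedelta = timedelta(seconds=3)
    max_dynamodb_retries: int = 3
    max_kinesis_retries: int = 3
    check_stream_changes_frequency: timedelta = timedelta(minutes=1)
    check_unprocessed_shards_frequency: timedelta = timedelta(minutes=1)
    load_balance_frequency: timedelta = timedelta(minutes=3)
    handover_request_expiry: timedelta = timedelta(minutes=10)
    check_pending_handover_request_frequency: timedelta = timedelta(minutes=1)
    max_batch_size: int = 10_000


default = Config()
# Override only what differs from the defaults, e.g. more DynamoDB throughput.
custom = replace(default, dynamodb_read_throughput=50, dynamodb_write_throughput=50)
print(custom.dynamodb_read_throughput, custom.heartbeat_timeout)
```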

Important: if you expect your application to use a large number of shards and worker nodes, increase the read and write throughput for the DynamoDB table. Otherwise, database operations are likely to be throttled regularly, which delays the processing of your records.
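A back-of-the-envelope estimate helps decide how far to raise the write throughput. The model below is entirely an assumption, because this page doesn't say how many writes the library issues. It assumes one heartbeat write per shard per Heartbeat interval plus one checkpoint write per processed batch, with items of at most 1 KB. Under DynamoDB's provisioned model, one write capacity unit covers one such write per second.

```python
import math


def estimate_write_capacity(
    shards: int, heartbeat_secs: float, batches_per_sec_per_shard: float
) -> int:
    """Rough provisioned write-capacity estimate for the state table.

    Assumptions (not taken from ReactoKinesix): one heartbeat write per shard per
    heartbeat interval, one checkpoint write per batch, items of at most 1 KB.
    """
    writes_per_sec = shards * (1.0 / heartbeat_secs + batches_per_sec_per_shard)
    return math.ceil(writes_per_sec)


small = estimate_write_capacity(shards=10, heartbeat_secs=30, batches_per_sec_per_shard=1)
large = estimate_write_capacity(shards=100, heartbeat_secs=30, batches_per_sec_per_shard=1)
print(small, large)  # 11 104
```

Even under these rough assumptions, 100 busy shards would need an order of magnitude more than the default of 10.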
