Deep dive into Azure Cosmos Db Change Feed

Azure Cosmos Db has an impressive feature called ‘Change feed’. It enables capturing the changes in the data (inserts and updates) and provides a unified API to access those captured change events. The change feed can be used as an event source in applications. You can read an overview of this feature from this link.

From an architecture point of view, the change feed feature can be used as an event sourcing mechanism, and applications can subscribe to the change event feed. The change feed is enabled by default in Cosmos Db, and there are three different ways to subscribe to it:

  1. Azure Functions – Serverless Approach
  2. Using Cosmos SQL SDK
  3. Using Change Feed Processor SDK

Using Azure Functions

Setting up the change feed using Azure Functions is straightforward; this is a trigger-based mechanism. We can configure an Azure Function in the portal by navigating to the Cosmos Db collection and clicking ‘Add Azure Function’ in the blade. This will create an Azure Function with the minimum required template to subscribe to the change feed. The below gist shows a mildly altered version of the auto-generated template.


using Microsoft.Azure.Documents;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static async Task Run(IReadOnlyList<Document> input, TraceWriter log)
{
    // 'input' contains the batch of documents that were inserted or updated.
    foreach (var changeInput in input)
    {
        if (changeInput.GetPropertyValue<string>("city") == "colombo")
        {
            log.Verbose("Something has happened in Colombo");
        }
        else
        {
            log.Verbose("Something has happened in somewhere else");
        }
    }
    log.Verbose("Document count " + input.Count);
    log.Verbose("First document Id " + input[0].Id);
}

The above Function gets triggered when a change occurs in the collection (insertion of a new document or an update to an existing document). A single trigger invocation may contain more than one changed document; the IReadOnlyList<Document> parameter receives the list of changed documents, and the business logic processes them in a loop.

In order to get the feed from the last changed checkpoint, the serverless function needs to persist the checkpoint information. So when we create the Azure Function to capture the changes, it also creates a Cosmos Db document collection to store the checkpoint information. This collection is known as the lease collection. The lease collection stores the continuation information per partition and helps coordinate multiple subscribers per collection.
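The portal approach generates a C# script (csx) function, but the same trigger can also be declared in a precompiled function using the CosmosDBTrigger attribute, which is where the lease collection is configured. The following is a minimal sketch, assuming the Microsoft.Azure.WebJobs.Extensions.CosmosDB binding package (Functions 2.x style); the database, collection, and connection string setting names are placeholders.

using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class CosmosChangeFeedFunction
{
    [FunctionName("CosmosChangeFeedFunction")]
    public static void Run(
        [CosmosDBTrigger(
            databaseName: "traveldb",                       // placeholder database name
            collectionName: "cities",                       // placeholder monitored collection
            ConnectionStringSetting = "CosmosDbConnection", // app setting holding the connection string
            LeaseCollectionName = "leases",                 // checkpoint/lease collection
            CreateLeaseCollectionIfNotExists = true)]
        IReadOnlyList<Document> changedDocuments,
        ILogger log)
    {
        // The trigger delivers the changed documents in batches, same as the csx sample above.
        log.LogInformation($"Received {changedDocuments.Count} changed document(s)");
    }
}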

Below is a sample lease collection document.


{
    "id": "applecosmos.documents.azure.com_BeRbAA==_BeRbALSrmAE=..0",
    "_etag": "\"2800a558-0000-0000-0000-5b1fb9180000\"",
    "state": 1,
    "PartitionId": "0",
    "Owner": null,
    "ContinuationToken": "\"19\"",
    "SequenceNumber": 1,
    "_rid": "BeRbAKMEwAADAAAAAAAAAA==",
    "_self": "dbs/BeRbAA==/colls/BeRbAKMEwAA=/docs/BeRbAKMEwAADAAAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1528805656
}

In practical implementations we would not worry much about the lease collection structure, as it is used by the Azure Function to coordinate the work and to subscribe to the right change feed from the right checkpoint. The serverless implementation abstracts away many details, and it is the recommended option as per the documentation from Microsoft.

Using Cosmos SQL SDK

We can use the Cosmos SQL SDK to query the change events from Cosmos Db. Add the Cosmos SQL SDK via the Microsoft.Azure.DocumentDB NuGet package.

Install-Package Microsoft.Azure.DocumentDB

This SDK provides methods to subscribe to the change feed. In this mode, developers have to implement the checkpoint logic themselves and persist the checkpoint data for continuation. The below gist shows a sample that describes how to subscribe to the changes per logical partition.


using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace SQLSDK
{
    // Simple POCO representing the documents in the monitored collection.
    public class DeveloperModel
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public string Skill { get; set; }
    }

    public class ChangeFeedSQLSDKProvider
    {
        private readonly DocumentClient _documentClient;
        private readonly Uri _collectionUri;

        public ChangeFeedSQLSDKProvider()
        {
        }

        public ChangeFeedSQLSDKProvider(string url, string key, string database, string collection)
        {
            _documentClient = new DocumentClient(new Uri(url), key,
                new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp });
            _collectionUri = UriFactory.CreateDocumentCollectionUri(database, collection);
        }

        public async Task<int> GetChangeFeedAsync(string partitionName)
        {
            // Alternative: subscribe per partition key range instead of per logical partition.
            //var partitionKeyRangeResponse = await _documentClient.ReadPartitionKeyRangeFeedAsync(_collectionUri, new FeedOptions
            //{
            //    RequestContinuation = await GetContinuationTokenForPartitionAsync(partitionName),
            //    PartitionKey = new PartitionKey(partitionName)
            //});
            //var partitionKeyRanges = new List<PartitionKeyRange>();
            //partitionKeyRanges.AddRange(partitionKeyRangeResponse);

            var changeFeedQuery = _documentClient.CreateDocumentChangeFeedQuery(_collectionUri, new ChangeFeedOptions
            {
                StartFromBeginning = true,
                PartitionKey = new PartitionKey(partitionName),
                RequestContinuation = await GetContinuationTokenForPartitionAsync(partitionName)
            });

            var changeDocumentCount = 0;
            while (changeFeedQuery.HasMoreResults)
            {
                var response = await changeFeedQuery.ExecuteNextAsync<DeveloperModel>();
                foreach (var document in response)
                {
                    // TODO :: process changes here
                    Console.WriteLine($"changed for id - {document.Id} with name {document.Name} and skill {document.Skill}");
                }
                await SetContinuationTokenForPartitionAsync(partitionName, response.ResponseContinuation);
                changeDocumentCount += response.Count;
            }
            return changeDocumentCount;
        }

        private Task<string> GetContinuationTokenForPartitionAsync(string partitionName)
        {
            // TODO :: retrieve the continuation token for the partition from the persistence store
            return Task.FromResult<string>(null);
        }

        private Task SetContinuationTokenForPartitionAsync(string partitionName, string lsn)
        {
            // TODO :: persist the continuation token against the partition
            return Task.CompletedTask;
        }
    }
}

The commented-out lines in the above gist show the mechanism of subscribing at the partition key range level. In my opinion, keeping the subscriptions at the logical partition level makes sense in most business cases, which is what is shown in the above code; the logical partition name is passed in as a parameter.
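For comparison, below is a minimal sketch of what the partition key range variant could look like if the commented-out block were completed: enumerate the collection's partition key ranges and run one change feed query per range, keyed by PartitionKeyRangeId. This assumes the method is added to the same ChangeFeedSQLSDKProvider class and reuses the hypothetical checkpoint helpers shown above.

public async Task ReadChangesPerPartitionKeyRangeAsync()
{
    // Enumerate the physical partition key ranges of the monitored collection.
    var ranges = new List<PartitionKeyRange>();
    string rangeContinuation = null;
    do
    {
        var rangeResponse = await _documentClient.ReadPartitionKeyRangeFeedAsync(
            _collectionUri,
            new FeedOptions { RequestContinuation = rangeContinuation });
        ranges.AddRange(rangeResponse);
        rangeContinuation = rangeResponse.ResponseContinuation;
    }
    while (!string.IsNullOrEmpty(rangeContinuation));

    // One change feed query per partition key range, each with its own checkpoint.
    foreach (var range in ranges)
    {
        var query = _documentClient.CreateDocumentChangeFeedQuery(_collectionUri, new ChangeFeedOptions
        {
            PartitionKeyRangeId = range.Id,
            StartFromBeginning = true,
            RequestContinuation = await GetContinuationTokenForPartitionAsync(range.Id)
        });

        while (query.HasMoreResults)
        {
            var response = await query.ExecuteNextAsync<Document>();
            foreach (var document in response)
            {
                Console.WriteLine($"changed document id - {document.Id} in range {range.Id}");
            }
            await SetContinuationTokenForPartitionAsync(range.Id, response.ResponseContinuation);
        }
    }
}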

When the change feed is read, Cosmos Db returns a continuation token for the specified change feed option (partition key range or partition key). The developer should explicitly store this token so that it can be retrieved later and the change feed consumption can resume from the point where it left off.

In the code, you can notice that the checkpoint information is stored against each partition.
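One possible way to fill in the checkpoint TODOs is sketched below: a simple file-backed store keyed by partition name (the class name and file path are illustrative, and Newtonsoft.Json is assumed for serialization). In a real deployment the token would typically go into a durable store such as a separate Cosmos Db collection or Azure Table storage.

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;

public class FileCheckpointStore
{
    private readonly string _filePath;

    public FileCheckpointStore(string filePath)
    {
        _filePath = filePath;
    }

    // Returns the stored continuation token for the partition, or null when starting fresh.
    public Task<string> GetContinuationTokenForPartitionAsync(string partitionName)
    {
        var tokens = Load();
        tokens.TryGetValue(partitionName, out var token);
        return Task.FromResult(token);
    }

    // Persists the latest continuation token against the partition name.
    public Task SetContinuationTokenForPartitionAsync(string partitionName, string continuationToken)
    {
        var tokens = Load();
        tokens[partitionName] = continuationToken;
        File.WriteAllText(_filePath, JsonConvert.SerializeObject(tokens));
        return Task.CompletedTask;
    }

    private Dictionary<string, string> Load()
    {
        if (!File.Exists(_filePath))
        {
            return new Dictionary<string, string>();
        }
        return JsonConvert.DeserializeObject<Dictionary<string, string>>(File.ReadAllText(_filePath))
               ?? new Dictionary<string, string>();
    }
}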

Using Change Feed Processor SDK

Cosmos Db has a dedicated Change Feed Processor library, which simplifies subscribing to the change feed in custom applications. This library can be used in advanced subscription scenarios, as developers do not need to manage the partition and continuation token logic themselves.

Install-Package Microsoft.Azure.DocumentDB.ChangeFeedProcessor

The Change Feed Processor library handles much of the complexity of coordinating subscribers. The below gist shows sample code for the library; here the change feed subscription is made per partition key range.


using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

public class ChangeFeedProcessorSDK
{
    private readonly DocumentCollectionInfo _monitoredCollection;
    private readonly DocumentCollectionInfo _leaseCollection;

    public ChangeFeedProcessorSDK(DocumentCollectionInfo monitorCollection, DocumentCollectionInfo leaseCollection)
    {
        _monitoredCollection = monitorCollection;
        _leaseCollection = leaseCollection;
    }

    public async Task<int> GetChangesAsync()
    {
        // Each host instance gets a unique name; the library uses it to share leases between hosts.
        var hostName = $"Host - {Guid.NewGuid().ToString()}";
        var builder = new ChangeFeedProcessorBuilder();
        builder
            .WithHostName(hostName)
            .WithFeedCollection(_monitoredCollection)
            .WithLeaseCollection(_leaseCollection)
            .WithObserverFactory(new CustomObserverFactory());
        var processor = await builder.BuildAsync();
        await processor.StartAsync();
        Console.WriteLine($"Started host - {hostName}");
        Console.WriteLine("Press any key to stop");
        Console.ReadKey();
        await processor.StopAsync();
        return 0;
    }
}

public class CustomObserverFactory : IChangeFeedObserverFactory
{
    public IChangeFeedObserver CreateObserver()
    {
        return new CustomObserver();
    }
}

public class CustomObserver : IChangeFeedObserver
{
    public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
        Console.WriteLine($"Closing the listener to the partition key range {context.PartitionKeyRangeId} because {reason}");
        return Task.CompletedTask;
    }

    public Task OpenAsync(IChangeFeedObserverContext context)
    {
        Console.WriteLine($"Opening the listener to the partition key range {context.PartitionKeyRangeId}");
        return Task.CompletedTask;
    }

    public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
    {
        foreach (var document in docs)
        {
            // TODO :: processing logic
            Console.WriteLine($"Changed document Id - {document.Id}");
        }
        return Task.CompletedTask;
    }
}

In the above code, the monitored collection and the lease collection are given, and the change feed processor builder is built with the minimum required details. As a minimum requirement, you should pass an IChangeFeedObserverFactory to the builder. The change feed processor library can manage the rest, such as how to share the leases of different partitions between different subscribers. This library also has features to implement custom partition processing and load balancing strategies, which are not addressed here.
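For reference, a minimal sketch of how the above class could be wired up is shown below; the account endpoint, key, database, and collection names are placeholders.

public static async Task RunSubscriberAsync()
{
    var monitoredCollection = new DocumentCollectionInfo
    {
        Uri = new Uri("https://<your-account>.documents.azure.com:443/"), // placeholder endpoint
        MasterKey = "<your-account-key>",                                 // placeholder key
        DatabaseName = "traveldb",                                        // placeholder database
        CollectionName = "cities"                                         // monitored collection
    };

    var leaseCollection = new DocumentCollectionInfo
    {
        Uri = new Uri("https://<your-account>.documents.azure.com:443/"),
        MasterKey = "<your-account-key>",
        DatabaseName = "traveldb",
        CollectionName = "leases"                                         // lease collection used for coordination
    };

    var subscriber = new ChangeFeedProcessorSDK(monitoredCollection, leaseCollection);
    await subscriber.GetChangesAsync();
}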

Summary

The Cosmos Db change feed is a powerful feature for subscribing to changes in a collection, and there are three different ways to consume it, as described above.

The below table summarizes the options and features.

[Image: Cosmos change feed summary table]

 
