Azure Cosmos Database — CRUD using Azure Cosmos SDK — Part 2

Raghunandan Gupta
FAUN — Developer Community 🐾
15 min read · May 30, 2020


In the previous article, I explained the basic concepts of the Cosmos database.

In this article, we will use the Azure SDK to interact with the Azure Cosmos database. To start, we will first define our schema, which describes the data we are going to store.

We will work with the schema below because it supports a large variety of queries, which will help us analyze the SDK's usage and identify any optimizations if needed.

Data Payload

{
  "id": "eac7efa5dbd3d667f26eb3d3ab504464",
  "productId": "eac7efa5dbd3d667f26eb3d3ab504464",
  "productName": "Hornby 2014 Catalogue",
  "manufacturer": "Hornby",
  "numberOfViews": 15,
  "numberOfAnsweredQuestions": 1,
  "averageReviewRating": "4.9 out of 5 stars",
  "categorySubcategory": "Hobbies > Model Trains & Railway Sets > Rail Vehicles > Trains"
}

With the above data, we can perform the following operations on Azure Cosmos:

  • Inserting Records sequentially in Azure Cosmos
  • Inserting Records in parallel in Azure Cosmos
  • Updating document in Azure Cosmos
  • Updating Document in Batches in Azure Cosmos
  • Retrieve Record from Azure Cosmos
  • Pagination

Let’s create a database and container first.

  • Database: CMS (Catalog Management System)
  • Container: Products
  • Partition Key: categorySubcategory
  • Unique Key: productId
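The article creates the database and container through the portal. For reference, they can also be created programmatically; the sketch below assumes the 3.x SDK used in this article (method and property names may differ in the 4.x SDK, so verify against your artifact version):

```java
// Sketch: create the CMS database and Products container with partition key
// /categorySubcategory and a unique key on /productId.
// Assumes azure-cosmos 3.x style APIs; a live Cosmos account is required.
CosmosContainerProperties containerProperties =
        new CosmosContainerProperties("Products", "/categorySubcategory");

// Unique key policy: enforce uniqueness of productId within each partition.
UniqueKey uniqueKey = new UniqueKey();
uniqueKey.paths(Collections.singletonList("/productId"));
UniqueKeyPolicy uniqueKeyPolicy = new UniqueKeyPolicy();
uniqueKeyPolicy.uniqueKeys(Collections.singletonList(uniqueKey));
containerProperties.uniqueKeyPolicy(uniqueKeyPolicy);

cosmosClient.createDatabaseIfNotExists("CMS")
        .flatMap(dbResponse -> dbResponse.database()
                .createContainerIfNotExists(containerProperties, 400)) // 400 RU/s
        .block();
```

Note that unique keys can only be defined at container creation time; they cannot be added to an existing container.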

Creating the Product class and reading data from CSV

public static List<Product> readDataLineByLine(String file) {
    List<Product> products = new ArrayList<>();
    // try-with-resources ensures the reader is closed
    try (FileReader fileReader = new FileReader(file);
         CSVReader csvReader = new CSVReaderBuilder(fileReader).withSkipLines(1).build()) {
        String[] nextRecord;
        while ((nextRecord = csvReader.readNext()) != null) {
            try {
                products.add(new Product(nextRecord[0], nextRecord[1], nextRecord[2],
                        Integer.parseInt(nextRecord[3]), Integer.parseInt(nextRecord[4]),
                        nextRecord[5], nextRecord[6]));
            } catch (Exception e) {
                // Skip malformed rows
            }
        }
    } catch (Exception e) {
        // Ignoring exception; return whatever was parsed so far
    }
    return products;
}
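The Product class itself is not shown in the article; a minimal sketch matching the constructor used above (field names assumed from the JSON payload, with "id" set from productId as discussed later) could look like this:

```java
// Minimal POJO for the payload. A no-args constructor and getters are needed
// for the SDK's Jackson-based JSON (de)serialization.
class Product {
    private String id;
    private String productId;
    private String productName;
    private String manufacturer;
    private int numberOfViews;
    private int numberOfAnsweredQuestions;
    private String averageReviewRating;
    private String categorySubcategory;

    Product() { } // required for deserialization

    Product(String productId, String productName, String manufacturer,
            int numberOfViews, int numberOfAnsweredQuestions,
            String averageReviewRating, String categorySubcategory) {
        this.id = productId; // Cosmos requires an "id" field; we reuse productId
        this.productId = productId;
        this.productName = productName;
        this.manufacturer = manufacturer;
        this.numberOfViews = numberOfViews;
        this.numberOfAnsweredQuestions = numberOfAnsweredQuestions;
        this.averageReviewRating = averageReviewRating;
        this.categorySubcategory = categorySubcategory;
    }

    public String getId() { return id; }
    public String getProductId() { return productId; }
    public String getProductName() { return productName; }
    public String getManufacturer() { return manufacturer; }
    public int getNumberOfViews() { return numberOfViews; }
    public int getNumberOfAnsweredQuestions() { return numberOfAnsweredQuestions; }
    public String getAverageReviewRating() { return averageReviewRating; }
    public String getCategorySubcategory() { return categorySubcategory; }
}
```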

There are multiple SDKs available to communicate with Azure Cosmos. In this article, we will use the Azure Cosmos Java SDK.

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>3.6.0</version>
</dependency>

To start performing operations, we first need to create CosmosDatabase and CosmosContainer instances using the SDK.

public static CosmosContainer container = null;

static {
    ConnectionPolicy connectionPolicy = new ConnectionPolicy();
    connectionPolicy.connectionMode(ConnectionMode.DIRECT);
    CosmosClient cosmosClient = CosmosClient.builder()
            .endpoint(System.getenv("COSMOS_END_POINT"))
            .key(System.getenv("COSMOS_PRIMARY_KEY"))
            .connectionPolicy(connectionPolicy)
            .build();
    CosmosDatabase cosmosDatabase = cosmosClient.getDatabase(System.getenv("COSMOS_DATABASE"));
    container = cosmosDatabase.getContainer(System.getenv("COSMOS_CONTAINER"));
}

public static CosmosContainer getInstance() {
    return container;
}

The endpoint and primary key can be found in the Keys section of the Cosmos DB account. These are sensitive and should be secured in a vault or another safe location.

Database and Container are the names we provided at the time of creation in the Azure Cosmos Portal.

Data Explorer is a GUI where you can see details about databases and containers, including their partition keys, unique keys, and data. It works like a workbench where you can run queries and check details.


Inserting data sequentially in the Cosmos database

To insert a document, we can use either createItem or upsertItem. You have to make sure you send an "id" field in the object you are trying to save. Ideally, "id" would be generated by the Cosmos database, but in the Java SDK this functionality is currently not available, at least in the version I am using. So I simply set the value of "id" to the same value as "productId".

https://stackoverflow.com/questions/55344254/azure-cosmosdb-rest-error-creating-document

public static void saveAll() {
    final long start = System.nanoTime();
    Flux
        .just(CSVUtils.readDataLineByLine("sample_kaggle_data.csv").subList(0, 1))
        .flatMap(Flux::fromIterable)
        .map(product -> addDocument(Mono.just(product), Product.class, product.getCategorySubcategory()).block())
        .doFinally(endType -> LOGGER.info("Time taken : {} ", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " milliseconds."))
        .subscribe(product -> LOGGER.info("Inserted = {} ", product.getId()));
}

public static <T> Mono<T> addDocument(Mono<T> obj, Class<T> klass, Object partitionKey) {
    return obj.flatMap(data -> {
        CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
        cosmosItemRequestOptions.partitionKey(new PartitionKey(partitionKey));
        return ProductClient.getInstance().createItem(data, cosmosItemRequestOptions)
                .map(response -> {
                    LOGGER.info("Request Charge to add record for Request Charge = {} ", response.requestCharge());
                    T savedData = null;
                    try {
                        savedData = response.properties().getObject(klass);
                    } catch (IOException e) {
                        LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                    }
                    return savedData;
                });
    });
}

CosmosItemRequestOptions allows setting different parameters when making a request to the Cosmos database. Currently, we are just setting the partition key, but we can also enable or disable cross-partition queries. Enabling cross-partition queries makes them slower, so it is always recommended to choose a partition key that you will have available when you query.

Output

12:58:40.456 [CosmosEventLoop-2–2] INFO c.a.c.c.service.DocumentAddService — Request Charge to add record for Request Charge = 9.83 
12:58:40.476 [main] INFO c.a.c.c.service.DocumentAddService — Inserted = eac7efa5dbd3d667f26eb3d3ab504464
12:58:40.481 [main] INFO c.a.c.c.service.DocumentAddService — Time taken : 10051 milliseconds.

This shows how much time was taken to insert the record, along with its request charge. The request charge is measured in Request Units (RUs), which is the pricing parameter of the Cosmos database. Insertion is generally fast, but I am running from my local system, so it is taking more time than expected.

Inserting All Data in Cosmos database

Once we start inserting data, we can check statistics in the Metrics section of the Cosmos database on the Azure Portal. Below are a few stats that were captured at the time of inserting data.

public static void saveAll() {
    final long start = System.nanoTime();
    Flux
        .just(CSVUtils.readDataLineByLine("sample_kaggle_data.csv"))
        .flatMap(Flux::fromIterable)
        .map(product -> addDocument(Mono.just(product), Product.class, product.getCategorySubcategory()).block())
        .doFinally(endType -> LOGGER.info("Time taken : {} ", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " milliseconds."))
        .subscribe(product -> LOGGER.info("Inserted = {} ", product.getId()));
}

Here we are inserting items sequentially.

13:22:10.607 [main] INFO c.a.c.c.service.DocumentAddService — Time taken : 543459 milliseconds.

Let's check how many items were inserted into the Cosmos container.

The query below shows how many items we have per partition.

Inserting Records in parallel in Azure Cosmos

public static void saveAll() {
    Flux
        .just(CSVUtils.readDataLineByLine("sample_kaggle_data.csv"))
        .flatMap(Flux::fromIterable)
        .parallel()
        .runOn(Schedulers.elastic())
        .map(product -> addDocument(Mono.just(product), Product.class, product.getCategorySubcategory()).block())
        .sequential().collectList().block();
}

public static <T> Mono<T> addDocument(Mono<T> obj, Class<T> klass, Object partitionKey) {
    return obj.flatMap(data -> {
        CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
        cosmosItemRequestOptions.partitionKey(new PartitionKey(partitionKey));
        return ProductClient.getInstance().createItem(data, cosmosItemRequestOptions)
                .map(response -> {
                    LOGGER.info("Request Charge to add record for Request Charge = {} ", response.requestCharge());
                    T savedData = null;
                    try {
                        savedData = response.properties().getObject(klass);
                    } catch (IOException e) {
                        LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                    }
                    return savedData;
                });
    });
}

Below you can see the time taken is roughly 7 times less than the sequential inserts (76,674 ms vs. 543,459 ms). In the log, you can also see different threads because of the parallel execution.

Output

14:57:34.504 [CosmosEventLoop-2–4] INFO c.a.c.c.s.DocumentAsyncAddService — Request Charge to add record for Request Charge = 9.83 
14:57:34.810 [CosmosEventLoop-2–6] INFO c.a.c.c.s.DocumentAsyncAddService — Request Charge to add record for Request Charge = 9.83
14:57:34.810 [CosmosEventLoop-2–3] INFO c.a.c.c.s.DocumentAsyncAddService — Request Charge to add record for Request Charge = 9.83
14:57:34.810 [CosmosEventLoop-2–4] INFO c.a.c.c.s.DocumentAsyncAddService — Request Charge to add record for Request Charge = 9.83
14:57:34.810 [CosmosEventLoop-2–7] INFO c.a.c.c.s.DocumentAsyncAddService — Request Charge to add record for Request Charge = 9.83
14:57:34.810 [CosmosEventLoop-2–1] INFO c.a.c.c.s.DocumentAsyncAddService — Request Charge to add record for Request Charge = 9.83
14:57:34.811 [CosmosEventLoop-2–4] INFO
14:58:09.312 [main] INFO c.a.c.c.s.DocumentAsyncAddService — Time taken : 76674 milliseconds.

Updating document in Azure Cosmos

For updates, you can use the upsert method provided by the Azure SDK. Upsert means update-or-insert. It is a more costly operation compared to create: if the cost of creating a new item is X, the cost of an upsert is almost X + 3. I observed this while performing both create and upsert operations (9.83 vs. 12.34 RUs in the logs).

public static <T> Mono<T> addDocument(Mono<T> obj, Class<T> klass, Object partitionKey) {
    return obj.flatMap(data -> {
        CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
        cosmosItemRequestOptions.partitionKey(new PartitionKey(partitionKey));
        return ProductClient.getInstance().upsertItem(data, cosmosItemRequestOptions)
                .map(response -> {
                    LOGGER.info("Request Charge to add record for Request Charge = {} ", response.requestCharge());
                    T savedData = null;
                    try {
                        savedData = response.properties().getObject(klass);
                    } catch (IOException e) {
                        LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                    }
                    return savedData;
                });
    });
}

Below you can see execution happens on a single thread only, and both the time taken and the cost of the requests are higher.

Output

16:03:36.664 [CosmosEventLoop-2–5] INFO c.a.c.c.s.DocumentUpsertService — Request Charge to add record for Request Charge = 12.34 
16:03:36.665 [main] INFO c.a.c.c.s.DocumentUpsertService — Inserted = f449e56f86d15ce6f70d18f8eccbc5b7
16:03:36.973 [CosmosEventLoop-2–5] INFO c.a.c.c.s.DocumentUpsertService — Request Charge to add record for Request Charge = 12.34
16:03:36.973 [main] INFO c.a.c.c.s.DocumentUpsertService — Inserted = 5287fb389c9a681b128dfd5ec9c83458
16:03:37.294 [CosmosEventLoop-2–5] INFO c.a.c.c.s.DocumentUpsertService — Request Charge to add record for Request Charge = 12.34
16:03:37.295 [main] INFO c.a.c.c.s.DocumentUpsertService — Inserted = b693e968576023ab1ea538a1df3a5e38
16:03:37.542 [CosmosEventLoop-2–5] INFO c.a.c.c.s.DocumentUpsertService — Request Charge to add record for Request Charge = 12.34
16:03:37.542 [main] INFO c.a.c.c.s.DocumentUpsertService — Inserted = 89dbb4a57d48692f76f2954664c5ff52
16:03:37.546 [main] INFO c.a.c.c.s.DocumentUpsertService — Time taken : 609739 milliseconds.
16:03:37.547 [main] INFO c.a.c.c.s.DocumentParallelAddService — Time taken : 609748 milliseconds.

Updating documents in parallel in Azure Cosmos

The saveAll pipeline is the same parallel one used for inserts earlier; only addDocument changes, calling upsertItem instead of createItem.

public static <T> Mono<T> addDocument(Mono<T> obj, Class<T> klass, Object partitionKey) {
    return obj.flatMap(data -> {
        CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
        cosmosItemRequestOptions.partitionKey(new PartitionKey(partitionKey));
        return ProductClient.getInstance().upsertItem(data, cosmosItemRequestOptions)
                .map(response -> {
                    LOGGER.info("Request Charge to add record for Request Charge = {} ", response.requestCharge());
                    T savedData = null;
                    try {
                        savedData = response.properties().getObject(klass);
                    } catch (IOException e) {
                        LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                    }
                    return savedData;
                });
    });
}

Here the time taken is much less (71,804 ms vs. 609,748 ms for the sequential upserts).

Output

16:07:42.421 [CosmosEventLoop-2–5] INFO c.a.c.c.s.DocumentParallelUpsertService — Request Charge to add record for Request Charge = 12.34 
16:07:42.421 [CosmosEventLoop-2–4] INFO c.a.c.c.s.DocumentParallelUpsertService — Request Charge to add record for Request Charge = 12.34
16:07:42.421 [CosmosEventLoop-2–2] INFO c.a.c.c.s.DocumentParallelUpsertService — Request Charge to add record for Request Charge = 12.34
16:07:42.421 [CosmosEventLoop-2–4] INFO c.a.c.c.s.DocumentParallelUpsertService — Request Charge to add record for Request Charge = 12.34
16:07:52.892 [main] INFO c.a.c.c.s.DocumentParallelAddService - Time taken : 71804 milliseconds.

Retrieval from Cosmos Database

If we want to retrieve documents present in the Cosmos database, we need to know about three parameters in the SDK.

enableCrossPartitionQuery

This parameter tells Cosmos whether the query should run on a single partition or on all partitions. It is set to false when we send a partition key for retrieving data; with true, the query takes more time because it has to go to each partition and fetch data.

FeedOptions options = new FeedOptions();
if (Objects.nonNull(partitionKey) && StringUtils.isNotEmpty(partitionKey)) {
    options.partitionKey(new PartitionKey(partitionKey));
    options.enableCrossPartitionQuery(false);
} else {
    options.enableCrossPartitionQuery(true);
}

partitionKey

This parameter tells Cosmos which partition to search for the data. When you set it, set enableCrossPartitionQuery to false, as in the snippet above.

maxItemCount

This parameter tells Cosmos to return at most maxItemCount records per page. If we set it to 1, it will fetch one record at a time and keep fetching until all records are retrieved. It should not be larger than the page size you actually need: if you need 100 records, there is no point in setting it to 500. Set it to an optimal value to avoid performance issues.

FeedOptions options = new FeedOptions();
options.maxItemCount(500);

Query fetching all records

public static void fetch() {
    List<Product> products = getCosmosDocuments("select * from products", Product.class, null)
            .collectList().block();
    LOGGER.info("Total Products = {} ", products.size());
}

public static <T> Flux<T> getCosmosDocuments(String cosmosSQL, Class<T> klass, String partitionKey) {
    try {
        FeedOptions options = new FeedOptions();
        if (Objects.nonNull(partitionKey) && StringUtils.isNotEmpty(partitionKey)) {
            options.partitionKey(new PartitionKey(partitionKey));
            options.enableCrossPartitionQuery(false);
        } else {
            options.enableCrossPartitionQuery(true);
        }
        options.maxItemCount(500);
        return ProductClient.getInstance().queryItems(cosmosSQL, options)
                .map(response -> {
                    List<T> definitions = new ArrayList<>();
                    LOGGER.info("Request Charge to fetch record for query = {} Request Charge = {} ", cosmosSQL, response.requestCharge());
                    response.results().forEach(props -> {
                        T savedData = null;
                        try {
                            savedData = props.getObject(klass);
                        } catch (IOException e) {
                            LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                        }
                        if (Objects.nonNull(savedData)) {
                            definitions.add(savedData);
                        }
                    });
                    LOGGER.info("Data fetched = {} Request Charge = {} Continuation Token = {} ", definitions.size(), response.requestCharge(), response.continuationToken());
                    return definitions;
                }).flatMap(Flux::fromIterable);
    } catch (Exception exception) {
        LOGGER.error("Exception occurred while fetching documents from Cosmos = {} ", exception);
    }
    return Flux.empty();
}

You can see that the above code fetched all records in batches. We set maxItemCount to 500, so it fetches 500 records at a time from Azure Cosmos. The continuation token is used to fetch the next set of records; when we implement pagination over API calls, the caller needs to send the continuation token to fetch the next set of records.

Output

18:12:48.893 [reactor-http-nio-6] INFO  c.a.c.c.s.DocumentRetrievalService - Request Charge to fetch record for query = select * from products Request Charge = 19.29 
18:12:48.928 [reactor-http-nio-6] INFO c.a.c.c.s.DocumentRetrievalService - Data fetched = 500 Request Charge =19.29 Continuation Token = {"token":"-RID:~JQ9OAOCCpVz5AQAAAAAAAA==#RT:1#TRC:500#ISV:2#IEO:65551","range":{"min":"","max":"FF"}}
18:12:51.745 [reactor-http-nio-8] INFO c.a.c.c.s.DocumentRetrievalService - Request Charge to fetch record for query = select * from products Request Charge = 19.56
18:12:51.753 [reactor-http-nio-8] INFO c.a.c.c.s.DocumentRetrievalService - Data fetched = 500 Request Charge =19.56 Continuation Token = {"token":"-RID:~JQ9OAOCCpVztAwAAAAAAAA==#RT:2#TRC:1000#ISV:2#IEO:65551","range":{"min":"","max":"FF"}}
18:12:54.671 [reactor-http-nio-2] INFO c.a.c.c.s.DocumentRetrievalService - Request Charge to fetch record for query = select * from products Request Charge = 19.71
18:12:54.677 [reactor-http-nio-2] INFO c.a.c.c.s.DocumentRetrievalService - Data fetched = 500 Request Charge =19.71 Continuation Token = {"token":"-RID:~JQ9OAOCCpVzhBQAAAAAAAA==#RT:3#TRC:1500#ISV:2#IEO:65551","range":{"min":"","max":"FF"}}
18:12:57.122 [reactor-http-nio-4] INFO c.a.c.c.s.DocumentRetrievalService - Request Charge to fetch record for query = select * from products Request Charge = 14.95
18:12:57.125 [reactor-http-nio-4] INFO c.a.c.c.s.DocumentRetrievalService - Data fetched = 365 Request Charge =14.95 Continuation Token = null
18:12:57.125 [main] INFO c.a.c.c.s.DocumentRetrievalService - Total Products = 1865
18:12:57.129 [main] INFO c.a.c.c.s.DocumentParallelAddService - Time taken : 17037 milliseconds.
One issue with the above query is that it will not stop fetching records until it has fetched all of them, which is a problem if you only need, say, the first 500. There are a couple of ways to handle this.

Query fetching a specific number of records

With the approach below, we fetch exactly the page size we pass in. If we pass 100, it fetches only 100 records, because we set maxItemCount to 100 and we do not iterate through the whole Flux; we take its first element using next() and process that. Iterating the full Flux would keep fetching until all records present in the database are retrieved.

We can send the continuation token with a subsequent request to fetch the next set of 100 records. We receive a continuation token in each response, send it with the next request, and that is how pagination is implemented.

public static void fetch() {
    List<Product> products = getCosmosDocumentsMono("select * from products", Product.class, null, 100).block();
    LOGGER.info("Total Products = {} ", products.size());
}

public static <T> Mono<List<T>> getCosmosDocumentsMono(String cosmosSQL, Class<T> klass, String partitionKey, Integer size) {
    try {
        FeedOptions options = new FeedOptions();
        if (Objects.nonNull(partitionKey) && StringUtils.isNotEmpty(partitionKey)) {
            options.partitionKey(new PartitionKey(partitionKey));
            options.enableCrossPartitionQuery(false);
        } else {
            options.enableCrossPartitionQuery(true);
        }
        options.maxItemCount(size);
        // options.requestContinuation("<set token retrieved from response>");
        Flux<FeedResponse<CosmosItemProperties>> cosmosResponse = ProductClient.getInstance().queryItems(cosmosSQL, options);
        return cosmosResponse.next().map(res -> {
            List<T> definitions = new ArrayList<>();
            res.results().forEach(props -> {
                T savedData = null;
                try {
                    savedData = props.getObject(klass);
                } catch (IOException e) {
                    LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                }
                if (Objects.nonNull(savedData)) {
                    definitions.add(savedData);
                }
            });
            LOGGER.info("Data fetched = {} Request Charge = {} Continuation Token = {} ", definitions.size(), res.requestCharge(), res.continuationToken());
            return definitions;
        });
    } catch (Exception exception) {
        LOGGER.error("Exception occurred while fetching documents from Cosmos = {} ", exception.getMessage());
    }
    return Mono.empty();
}

Output

18:15:13.260 [reactor-http-nio-6] INFO  c.a.c.c.s.DocumentRetrievalService - Data fetched = 100 Request Charge =5.75 Continuation Token = {"token":"-RID:~JQ9OAOCCpVxpAAAAAAAAAA==#RT:1#TRC:100#ISV:2#IEO:65551","range":{"min":"","max":"FF"}} 
18:15:13.263 [main] INFO c.a.c.c.s.DocumentRetrievalService - Total Products = 100
18:15:13.267 [main] INFO c.a.c.c.s.DocumentParallelAddService - Time taken : 7355 milliseconds.

Query fetching records using Offset and Limit

This is another way to fetch records using Offset and Limit.

public static void fetch() {
    List<Product> products = getCosmosDocuments("select * from products offset 0 limit 750", Product.class, null)
            .collectList().block();
    LOGGER.info("Total Products = {} ", products.size());
}

public static <T> Flux<T> getCosmosDocuments(String cosmosSQL, Class<T> klass, String partitionKey) {
    try {
        FeedOptions options = new FeedOptions();
        if (Objects.nonNull(partitionKey) && StringUtils.isNotEmpty(partitionKey)) {
            options.partitionKey(new PartitionKey(partitionKey));
            options.enableCrossPartitionQuery(false);
        } else {
            options.enableCrossPartitionQuery(true);
        }
        options.maxItemCount(500);
        return ProductClient.getInstance().queryItems(cosmosSQL, options)
                .map(response -> {
                    List<T> definitions = new ArrayList<>();
                    LOGGER.info("Request Charge to fetch record for query = {} Request Charge = {} ", cosmosSQL, response.requestCharge());
                    response.results().forEach(props -> {
                        T savedData = null;
                        try {
                            savedData = props.getObject(klass);
                        } catch (IOException e) {
                            LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                        }
                        if (Objects.nonNull(savedData)) {
                            definitions.add(savedData);
                        }
                    });
                    LOGGER.info("Data fetched = {} Request Charge = {} Continuation Token = {} ", definitions.size(), response.requestCharge(), response.continuationToken());
                    return definitions;
                }).flatMap(Flux::fromIterable);
    } catch (Exception exception) {
        LOGGER.error("Exception occurred while fetching documents from Cosmos = {} ", exception);
    }
    return Flux.empty();
}

It fetched only 750 records.

Output

18:24:47.135 [CosmosEventLoop-2-5] INFO c.a.c.c.s.DocumentRetrievalService - Request Charge to fetch record for query = select * from products offset 0 limit 750 Request Charge = 19.29
18:24:47.170 [CosmosEventLoop-2-5] INFO c.a.c.c.s.DocumentRetrievalService - Data fetched = 500 Request Charge =19.29 Continuation Token = {"token":"-RID:~JQ9OAOCCpVz5AQAAAAAAAA==#RT:1#TRC:500#ISV:2#IEO:65551","range":"{\"min\":\"\",\"max\":\"FF\",\"isMinInclusive\":true,\"isMaxInclusive\":false}"}
18:24:47.173 [CosmosEventLoop-2-5] INFO c.a.c.c.s.DocumentRetrievalService - Request Charge to fetch record for query = select * from products offset 0 limit 750 Request Charge = 10.71
18:24:47.178 [CosmosEventLoop-2-5] INFO c.a.c.c.s.DocumentRetrievalService - Data fetched = 250 Request Charge =10.71 Continuation Token = null
18:24:47.179 [main] INFO c.a.c.c.s.DocumentRetrievalService - Total Products = 750
18:24:47.183 [main] INFO c.a.c.c.s.DocumentParallelAddService - Time taken : 16183 milliseconds.

Pagination

You can implement pagination using two approaches, each with its own pros and cons.

  • Continuation Token: this token is returned with each response from Cosmos. We send it back on the next request to get the next set of records.
  • Offset & Limit: we send the offset and limit in each query to fetch the next set of records.
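The continuation-token approach can be sketched as a small helper built around the getCosmosDocumentsMono pattern above. The Page holder and fetchPage names are hypothetical, not part of the SDK, and running this requires a live Cosmos account:

```java
// Hypothetical pagination helper: fetch one page of raw results and return
// the continuation token alongside it, so a caller (e.g. a REST endpoint)
// can pass the token back on the next call. Assumes the SDK 3.x types used
// in this article (FeedOptions, FeedResponse, CosmosItemProperties).
public static Page fetchPage(String sql, int pageSize, String continuationToken) {
    FeedOptions options = new FeedOptions();
    options.enableCrossPartitionQuery(true);
    options.maxItemCount(pageSize);
    if (continuationToken != null) {
        options.requestContinuation(continuationToken); // resume where the last page ended
    }
    FeedResponse<CosmosItemProperties> response =
            ProductClient.getInstance().queryItems(sql, options).next().block();
    return new Page(response.results(), response.continuationToken());
}

// Simple holder for a page of results plus the token for the next page.
public static class Page {
    public final List<CosmosItemProperties> items;
    public final String nextToken; // null when there are no more pages

    Page(List<CosmosItemProperties> items, String nextToken) {
        this.items = items;
        this.nextToken = nextToken;
    }
}
```

The caller loops (or the API client paginates) by feeding nextToken back into fetchPage until it comes back null.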

Query by Partition Key

public static void fetch() {
    List<Product> products = getCosmosDocuments("select * from products", Product.class, "Arts & Crafts > Paper & Stickers")
            .collectList().block();
    LOGGER.info("Total Products = {} ", products.size());
}

public static <T> Flux<T> getCosmosDocuments(String cosmosSQL, Class<T> klass, String partitionKey) {
    try {
        FeedOptions options = new FeedOptions();
        if (Objects.nonNull(partitionKey) && StringUtils.isNotEmpty(partitionKey)) {
            options.partitionKey(new PartitionKey(partitionKey));
            options.enableCrossPartitionQuery(false);
        } else {
            options.enableCrossPartitionQuery(true);
        }
        options.maxItemCount(500);
        return ProductClient.getInstance().queryItems(cosmosSQL, options)
                .map(response -> {
                    List<T> definitions = new ArrayList<>();
                    LOGGER.info("Request Charge to fetch record for query = {} Request Charge = {} ", cosmosSQL, response.requestCharge());
                    response.results().forEach(props -> {
                        T savedData = null;
                        try {
                            savedData = props.getObject(klass);
                        } catch (IOException e) {
                            LOGGER.error("Exception occurred while parsing output to Pojo = {} ", e);
                        }
                        if (Objects.nonNull(savedData)) {
                            definitions.add(savedData);
                        }
                    });
                    LOGGER.info("Data fetched = {} Request Charge = {} Continuation Token = {} ", definitions.size(), response.requestCharge(), response.continuationToken());
                    return definitions;
                }).flatMap(Flux::fromIterable);
    } catch (Exception exception) {
        LOGGER.error("Exception occurred while fetching documents from Cosmos = {} ", exception);
    }
    return Flux.empty();
}

You can see that even though we used a select * query, we got only 120 records, because when we specify the partition key the query runs on that partition only. That is why it is recommended to select a partition key that segregates data according to your access patterns: it makes queries very fast.

Output

18:29:51.738 [reactor-http-nio-6] INFO  c.a.c.c.s.DocumentRetrievalService - Request Charge to fetch record for query = select * from products Request Charge = 8.26 
18:29:51.770 [reactor-http-nio-6] INFO c.a.c.c.s.DocumentRetrievalService - Data fetched = 120 Request Charge =8.26 Continuation Token = null
18:29:51.772 [main] INFO c.a.c.c.s.DocumentRetrievalService - Total Products = 120
18:29:51.776 [main] INFO c.a.c.c.s.DocumentParallelAddService - Time taken : 7649 milliseconds.

Container throughput

You can check your container's throughput using the Browse tab in the Containers section of Azure Cosmos. When you click Browse, you will see the throughput details for each container.

Source Code

One thing I realized is that adding indexes also increases the request charge. In the example above, if we do not add the unique key "productId", an insert takes a request charge of 8.19 and an update takes 10.29, which is quite a bit less. Ideally, we should choose a partition key that is frequently used in queries. There is no limit on logical partitions; logical partitions map to physical partitions, which may have limits of their own, but Azure Cosmos manages them internally. You should also check whether your application is read-heavy or write-heavy, and take that into account when selecting a partition key.

In the next blog, I will cover the indexing policy in Azure Cosmos and how it plays an important role in RUs as well as performance.

I am leaving the delete operation as an exercise for you. Let me know if you face any problems.
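As a starting point for the delete exercise, here is a sketch in the same style as addDocument above. The method chain assumes the 3.x SDK's item-reference API (getItem followed by delete); verify the names against your SDK version, and note that a live Cosmos account is required to run it:

```java
// Hypothetical delete helper. In SDK 3.x, deleting requires the item id
// and its partition key value.
public static Mono<Void> deleteDocument(String id, Object partitionKey) {
    CosmosItemRequestOptions options = new CosmosItemRequestOptions();
    options.partitionKey(new PartitionKey(partitionKey));
    return ProductClient.getInstance()
            .getItem(id, partitionKey)   // obtain an item reference (no network call)
            .delete(options)             // issue the delete request
            .doOnSuccess(response -> LOGGER.info(
                    "Request Charge to delete record = {} ", response.requestCharge()))
            .then();
}
```

As with reads, the delete targets a single logical partition, so both the id and the partition key value must be known.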


