Batch processing in Cloudant NoSQL database using nodejs

An efficient way to do bulk processing of documents in the Cloudant database in a synchronized manner using nodejs

atch processing is a method of running high-volume, repetitive data jobs and it drives many actions behind the scenes. This includes applying updates, processing data, integrating data from multiple sources, data extraction, and many more. In this article, we will discuss how to write a batch process for the Cloudant NoSQL database using nodejs.

nodejs use an event-driven, asynchronous non-blocking I/O model. So, considering a scenario where thousands or even millions of records need to be processed or updated, it could impact database performance as it would almost simultaneously fire all the calls at once. In this article, we will discuss the ways to address this issue using nodejs synchronous mechanism along with Cloudant bulk API.

To demonstrate batch processing with an example lets replicate “movies-demodatabase listed in examples.cloudant.com.

Refer the example here to setup database replication

Scenario

For this article, we will use the “movies-demo” database and update all the documents which are older than the year 2000, with a new attribute Old_movie:true using nodejs.

Steps

  1. Create a view in the “movies-demo” Cloudant database to filter out the documents which are older than the year 2000.
  2. Write a nodejs script to add a new attribute Old_movie:true in all these documents selected by the view, in a synchronized manner.
  3. Improvise the node script using Cloudant Bulk API.

Step 1

Create a view in the “movies-demo” Cloudant NoSQL database to filter out the documents for processing.

Click on “New View” option on the plus symbol present in “All Documents” or “Design Documents”

Let's give view name as “get-old-movies” and design-doc name as “data” and paste the code as below.

function (doc) {
if(doc.Movie_year < 2000){
emit(doc.Movie_year, doc.Movie_name);
}
}

Click on the “Create Document and then Build Index” button.

Step 2:

Write a nodejs script to add a new attribute Old_movie:true in all these selected documents.

The script performs the below three functionalities:

  • Get the Cloudant database connection.
  • Get data from Cloudant view “get-old-movies” and add a new attribute to each document and push them to an array.
  • Iterate the array and update the database documents in a synchronized manner.

nodejs script with the synchronous Cloudant database update

const cloudant = require('@cloudant/cloudant');
const cloudantURL = 'https://account-name.cloudant.com/';
const userName = 'username';
const passwd = 'password';
const databaseName = 'movies-demo';

/*
* This function returns the database connection.
*/
let getConnection = () => {
return new Promise(function(resolve, reject) {
cloudant({
url : cloudantURL,
account : userName,
password : passwd
}, function(error, connection){
if (connection)
resolve (connection);
else
reject (error);
})
})
}

/*
* This function gets the data from the view 'get-old-movies'
* and insert a new attribute in each document.
*/
let fetchDataFromDB = (connection) => {
return new Promise(function(resolve, reject) {
let objectArray = [];
let dbName = connection.db.use (databaseName);
dbName.view('data','get-old-movies', {'include_docs':true}, (error,result) => {
if(result){
result.rows.forEach((document) => {
var dbDoc = document.doc;
/*
* add the new attribute 'Old_movie:true'
* to each document and push it to the array.
*/
dbDoc.Old_movie = true;
objectArray.push(dbDoc);
});
resolve (objectArray);
}else{
reject (error);
}
})
})
}

/*
* This function updates the document in 'movies-demo' database.
*/
let updateData = (documentObject, connection) => {
return new Promise(function(resolve,reject) {
let dbName = connection.db.use (databaseName);
dbName.insert(documentObject, function(err, doc) {
if(err) {
reject(err);
}else{
resolve('200');
}
});
})
}

/*
* async main method for executing function in a
* synchronous/blocking manner.
*/
async function main(){
let connection = await getConnection();
let dataArray = await fetchDataFromDB(dbConnection);
for(i in dataArray){
try{
/*
* Sequential execution for the document update.
*/
let status = await updateData(dataArray[i], connection);
if(status != '200'){
console.log(`Error : ${status}`);
break;
}
}catch(err) {
console.log(err);
}
}
console.log('>>> Finished !!');
}

/*
* Execution starts from here.
*/
main();

Note: This node script addresses the issues that arise due to the default non-blocking nature of nodejs. But wait! if every database update call will be executed in a sequential manner then wouldn’t it cause a significant delay? Yes, it would. In fact, the above script takes around 15 minutes to update 3680 documents, which is a significant time consumption !!

Is there a way to enhance the speed of execution? Yes, we could make use of the Cloudant bulk operations API. It is much more efficient to bulk update multiple documents in a single bulk request than sending them as a separate call. Now, let’s reprogram the above script to incorporate bulk operation, where we would target to bulk update 250 documents at once, but in a synchronized manner.

Step 3:

Revised nodejs script for updating documents using Cloudant Bulk API

const cloudant = require('@cloudant/cloudant');
const cloudantURL = 'https://account-name.cloudant.com/';
const userName = 'username';
const passwd = 'password';
const databaseName = 'movies-demo';

/*
* This function returns the database connection.
*/
let getConnection = () => {
return new Promise(function(resolve, reject) {
cloudant({
url : cloudantURL,
account : userName,
password : passwd
}, function(error, connection){
if (connection)
resolve (connection);
else
reject (error);
})
})
}

/*
* This function gets the data from the view 'get-old-movies'
* and insert a new attribute in each document.
*/
let fetchDataFromDB = (connection) => {
return new Promise(function(resolve, reject) {
let objectArray = [];
let dbName = connection.db.use (databaseName);
dbName.view('data','get-old-movies', {'include_docs':true}, (error,result) => {
if(result){
result.rows.forEach((document) => {
var dbDoc = document.doc;
/*
* add the new attribute 'Old_movie:true'
* to each document and push it to the array.
*/
dbDoc.Old_movie = true;
objectArray.push(dbDoc);
});
resolve (objectArray);
}else{
reject (error);
}
})
})
}

/*
* This function updates the document in 'movies-demo' database.
*/
let updateBulkData = (documentArray, connection) => {
return new Promise(function(resolve,reject) {
let dbName = connection.db.use (databaseName);
dbName.bulk(documentArray, function(err, doc) {
if(err) {
reject(err);
}else{
resolve('200');
}
});
})
}

/*
* async main method for executing function in
* a synchronous/blocking manner.
*/
async function main(){
let connection = await getConnection();
let dataArray = await fetchDataFromDB(dbConnection);
var i = 0;
const size = 250;
while (dataArray[i]) {
try{
/*
* Sequential execution with 250 documents
* updates per request.
*/
let status = await updateBulkData({docs: dataArray.splice(0, size)}, connection);
if(status != '200'){
console.log(`Error : ${status}`);
break;
}
}catch(err) {
console.log(err);
}
}
console.log('>>> Finished !!');
}

/*
* Execution starts from here.
*/
main();

Conclusion:

When I tested this modified script with Cloudant bulk API, it was able to update the same 3680 number of documents in 10 seconds! compared to the earlier script which took around 15 minutes. This modified nodejs script brings more efficiency and performance when it comes to batch processing.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store