Batch Processing in a Cloudant NoSQL Database Using Node.js

An efficient way to bulk-process documents in a Cloudant database sequentially (one request at a time) using Node.js.

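Scenario

Every movie released before 2000 in the `movies-demo` database should be marked by adding the attribute `Old_movie: true` to its document.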

Steps

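Step 1: Create a view

In the design document `data`, create a view named `get-old-movies` with the following map function. It emits the year (key) and title (value) of every movie released before 2000: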

function (doc) {
if(doc.Movie_year < 2000){
emit(doc.Movie_year, doc.Movie_name);
}
}

Step 2: Update the documents one at a time

A Node.js script that updates the documents sequentially, sending one insert request per document:

const cloudant = require('@cloudant/cloudant');
// Connection settings. They are read from the environment so that real
// credentials never need to be committed with the script. The literals are
// placeholders, used only when a variable is unset or empty.
const cloudantURL = process.env.CLOUDANT_URL || 'https://account-name.cloudant.com/';
const userName = process.env.CLOUDANT_USERNAME || 'username';
const passwd = process.env.CLOUDANT_PASSWORD || 'password';
const databaseName = process.env.CLOUDANT_DB || 'movies-demo';

/*
 * Creates a Cloudant client from the module-level connection settings.
 * Resolves with the client once the initialisation callback supplies one;
 * otherwise rejects with the error that callback reported.
 *
 * NOTE(review): confirm that the client library uses `account` as the login
 * name when a `url` is also given; some client versions expect `username`.
 */
let getConnection = () => new Promise((resolve, reject) => {
  const options = {
    url: cloudantURL,
    account: userName,
    password: passwd,
  };
  cloudant(options, (error, connection) => {
    if (!connection) {
      reject(error);
      return;
    }
    resolve(connection);
  });
});

/*
 * Queries the 'get-old-movies' view of the 'data' design document with
 * include_docs, sets `Old_movie = true` on every returned document and resolves
 * with those documents in row order. The documents are only changed in memory;
 * nothing is written to the database here. Rejects with the view error when
 * no result comes back.
 */
let fetchDataFromDB = (connection) => new Promise((resolve, reject) => {
  const movies = [];
  const db = connection.db.use(databaseName);
  db.view('data', 'get-old-movies', { include_docs: true }, (error, result) => {
    if (!result) {
      reject(error);
      return;
    }
    for (const row of result.rows) {
      const movie = row.doc;
      movie.Old_movie = true;
      movies.push(movie);
    }
    resolve(movies);
  });
});

/*
 * Writes a single document to the database named by `databaseName` using one
 * insert request. Resolves with the string '200' on success and rejects with
 * the error reported by the insert callback otherwise.
 */
let updateData = (documentObject, connection) => new Promise((resolve, reject) => {
  const db = connection.db.use(databaseName);
  db.insert(documentObject, (insertError) => {
    if (insertError) {
      reject(insertError);
      return;
    }
    resolve('200');
  });
});

/*
 * Entry point for the one-document-per-request variant.
 *
 * Connects, loads the documents produced by fetchDataFromDB, then updates them
 * one at a time. Each update is awaited before the next starts, so at most
 * one request is in flight. This is sequential but not blocking: the Node.js
 * event loop keeps running while a request is pending.
 *
 * A failed update is logged and the loop continues with the next document.
 * Connection or view failures are not caught here; they reject the promise
 * returned by main().
 */
async function main() {
  const connection = await getConnection();
  const dataArray = await fetchDataFromDB(connection);
  for (const document of dataArray) {
    try {
      const status = await updateData(document, connection);
      // updateData signals success with '200'; any other value aborts the run.
      if (status !== '200') {
        console.log(`Error : ${status}`);
        break;
      }
    } catch (err) {
      console.log(err);
    }
  }
  console.log('>>> Finished !!');
}

/*
 * Execution starts from here. main() returns a promise, so a rejection handler
 * is attached. Without it, a failed connection or view query would surface as
 * an unhandled promise rejection. With it, the error is logged and the process
 * exits with a non-zero code.
 */
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

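Step 3: Update the documents in batches

The same script, rewritten to send the documents through the bulk API in batches of 250, one request at a time: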

const cloudant = require('@cloudant/cloudant');
// Connection settings. They are read from the environment so that real
// credentials never need to be committed with the script. The literals are
// placeholders, used only when a variable is unset or empty.
const cloudantURL = process.env.CLOUDANT_URL || 'https://account-name.cloudant.com/';
const userName = process.env.CLOUDANT_USERNAME || 'username';
const passwd = process.env.CLOUDANT_PASSWORD || 'password';
const databaseName = process.env.CLOUDANT_DB || 'movies-demo';

/*
 * Creates a Cloudant client from the module-level connection settings.
 * Resolves with the client once the initialisation callback supplies one;
 * otherwise rejects with the error that callback reported.
 *
 * NOTE(review): confirm that the client library uses `account` as the login
 * name when a `url` is also given; some client versions expect `username`.
 */
let getConnection = () => new Promise((resolve, reject) => {
  const options = {
    url: cloudantURL,
    account: userName,
    password: passwd,
  };
  cloudant(options, (error, connection) => {
    if (!connection) {
      reject(error);
      return;
    }
    resolve(connection);
  });
});

/*
 * Queries the 'get-old-movies' view of the 'data' design document with
 * include_docs, sets `Old_movie = true` on every returned document and resolves
 * with those documents in row order. The documents are only changed in memory;
 * nothing is written to the database here. Rejects with the view error when
 * no result comes back.
 */
let fetchDataFromDB = (connection) => new Promise((resolve, reject) => {
  const movies = [];
  const db = connection.db.use(databaseName);
  db.view('data', 'get-old-movies', { include_docs: true }, (error, result) => {
    if (!result) {
      reject(error);
      return;
    }
    for (const row of result.rows) {
      const movie = row.doc;
      movie.Old_movie = true;
      movies.push(movie);
    }
    resolve(movies);
  });
});

/*
 * Writes a batch of documents to the database named by `databaseName` with a
 * single bulk request.
 *
 * @param {{docs: Object[]}} documentArray - bulk request body ({ docs: [...] }).
 * @param connection - Cloudant client returned by getConnection().
 * @returns {Promise<string>} resolves with '200' when every document in the
 *   batch was saved.
 *
 * The bulk endpoint reports per-document failures (for example update
 * conflicts) inside a successful response rather than as a request error.
 * The response body is therefore inspected as well. If any entry carries an
 * `error`, the promise rejects with an Error whose `failures` property holds
 * those entries. Documents in the same batch that succeeded stay saved.
 */
let updateBulkData = (documentArray, connection) => {
  return new Promise(function(resolve, reject) {
    const db = connection.db.use(databaseName);
    db.bulk(documentArray, function(err, results) {
      if (err) {
        reject(err);
        return;
      }
      const failures = Array.isArray(results)
        ? results.filter((entry) => entry && entry.error)
        : [];
      if (failures.length > 0) {
        const summary = failures.map((entry) => `${entry.id}: ${entry.error}`).join(', ');
        const bulkError = new Error(`${failures.length} document(s) failed to update: ${summary}`);
        bulkError.failures = failures;
        reject(bulkError);
        return;
      }
      resolve('200');
    });
  });
};

/*
 * Entry point for the batched variant.
 *
 * Connects, loads the documents produced by fetchDataFromDB, and sends them to
 * updateBulkData in consecutive slices of at most `batchSize` documents. Each
 * batch is awaited before the next is sent, so requests run one after another
 * without blocking the event loop. The fetched array itself is not modified.
 *
 * A batch that fails is logged and the loop continues with the next batch.
 * Connection or view failures are not caught here; they reject the promise
 * returned by main().
 *
 * @param {number} [batchSize=250] - maximum number of documents per bulk request.
 * @throws {RangeError} if batchSize is not a positive integer.
 */
async function main(batchSize = 250) {
  // A zero or negative step would never advance the loop below.
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
  }
  const connection = await getConnection();
  const dataArray = await fetchDataFromDB(connection);
  for (let start = 0; start < dataArray.length; start += batchSize) {
    const batch = dataArray.slice(start, start + batchSize);
    try {
      const status = await updateBulkData({ docs: batch }, connection);
      // updateBulkData signals success with '200'; any other value aborts the run.
      if (status !== '200') {
        console.log(`Error : ${status}`);
        break;
      }
    } catch (err) {
      console.log(err);
    }
  }
  console.log('>>> Finished !!');
}

/*
 * Execution starts from here. main() returns a promise, so a rejection handler
 * is attached. Without it, a failed connection or view query would surface as
 * an unhandled promise rejection. With it, the error is logged and the process
 * exits with a non-zero code.
 */
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

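Conclusion

Both scripts update the documents sequentially. The bulk version needs one request per batch of 250 documents instead of one request per document, which greatly reduces the number of round trips to Cloudant.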

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store