Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / IoT

Using Amazon Kinesis and Amazon Dynamodb on the UPSquared Board

5.00/5 (1 vote)
15 Dec 2017CPOL10 min read 6.9K  
This guide demonstrates steps to communicate with Amazon Kinesis and DynamoDB on Ubuntu using the UPSquared and a Gigabyte gateway.

This article is for our sponsors at CodeProject. These articles are intended to provide you with information on products and services that we consider useful and of value to developers

Introduction

This guide demonstrates steps to communicate with Amazon Kinesis* and DynamoDB* on Ubuntu* using the UPSquared* and a Gigabyte* gateway. The UP Squared pushes (sends) information and the gateway pulls (receives) the information for further analysis.

From the example shown in this article, you’ll learn to use both the UP Squared and a Gigabyte gateway GB-BXTB-3825 platforms to:

  • Create a Cognito* parameter
  • Describe to the Amazon Kinesis stream
  • Put record, get record and monitor the Amazon Kinesis
  • How to pull data from Kinesis stream and store them in DynamoDB* cloud database for later analysis

About the UP Squared Board

Characterized by low power consumption and high performance ideal for the Internet of Things (IoT), the UPSquared platform is the fastest x86 maker board based on the Apollo Lake platform from Intel. It contains both a Intel Celeron® processor Dual Core N3350 and Intel® Pentium® brand Quad Core N4200 processor.

Operating System Compatibility

The UPSquared platform can run Ubilinux*, Ubuntu*, Windows® 10 IoT Core, Windows® 10, Yocto Project*, or Android* Marshmallow operating systems. For more information on UPSquared, visit http://www.up-board.org/upsquared. Refer to http://b2b.gigabyte.com/Embedded-Computing/GB-BXBT-3825-rev-10#ov for details on Intel® IoT Gateway GB-BXTB-3825.

 

Hardware Components

The hardware components used in this project are listed below:

Development Boards

Characterized by low power consumption and high performance ideal for the Internet of Things (IoT), the UP Squared platform is a fast prototyping board based on Intel’s Apollo Lake platform. It contains both Intel® Celeron® Dual Core N3350 and Intel® Pentium® Quad Core N4200 processors.

UP Squared Platform

Before you begin, the Ubuntu* operating system should be installed on the UP Squared platform.

1. To ensure that the Ubuntu operating system is up to date and dependent Ubuntu packages are installed, open a command prompt (terminal) and type the following:

 

JavaScript
sudo apt-get update

2. Install the npm (Node Version Manager) package by typing the following command in the terminal:

JavaScript
sudo apt-get install npm

3. AWS* is an open-source code that provides a JavaScript* library for many AWS services including Kinesis stream, DynamoDB, and more. To begin using AWS with the AWS SDK for JavaScript* in Node.js on the UP Squared platform, install the AWS SDK JavaScript* library and documentation on the Ubuntu* terminal:

JavaScript
npm install aws-sdk

4. Node.js* is an open-source, cross-platform JavaScript* runtime environment:

JavaScript
sudo apt-get install node.js

Gigabyte* Gateway GB-BXTB-3825 Platform

Be sure Ubuntu* is installed on the GB-BXTB-3825 gateway platform.
Similarly, ensure the Ubuntu operating system is up to date, install dependencies, npm, aws-sdk, and Node.js* packages. The gateway pulls packages from the Kinesis stream and stores them in the DynamoDB table.

Create Amazon Kinesis* Stream

The Amazon Kinesis Stream enables real time data to stream on a large scale and makes the data available for consumption. The data in the stream are the ordered sequence of data records. The data records in the stream are split into multiple shards. Splitting data into multiple shards is a technique to spread out the load.
Let’s start by logging into Amazon Web Services (AWS)* Management Console, opening Amazon* Kinesis console, choosing a region, then selecting Go to the Stream console.

1. Select Create Kinesis stream

2. Choose a name for your Kinesis stream and the number of shards.

3. When you see a message notifying you that your stream has been created and the status is “ACTIVE”, the stream is ready for use.

Create Amazon DynamoDB* Table

Amazon DynamoDB is a fully managed cloud service. Its flexible data management makes it great for Internet of Things applications. With the Amazon DynamoDB, we can store the data being broadcast from the UP Squared platform and analyze the data for specific needs.
1. To create a DynamoDB table, open a DynamoDB console and click Create table. The example below creates a DynamoDB table named “TinkSmartPegTable” with two attributes PegID and FiveBytesData typed String.

2. Click Create table to create the DynamoDB table.

3. Navigate to the Items tab of the TinkSmartPegTable that you just created, click on the Create item to create more items.

4. Select Insert from the more list on the Create item form

5. Select String from the type drop-down list.

6. Enter the name and value of the first item, enter the value of the second items, then select Save.

7. The TinkSmartPegTable now has been created and contains two items: PegID and FiveBytesData.

Create Policy

An IAM explicitly lists actions which are allowed and the resources on which the actions are applicable.
1. From the IAM console, choose Policies on the left column, and then choose Create Policy.

2. In the Create Policy form, choose Policy Generator.

3. Fill out policy permissions as follows:

  • AWS Service: Amazon Kinesis
  • Actions: Select DescribeStream, GetShardIterator, GetRecords, PutRecord, and PutRecords as the allowed actions.
  • Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream where us-east-1 is the region, 111548290563 is the AWS account ID, TinkKinesisStream is the Kinesis stream name.

To get Kinesis stream ARN, on the Kinesis console, select the stream name, then go to Stream on the column on the left. Copy the Stream ARN.

4. Fill out the AWS Service, Action, and ARN on the Edit Permissions form.

5. Click Add Statement on the Edit Permissions form.

 

6. Fill out the action and ARN for Amazon DynamoDB.

  • AWS Service: Amazon DynamoDB
  • Actions: Select All Actions as the allowed actions.
  • Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream where us-east-1 is the region, 111548290563 is the AWS account ID, TinkKinesisStream is the DynamoDB name.

To get DynamoDB ARN, go to DynamoDB console, click on the Tables on the column on the left, select the DynamoDB table name TinkSmartPegTable, you should see the DynamoDB ARN on the Overview tab.

7. Fill out the AWS Service, Action, and ARN on the Edit Permissions form, then click on Add Statement.

8. Click Add Statement on the Edit Permissions form.

9. Fill out the action and ARN for Amazon DynamoDB.

  • AWS Service: Amazon DynamoDB
  • Actions: Select All Actions as the allowed actions.
  • Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream where us-east-1 is the region, 111548290563 is the AWS account ID, TinkKinesisStream is the DynamoDB name.

To get DynamoDB ARN, go to DynamoDB console, click on the Tables on the column on the left, select the DynamoDB table name TinkSmartPegTable, you should see the DynamoDB ARN on the Overview tab.

10. Fill out the AWS Service, Action, and ARN on the Edit Permissions form, then click on Add Statement.

11. Continue with the Next Step.

12. Edit the policy name then select Create Policy.

If you see the message “TinkKinesisDynamoDBPolicy has been created”, the new policy was created successfully.

Amazon Cognito*

Amazon Cognito is a web service to specify the temporary security credentials to UP Squared and Gateway GB-BXTB-3825 devices to access your Kinesis stream and DynamoDB. The Amazon Cognito parameters are used to initialize the Cognito Credentials object.

1. Navigate to the Cognito console, then select Manage Federated Identities.

2. Enter identity pool name, check Enable access to unauthenticated identities.

3. Select Allow.

4. In the Getting started with Amazon Cognito form, click on Edit identity pool on the top right corner.

5. Click on Cognito Stream to expand.

6. Select TinkKinesisStream for stream name and enabled stream status, then Save Changes.

Attach Policies

1. Open IAM console, choose Roles, and then select Cognito_TinkPegIdentityPoolUnauth_Role.

2. Click on Attach policy.

3. Select TinkKinesisStreamPolicy and click on Attach policy.

4. Grant full DynamoDB access for Cognito_TinkPegIdentityUnauth_Role so that Cognito_TinkPegIdentityUnauth_Role is authorized to perform dynamodb:ListTables. Click on Attach policy again to attach to AmazonDynamoDBFullAccess.

5. Select AmazonDynamoDBFullAccess, then click on Attach policy.

6. Now Cognito_TinkPegIdentityPoolUnauth_Role attached to both TinkKinesisDynamoDBPolicy and AmazonDynamoDBFullAccess.

 

Sample Code

Creating Cognito* Parameters for Amazon Kinesis* Stream

Amazon Cognito* is a web service to specify the temporary security credentials to mobile devices. The Amazon Cognito parameters are used to initialize the Cognito Credentials object.

1. Create the Cognito parameters for the Amazon Kinesis Stream as follow:

  • AccountId: From the Account Name on the upper right corner, go to My Account.
  • IdentityPoolId: To get Identity pool ID for code cognitoParams, navigate to Cognito console, then click Manage Federated Identities, select TinkPegIdentityPool, then click on Edit identity pool on the upper right corner. The Identity pool ID is on the Edit identity pool form.

 

2. Select TinkPegIdentityPool.

3. Click on Edit identity pool.

JavaScript
var awsRegion = "us-east-1";
var streamName = TinkKinesisStream; 		// Kinesis stream
var tableName = " TinkSmartPegTable";

// Cognito parameters for Kinesis Stream
var cognitoParams = {
    AccountId: "111548290563",
    IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};

Code Example 1: Cognito parameters for Amazon Kinesis stream

Creating Credentials Object

Now you are ready to write a sample application using the Amazon Cognito Identify* service to create a credentials object.

JavaScript
var AWS = require('aws-sdk');
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
    if (!err) {
        console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
    }
    else {
        console.log("Error in Cognito identiy credentials.");
        console.log(err);
    }
});

Code Example 2: Creating a new credentials object

DescribeStream

DescribeStream returns the current status of the stream, the Amazon resource names (ARN), an array of shard objects, and states whether there are more shards available. The example below describe to the Amazon Kinesis stream to get the available shard.

JavaScript
kinesis.describeStream(describeParams, function(err, data) {
  if (err) {
      console.log(err, err.stack); // an error occurred
  }
  else {
        console.log("describeStream data");
        console.log(data.StreamDescription.Shards[2].ShardId);
  }
})

Code Example 3: DescribeStream

PutRecord

The sample code below put a record in the JSON* format into the Amazon Kinesis stream 'TinkKinesisStream'. The record contains two data typed string: the MAC address and the five-byte value. For more information about PutRecord, go to http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html.

JavaScript
function putRecordsToKinesisStream (){
    var json = {
        "PegID": "FF.FF.FF.FF.FF",
        "FiveBytesData": "0x1212121212",
    };

    var params = {
        Data: JSON.stringify(json),
        PartitionKey: partitionKey,
        StreamName: streamName
    };

    kinesis.putRecord(params, function(err, putData) {
        if (err) {
            console.log(err, err.stack);
        } else {
            try {
                sharId.value = putData.ShardId;
            } catch(err) {
                console.log("Error in Kinesis.putRecord()");
      	        console.log(err);
            }
        }
    });
};

Code Example 4: putRecord

getRecord

Before getRecords, we call getShardIterator to specify the name of the Kinesis stream, how to read the data records, and position in the shard from which to start reading records sequentially. For details description of getShardIterator, see http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html.

JavaScript
// Describe stream parameters
var describeParams = {
    StreamName: streamName
};

kinesis.describeStream(describeParams, function(err, data) {
    if (err) {
      console.log(err, err.stack); // an error occurred
    }
    else {
        var getParams = {
            ShardId: data.StreamDescription.Shards[2].ShardId,
            ShardIteratorType: "TRIM_HORIZON",   // get oldest package
            StreamName: streamName,
        };

        kinesis.getShardIterator(getParams, function(err, result) {
            if (err) {
                 console.log("Error in getShardIterator()");
                 console.log(err);
            } else {
                // Get records from the Kinesis stream
                getRecord(result.ShardIterator);
            }
        });
    }
});

Code Example 5: getShardIterator

Now we can GetRecords from the Amazon Kinesis stream TinkKinesisStream. For more information about GetRecords, see http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html.

JavaScript
function getRecord(shard_iterator) {

    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {
                var StringDecoder = require('string_decoder').StringDecoder;
                var decoder = new StringDecoder('utf8');

                // If there are packages from getRecords()
                if(result.Records.length > 0) {
                    // Loop through all the packages
                    …
                }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }

            if (result.NextShardIterator) {
                getRecord(result.NextShardIterator);
            }
        }
    });
}

Code Example 6: getRecords

Amazon DynamoDB*

Editing Policies

The IAM policies allow us to specify any API action from any service that supports IAM. The policies below specify API actions for both Amazon Kinesis stream and DynamoDB that used in the example code throughout document. Edit the policies to add or remove the allowed actions on the Kinesis stream or DynamoDB.

JavaScript
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1505944340000",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream"
            ]
        },
        {
            "Sid": "Stmt1505944397000",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111548290563:table/TinkSmartPegTable"
            ]
        }
    ]
}

Figure 7: Policies for Kinesis stream and DynamoDB

Calling listTable

listTable returns an array of the Amazon DynamoDB table names that previously created. We call listTable to get the available Amazon DynamoDB table name.

JavaScript
var db = new AWS.DynamoDB();
var dbParams = {};

// If the DynamoDB table exists, get the DynamoDB table name.
db.listTables(dbParams , function(err, data) {
    if (err) {
        console.log("Error in dynamoDb.listTables(): ");
        console.log(err, err.stack); // an error occurred
        return;
    }
    else {
        tableStr.value = data.TableNames;
        console.log(tableStr.value.length);
    }
});

Code Example 7: listTables

Calling putItem

The function below receives the package from the Amazon Kinesis stream and stores it in the Amazon DynamoDB table.

JavaScript
putItem = function(tableName, pegId, fiveBytes) {
    // The item has two attributes: PegID type string and FiveBytesData type String
    var item = {
        'PegID': { 'S': pegId },
        'FiveBytesData': { 'S': fiveBytes }
    };

    var putItemParams = {
        TableName: tableName,
        Item: item
    };

    // Store the package in the DynamoDB
    db.putItem(putItemParams, function(err, data) {
    if (err) {
        console.log("Got error: ");
        console.log(err, err.stack); // an error occurred
    } else {
        console.log(JSON.stringify(data, null, 2));
    }
  });
};

Code Example 8: putItem

Example Sketches

The following example sketches show how to repeatedly read the Kinesis stream, get the records from the Amazon Kinesis stream and store them in the Amazon DynamoDB. Both of the examples below are developed and executed on the UP Squared and Gigabyte Gateway GB-BXTB-3825 platforms. Ensure both platforms are connected to network before you execute.

Kinesis stream putRecord example

JavaScript
var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream'; // Kinesis stream
var tableName = "TinkSmartPegTable";
var cognitoParams = {
    AccountId: "111548290563",
    IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};

var AWS = require('aws-sdk'); //package for AWS
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
    if (!err) {
        console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
    }
    else {
        console.log(err);
    }
});

var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});

function sharIdObj() {
    this.value = "";
}
var sharId = new sharIdObj();

function gateWayPackageObj() {
    this.value = "";
}
var package = new gateWayPackageObj();
var describeParams = {
        StreamName: streamName
    };

kinesis.describeStream(describeParams, function(err, data) {
  if (err) {
      console.log(err, err.stack); // an error occurred
  }
  else {
        console.log("describeStream data");
        console.log(data);           // successful response
  }
})

setInterval(putRecordsToKinesisStream, 5000);

////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: putRecordsToKinesisStream()
// This function puts the package into the Kinesis stream. The Kinesis stream is pre-created.
// Parameters: NA
////////////////////////////////////////////////////////////////////////////////////////////////

function putRecordsToKinesisStream (){

    var json = {
        "PegID": "FF.FF.FF.FF.FF",
        "FiveBytesData": "0x1212121212",
    };

    var params = {
        Data: JSON.stringify(json),
        PartitionKey: partitionKey,
        StreamName: streamName
    };

    kinesis.putRecord(params, function(err, putData) {
        if (err) {
            console.log(err, err.stack);
        } else {
            try {
                sharId.value = putData.ShardId;
            } catch(err) {
                console.log("Error in Kinesis.putRecord()");
      	        console.log(err);
            }
        }
    });
};

Code Example 9: TinkGatewayToCloud.js Example

Continuously read Kinesis stream example

JavaScript
var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream'; // Kinesis stream
var tableName = "TinkSmartPegTable";

// Cognito parameters for Kinesis Stream and DynamoDB
var cognitoParams = {
    AccountId: "111548290563",
    IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
var AWS = require('aws-sdk');       			//package for AWS
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
    if (!err) {
        console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
    }
    else {
        console.log("Error in Cognito identiy credentials.");
        console.log(err);
    }
});

var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
var db = new AWS.DynamoDB();
var dbParams = {};

// Object for the DynamoDB table name
function tableObj() {
    this.value = "";
}
var tableStr = new tableObj();

// If the DynamoDB table exists, get the DynamoDB table name.
db.listTables(dbParams , function(err, data) {
    if (err) {
        console.log("Error in dynamoDb.listTables(): ");
        console.log(err, err.stack);			 // an error occurred
        return;
    }
    else {
        tableStr.value = data.TableNames;
        console.log(tableStr.value.length);
    }
});

// Describe stream parameters
var describeParams = {
    StreamName: streamName
};

kinesis.describeStream(describeParams, function(err, data) {
    if (err) {
      console.log(err, err.stack);			 // an error occurred
    }
    else {
        var getParams = {
            ShardId: data.StreamDescription.Shards[4].ShardId,
            ShardIteratorType: "TRIM_HORIZON",   		// get oldest package
            StreamName: streamName,
        };

        kinesis.getShardIterator(getParams, function(err, result) {
            if (err) {
                 console.log("Error in getShardIterator()");
                 console.log(err);
            } else {
                // Get records from the Kinesis stream
                getRecord(result.ShardIterator);
            }
        });
    }
});

////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: getRecord
// This function gets records from the kinesis stream and then puts the item into the
// DynamoDB.
//
// Parameters:
//     - shard_iterator: The shard position from which to start reading data records
//                                    sequentiallly.
////////////////////////////////////////////////////////////////////////////////////////////////
function getRecord(shard_iterator) {
    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {
                var StringDecoder = require('string_decoder').StringDecoder;
                var decoder = new StringDecoder('utf8');

                // If there are packages from getRecords()
                if(result.Records.length > 0) {
                    // Loop through all the packages
                    for(var i = 0; i < result.Records.length; i++) {
                        if(result.Records[i] != undefined) {
                            var getData = JSON.parse(decoder.write(result.Records[i].Data));

                            console.log("PegId =");
                            console.log(getData.PegID);

                            // Put the package into the DynamoDB table
                            putItem(tableStr.value[0], getData.PegID, getData.FiveBytesData);
                        }
                    }
                }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }

            if (result.NextShardIterator) {
                getRecord(result.NextShardIterator);
            }
        }
    });
}

////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: putItem
// This function
//     - Creates an item using the pass in parameters: pegId and fiveBytes
//     - Calls dynamoDb.putItem() to put the item into the tableName
// Parameters:
//     - tableName: The name of the DynamoDB
//     - pegId:     UUID of the peg
//     - fiveBytes: The stock information of the peg
////////////////////////////////////////////////////////////////////////////////////////////////
putItem = function(tableName, pegId, fiveBytes) {
    // The item has two attributes: PegID type string and FiveBytesData type String
    var item = {
        'PegID': { 'S': pegId },
        'FiveBytesData': { 'S': fiveBytes }
    };

    var putItemParams = {
        TableName: tableName,
        Item: item
    };

    // Store the package in the DynamoDB
    db.putItem(putItemParams, function(err, data) {
    if (err) {
        console.log("Got error: ");
        console.log(err, err.stack); // an error occurred
    } else {
        console.log(JSON.stringify(data, null, 2));
    }
  });
};

Code Example 10: TinkGetPackageFromCloud.js Example

The UP Squared will continuously push packages into the Kinesis stream.

The gateway will continuously pull packages from Kinesis stream and storing them into the DynamoDB table.

Summary

We have described how to use Amazon Kinesis stream and DynamoDB* on UP Squared and Gigabyte gateway GB-BXTB-3825 platforms using JavaScript* on the Ubuntu* operating system. The UP Squared continuously pushes packages into the Kinesis stream and the Gateway GB-BXTB-3825 continuously pulls packages from Kinesis stream then stores them into the cloud for further analysis. We recommend you try out different Amazon Web Services with these platforms.

References

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)