Introduction
This guide demonstrates steps to communicate with Amazon Kinesis* and DynamoDB* on Ubuntu* using the UPSquared* and a Gigabyte* gateway. The UP Squared pushes (sends) information and the gateway pulls (receives) the information for further analysis.
From the example shown in this article, you’ll learn to use both the UP Squared and a Gigabyte gateway GB-BXTB-3825 platforms to:
- Create a Cognito* parameter
- Describe to the Amazon Kinesis stream
- Put record, get record and monitor the Amazon Kinesis
- How to pull data from Kinesis stream and store them in DynamoDB* cloud database for later analysis
About the UP Squared Board
Characterized by low power consumption and high performance ideal for the Internet of Things (IoT), the UPSquared platform is the fastest x86 maker board based on the Apollo Lake platform from Intel. It contains both a Intel Celeron® processor Dual Core N3350 and Intel® Pentium® brand Quad Core N4200 processor.
Operating System Compatibility
The UPSquared platform can run Ubilinux*, Ubuntu*, Windows® 10 IoT Core, Windows® 10, Yocto Project*, or Android* Marshmallow operating systems. For more information on UPSquared, visit http://www.up-board.org/upsquared. Refer to http://b2b.gigabyte.com/Embedded-Computing/GB-BXBT-3825-rev-10#ov for details on Intel® IoT Gateway GB-BXTB-3825.
Hardware Components
The hardware components used in this project are listed below:
Development Boards
Characterized by low power consumption and high performance ideal for the Internet of Things (IoT), the UP Squared platform is a fast prototyping board based on Intel’s Apollo Lake platform. It contains both Intel® Celeron® Dual Core N3350 and Intel® Pentium® Quad Core N4200 processors.
UP Squared Platform
Before you begin, the Ubuntu* operating system should be installed on the UP Squared platform.
1. To ensure that the Ubuntu operating system is up to date and dependent Ubuntu packages are installed, open a command prompt (terminal) and type the following:
sudo apt-get update
2. Install the npm (Node Version Manager) package by typing the following command in the terminal:
sudo apt-get install npm
3. AWS* is an open-source code that provides a JavaScript* library for many AWS services including Kinesis stream, DynamoDB, and more. To begin using AWS with the AWS SDK for JavaScript* in Node.js on the UP Squared platform, install the AWS SDK JavaScript* library and documentation on the Ubuntu* terminal:
npm install aws-sdk
4. Node.js* is an open-source, cross-platform JavaScript* runtime environment:
sudo apt-get install node.js
Gigabyte* Gateway GB-BXTB-3825 Platform
Be sure Ubuntu* is installed on the GB-BXTB-3825 gateway platform.
Similarly, ensure the Ubuntu operating system is up to date, install dependencies, npm, aws-sdk, and Node.js* packages. The gateway pulls packages from the Kinesis stream and stores them in the DynamoDB table.
Create Amazon Kinesis* Stream
The Amazon Kinesis Stream enables real time data to stream on a large scale and makes the data available for consumption. The data in the stream are the ordered sequence of data records. The data records in the stream are split into multiple shards. Splitting data into multiple shards is a technique to spread out the load.
Let’s start by logging into Amazon Web Services (AWS)* Management Console, opening Amazon* Kinesis console, choosing a region, then selecting Go to the Stream console.
1. Select Create Kinesis stream
2. Choose a name for your Kinesis stream and the number of shards.
3. When you see a message notifying you that your stream has been created and the status is “ACTIVE”, the stream is ready for use.
Create Amazon DynamoDB* Table
Amazon DynamoDB is a fully managed cloud service. Its flexible data management makes it great for Internet of Things applications. With the Amazon DynamoDB, we can store the data being broadcast from the UP Squared platform and analyze the data for specific needs.
1. To create a DynamoDB table, open a DynamoDB console and click Create table. The example below creates a DynamoDB table named “TinkSmartPegTable” with two attributes PegID and FiveBytesData typed String.
2. Click Create table to create the DynamoDB table.
3. Navigate to the Items tab of the TinkSmartPegTable that you just created, click on the Create item to create more items.
4. Select Insert from the more list on the Create item form
5. Select String from the type drop-down list.
6. Enter the name and value of the first item, enter the value of the second items, then select Save.
7. The TinkSmartPegTable now has been created and contains two items: PegID and FiveBytesData.
Create Policy
An IAM explicitly lists actions which are allowed and the resources on which the actions are applicable.
1. From the IAM console, choose Policies on the left column, and then choose Create Policy.
2. In the Create Policy form, choose Policy Generator.
3. Fill out policy permissions as follows:
- AWS Service: Amazon Kinesis
- Actions: Select DescribeStream, GetShardIterator, GetRecords, PutRecord, and PutRecords as the allowed actions.
- Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream where us-east-1 is the region, 111548290563 is the AWS account ID, TinkKinesisStream is the Kinesis stream name.
To get Kinesis stream ARN, on the Kinesis console, select the stream name, then go to Stream on the column on the left. Copy the Stream ARN.
4. Fill out the AWS Service, Action, and ARN on the Edit Permissions form.
5. Click Add Statement on the Edit Permissions form.
6. Fill out the action and ARN for Amazon DynamoDB.
- AWS Service: Amazon DynamoDB
- Actions: Select All Actions as the allowed actions.
- Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream where us-east-1 is the region, 111548290563 is the AWS account ID, TinkKinesisStream is the DynamoDB name.
To get DynamoDB ARN, go to DynamoDB console, click on the Tables on the column on the left, select the DynamoDB table name TinkSmartPegTable, you should see the DynamoDB ARN on the Overview tab.
7. Fill out the AWS Service, Action, and ARN on the Edit Permissions form, then click on Add Statement.
8. Click Add Statement on the Edit Permissions form.
9. Fill out the action and ARN for Amazon DynamoDB.
- AWS Service: Amazon DynamoDB
- Actions: Select All Actions as the allowed actions.
- Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream where us-east-1 is the region, 111548290563 is the AWS account ID, TinkKinesisStream is the DynamoDB name.
To get DynamoDB ARN, go to DynamoDB console, click on the Tables on the column on the left, select the DynamoDB table name TinkSmartPegTable, you should see the DynamoDB ARN on the Overview tab.
10. Fill out the AWS Service, Action, and ARN on the Edit Permissions form, then click on Add Statement.
11. Continue with the Next Step.
12. Edit the policy name then select Create Policy.
If you see the message “TinkKinesisDynamoDBPolicy has been created”, the new policy was created successfully.
Amazon Cognito*
Amazon Cognito is a web service to specify the temporary security credentials to UP Squared and Gateway GB-BXTB-3825 devices to access your Kinesis stream and DynamoDB. The Amazon Cognito parameters are used to initialize the Cognito Credentials object.
1. Navigate to the Cognito console, then select Manage Federated Identities.
2. Enter identity pool name, check Enable access to unauthenticated identities.
3. Select Allow.
4. In the Getting started with Amazon Cognito form, click on Edit identity pool on the top right corner.
5. Click on Cognito Stream to expand.
6. Select TinkKinesisStream for stream name and enabled stream status, then Save Changes.
Attach Policies
1. Open IAM console, choose Roles, and then select Cognito_TinkPegIdentityPoolUnauth_Role.
2. Click on Attach policy.
3. Select TinkKinesisStreamPolicy and click on Attach policy.
4. Grant full DynamoDB access for Cognito_TinkPegIdentityUnauth_Role so that Cognito_TinkPegIdentityUnauth_Role is authorized to perform dynamodb:ListTables. Click on Attach policy again to attach to AmazonDynamoDBFullAccess.
5. Select AmazonDynamoDBFullAccess, then click on Attach policy.
6. Now Cognito_TinkPegIdentityPoolUnauth_Role attached to both TinkKinesisDynamoDBPolicy and AmazonDynamoDBFullAccess.
Sample Code
Creating Cognito* Parameters for Amazon Kinesis* Stream
Amazon Cognito* is a web service to specify the temporary security credentials to mobile devices. The Amazon Cognito parameters are used to initialize the Cognito Credentials object.
1. Create the Cognito parameters for the Amazon Kinesis Stream as follow:
- AccountId: From the Account Name on the upper right corner, go to My Account.
- IdentityPoolId: To get Identity pool ID for code cognitoParams, navigate to Cognito console, then click Manage Federated Identities, select TinkPegIdentityPool, then click on Edit identity pool on the upper right corner. The Identity pool ID is on the Edit identity pool form.
2. Select TinkPegIdentityPool.
3. Click on Edit identity pool.
var awsRegion = "us-east-1";
var streamName = TinkKinesisStream;
var tableName = " TinkSmartPegTable";
var cognitoParams = {
AccountId: "111548290563",
IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
Code Example 1: Cognito parameters for Amazon Kinesis stream
Creating Credentials Object
Now you are ready to write a sample application using the Amazon Cognito Identify* service to create a credentials object.
var AWS = require('aws-sdk');
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
if (!err) {
console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
}
else {
console.log("Error in Cognito identiy credentials.");
console.log(err);
}
});
Code Example 2: Creating a new credentials object
DescribeStream
DescribeStream
returns the current status of the stream, the Amazon resource names (ARN), an array of shard objects, and states whether there are more shards available. The example below describe to the Amazon Kinesis stream to get the available shard.
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack);
}
else {
console.log("describeStream data");
console.log(data.StreamDescription.Shards[2].ShardId);
}
})
Code Example 3: DescribeStream
PutRecord
The sample code below put a record in the JSON* format into the Amazon Kinesis stream 'TinkKinesisStream'
. The record contains two data typed string: the MAC address and the five-byte value. For more information about PutRecord, go to http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html.
function putRecordsToKinesisStream (){
var json = {
"PegID": "FF.FF.FF.FF.FF",
"FiveBytesData": "0x1212121212",
};
var params = {
Data: JSON.stringify(json),
PartitionKey: partitionKey,
StreamName: streamName
};
kinesis.putRecord(params, function(err, putData) {
if (err) {
console.log(err, err.stack);
} else {
try {
sharId.value = putData.ShardId;
} catch(err) {
console.log("Error in Kinesis.putRecord()");
console.log(err);
}
}
});
};
Code Example 4: putRecord
getRecord
Before getRecords
, we call getShardIterator
to specify the name of the Kinesis stream, how to read the data records, and position in the shard from which to start reading records sequentially. For details description of getShardIterator
, see http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html.
var describeParams = {
StreamName: streamName
};
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack);
}
else {
var getParams = {
ShardId: data.StreamDescription.Shards[2].ShardId,
ShardIteratorType: "TRIM_HORIZON",
StreamName: streamName,
};
kinesis.getShardIterator(getParams, function(err, result) {
if (err) {
console.log("Error in getShardIterator()");
console.log(err);
} else {
getRecord(result.ShardIterator);
}
});
}
});
Code Example 5: getShardIterator
Now we can GetRecords
from the Amazon Kinesis stream TinkKinesisStream
. For more information about GetRecords
, see http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html.
function getRecord(shard_iterator) {
var getRecParams = {
ShardIterator: shard_iterator
};
kinesis.getRecords(getRecParams, function(err, result) {
if (err) {
console.log("Error in getRecords() from the Kinesis stream.");
console.log(err);
} else {
try {
var StringDecoder = require('string_decoder').StringDecoder;
var decoder = new StringDecoder('utf8');
if(result.Records.length > 0) {
…
}
} catch(err) {
console.log("Error parsing the package.");
console.log(err);
}
if (result.NextShardIterator) {
getRecord(result.NextShardIterator);
}
}
});
}
Code Example 6: getRecords
Amazon DynamoDB*
Editing Policies
The IAM policies allow us to specify any API action from any service that supports IAM. The policies below specify API actions for both Amazon Kinesis stream and DynamoDB that used in the example code throughout document. Edit the policies to add or remove the allowed actions on the Kinesis stream or DynamoDB.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1505944340000",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": [
"arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream"
]
},
{
"Sid": "Stmt1505944397000",
"Effect": "Allow",
"Action": [
"dynamodb:*"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:111548290563:table/TinkSmartPegTable"
]
}
]
}
Figure 7: Policies for Kinesis stream and DynamoDB
Calling listTable
listTable returns an array of the Amazon DynamoDB table names that previously created. We call listTable to get the available Amazon DynamoDB table name.
var db = new AWS.DynamoDB();
var dbParams = {};
db.listTables(dbParams , function(err, data) {
if (err) {
console.log("Error in dynamoDb.listTables(): ");
console.log(err, err.stack);
return;
}
else {
tableStr.value = data.TableNames;
console.log(tableStr.value.length);
}
});
Code Example 7: listTables
Calling putItem
The function below receives the package from the Amazon Kinesis stream and stores it in the Amazon DynamoDB table.
putItem = function(tableName, pegId, fiveBytes) {
var item = {
'PegID': { 'S': pegId },
'FiveBytesData': { 'S': fiveBytes }
};
var putItemParams = {
TableName: tableName,
Item: item
};
db.putItem(putItemParams, function(err, data) {
if (err) {
console.log("Got error: ");
console.log(err, err.stack);
} else {
console.log(JSON.stringify(data, null, 2));
}
});
};
Code Example 8: putItem
Example Sketches
The following example sketches show how to repeatedly read the Kinesis stream, get the records from the Amazon Kinesis stream and store them in the Amazon DynamoDB. Both of the examples below are developed and executed on the UP Squared and Gigabyte Gateway GB-BXTB-3825 platforms. Ensure both platforms are connected to network before you execute.
Kinesis stream putRecord example
var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream';
var tableName = "TinkSmartPegTable";
var cognitoParams = {
AccountId: "111548290563",
IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
var AWS = require('aws-sdk');
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
if (!err) {
console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
}
else {
console.log(err);
}
});
var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
function sharIdObj() {
this.value = "";
}
var sharId = new sharIdObj();
function gateWayPackageObj() {
this.value = "";
}
var package = new gateWayPackageObj();
var describeParams = {
StreamName: streamName
};
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack);
}
else {
console.log("describeStream data");
console.log(data);
}
})
setInterval(putRecordsToKinesisStream, 5000);
function putRecordsToKinesisStream (){
var json = {
"PegID": "FF.FF.FF.FF.FF",
"FiveBytesData": "0x1212121212",
};
var params = {
Data: JSON.stringify(json),
PartitionKey: partitionKey,
StreamName: streamName
};
kinesis.putRecord(params, function(err, putData) {
if (err) {
console.log(err, err.stack);
} else {
try {
sharId.value = putData.ShardId;
} catch(err) {
console.log("Error in Kinesis.putRecord()");
console.log(err);
}
}
});
};
Code Example 9: TinkGatewayToCloud.js Example
Continuously read Kinesis stream example
var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream';
var tableName = "TinkSmartPegTable";
var cognitoParams = {
AccountId: "111548290563",
IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
var AWS = require('aws-sdk');
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
if (!err) {
console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
}
else {
console.log("Error in Cognito identiy credentials.");
console.log(err);
}
});
var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
var db = new AWS.DynamoDB();
var dbParams = {};
function tableObj() {
this.value = "";
}
var tableStr = new tableObj();
db.listTables(dbParams , function(err, data) {
if (err) {
console.log("Error in dynamoDb.listTables(): ");
console.log(err, err.stack);
return;
}
else {
tableStr.value = data.TableNames;
console.log(tableStr.value.length);
}
});
var describeParams = {
StreamName: streamName
};
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack);
}
else {
var getParams = {
ShardId: data.StreamDescription.Shards[4].ShardId,
ShardIteratorType: "TRIM_HORIZON",
StreamName: streamName,
};
kinesis.getShardIterator(getParams, function(err, result) {
if (err) {
console.log("Error in getShardIterator()");
console.log(err);
} else {
getRecord(result.ShardIterator);
}
});
}
});
function getRecord(shard_iterator) {
var getRecParams = {
ShardIterator: shard_iterator
};
kinesis.getRecords(getRecParams, function(err, result) {
if (err) {
console.log("Error in getRecords() from the Kinesis stream.");
console.log(err);
} else {
try {
var StringDecoder = require('string_decoder').StringDecoder;
var decoder = new StringDecoder('utf8');
if(result.Records.length > 0) {
for(var i = 0; i < result.Records.length; i++) {
if(result.Records[i] != undefined) {
var getData = JSON.parse(decoder.write(result.Records[i].Data));
console.log("PegId =");
console.log(getData.PegID);
putItem(tableStr.value[0], getData.PegID, getData.FiveBytesData);
}
}
}
} catch(err) {
console.log("Error parsing the package.");
console.log(err);
}
if (result.NextShardIterator) {
getRecord(result.NextShardIterator);
}
}
});
}
putItem = function(tableName, pegId, fiveBytes) {
var item = {
'PegID': { 'S': pegId },
'FiveBytesData': { 'S': fiveBytes }
};
var putItemParams = {
TableName: tableName,
Item: item
};
db.putItem(putItemParams, function(err, data) {
if (err) {
console.log("Got error: ");
console.log(err, err.stack);
} else {
console.log(JSON.stringify(data, null, 2));
}
});
};
Code Example 10: TinkGetPackageFromCloud.js Example
The UP Squared will continuously push packages into the Kinesis stream.
The gateway will continuously pull packages from Kinesis stream and storing them into the DynamoDB table.
Summary
We have described how to use Amazon Kinesis stream and DynamoDB* on UP Squared and Gigabyte gateway GB-BXTB-3825 platforms using JavaScript* on the Ubuntu* operating system. The UP Squared continuously pushes packages into the Kinesis stream and the Gateway GB-BXTB-3825 continuously pulls packages from Kinesis stream then stores them into the cloud for further analysis. We recommend you try out different Amazon Web Services with these platforms.
References