Click here to Skip to main content
13,796,494 members
Click here to Skip to main content
Add your own
alternative version

Stats

3.6K views
7 bookmarked
Posted 25 Oct 2018
Licenced CPOL

IEI Tank AIoT Developer Kit and Microsoft Azure: Running Machine Learning on the Edge

, 25 Oct 2018
This paper will convert a Python based motor defect detector solution to a deployable Azure module.

Editorial Note

This article is in the Product Showcase section for our sponsors at CodeProject. These articles are intended to provide you with information on products and services that we consider useful and of value to developers.

Deploying a machine learning model remotely to the edge with Microsoft Azure* IoT Edge can help scale an IoT application. Azure IoT Edge works by containerizing the solution into a module to be pushed down to machines at the edge. This paper will convert a Python* based motor defect detector solution to a deployable Azure module. The motor defect detector uses a K-means clustering algorithm to do predictive maintenance on motor bearings to determine if they will fail or not. The module does analysis on simulated data from the motors and then sends messages back to the IOT hub about the status of each bearing.

Figure 1: Flow of module from Development Machine to Edge Device

Hardware

Setting up Development Environment and Microsoft Azure*

Follow the Python module tutorial here to make sure all prerequisites are setup. It will walk through setting up Visual Studio Code and other resources on the development machine as well as creating all the necessary dependencies in Azure. The main things in Azure to have are a standard tier IoT Hub, the edge device registered to the hub, and a registry to store the container images.

Tips

Cookie Cutter needs to be added as an environment variable to the ‘Path’ on the development machine. It is installed at the location below on the development machine used.

C:\Users\\AppData\Roaming\Python\Python36\Scripts\

The development machine should use Python3 and also needs the following things installed so Visual Studio Code doesn’t show errors on the code in the tutorial:

pip install iothub_client pandas numpy sklearn scipy

Pylint might also needs to be installed if not already for Visual Studio Code.

pip install pylint

Restart Visual Studio code so it can find the installations.

Creating the Module

On the development machine we will use Visual Studio Code to create the module to be deployed to the Tank edge device.

  1. Create a "New IoT Edge Solution" by right clicking on modules.
  2. Call it MotorDefectDetectorSolution.
  3. Select Python Module and call the module KmeansModule.
  4. Enter in the registry address: .azurecr.io/KmeansModule
  5. The new Edge Solution will open.
  6. Copy in kmeanModel.npy from the motor defect detector GitHub* to the KmeansModule. This is the model file.
  7. Create and copy in utils.py from below. Utils.py handles most of the mathematical calculations. It has been edited from the github utils.py by changing it to use the 1st test set by default and removing the unused plotting functions.
    #importing the libraries
    import numpy as np
    import pandas as pd
    #import matplotlib.pyplot as plt
    
    from scipy.fftpack import fft
    from scipy.spatial.distance import cdist
    #from sklearn import cluster
    
    #cal_labels function take no_of_files as input and generate the label based on 70-30 split.
    #files for the testset1 = 2148,testset2 = 984,testset3 = 6324
    def cal_Labels(files):
        range_low = files*0.7
        range_high = files*1.0
        label = []
        for i in range(0,files):
            if(i= range_low and i <= range_high):
                label.append(1)
            else:
                label.append(2)
        return label
    
    # cal_amplitude take the fftdata, n = no of maximun amplitude as input and return the top5 frequecy which has the highest amplitude
    def cal_amplitude(fftData,n):
        ifa = []
        ia = []
        amp = abs(fftData[0:int(len(fftData)/2)])
        freq = np.linspace(0,10000,num = int(len(fftData)/2))
        ida = np.array(amp).argsort()[-n:][::-1]
        ia.append([amp[i] for i in ida])
        ifa.append([freq[i] for i in ida])
        return(ifa,ia)
    
    # this function calculate the top n freq which has the heighest amplitude and retuen the list for each maximum
    def cal_max_freq(files,path):
        freq_max1, freq_max2, freq_max3, freq_max4, freq_max5 = ([] for _ in range(5))
        for f in files:
            temp = pd.read_csv(path+f,  sep = "\t",header = None)
            temp_freq_max1,temp_freq_max2,temp_freq_max3,temp_freq_max4,temp_freq_max5 = ([] for _ in range(5))
            rhigh = 8
            for i in range(0,rhigh):
                t = fft(temp[i])
                ff,aa = cal_amplitude(t,5)
                temp_freq_max1.append(np.array(ff)[:,0])
                temp_freq_max2.append(np.array(ff)[:,1])
                temp_freq_max3.append(np.array(ff)[:,2])
                temp_freq_max4.append(np.array(ff)[:,3])
                temp_freq_max5.append(np.array(ff)[:,4])
            freq_max1.append(temp_freq_max1)
            freq_max2.append(temp_freq_max2)
            freq_max3.append(temp_freq_max3)
            freq_max4.append(temp_freq_max4)
            freq_max5.append(temp_freq_max5)
        return(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5)
    
    
    def create_dataframe(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5,bearing):
        result = pd.DataFrame()
        result['fmax1'] = list((np.array(freq_max1))[:,bearing])
        result['fmax2'] = list((np.array(freq_max2))[:,bearing])
        result['fmax3'] = list((np.array(freq_max3))[:,bearing])
        result['fmax4'] = list((np.array(freq_max4))[:,bearing])
        result['fmax5'] = list((np.array(freq_max5))[:,bearing])
        x = result[["fmax1","fmax2","fmax3","fmax4","fmax5"]]
        return x
    Code 1: utils.py
  8. Copy in the code from main.py below into the default main.py. Main is where the program is going to run and send messages to topic about the status of the bearings. In order to simulate data being generated, the script will download the data sets from NASA used in the original GitHub project and extract the 1st test set. Then it will copy the 1st test set files one by one to folder /tmp/test. This folder is where the program will pull the data from, hence simulating the motor running and gathering data over time.
    import random
    import time
    import sys
    import iothub_client
    # pylint: disable=E0611
    from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
    from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
    
    import pandas as pd
    import numpy as np
    from utils import cal_max_freq
    import os
    import urllib
    import shutil
    
    def checkBearings(hubManager):
        datadir= '/tmp/1st_test/'
        filedir = '/tmp/test/'
        try:
            if not os.path.exists(datadir): 
                os.system("mkdir /tmp/test")
                print("data not found7, downloading")
                urllib.request.urlretrieve("https://ti.arc.nasa.gov/c/3/", "/tmp/IMS.7z")
                print("downloaded, now unzipping")
                os.system("7za x /tmp/IMS.7z -o/tmp/")
                os.system("unrar x /tmp/1st_test.rar /tmp/")
                print("unzipped")
                files = [x for x in os.listdir(datadir)]
                oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
                os.rename(datadir + oldest, filedir  + oldest)
        except IOError as e:
            print(e)
            print("error end")
    
        # load the model
        filename = "kmeanModel.npy"
        model = np.load(filename).item() 
    
        # iteration for 1st_test data
        rhigh = 8
    
        moredata= True
    
        while moredata:
            try:
                # load the files
                all_files = os.listdir(filedir)
                freq_max1,freq_max2,freq_max3,freq_max4,freq_max5  =  cal_max_freq(all_files,filedir)
            except IOError as e:
                print("you have entered either the wrong data directory path or filepath ")
                print(e)
                print("error end")
    
    
            #testlabels = []
            for i in range(0,rhigh):
                print("checking for the bearing",i+1)
                result = pd.DataFrame()
                result['freq_max1'] = list((np.array(freq_max1))[:,i])
                result['freq_max2'] = list((np.array(freq_max2))[:,i])
                result['freq_max3'] = list((np.array(freq_max3))[:,i])
                result['freq_max4'] = list((np.array(freq_max4))[:,i])
                result['freq_max5'] = list((np.array(freq_max5))[:,i])
    
                X = result[["freq_max1","freq_max2","freq_max3","freq_max4","freq_max5"]]
    
                label = model.predict(X)
                labelfive = list(label[-100:]).count(5)
                labelsix = list(label[-100:]).count(6)
                labelseven = list(label[-100:]).count(7)
                totalfailur = labelfive+labelsix+labelseven#+labelfour
                ratio = (totalfailur/100)*100
                if(ratio >= 25):
                    print("bearing"+ str(i+1) + " is suspected to fail")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is suspected to fail", 0)
                else:
                    print("bearing"+ str(i+1) + " is working in normal condition")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is working in normal condition", 0)
    
            files = [x for x in os.listdir(datadir)]
            if len(files):
                oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
                os.rename(datadir + oldest, filedir  + oldest)
            else:
                moredata = False
                print("done")
    		
    
    # messageTimeout - the maximum time in milliseconds until a message times out.
    # The timeout period starts at IoTHubModuleClient.send_event_async.
    # By default, messages do not expire.
    MESSAGE_TIMEOUT = 10000
    
    # global counters
    RECEIVE_CALLBACKS = 0
    SEND_CALLBACKS = 0
    
    # Choose HTTP, AMQP or MQTT as transport protocol.  Currently only MQTT is supported.
    PROTOCOL = IoTHubTransportProvider.MQTT
    
    # Callback received when the message that we're forwarding is processed.
    def send_confirmation_callback(message, result, user_context):
        global SEND_CALLBACKS
        print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        print ( "    Properties: %s" % key_value_pair )
        SEND_CALLBACKS += 1
        print ( "    Total calls confirmed: %d" % SEND_CALLBACKS )
    
    
    # receive_message_callback is invoked when an incoming message arrives on the specified 
    # input queue (in the case of this sample, "input1").  Because this is a filter module, 
    # we will forward this message onto the "output1" queue.
    def receive_message_callback(message, hubManager):
        global RECEIVE_CALLBACKS
        message_buffer = message.get_bytearray()
        size = len(message_buffer)
        #print ( "    Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        #print ( "    Properties: %s" % key_value_pair )
        RECEIVE_CALLBACKS += 1
        #print ( "    Total calls received: %d" % RECEIVE_CALLBACKS )
        #hubManager.forward_event_to_output("output1", message, 0)
        return IoTHubMessageDispositionResult.ACCEPTED
    
    
    class HubManager(object):
    
        def __init__(
                self,
                protocol=IoTHubTransportProvider.MQTT):
            self.client_protocol = protocol
            self.client = IoTHubModuleClient()
            self.client.create_from_environment(protocol)
    
            # set the time until a message times out
            self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
            
            # sets the callback when a message arrives on "input1" queue.  Messages sent to 
            # other inputs or to the default will be silently discarded.
            self.client.set_message_callback("input1", receive_message_callback, self)
    
        # Forwards the message received onto the next stage in the process.
        def forward_event_to_output(self, outputQueueName, event, send_context):
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)
        # Send the message 
        def send_event_to_output(self, outputQueueName, messsage, send_context):
            event=IoTHubMessage(messsage)
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)
    
    def main(protocol):
        try:
            print ( "\nPython %s\n" % sys.version )
            print ( "IoT Hub Client for Python3" )
    
            hub_manager = HubManager(protocol)
    
            print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
            print ( "The sample is now waiting for messages and will indefinitely.  Press Ctrl-C to exit. ")
    
            checkBearings(hub_manager)
    
            while True:
                time.sleep(1)
    
        except IoTHubError as iothub_error:
            print ( "Unexpected error %s from IoTHub" % iothub_error )
            return
        except KeyboardInterrupt:
            print ( "IoTHubModuleClient sample stopped" )
    
    if __name__ == '__main__':
        main(PROTOCOL)
    Code 2: main.py
  9. Update the requirements.txt. This will install the dependencies of the Motor Fault Detector.
    azure-iothub-device-client==1.4.0
    numpy>=1.11.2
    scipy>=1.1.0
    pandas>=0.23.4
    scikit-learn>=0.19.1
    sklearn>=0.0
    Code 3: requirements.txt
  10. And update Dockerfile.amd64. Note that the container only comes with Python 2.7 by default so Python3 needs to be installed and the Python path updated.
FROM ubuntu:xenial

WORKDIR /app

RUN apt-get update && \
    apt-get install -y --no-install-recommends libcurl4-openssl-dev libboost-python-dev p7zip-full unrar python3-pip python3-dev python3-setuptools && \
    cd /usr/local/bin && \
    ln -s /usr/bin/python3 python && \
    pip3 install --upgrade pip && \
    rm -rf /var/lib/apt/lists/* 

COPY requirements.txt ./
RUN pip3 install -r requirements.txt

COPY . .

RUN useradd -ms /bin/bash moduleuser
USER moduleuser

CMD [ "python3", "-u", "./main.py" ]
Code 4: Dockerfile.amd64

With the added files, the module structure should look as below in Visual Studio Code.

Figure 2: IoT Edge Solution in Visual Studio Code

Deploying the Module

1. In Visual Studio Code, right click on deployment.template.solution and Build and Push IoT Edge Solution.

Figure 3: Build and Push IoT Edge Solution location

It will take some time to build the container with all the requirements.

2. Then right click on the Azure IoT Hub Device you want to deploy to and select Create Deployment for Single Device.

Figure 4: Deploy the module

3. Log into the Iot Hub edge Device, in this case, the Tank.

Use the below command to monitor the progress:

sudo iotedge logs KmeansModule –f

It will take some time to download the data and extract it. The module will first download the data, extract the data, and then start copying it into the folder. It will then send the messages back to the IoT hub.

The messages can be seen on the development machine by right clicking on the next to Azure IOT HUB Devices and selecting Start Monitoring D2C Message in Visual Studio Code.

Figure 5: Monitoring the Device to Cloud messages

Figure 6: Iot Hub Messages

Useful module commands on the Tank

List the module installed:

iotedge list

Remove the container:

sudo docker rm -f KmeansModule

Look at the logs of the container:

sudo iotedge logs KmeansModule –f

Conclusion

Now the motor defect detector Python project has been converted into a module on the Azure Iot Hub and deployed to an edge device. As a next step, Azure can turn those messages into actionable events with routing.

About the author

Whitney Foster is a software engineer at Intel in the Core and Visual Computing Group working on scale enabling projects for Internet of Things and Computer Vision.

Learn More

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Intel Corporation
United States United States
You may know us for our processors. But we do so much more. Intel invents at the boundaries of technology to make amazing experiences possible for business and society, and for every person on Earth.

Harnessing the capability of the cloud, the ubiquity of the Internet of Things, the latest advances in memory and programmable solutions, and the promise of always-on 5G connectivity, Intel is disrupting industries and solving global challenges. Leading on policy, diversity, inclusion, education and sustainability, we create value for our stockholders, customers and society.
Group type: Organisation

43 members


You may also be interested in...

Pro
Pro

Comments and Discussions

 
-- There are no messages in this forum --
Permalink | Advertise | Privacy | Cookies | Terms of Use | Mobile
Web05 | 2.8.181207.3 | Last Updated 25 Oct 2018
Article Copyright 2018 by Intel Corporation
Everything else Copyright © CodeProject, 1999-2018
Layout: fixed | fluid