1편에서 장치 설치 하고 오기!

https://realyun99.tistory.com/153

이어서 진행해보자..


Kinesis Data Streams

  • 스트림 생성

 

DynamoDB

  • DB 생성

 

Lambda

  • IAM 역할 생성 - Lambda

  • IAM 역할 생성 - IoT

권한의 경우 알아서 추가해줌.. 그대로 진행하면 됨

 

  • Lambda 함수 생성 및 트리거 추가

역할의 경우 위에서 만들었던 IAM Lambda 역할을 지정해주자.

 

다음으로 트리거 추가를 하자. 먼저 Kinesis!

함수에 코드를 추가해주자.. 추가 후에 꼭 Deploy 버튼 누르기!!!!!!

더보기
from __future__ import print_function
import base64
import boto3
from boto3.dynamodb.conditions import Key, Attr
import json
import os
import traceback

#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
DDB_ATTR = "temp"
#-----Dynamo Info change here------

dynamodb = boto3.resource('dynamodb')
table  = dynamodb.Table(TABLE_NAME)

'''
This Kinesis data(Json Image) is below
{
    "DEVICE_NAME": $device_name,
    "TIMESTAMP": $TimeStamp(yyyy-mm-ddThh:MM:SS),
    "HUMIDITY" : int,
    "TEMPERATURE" : int
}
'''
def checkItem(str_data):
    try:
        #String to Json object
        json_data = json.loads(str_data)
        # adjust your data format
        resDict = {
            DDB_PRIMARY_KEY:json_data['DEVICE_NAME'],
            DDB_SORT_KEY:json_data['TIMESTAMP'],
            "HUMIDITY": json_data['HUMIDITY'],
            "TEMPERATURE": json_data['TEMPERATURE']

        }
        print("resDict:{}".format(resDict))
        return resDict

    except Exception as e:
        print(traceback.format_exc())
        return None

def writeItemInfo(datas):
    ItemInfoDictList = []
    try:
        for data in datas:
            itemDict = checkItem(data)
            if None != itemDict:
                ItemInfoDictList.append(itemDict)
            # if data does not have key info, just pass
            else:
                print("Error data found:{}".format(data))
                pass

    except Exception as e:
        print(traceback.format_exc())
        print("Error on writeItemInfo")

    return ItemInfoDictList

def DynamoBulkPut(datas):
    try:
        putItemDictList = writeItemInfo(datas)
        with table.batch_writer() as batch:
            for putItemDict in putItemDictList:
                batch.put_item(Item = putItemDict)
        return

    except Exception as e:
        print("Error on DynamoBulkPut()")
        raise e

def decodeKinesisData(dataList):
    decodedList = []
    try:
        for data in dataList:
            payload =  base64.b64decode(data['kinesis']['data'])
            print("payload={}".format(payload))
            decodedList.append(payload)

        return decodedList

    except Exception as e:
        print("Error on decodeKinesisData()")
        raise e

#------------------------------------------------------------------------
# call by Lambda here.
#------------------------------------------------------------------------
def lambda_handler(event, context):
    print("lambda_handler start")

    try:
        print("---------------json inside----------------")
        print(json.dumps(event))
        encodeKinesisList = event['Records']
        print(encodeKinesisList)
        decodedKinesisList = decodeKinesisData(encodeKinesisList)
        # Dynamo Put
        if 0 < len(decodedKinesisList):
            DynamoBulkPut(decodedKinesisList)
        else:
            print("there is no valid data in Kinesis stream, all data passed")

        return

    except Exception as e:
        print(traceback.format_exc())
        # This is sample source. When error occur this return success and ignore the error.
        raise e

 

환경 변수 지정을 위해 설정을 들어가자 (구성 → 환경변수)

DynamoDB 테이블을 넣어주자

 

IoT Core 확인

  • 메시지 라우팅 규칙 생성

(메시지 라우팅 → 규칙)

from 뒤에는 아까 생성했던 디바이스의 topic을 넣어준다.

 

IoT와 Kinesis를 연결해준다. IoT 디바이스에서 데이터를 받아와 Kinesis로 흐르게

Role도 아까 생성했던 iot 역할로 지정해준다.

 

현재 iot 디바이스 연결도 잘 되어 있는 것을 볼 수 있음.

 

  • Kinesis Stream 모니터링

또, 위에서 연결했던 Kinesis의 모니터링을 보면 데이터가 잘 들어오는 것을 볼 수 있다.

(지표들 중에서 GetRecords - 합계 | PutRecords - 합계 값이 0이 아닌지 확인)

시간이 좀 지나야 해당 그래프를 볼 수 있음...!

  • DynamoDB 모니터링

라이브 항목 수 가져오기 클릭

항목 탐색을 확인해보자.

 

  • Lambda 모니터링

(Invocations 그래프가 끝에 떨어지는 모양은 내가 cloud9 실행을 멈췄기 때문/ 스크립트 계속 돌리면 그래프 유지됨.)

 

API 용 Lambda 구성

  • Lambda 함수 만들기

해당 코드를 넣어준다.(Deploy 클릭)

더보기
from __future__ import print_function
import boto3
from boto3.dynamodb.conditions import Key
import datetime
import json
import traceback
import os

#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
#-----Dynamo Info change here------

dynamodb = boto3.resource('dynamodb')
table  = dynamodb.Table(TABLE_NAME)

#------------------------------------------------------------------------
def dynamoQuery(deviceid, requestTime):
    print("dynamoQuery start")
    valList = []
    res = table.query(
        KeyConditionExpression=
            Key(DDB_PRIMARY_KEY).eq(deviceid) &
            Key(DDB_SORT_KEY).lt(requestTime),
            ScanIndexForward = False,
            Limit = 30
        )

    for row in res['Items']:
        val = row['TEMPERATURE']
        itemDict = {
            "timestamp":row['timestamp'],
            "value":int(val)
        }
        valList.append(itemDict)

    return valList

#------------------------------------------------------------------------
# call by Lambda here.
#  Event structure : API-Gateway Lambda proxy post
#------------------------------------------------------------------------
def lambda_handler(event, context):
    #Lambda Proxy response back template
    HttpRes = {
        "statusCode": 200,
        "headers": {"Access-Control-Allow-Origin" : "*"},
        "body": "",
        "isBase64Encoded": False
    }

    try:
        print("lambda_handler start")
        print(json.dumps(event))

        # get Parameters
        pathParameters = event.get('pathParameters')
        deviceid = pathParameters["deviceid"]
        requestTime = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

        resItemDict = { deviceid : ""}
        resItemDict[deviceid] = dynamoQuery(deviceid, requestTime)
        HttpRes['body'] = json.dumps(resItemDict)

    except Exception as e:
        print(traceback.format_exc())
        HttpRes["statusCode"] = 500
        HttpRes["body"] = "Lambda error. check lambda log"

    print("response:{}".format(json.dumps(HttpRes)))
    return HttpRes

위에서 했던 것과 같이 DynamoDB 환경변수를 넣어준다.

 

API Gateway

  • REST API 생성

(작업 → 리소스 생성)

(/datas 클릭 후 리소스 생성)

리소스 경로 괄호에 주의하라

(/deviceid에서 매서드 생성)

권한 추가는 확인을 클릭해준다.

위와 같은 화면이 구성되면 완료.

(작업 → CORS 활성화)

/{deviceid} 에서 cors 활성화!

(작업 → API 배포)

/{deviceid} 에서 api 배포를 진행하자.

생성된 url을 복사해두자.

 

DrawGraph.zip을 받고

(js/createGraph.js 파일)

첫번째 줄 device_name에 만들었던 디바이스 입력

두번째 줄 hosturl에 api url 입력

 

다운받았던 폴더에서 html 브라우저에서 실행하면

다음과 같은 그래프가 나오면 성공이다.(해당 그래프 나올 때까지의 시간이 걸릴 수 있음!)

위의 값은 DynamoDB에 저장된 디바이스의 Temperature값을 표시한다.

 

 

❗ 다음엔 실시간 시각화 OpenSearch를 사용해보자. ❗

+ Recent posts