1편에서 장치 설치 하고 오기!
https://realyun99.tistory.com/153
이어서 진행해보자..
Kinesis Data Streams
- 스트림 생성
DynamoDB
- DB 생성
Lambda
- IAM 역할 생성 - Lambda
- IAM 역할 생성 - IoT
권한의 경우 알아서 추가해줌.. 그대로 진행하면 됨
- Lambda 함수 생성 및 트리거 추가
역할의 경우 위에서 만들었던 IAM Lambda 역할을 지정해주자.
다음으로 트리거 추가를 하자. 먼저 Kinesis!
함수에 코드를 추가해주자.. 추가 후에 꼭 Deploy 버튼 누르기!!!!!!
from __future__ import print_function
import base64
import boto3
from boto3.dynamodb.conditions import Key, Attr
import json
import os
import traceback
#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
DDB_ATTR = "temp"
#-----Dynamo Info change here------
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(TABLE_NAME)
'''
This Kinesis data(Json Image) is below
{
"DEVICE_NAME": $device_name,
"TIMESTAMP": $TimeStamp(yyyy-mm-ddThh:MM:SS),
"HUMIDITY" : int,
"TEMPERATURE" : int
}
'''
def checkItem(str_data):
try:
#String to Json object
json_data = json.loads(str_data)
# adjust your data format
resDict = {
DDB_PRIMARY_KEY:json_data['DEVICE_NAME'],
DDB_SORT_KEY:json_data['TIMESTAMP'],
"HUMIDITY": json_data['HUMIDITY'],
"TEMPERATURE": json_data['TEMPERATURE']
}
print("resDict:{}".format(resDict))
return resDict
except Exception as e:
print(traceback.format_exc())
return None
def writeItemInfo(datas):
ItemInfoDictList = []
try:
for data in datas:
itemDict = checkItem(data)
if None != itemDict:
ItemInfoDictList.append(itemDict)
# if data does not have key info, just pass
else:
print("Error data found:{}".format(data))
pass
except Exception as e:
print(traceback.format_exc())
print("Error on writeItemInfo")
return ItemInfoDictList
def DynamoBulkPut(datas):
try:
putItemDictList = writeItemInfo(datas)
with table.batch_writer() as batch:
for putItemDict in putItemDictList:
batch.put_item(Item = putItemDict)
return
except Exception as e:
print("Error on DynamoBulkPut()")
raise e
def decodeKinesisData(dataList):
decodedList = []
try:
for data in dataList:
payload = base64.b64decode(data['kinesis']['data'])
print("payload={}".format(payload))
decodedList.append(payload)
return decodedList
except Exception as e:
print("Error on decodeKinesisData()")
raise e
#------------------------------------------------------------------------
# call by Lambda here.
#------------------------------------------------------------------------
def lambda_handler(event, context):
print("lambda_handler start")
try:
print("---------------json inside----------------")
print(json.dumps(event))
encodeKinesisList = event['Records']
print(encodeKinesisList)
decodedKinesisList = decodeKinesisData(encodeKinesisList)
# Dynamo Put
if 0 < len(decodedKinesisList):
DynamoBulkPut(decodedKinesisList)
else:
print("there is no valid data in Kinesis stream, all data passed")
return
except Exception as e:
print(traceback.format_exc())
# This is sample source. When error occur this return success and ignore the error.
raise e
환경 변수 지정을 위해 설정을 들어가자 (구성 → 환경변수)
DynamoDB 테이블을 넣어주자
IoT Core 확인
- 메시지 라우팅 규칙 생성
(메시지 라우팅 → 규칙)
from 뒤에는 아까 생성했던 디바이스의 topic을 넣어준다.
IoT와 Kinesis를 연결해준다. IoT 디바이스에서 데이터를 받아와 Kinesis로 흐르게
Role도 아까 생성했던 iot 역할로 지정해준다.
현재 iot 디바이스 연결도 잘 되어 있는 것을 볼 수 있음.
- Kinesis Stream 모니터링
또, 위에서 연결했던 Kinesis의 모니터링을 보면 데이터가 잘 들어오는 것을 볼 수 있다.
(지표들 중에서 GetRecords - 합계 | PutRecords - 합계 값이 0이 아닌지 확인)
시간이 좀 지나야 해당 그래프를 볼 수 있음...!
- DynamoDB 모니터링
라이브 항목 수 가져오기 클릭
항목 탐색을 확인해보자.
- Lambda 모니터링
(Invocations 그래프가 끝에 떨어지는 모양은 내가 cloud9 실행을 멈췄기 때문/ 스크립트 계속 돌리면 그래프 유지됨.)
API 용 Lambda 구성
- Lambda 함수 만들기
해당 코드를 넣어준다.(Deploy 클릭)
from __future__ import print_function
import boto3
from boto3.dynamodb.conditions import Key
import datetime
import json
import traceback
import os
#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
#-----Dynamo Info change here------
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(TABLE_NAME)
#------------------------------------------------------------------------
def dynamoQuery(deviceid, requestTime):
print("dynamoQuery start")
valList = []
res = table.query(
KeyConditionExpression=
Key(DDB_PRIMARY_KEY).eq(deviceid) &
Key(DDB_SORT_KEY).lt(requestTime),
ScanIndexForward = False,
Limit = 30
)
for row in res['Items']:
val = row['TEMPERATURE']
itemDict = {
"timestamp":row['timestamp'],
"value":int(val)
}
valList.append(itemDict)
return valList
#------------------------------------------------------------------------
# call by Lambda here.
# Event structure : API-Gateway Lambda proxy post
#------------------------------------------------------------------------
def lambda_handler(event, context):
#Lambda Proxy response back template
HttpRes = {
"statusCode": 200,
"headers": {"Access-Control-Allow-Origin" : "*"},
"body": "",
"isBase64Encoded": False
}
try:
print("lambda_handler start")
print(json.dumps(event))
# get Parameters
pathParameters = event.get('pathParameters')
deviceid = pathParameters["deviceid"]
requestTime = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
resItemDict = { deviceid : ""}
resItemDict[deviceid] = dynamoQuery(deviceid, requestTime)
HttpRes['body'] = json.dumps(resItemDict)
except Exception as e:
print(traceback.format_exc())
HttpRes["statusCode"] = 500
HttpRes["body"] = "Lambda error. check lambda log"
print("response:{}".format(json.dumps(HttpRes)))
return HttpRes
위에서 했던 것과 같이 DynamoDB 환경변수를 넣어준다.
API Gateway
- REST API 생성
(작업 → 리소스 생성)
(/datas 클릭 후 리소스 생성)
리소스 경로 괄호에 주의하라
(/deviceid에서 매서드 생성)
권한 추가는 확인을 클릭해준다.
위와 같은 화면이 구성되면 완료.
(작업 → CORS 활성화)
/{deviceid} 에서 cors 활성화!
(작업 → API 배포)
/{deviceid} 에서 api 배포를 진행하자.
생성된 url을 복사해두자.
DrawGraph.zip을 받고
(js/createGraph.js 파일)
첫번째 줄 device_name에 만들었던 디바이스 입력
두번째 줄 hosturl에 api url 입력
다운받았던 폴더에서 html 브라우저에서 실행하면
다음과 같은 그래프가 나오면 성공이다.(해당 그래프 나올 때까지의 시간이 걸릴 수 있음!)
위의 값은 DynamoDB에 저장된 디바이스의 Temperature값을 표시한다.
❗ 다음엔 실시간 시각화 OpenSearch를 사용해보자. ❗
'Cloud > AWS' 카테고리의 다른 글
[AWS] IoT Core 살펴보기 -4(데이터 레이크) (0) | 2022.09.26 |
---|---|
[AWS] IoT Core 살펴보기 -3(실시간 시각화) (0) | 2022.09.26 |
[AWS] IoT Core 살펴보기 -1(디바이스 설치) (1) | 2022.09.26 |
[AWS] S3 트리거를 사용하여 Lambda 함수 호출 (1) | 2022.09.23 |
[AWS] AWS Rekognition - Custom Labels (1) | 2022.09.23 |