1편에서 장치 설치 하고 오기!

https://realyun99.tistory.com/153

이어서 진행해보자..


Kinesis Data Streams

  • 스트림 생성

 

DynamoDB

  • DB 생성

 

Lambda

  • IAM 역할 생성 - Lambda

  • IAM 역할 생성 - IoT

권한의 경우 알아서 추가해줌.. 그대로 진행하면 됨

 

  • Lambda 함수 생성 및 트리거 추가

역할의 경우 위에서 만들었던 IAM Lambda 역할을 지정해주자.

 

다음으로 트리거 추가를 하자. 먼저 Kinesis!

함수에 코드를 추가해주자.. 추가 후에 꼭 Deploy 버튼 누르기!!!!!!

더보기
from __future__ import print_function
import base64
import boto3
from boto3.dynamodb.conditions import Key, Attr
import json
import os
import traceback

#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
DDB_ATTR = "temp"
#-----Dynamo Info change here------

dynamodb = boto3.resource('dynamodb')
table  = dynamodb.Table(TABLE_NAME)

'''
This Kinesis data(Json Image) is below
{
    "DEVICE_NAME": $device_name,
    "TIMESTAMP": $TimeStamp(yyyy-mm-ddThh:MM:SS),
    "HUMIDITY" : int,
    "TEMPERATURE" : int
}
'''
def checkItem(str_data):
    try:
        #String to Json object
        json_data = json.loads(str_data)
        # adjust your data format
        resDict = {
            DDB_PRIMARY_KEY:json_data['DEVICE_NAME'],
            DDB_SORT_KEY:json_data['TIMESTAMP'],
            "HUMIDITY": json_data['HUMIDITY'],
            "TEMPERATURE": json_data['TEMPERATURE']

        }
        print("resDict:{}".format(resDict))
        return resDict

    except Exception as e:
        print(traceback.format_exc())
        return None

def writeItemInfo(datas):
    ItemInfoDictList = []
    try:
        for data in datas:
            itemDict = checkItem(data)
            if None != itemDict:
                ItemInfoDictList.append(itemDict)
            # if data does not have key info, just pass
            else:
                print("Error data found:{}".format(data))
                pass

    except Exception as e:
        print(traceback.format_exc())
        print("Error on writeItemInfo")

    return ItemInfoDictList

def DynamoBulkPut(datas):
    try:
        putItemDictList = writeItemInfo(datas)
        with table.batch_writer() as batch:
            for putItemDict in putItemDictList:
                batch.put_item(Item = putItemDict)
        return

    except Exception as e:
        print("Error on DynamoBulkPut()")
        raise e

def decodeKinesisData(dataList):
    decodedList = []
    try:
        for data in dataList:
            payload =  base64.b64decode(data['kinesis']['data'])
            print("payload={}".format(payload))
            decodedList.append(payload)

        return decodedList

    except Exception as e:
        print("Error on decodeKinesisData()")
        raise e

#------------------------------------------------------------------------
# call by Lambda here.
#------------------------------------------------------------------------
def lambda_handler(event, context):
    print("lambda_handler start")

    try:
        print("---------------json inside----------------")
        print(json.dumps(event))
        encodeKinesisList = event['Records']
        print(encodeKinesisList)
        decodedKinesisList = decodeKinesisData(encodeKinesisList)
        # Dynamo Put
        if 0 < len(decodedKinesisList):
            DynamoBulkPut(decodedKinesisList)
        else:
            print("there is no valid data in Kinesis stream, all data passed")

        return

    except Exception as e:
        print(traceback.format_exc())
        # This is sample source. When error occur this return success and ignore the error.
        raise e

 

환경 변수 지정을 위해 설정을 들어가자 (구성 → 환경변수)

DynamoDB 테이블을 넣어주자

 

IoT Core 확인

  • 메시지 라우팅 규칙 생성

(메시지 라우팅 → 규칙)

from 뒤에는 아까 생성했던 디바이스의 topic을 넣어준다.

 

IoT와 Kinesis를 연결해준다. IoT 디바이스에서 데이터를 받아와 Kinesis로 흐르게

Role도 아까 생성했던 iot 역할로 지정해준다.

 

현재 iot 디바이스 연결도 잘 되어 있는 것을 볼 수 있음.

 

  • Kinesis Stream 모니터링

또, 위에서 연결했던 Kinesis의 모니터링을 보면 데이터가 잘 들어오는 것을 볼 수 있다.

(지표들 중에서 GetRecords - 합계 | PutRecords - 합계 값이 0이 아닌지 확인)

시간이 좀 지나야 해당 그래프를 볼 수 있음...!

  • DynamoDB 모니터링

라이브 항목 수 가져오기 클릭

항목 탐색을 확인해보자.

 

  • Lambda 모니터링

(Invocations 그래프가 끝에 떨어지는 모양은 내가 cloud9 실행을 멈췄기 때문/ 스크립트 계속 돌리면 그래프 유지됨.)

 

API 용 Lambda 구성

  • Lambda 함수 만들기

해당 코드를 넣어준다.(Deploy 클릭)

더보기
from __future__ import print_function
import boto3
from boto3.dynamodb.conditions import Key
import datetime
import json
import traceback
import os

#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
#-----Dynamo Info change here------

dynamodb = boto3.resource('dynamodb')
table  = dynamodb.Table(TABLE_NAME)

#------------------------------------------------------------------------
def dynamoQuery(deviceid, requestTime):
    print("dynamoQuery start")
    valList = []
    res = table.query(
        KeyConditionExpression=
            Key(DDB_PRIMARY_KEY).eq(deviceid) &
            Key(DDB_SORT_KEY).lt(requestTime),
            ScanIndexForward = False,
            Limit = 30
        )

    for row in res['Items']:
        val = row['TEMPERATURE']
        itemDict = {
            "timestamp":row['timestamp'],
            "value":int(val)
        }
        valList.append(itemDict)

    return valList

#------------------------------------------------------------------------
# call by Lambda here.
#  Event structure : API-Gateway Lambda proxy post
#------------------------------------------------------------------------
def lambda_handler(event, context):
    #Lambda Proxy response back template
    HttpRes = {
        "statusCode": 200,
        "headers": {"Access-Control-Allow-Origin" : "*"},
        "body": "",
        "isBase64Encoded": False
    }

    try:
        print("lambda_handler start")
        print(json.dumps(event))

        # get Parameters
        pathParameters = event.get('pathParameters')
        deviceid = pathParameters["deviceid"]
        requestTime = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

        resItemDict = { deviceid : ""}
        resItemDict[deviceid] = dynamoQuery(deviceid, requestTime)
        HttpRes['body'] = json.dumps(resItemDict)

    except Exception as e:
        print(traceback.format_exc())
        HttpRes["statusCode"] = 500
        HttpRes["body"] = "Lambda error. check lambda log"

    print("response:{}".format(json.dumps(HttpRes)))
    return HttpRes

위에서 했던 것과 같이 DynamoDB 환경변수를 넣어준다.

 

API Gateway

  • REST API 생성

(작업 → 리소스 생성)

(/datas 클릭 후 리소스 생성)

리소스 경로 괄호에 주의하라

(/deviceid에서 매서드 생성)

권한 추가는 확인을 클릭해준다.

위와 같은 화면이 구성되면 완료.

(작업 → CORS 활성화)

/{deviceid} 에서 cors 활성화!

(작업 → API 배포)

/{deviceid} 에서 api 배포를 진행하자.

생성된 url을 복사해두자.

 

DrawGraph.zip을 받고

(js/createGraph.js 파일)

첫번째 줄 device_name에 만들었던 디바이스 입력

두번째 줄 hosturl에 api url 입력

 

다운받았던 폴더에서 html 브라우저에서 실행하면

다음과 같은 그래프가 나오면 성공이다.(해당 그래프 나올 때까지의 시간이 걸릴 수 있음!)

위의 값은 DynamoDB에 저장된 디바이스의 Temperature값을 표시한다.

 

 

❗ 다음엔 실시간 시각화 OpenSearch를 사용해보자. ❗

참고: https://catalog.us-east-1.prod.workshops.aws/workshops/b3e0b830-79b8-4c1d-8a4c-e10406600035/ja-JP/phase1

일본어로 된 workshop 자료라... 최신 버전이 이것밖에 없었따..😥 번역기 돌려서 사용하세요

 


Device Setup

  • Cloud9 인스턴스 생성

서울 리전 기준으로 t3.small 이상으로 잡아야 생성 가능!

이 인스턴스가 디바이스 역할을 해줄 예정(나중에 라즈베리파이로도 진행해보자)

 

  • AWS IoT Device SDK Python v2 설치

아래의 명령어를 Cloud9 인스턴스에 입력하자.

pip3 install --user awsiotsdk
mkdir -p ~/environment/dummy_client/certs/
cd ~/environment/dummy_client/
wget https://awsj-iot-handson.s3-ap-northeast-1.amazonaws.com/aws-iot-core-workshop/dummy_client/device_main.py -O device_main.py

 

IoT Core Setup

  • IoT Core 정의

IoT 설정의 디바이스 데이터 엔드포인트 값을 복사한다.

다음으로 로그 관리를 클릭 하고 로그 설정을 해주자.

IAM 역할은 새로 생성해주자!

 

  • IoT Policy 생성

(side menu → 보안 → 정책)

❗ 현재 정책 설정을 모든 작업 모든 리소스를 허용하게끔 되어 있는데, 이건 테스트 용이니까 광범위하게 설정한 것..

프로젝트를 할 때에는 최소한의 권한으로 설정해둘 것! ❗
보통 최소한의 권한의 경우(workshop 기준, 도쿄 리전)

더보기

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:iot:ap-northeast-1:123456789012:client/${iot:ClientId}"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": [
        "arn:aws:iot:ap-northeast-1:123456789012:topic/data/${iot:Connection.Thing.ThingName}",
        "arn:aws:iot:ap-northeast-1:123456789012:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update",
        "arn:aws:iot:ap-northeast-1:123456789012:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Receive",
      "Resource": [
        "arn:aws:iot:ap-northeast-1:123456789012:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/delta",
        "arn:aws:iot:ap-northeast-1:123456789012:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/accepted",
        "arn:aws:iot:ap-northeast-1:123456789012:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/rejected",
        "arn:aws:iot:ap-northeast-1:123456789012:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get/accepted",
        "arn:aws:iot:ap-northeast-1:123456789012:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get/rejected"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:123456789012:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/delta",
        "arn:aws:iot:ap-northeast-1:123456789012:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/accepted",
        "arn:aws:iot:ap-northeast-1:123456789012:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/rejected",
        "arn:aws:iot:ap-northeast-1:123456789012:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get/accepted",
        "arn:aws:iot:ap-northeast-1:123456789012:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get/rejected"
      ]
    }
  ]
}

 

  • 디바이스 생성

(모든 디바이스 → 사물)

단일 사물 생성

새 인증서 자동 생성(권장)

정책의 경우 위에서 생성했던 정책 클릭

인증서랑 키 파일 다운로드

 

  • Cloud9 인스턴스에 키파일 등록

private.pem.key와 certificate.pem.crt 업로드

아래의 명령어 입력

cd ~/environment/dummy_client
wget https://www.amazontrust.com/repository/AmazonRootCA1.pem -O certs/AmazonRootCA1.pem

 

  • Device SDK

해당 명령어들 터미널에 입력한다.

cd ~/environment/dummy_client/
python3 device_main.py --device_name {Device_name} --endpoint {AWS IoT endpoint_url}

해당 커멘드를 실행하면 dummy_client는 IoT Core에 MQTT로 연결하고 5초 마다 메세지 전송(default)

 

  • Device Test

(테스트 → MQTT 테스트 클라이언트)

아까 위에서 실행했던 커멘드 출력들 중 topic을 알아두자.

해당 topic을 구독하면 device에서 보내는 메시지를 확인할 수 있다.

 

  • Device Shadow 확인

해당 클래식 섀도우에 들어가 디바이스 섀도우 문서 편집을 누른다.

해당 코드를 

{
    "state": {
        "reported": {
            "wait_time": 5
        },
        "desired": {
            "wait_time": 1
        }
    }
}

로 변경하고 업데이트 해준다.

업데이트 후 문서에 "welcome": "aws-iot" 가 남아있으면, 

위 이미지처럼 null 값을 넣어준다. wait_time 값을 변경하면 device가 data를 보내는 빈도가 바뀜.

desired: 디바이스에 지시하고 있는 상태

reported: 디바이스로부터 보고된 상태

delta: desired와 reported 상태에 차이가 있을 때 표시됨

섀도우에서 다시 해당 문서를 wait_time을 2로 변경하면 터미널에서도 확인할 수 있다.

MQTT 테스트 쪽에서 주제를 다시 구독한 후 확인하면 데이터가 전송되는 간격이 변경되었다는 것을 확인할 수 있음.

 

 

❗ 다음 포스팅은 애플리케이션용 DB 만들기에 관해 포스팅 올릴 계획!  ❗

참고: https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/with-s3-example.html

 

게임데이 준비하면서 해본 간단한 실습!


  • S3 버킷 생성 및 샘플 객체 업로드

그냥 하고 싶은대로 설정 후 원하는 파일 아무거나 업로드하면 된다.

 

  • Lambda 함수 생성

function blueprint를 사용해 생성할 예정..

샘플 코드를 활용한다고 생각하면 된다!

python으로 진행

위에서 생성했던 버킷으로 트리거를 설정한다.

Amazon S3 가 함수를 호출할 수 있도록 함수의 리소스 기반 정책을 수정하자.

역할 문서를 확인해보자. S3 관련한 정책을 허용하는 내용이 잘 들어가있는지 확인한다.

트리거가 잘 잡혔는지 확인해보자..(나는 위처럼만 하면 트리거 추가가 안되서.. 따로 추가 다시 해줌..)

 

  • Lambda Test

아래 JSON 코드에서 S3 버킷 이름(examplie-bucket)과, 객체 키(test%2Fkey)를 테스트 파일 이름(버킷 안 파일)으로 바꿔준다.

해당 이벤트로 설정 후에 테스트를 해보면

다음과 같이 결과를 얻을 수 있다.

 

  • S3 Trigger Test

S3 버킷에 파일을 업로드할 때 함수를 호출한다.

따라서 파일 업로드를 몇 개 하고 Lambda의 모니터링을 확인해보면

Invocations 그래프의 숫자는 S3 버킷에 업로드한 파일의 수와 일치해야한다.

cloudwatch 로그에서도 확인 가능하다.

참고: https://catalog.us-east-1.prod.workshops.aws/workshops/4a2a9a24-071d-4d96-b9be-0cc57b7db434/en-US

Rekognition에 궁금증이 생겨 좀 다뤄볼까 한다..

(꼭.. us-east-1에서 진행하세요!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!)


Preprocessing

  • S3에 데이터 업로드

버킷을 하나 생성해주고 aws 로고 이미지들을 '버킷/aws' 경로로 올려준다.

 

  • Rekognition 프로젝트 만들기

Rekognition → 사용자 지정 레이블 만들기(Custom Labels)

❗ 프로젝트를 눌러 생성하면 되는데.. 왜 자꾸 처음에 s3 버킷을 생성하라고 뜰까?! ❗

과정을 마치고 해당 버킷으로 돌아와 보면 위 이미지 처럼 객체들이 들어가있다.

아마 생성했던 프로젝트들, 라벨들을 관리(?) 하는 버킷의 느낌이다.

 

 

  • Dataset 생성

single dataset으로 선택 후

아까 생성했던 s3 버킷으로 지정하고 라벨링을 위한 작업을 위해 해당 체크 표시를 해준다.

(Automatically assign image-level labels to images based on the folder name)

 

정책을 복사해 버킷 정책에 붙여 넣는다.

해당 파란색 링크를 클릭하면 알아서 해당 버킷으로 데려가 줌!

정책을 붙여넣고 create dataset 클릭.

 

  • Start labeling

이미지들 선택 후에 Draw bounding boxes 선택(단일 페이지에서만 이미지 선택 가능 - 첫번째 페이지 하고 두번 째)

이런 식으로 aws 로고 선택해주자..

단순 반복 작업...

다 끝내면 Save changes 버튼을 클릭해준다.

 

 

Training

  • Train model

Train model을 클릭한다.

방금 생성한 프로젝트를 선택하고 Train model 클릭

(모델 트레이닝은 50-60분 정도 소요된다...?! 취소하고 싶으면 CLI 환경에서 모델 트레이닝 중지를 시켜야함.. 콘솔 불가)

 

Inference

와... 서울 리전 데모 애플리케이션이 없다... 대충 이런 프로세스 느낌이라는 것만 알면 될 듯...?

(여러분은 us-east-1으로 진행하세요...😥)

https://github.com/aws-samples/amazon-rekognition-custom-labels-demo

과정만 간단히 살펴 보자면..

 모델의 상태가 TRAINING_COMPLETED 인 경우에 훈련이 끝난 것..

모델을 클릭하고 레이블별 성능을 확인해본다. 만족 시 Use model을 클릭해서 사용하면 된다.

 

이후 과정에선 저 위의 링크에 들어가 CloudFormation으로 애플리케이션을 활용해 테스트 해보는 과정인데..

잘못된 리전 선택으로.. 하지 못했..다...ㅠㅠ

무튼 이미지를 넣고 Results에 신뢰도 점수가 표시된다!

그리고 테스트가 끝나면 Stop the model 클릭을 하는 거 까지가 과정이다!!

참고: https://www.youtube.com/watch?v=VDqToPfbuok 

이제 좀 실질적으로 필요한 실습을 진행해보자..(많이 사용할만 한걸로다가 😅)


  • 버킷 생성

이름 알아서 고유하게 잘 설정한다.

이후에 로그 샘플 파일을 업로드 한다.

이와 같은 형태로 주욱 나열되어 있음(의미는 없음)

 

  • Glue Crawler 생성

이름: Demo-Athena-log-crawler

Data store 추가(위에서 생성했던 버킷 경로로!)

IAM Role: 이름 적당히 해서 새로 생성 후 지정

Database는 새로 생성

실행 시키자!

로그 데이터 양에 따라 걸리는 시간이 달라질 것!

 

테이블을 확인해보면,

다음과 같이 로그 파일에 알맞는 스키마가 생성된 것을 확인할 수 있다.

 

  • Athena로 쿼리

해당 버킷을 설정해주자.

테이블 미리보기를 누른 화면이다.. 아까 Glue에서 생성했던 테이블 결과를 확인할 수 있다.

 

원하는 쿼리로 실행해서 잘 이용하면 됨!!!


Athena 사용사례

- 여러 로그파일이 저장된 S3에서 필요한 데이터를 조회

- 정형화된 메타데이터 혹은 저장 데이터를 조회

- 이벤트 데이터에서 필요한 정보를 추출(A/B테스트) 등

 

❗ 지금은 로그 샘플 파일이라 적고 별거 없지만, 실제 프로젝트 로그의 경우 아주 많기 때문에 오류나 이런거 찾기에 좋을듯! ❗

AWS 솔루션을 설계하는 동안 데이터 전송 요금을 놓치는 경우가 많다. 이를 고려하면 비용 절감에 도움을 준다.

조건 하나하나 따져볼 예정..

참고: https://aws.amazon.com/ko/blogs/architecture/overview-of-data-transfer-costs-for-common-architectures/


AWS와 인터넷 간의 데이터 전송

인터넷 → AWS(인바운드): 요금 부과 X

AWS → 인터넷(아웃바운드): 요금 부과

 

AWS 내 데이터 전송

  • 워크로드와 다른 AWS 서비스 간의 데이터 전송
    • 워크로드가 AWS 서비스에 액세스할 때: 요금 부과
  • 동일한 AWS 리전 내 서비스 액세스
    •  IGW를 사용해 AWS의 퍼블릭 엔드포인트 액세스: 요금 부과 X
    • NGW를 사용해 AWS의 퍼블릭 엔드포인트 액세스: 요금 부과(게이트웨이를 통과하는 데이터에 대해)

 

  • AWS 리전에서 서비스 액세스
    • 워크로드가 다른 리전의 서비스에 액세스: 요금 부과

  • 워크로드의 다양한 구성 요소 내에서 데이터 전송
    • 워크로드의 서로 다른 구성 요소 간에 데이터 전송이 있는 경우 요금 부과
  • 동일한 AWS 리전의 워크로드 구성 요소(AZ 관련)
    • 동일한 AZ 내 데이터 전송: 요금 부과 X
    • 고가용성 - 여러 AZ 배포: 요금 부과

  • VPC 피어링 연결(AWS 네트워크의 여러 VPC에 워크로드를 배포)
    • 동일한 AZ 내 VPC 피어링 연결: 요금 부과 X
    • AZ를 교차한 VPC 피어링 연결: 요금 부과

  • Transit Gateway(연결된 VPC, DX, Site-to-Site VPN 요금 포함)

  • 다양한 AWS 리전의 워크로드 구성 요소(리전 간)
    • VPC 피어링: 요금 부과
    • Transit Gateway: 피어의 한쪽에서만 요금 부과

AWS와 온프레미스 데이터 센터 간의 데이터 전송

  • AWS Site-to-Site VPN

  • AWS Direct Connect

  • 여러 리전의 Direct Connect


요금 팁

  • VPC 엔드포인트를 사용하여 AWS 내에서 AWS 서비스에 연결할 때 인터넷을 통한 트래픽 라우팅을 피해라
    • VPC 게이트웨이 엔드포인트를 사용하면 동일한 리전 내에서 데이터 전송 요금 없이 S3, DynamoDB 통신 가능
    • VPC 인터페이스 엔드포인트는 일부 AWS 서비스에서 사용 가능, 시간당 서비스 요금 + 데이터 전송 요금 발생
  • 온프레미스 네트워크에 데이터를 보낼 때 인터넷 대신 Direct Connect 사용
  • AZ 경계를 넘는 트래픽에는 일반적으로 데이터 전송 요금이 부과됨
  • 리전 경계를 넘는 트래픽에도 일반적으로 데이터 전송 요금이 부과됨
  • AWS 프리티어를 활용하라
  • AWS 요금 계산기를 활용하라
  • https://wellarchitectedlabs.com/cost/200_labs/200_enterprise_dashboards/3_create_data_transfer_cost_analysis/ :관련한 워크숍

+ Recent posts