iot 실습하다가 Kinesis Data Streams와 Kinesis Data Firehose 차이가 와닿지 않아 개념을 좀 정리해보려 한다..

Amazon Kinesis

  • 실시간 스트리밍 데이터를 손쉽게 수집, 처리 및 분석 가능
  • 모든 규모의 스트리밍 데이터를 비용 효율적으로 처리할 수 있는 핵심 기능, 애플리케이션 요구 사항에 가장 적합한 도구를 선택할 수 있는 유연성 제공
  • 실시간 데이터: (비디오, 오디오, 애플리케이션 로그, 웹 사이트 클릭 스트림, IoT 텔레메트리 등)

Kinesis Video Streams

  • 분석, ML(기계 학습), 재생 및 기타 처리를 위해 커넥티드 디바이스에서 AWS로 비디오를 쉽고 안전하게 스트리밍
  • 수백만 대의 디바이스의 스트리밍 비디오 데이터 수집하는 데 필요한 인프라 자동으로 프로비저닝, 탄력적 스케일링

Kinesis Data Analytics

Kinesis Data Streams

  • 데이터 스트림
  • 기능:

Kinesis Data Firehose

  • 전송 스트림
  • 기능:


여기서 문제가 되는 건... Data Streams와 Data Firehose의 차이다!

Video Stream 과 Data Analytics는 이름만 봐도 알 수 있을 정도로 역할이 명확히 나뉘는데...😑

Kinesis Data Stream Console


Kinesis Data Firehose Console

소스: Kinesis Data Streams, Direct PUT

대상: OpenSearch Service, Redshift, S3, Coralogix, Datadog, Dynatrace, HTTP 엔드포인트 등... 

Kinesis Data Stream은 실시간으로 data들을 받아들일 수 있는 입구이자 저장소의 역할을 한다.

한 시스템이 실시간으로 데이터를 전송하면 해당 Data Stream을 듣고 있던 다른 시스템이 해당 데이터를 받아 처리

약간 pipeline이자 메시지 큐와 같은 느낌? 


Kinesis Data Firehose의 목적은 미리 정의된 대상(Destination)에 데이터를 안전하게 전달하는 것이다.

대상의 경우 S3 bucket, ElasticSearch, Amazon Redshift 등 데이터레이크의 역할을 할 수 있는 다양한 저장소를 의미한다.

중간에 lambda를 이용해 가공하는 작업도 가능!



❗ 다시 한번 더 정리를 해보면 ❗

  • Data Stream - low latency streaming service / Firehose - data transfer service
  • Data Stream은 길게는 일주일 까지 데이터를 잠시 저장할 수 있지만, Firehose의 경우 데이터 저장의 기능이 없음
  • Data Stream은 Stream에서 데이터를 꺼내와 작업하는 느낌이라면 Firehose는 데이터를 직접 Destination에 전달
  • Data Stream은 여러개의 Consumer를 지정할 수 있지만 Firehose는 단일 Destination을 가짐
  • Data Stream은 샤드 수를 조정하여 수동으로 Scailing, but Firehose는 데이터 요청에 따라 Scailing이 자동으로


이걸 보면 알 수 있을 듯?!

Data Streams의 경우 바로 lambda로 가공을 시작하지만, Data Firehose의 경우 S3의 저장을 한 뒤에 가공을 하네?!

얼마 안남았다...! 마지막 lab!!

  • S3 버킷

버킷을 생성한다.(나머진 default로 진행)


  • Kinesis Data Firehose 생성

Browse를 눌러 위에서 생성했던 버킷 지정해준다.

위와 같이 추가 설정을 진행한 후 생성을 클릭


  • IoT Core 규칙 생성

위와 같이 규칙 작업을 추가해준다.


  • S3 버킷 확인하기

.gz 파일 객체 확인

→ 디바이스의 메시지가 S3 데이터 레이크에 저장되었음을 확인.




❗ 위의 설계도 처럼 사용하기도 하지만 보통 OpenSearch의 경우 비싸 많이 사용하지 않는다. 오히려 S3 뒤쪽으로 분석 쪽 서비스들을 더해 주로 사용하게 될듯! ❗


(S3 + Glue + Athena로 분석하고 QuickSight로 시각화)

1편에서 장치 설치 하고 오기!

이어서 진행해보자..

Kinesis Data Streams

  • 스트림 생성



  • DB 생성



  • IAM 역할 생성 - Lambda

  • IAM 역할 생성 - IoT

권한의 경우 알아서 추가해줌.. 그대로 진행하면 됨


  • Lambda 함수 생성 및 트리거 추가

역할의 경우 위에서 만들었던 IAM Lambda 역할을 지정해주자.


다음으로 트리거 추가를 하자. 먼저 Kinesis!

함수에 코드를 추가해주자.. 추가 후에 꼭 Deploy 버튼 누르기!!!!!!

from __future__ import print_function
import base64
import boto3
from boto3.dynamodb.conditions import Key, Attr
import json
import os
import traceback

#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
DDB_ATTR = "temp"
#-----Dynamo Info change here------

dynamodb = boto3.resource('dynamodb')
table  = dynamodb.Table(TABLE_NAME)

This Kinesis data(Json Image) is below
    "DEVICE_NAME": $device_name,
    "TIMESTAMP": $TimeStamp(yyyy-mm-ddThh:MM:SS),
    "HUMIDITY" : int,
    "TEMPERATURE" : int
def checkItem(str_data):
        #String to Json object
        json_data = json.loads(str_data)
        # adjust your data format
        resDict = {
            "HUMIDITY": json_data['HUMIDITY'],
            "TEMPERATURE": json_data['TEMPERATURE']

        return resDict

    except Exception as e:
        return None

def writeItemInfo(datas):
    ItemInfoDictList = []
        for data in datas:
            itemDict = checkItem(data)
            if None != itemDict:
            # if data does not have key info, just pass
                print("Error data found:{}".format(data))

    except Exception as e:
        print("Error on writeItemInfo")

    return ItemInfoDictList

def DynamoBulkPut(datas):
        putItemDictList = writeItemInfo(datas)
        with table.batch_writer() as batch:
            for putItemDict in putItemDictList:
                batch.put_item(Item = putItemDict)

    except Exception as e:
        print("Error on DynamoBulkPut()")
        raise e

def decodeKinesisData(dataList):
    decodedList = []
        for data in dataList:
            payload =  base64.b64decode(data['kinesis']['data'])

        return decodedList

    except Exception as e:
        print("Error on decodeKinesisData()")
        raise e

# call by Lambda here.
def lambda_handler(event, context):
    print("lambda_handler start")

        print("---------------json inside----------------")
        encodeKinesisList = event['Records']
        decodedKinesisList = decodeKinesisData(encodeKinesisList)
        # Dynamo Put
        if 0 < len(decodedKinesisList):
            print("there is no valid data in Kinesis stream, all data passed")


    except Exception as e:
        # This is sample source. When error occur this return success and ignore the error.
        raise e


환경 변수 지정을 위해 설정을 들어가자 (구성 → 환경변수)

DynamoDB 테이블을 넣어주자


IoT Core 확인

  • 메시지 라우팅 규칙 생성

(메시지 라우팅 → 규칙)

from 뒤에는 아까 생성했던 디바이스의 topic을 넣어준다.


IoT와 Kinesis를 연결해준다. IoT 디바이스에서 데이터를 받아와 Kinesis로 흐르게

Role도 아까 생성했던 iot 역할로 지정해준다.


현재 iot 디바이스 연결도 잘 되어 있는 것을 볼 수 있음.


  • Kinesis Stream 모니터링

또, 위에서 연결했던 Kinesis의 모니터링을 보면 데이터가 잘 들어오는 것을 볼 수 있다.

(지표들 중에서 GetRecords - 합계 | PutRecords - 합계 값이 0이 아닌지 확인)

시간이 좀 지나야 해당 그래프를 볼 수 있음...!

  • DynamoDB 모니터링

라이브 항목 수 가져오기 클릭

항목 탐색을 확인해보자.


  • Lambda 모니터링

(Invocations 그래프가 끝에 떨어지는 모양은 내가 cloud9 실행을 멈췄기 때문/ 스크립트 계속 돌리면 그래프 유지됨.)


API 용 Lambda 구성

  • Lambda 함수 만들기

해당 코드를 넣어준다.(Deploy 클릭)

from __future__ import print_function
import boto3
from boto3.dynamodb.conditions import Key
import datetime
import json
import traceback
import os

#-----Dynamo Info change here------
TABLE_NAME = os.environ.get('TABLE_NAME', "default")
DDB_PRIMARY_KEY = "deviceid"
DDB_SORT_KEY = "timestamp"
#-----Dynamo Info change here------

dynamodb = boto3.resource('dynamodb')
table  = dynamodb.Table(TABLE_NAME)

def dynamoQuery(deviceid, requestTime):
    print("dynamoQuery start")
    valList = []
    res = table.query(
            Key(DDB_PRIMARY_KEY).eq(deviceid) &
            ScanIndexForward = False,
            Limit = 30

    for row in res['Items']:
        val = row['TEMPERATURE']
        itemDict = {

    return valList

# call by Lambda here.
#  Event structure : API-Gateway Lambda proxy post
def lambda_handler(event, context):
    #Lambda Proxy response back template
    HttpRes = {
        "statusCode": 200,
        "headers": {"Access-Control-Allow-Origin" : "*"},
        "body": "",
        "isBase64Encoded": False

        print("lambda_handler start")

        # get Parameters
        pathParameters = event.get('pathParameters')
        deviceid = pathParameters["deviceid"]
        requestTime ='%Y-%m-%dT%H:%M:%S')

        resItemDict = { deviceid : ""}
        resItemDict[deviceid] = dynamoQuery(deviceid, requestTime)
        HttpRes['body'] = json.dumps(resItemDict)

    except Exception as e:
        HttpRes["statusCode"] = 500
        HttpRes["body"] = "Lambda error. check lambda log"

    return HttpRes

위에서 했던 것과 같이 DynamoDB 환경변수를 넣어준다.


API Gateway

  • REST API 생성

(작업 → 리소스 생성)

(/datas 클릭 후 리소스 생성)

리소스 경로 괄호에 주의하라

(/deviceid에서 매서드 생성)

권한 추가는 확인을 클릭해준다.

위와 같은 화면이 구성되면 완료.

(작업 → CORS 활성화)

/{deviceid} 에서 cors 활성화!

(작업 → API 배포)

/{deviceid} 에서 api 배포를 진행하자.

생성된 url을 복사해두자.


DrawGraph.zip을 받고

(js/createGraph.js 파일)

첫번째 줄 device_name에 만들었던 디바이스 입력

두번째 줄 hosturl에 api url 입력


다운받았던 폴더에서 html 브라우저에서 실행하면

다음과 같은 그래프가 나오면 성공이다.(해당 그래프 나올 때까지의 시간이 걸릴 수 있음!)

위의 값은 DynamoDB에 저장된 디바이스의 Temperature값을 표시한다.



❗ 다음엔 실시간 시각화 OpenSearch를 사용해보자. ❗

