대학교 시절 프로젝트 성으로 잠깐 약간 맛만 봤었는데 이번 게임데이 준비하면서 좀 더 깊게 파볼 생각...

딥 러닝 기반 시각 분석 서비스 인데.. 신기하다 신기해! 약간 YOLO랑 비슷한 느낌?

 

참고: https://aws.amazon.com/ko/blogs/machine-learning/build-your-own-face-recognition-service-using-amazon-rekognition


  • Indexing(blue flow): 나중에 분석하기 위해 얼굴 이미지를 컬렉션으로 가져오는 프로세스
  • Analysis(black flow): 인덱스 내에서 일치하는 얼굴 컬렉션 쿼리하는 프로세스

  • CloudFormation 실행(한번에 다 설치해줌...)

템플릿 다운

위의 이미지는 스택 생성 후 생성된 리소스들이다. 과정 하나씩 살펴보자.

  • rekognition create-collection
aws rekognition create-collection --collection-id family-collection

위의 명령어를 AWS CLI에 입력해 collection을 생성하는 과정을 거친다.

 

  • DynamoDB create-table

RekognitionId 를 파티션 키로 family-collection 테이블을 생성했다.

 

  • IndexFaces 작업

S3 버킷 생성

 

IAM 역할 설정

1) S3 객체에 액세스할 수 있는 권한을 lambda에 부여

2) Rekognition의 IndexFaces 함수를 시작하고 FaceId 간의 매핑을 위해 DynamoDB key-value 저장소 내에 여러 항목 생성  가능

더보기
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*",
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::training-bucket-221004/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "rekognition:IndexFaces"
            ],
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Action": [
                "dynamodb:PutItem"
            ],
            "Resource": "arn:aws:dynamodb:<Region>:<ACCOUNT-ID>:table/family-collection",
            "Effect": "Allow"
        }
    ]
}

해당 IAM 역할을 Lambda 함수에 붙여주자.

 

마지막으로 새 사진이 S3에 업로드될 때마다 트리거되는 Lambda 함수 생성(트리거 추가)

 

해당 lambda 함수에 코드 넣어주기.

1) Rekognition IndexFaces API를 사용하여 입력 이미지에서 얼굴을 감지하고 지정된 컬렉션에 추가

2) 성공하면 S3에 있는 객체의 메타데이터에서 사람의 전체 이름을 검색, 그런 다음 나중에 참조할 수 있도록 DynamoDB 테이블에 FaceId가 있는 key-value 튜플로 저장

더보기
import os
import boto3
from decimal import Decimal
import json
import urllib

print('Loading function')

dynamodb = boto3.client('dynamodb')
s3 = boto3.client('s3')
rekognition = boto3.client('rekognition')


# --------------- Helper Functions ------------------

def index_faces(bucket, key):
    CollectionId = os.getenv("CollectionName")

    response = rekognition.index_faces(
        Image={"S3Object": {"Bucket": bucket, "Name": key}},
        CollectionId=CollectionId,
    )
    return response


def update_index(tableName,faceId, fullName):
    response = dynamodb.put_item(
        TableName=tableName,
        Item={
        'RekognitionId': {'S': faceId},
        'FullName': {'S': fullName}
        }
    )

# --------------- Main handler ------------------

def lambda_handler(event, context):

    # Get the object from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              

    try:
        # Calls Amazon Rekognition IndexFaces API to detect faces in S3 object
        # to index faces into specified collection
        response = index_faces(bucket, key)

        # Commit faceId and full name object metadata to DynamoDB
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            faceId = response['FaceRecords'][0]['Face']['FaceId']

            ret = s3.head_object(Bucket=bucket,Key=key)
            personFullName = ret['Metadata']['fullname']

            update_index(os.getenv("DynamoDBTableName"), faceId, personFullName)

        # Print response to console.
        print(response)

        return response
    except Exception as e:
        print(e)
        print("Error processing {} from bucket {}. ".format(key, bucket)) 
        raise e

 

이후에 이미지를 S3에 업로드한 뒤에 해당 코드를 통해 Face collection을 시드할 수 있다.

(현재는 S3 버킷에 이미지가 없음....;; 따라서 코드 실행에 에러 나는게 당연하다..?)

 

해당 컬렉션이 잘 만들어졌는지 확인하기 위해선

aws rekognition list-faces --collection-id family-collection

 

위의 명령어를 사용하면 된다.

 

  • 분석 작업

컬렉션이 채워지면 얼굴이 포함된 다른 이미지를 전달하여 쿼리가 가능하다.

SearchFacesByImageAPI를 사용하여 쿼리할 컬렉션 이름과 분석할 이미지에 대한 참조 두 가지 이상의 매개변수 제공

아래의 코드는 이미지를 bytestream으로 제출하는 방법을 보여준다.

(응답의 경우 일치 항목의 FaceId가 포함된 JSON 객체를 반환, (메타데이터 중 신뢰도 점수, 이미지 내 얼굴의 좌표))

더보기
import boto3
import io
from PIL import Image

rekognition = boto3.client('rekognition', region_name='eu-west-1')
dynamodb = boto3.client('dynamodb', region_name='eu-west-1')
    
image = Image.open("group1.jpeg")
stream = io.BytesIO()
image.save(stream,format="JPEG")
image_binary = stream.getvalue()


response = rekognition.search_faces_by_image(
        CollectionId='family_collection',
        Image={'Bytes':image_binary}                                       
        )
    
for match in response['FaceMatches']:
    print (match['Face']['FaceId'],match['Face']['Confidence'])
        
    face = dynamodb.get_item(
        TableName='family_collection',  
        Key={'RekognitionId': {'S': match['Face']['FaceId']}}
        )
    
    if 'Item' in face:
        print (face['Item']['FullName']['S'])
    else:
        print ('no match found in person lookup')

 

 

❗ 우씨 뭐가뭔지 모르겠다... Rekognition 구조부터 파악하고 해봐야할듯?! ❗

'Cloud > AWS' 카테고리의 다른 글

[AWS] Lambda Layer  (0) 2022.10.07
[AWS] Amazon Rekognition 개념  (1) 2022.10.05
[AWS] EC2로 MQTT 통신 테스트  (0) 2022.09.27
[AWS] Amazon Kinesis 개념  (0) 2022.09.26
[AWS] IoT Core 살펴보기 -4(데이터 레이크)  (0) 2022.09.26

IoT core 말고 ec2로도 iot 디바이스와 연결할 수 있게 구성해보려 한다.. 나자신 화이팅!

먼저 디바이스와 통신할 수 있는 프로토콜에 대해 먼저 알아보자.(더보기 클릭)

 

더보기

IoT 통신 프로토콜

  • Bluetooth: 스마트 기기와 페어링하기 위한 웨어러블 기술 제공
  •  WiFi: 많은 양의 데이터를 제어하는 능력과 빠른 데이터 전송 속도(과도한 전력 소모가 단점)
  • ZigBee: 블루투스와 비슷, 산업체를 위해 설계된 느낌, 
  • MQTT IoT: Message Queue Telemety Transport, 원격지에서 모니터링하는 곳, TCP
  • CoAP: Constrained Application Protocol, 제한된 스마트 장치를 위해, HTTP
  • DDS: 고성능의 확장 가능한 실시간 M2M 통신을 위한 표준, 실시간 분산 애플리케이션
  • NFC: 안전한 양방향 통신 연결, 비접촉식 결제 거래
  • Cellular: 더 먼 거리에서 작동해야하는 느낌, 많은 비용과 높은 전력 소비
  • AMQP: 응용 프로그램 계층 프로토콜, Exchange / Message Queue / Binding
  • LoRaWAN: 장거리 광역 네트워크, IoT 프로토콜 광역 네트워크용
  • RFID: 무선 주파수 식별
  • Z-wave: IoT 프로토콜 저전력 RF, 무선 주파수 통신, 주로 홈 자동화 응용 프로그램에 사용
  • Sigfox: Cellular 및 WiFi 속성을 모두 포함
  • 실: 가장 최근의 IoT 프로토콜 중 하나, 홈 자동화 앱에서 사용, IPv6
  • EnOcean: 무선 감지 및 에너지 수확 플랫폼, 다양한 상황에서 응답이 필요할 때

 


AWS IoT Core가 제공하는 디바이스 통신 프로토콜은 MQTT HTTPS 이다.

https://docs.aws.amazon.com/ko_kr/iot/latest/developerguide/protocols.html

 

메시지를 게시하고 구독하기 위한 MQTT / MQTT over WebSocket(WSS) 프로토콜,

메시지를 게시하기 위한 HTTPS 프로토콜을 사용하는 디바이스 및 클라이언트를 지원!

 

MQTT

  • 제약된 디바이스를 위해 설계된 경량의 메시징 프로토콜
  • clientId로 식별되는 디바이스 연결을 지원
  • AWS IoT Device SDK 지원(디바이스를 AWS IoT에 연결할 때 권장되는 방법)
    • 만약 SDK 사용 안하면, 필요한 연결과 통신 보안을 제공하면 됨
  • MQTT QoS level 0과 1만 지원(QoS level 2의 게시 또는 구독 지원 안함..)
    • QoS level 0: 신뢰할 수 있는 통신 링크를 통해 전송되거나 누락되어도 문제 없는 메시지에 이용
    • QoS level 1: 전송한 사람이 PUBACK 응답을 수신해 성공적인 전달을 나타낼 때까지는 완료 안한 것

https://www.hardcopyworld.com/?p=3369

HTTPS

  • HTTP 1.0 또는 1.1 사용해 REST API에 요청하여 메시지 게시 가능
  • clientId 값 지원 안함
  • 클라이언트별 엔드포인트 및 주제별 URL에 대한 POST 요청을 함으로서 메시지를 게시
    • https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1"
    • IoT_data_endpoint: AWS IoT 디바이스 데이터 엔드포인트
    • url_encoded_topic_name: 전송되는 메시지의 전체 topic name

EC2에 MQTT를 올리는 작업을 해보자.

참고로 MQTT 브로커 프로그램에는 ActiveMQ, Apollo, IBM Message, Sight, RabbitMQ, Mosquitto 등이 자주 사용됨.

실습에는 Mosquitto를 사용할 예정!

( https://github.com/mqtt/mqtt.org/wiki/server-support : 브로커 특징 비교 자료)

 

  • EC2에 mosquitto 설치
sudo amazon-linux-extras install epel //epel 설치
sudo yum -y install mosquitto //mosquitto 설치
sudo systemctl start mosquitto && sudo systemctl enable mosquitto //mosquitto 서비스 시작
sudo systemctl status mosquitto // mosquitto 서비스 상태 확인

 

  • Mosquitto 사용

Mosquitto: Message 브로커로 pub&sub 명령어 지원

토픽을 구독하고 메시지를 배포!

mosquitto_sub -d -t my-topic

mosquitto_pub -d -t my-topic -m "message check"

창 두개를 띄워서 한쪽 창에서 sub 먼저 하고 다른 창에서 pub 실행해보자.

 

로그 확인

sudo tail -f /var/log/messages

 

New client가 연결된 것을 볼 수 있다!

 

 

❗ 사실 이렇게만 보면 뭘 했는지 와닿진 않는다.. 메시지를 구독(sub)하고 게시(pub)하는 건 알겠는데.. 실질적으로 IoT에서 어떻게 활용되냐 이말이야.. ❗

iot 실습하다가 Kinesis Data Streams와 Kinesis Data Firehose 차이가 와닿지 않아 개념을 좀 정리해보려 한다..


Amazon Kinesis

  • 실시간 스트리밍 데이터를 손쉽게 수집, 처리 및 분석 가능
  • 모든 규모의 스트리밍 데이터를 비용 효율적으로 처리할 수 있는 핵심 기능, 애플리케이션 요구 사항에 가장 적합한 도구를 선택할 수 있는 유연성 제공
  • 실시간 데이터: (비디오, 오디오, 애플리케이션 로그, 웹 사이트 클릭 스트림, IoT 텔레메트리 등)

Kinesis Video Streams

  • 분석, ML(기계 학습), 재생 및 기타 처리를 위해 커넥티드 디바이스에서 AWS로 비디오를 쉽고 안전하게 스트리밍
  • 수백만 대의 디바이스의 스트리밍 비디오 데이터 수집하는 데 필요한 인프라 자동으로 프로비저닝, 탄력적 스케일링

Kinesis Data Analytics

Kinesis Data Streams

  • 데이터 스트림
  • 기능: https://aws.amazon.com/ko/kinesis/data-streams/features/?nc=sn&loc=2

Kinesis Data Firehose

  • 전송 스트림
  • 기능: https://aws.amazon.com/ko/kinesis/data-firehose/?nc=sn&loc=2&dn=3

 

여기서 문제가 되는 건... Data Streams와 Data Firehose의 차이다!

Video Stream 과 Data Analytics는 이름만 봐도 알 수 있을 정도로 역할이 명확히 나뉘는데...😑


Kinesis Data Stream Console

 

Kinesis Data Firehose Console

소스: Kinesis Data Streams, Direct PUT

대상: OpenSearch Service, Redshift, S3, Coralogix, Datadog, Dynatrace, HTTP 엔드포인트 등... 

Kinesis Data Stream은 실시간으로 data들을 받아들일 수 있는 입구이자 저장소의 역할을 한다.

한 시스템이 실시간으로 데이터를 전송하면 해당 Data Stream을 듣고 있던 다른 시스템이 해당 데이터를 받아 처리

약간 pipeline이자 메시지 큐와 같은 느낌? 

 

Kinesis Data Firehose의 목적은 미리 정의된 대상(Destination)에 데이터를 안전하게 전달하는 것이다.

대상의 경우 S3 bucket, ElasticSearch, Amazon Redshift 등 데이터레이크의 역할을 할 수 있는 다양한 저장소를 의미한다.

중간에 lambda를 이용해 가공하는 작업도 가능!

 

 

❗ 다시 한번 더 정리를 해보면 ❗

  • Data Stream - low latency streaming service / Firehose - data transfer service
  • Data Stream은 길게는 일주일 까지 데이터를 잠시 저장할 수 있지만, Firehose의 경우 데이터 저장의 기능이 없음
  • Data Stream은 Stream에서 데이터를 꺼내와 작업하는 느낌이라면 Firehose는 데이터를 직접 Destination에 전달
  • Data Stream은 여러개의 Consumer를 지정할 수 있지만 Firehose는 단일 Destination을 가짐
  • Data Stream은 샤드 수를 조정하여 수동으로 Scailing, but Firehose는 데이터 요청에 따라 Scailing이 자동으로

sample

이걸 보면 알 수 있을 듯?!

Data Streams의 경우 바로 lambda로 가공을 시작하지만, Data Firehose의 경우 S3의 저장을 한 뒤에 가공을 하네?!

참고 1: https://realyun99.tistory.com/153

참고 2: https://realyun99.tistory.com/154

참고 3: https://realyun99.tistory.com/155

얼마 안남았다...! 마지막 lab!!


  • S3 버킷

버킷을 생성한다.(나머진 default로 진행)

 

  • Kinesis Data Firehose 생성

Browse를 눌러 위에서 생성했던 버킷 지정해준다.

위와 같이 추가 설정을 진행한 후 생성을 클릭

 

  • IoT Core 규칙 생성

위와 같이 규칙 작업을 추가해준다.

 

  • S3 버킷 확인하기

.gz 파일 객체 확인

→ 디바이스의 메시지가 S3 데이터 레이크에 저장되었음을 확인.

 

 

 

❗ 위의 설계도 처럼 사용하기도 하지만 보통 OpenSearch의 경우 비싸 많이 사용하지 않는다. 오히려 S3 뒤쪽으로 분석 쪽 서비스들을 더해 주로 사용하게 될듯! ❗

sample

(S3 + Glue + Athena로 분석하고 QuickSight로 시각화)

참고 1: https://realyun99.tistory.com/153

참고 2: https://realyun99.tistory.com/154

위의 포스팅 부터 확인하고 오세요~


OpenSearch Service

(Apache 2.0 라이선스 하에 제공되는 분산형 커뮤니티 기반 100% 오픈소스 검색 및 분석 제품군)

(실시간 애플리케이션 모니터링, 로그 분석 및 웹 사이트 검색과 같이 다양한 사용 사례에 사용됨)

  • CloudFormation 작업

해당 파일을 다운로드.

cfn.yaml

그 후에 스택을 생성해주자.

파라미터에 SourceIP에는 내 ip를, ThingName에는 저번에  생성했던 iot 디바이스 이름을 넣어주자.

(꽤나 오래 걸린다..)

 

스택 리소스에서 Domain을 클릭해 들어가자.

해당 url에 들어가자.

 

<Stack 생성 완료의 결과물>

- OpenSearch 도메인

- IoT 규칙(opensearch_rule)

- IoT 규칙에서 사용되는 IAM 역할

 

  • OpenSearch 작업

(메뉴 → Discover)

스크립트를 돌리는 상태에서 실행해야함(cloud9)

위와 같은 화면이 보이고 Create index pattern을 클릭하자.

자동으로 데이터가 분석되는 상황을 확인할 수 있다.

 

다시 메뉴 → Discover로 돌아가보면

OpenSearch 대시보드 화면에 데이터가 등록되어 있는지 확인이 가능하다.

 

(메뉴 → Visualize)

Line 선택 → timestamp* 선택

위와 같이 설정해주고 업데이트 클릭

SAVE 버튼 누르고 원하는 title 달아서 저장 가능

(입맛대로 설정을 변경해서 새 그래프를 추가해 사용 사례에 맞는 대시보드로 활용하면 될듯!)

 

 

❗ 세상이 참 좋아졌다는 것을 느낀다😁 ❗

+ Recent posts