[AWS] Amazon Athena 사용법

먼저 Athena가 뭐하는 친군지 살펴보자. Amazon Athena 표준 SQL을 사용해 S3에 저장된 데이터를 간편하게 분석할 수 있는 대화식 쿼리 서버리스 서비스

저번 포스팅에 이어서 Glue까지 사용을 해보자. 사용 전에 Glue가 뭔지 부터 확인하자.


AWS Glue

  • 분석, 기계 학습 및 애플리케이션 개발을 위해 데이터를 쉽게 탐색, 준비, 조합할 수 있도록 지원하는 서버리스 데이터 통합 서비스
  • 데이터 통합: 해당 개발을 위해 데이터를 준비하고 결합하는 프로세스
    • 데이터 검색 및 추출
    • 데이터 강화, 정리, 정규화 및 결합
    • 데이터베이스, 데이터 웨어하우스 및 데이터 레이크에 데이터 로드 및 구성 등
  • 기능
    • Data Catalog: 모든 데이터 자산을 위한 영구 메타데이터 스토어
      • 모든 AWS 데이터 세트에서 검색: 자동으로 통계를 계산하고 파티션을 등록, 데이터 변경 사항 파악
      • Crawlers: 소스/대상 스토어에 연결해 우선순위가 지정된 Classifiers을 거치면서 데이터의 스키마 결정, 메타 데이터 생성
      • Stream schema registries: Apache Avro 스키마를 사용해 스트리밍 데이터의 변화를 검증하고 제어
    • Data Integration and ETL(Extract / Transform / Load)
      • Studio Job Notebooks: Studio에서 최소한으로 설정할 수 있는 서버리스 노트북
      • Interactive Sessions: 데이터 통합 작업 개발을 간소화, 엔지니어와 대화식으로 데이터 탐색, 준비
      • ETL 파이프라인 구축: 여러 개의 작업을 병렬로 시작하거나 작업 간에 종속성을 지정
      • Studio: 분산 처리를 위한 확정성이 뛰어난 ETL 작업 가능, 에디터에서 ETL 프로세스를 정의하면 Glue가 자동으로 코드 생성
      • 등등...
    • Glue DataBrew: 시각적 데이터 준비 도구(사전 빌드된 250개 이상의 변환 구성 중 선택해서 코드 없이 가능)

솔직히 뭐라고 하는지 모르겠다.. 직접 써보는게 답!

Crawler로 실습을 진행해보자.


  • Glue로 테이블 만들기

크롤러를 생성해주자.

데이터 스토어는 저번 포스팅 때 가져왔었던 데이터셋이 위치한 버킷으로 지정할 것!

Create new IAM role을 통해 새로 생성하고 지정해준다. 타겟 DB까지 정해주면 끝!

크롤러를 돌려주자(생성된 크롤러 선택 후 Run 버튼 클릭)

Glue → Tables에 들어가면 테이블이 하나 더 추가된 것을 볼 수 있다.

Athena로 돌아가 다음 쿼리를 실행해 데이터를 확인해보자.

select * from amazon_review_glue_parquet limit 10;


  • Views 만들기

다시 Athena로 돌아왔다.(Athena의 view는 물리적 테이블이 아닌 논리적 테이블!)

저장된 쿼리에서 Athena_create_view_top_rated를 클릭한다.

위 이미지 처럼 선택 후 실행한다. 아래와 같이 보기에 추가된 것을 확인할 수 있다.

두번째 쿼리를 선택한 뒤 실행해 등급별로 상위 10개 제품을 확인해보자.


  • 결과 확인(S3 버킷)

S3에서 athena-workshop-<account-id> 로 시작하는 버킷을 찾아 들어가면 처음엔 비어있던 버킷이 Athena에서 돌렸던 쿼리문을 기준으로 폴더들이 생성되어 있는 것을 볼 수 있고, Athena_compare_reviews 접두사 중 하나를 찾아보면 쿼리 ID와 함께 저장된 결과를 볼 수 있음.




❗ 그래서 결과적으로 Glue Crawler가 무슨 역할을 했냐...? ❗

크롤러 작동 방식:

데이터를 분류하여 raw data의 포맷, 스키마 및 관련 속성 결정 / 데이터를 테이블 혹은 파티션으로 분류 / 메타데이터를 Data Catalog에 작성


위와 같은 것들을 crawler를 생성하고 실행하면 알아서 해주는 느낌인듯?

데이터 스토어를 지금은 하나를 지정했지만, 여러 개 병렬로도 가능하고...

지금 했던 실습 같은 경우는 아주 단순한 경우인 거고, 여러 방식으로 활용이 가능하다!

(그리고 Athena는 쿼리를 돌려서 테이블을 만들었지만, 크롤러는 그런게 없이 알아서 돌려주네?!)

먼저 Athena가 뭐하는 친군지 살펴보자.

Amazon Athena

  • 표준 SQL을 사용해 S3에 저장된 데이터를 간편하게 분석할 수 있는 대화식 쿼리 서버리스 서비스
  • 그냥 단순히 S3에 저장된 데이터를 가리키고 스키마를 정의한 후 표준 SQL을 사용하여 쿼리를 시작하면 됨
  • AWS Glue Data Catalog와 즉시 통합됨
    • 다양한 서비스에 걸쳐 통합된 메타데이터 리포지토리를 생성하고, 데이터 원본을 크롤링하여 스키마를 검색하고 카탈로그를 신규 및 수정된 테이블 정의와 파티션 정의로 채우며, 스키마 버전을 관리할 수 있음
  • 기능
    • 표준 SQL을 사용한 간편한 쿼리: 오픈 소스 분산 SQL 쿼리 엔진(Presto) 사용
    • 쿼리당 비용 지불: 실행한 쿼리에 대한 비용 지불(각 쿼리에서 스캔한 데이터 양에 따라 요금 부과)
    • 빠른 성능: 자동으로 쿼리를 병렬로 실행, 대규모 데이터 세트에서도 빠른 결과 얻음
    • 기계학습: SQL 쿼리에서 SageMaker 기계학습 모델을 호출하여 추론 실행 가능

등.. 많은 기능이 있다! 그러면 이러한 좋은 서비스 Athena를 사용해볼까?

aws workshop을 따라 가는 중..

  • CloudFormation

CloudFormation(버지니아 북부)를 통해 밑바탕을 깔자.



# Purpose:  Creates Athena Workgroups, Named Queries, IAM Users via AWS CloudFormation
AWSTemplateFormatVersion: "2010-09-09"
Description: |
  Athena Immersion Day - Creates Athena Workgroups, Named Queries, IAM Users

    Type: "AWS::S3::Bucket"
      BucketName: !Join [ "-", ["athena-workshop", Ref: "AWS::AccountId"]] 

    Type: AWS::Athena::WorkGroup
      Name: workgroupA
      RecursiveDeleteOption: true
        PublishCloudWatchMetricsEnabled: true
          OutputLocation: !Join [ "", ["s3://" , Ref: AthenaWorkShopBucket, "/"]]

    Type: AWS::Athena::WorkGroup
      Name: workgroupB
      RecursiveDeleteOption: true

    Type: AWS::Athena::WorkGroup
      Name: AmazonAthenaIcebergPreview
      RecursiveDeleteOption: true
        EnforceWorkGroupConfiguration: true
         SelectedEngineVersion: Athena engine version 2       
        PublishCloudWatchMetricsEnabled: true
          OutputLocation: !Join [ "", ["s3://" , Ref: AthenaWorkShopBucket, "/"]]

    Type: AWS::Athena::NamedQuery
      Database: "default"
      Description: "Create table amazon_reviews_tsv"
      Name: "Athena_create_amazon_reviews_tsv"
      QueryString: |
                    CREATE EXTERNAL TABLE amazon_reviews_tsv (
                    marketplace string, 
                    customer_id string, 
                    review_id string, 
                    product_id string, 
                    product_parent string, 
                    product_title string, 
                    product_category string, 
                    star_rating int, 
                    helpful_votes int, 
                    total_votes int, 
                    vine string, 
                    verified_purchase string, 
                    review_headline string, 
                    review_body string, 
                    review_date date,
                    year int)
                    ROW FORMAT DELIMITED
                    FIELDS TERMINATED BY '\t'
                    ESCAPED BY '\\'
                    LINES TERMINATED BY '\n'
                    TBLPROPERTIES ("skip.header.line.count"="1");  

    Type: AWS::Athena::NamedQuery
      Database: "default"
      Description: "Create table amazon_reviews_parquet"
      Name: "Athena_create_amazon_reviews_parquet"
      QueryString: |
                    CREATE EXTERNAL TABLE amazon_reviews_parquet(
                    marketplace string, 
                    customer_id string, 
                    review_id string, 
                    product_id string, 
                    product_parent string, 
                    product_title string, 
                    star_rating int, 
                    helpful_votes int, 
                    total_votes int, 
                    vine string, 
                    verified_purchase string, 
                    review_headline string, 
                    review_body string, 
                    review_date bigint, 
                    year int)
                    PARTITIONED BY (product_category string)
                    ROW FORMAT SERDE 
                    STORED AS INPUTFORMAT 

                    /* Next we will load the partitions for this table */
                    MSCK REPAIR TABLE amazon_reviews_parquet;

                    /* Check the partitions */
                    SHOW PARTITIONS amazon_reviews_parquet;

    Type: AWS::Athena::NamedQuery
      Database: "default"
      Description: "Reviews Ratings table amazon_reviews_tsv"
      Name: "Athena_compare_reviews"
      QueryString: |
                    /* Let's try to find the products and their corresponding category by number of reviews and avg star rating */
                    SELECT product_id, product_category, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars
                    FROM amazon_reviews_tsv 
                    GROUP BY 1, 2, 3
                    ORDER BY 4 DESC
                    limit 10;

                    /* Let's try to find the products and their corresponding category by number of reviews and avg star rating on parquet table */
                    SELECT product_id, product_category, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars
                    FROM amazon_reviews_parquet 
                    GROUP BY 1, 2, 3
                    ORDER BY 4 DESC
                    limit 10;
                    /* Let's try to find the products by number of reviews and avg star rating in Mobile_Apps category */
                    SELECT product_id, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars
                    FROM amazon_reviews_tsv where product_category='Mobile_Apps'
                    GROUP BY 1, 2
                    ORDER BY 3 DESC
                    limit 10;

                    /* Let's try to find the products by number of reviews and avg star rating in Mobile_Apps category */
                    SELECT product_id, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars
                    FROM amazon_reviews_parquet where product_category='Mobile_Apps'
                    GROUP BY 1, 2
                    ORDER BY 3 DESC
                    limit 10;

    Type: AWS::Athena::NamedQuery
      Database: "default"
      Description: "Create View TopRatedProducts"
      Name: "Athena_create_view_top_rated"
      QueryString: |
                    CREATE view topratedproducts AS
                    SELECT product_category,
                            count(*) count_reviews
                    FROM amazon_reviews_parquet
                    WHERE star_rating=5
                    GROUP BY  1, 2, 3
                    ORDER BY  4 desc;

                    Select * from topratedproducts limit 10;

    Type: AWS::Athena::NamedQuery
      Database: "default"
      Description: "CTAS Amazon Reviews by Marketplace"
      Name: "Athena_ctas_reviews"
      QueryString: |
                    CREATE TABLE amazon_reviews_by_marketplace
                    WITH ( format='PARQUET', parquet_compression = 'SNAPPY', partitioned_by = ARRAY['marketplace', 'year'], 
                    external_location =   's3://<<Athena-WorkShop-Bucket>>/athena-ctas-insert-into/') AS
                    SELECT customer_id,
                            year(review_date) AS year
                    FROM amazon_reviews_tsv
                    WHERE "$path" LIKE '%tsv.gz';

                    /* Let's try to find the products and their corresponding category by number of reviews and avg star rating for US marketplace in year 2015 */

                    SELECT product_id,
                            count(*) AS num_reviews,
                            avg(star_rating) AS avg_stars
                    FROM amazon_reviews_by_marketplace
                    WHERE marketplace='US'
                    AND year=2015
                    GROUP BY  1, 2, 3
                    ORDER BY  4 DESC limit 10; 

    Type: AWS::Athena::NamedQuery
      Database: "default"
      Description: "Compare query performance"
      Name: "Athena_compare_reviews_marketplace"
      QueryString: |
                    SELECT product_id, COUNT(*) FROM amazon_reviews_by_marketplace
                    WHERE marketplace='US' AND year = 2013
                    GROUP BY 1 ORDER BY 2 DESC LIMIT 10;

                    SELECT product_id, COUNT(*) FROM amazon_reviews_parquet
                    WHERE marketplace='US' AND year = 2013
                    GROUP BY 1 ORDER BY 2 DESC LIMIT 10;

                    SELECT product_id, COUNT(*) FROM amazon_reviews_tsv
                    WHERE marketplace='US' AND extract(year from review_date) = 2013
                    GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
    Type: AWS::Athena::NamedQuery
      Database: "default"
      Description: "Top 10 routes delayed by more than 1 hour"
      Name: "Athena_flight_delay_60"
      QueryString: |
                    SELECT origin, dest, count(*) as delays
                    FROM flight_delay_parquet
                    WHERE depdelayminutes > 60
                    GROUP BY origin, dest
                    ORDER BY 3 DESC
                    LIMIT 10;

    Type: "AWS::SecretsManager::Secret"
      Description: Athena Workshop User Password
      Name: "/athenaworkshopuser/password"
        SecretStringTemplate: '{}'
        GenerateStringKey: "password"
        PasswordLength: 30

    Type: "AWS::IAM::User"
      Path: "/"
        Password: !Sub '{{resolve:secretsmanager:${LabsUserPassword}:SecretString:password}}'
        PasswordResetRequired: false
        - PolicyName: "Athena-WorkgroupA-Policy"
            Version: '2012-10-17'
            - Effect: Allow
              - s3:Put*
              - s3:Get*
              - s3:List*
              - glue:*
              - cloudwatch:*
              - athena:ListNamedQueries
              - athena:ListWorkGroups
              - athena:GetExecutionEngine
              - athena:GetExecutionEngines
              - athena:GetNamespace
              - athena:GetCatalogs
              - athena:GetNamespaces
              - athena:GetTables
              - athena:GetTable
              Resource: "*"
            - Effect: Allow
              - athena:StartQueryExecution
              - athena:GetQueryResults
              - athena:DeleteNamedQuery
              - athena:GetNamedQuery
              - athena:ListQueryExecutions
              - athena:StopQueryExecution
              - athena:GetQueryResultsStream
              - athena:ListNamedQueries
              - athena:CreateNamedQuery
              - athena:GetQueryExecution
              - athena:BatchGetNamedQuery
              - athena:BatchGetQueryExecution
              - !Join [ "", ["arn:aws:athena:us-east-1:", Ref: "AWS::AccountId" , ":workgroup/workgroupA"]]              
            - Effect: Allow
              - athena:DeleteWorkGroup
              - athena:UpdateWorkGroup
              - athena:GetWorkGroup
              - athena:CreateWorkGroup
              - !Join [ "", ["arn:aws:athena:us-east-1:", Ref: "AWS::AccountId" , ":workgroup/workgroupA"]]
      UserName: "userA"

    Type: "AWS::IAM::User"
      Path: "/"
        Password: !Sub '{{resolve:secretsmanager:${LabsUserPassword}:SecretString:password}}'
        PasswordResetRequired: false
        - PolicyName: "Athena-WorkgroupA-Policy"
            Version: '2012-10-17'
            - Effect: Allow
              - s3:Put*
              - s3:Get*
              - s3:List*
              - glue:*
              - athena:ListWorkGroups
              - athena:GetExecutionEngine
              - athena:GetExecutionEngines
              - athena:GetNamespace
              - athena:GetCatalogs
              - athena:GetNamespaces
              - athena:GetTables
              - athena:GetTable
              Resource: "*"
            - Effect: Allow
              - athena:StartQueryExecution
              - athena:GetQueryResults
              - athena:DeleteNamedQuery
              - athena:GetNamedQuery
              - athena:ListQueryExecutions
              - athena:StopQueryExecution
              - athena:GetQueryResultsStream
              - athena:ListNamedQueries
              - athena:CreateNamedQuery
              - athena:GetQueryExecution
              - athena:BatchGetNamedQuery
              - athena:BatchGetQueryExecution
              - !Join [ "", ["arn:aws:athena:us-east-1:", Ref: "AWS::AccountId" , ":workgroup/workgroupB"]]              
            - Effect: Allow
              - athena:DeleteWorkGroup
              - athena:UpdateWorkGroup
              - athena:GetWorkGroup
              - athena:CreateWorkGroup
              - !Join [ "", ["arn:aws:athena:us-east-1:", Ref: "AWS::AccountId" , ":workgroup/workgroupB"]]
      UserName: "userB"  
    Description: S3 bucket
    Value: !Ref AthenaWorkShopBucket

    Description: LoginUrl
    Value: !Join ["", ["https://", Ref: "AWS::AccountId" , ""]]

    Description: AWS Secrets URL to find the generated password for User A and User B
    Value: !Sub '${AWS::Region}#/secret?name=/athenaworkshopuser/password'  

스택의 리소스나 이벤트를 확인해보면 생성 결과 → 쿼리문 파일들 + Athena 작업그룹 + IAM User A/B + S3 버킷 정도?

사실 이번 포스팅에서 User를 나누는 의미는 없다. 나중에 이어서 실습할 내용들에 있어서 필요한 유저들..


  • Dataset 확인

데이터셋은 workshop에서 제공해주는 s3 버킷에 담겨있다.



  • Athena 기본 설정

작업 그룹에 대해 CloudWatch 지표를 활성화 한다.

CloudFormation을 통해 만들어진 작업 그룹들이다. 현재는 primary 기준으로 진행할 것!

(primary → 편집)

쿼리들을 실행하기 전에 먼저 Athena 설정에서 S3 버킷을 지정해줘야한다.

쿼리 편집기 → 설정보기

CloudFormation에서 생성했던 s3로 지정해준다.


  • Athena - 테이블 생성 및 쿼리 실행

default 데이터베이스에서 실행할 예정. 만약 데이터베이스가 없다면 'CREATE database default' 실행.

저장된 쿼리에서 Athena_create_amazon_review_tsv를 클릭후 편집기에 해당 쿼리문이 열리면 실행하자.

위 이미지와 같이 테이블이 만들어지면 성공


다음으론 저장된 쿼리에서 Athena_create_amazon_reviews_parquet를 클릭.

쿼리문에서 한 번에 하나의 쿼리를 선택하고 실행하자.(주석 기준으로 나눠서 드래그 후 실행)

딱 여기까지.. 다음 쿼리문들은 파티셔닝 관련 명령이다.

테이블 하나 더 생김!


  • Athena - Partitioning Data

데이터를 분할하여 각 쿼리에서 스캔하는 데이터의 양을 제한하여 성능을 개선하고 비용을 절감할 수있음.

모든 키로 데이터를 분할할 수 있다. (보통 시간을 기준으로 데이터를 분할)

- 참고:


그럼 amazon_reviews_parquet 테이블을 기준으로 파티셔닝을 해보자.

위와 같이 두 명령을 각각 실행해보자.

그러면 파티션 추가 완료.


  • Athena - 테이블 간 성능 테스트

저장된 쿼리에서 Athena_compare_reviews를 열자.

쿼리를 하나씩 선택해서 실행해 각 결과를 비교해보자.(스캔한 데이터 양과 소요 시간 차이 확인)


파티셔닝 전의 테이블: amazon_reviews_tsv

평균 리뷰별 상위 10개 제픔
모바일 앱 카테고리의 평균 리뷰 기준 상위 10개 제품

파티셔닝 후의 테이블: amazon_reviews_parquet

평균 리뷰별 상위 10개 제품
모바일 앱 카테고리의 평균 리뷰 기준 상위 10개 제품


확실히 파티셔닝 후의 테이블의 성능이 뛰어나다는 것을 확인할 수 있다.



기본적인 SQL문(Select From 뭐시기... 이런 것)들은 알고 있는데..

파티셔닝 관련한 명령은 알지 못해서.. 간단히 정리해본다.


파티션을 사용하는 테이블을 생성하려면 CREATE TABLE 문에 PARTITIONED BY 절을 사용

기본 문법:

first string,
last string,
username string
PARTITIONED BY (id string)
STORED AS parquet

테이블을 생성하고 쿼리를 위해 파티션에 데이터를 로드하는 명령

(참고로 parquet(파켓)은 Apache Hadoop 에코 시스템의 오픈소스 열 지향 데이터 저장 형식)

이후에 Hive 스타일 파티션의 경우 MSCK REPAIR TABLE을 실행(지금 했던 방식)

아니라면 ALTER TABLE ADD PARTITON을 사용해 파티션을 수동으로 추가

CREATE EXTERNAL TABLE amazon_reviews_parquet(
marketplace string, 
customer_id string, 
review_id string, 
product_id string, 
product_parent string, 
product_title string, 
star_rating int, 
helpful_votes int, 
total_votes int, 
vine string, 
verified_purchase string, 
review_headline string, 
review_body string, 
review_date bigint, 
year int)
PARTITIONED BY (product_category string)

/* Next we will load the partitions for this table */
MSCK REPAIR TABLE amazon_reviews_parquet;

/* Check the partitions */
SHOW PARTITIONS amazon_reviews_parquet;

위의 코드블럭은 실습 진행하면서 사용했던 쿼리문이다.(어렵다 어려워...)




❗ 다음 포스팅에선 Glue와 어떤 식으로 통합해 사용하는지 알아보자 ❗

이제 좀 실질적으로 필요한 실습을 진행해보자..(많이 사용할만 한걸로다가 😅)

  • 버킷 생성

이름 알아서 고유하게 잘 설정한다.

이후에 로그 샘플 파일을 업로드 한다.

이와 같은 형태로 주욱 나열되어 있음(의미는 없음)


  • Glue Crawler 생성

이름: Demo-Athena-log-crawler

Data store 추가(위에서 생성했던 버킷 경로로!)

IAM Role: 이름 적당히 해서 새로 생성 후 지정

Database는 새로 생성

실행 시키자!

로그 데이터 양에 따라 걸리는 시간이 달라질 것!


테이블을 확인해보면,

다음과 같이 로그 파일에 알맞는 스키마가 생성된 것을 확인할 수 있다.


  • Athena로 쿼리

해당 버킷을 설정해주자.

테이블 미리보기를 누른 화면이다.. 아까 Glue에서 생성했던 테이블 결과를 확인할 수 있다.


원하는 쿼리로 실행해서 잘 이용하면 됨!!!

Athena 사용사례

- 여러 로그파일이 저장된 S3에서 필요한 데이터를 조회

- 정형화된 메타데이터 혹은 저장 데이터를 조회

- 이벤트 데이터에서 필요한 정보를 추출(A/B테스트) 등


❗ 지금은 로그 샘플 파일이라 적고 별거 없지만, 실제 프로젝트 로그의 경우 아주 많기 때문에 오류나 이런거 찾기에 좋을듯! ❗

