저번 포스팅에 이어서 Glue까지 사용을 해보자. 사용 전에 Glue가 뭔지 부터 확인하자.
AWS Glue
분석, 기계 학습 및 애플리케이션 개발을 위해 데이터를 쉽게 탐색, 준비, 조합할 수 있도록 지원하는 서버리스 데이터 통합 서비스
데이터 통합: 해당 개발을 위해 데이터를 준비하고 결합하는 프로세스
데이터 검색 및 추출
데이터 강화, 정리, 정규화 및 결합
데이터베이스, 데이터 웨어하우스 및 데이터 레이크에 데이터 로드 및 구성 등
기능
Data Catalog: 모든 데이터 자산을 위한 영구 메타데이터 스토어
모든 AWS 데이터 세트에서 검색: 자동으로 통계를 계산하고 파티션을 등록, 데이터 변경 사항 파악
Crawlers: 소스/대상 스토어에 연결해 우선순위가 지정된 Classifiers을 거치면서 데이터의 스키마 결정, 메타 데이터 생성
Stream schema registries: Apache Avro 스키마를 사용해 스트리밍 데이터의 변화를 검증하고 제어
Data Integration and ETL(Extract / Transform / Load)
Studio Job Notebooks: Studio에서 최소한으로 설정할 수 있는 서버리스 노트북
Interactive Sessions: 데이터 통합 작업 개발을 간소화, 엔지니어와 대화식으로 데이터 탐색, 준비
ETL 파이프라인 구축: 여러 개의 작업을 병렬로 시작하거나 작업 간에 종속성을 지정
Studio: 분산 처리를 위한 확정성이 뛰어난 ETL 작업 가능, 에디터에서 ETL 프로세스를 정의하면 Glue가 자동으로 코드 생성
등등...
Glue DataBrew: 시각적 데이터 준비 도구(사전 빌드된 250개 이상의 변환 구성 중 선택해서 코드 없이 가능)
솔직히 뭐라고 하는지 모르겠다.. 직접 써보는게 답!
Crawler로 실습을 진행해보자.
Glue로 테이블 만들기
크롤러를 생성해주자.
데이터 스토어는 저번 포스팅 때 가져왔었던 데이터셋이 위치한 버킷으로 지정할 것!
Create new IAM role을 통해 새로 생성하고 지정해준다. 타겟 DB까지 정해주면 끝!
크롤러를 돌려주자(생성된 크롤러 선택 후 Run 버튼 클릭)
Glue → Tables에 들어가면 테이블이 하나 더 추가된 것을 볼 수 있다.
Athena로 돌아가 다음 쿼리를 실행해 데이터를 확인해보자.
select * from amazon_review_glue_parquet limit 10;
Views 만들기
다시 Athena로 돌아왔다.(Athena의 view는 물리적 테이블이 아닌 논리적 테이블!)
저장된 쿼리에서 Athena_create_view_top_rated를 클릭한다.
위 이미지 처럼 선택 후 실행한다. 아래와 같이 보기에 추가된 것을 확인할 수 있다.
두번째 쿼리를 선택한 뒤 실행해 등급별로 상위 10개 제품을 확인해보자.
결과 확인(S3 버킷)
S3에서 athena-workshop-<account-id> 로 시작하는 버킷을 찾아 들어가면 처음엔 비어있던 버킷이 Athena에서 돌렸던 쿼리문을 기준으로 폴더들이 생성되어 있는 것을 볼 수 있고, Athena_compare_reviews 접두사 중 하나를 찾아보면 쿼리 ID와 함께 저장된 결과를 볼 수 있음.
❗ 그래서 결과적으로 Glue Crawler가 무슨 역할을 했냐...? ❗
크롤러 작동 방식:
데이터를 분류하여 raw data의 포맷, 스키마 및 관련 속성 결정 / 데이터를 테이블 혹은 파티션으로 분류 / 메타데이터를 Data Catalog에 작성
# Purpose: Creates Athena Workgroups, Named Queries, IAM Users via AWS CloudFormation #=============================================================================== AWSTemplateFormatVersion: "2010-09-09" Description: | Athena Immersion Day - Creates Athena Workgroups, Named Queries, IAM Users
/* Next we will load the partitions for this table */ MSCK REPAIR TABLE amazon_reviews_parquet;
/* Check the partitions */ SHOW PARTITIONS amazon_reviews_parquet;
qryamazonreviewstsv: Type: AWS::Athena::NamedQuery Properties: Database: "default" Description: "Reviews Ratings table amazon_reviews_tsv" Name: "Athena_compare_reviews" QueryString: | /* Let's try to find the products and their corresponding category by number of reviews and avg star rating */ SELECT product_id, product_category, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars FROM amazon_reviews_tsv GROUP BY 1, 2, 3 ORDER BY 4 DESC limit 10;
/* Let's try to find the products and their corresponding category by number of reviews and avg star rating on parquet table */ SELECT product_id, product_category, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars FROM amazon_reviews_parquet GROUP BY 1, 2, 3 ORDER BY 4 DESC limit 10;
/* Let's try to find the products by number of reviews and avg star rating in Mobile_Apps category */ SELECT product_id, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars FROM amazon_reviews_tsv where product_category='Mobile_Apps' GROUP BY 1, 2 ORDER BY 3 DESC limit 10;
/* Let's try to find the products by number of reviews and avg star rating in Mobile_Apps category */ SELECT product_id, product_title, count(*) as num_reviews, avg(star_rating) as avg_stars FROM amazon_reviews_parquet where product_category='Mobile_Apps' GROUP BY 1, 2 ORDER BY 3 DESC limit 10;
TopReviewedStarRatedProductsv: Type: AWS::Athena::NamedQuery Properties: Database: "default" Description: "Create View TopRatedProducts" Name: "Athena_create_view_top_rated" QueryString: | CREATE view topratedproducts AS SELECT product_category, product_id, product_title, count(*) count_reviews FROM amazon_reviews_parquet WHERE star_rating=5 GROUP BY 1, 2, 3 ORDER BY 4 desc;
Select * from topratedproducts limit 10;
ctas: Type: AWS::Athena::NamedQuery Properties: Database: "default" Description: "CTAS Amazon Reviews by Marketplace" Name: "Athena_ctas_reviews" QueryString: | CREATE TABLE amazon_reviews_by_marketplace WITH ( format='PARQUET', parquet_compression = 'SNAPPY', partitioned_by = ARRAY['marketplace', 'year'], external_location = 's3://<<Athena-WorkShop-Bucket>>/athena-ctas-insert-into/') AS SELECT customer_id, review_id, product_id, product_parent, product_title, product_category, star_rating, helpful_votes, total_votes, verified_purchase, review_headline, review_body, review_date, marketplace, year(review_date) AS year FROM amazon_reviews_tsv WHERE "$path" LIKE '%tsv.gz';
/* Let's try to find the products and their corresponding category by number of reviews and avg star rating for US marketplace in year 2015 */
SELECT product_id, product_category, product_title, count(*) AS num_reviews, avg(star_rating) AS avg_stars FROM amazon_reviews_by_marketplace WHERE marketplace='US' AND year=2015 GROUP BY 1, 2, 3 ORDER BY 4 DESC limit 10;
comparereviews: Type: AWS::Athena::NamedQuery Properties: Database: "default" Description: "Compare query performance" Name: "Athena_compare_reviews_marketplace" QueryString: | SELECT product_id, COUNT(*) FROM amazon_reviews_by_marketplace WHERE marketplace='US' AND year = 2013 GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT product_id, COUNT(*) FROM amazon_reviews_parquet WHERE marketplace='US' AND year = 2013 GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT product_id, COUNT(*) FROM amazon_reviews_tsv WHERE marketplace='US' AND extract(year from review_date) = 2013 GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
flights: Type: AWS::Athena::NamedQuery Properties: Database: "default" Description: "Top 10 routes delayed by more than 1 hour" Name: "Athena_flight_delay_60" QueryString: | SELECT origin, dest, count(*) as delays FROM flight_delay_parquet WHERE depdelayminutes > 60 GROUP BY origin, dest ORDER BY 3 DESC LIMIT 10;
ConsolePassword: Description: AWS Secrets URL to find the generated password for User A and User B Value: !Sub 'https://console.aws.amazon.com/secretsmanager/home?region=${AWS::Region}#/secret?name=/athenaworkshopuser/password'
스택의 리소스나 이벤트를 확인해보면 생성 결과 → 쿼리문 파일들 + Athena 작업그룹 + IAM User A/B + S3 버킷 정도?
사실 이번 포스팅에서 User를 나누는 의미는 없다. 나중에 이어서 실습할 내용들에 있어서 필요한 유저들..
쿼리를 하나씩 선택해서 실행해 각 결과를 비교해보자.(스캔한 데이터 양과 소요 시간 차이 확인)
파티셔닝 전의 테이블: amazon_reviews_tsv
평균 리뷰별 상위 10개 제픔모바일 앱 카테고리의 평균 리뷰 기준 상위 10개 제품
파티셔닝 후의 테이블: amazon_reviews_parquet
평균 리뷰별 상위 10개 제품모바일 앱 카테고리의 평균 리뷰 기준 상위 10개 제품
확실히 파티셔닝 후의 테이블의 성능이 뛰어나다는 것을 확인할 수 있다.
기본적인 SQL문(Select From 뭐시기... 이런 것)들은 알고 있는데..
파티셔닝 관련한 명령은 알지 못해서.. 간단히 정리해본다.
파티션을 사용하는 테이블을 생성하려면 CREATE TABLE 문에 PARTITIONED BY 절을 사용
기본 문법:
CREATE EXTERNAL TABLE users (
first string,
last string,
username string
)
PARTITIONED BY (id string)
STORED AS parquet
LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/'
테이블을 생성하고 쿼리를 위해 파티션에 데이터를 로드하는 명령
(참고로 parquet(파켓)은 Apache Hadoop 에코 시스템의 오픈소스 열 지향 데이터 저장 형식)
이후에 Hive 스타일 파티션의 경우 MSCK REPAIR TABLE을 실행(지금 했던 방식)
아니라면 ALTER TABLE ADD PARTITON을 사용해 파티션을 수동으로 추가
CREATE EXTERNAL TABLE amazon_reviews_parquet(
marketplace string,
customer_id string,
review_id string,
product_id string,
product_parent string,
product_title string,
star_rating int,
helpful_votes int,
total_votes int,
vine string,
verified_purchase string,
review_headline string,
review_body string,
review_date bigint,
year int)
PARTITIONED BY (product_category string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://amazon-reviews-pds/parquet/';
/* Next we will load the partitions for this table */
MSCK REPAIR TABLE amazon_reviews_parquet;
/* Check the partitions */
SHOW PARTITIONS amazon_reviews_parquet;
import boto3
region = 'ap-northeast-2'
instances = []
ec2_r = boto3.resource('ec2')
ec2 = boto3.client('ec2', region_name=region)
for instance in ec2_r.instances.all():
for tag in instance.tags:
if tag['Key'] == 'auto-schedule':
if tag['Value'] == 'True':
instances.append(instance.id)
def lambda_handler(event, context):
ec2.start_instances(InstanceIds=instances)
print('started your instances: ' + str(instances))
제한 시간을 30초로 변경해준다.
그리고 그냥 아무 테스트 만들어서 돌려준다. 성공 로그 뜨면 완료.
3) EC2 stop 함수 생성
코드만 바뀌고 다른 과정은 위의 EC2 start 함수와 같다.
import boto3
region = 'ap-northeast-2'
instances = []
ec2_r = boto3.resource('ec2')
ec2 = boto3.client('ec2', region_name=region)
for instance in ec2_r.instances.all():
for tag in instance.tags:
if tag['Key'] == 'auto-schedule':
if tag['Value'] == 'True':
instances.append(instance.id)
def lambda_handler(event, context):
ec2.stop_instances(InstanceIds=instances)
print('stopped your instances: ' + str(instances))
해당 ec2가 중지 중인걸 볼 수 있음!
EventBridge 설정
규칙을 원하는대로 설정해서 생성하면 된다.(월-금 오전 9시 - 오후 6시까지 start, 나머지 stop)