본문 바로가기

AWS Data Analytics/Collection

Collection - Kinesis & SQS

Collection Introduction

 

Real Time - Immediate actions

- Kinesis Data Streams (KDS)

- Simple Queue Service (SQS)

- Internet of Things (IoT)

 

Near-real time - Reactive action 

- Kinesis Data Firehose (KDF)

- Database Migration Service (DS)

 

Batch - Historical Analysis

- Snowball

- Data Pipeline

 


AWS Kinesis Overview

 

- Kinesis는 Apache Kafka에 대한 관리형 서비스 대안

- Application Log, Metric, IoT Clickstream에 적합)

- "real-time" 빅데이터에 적합

- 스트리밍 처리 Framework(Spark, NiFi 등..)에 적합

- 데이터는 3개의 AZ에 동기식으로 복제된다.

 

- Kinesis Stream: 대규모의 저지연 스트리밍 수집

- Kinesis Analytics: SQL을 사용하여 Stream에 대한 실시간 분석 수행

- Kinesis Firehose: Stream을 S3, Redshift, ElasticSearch 및 Splunk로 로드

 

 


Kinesis Streams Overview

 

- Stream은 순서가 지정된 Shard/Partition으로 나뉨

- 데이터 보존은 기본적으로 24시간이며 최대 365일까지 가능

- 데이터 Replocess/Replay 능력이 있음

- 여러 애플리케이션이 동일한 스트림을 사용 가능

- Throughput scale로 실시간 프로세싱 

- Kinesis에 한번 Insert되면 삭제되지 않음 (불변성, Immutability)


Kinesis Streams Shards

 

- 하나의 Stream은 다양한 Shard로 구성

- 요금은 Provisioning된 Sahrd별로 이루어지며 원하는 만큼 Shard를 가질 수 있음

- Batch 또는 Message Call별 가능

- 시간이 지나면서 샤드의 수

- The number of shards can evolve over time (reshard / merge)

- Recods는 Shard별로 정렬

 


Kinesis Streams Records

- Data Blob:

  * data가 바이트 기준으로 직렬화되어 전송, 최대 1MB까지 무엇이든 가능

- Record Key:

  * Recod와 함께 전송되어 Shard에서 Record를 그룹화한는데 도움, 동일 Key = 동일 Shard

  * "Hot Partition" 문제를 피하기 위해 고도로 분산된 Key 사용

 

- Sequence 번호:

   * Shard에 있는 각 Record에 대한 고유 식별자, 수집 후 Kinesis에 의해 추가됨

 

 

 


Kinesis Data Streams Limits to know

 

- Producer:

  ㅇ 1MB/s or 1000 messages/s at write PER SHARD

  ㅇ "ProvisionedThroughputException" otherwise

- Consumer Classic:

  ㅇ 2MB/s at read PER SHARD across all consumers

  ㅇ 5 API calls per second PER SHARD across all consumers

- Consumer Enhanced Fan-Out:

  ㅇ 2MB/s at read PER SHARD, PER ENHANCED CONSUMER

  ㅇ No API calls needed (push model)

- Data Retention:

  ㅇ 24 hours data retention by default

  ㅇ Can be extented to 365 days

 


Kinesis Agent

 

- Monitor Log files and sends them to Kinesis Data Streams

- Java-based agent, built on top of KPL

- Install in Linux-based server environments

- Features:

  * Write from multiple directories and write to multiple streams

  * Routing feature based on directory / log file

  * Pre-process data before sending to streams (single line, csv to json, log to json ..)

  * Emits metrics to CloudWatch for monitoring

 


 

Kinesis Consumers - Classic

- Kinesis SDK

- Kinesis Client LIbrary (KCL)

- Kinesis Connector Library

- 3rd party libraries: Spark, Log4J Appenders, Flume, Kafka Connect...

- Kinesis Firehose

- AWS Lambda

- (Kinesis Consumer Enhanced Fan-Out discussed in the next lecture)

 

 

 

 


Kinesis Consumer SDK - GetRecords

- Classic Kinesis - Records are polled by consumers from a shard 

- Each shard has 2MB total aggregate throughput

- GetRecords returns up to 10MB of data (then throttle for 5 seconds) or up to 10000 records

- Maximum of 5 GetRecords API calls per shard per second = 200ms latency

- If 5 consumers application consume from the same shard, means every consumer can poll once a second and receive less than 400KB/s

 

 


Kinesis Client Library (KCL)

- Java-first library but exists for other languages too (Golang, Python, Ruby, Node, .NET ...)

- Read records from Kinesis produced with the KPL (de-aggregation)

- Share multiple shards with multiple consumers in one "group", shard discovery

- Checkpointing feature to resume progress

- Leverages DynamoDB for coordination and checkpointing (one row per shard)

  * Make sure you provision enough WCU/RCU

  * Or use On-Demand for DynamoDB

  * Otherwise DynamoDB may slow down KCL

- Record processors will process the data

- ExpiredIteratorException -> increase WCU

 


Kinesis Connect Library (KCL)

- Older Java library (2016), leverages the KCL library

- Write data to: 

  * Amazon S3

  * DynamoDB

  * Redshift

  * ElasticSearch

 

- Kinesis Firehose replaces the Connector Library for a few of these targets, Lambda for the others

 

 

 

 

 


AWS Lambda sourcing from Kinesis

 

- AWS Lambda can source records from Kinesis Data Streams

- Lambda consumer has a library to de-aggregate record from the KPL

- Lambda can be used to run lightweight ETL to:

  * Amazon S3

  * DynamoDB

  * Redshift

  * ElasticSearch

  * Anywhere you want

- Lambda can be used to trigger notifications / send emails in real time

- Lambda has aconfiguratable batch size (more in Lambda section)

 


Kinesis Enhanced Fan Out

- New game-changing feature from August 2018

- Works with KCL 2.0 and AWS Lambda (Nov 2018)

- Each Consumer get 2MB/s of provisioned throughput per shard

- That means 20 consumers will get 40MB/s per shard aggregated

- No more 2MB/s limit!

- Enhanced Fan Out: Kinesis pushes data to consumers over HTTP/2

- Reduce latency (~70 ms)

 

 

 

 

 


Enhanced Fan-Out vs Standard Consumers

 

- Standard consumers:

  * Low number of consuming applications (1,2,3...)

  * Can tolerate ~200 ms latency

  * Minimize cost

 

- Enhanced Fan Out Consumers:

  * Multiple Consumer applications for the same Stream

  * Low Latency requirements ~70ms

  * Higher costs (see Kinesis pricing page)

  * Default limit of 5 consumers using enhanced fan-out per data stream

 


Kinesis Operations - Adding Shards

 

- Also called "Shard Splitting"

- Can be used to increase the Stream capacity (1MB/s data in per shard)

- Can be used to divide a "hot shard"

- The old shard is closed and will be deleted once the data is expire

 


Kinesis Operations - Merging Shards

 

- Decrease the Stream capacity and save costs

- Can be used to group two shards with low traffic

- Old shards are closed and deleted based on data expiration


Out-of-order records after resharding

- After a reshard, you can read from child shards

- However, data you haven't read yet could still be in the parent

- If you start reading the child before completing reading the parent, you could read data for a particular hash key out of order

- After a reshard, read entirely from the parent until you don't have new records

- Note: The Kinesis Client Library (KCL) has this logic already built-in, even after resharding operations

 

 

 

 

 

 


Kinesis Operations - Auto Scaling

 

- Auto Scaling은 Kinesis의 기본 기능이 아님

- 샤드 수를 변경하는 API 호출은 UpdateShardCount이다.

- AWS Lambda로 Auto Scaling을 구현할 수 있다.

- See: https://aws.amazon.com/blogs/big-data/scaling-amazon-kinesis-data-streams-with-aws-application-auto-scaling/

 

Scale Amazon Kinesis Data Streams with AWS Application Auto Scaling | Amazon Web Services

Recently, AWS launched a new feature of AWS Application Auto Scaling that let you define scaling policies that automatically add and remove shards to an Amazon Kinesis Data Stream. For more detailed information about this feature, see the Application Auto

aws.amazon.com


Kinesis Scaling Limitations

 

- Resharding은 병렬로 수행할 수 없다. 사전에 용량 계획이 필요하다.

- 한번에 하나의 Resharding 작업만 수행할 수 있으며 수행 시 몇초 걸림

- 1000개의 Shard의 경우 2000개의 샤드로 두배 늘리는데 30,000초(8.3시간) 걸림

- 다음을 할 수 없다.

 * 각 스트림의 롤링 24시간 기간에 대해 2배 이상 확장

 * 스트림의 현대 샤드 수를 두 배 이상으로 확장

 * 스티림에 대한 현재 샤드 수의 절반 이하로 축소

 * 스트림에서 최대 500개 이상의 샤드 확장

 * 결과가 500개 미만인 경우가 아니면 500개 이상의 샤드가 있는 스트림을 축소

 * 계정의 샤드 한도 이상으로 확장

 


Kinesis Security

 

- IAM Policy를 이용한 접근(Access)/승인(Authorization) 제어

- HTTPS Endpoint를 사용한 전송 중(In flight) 암호화

- KMS를 사용한 저장 데이터 암호화

- Client측 암호화는 수동으로 구현해야 함(어려움)

- Kinesis가 VPC 내에서 액세스할 수 있는 VPC 엔드포인트

 


Kinesis Data Streams - Handling Duplicates For Producers

 

- Producer retries can create duplicates due to network timeouts

- Although the two records have identical data, they also have unique sequence numbers

- Fix: embed unique record ID in the data to de-duplicate on the consumer side

 

 


Kinesis Data Streams - Handling Duplicates For Consumers

 

- Consumer Retry는 Application이 동일한 데이터를 두 번 읽게 할 수 있다.

- Record 프로세서가 다시 시작도리 때 Consumer Retry가 발생

 1. 작업자가 예기치 않게 종료됨

 2. 작업자 인스턴스 추가 또는 제거

 3. Shard가 병합되거나 분할됨

 4. 애플리케이션 배포

- Fiexes

 * Consume Application을 멱등성으로 만들기

 * 최종 목적지가 중복을 처리할 수 있는 경우 거기에서 수행하는 것이 좋음

- More info: https://docs.aws.amazon.com/ko_kr/streams/latest/dev/kinesis-record-processor-duplicates.html

 

중복 레코드 처리 - Amazon Kinesis Data Streams

이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.

docs.aws.amazon.com

 


Kinesis Data Firehose

 

 


AWS Kinesis Data Firehose

 

- 관리가 필요없는 완전 관리형 서비스

- 준 실시간(전체 배치가 아닌 경우 최소 대기 시간 60초)

- Redshift / Amazon S3 / ElasticSearch / Splunk로 데이터 로드

- 자동 Scaling

- 다양한 데이터 format 지원

- JSON에서 Parquet/ORC로 데이터 변환(S3만 해당)

- AWS Lambda를 통한 데이터 변환 (ex. CSV --> JSON)

- 대상이 Amazon S3(GZIP, ZIP 및 SNAPPU)인 경우 압축 지원

- GZIP만 데이터가 Redshift에 추가로 로드

- Firehose를 통해 전송되는 데이터 양만큼 지불

- Spark/KCL은 KDF에서 읽지 않음.

 


Kinesis Data Firehose Delivery Diagram

 

 


Firehose Buffer Sizing

 

- Firehose는 Buffer에 Record를 축적함

- Buffer는 시간 및 크기 규칙에 따라 플러시됨

 

- Buffer Size (ex: 32MB): 해당 버퍼 크기에 도달하면 Flush

- Buffer Time (ex: 2 minutes): 해당 시간에 도달하면 Flush

- Firehose는 Throughput을 증가시키기 위해 Buffer Size를 자동으로 늘릴 수 있다.

 

- High throughput ->  Buffer Size will be hit

- Low throughput -> Buffer Time will be hit

 


Kinesis Data Streams vs Firehose

 

- Streams

 * Going to write custom code (producer / consumer)

 * Custom Code 작성 예정 (Producer/Consumer)

 * 실시간 (Classic의 경우 200ms 이내 latency, 강화된 fan-out의 경우 70ms 이내의 latency)

 * 스케일링(Sharding Splitting / Merge)을 관리해야 함

 * 1~365일 동안의 데이터 저장, 재생 기능, 다중 소비자

 * ElasticSearch에 실시간으로 데이터를 삽입하기 위해 Lambda와 함께 사용

 

- Firehose

 * 완전 관리형, S3, Splunk, Redshift, ElasticSearch로 전송

 * Lambda를 사용한 서버리스 데이터 변환

 * 준 실시간(최저 버퍼 시간 1분)

 * 자동 스케일링

 * 데이터 저장 없음


CloudWatch Logs Subscriptions Filters

 

- You can stream CloudWatch Logs into

 * Kinesis Data Streams

 * Kinesis Data Firehose

 * AWS Lambda

 

- Using CloudWatch Logs Subscriptions Filters

 

- You can enable them using the AWS CLI

 


CloudWatch Logs Subscription Filter Patterns Near Real Time into Amazon ES


CloudWatch Logs Subscription Filter Pattern Real Time Load into Amazon ES

 


CloudWatch Logs Subscription Filter Patterns Real Time Analytics

 

 


What's a queue?

 


AWS SQS - Standard Queue

 

- Oldes offering (over 10 years old)

- Fully managed

- 초당 1개의 메시지에서 초당 10,000개의 메시지로 확장

- 기본 메시지 보관 기간: 4일, 최대 14일

- 대기열에 있을 수 있는 메시지 수에 제한이 없음

- 짧은 대기 시간 (게시 및 수신 시 < 10ms)

- 소비자 수에 따른 수평적 확장

- 중복 메시지가 있을 수 있음 (최소 한번 delivery, 경우에 따라 가끔)

- 순서가 잘못된 메시지를 가질 수 있음 (best effort ordering)

- 전송된 메시지당 256KB 제한

 


SQS - Producing Messages

- Define Body

- 메시지 속성 추가 (메타 데이터 - Optional)

- Delivery 지연 제공 (Optional)

- Get back

 * 메시지 식별자

 * 본문의 MD5 Hash

 

 

 

 

 

 


SQS - Consuming Messages

 

- Consumers...

- SQS Message 폴링(한 번에 최대 10개 메시지 수신)

- 가시성 제한 시간(Visibility Timeout) 내에 메시지 처리

- 메시지 ID 및 수신 핸들을 사용하여 메시지 삭제

 


AWS SQS - FIFO Queue

- Newer offering (First In - First out) - not available in all regions!

- 대기열의 이름은 .fifo로 끝나야 함.

- 더 낮은 처리량(배칭을 사용하여 초당 최대 3,000, 일괄 처리를 사용하지 않을 경우 300/s)

- Consumer가 메시지를 순서대로 처리

- Message는 정확히 한번만 전송

-" 중복 ID"를 사용한 5분 간격 중복 제거



 

 

 

 

 

 

 


SQS Extended Client

 

- 메시지 크기 제한은 256KB이다. 대용량 메시지는 어떻게 보내나?

- SQS Extended Client 사용(Java Library)

 


AWS SQS Use Cases

 

- 애플리케이션 분리 (예: 지불을 비동기식으로 처리하기 위해)

- 데이터베이스에 버퍼 쓰기 (예: 투표 애플리케이션)

- 들어오는 많은 양의 메시지 처리(예: 이메일 발신자)

- SQS는 CloudWatch를 통해 Auto Scaling과 통합 가능)

 


SQS Limits

 

- Maximum of 120,000 in-flight messages being processed by consumers
- Consumer가 처리하는 최대 120,000개의 전송 메시지 

- 일괄 요청에는 최대 10개의 메시지, 최대 256KB

- 메시지 내용은 XML, JSON 형식이 지정되지 않은 텍스트임.

- Standard queues는 무제한 TPS가 있다.

- FIFO queues 초당 최대 3,000개의 messages를 지원한다.(using batching)

- 최대 메시지 크기는 256KB(또는 확장 클라이언트 사용)

- 데이터는 1분에서 14일까지 보존

- Pricing: 

 * API Request당

 * Network 사용량 당

 


AWS SQS Security

 

- HTTPS endpoint를 사용한 전송 중 암호화

- KMS를 사용하여 SSE (Server Sied Encryption) 활성화 가능

 * 사용하고자 하는 CMK (Customer Master Key) 설정 가능

 * SSE는 메타데이터((message ID, timestamp, attributes)가 아닌 본문(the body)만 암호화

- IAM policy는 SQS 사용을 허용해야 함

- SQS queue 액세스 정책

 * IP에 대한 세분화된 제어

 * Request가 들어오는 시간 제어

 


Kinesis Data Stream vs SQS

 

Kinesis Data Stream SQS
- Data는 여러번 사용 가능 Queue, decouple applications
- Data는 보유 기간 후 삭제 Queue 하나당 하나의 애플리케이션
- 레코드의 순서가 보존됨(at the shard level) - 재생 중에도 Records are deleted after consumption (ack/fail)
- 동일한 스트림에서 독립적으로 읽는 여러 애플리케이션 구축 (Pub/Sub)

Messages are processed independently for standard queue
- "Streaming MapReduce" querying 기능 Ordering for FIFO queues
- Checkpointing needed to track progress of consumption Capability to "delay" messages
- Shards (capacity) must be provided ahead of time Dynamic scaling of load (no-ops)

Kinesis Data Streams vs SQS

 

  Kinesis Data Stream Kinesis Data Firehose Amazon SQS Standard Amazon SQS FIFO
Managed by AWS yes yes yes yes
Ordering Shard/Key No No Specify Group ID
Delivery At least once At lease once At least once Exactly Once
Replay Yes No No No
Max Data Retention 365 days No 14 days 14 days
Scaling Provision Shards:
1MB/s producer
2MB/s consumer
No limit No limit ~3000 messages per second with batching (soft limit)
Max Object Size 1MB 128MB at destination 256KB (more if using extended lib) 256KB (more if using extended lib)

 


SQS vs Kinesis - Use Cases

 

- SQS Use cases:

 * Order processing

 * Image Processing

 * Auto Scaling queues according to messages.

 * Buffer and Batch messages for future processing.

 * Request Offloading

 

- Amazon Kinesis Data Streams Use cases:

 * Fast log and event data collection and processing

 * Real Time metrics and reports

 * Mobile data capture

 * Real Time data analytics

 * Gaming data feed

 * Complex Stream Processing

 * Data Feed from "Internet of Thins"