SQS, Lambda를 이용해 문자전송하기(1부)

들어가며

이 포스팅은 전체적으로 Get Started의 성격을 띄며 3부로 나눠져 있습니다.
좌측 사이드바에서 목차 빠른 이동이 가능합니다.

1부

  • 하고자 하는 것
  • SQS란?
  • 아키텍쳐
    • Skill Set
  • 구축 과정
    • SQS 생성 및 설정
      • SQS 큐 생성
      • SQS 큐 설정
      • SQS 권한설정
    • Lambda 생성 및 SQS로 메세지 전송하기
      • Lambda 함수생성
      • Lambda 함수에 SQS, RDS 권한 추가
      • SQS에 메세지 전송하기
    • Lambda layer로 DB 조회를 위한 외부 라이브러리 추가

2부

  • Lambda에서 EC2, RDS등 내부 서비스 접근 문제 해결하기
    • VPC설정으로 접근하기
    • VPC 란?
    • Lambda 함수에 VPC 셋팅하기
  • SQS에 메세지를 보내기 위한 VPC 설정
    • VPC 추가한 상태로 외부 인터넷에 요청을 보내기 위한 방법들
    • VPC에 SQS 엔드포인트 추가
    • SQS에 메세지 전송하기
  • Cloud Watch Event로 Cron Job 붙이기
    • 주의할 점
  • SQS 큐에 람다 트리거 함수 붙이기

3부

  • VPC 설정되어 있는 람다함수가 외부 인터넷에 요청 보내기
    • API Gateway
    • 생성 및 설정
  • 예외처리를 위한 DLQ 설정
  • 운영 하면서 겪은 문제들
    • 요금이 이렇게 나오다니?
    • 람다 타임아웃
    • 잘못된 DLQ 설정
    • 에러 핸들링
  • 예약문자 서비스를 위한 가장 좋은 설계
    • MQ
  • 총평

하고자 하는 것

외부 문자발송 API에 예약문자 발송 요청을 보내는 작업을 해야합니다. 단 조건이 있습니다.

  • 예약문자가 발송되기 전이면 취소가 가능해야 합니다.
  • 분할전송되는 문자의 순서를 보장하려고 합니다.
    분할전송되는 문자 순서의 경우, 순서에 맞게 요청을 보내도 문자 발송 API측에서 요청이 동시에 들어올 경우 순서를 섞어 발송합니다.
    분할문자의 순서가 섞이는 것을 방지하려면 많은 방법이 있지만 그 중 SQS의 DelaySeconds 옵션을 사용해 해결할 것 입니다.

Amazon SQS란?

SQS(Simple Queue Service)는 분산 메세지 큐 서비스로 메세지에 대한 수신, 송신만이 가능한 큐 입니다.
인터넷을 통해 통신하며 웹 서비스 응용 프로그램을 통해 메세지를 전송할 수 있습니다.
풀링(polling) 방식으로 동작하기 때문에 수신자는 매번 풀링하는 식으로 로직을 구성해야합니다.
폴링은 유연하게 동작합니다.
메세지 보관기간 지정, 전송 지연시간 지정, 실패한 작업을 배달못한 편지 대기열(DeadQueue)로 전송할 수 있는 기능등이 있습니다.
백만 요청당 가격은 표준 : $ 0.40 / FIFO : $ 0.50 로 저렴한 편입니다.
메세지는 최대 10개, 256KB 배치 송수신이 가능하며 SQS 배치 API 작업으로 비용을 절감할 수 있습니다.
대기열 종류로는 표준과 FIFO를 제공합니다.

표준 FIFO
처리량 무제한으로 초당 트랜잭션(TPS)수 지원 일괄처리를 통해 초당 3000개 혹은 API작업별로 300개 메세지를 지원
전달 최소 한번 전달이지만 가끔 두번이상 전달되기도 함 정확히 한번 전달
순서 순서보장 안됨 순서보장 됨
람다 Lambda 트리거 가능 Lambda 트리거 불가능 19/11/20 이후로 가능하게 변경되었습니다 [링크]

여기서 중요하게 봐야될 점은 순서보장과 Lambda 트리거 가능 여부입니다.
FIFO는 Lambda 트리거를 지원하지 않았습니다.
하지만 2019/11/20 이후로 FIFO에서도 람다 트리거를 사용할 수 있도록 변경되었습니다.[관련 링크]
이 포스팅을 작성할 당시 FIFO에서는 람다트리거를 사용할 수 없었기 때문에 저는 표준 대기열을 선택했지만, 순서가 중요하거나 한번만 전달되어야한다면 FIFO를 선택하시는게 좋습니다.

추가1)
SQS 트리거에 Lambda를 붙일 경우, Lambda는 연속적으로 SQS대기열을 장기 풀링합니다.
Lambda는 풀링을 자동조절 합니다.
대기열이 비어있을땐 5개의 병렬 폴링 연결을 하며, 처리중인 메세지 수가 증가할 경우 메세지 요청을 20회, 동시성을 분당 60회 증가시킵니다.
대기열이 계속 분주할 경우 동시성이 제한에 다다를 때 까지 요청과 동시성은 계속 증가됩니다.
처리중인 메세지가 하락중일 경우엔 분당 요청을 10회, 동시성을 30회씩 감소시킵니다.
만약 동시성등을 더 증가시켜야 한다면 aws측에 문의를 해서 제한을 풀어야합니다.

추가2)
Lambda와 SQS는 반드시 같은 AWS 리전에 있어야 합니다.
하나 이상의 Lambda함수에 하나의 SQS대기열만 연결할 수 있습니다. [관련 링크]

아키텍쳐

기술스택

  • Python 3.7
  • AWS Lambda
  • AWS SQS
  • AWS API Gateway
  • AWS VPC
  • AWS RDS
  • AWS CloudWatch

arch

최종적으로 위와같은 구조가 됩니다

각 서비스에 대한 설명은 다음과 같습니다.
send sqs func : 매 분마다 지금으로부터 발송 1분전의 시간으로 DB조회를 하고 delay를 계산해 SQS에 메세지 송신을 하는 람다 함수입니다.
sqs trigger func : SQS의 람다 트리거 함수입니다. API Gateway로 요청을 보내고 응답을 받아서 DB(RDS)에 저장합니다.
main queue : send sqs로부터 메세지를 수신한 뒤 sqs trigger가 주기적으로 풀링해 메세지를 가져옵니다.
API Gateway : 내부 서비스가 외부 인터넷으로 요청을 보낼수 있게 도와줍니다.

플로우는 다음과 같습니다.

  1. send_sqs_func가 rds에 1분후에 보내야될 데이터를 조회하는 요청을 보냅니다
  2. 조회된 데이터로 SQS 전송 delay시간을 조정해 같은 사람에게 보낼 문자의 순서가 섞이지 않도록 값을 셋팅후 SQS로 메세지를 전송합니다.
  3. SQS에 쌓여있는 데이터를 sqs_trigger_func가 지속적으로 풀링해 가져옵니다.
  4. sqs_trigger_func 에서 Api Gateway를 통해 외부 문자 API로 요청을 보냅니다.
  5. Exception이 발생한 경우 dead_queue로 해당 메세지를 보냅니다.

그럼 바로 예제에 들어가도록 하겠습니다.

구축과정

SQS 생성 및 설정

우선 SQS를 생성하겠습니다. 위 아키텍쳐에서 설명했던 것과 같이 두개의 SQS 큐를 생성할 것 입니다.

sqs-create1
우선 표준대기열로 main queue를 생성합니다.

sqs-create2
우측 하단의 대기열 구성에서 세부적인 속성을 제어할 수 있습니다.

아직 속성을 설정하지 않을 것이므로 속성을 기본값으로 놔둔 뒤 대기열을 생성합니다.
main queue와 동일한 방법으로 dead queue를 생성합니다.
이제 생성된 큐 중 main queue 의 속성 중 리드라이브 정책 사용을 활성화 합니다.

SQS 큐 설정

sqs-config1
sqs-config2
배달못한 편지 대기열은 dead_queue로 설정 뒤 변경사항을 저장합니다.

SQS의 기본설정은 short polling입니다. 만약 비용을 줄이고 싶다면 메시지 수신 대기시간을 조정해야 합니다.
다음과 같은 경우 short polling 입니다:

WaitTimeSeconds이 0으로 설정되어있는 경우
ReceiveMessageWaitTimeSeconds이 0으로 설정되어 있는 경우

SQS의 long polling 과 short polling에 대해 더 알고 싶다면 [공식문서]를 참고해주세요.

SQS 권한 설정

sqs가 외부로부터 메세지를 수신받기 위해선 다음과 같은 권한이 필요합니다:

sqs:ChangeMessageVisibility
sqs:DeleteMessage
sqs:GetQueueAttributes
sqs:ReceiveMessage

권한 설정은 하단의 권한 탭에서 추가할 수 있습니다.

sqs-auth1

권한 추가 버튼을 클릭합니다.

sqs-auth2

프린시펄이 있다면 설정합니다.

권한설정 후 하단의 권한추가로 완료합니다.

Lambda 생성 및 SQS로 메세지 전송하기

send sqs func, sqs trigger func 라는 두개의 람다 함수를 만들 것입니다.
우선 send sqs func를 만들어보겠습니다.

lambda-create
새 역할생성을 선택한 뒤 함수를 생성합니다.

Lambda 함수에 SQS, RDS 권한 추가

send sqs func는 위 아키텍쳐 섹션에서 설명했듯이 DB 조회결과를 sqs 큐로 보내는 작업을 하는 함수입니다.
따라서 정책에 SQS와 RDS권한을 추가합니다.
생성된 람다함수에 권한을 추가하기 위해
함수코드 아래에 그림과 같이 실행역할 > 기존역할 > send-sqs-func-role-0opyfljq 역할을 확인하십시오.를 클릭합니다.

lambda-auth1
lambda-auth2
lambda-auth3
권한이 성공적으로 추가되었다면 send sqs func 페이지에서 새로고침을 합니다.

lambda-auth4
성공적으로 추가가 되었습니다.

SQS에 메세지 전송하기

SQS의 main queue에 메세지를 보내는 코드를 lambda_function에 작성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import json
import os
import boto3
import logging

from botocore.exceptions import ClientError


def lambda_handler(event, context):
_params = {'message': "hello"}

msg_body = json.dumps(_params)
msg = send_sqs_message(os.environ['SQS_QUEUE'], msg_body)
return msg


def send_sqs_message(sqs_queue_url, msg_body):
"""
:param sqs_queue_url: String URL of existing SQS queue
:param msg_body: String message body
:return: Dictionary containing information about the sent message. If
error, returns None.
"""

# Send the SQS message
sqs_client = boto3.client('sqs')
try:
msg = sqs_client.send_message(QueueUrl=sqs_queue_url,
MessageBody=msg_body)
except ClientError as e:
logging.error(e)
return None
return msg

boto3 란?
python용 aws sdk 입니다.
람다에선 실행환경의 일부로 aws sdk의 여러버전이 포함되어 있으므로 굳이 설치하지 않아도 됩니다.

sqs-send1.1
함수코드 하단의 환경변수에서 SQS_QUEUE를 추가합니다.

sqs-send1
환경변수 SQS_QUEUE키에대한 값은 SQS main_queue의 URL 이며 SQS대기열 세부정보에서 확인할 수 있습니다.

sqs-send2
람다 테스트는 기본 이벤트를 선택한 후 다음과 같이 생성합니다.
오른쪽 상단의 save 후 테스트를 실행합니다.

sqs-send3

sqs-send4
SQS에서 방금 송신한 메세지가 main queue에 있는것을 확인할 수 있습니다.

Lambda layer로 DB 조회를 위한 외부 라이브러리 추가

이제 RDS에 접근해 DB커넥션을 생성해보도록 하겠습니다.
외부 라이브러리인 pymysql을 람다에서 사용해야합니다.
우선 인라인 편집에서 DB 커넥션을 생성하는 부분을 lambda_function.py에 작성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import json
import os
import boto3
import logging
import pymysql

from botocore.exceptions import ClientError


def lambda_handler(event, context):
get_connection()
_params = {'message': "hello"}

msg_body = json.dumps(_params)
msg = send_sqs_message(os.environ['SQS_QUEUE'], msg_body)
return msg


def send_sqs_message(sqs_queue_url, msg_body):
"""
:param sqs_queue_url: String URL of existing SQS queue
:param msg_body: String message body
:return: Dictionary containing information about the sent message. If
error, returns None.
"""

# Send the SQS message
sqs_client = boto3.client('sqs')
try:
msg = sqs_client.send_message(QueueUrl=sqs_queue_url,
MessageBody=msg_body)
except ClientError as e:
logging.error(e)
return None
return msg


def get_connection():
connection = pymysql.connect(host=os.environ['DB_ADDR'],
user=os.environ['DB_HOST'],
password=os.environ['DB_PASSWD'],
db=os.environ['DB_NAME'],
charset='utf8mb4',
autocommit=True,
cursorclass=pymysql.cursors.DictCursor)
return connection

이제 람다 레이어에 사용할 외부 라이브러러를 추가할 것입니다.
람다 레이어 설정에 대한 내용은 이 포스팅을 참고해주세요 : [AWS Lambda Layers로 함수 공통용 Python 패키지 재사용하기]
람다 레이어에 올리기전 외부 라이브러리를 패키징할 때 주의할 점이 있습니다.
압축을 풀었을 시 패키지들의 상위 폴더 이름은 python 이어야 합니다

1
2
3
-python
- 패키지폴더 1
- 패키지폴더 2

그렇지 않으면 라이브러리 import가 되지 않는다는 오류가 발생합니다.
람다 레이어에 올린 라이브러리 압축파일은 aws측에서 내부적으로 압축 해제 후 다음과 같이 /opt 아래에 둡니다.

/opt/python/패키지1

따라서 패키지의 상위 폴더 이름은 python 이어야 합니다.

opt/python/lib/python3.7/dist-package/

도 가능하기 때문에 패키지들의 상위 폴더가 /python/lib/python3.7/dist-package/ 여도 됩니다.
마찬가지로 node등은 node로 상위 폴더 이름을 설정합니다.

sqs-db2
프로젝트 파일을 zip으로 압축한 뒤 code entry type > .zip 파일 업로드로 방금 생성한 프로젝트 파일을 업로드합니다
(aws cli를 사용중이라면 좀 더 편하게 배포할 수 있습니다)
DB 접속정보등을 환경변수에 셋팅 후 상단의 save를 하고 테스트를 실행합니다.
만약 RDS가 모든 접근을 허용한다면 아무 오류없이 성공을 출력할 것 입니다.
하지만 RDS 인바운드에 몇가지 접근에 대해서만 허용해뒀다면 다음과 같이 타임아웃이 발생합니다.
sqs-db3
왜 그런걸까요? Lambda 함수가 보안그룹 설정이 되어있는 RDS에 접근할 수 없기 때문입니다.
lambda 함수가 왜 RDS에 접근을 할 수 없는지에 대해 알기위해선 먼저 VPC에 대해서 알아야 합니다.

2부에선 VPC가 무엇이며 RDS 및 SQS 접근 문제를 해결해보도록 하겠습니다.

2부에 계속..


읽어주셔서 감사합니다. 혹 궁금한게 있으시거나 오류가 있다면 댓글 남겨주세요🙆