Amazon Kinesis Data Streams
또는 Direct PUT
Amazon OpenSearch, Amazon Redshift, Amazon S3
등Lambda를 사용해 데이터 레코드를 변환, 필터링, 압축 해제, 처리할 수 있다.
Lambda로 오는 event 객체 구조는 다음과 같다.
{
"invocationId": "18f26268-95e5-404d-a04e-7787823e01d0",
"sourceKinesisStreamArn": "arn:aws:kinesis:ap-northeast-2:838235863925:stream/MyStream",
"deliveryStreamArn": "arn:aws:firehose:ap-northeast-2:838235863925:deliverystream/KDS-S3-cwDTA",
"region": "ap-northeast-2",
"records": [
{
"recordId": "49642604398712332168934069598817577707763034311748485122000000",
"approximateArrivalTimestamp": 1689310666565,
"data": "eyJ0aW1lc3RhbXAiOiIxMDAwLTEwLTExIiwibWVzc2FnZSI6Imdvb2RieWVcbuqwmeydgCDsnbTrsqTtirjsl5Ag7Y+s7ZWoIDFcbuqwmeydgCDsnbTrsqTtirjsl5Ag7Y+s7ZWoIDJcbuqwmeydgCDsnbTrsqTtirjsl5Ag7Y+s7ZWoIDMifQo=",
"kinesisRecordMetadata": {
"sequenceNumber": "49642604398712332168934069598817577707763034311748485122",
"subsequenceNumber": 0,
"partitionKey": "36315.0502983276",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1689310666565,
},
},
...
]
}
레코드를 변환하는 Lambda 예제 코드이다.
import json
import base64
import datetime
def lambda_handler(event, context):
output = []
for record in event['records']:
payload = base64.b64decode(record['data']).decode()
json_value = json.loads(payload)
print(payload)
partition_keys = {
}
output_record = {
'recordId': record['recordId'],
'result': 'Ok', # 혹은 DeliveryFailed, Dropped
'data': base64.b64encode(json.dumps(json_value).encode()),
'metadata': { 'partitionKeys': partition_keys }
}
output.append(output_record)
return {'records': output}
AWS Glue를 통해 JSON 형식의 데이터를 Apache Parquet 또는 Apache ORC로 변환할 수 있다. 데이터가 JSON이 아닐 경우, Lambda를 사용해 데이터를 변환해야 한다.
[
{
"Name": "timestamp",
"Type": "string",
"Comment": ""
},
{
"Name": "message",
"Type": "string",
"Comment": ""
}
]
yyyy/mm/dd/hh (UTC 기준)
접두사를 추가한다. 이 앞에 사용자 지정 접두사를 추가할 수 있다. ex) my-server/