동적 파티셔닝

파티셔닝 + 데이터 변환

Kinesis Date Firehose

레코드 변형 및 변환

AWS Lambda를 사용하여 소스 레코드 변형

Lambda를 사용해 데이터 레코드를 변환, 필터링, 압축 해제, 처리할 수 있다.

Lambda로 오는 event 객체 구조는 다음과 같다.

{
    "invocationId": "18f26268-95e5-404d-a04e-7787823e01d0",
    "sourceKinesisStreamArn": "arn:aws:kinesis:ap-northeast-2:838235863925:stream/MyStream",
    "deliveryStreamArn": "arn:aws:firehose:ap-northeast-2:838235863925:deliverystream/KDS-S3-cwDTA",
    "region": "ap-northeast-2",
    "records": [
        {
            "recordId": "49642604398712332168934069598817577707763034311748485122000000",
            "approximateArrivalTimestamp": 1689310666565,
            "data": "eyJ0aW1lc3RhbXAiOiIxMDAwLTEwLTExIiwibWVzc2FnZSI6Imdvb2RieWVcbuqwmeydgCDsnbTrsqTtirjsl5Ag7Y+s7ZWoIDFcbuqwmeydgCDsnbTrsqTtirjsl5Ag7Y+s7ZWoIDJcbuqwmeydgCDsnbTrsqTtirjsl5Ag7Y+s7ZWoIDMifQo=",
            "kinesisRecordMetadata": {
                "sequenceNumber": "49642604398712332168934069598817577707763034311748485122",
                "subsequenceNumber": 0,
                "partitionKey": "36315.0502983276",
                "shardId": "shardId-000000000000",
                "approximateArrivalTimestamp": 1689310666565,
            },
        },
        ...
    ]
}

레코드를 변환하는 Lambda 예제 코드이다.

import json
import base64
import datetime

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode()
        json_value = json.loads(payload)
        print(payload)

        partition_keys = {
        
        }

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok', # 혹은 DeliveryFailed, Dropped
            'data': base64.b64encode(json.dumps(json_value).encode()),
            'metadata': { 'partitionKeys': partition_keys }
        }
        output.append(output_record)

    return {'records': output}

레코드 형식 변환

AWS Glue를 통해 JSON 형식의 데이터를 Apache Parquet 또는 Apache ORC로 변환할 수 있다. 데이터가 JSON이 아닐 경우, Lambda를 사용해 데이터를 변환해야 한다.

[
  {
    "Name": "timestamp",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "message",
    "Type": "string",
    "Comment": ""
  }
]

대상 설정