동적 파티셔닝엔 JQ와 Lambda를 사용하는 방법 2가지가 있다.

JQ

Firehose에서 "JSON을 위한 인라인 구문 분석" 옵션을 활성화하면 JQ 표현식으로 레코드에서 파티션 키를 추출할 수 있다. 다음과 같이 활용할 수 있다.

# 데이터 1
{
  "type": {
    "device": "phone",
    "event": "unlock"
  },
  "id": 1,
  "event_timestamp": 1565382027
}

# 표현식 1
device .type.device
year .event_timestamp| strftime("%Y")
month .event_timestamp| strftime("%m")
day .event_timestamp| strftime("%d")
hour .event_timestamp| strftime("%H")

# 데이터 2
{"timestamp": "2001-01-01 01:01:01.123123"}

# 표현식 2
year .timestamp | strptime("%Y-%m-%d %H:%M:%S%Z") | strftime("%Y")
month .timestamp | strptime("%Y-%m-%d %H:%M:%S%Z") | strftime("%m")
day .timestamp | strptime("%Y-%m-%d %H:%M:%S%Z") | strftime("%d")
hour .timestamp | strptime("%Y-%m-%d %H:%M:%S%Z") | strftime("%H")
# S3 버킷 접두사
year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/hour=!{partitionKeyFromQuery:hour}/

Lambda

입력 레코드의 time 필드 값 예시 (형식: %Y-%m-%d %H:%M:%S.%f): 2000-01-01 20:01:01.123

import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    """Attach year/month/day/hour partition keys to each incoming record.

    Each record's base64-encoded ``data`` is decoded and parsed as JSON, and
    its ``time`` field (format ``%Y-%m-%d %H:%M:%S.%f``) is used to derive
    the partition keys. The record data itself is passed through unchanged.

    Args:
        firehose_records_input: Invocation event. Its ``records`` list holds
            dicts with a ``recordId`` and base64-encoded ``data``.
        context: Lambda context object (unused).

    Returns:
        A dict with a ``records`` list containing one entry per input
        record, in input order. Records whose timestamp was parsed carry
        ``result`` ``'Ok'`` and ``metadata.partitionKeys``; records that
        could not be decoded or parsed carry ``result``
        ``'ProcessingFailed'`` and no metadata.
    """
    time_format = "%Y-%m-%d %H:%M:%S.%f"
    firehose_records_output = {'records': []}

    for firehose_record_input in firehose_records_input['records']:
        record_id = firehose_record_input['recordId']
        encoded_data = firehose_record_input['data']

        # To use other fields, extract them from the decoded payload here.
        try:
            json_value = json.loads(base64.b64decode(encoded_data))
            # Parse once and reuse the result for every partition key.
            event_time = datetime.datetime.strptime(
                json_value["time"], time_format
            )
        except (KeyError, TypeError, ValueError) as error:
            # A single malformed record must not abort the whole batch:
            # report it as failed and keep processing the remaining records.
            print("record", record_id, "could not be partitioned:",
                  repr(error))
            firehose_records_output['records'].append({
                'recordId': record_id,
                'data': encoded_data,
                'result': 'ProcessingFailed',
            })
            continue

        print(event_time)

        # NOTE(review): values are ints (e.g. month 1, not "01"), so the
        # resulting prefix is not zero-padded -- confirm this is intended.
        partition_keys = {
            'year': event_time.year,
            'month': event_time.month,
            'day': event_time.day,
            'hour': event_time.hour,
        }

        firehose_records_output['records'].append({
            'recordId': record_id,
            'data': encoded_data,
            'result': 'Ok',
            'metadata': {'partitionKeys': partition_keys},
        })

    return firehose_records_output
# S3 버킷 접두사
year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/hour=!{partitionKeyFromLambda:hour}/