동적 파티셔닝에는 JQ를 사용하는 방법과 Lambda를 사용하는 방법, 2가지가 있다.
JQ를 사용하려면 "JSON을 위한 인라인 구문 분석" 옵션을 활성화한다. 다음과 같이 활용할 수 있다.
# 데이터 1
{
"type": {
"device": "phone",
"event": "unlock"
},
"id": 1,
"event_timestamp": 1565382027
}
# 표현식 1
device .type.device
year .event_timestamp| strftime("%Y")
month .event_timestamp| strftime("%m")
day .event_timestamp| strftime("%d")
hour .event_timestamp| strftime("%H")
# 데이터 2
{"timestamp": "2001-01-01 01:01:01.123123"}
# 표현식 2
# jq의 strptime은 소수점 이하 초(.123123)를 파싱하지 못하므로, 앞 19자(초 단위까지)만 잘라서 파싱한다.
year .timestamp[0:19] | strptime("%Y-%m-%d %H:%M:%S") | strftime("%Y")
month .timestamp[0:19] | strptime("%Y-%m-%d %H:%M:%S") | strftime("%m")
day .timestamp[0:19] | strptime("%Y-%m-%d %H:%M:%S") | strftime("%d")
hour .timestamp[0:19] | strptime("%Y-%m-%d %H:%M:%S") | strftime("%H")
# S3 버킷 접두사
year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/hour=!{partitionKeyFromQuery:hour}/
# Lambda 방식: 입력 데이터의 "time" 필드 값 예시
2000-01-01 20:01:01.123
import base64
import json
import datetime
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    """Attach time-based partition keys to every record of a Firehose batch.

    Each record's base64 ``data`` is decoded and parsed as JSON, and its
    ``"time"`` field (format ``%Y-%m-%d %H:%M:%S.%f``) supplies the ``year``,
    ``month``, ``day`` and ``hour`` partition keys. The record data itself is
    passed through unmodified.

    Args:
        firehose_records_input: Event dict with a ``'records'`` list; every
            entry carries a ``'recordId'`` and base64-encoded ``'data'``.
        context: Lambda context object (unused).

    Returns:
        A dict ``{'records': [...]}`` with one entry per input record holding
        the ``recordId``, the original ``data``, ``result`` set to ``'Ok'``
        and the computed ``metadata.partitionKeys``.

    Raises:
        KeyError: If a decoded payload has no ``"time"`` field.
        ValueError: If the data is not valid base64/JSON or ``"time"`` does
            not match the expected format.
    """
    # Loop-invariant, so defined once instead of once per record.
    time_format = "%Y-%m-%d %H:%M:%S.%f"
    firehose_records_output = {'records': []}

    for firehose_record_input in firehose_records_input['records']:
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)

        # Parse the timestamp a single time and reuse it for the log line
        # and for all four partition keys.
        event_time = datetime.datetime.strptime(json_value["time"], time_format)
        print(event_time)

        # In real use, extract the partition values from the payload data.
        # NOTE: these are plain ints, so month/day/hour are not zero-padded
        # (e.g. 1 rather than "01").
        partition_keys = {
            'year': event_time.year,
            'month': event_time.month,
            'day': event_time.day,
            'hour': event_time.hour,
        }

        firehose_record_output = {
            'recordId': firehose_record_input['recordId'],
            'data': firehose_record_input['data'],
            'result': 'Ok',
            'metadata': {'partitionKeys': partition_keys},
        }
        firehose_records_output['records'].append(firehose_record_output)

    return firehose_records_output
# S3 버킷 접두사
year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/hour=!{partitionKeyFromLambda:hour}/