0. 목표

  • SAM 활용하여 CANedge 데이터 전처리 과정 자동화(최종 목표)
  • Part 2 문제점 개선하기(Lambda 중복 실행 해결, One input ▶ One output)
  • DynamoDB에 데이터 추가
  • DynamoDB 데이터를 Athena에서 조회

1. SAM 구성

Part 2 문제점을 개선한 SAM 구성 요소를 소개합니다.

1.1 app.py

app.py는 3개의 함수로 구성하였습니다.

import mdf_iter
...
def upload_file(file_name, bucket, object_name=None):
    ...
    return True
def ratio(s1, s2):
    return s2 / s1 if s1 else np.nan
def lambda_handler(event, context):
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    ...
    upload_file(f"/tmp/{object_key.split('/')[-1]}", bucket, object_key)
    print(df_phys_join)
1.1.1 import

app.py를 실행하는데 필요한 모듈을 불러옵니다.

import mdf_iter
import canedge_browser
import can_decoder
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from utils import (
    setup_fs,
    load_dbc_files,
    restructure_data,
    add_custom_sig,
    ProcessData,
)
import logging
import boto3
from botocore.exceptions import ClientError
1.1.2 S3 업로드

Python SDK를 활용하여 S3에 csv 파일을 업로드합니다.

def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket
    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """
    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name
    # Upload the file
    s3_client = boto3.client("s3")
    try:
        response = s3_client.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True
1.1.3 lambda_handler

Lambda에 파일을 생성하려면 경로를 반드시 /tmp/로 지정하여야합니다.

하나의 MDF4 파일만 변환하여 CSV 파일로 변환한 후 업로드합니다.

(기존의 코드는 폴더 속 모든 MDF4 파일을 불러와 데이터프레임에 추가합니다.)

def ratio(s1, s2):
    return s2 / s1 if s1 else np.nan
def lambda_handler(event, context):
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    object_key = f'{event["Records"][0]["s3"]["object"]["key"].split(".")[0]}.csv'
    print(bucket)
    # specify devices to process (from local/S3), DBC files and start time
    devices = [f"{bucket}/LOG/958D2219"]
    dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
    start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)
    # setup filesystem (local/S3), load DBC files and list log files for processing
    fs = setup_fs(s3=True, endpoint="http://s3.ap-southeast-1.amazonaws.com")
    db_list = load_dbc_files(dbc_paths)
    log_file = f'{bucket}/{event["Records"][0]["s3"]["object"]["key"]}'
    # --------------------------------------------
    # perform data processing of each log file
    proc = ProcessData(fs, db_list, signals=[])
    df_phys_all = pd.DataFrame()
    df_raw, device_id = proc.get_raw_data(log_file)
    df_phys = proc.extract_phys(df_raw)
    proc.print_log_summary(device_id, log_file, df_phys)
    df_phys_all = df_phys_all.append(df_phys)
    # --------------------------------------------
    # example: Add a custom signal
    df_phys_all = add_custom_sig(
        df_phys_all, "WheelBasedVehicleSpeed", "EngineSpeed", ratio, "RatioRpmSpeed"
    )
    # --------------------------------------------
    # example: resample and restructure data (parameters in columns)
    df_phys_join = restructure_data(df_phys=df_phys_all, res="1S")
    df_phys_join.to_csv(f"/tmp/{object_key.split('/')[-1]}")
    upload_file(f"/tmp/{object_key.split('/')[-1]}", bucket, object_key)
    print(df_phys_join)
1.2 결과

MF4 파일을 업로드한 다음의 결과입니다.

MF4 파일에 저장된 값을 변환하여 같은 이름의 csv 파일로 저장합니다.

2. DynamoDB

Dashboard에 사용할 값(ex 과거 통계 데이터)을 가공하여 DynamoDB에 저장합니다.

2.1 Function

2.1.1 Create Table

테이블을 생성하는 함수입니다.

Attribute Definitions에 정의한 특성과 Key schema에 정의한 특성은 동일하여야 합니다.

특성 Year는 Partition key, 특성 Month는 Sort Key라고 부릅니다.

def dynamodb_create_table(name):
    import boto3
    client = boto3.client("dynamodb")
    # DynamoDB Options
    attribute_definitions = [
        {"AttributeName": "Year", "AttributeType": "N"},
        {"AttributeName": "Month", "AttributeType": "N"},
    ]
    table_name = name
    key_schema = [
        {"AttributeName": "Year", "KeyType": "HASH"},
        {"AttributeName": "Month", "KeyType": "RANGE"},
    ]
    provisioned_throughput = {"ReadCapacityUnits": 1, "WriteCapacityUnits": 1}
    response = client.create_table(
        AttributeDefinitions=attribute_definitions,
        TableName=table_name,
        KeySchema=key_schema,
        ProvisionedThroughput=provisioned_throughput,
    )
    return response["TableDescription"]

2.1.2 List Table

전체 테이블 리스트를 불러옵니다

def dynamodb_list_table():
    import boto3
    client = boto3.client("dynamodb")
    response = client.list_tables()
    return response["TableNames"]

2.1.3 Describe Table

테이블 상세정보를 가져옵니다.

def dynamodb_describe_table(name):
    import boto3
    client = boto3.client("dynamodb")
    response = client.describe_table(TableName=name)
    return response["Table"]

2.1.4 Get Item

테이블 내의 아이템을 조회합니다.

def dynamodb_get_item(name, year, month):
    import boto3
    key = {
        "Year": {"N": str(year)},
        "Month": {"N": str(month)},
    }
    client = boto3.client("dynamodb")
    response = client.get_item(TableName=name, Key=key)
    try:
        return response["Item"]
    except:
        return False

2.1.5 Create Statisctical

추가할 아이템 변수를 정의합니다.

Dataframe에서 날짜별로 각 항목들의 합을 저장합니다.

만약 기존의 항목이 없다면 새로 생성하며, 있는 경우에는 기존 값을 불러와 값을 더합니다.

def create_statistical_item(name, items):
    import pandas as pd
    from datetime import datetime
    # Dataframe Size
    column_size = items.columns.size
    # Convert Dataframe Timestamp -> datetime.datetime
    first_timestamp = items.index[0].to_pydatetime()
    item = dynamodb_get_item(
        name, str(first_timestamp.year), str(first_timestamp.month)
    )
    if not item:
        item = {
            "Year": {"N": str(first_timestamp.year)},
            "Month": {"N": str(first_timestamp.month)},
            "Day": {"M": {}},
        }
    if str(first_timestamp.day) not in item["Day"]["M"].keys():
        item["Day"]["M"][str(first_timestamp.day)] = {"M": {}}
        for index in range(column_size):
            item["Day"]["M"][str(first_timestamp.day)]["M"][items.columns[index]] = {
                "N": str(items.sum()[index])
            }
    else:
        for index in range(column_size):
            item["Day"]["M"][str(first_timestamp.day)]["M"][items.columns[index]] = {
                "N": str(
                    float(
                        item["Day"]["M"][str(first_timestamp.day)]["M"][
                            items.columns[index]
                        ]["N"]
                    )
                    + items.sum()[index]
                )
            }
    return item

변경사항(2021.05.31)

Day 항목의 타입을 리스트로 바꾸는 과정에서 일부 코드 수정이 있었습니다.

아래 결과가 다르게 보일 수 있으나,  진행 과정은 변함없습니다. 참고바랍니다.

def create_statistical_item(name, items):
    import pandas as pd
    from datetime import datetime
    # Dataframe Size
    row_size = items.index.size
    column_size = items.columns.size
    pre_item = {"Count": {"N": str(row_size)}}
    # Convert Dataframe Timestamp -> datetime.datetime
    first_timestamp = items.index[0].to_pydatetime()
    last_timestamp = items.index[-1].to_pydatetime()
    item = dynamodb_get_item(
        name, str(first_timestamp.year), str(first_timestamp.month)
    )
    if not item:
        item = {
            "Year": {"N": str(first_timestamp.year)},
            "Month": {"N": str(first_timestamp.month)},
            "Day": {"L": []},
        }
    """
    "Day" : {
        {"L" : [
            {"M": {
                "Day" : {
                    {"N" : 13}
                },
                "WheelBasedVehicleSpeed" : {
                    {"N" : 150000}
                }
                ...
            }
          }
        ]
      }
    }
    """
    day_set = [obj["M"]["Day"]["N"] for obj in item["Day"]["L"]]
    if str(first_timestamp.day) not in day_set:
        for index in range(column_size):
            pre_item[items.columns[index]] = {"N": str(items.sum()[index])}
        pre_item["Day"] = {"N": str(first_timestamp.day)}
        item["Day"]["L"].append({"M": pre_item})
    else:
        for content in item["Day"]["L"]:
            if content["M"]["Day"]["N"] == str(first_timestamp.day):
                content["M"]["Count"]["N"] = str(
                    float(content["M"]["Count"]["N"]) + row_size
                )
                for index in range(column_size):
                    content["M"][items.columns[index]]["N"] = str(
                        float(content["M"][items.columns[index]]["N"])
                        + items.sum()[index]
                    )
    return item

2.1.6 Put Item

테이블에 값을 추가합니다.

테이블이 있는지 확인하여, 없는 경우 생성합니다.

2.1.5에서 생성한 아이템을 테이블에 추가합니다.

def dynamodb_put_item(name, items):
    """Add items from dataframe into AWS DynamoDB
    :param name: DynamoDB table name
    :param items: Groups of item which will be added to AWS DynamoDB
    """
    # Check a DynamoDB Table List
    dynamotb_tables = dynamodb_list_table()
    if name not in dynamotb_tables:
        import time
        table = dynamodb_create_table(name)
        while True:
            table = dynamodb_describe_table(name)
            if table["TableStatus"] != "ACTIVE":
                print(
                    f'Table Name: {table["TableName"]}  Status: {table["TableStatus"]}'
                )
                time.sleep(5)
            else:
                print(
                    f'Table Name: {table["TableName"]}  Status: {table["TableStatus"]}'
                )
                break
    import boto3
    import pandas as pd
    client = boto3.client("dynamodb")
    item = create_statistical_item(name, items)
    response = client.put_item(TableName=name, Item=item)
    return True
2.2 App.py

생성한 함수가 동작할 수 있게 App.py 파일에 추가합니다.

from dynamodb import dynamodb_put_item
...
def lambda_handler(event, context):
    ...
    upload_file(f"/tmp/{object_key.split('/')[-1]}", bucket, object_key)
    dynamodb_put_item("<your-table-name>", df_phys_join)
    print(df_phys_join)
2.3 결과

데이터가 정상적으로 들어간 것을 확인할 수 있습니다.

3. Athena

3.1 SAM Resource 추가

Athena ▶ Data Sources ▶ Connect data source 순으로 선택합니다.

Query a data source ▶ Amazon DynamoDB ▶ Next 순으로 선택합니다.

Configure new AWS Lambda function을 선택합니다.

SAM 리소스로 복사를 선택합니다.

SAM 구성 중 template.yaml 파일을 수정합니다.

Resource 항목 아래에 복사한 내용을 추가합니다. AthenaCatalogName, SpillBucket을 필요에 따라 변경합니다.

  AthenaDynamoDBConnector:
    Type: AWS::Serverless::Application
    Properties:
      Location:
        ApplicationId: arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaDynamoDBConnector
        SemanticVersion: 2021.18.1
      Parameters:
        # The name you will give to this catalog in Athena. It will also be used as the function name. This name must satisfy the pattern ^[a-z0-9-_]{1,64}$
        AthenaCatalogName: dynamodbcatalog
        # WARNING: If set to 'true' encryption for spilled data is disabled.
        # DisableSpillEncryption: 'false' # Uncomment to override default value
        # Lambda memory in MB (min 128 - 3008 max).
        # LambdaMemory: '3008' # Uncomment to override default value
        # Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)
        # LambdaTimeout: '900' # Uncomment to override default value
        # The name of the bucket where this function can spill data.
        SpillBucket: !Ref MyFilesBucket
        # The prefix within SpillBucket where this function can spill data.
        # SpillPrefix: 'athena-spill' # Uncomment to override default value 

변경사항을 build하여 Cloudformation에 반영합니다.

sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8
sam package --template-file template.yaml --s3-bucket aws-sam-cli-managed-default-samclisourcebucket-aaa
sam deploy --stack-name sam-app
3.2 Athena 연결

다시 돌아와서 방금 생성한 함수를 선택한 후 Catalog name을 똑같이 입력합니다.

Data source ▶ Table ▶  ▶ Preview Table 순으로 선택합니다.

3.3 결과

Results에 DynamoDB에 저장된 값을 볼 수 있습니다.

다음 글 보기

이전 글 보기