0. 목표

  • SAM 활용하여 CANedge 데이터 전처리 과정 자동화 (최종 목표)
  • SAM 구성 시행착오 소개(Build 실패, Event 변경 등)

1. SAM 구성 (1단계)

[AWS] SAM(Serverless Application Model) 4번을 참고하여 SAM 구성을 시작합니다.

코드는 앞서 진행하였던 [AWS] CANedge 데이터 분석 Part 1 을 참고합니다.

1.1 폴더 구성

.aws-sam 폴더는 Build 후 생성됩니다. 무시하셔도 좋습니다.

[AWS] SAM(Serverless Application Model) 4번을 따라한 이후 예시로 사용할 dbc_files 및 LOG 파일을 같은 폴더에 넣습니다.

최종 구성된 폴더 구성은 아래와 같습니다.

1.2 app.py

마지막 부분에 CSV 파일로 변환하는 코드는 주석처리하였습니다. 당장은 사용하지 않을 예정입니다.

복사해서 붙여넣기만 합니다.

import mdf_iter
import canedge_browser
import can_decoder
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from utils import (
    setup_fs,
    load_dbc_files,
    restructure_data,
    add_custom_sig,
    ProcessData,
)
def lambda_handler(event, context):
    # specify devices to process (from local/S3), DBC files and start time
    devices = ["LOG/958D2219"]
    dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
    start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)
    # setup filesystem (local/S3), load DBC files and list log files for processing
    fs = setup_fs(s3=False, key="", secret="", endpoint="")
    db_list = load_dbc_files(dbc_paths)
    log_files = canedge_browser.get_log_files(fs, devices, start_date=start)
    print(f"Found a total of {len(log_files)} log files")
    # --------------------------------------------
    # perform data processing of each log file
    proc = ProcessData(fs, db_list, signals=[])
    df_phys_all = pd.DataFrame()
    for log_file in log_files:
        df_raw, device_id = proc.get_raw_data(log_file)
        df_phys = proc.extract_phys(df_raw)
        proc.print_log_summary(device_id, log_file, df_phys)
        df_phys_all = df_phys_all.append(df_phys)
    # --------------------------------------------
    # example: Add a custom signal
    def ratio(s1, s2):
        return s2 / s1 if s1 else np.nan
    df_phys_all = add_custom_sig(
        df_phys_all, "WheelBasedVehicleSpeed", "EngineSpeed", ratio, "RatioRpmSpeed"
    )
    print(df_phys_all.tail())
    # --------------------------------------------
    # example: resample and restructure data (parameters in columns)
    df_phys_join = restructure_data(df_phys=df_phys_all, res="1S")
    # df_phys_join.to_csv("output_joined.csv")
    print(df_phys_join)
1.3 requirements.txt

필요한 라이브러리를 명시한 파일입니다.

numpy는 버전을 명시하여 build할 때 에러가 발생하지 않도록 합니다.

attrs
bitstruct
botocore
can-decoder
canedge-browser
canmatrix
click
docutils
fsspec
future
jmespath
mdf-iter
numpy==1.19.1
pandas
pathlib2
python-dateutil
pytz
s3fs
six
urllib3
1.3.1 Build Error (Optional)

문제 해결 과정입니다. 참고만 해주세요.

모든 작성을 마친 후, Build를 진행하였습니다.

sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8

numpy 라이브러리만 받아오지 못했습니다. cpython과 관련이 있는 것으로 추측됩니다.

시도한 것

  • sam build
  • pip install wheel

에러는 달랐으나, 정상적으로 Build 되지 않은 것은 똑같습니다.

Error: PythonPipBuilder:ResolveDependencies - {numpy==1.20.3(wheel)}

해결

이 문제를 해결책은 numpy에만 버전을 명시하는 것입니다.

...
numpy==1.19.1
...

numpy 버전이 최신은 아니지만, 상관은 없으리라 생각합니다. (희망)

Build Succeeded
Built Artifacts  : .aws-sambuild
Built Template   : .aws-sambuildtemplate.yaml
Commands you can use next
=========================
[*] Invoke Function: sam local invoke
[*] Deploy: sam deploy --guided
1.4 utils.py

이 파일의 내용을 바탕으로 합니다.

복사한 내용 중 restructure_data 함수만 조금 수정합니다.

def restructure_data(df_phys, res):
    import pandas as pd
    df_phys_join = pd.DataFrame({"TimeStamp": []})
    if not df_phys.empty:
        for signal, data in df_phys.groupby("Signal"):
            df_phys_join = pd.merge_ordered(
                df_phys_join,
                data["Physical Value"].rename(signal).resample(res).pad().dropna(),
                on="TimeStamp",
                fill_method="none",
            ).set_index("TimeStamp")
    return df_phys_join
1.5 template.yaml

Timeout시간은 10분, 이벤트는 Cloudwatch Schedule입니다.

나중에 이벤트는 S3 업로드로 변경할 예정입니다.

Globals:
  Function:
    Timeout: 600
Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: analysis/
      Handler: app.lambda_handler
      Runtime: python3.8
      Events:
        CheckScheduledEvent:
          Type: Schedule
          Properties:
            Schedule: rate(1 minute)

2. SAM Build

아래 명령어를 입력하여 Build를 진행합니다.

sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8

Build가 끝나면, event를 발생하여 의도대로 동작하는지 살펴봅니다.

sam local invoke

Part 1에서 본 결과와 같은 것을 보니, 정상적으로 동작하네요!

Invoking app.lambda_handler (python3.8)
Skip pulling image and use local one: amazon/aws-sam-cli-emulation-image-python3.8:rapid-1.23.0.
Mounting D:Team CS11. Git2021-Comtec1. Project10. SAMlambda-python3.8-CANEdge.aws-sambuildHelloWorldFunction as /var/task:ro,delegated inside runtime container
START RequestId: 16f6b690-b2c5-49c0-b45e-2e3f84d8b32c Version: $LATEST
[WARNING]       2021-05-24T05:33:02.240Z                arxml is not supported
[WARNING]       2021-05-24T05:33:02.430Z                kcd is not supported
[WARNING]       2021-05-24T05:33:02.455Z                fibex is not supported
[WARNING]       2021-05-24T05:33:02.928Z                xls is not supported
[WARNING]       2021-05-24T05:33:02.956Z                xlsx is not supported
[WARNING]       2021-05-24T05:33:02.970Z                yaml is not supported
Found a total of 2 log files
---------------
Device: 958D2219 | Log file: /00002501/00002081.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:47:09.816750+00:00 - 2020-01-13 14:50:25.659800+00:00
---------------
Device: 958D2219 | Log file: /00002501/00002082.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:50:25.670250+00:00 - 2020-01-13 14:53:41.502950+00:00
                                  CAN ID  PGN  ...  Raw Value Physical Value
TimeStamp                                      ...
2020-01-13 14:53:41.472850+00:00     NaN  NaN  ...        NaN      17.676361
2020-01-13 14:53:41.483450+00:00     NaN  NaN  ...        NaN      17.676361
2020-01-13 14:53:41.493050+00:00     NaN  NaN  ...        NaN      17.659955
2020-01-13 14:53:41.494800+00:00     NaN  NaN  ...        NaN      17.668190
2020-01-13 14:53:41.502950+00:00     NaN  NaN  ...        NaN      17.662221
[5 rows x 6 columns]
                           EngineSpeed  RatioRpmSpeed  WheelBasedVehicleSpeed
TimeStamp
2020-01-13 14:47:10+00:00     1537.875      79.598868               19.320312
2020-01-13 14:47:11+00:00     1085.000      52.546349               20.648438
2020-01-13 14:47:12+00:00     1239.875      63.659848               19.476562
2020-01-13 14:47:13+00:00     1399.625      64.028592               21.859375
2020-01-13 14:47:14+00:00     1583.625      63.924314               24.773438
...                                ...            ...                     ...
2020-01-13 14:53:37+00:00     1478.375      17.811747               83.000000
2020-01-13 14:53:38+00:00     1464.125      17.668332               82.867188
2020-01-13 14:53:39+00:00     1455.250      17.287425               84.179688
2020-01-13 14:53:40+00:00     1484.625      17.693855               83.906250
2020-01-13 14:53:41+00:00     1479.625      17.708462               83.554688
[392 rows x 3 columns]

여기서 더 응용하여 Event도 변경하고, Local에 저장된 값이 아닌 S3에 저장된 값을 불러오는 것으로 바꿔봅니다.

3. SAM 구성 (2단계)

3.1 event.json

Cloudwatch Schedule 이벤트에서 S3 업로드 이벤트로 변경해봅니다.

sam local generate-event s3 put

생성한 내용을 event.json에 붙여넣습니다.

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "responseElements": {
        "x-amz-request-id": "EXAMPLE123456789",
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "example-bucket",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          },
          "arn": "arn:aws:s3:::example-bucket"
        },
        "object": {
          "key": "test/key",
          "size": 1024,
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901"
        }
      }
    }
  ]
}
3.2 template.yaml

Global, Resource 부분만 변경을 합니다.

Timeout은 60초로 변경합니다. 데이터의 크기가 커지면 변경을 고려해야겠지만, 현재는 60초로 충분합니다.

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 60

Dataframe을 처리할 때 메모리 부족으로 인한 시간 초과를 방지하기 위하여 Size를 1024로 설정합니다.

실제 샘플 데이터를 변환할 때는 약 250MB의 메모리를 사용하였습니다.

HelloWorldFunction 함수의 Event를 Cloudwatch schedule에서 S3 Put Event로 변경합니다.

정책은 테스트 목적이기 때문에 S3 전체 권한을 주었으며, 향후 DB 사용 예정이기 때문에 DynamoDB 전체 권한도 추가합니다.

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: analysis/
      Handler: app.lambda_handler
      MemorySize: 1024
      Runtime: python3.8
      Policies:
        - AmazonS3FullAccess
        - AmazonDynamoDBFullAccess
      Events:
        S3Event:
          Type: S3
          Properties:
            Bucket:
              Ref: MyFilesBucket     # This must be the name of an S3 bucket declared in the same template file
            Events: s3:ObjectCreated:*

Resources 항목 아래에  MyFilesBucket(S3)을 생성합니다. HelloWorldFunction Bucket Ref와 같은 이름으로 생성합니다.

  MyFilesBucket:
    Type: AWS::S3::Bucket
3.3 app.py

S3에 업로드 했을 때 발생하는 이벤트에서 로그 파일을 읽을 수 있도록 코드를 변경합니다.

편의상 ratio 함수는 lambda_handler 함수에서 제외하여 별도로 분리합니다.

def ratio(s1, s2):
    return s2 / s1 if s1 else np.nan
def lambda_handler(event, context):
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    print(bucket)
    # specify devices to process (from local/S3), DBC files and start time
    devices = [f"{bucket}/LOG/958D2219"]
    dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
    start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)

4. SAM 배포

Deploy하기 전 변경사항을 다시 빌드합니다.

sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8

AWS SAM은 Code deploy를 활용하여 쉽게 함수를 업데이트 할 수 있습니다.

S3 Bucket(template에서 생성한 S3 Bucket을 의미하는 것이 아닌 sam cli가 관리하는 Bucket)과 template을 명시합니다.

sam package --template-file template.yaml --s3-bucket aws-sam-cli-managed-default-samclisourcebucket-aaa

기존의 Cloudformation Stack을 update하는 방식으로 변경합니다.

sam deploy --stack-name sam-app

5. 중간결과

  • 기대한 람다 실행 횟수는 2회이지만, 여러번 반복해서 실행되는 문제 발생 (에러 발생으로 중복 실행 ▶ 에러 해결 필요)
  • 원래 코드는 여러 개의 파일을 읽어 하나의 데이터프레임으로 합치는 과정이 있는데, 실시간 작업엔 적합하지 않음 ▶ 합치는 과정은 제외
  • 모듈까지 묶어서 zip으로 올라가 잘 실행되는 것을 확인 (SAM 좋아!)

다음 글 보기

이전 글 보기