0. 목표
- SAM 활용하여 CANedge 데이터 전처리 과정 자동화 (최종 목표)
- SAM 구성 시행착오 소개(Build 실패, Event 변경 등)
1. SAM 구성 (1단계)
[AWS] SAM(Serverless Application Model) 4번을 참고하여 SAM 구성을 시작합니다.
코드는 앞서 진행하였던 [AWS] CANedge 데이터 분석 Part 1 을 참고합니다.
1.1 폴더 구성
.aws-sam 폴더는 Build 후 생성됩니다. 무시하셔도 좋습니다.
[AWS] SAM(Serverless Application Model) 4번을 따라한 이후 예시로 사용할 dbc_files 및 LOG 파일을 같은 폴더에 넣습니다.
최종 구성된 폴더 구성은 아래와 같습니다.
1.2 app.py
마지막 부분에 CSV 파일로 변환하는 코드는 주석처리하였습니다. 당장은 사용하지 않을 예정입니다.
복사해서 붙여넣기만 합니다.
import mdf_iter
import canedge_browser
import can_decoder
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from utils import (
setup_fs,
load_dbc_files,
restructure_data,
add_custom_sig,
ProcessData,
)
def lambda_handler(event, context):
# specify devices to process (from local/S3), DBC files and start time
devices = ["LOG/958D2219"]
dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)
# setup filesystem (local/S3), load DBC files and list log files for processing
fs = setup_fs(s3=False, key="", secret="", endpoint="")
db_list = load_dbc_files(dbc_paths)
log_files = canedge_browser.get_log_files(fs, devices, start_date=start)
print(f"Found a total of {len(log_files)} log files")
# --------------------------------------------
# perform data processing of each log file
proc = ProcessData(fs, db_list, signals=[])
df_phys_all = pd.DataFrame()
for log_file in log_files:
df_raw, device_id = proc.get_raw_data(log_file)
df_phys = proc.extract_phys(df_raw)
proc.print_log_summary(device_id, log_file, df_phys)
df_phys_all = df_phys_all.append(df_phys)
# --------------------------------------------
# example: Add a custom signal
def ratio(s1, s2):
return s2 / s1 if s1 else np.nan
df_phys_all = add_custom_sig(
df_phys_all, "WheelBasedVehicleSpeed", "EngineSpeed", ratio, "RatioRpmSpeed"
)
print(df_phys_all.tail())
# --------------------------------------------
# example: resample and restructure data (parameters in columns)
df_phys_join = restructure_data(df_phys=df_phys_all, res="1S")
# df_phys_join.to_csv("output_joined.csv")
print(df_phys_join)
1.3 requirements.txt
필요한 라이브러리를 명시한 파일입니다.
numpy는 버전을 명시하여 build할 때 에러가 발생하지 않도록 합니다.
attrs
bitstruct
botocore
can-decoder
canedge-browser
canmatrix
click
docutils
fsspec
future
jmespath
mdf-iter
numpy==1.19.1
pandas
pathlib2
python-dateutil
pytz
s3fs
six
urllib3
1.3.1 Build Error (Optional)
문제 해결 과정입니다. 참고만 해주세요.
모든 작성을 마친 후, Build를 진행하였습니다.
sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8
numpy 라이브러리만 받아오지 못했습니다. cpython과 관련이 있는 것으로 추측됩니다.
시도한 것
- sam build
- pip install wheel
에러는 달랐으나, 정상적으로 Build 되지 않은 것은 똑같습니다.
Error: PythonPipBuilder:ResolveDependencies - {numpy==1.20.3(wheel)}
해결
이 문제를 해결책은 numpy에만 버전을 명시하는 것입니다.
...
numpy==1.19.1
...
numpy 버전이 최신은 아니지만, 상관은 없으리라 생각합니다. (희망)
Build Succeeded
Built Artifacts : .aws-sambuild
Built Template : .aws-sambuildtemplate.yaml
Commands you can use next
=========================
[*] Invoke Function: sam local invoke
[*] Deploy: sam deploy --guided
1.4 utils.py
이 파일의 내용을 바탕으로 합니다.
복사한 내용 중 restructure_data 함수만 조금 수정합니다.
def restructure_data(df_phys, res):
import pandas as pd
df_phys_join = pd.DataFrame({"TimeStamp": []})
if not df_phys.empty:
for signal, data in df_phys.groupby("Signal"):
df_phys_join = pd.merge_ordered(
df_phys_join,
data["Physical Value"].rename(signal).resample(res).pad().dropna(),
on="TimeStamp",
fill_method="none",
).set_index("TimeStamp")
return df_phys_join
1.5 template.yaml
Timeout시간은 10분, 이벤트는 Cloudwatch Schedule입니다.
나중에 이벤트는 S3 업로드로 변경할 예정입니다.
Globals:
Function:
Timeout: 600
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
Properties:
CodeUri: analysis/
Handler: app.lambda_handler
Runtime: python3.8
Events:
CheckScheduledEvent:
Type: Schedule
Properties:
Schedule: rate(1 minute)
2. SAM Build
아래 명령어를 입력하여 Build를 진행합니다.
sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8
Build가 끝나면, event를 발생하여 의도대로 동작하는지 살펴봅니다.
sam local invoke
Part 1에서 본 결과와 같은 것을 보니, 정상적으로 동작하네요!
Invoking app.lambda_handler (python3.8)
Skip pulling image and use local one: amazon/aws-sam-cli-emulation-image-python3.8:rapid-1.23.0.
Mounting D:Team CS11. Git2021-Comtec 1. Project10. SAMlambda-python3.8-CANEdge.aws-sambuildHelloWorldFunction as /var/task:ro,delegated inside runtime container
START RequestId: 16f6b690-b2c5-49c0-b45e-2e3f84d8b32c Version: $LATEST
[WARNING] 2021-05-24T05:33:02.240Z arxml is not supported
[WARNING] 2021-05-24T05:33:02.430Z kcd is not supported
[WARNING] 2021-05-24T05:33:02.455Z fibex is not supported
[WARNING] 2021-05-24T05:33:02.928Z xls is not supported
[WARNING] 2021-05-24T05:33:02.956Z xlsx is not supported
[WARNING] 2021-05-24T05:33:02.970Z yaml is not supported
Found a total of 2 log files
---------------
Device: 958D2219 | Log file: /00002501/00002081.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:47:09.816750+00:00 - 2020-01-13 14:50:25.659800+00:00
---------------
Device: 958D2219 | Log file: /00002501/00002082.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:50:25.670250+00:00 - 2020-01-13 14:53:41.502950+00:00
CAN ID PGN ... Raw Value Physical Value
TimeStamp ...
2020-01-13 14:53:41.472850+00:00 NaN NaN ... NaN 17.676361
2020-01-13 14:53:41.483450+00:00 NaN NaN ... NaN 17.676361
2020-01-13 14:53:41.493050+00:00 NaN NaN ... NaN 17.659955
2020-01-13 14:53:41.494800+00:00 NaN NaN ... NaN 17.668190
2020-01-13 14:53:41.502950+00:00 NaN NaN ... NaN 17.662221
[5 rows x 6 columns]
EngineSpeed RatioRpmSpeed WheelBasedVehicleSpeed
TimeStamp
2020-01-13 14:47:10+00:00 1537.875 79.598868 19.320312
2020-01-13 14:47:11+00:00 1085.000 52.546349 20.648438
2020-01-13 14:47:12+00:00 1239.875 63.659848 19.476562
2020-01-13 14:47:13+00:00 1399.625 64.028592 21.859375
2020-01-13 14:47:14+00:00 1583.625 63.924314 24.773438
... ... ... ...
2020-01-13 14:53:37+00:00 1478.375 17.811747 83.000000
2020-01-13 14:53:38+00:00 1464.125 17.668332 82.867188
2020-01-13 14:53:39+00:00 1455.250 17.287425 84.179688
2020-01-13 14:53:40+00:00 1484.625 17.693855 83.906250
2020-01-13 14:53:41+00:00 1479.625 17.708462 83.554688
[392 rows x 3 columns]
여기서 더 응용하여 Event도 변경하고, Local에 저장된 값이 아닌 S3에 저장된 값을 불러오는 것으로 바꿔봅니다.
3. SAM 구성 (2단계)
3.1 event.json
Cloudwatch Schedule 이벤트에서 S3 업로드 이벤트로 변경해봅니다.
sam local generate-event s3 put
생성한 내용을 event.json에 붙여넣습니다.
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "us-east-1",
"eventTime": "1970-01-01T00:00:00.000Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"responseElements": {
"x-amz-request-id": "EXAMPLE123456789",
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "example-bucket",
"ownerIdentity": {
"principalId": "EXAMPLE"
},
"arn": "arn:aws:s3:::example-bucket"
},
"object": {
"key": "test/key",
"size": 1024,
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901"
}
}
}
]
}
3.2 template.yaml
Global, Resource 부분만 변경을 합니다.
Timeout은 60초로 변경합니다. 데이터의 크기가 커지면 변경을 고려해야겠지만, 현재는 60초로 충분합니다.
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 60
Dataframe을 처리할 때 메모리 부족으로 인한 시간 초과를 방지하기 위하여 Size를 1024로 설정합니다.
실제 샘플 데이터를 변환할 때는 약 250MB의 메모리를 사용하였습니다.
HelloWorldFunction 함수의 Event를 Cloudwatch schedule에서 S3 Put Event로 변경합니다.
정책은 테스트 목적이기 때문에 S3 전체 권한을 주었으며, 향후 DB 사용 예정이기 때문에 DynamoDB 전체 권한도 추가합니다.
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
Properties:
CodeUri: analysis/
Handler: app.lambda_handler
MemorySize: 1024
Runtime: python3.8
Policies:
- AmazonS3FullAccess
- AmazonDynamoDBFullAccess
Events:
S3Event:
Type: S3
Properties:
Bucket:
Ref: MyFilesBucket # This must be the name of an S3 bucket declared in the same template file
Events: s3:ObjectCreated:*
Resources 항목 아래에 MyFilesBucket(S3)을 생성합니다. HelloWorldFunction Bucket Ref와 같은 이름으로 생성합니다.
MyFilesBucket:
Type: AWS::S3::Bucket
3.3 app.py
S3에 업로드 했을 때 발생하는 이벤트에서 로그 파일을 읽을 수 있도록 코드를 변경합니다.
편의상 ratio 함수는 lambda_handler 함수에서 제외하여 별도로 분리합니다.
def ratio(s1, s2):
return s2 / s1 if s1 else np.nan
def lambda_handler(event, context):
bucket = event["Records"][0]["s3"]["bucket"]["name"]
print(bucket)
# specify devices to process (from local/S3), DBC files and start time
devices = [f"{bucket}/LOG/958D2219"]
dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)
4. SAM 배포
Deploy하기 전 변경사항을 다시 빌드합니다.
sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8
AWS SAM은 Code deploy를 활용하여 쉽게 함수를 업데이트 할 수 있습니다.
S3 Bucket(template에서 생성한 S3 Bucket을 의미하는 것이 아닌 sam cli가 관리하는 Bucket)과 template을 명시합니다.
sam package --template-file template.yaml --s3-bucket aws-sam-cli-managed-default-samclisourcebucket-aaa
기존의 Cloudformation Stack을 update하는 방식으로 변경합니다.
sam deploy --stack-name sam-app
5. 중간결과
- 기대한 람다 실행 횟수는 2회이지만, 여러번 반복해서 실행되는 문제 발생 (에러 발생으로 중복 실행 ▶ 에러 해결 필요)
- 원래 코드는 여러 개의 파일을 읽어 하나의 데이터프레임으로 합치는 과정이 있는데, 실시간 작업엔 적합하지 않음 ▶ 합치는 과정은 제외
- 모듈까지 묶어서 zip으로 올라가 잘 실행되는 것을 확인 (SAM 좋아!)
다음 글 보기
이전 글 보기