0. 목표
- CANedge 데이터 전처리 과정 살펴보기 (with SageMaker)
- 데이터 분석에서 많이 사용하는 Pandas 라이브러리 맛보기
1. 준비
1.1 Git Clone
링크를 눌러 CSS-Electronics/api-examples Github에 들어갑니다.
Git clone으로 CANedge 데이터 처리를 따라해볼 수 있는 자료를 다운받습니다.
본 과정은 SageMaker에서 진행하여서 git clone 명령어 앞에 ! (느낌표)가 있습니다.
!git clone https://github.com/CSS-Electronics/api-examples
아래 사진처럼 회색 네모 박스(Cell)에 내용을 넣은 다음 Shift + Enter로 Cell을 실행합니다.
실행 결과는 아래 하얀 바탕에 나타납니다.
현재 폴더에 api-examples 폴더가 생겼습니다.
1.2 필수 라이브러리 설치
api-examples/examples/data-processing/requirements.txt에 정의한 필수 라이브러리를 설치합니다.
!pip install -r api-examples/examples/data-processing/requirements.txt
의존성 충돌로 인하여 에러가 발생하였지만, 눈 꼭 감고 넘어갑니다.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
anaconda-project 0.9.1 requires ruamel-yaml, which is not installed.
boto3 1.17.70 requires botocore<1.21.0,>=1.20.70, but you have botocore 1.17.43 which is incompatible.
awscli 1.19.70 requires botocore==1.20.70, but you have botocore 1.17.43 which is incompatible.
aiobotocore 1.2.1 requires botocore<1.19.53,>=1.19.52, but you have botocore 1.17.43 which is incompatible.
2. Data Processing
2.1 시작
편의를 위해, api-examples/examples/data-processing 폴더 아래에 ipynb 파일을 생성하여 진행합니다.
api-examples/examples/data-processing/process_data.py 파일을 참고하여 진행합니다.
2.2 Module Import
필요한 모듈을 불러옵니다.
import mdf_iter # MDF4 데이터 불러오는 라이브러리
import canedge_browser # 로컬 또는 S3에 저장된 데이터 가져오는 라이브러리
import can_decoder # 데이터를 복호화하는 라이브러리
import pandas as pd # 데이터분석 라이브러리
from datetime import datetime, timezone # 날짜와 시간 데이터를 가져올 수 있는 파이썬 라이브러리
from utils import setup_fs, load_dbc_files, restructure_data, add_custom_sig, ProcessData # 폴더 내 위치한 Custom 라이브러리
2.3 변수 설정
기기 및 경로, 시간을 지정합니다.
# specify devices to process (from local/S3), DBC files and start time
devices = ["LOG/958D2219"]
dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)
s3 또는 로컬에 저장한 파일을 불러옵니다.
만약 s3에 저장한 경우라면, s3=True를 입력합니다.
# setup filesystem (local/S3), load DBC files and list log files for processing
fs = setup_fs(s3=False, key="", secret="", endpoint="")
List 형태로 입력한 dbc 파일 경로를 받아 변환 규칙 데이터베이스 List를 생성합니다.
db_list = load_dbc_files(dbc_paths)
참고- CAN DBC file
자세한 내용은 여기를 참고하세요.
가공하지 않은 CAN data를 살펴보면, 16진수로 사람이 이해할 수 없습니다.
이를 사람이 읽을 수 있게 변환이 필요합니다. CAN DBC는 CAN ID에 맞는 변환 규칙을 갖고 있습니다.
실습에 사용하는 코드는 같은 폴더 내에 위치한 모든 로그 파일을 읽어 하나의 데이터프레임으로 만드는데 사용합니다.
canedge_browser.get_log_files는 조건에 맞는 로그 파일 경로를 List로 반환합니다.
log_files = canedge_browser.get_log_files(fs, devices, start_date=start)
print(f"Found a total of {len(log_files)} log files")
# Expected
" Found a total of 2 log files "
print(log_files)
# Expected
"['/LOG/958D2219/00002501/00002081.MF4', '/LOG/958D2219/00002501/00002082.MF4']"
2.4 데이터 처리
데이터 처리하는 ProcessData 클래스의 인스턴스를 생성합니다.
proc = ProcessData(fs, db_list, signals=[])
데이터프레임(Dataframe)을 생성합니다.
df_phys_all = pd.DataFrame()
참고 – Dataframe
데이터프레임은 2차원 형태의 테이블 값입니다.
실제 로그 파일로부터 데이터를 불러와, 복호화하는 과정을 거쳐 데이터프레임에 추가합니다.
for log_file in log_files:
df_raw, device_id = proc.get_raw_data(log_file)
df_phys = proc.extract_phys(df_raw)
proc.print_log_summary(device_id, log_file, df_phys)
df_phys_all = df_phys_all.append(df_phys)
# Expected
'''
---------------
Device: 958D2219 | Log file: /00002501/00002081.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:47:09.816750+00:00 - 2020-01-13 14:50:25.659800+00:00
---------------
Device: 958D2219 | Log file: /00002501/00002082.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:50:25.670250+00:00 - 2020-01-13 14:53:41.502950+00:00
'''
참고 – Dataframe 2
데이터 전처리시 주로 사용하는 함수들을 살펴봅니다.
info는 데이터프레임의 전체 정보를 요약하여 출력합니다.
(43084, 6)의 2차원 데이터를 갖고 있습니다.
df_phys_all.info()
# Expected
'''
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 43084 entries, 2020-01-13 14:47:09.816750+00:00 to 2020-01-13 14:53:41.502950+00:00
Data columns (total 6 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 CAN ID 43084 non-null uint32
1 PGN 43084 non-null int64
2 Source Address 43084 non-null uint32
3 Signal 43084 non-null object
4 Raw Value 43084 non-null uint16
5 Physical Value 43084 non-null float64
dtypes: float64(1), int64(1), object(1), uint16(1), uint32(2)
memory usage: 1.7+ MB
'''
head는 가장 먼저 나오는 5개의 데이터를 보여줍니다.
테이블이 대략적으로 어떻게 구성되어있는지 알아볼 때 사용합니다.
df_phys_all.head()
'''
CAN ID PGN Source Address Signal Raw Value Physical Value
TimeStamp
2020-01-13 14:47:09.816750+00:00 217056256 61444 0 EngineSpeed 13596 1699.500
2020-01-13 14:47:09.826750+00:00 217056256 61444 0 EngineSpeed 13554 1694.250
2020-01-13 14:47:09.836800+00:00 217056256 61444 0 EngineSpeed 13495 1686.875
2020-01-13 14:47:09.846750+00:00 217056256 61444 0 EngineSpeed 13418 1677.250
2020-01-13 14:47:09.856850+00:00 217056256 61444 0 EngineSpeed 13331 1666.375
'''
ratio 함수를 생성합니다.
기존의 값으로 새로운 값을 만드는 함수입니다.
def ratio(s1, s2):
return s2 / s1 if s1 else np.nan
두 메세지(Signal) 및 함수를 기반으로 하여 새로운 메세지를 만드는 함수(add_custom_sig)를 호출합니다.
df_phys_all = add_custom_sig(df_phys_all, "WheelBasedVehicleSpeed", "EngineSpeed", ratio, "RatioRpmSpeed")
참고 – add_custom_sig 함수
아래 함수가 어떤 역할을 하는지 예시를 통해 살펴봅니다.
def add_custom_sig(df_phys, signal1, signal2, function, new_signal):
"""Helper function for calculating a new signal based on two signals and a function.
Returns a dataframe with the new signal name and physical values
"""
import pandas as pd
try:
s1 = df_phys[df_phys["Signal"] == signal1]["Physical Value"].rename(signal1)
s2 = df_phys[df_phys["Signal"] == signal2]["Physical Value"].rename(signal2)
df_new_sig = pd.merge_ordered(s1, s2, on="TimeStamp", fill_method="ffill",).set_index("TimeStamp")
df_new_sig = df_new_sig.apply(lambda x: function(x[0], x[1]), axis=1).dropna().rename("Physical Value").to_frame()
df_new_sig["Signal"] = new_signal
df_phys = df_phys.append(df_new_sig)
except:
print(f"Warning: Custom signal {new_signal} not createdn")
return df_phys
동작 방식 살펴보기
앞에서 살펴본 데이터프레임을 축소하여 (7, 4)의 데이터프레임을 생성합니다.
“TimeStamp” Column을 Index로 변경합니다.
d = {'TimeStamp': ["2020-01-13","2020-01-14","2020-01-15","2020-01-16","2020-01-17","2020-01-18","2020-01-19"], 'Signal': [1, 2, 1, 2, 1, 2, 7], 'Raw Value': [11, 12, 13, 14, 15, 16, 17], 'Physical Value': range(7)}
df = pd.DataFrame(data=d)
df = df.set_index("TimeStamp")
“Signal” Column에서 값이 1인 row만 찾을 때는 아래의 명령어를 사용합니다.
df[df["Signal"] == 1]
“Signal” Column에서 값이 1인 row 데이터 중 “Physical Value” Column만 나타냅니다.
df[df["Signal"] == 1]["Physical Value"]
Name이 Physical Value로 나와있습니다.
“rename” Method를 통해 Name을 바꿀 수 있습니다.
df1 = df[df["Signal"] == 1]["Physical Value"].rename("a")
같은 방식으로 “Signal” Column의 값이 2인 “Physical Value” Column의 Name을 b로 변경합니다.
df2 = df[df["Signal"] == 2]["Physical Value"].rename("b")
df1과 df2를 합쳐봅니다.
df_new_sig = pd.merge_ordered(df1, df2, on="TimeStamp", fill_method="ffill",).set_index("TimeStamp")
df1, df2는 TimeStamp가 겹치지 않는 개별의 Series입니다. Merge하면서 비어있는 TimeStamp값은 그 이전 TimeStamp값을 참조하여 값이 들어가있습니다. (fill_method=”ffill”)
TimeStamp 인덱스 기준으로 합쳐졌습니다.
이제 함수를 적용합니다.
x[0]은 a column, x[1]은 b column입니다.
df_new_sig.apply(lambda x: ratio(x[0], x[1]), axis=1)
다시 한번 ratio 함수를 가져왔습니다.
2020-01-13, 2020-01-14는 0이라서 np.nan을
나머지 TimeStamp는 x[1] / x[0] 값을 반환합니다.
def ratio(s1, s2):
import numpy as np
return s2 / s1 if s1 else np.nan
결과를 볼까요?
x[0]가 0이었던 TimStamp는 NaN, 나머지는 실수를 반환하였습니다.
위의 테이블에서 NaN값을 제거하는 “dropna” Method와 이름을 변경하는 “rename” Method를 호출하였습니다.
이름은 “Physical Value”로 변경하였네요.
df_new_sig.apply(lambda x: ratio(x[0], x[1]), axis=1).dropna().rename("Physical Value")
Physical Value Series를 Dataframe으로 반환합니다.
df_new_sig.apply(lambda x: ratio(x[0], x[1]), axis=1).dropna().rename("Physical Value").to_frame()
마지막으로 Physical Value와 나란하게 Signal을 붙여줍니다.
new_signal = "RatioRpmSpeed"
df_new_sig["Signal"] = new_signal
마지막으로 원래 데이터프레임에 새로운 데이터프레임을 추가합니다.
df_phys = df_phys.append(df_new_sig)
다시 돌아와, add_custom_sig 함수로 Custom signal이 추가된 데이터프레임을 보면,
append는 데이터프레임 마지막 행 아래에 이어 추가합니다.
기존의 데이터프레임 아래에 추가된 것을 볼 수 있습니다.
df_phys_all.tail()
restructure_data 함수를 통해 새로운 데이터프레임을 생성합니다.
df_phys_join = restructure_data(df_phys=df_phys_all, res="1S")
print(df_phys_join)
함수 이름과 같이 전체 데이터를 정리하여 Signal 데이터만 새로운 데이터프레임으로 생성합니다.
새로운 데이터프레임을 CSV 파일로 변환합니다.
df_phys_join.to_csv("output_joined.csv")
생성한 CSV파일을 갖고 분석 Part2를 진행합니다.
다음 글 보기
이전 글 보기