0. 목표

  • CANedge 데이터 전처리 과정 살펴보기 (with SageMaker)
  • 데이터 분석에서 많이 사용하는 Pandas 라이브러리 맛보기

1. 준비

1.1 Git Clone

링크를 눌러 CSS-Electronics/api-examples Github에 들어갑니다.

Git clone으로 CANedge 데이터 처리를 따라해볼 수 있는 자료를 다운받습니다.

본 과정은 SageMaker에서 진행하여서 git clone 명령어 앞에 ! (느낌표)가 있습니다.

!git clone https://github.com/CSS-Electronics/api-examples

아래 사진처럼 회색 네모 박스(Cell)에 내용을 넣은 다음 Shift + Enter로 Cell을 실행합니다.

실행 결과는 아래 하얀 바탕에 나타납니다.

현재 폴더에 api-examples 폴더가 생겼습니다.

1.2 필수 라이브러리 설치

api-examples/examples/data-processing/requirements.txt에 정의한 필수 라이브러리를 설치합니다.

!pip install -r api-examples/examples/data-processing/requirements.txt

의존성 충돌로 인하여 에러가 발생하였지만, 눈 꼭 감고 넘어갑니다.

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
anaconda-project 0.9.1 requires ruamel-yaml, which is not installed.
boto3 1.17.70 requires botocore<1.21.0,>=1.20.70, but you have botocore 1.17.43 which is incompatible.
awscli 1.19.70 requires botocore==1.20.70, but you have botocore 1.17.43 which is incompatible.
aiobotocore 1.2.1 requires botocore<1.19.53,>=1.19.52, but you have botocore 1.17.43 which is incompatible.

2. Data Processing

2.1 시작

편의를 위해, api-examples/examples/data-processing 폴더 아래에 ipynb 파일을 생성하여 진행합니다.

api-examples/examples/data-processing/process_data.py 파일을 참고하여 진행합니다.

2.2 Module Import

필요한 모듈을 불러옵니다.

import mdf_iter               # MDF4 데이터 불러오는 라이브러리
import canedge_browser        # 로컬 또는 S3에 저장된 데이터 가져오는 라이브러리
import can_decoder            # 데이터를 복호화하는 라이브러리
import pandas as pd           # 데이터분석 라이브러리
from datetime import datetime, timezone  # 날짜와 시간 데이터를 가져올 수 있는 파이썬 라이브러리
from utils import setup_fs, load_dbc_files, restructure_data, add_custom_sig, ProcessData # 폴더 내 위치한 Custom 라이브러리
2.3 변수 설정

기기 및 경로, 시간을 지정합니다.

# specify devices to process (from local/S3), DBC files and start time
devices = ["LOG/958D2219"]
dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)

s3 또는 로컬에 저장한 파일을 불러옵니다. 

만약 s3에 저장한 경우라면, s3=True를 입력합니다.

# setup filesystem (local/S3), load DBC files and list log files for processing
fs = setup_fs(s3=False, key="", secret="", endpoint="")

List 형태로 입력한 dbc 파일 경로를 받아 변환 규칙 데이터베이스 List를 생성합니다.

db_list = load_dbc_files(dbc_paths)
참고- CAN DBC file

자세한 내용은 여기를 참고하세요.

가공하지 않은 CAN data를 살펴보면, 16진수로 사람이 이해할 수 없습니다.

이를 사람이 읽을 수 있게 변환이 필요합니다. CAN DBC는 CAN ID에 맞는 변환 규칙을 갖고 있습니다.

실습에 사용하는 코드는 같은 폴더 내에 위치한 모든 로그 파일을 읽어 하나의 데이터프레임으로 만드는데 사용합니다.

canedge_browser.get_log_files는 조건에 맞는 로그 파일 경로를 List로 반환합니다.

log_files = canedge_browser.get_log_files(fs, devices, start_date=start)
print(f"Found a total of {len(log_files)} log files")
# Expected
" Found a total of 2 log files "
print(log_files)
# Expected
"['/LOG/958D2219/00002501/00002081.MF4', '/LOG/958D2219/00002501/00002082.MF4']"
2.4 데이터 처리

데이터 처리하는 ProcessData 클래스의 인스턴스를 생성합니다.

proc = ProcessData(fs, db_list, signals=[])

데이터프레임(Dataframe)을 생성합니다.

df_phys_all = pd.DataFrame()
참고 – Dataframe

데이터프레임은 2차원 형태의 테이블 값입니다.

실제 로그 파일로부터 데이터를 불러와, 복호화하는 과정을 거쳐 데이터프레임에 추가합니다.

for log_file in log_files:
    df_raw, device_id = proc.get_raw_data(log_file)
    df_phys = proc.extract_phys(df_raw)
    proc.print_log_summary(device_id, log_file, df_phys)
    df_phys_all = df_phys_all.append(df_phys)
# Expected
'''
---------------
Device: 958D2219 | Log file: /00002501/00002081.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:47:09.816750+00:00 - 2020-01-13 14:50:25.659800+00:00
---------------
Device: 958D2219 | Log file: /00002501/00002082.MF4 [Extracted 21542 decoded frames]
Period: 2020-01-13 14:50:25.670250+00:00 - 2020-01-13 14:53:41.502950+00:00
'''
참고 – Dataframe 2

데이터 전처리시 주로 사용하는 함수들을 살펴봅니다.

info는 데이터프레임의 전체 정보를 요약하여 출력합니다.

(43084, 6)의 2차원 데이터를 갖고 있습니다.

df_phys_all.info()
# Expected
'''
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 43084 entries, 2020-01-13 14:47:09.816750+00:00 to 2020-01-13 14:53:41.502950+00:00
Data columns (total 6 columns):
 #   Column          Non-Null Count  Dtype
---  ------          --------------  -----
 0   CAN ID          43084 non-null  uint32
 1   PGN             43084 non-null  int64
 2   Source Address  43084 non-null  uint32
 3   Signal          43084 non-null  object
 4   Raw Value       43084 non-null  uint16
 5   Physical Value  43084 non-null  float64
dtypes: float64(1), int64(1), object(1), uint16(1), uint32(2)
memory usage: 1.7+ MB
'''

head는 가장 먼저 나오는 5개의 데이터를 보여줍니다.

테이블이 대략적으로 어떻게 구성되어있는지 알아볼 때 사용합니다.

df_phys_all.head()
'''
	                                   CAN ID	PGN	    Source Address	     Signal	Raw Value	Physical Value
                       TimeStamp
2020-01-13 14:47:09.816750+00:00	217056256	61444	             0	EngineSpeed	   13596	1699.500
2020-01-13 14:47:09.826750+00:00	217056256	61444	             0	EngineSpeed	   13554	1694.250
2020-01-13 14:47:09.836800+00:00	217056256	61444	             0	EngineSpeed	   13495	1686.875
2020-01-13 14:47:09.846750+00:00	217056256	61444	             0	EngineSpeed	   13418	1677.250
2020-01-13 14:47:09.856850+00:00	217056256	61444	             0	EngineSpeed	   13331	1666.375
'''

ratio 함수를 생성합니다.

기존의 값으로 새로운 값을 만드는 함수입니다.

def ratio(s1, s2):
    return s2 / s1 if s1 else np.nan

두 메세지(Signal) 및 함수를 기반으로 하여 새로운 메세지를 만드는 함수(add_custom_sig)를 호출합니다.

df_phys_all = add_custom_sig(df_phys_all, "WheelBasedVehicleSpeed", "EngineSpeed", ratio, "RatioRpmSpeed")
참고 – add_custom_sig 함수

아래 함수가 어떤 역할을 하는지 예시를 통해 살펴봅니다.

def add_custom_sig(df_phys, signal1, signal2, function, new_signal):
    """Helper function for calculating a new signal based on two signals and a function.
    Returns a dataframe with the new signal name and physical values
    """
    import pandas as pd
    try:
        s1 = df_phys[df_phys["Signal"] == signal1]["Physical Value"].rename(signal1)
        s2 = df_phys[df_phys["Signal"] == signal2]["Physical Value"].rename(signal2)
        df_new_sig = pd.merge_ordered(s1, s2, on="TimeStamp", fill_method="ffill",).set_index("TimeStamp")
        df_new_sig = df_new_sig.apply(lambda x: function(x[0], x[1]), axis=1).dropna().rename("Physical Value").to_frame()
        df_new_sig["Signal"] = new_signal
        df_phys = df_phys.append(df_new_sig)
    except:
        print(f"Warning: Custom signal {new_signal} not createdn")
    return df_phys
동작 방식 살펴보기

앞에서 살펴본 데이터프레임을 축소하여 (7, 4)의 데이터프레임을 생성합니다. 

“TimeStamp” Column을 Index로 변경합니다.

d = {'TimeStamp': ["2020-01-13","2020-01-14","2020-01-15","2020-01-16","2020-01-17","2020-01-18","2020-01-19"], 'Signal': [1, 2, 1, 2, 1, 2, 7], 'Raw Value': [11, 12, 13, 14, 15, 16, 17], 'Physical Value': range(7)}
df = pd.DataFrame(data=d)
df = df.set_index("TimeStamp")

“Signal” Column에서 값이 1인 row만 찾을 때는 아래의 명령어를 사용합니다.

df[df["Signal"] == 1]

“Signal” Column에서 값이 1인 row 데이터 중 “Physical Value” Column만 나타냅니다.

df[df["Signal"] == 1]["Physical Value"]

Name이 Physical Value로 나와있습니다.

“rename” Method를 통해 Name을 바꿀 수 있습니다.

df1 = df[df["Signal"] == 1]["Physical Value"].rename("a")

같은 방식으로 “Signal” Column의 값이 2인 “Physical Value” Column의 Name을 b로 변경합니다.

df2 = df[df["Signal"] == 2]["Physical Value"].rename("b")

df1과 df2를 합쳐봅니다.

df_new_sig = pd.merge_ordered(df1, df2, on="TimeStamp", fill_method="ffill",).set_index("TimeStamp")

df1, df2는 TimeStamp가 겹치지 않는 개별의 Series입니다. Merge하면서 비어있는 TimeStamp값은 그 이전 TimeStamp값을 참조하여 값이 들어가있습니다. (fill_method=”ffill”) 

TimeStamp 인덱스 기준으로 합쳐졌습니다.

이제 함수를 적용합니다.

x[0]은 a column, x[1]은 b column입니다.

df_new_sig.apply(lambda x: ratio(x[0], x[1]), axis=1)

다시 한번 ratio 함수를 가져왔습니다.

2020-01-13, 2020-01-14는 0이라서 np.nan을

나머지 TimeStamp는 x[1] / x[0] 값을 반환합니다.

def ratio(s1, s2):
    import numpy as np
    return s2 / s1 if s1 else np.nan

결과를 볼까요?

x[0]가 0이었던 TimStamp는 NaN, 나머지는 실수를 반환하였습니다.

위의 테이블에서 NaN값을 제거하는 “dropna” Method와 이름을 변경하는 “rename” Method를 호출하였습니다.

이름은 “Physical Value”로 변경하였네요.

df_new_sig.apply(lambda x: ratio(x[0], x[1]), axis=1).dropna().rename("Physical Value")

Physical Value Series를 Dataframe으로 반환합니다.

df_new_sig.apply(lambda x: ratio(x[0], x[1]), axis=1).dropna().rename("Physical Value").to_frame()

마지막으로 Physical Value와 나란하게 Signal을 붙여줍니다.

new_signal = "RatioRpmSpeed"
df_new_sig["Signal"] = new_signal

마지막으로 원래 데이터프레임에 새로운 데이터프레임을 추가합니다.

df_phys = df_phys.append(df_new_sig)

다시 돌아와, add_custom_sig 함수로 Custom signal이 추가된 데이터프레임을 보면,

append는 데이터프레임 마지막 행 아래에 이어 추가합니다.

기존의 데이터프레임 아래에 추가된 것을 볼 수 있습니다.

df_phys_all.tail()

restructure_data 함수를 통해 새로운 데이터프레임을 생성합니다.

df_phys_join = restructure_data(df_phys=df_phys_all, res="1S")
print(df_phys_join)

함수 이름과 같이 전체 데이터를 정리하여 Signal 데이터만 새로운 데이터프레임으로 생성합니다.

새로운 데이터프레임을 CSV 파일로 변환합니다.

df_phys_join.to_csv("output_joined.csv")

생성한 CSV파일을 갖고 분석 Part2를 진행합니다.

다음 글 보기

이전 글 보기