728x90
728x90
이번에는 위에 글에 이어서 모델평가를 코드로 어떻게 썼는지 같이 보겠습니다.
CODE -1 ( FP, FN 구하기)
시간차이를 통해서 FP와 FN을 구합니다.
import csv
import pandas as pd
import numpy as np
filename = '파일이름'
loadname = 'csv 파일이 있는 경로'
savename = '파일 저장할 경로'
compare_file_path = f'... 1번에서 도출한 csv파일을 입력'
# CSV 파일 읽기
csv_data = pd.read_csv(compare_file_path)
# real_event에 따라 그룹화
grouped_data = csv_data.groupby('real_event').agg({
'real_start_time': lambda x: list(x),
'real_end_time': lambda x: list(x),
'ai_event_time_start': lambda x: [eval(i) for i in x if isinstance(i, str) and i.startswith('[')],
'ai_event_time_end': lambda x: [eval(i) for i in x if isinstance(i, str) and i.startswith('[')],
'ai_event': lambda x: [eval(i) for i in x if isinstance(i, str) and i.startswith('[')],
'total_length': 'last' # total_length는 마지막 값을 사용
}).reset_index()
# 'wav_length'를 맨 아래로 이동시키기 위해 정렬
grouped_data['is_wav_length'] = grouped_data['real_event'] == 'wav_length'
grouped_data.sort_values(by=['is_wav_length'], ascending=True, inplace=True)
grouped_data.drop(columns=['is_wav_length'], inplace=True) # 임시 열 제거
# wav_length의 total_length 가져오기
total_length = grouped_data.loc[grouped_data['real_event'] == 'wav_length', 'total_length'].values[0]
def reconstruct_ai_times(real_start_times, real_end_times, ai_start_times, ai_end_times):
combined_ai_start_times = [] # 초기화
combined_ai_end_times = [] # 초기화
# AI 시작 및 종료 시간을 모두 원본 리스트로 초기화
ai_start_times_full = ai_start_times[:] # 원본 리스트 복사
ai_end_times_full = ai_end_times[:] # 원본 리스트 복사
for start, end in zip(real_start_times, real_end_times):
# AI 이벤트가 real_start와 real_end 사이에 포함되는지 확인
included_indices = [j for j, (ai_start, ai_end) in enumerate(zip(ai_start_times, ai_end_times)) if ai_start >= start and ai_end <= end]
if included_indices:
# 포함된 AI 이벤트가 있을 경우, 하나로 묶기
combined_start = min(ai_start_times[idx] for idx in included_indices)
combined_end = max(ai_end_times[idx] for idx in included_indices)
print(f"Combined AI event for interval ({start}, {end}): ({combined_start}, {combined_end})")
# AI 시작 및 종료 시간 사이의 값을 0.0으로 대체
for i in range(len(ai_start_times)):
if ai_start_times[i] > combined_start and ai_start_times[i] < combined_end:
ai_start_times_full[i] = 0.0 # AI 시작 시간이 구간에 포함되면 0.0으로 대체
if ai_end_times[i] > combined_start and ai_end_times[i] < combined_end:
ai_end_times_full[i] = 0.0 # AI 종료 시간이 구간에 포함되면 0.0으로 대체
# combined_ai_start_times와 combined_ai_end_times에 추가
combined_ai_start_times.append(combined_start)
combined_ai_end_times.append(combined_end)
else:
# 포함된 AI 이벤트가 없을 경우, re_start와 re_end의 길이가 ai_start와 ai_end의 길이와 다를 때만 0.0 추가
if len(re_start_times) != len(ai_start_times):
combined_ai_start_times.append(0.0)
combined_ai_end_times.append(0.0)
else :
combined_ai_start_times = ai_start_times[:]
combined_ai_end_times = ai_end_times[:]
return ai_start_times_full, ai_end_times_full, combined_ai_start_times, combined_ai_end_times
def calculate_fn_fp(row):
re_start_times = row['real_start_time']
re_end_times = row['real_end_time']
ai_start_times = [start for sublist in row['ai_event_time_start'] for start in sublist] # AI 시작 시간
ai_end_times = [end for sublist in row['ai_event_time_end'] for end in sublist] # AI 종료 시간
real_event = row['real_event']
fn_values = []
fp_values = []
# last_end_time = 0.0 # 초기 시작 시간
print(f"Calculating FN for event: {real_event}")
print(f"re_start_times: {real_start_times}")
print(f"re_end_times: {real_end_times}")
# AI 시작 및 종료 시간 재구성
ai_start_times_full, ai_end_times_full, combined_ai_start_times, combined_ai_end_times = reconstruct_ai_times(real_start_times, real_end_times, ai_start_times, ai_end_times)
# ai_start_times_full, ai_end_times_full = reconstruct_ai_times(real_start_times, real_end_times, ai_start_times, ai_end_times)
# AI 시작과 종료 시간이 같을 경우 하나로 합치기
new_ai_start_times = []
new_ai_end_times = []
for start, end in zip(ai_start_times_full, ai_end_times_full):
if start == 0.0 and end == 0.0:
new_ai_start_times.append(start)
new_ai_end_times.append(end)
elif start != 0.0: # 0.0이 아닐 경우
if new_ai_end_times and new_ai_end_times[-1] == start:
new_ai_end_times[-1] = end # 종료 시간을 업데이트
else:
new_ai_start_times.append(start)
new_ai_end_times.append(end)
print(f"new AI Start Times: {new_ai_start_times}")
print(f"new AI End Times: {new_ai_end_times}")
print(f"Processed AI Start Times: {combined_ai_start_times}")
print(f"Processed AI End Times: {combined_ai_end_times}")
for i, (start, end) in enumerate(zip(re_start_times, re_end_times)):
# AI 이벤트의 시작 및 종료 시간 비교
if len(new_ai_start_times) != len(re_start_times):
ai_start = combined_ai_start_times[i] if i < len(combined_ai_start_times) else None
ai_end = combined_ai_end_times[i] if i < len(combined_ai_end_times) else None
else :
ai_start = new_ai_start_times[i] if i < len(new_ai_start_times) else None
ai_end = new_ai_end_times[i] if i < len(new_ai_end_times) else None
# AI 시작이 re_start보다 빠른 경우 FP 처리
if ai_start is not None and ai_start < start:
if ai_start==0.0:
fn_value = end - start
fn_values.append(fn_value)
print(f"FN end(0.0) for interval ({start}, {end}): {fn_value}")
else:
fp_value = start - ai_start # AI가 감지한 시작이 현재 시작보다 빠르면 FP
fp_values.append(fp_value)
print(f"FP start for interval ({ai_start}, {start}): {fp_value}")
# AI 시작이 real_start보다 느린 경우 FN 처리
if ai_start is not None and ai_start >= start:
fn_value = ai_start - start # AI 시작이 현재 시작보다 늦으면 FN
fn_values.append(fn_value)
print(f"FN stat for interval ({start}, {ai_start}): {fn_value}")
# AI 종료가 real_end보다 늦은 경우 FP 처리
if ai_end is not None and ai_end > end:
fp_value = ai_end - end # AI가 감지한 종료가 현재 종료보다 늦으면 FP
fp_values.append(fp_value)
print(f"FP end for interval ({end}, {ai_end}): {fp_value}")
# AI 종료가 real_end보다 빠른 경우 FN 처리
if ai_end is not None and ai_end <= end:
if ai_end == 0.0:
continue
else:
fn_value = end - ai_end # AI 종료가 현재 끝보다 빠르면 FN
fn_values.append(fn_value)
print(f"FN end for interval ({ai_end}, {ai_end}): {fn_value}")
# # AI 종료가 0.0인 경우
# if ai_end is not None and ai_end == 0.0:
# fn_value = end - start
# fn_values.append(fn_value)
# print(f"FN end(0.0) for interval ({start}, {end}): {fn_value}")
if all(np.isnan(x) for x in re_end_times):
fp_value = ai_end - ai_start # AI 종료가 현재 끝보다 빠르면 FN
fp_values.append(fp_value)
print(f"FN None for interval ({ai_start}, {ai_end}): {fp_value}")
print(f"FN values: {fn_values}")
print(f"FP values: {fp_values}")
total_fn = sum(fn_values) # FN의 합계
total_fp = sum(fp_values) # FP의 합계
print(f"Total FN for event {real_event}: {total_fn}, Total FP: {total_fp}\n")
return total_fn, total_fp # FN과 FP 반환
# 마지막 구간의 FN 추가
if last_end_time < total_length: # total_length를 사용하여 마지막 구간의 길이를 추가
final_fn = total_length - last_end_time # 전체 길이에서 마지막 끝 시간을 빼서 FN 계산
fn_values.append(final_fn)
print(f"Final FN for last interval: {final_fn}")
print(f"FN values: {fn_values}")
print(f"FP values: {fp_values}")
total_fn = sum(fn_values) # FN의 합계
total_fp = sum(fp_values) # FP의 합계
print(f"Total FN for event {real_event}: {total_fn}, Total FP: {total_fp}\n")
return total_fn, total_fp # FN과 FP 반환
# FN FP 컬럼 추가
grouped_data[['FN', 'FP']] = grouped_data.apply(calculate_fn_fp, axis=1, result_type='expand')
# 결과 출력
print(grouped_data)
# 결과를 CSV 파일로 저장 (선택 사항)
grouped_data.to_csv(f'/저장할경로.../{savename}/{filename}_fpfn.csv', index=False)
CODE -2 ( TP, TN 구하기)
이 역시 시간차이를 통해서 TP, TN을 구합니다.
import pandas as pd
import numpy as np
import ast
filename = '파일이름'
# CSV 파일 경로
csv_file_path = f'..위에서 도출한 csv 파일 경로 입력'
# 데이터 읽기
data = pd.read_csv(csv_file_path)
# wav_length 행에서 total_length 가져오는 함수
def get_total_length(row, data):
if row['real_event'] == 'wav_length': # wav_length 행인 경우
return row['total_length'] # total_length 값 반환
else:
# wav_length 행을 찾아서 total_length 가져오기
wav_length_row = data[data['real_event'] == 'wav_length']
if not wav_length_row.empty:
return wav_length_row['total_length'].values[0] # 첫 번째 값을 반환
return 0.0 # wav_length가 없으면 0.0 반환
# TP와 TN 계산 함수
def calculate_tp_tn(row):
real_event_name = row['real_event']
total_length = get_total_length(row, data)
print(f"Real Event: {real_event_name}, Total Length: {total_length}")
try:
ai_event_time_start = ast.literal_eval(row['ai_event_time_start'])
ai_event_time_end = ast.literal_eval(row['ai_event_time_end'])
except (ValueError, SyntaxError) as e:
print(f"Error parsing AI event times: {row['ai_event_time_start']}, {row['ai_event_time_end']}")
return 0, 0 # 오류가 발생하면 (0, 0) 반환
re_start_times = []
re_end_times = []
if pd.notna(row['re_start_time']):
try:
re_start_times = ast.literal_eval(row['re_start_time'])
except (ValueError, SyntaxError):
print(f"Error parsing re_start_time: {row['re_start_time']}")
if pd.notna(row['re_end_time']):
try:
re_end_times = ast.literal_eval(row['re_end_time'])
except (ValueError, SyntaxError):
print(f"Error parsing re_end_time: {row['re_end_time']}")
ai_start_times = [float(start[0]) for start in ai_event_time_start if start]
ai_end_times = [float(end[0]) for end in ai_event_time_end if end]
tp_value = 0
for start, end in zip(re_start_times, re_end_times):
if not np.isnan(start) and not np.isnan(end):
tp_value += end - start
tn_values = []
if real_event_name is None:
print('real_event가 None일 때 TN 계산 시작')
previous_end = 0.0
for ai_start, ai_end in zip(ai_start_times, ai_end_times):
if not np.isnan(ai_start) and not np.isnan(ai_end):
if previous_end < ai_start:
tn_value = ai_start - previous_end
tn_values.append(tn_value)
print(f" TN from {previous_end} to {ai_start}: {tn_value}")
previous_end = ai_end
if previous_end < total_length:
tn_value = total_length - previous_end
tn_values.append(tn_value)
print(f" TN from {previous_end} to {total_length}: {tn_value}")
else:
previous_end = 0.0
for re_start, re_end in zip(re_start_times, re_end_times):
if np.isnan(re_start) or np.isnan(re_end):
continue
relevant_ai_starts = [start for start in ai_start_times if re_start < start < re_end]
relevant_ai_ends = [end for end in ai_end_times if re_start < end < re_end]
current_start = re_start
if relevant_ai_starts:
current_start = min(current_start, min(relevant_ai_starts))
current_end = re_end
if relevant_ai_ends:
current_end = max(current_end, max(relevant_ai_ends))
if previous_end < current_start:
tn_value = current_start - previous_end
tn_values.append(tn_value)
print(f" TN from {previous_end} to {current_start}: {tn_value}")
previous_end = current_end
if previous_end < total_length:
tn_value = total_length - previous_end
tn_values.append(tn_value)
print(f" TN from {previous_end} to {total_length}: {tn_value}")
tn_total = sum(tn_values)
print(f"Total TN for {real_event_name}: {tn_total}")
return tp_value, tn_total # TP와 TN을 반환합니다.
Accuracy, Precision, Recall, Fall-out, F1-Score 구하기
마지막으로 앞에 포스팅했던 공식들을 그대로 차용해 값을 구합니다.
... 위에 코드와 이어집니다.
data['TP'], data['TN'] = zip(*data.apply(calculate_tp_tn, axis=1))
# Precision과 Accuracy, Recall, Fall-out, F1-score 계산
data['Precision'] = data.apply(lambda x: x['TP'] / (x['TP'] + x['FP']) if (x['TP'] + x['FP']) > 0 else 0, axis=1)
data['Accuracy'] = data.apply(lambda x: (x['TP'] + x['TN']) / (x['TP'] + x['TN'] + x['FP'] + x['FN']) if (x['TP'] + x['TN'] + x['FP'] + x['TN']) > 0 else 0, axis=1)
data['Recall'] = data.apply(lambda x: x['TP'] / (x['TP'] + x['FN']) if (x['TP'] + x['FN']) > 0 else 0, axis=1)
data['Fall-out'] = data.apply(lambda x: x['FP'] / (x['TN'] + x['FP']) if (x['TN'] + x['FP']) > 0 else 0, axis=1)
data['F1-score'] = data.apply(lambda x: (2 * x['Precision'] * x['Recall']) / (x['Precision'] + x['Recall']) if (x['Precision'] + x['Recall']) > 0 else 0, axis=1)
# wav_length 제외
final_data = data[data['real_event'] != 'wav_length']
# 결과를 CSV 파일로 저장
final_data.to_csv(f'..파일 저장 경로{filename}_merge.csv', index=False)
# 결과 출력
print(final_data[['real_event', 'TP', 'FP', 'FN', 'TN', 'Precision', 'Accuracy', 'Recall', 'F1-score']])
이렇게 모델 평가하는 방법을 음향에서는 어떻게 적용해볼까 했던
과제를 이렇게 마쳤습니다.
조금이나마 도움이 되셨길 바라겠습니다.
그럼 코딩이 쉬워지는 그날까지!!
728x90
300x250
'CODE > Python' 카테고리의 다른 글
[Python] 음향에서 TN,TP,FN,FP 사용해보기 (Accuracy, Precision, Recall, F1-score, Fall-out) -1 (0) | 2024.11.05 |
---|---|
[Python] Postgresql 연동하기 (psycopg2 이용) (1) | 2024.01.04 |