본문 바로가기
CODE/Python

[Python] 음향에서 TN,TP,FN,FP 사용해보기 (Accuracy, Precision, Recall, F1-score, Fall-out) -2

by Nuridal_class 2024. 11. 6.
728x90
728x90
 

[Python] 음향에서 TN,TP,FN,FP 사용해보기 (Accuracy, Precision, Recall, F1-score, Fall-out) -1

안녕하세요. 오랜만에 포스팅을 남겨봅니다. 다른 프젝을 하느라 너무 바쁜 해를 보내고 있어서 이제 시간이 나네요!여러 프젝이 있었지만 AI 관련된 프젝을 하느라 알게된 개념과 Python에서 적

nuridal-class.tistory.com

이번에는 위에 글에 이어서 모델평가를 코드로 어떻게 썼는지 같이 보겠습니다.

 

CODE  -1 ( FP, FN 구하기)

시간차이를 통해서 FP와 FN을 구합니다.
import csv
import pandas as pd
import numpy as np
filename = '파일이름'
loadname = 'csv 파일이 있는 경로'
savename = '파일 저장할 경로'

compare_file_path = f'... 1번에서 도출한 csv파일을 입력'

# CSV 파일 읽기
csv_data = pd.read_csv(compare_file_path)

# real_event에 따라 그룹화
grouped_data = csv_data.groupby('real_event').agg({
    'real_start_time': lambda x: list(x),
    'real_end_time': lambda x: list(x),
    'ai_event_time_start': lambda x: [eval(i) for i in x if isinstance(i, str) and i.startswith('[')],
    'ai_event_time_end': lambda x: [eval(i) for i in x if isinstance(i, str) and i.startswith('[')],
    'ai_event': lambda x: [eval(i) for i in x if isinstance(i, str) and i.startswith('[')],
    'total_length': 'last'  # total_length는 마지막 값을 사용
}).reset_index()

# 'wav_length'를 맨 아래로 이동시키기 위해 정렬
grouped_data['is_wav_length'] = grouped_data['real_event'] == 'wav_length'
grouped_data.sort_values(by=['is_wav_length'], ascending=True, inplace=True)
grouped_data.drop(columns=['is_wav_length'], inplace=True)  # 임시 열 제거

# wav_length의 total_length 가져오기
total_length = grouped_data.loc[grouped_data['real_event'] == 'wav_length', 'total_length'].values[0]

def reconstruct_ai_times(real_start_times, real_end_times, ai_start_times, ai_end_times):
    combined_ai_start_times = []  # 초기화
    combined_ai_end_times = []  # 초기화

    # AI 시작 및 종료 시간을 모두 원본 리스트로 초기화
    ai_start_times_full = ai_start_times[:]  # 원본 리스트 복사
    ai_end_times_full = ai_end_times[:]  # 원본 리스트 복사

    for start, end in zip(real_start_times, real_end_times):
        # AI 이벤트가 real_start와 real_end 사이에 포함되는지 확인
        included_indices = [j for j, (ai_start, ai_end) in enumerate(zip(ai_start_times, ai_end_times)) if ai_start >= start and ai_end <= end]

        if included_indices:
            # 포함된 AI 이벤트가 있을 경우, 하나로 묶기
            combined_start = min(ai_start_times[idx] for idx in included_indices)
            combined_end = max(ai_end_times[idx] for idx in included_indices)
            print(f"Combined AI event for interval ({start}, {end}): ({combined_start}, {combined_end})")
            
            # AI 시작 및 종료 시간 사이의 값을 0.0으로 대체
            for i in range(len(ai_start_times)):
                if ai_start_times[i] > combined_start and ai_start_times[i] < combined_end:
                    ai_start_times_full[i] = 0.0  # AI 시작 시간이 구간에 포함되면 0.0으로 대체
                if ai_end_times[i] > combined_start and ai_end_times[i] < combined_end:
                    ai_end_times_full[i] = 0.0  # AI 종료 시간이 구간에 포함되면 0.0으로 대체

            # combined_ai_start_times와 combined_ai_end_times에 추가
            combined_ai_start_times.append(combined_start)
            combined_ai_end_times.append(combined_end)
        else:
            # 포함된 AI 이벤트가 없을 경우, re_start와 re_end의 길이가 ai_start와 ai_end의 길이와 다를 때만 0.0 추가
            if len(re_start_times) != len(ai_start_times):
                combined_ai_start_times.append(0.0)
                combined_ai_end_times.append(0.0)
            else :
                combined_ai_start_times = ai_start_times[:]
                combined_ai_end_times = ai_end_times[:] 
    return ai_start_times_full, ai_end_times_full, combined_ai_start_times, combined_ai_end_times

def calculate_fn_fp(row):
    re_start_times = row['real_start_time']
    re_end_times = row['real_end_time']
    ai_start_times = [start for sublist in row['ai_event_time_start'] for start in sublist]  # AI 시작 시간
    ai_end_times = [end for sublist in row['ai_event_time_end'] for end in sublist]  # AI 종료 시간
    real_event = row['real_event']
    fn_values = []
    fp_values = []
    # last_end_time = 0.0  # 초기 시작 시간

    print(f"Calculating FN for event: {real_event}")
    print(f"re_start_times: {real_start_times}")
    print(f"re_end_times: {real_end_times}")

    # AI 시작 및 종료 시간 재구성
    ai_start_times_full, ai_end_times_full, combined_ai_start_times, combined_ai_end_times = reconstruct_ai_times(real_start_times, real_end_times, ai_start_times, ai_end_times)
    # ai_start_times_full, ai_end_times_full = reconstruct_ai_times(real_start_times, real_end_times, ai_start_times, ai_end_times)

    # AI 시작과 종료 시간이 같을 경우 하나로 합치기
    new_ai_start_times = []
    new_ai_end_times = []

    for start, end in zip(ai_start_times_full, ai_end_times_full):
        if start == 0.0 and end == 0.0:
            new_ai_start_times.append(start)
            new_ai_end_times.append(end)
        elif start != 0.0:  # 0.0이 아닐 경우
            if new_ai_end_times and new_ai_end_times[-1] == start:
                new_ai_end_times[-1] = end  # 종료 시간을 업데이트
            else:
                new_ai_start_times.append(start)
                new_ai_end_times.append(end)

    print(f"new AI Start Times: {new_ai_start_times}")
    print(f"new AI End Times: {new_ai_end_times}")
    
    print(f"Processed AI Start Times: {combined_ai_start_times}")
    print(f"Processed AI End Times: {combined_ai_end_times}")
    for i, (start, end) in enumerate(zip(re_start_times, re_end_times)):
        
        # AI 이벤트의 시작 및 종료 시간 비교
        if len(new_ai_start_times) != len(re_start_times):
            ai_start = combined_ai_start_times[i] if i < len(combined_ai_start_times) else None
            ai_end = combined_ai_end_times[i] if i < len(combined_ai_end_times) else None
        else :
            ai_start = new_ai_start_times[i] if i < len(new_ai_start_times) else None
            ai_end = new_ai_end_times[i] if i < len(new_ai_end_times) else None
        
        # AI 시작이 re_start보다 빠른 경우 FP 처리
        if ai_start is not None and ai_start < start:
            if ai_start==0.0:
                fn_value = end - start
                fn_values.append(fn_value)
                print(f"FN end(0.0) for interval ({start}, {end}): {fn_value}")
            else:
                fp_value = start - ai_start  # AI가 감지한 시작이 현재 시작보다 빠르면 FP
                fp_values.append(fp_value)
                print(f"FP start for interval ({ai_start}, {start}): {fp_value}")

        # AI 시작이 real_start보다 느린 경우 FN 처리
        if ai_start is not None and ai_start >= start:
            fn_value = ai_start - start  # AI 시작이 현재 시작보다 늦으면 FN
            fn_values.append(fn_value)
            print(f"FN stat for interval ({start}, {ai_start}): {fn_value}")

        # AI 종료가 real_end보다 늦은 경우 FP 처리
        if ai_end is not None and ai_end > end:
            fp_value = ai_end - end  # AI가 감지한 종료가 현재 종료보다 늦으면 FP
            fp_values.append(fp_value)
            print(f"FP end for interval ({end}, {ai_end}): {fp_value}")

        # AI 종료가 real_end보다 빠른 경우 FN 처리
        if ai_end is not None and ai_end <= end:
            if ai_end == 0.0:
                continue
            else: 
                fn_value = end - ai_end  # AI 종료가 현재 끝보다 빠르면 FN
                fn_values.append(fn_value)
                print(f"FN end for interval ({ai_end}, {ai_end}): {fn_value}")
        
        # # AI 종료가 0.0인 경우
        # if ai_end is not None and ai_end == 0.0:
        #     fn_value = end - start
        #     fn_values.append(fn_value)
        #     print(f"FN end(0.0) for interval ({start}, {end}): {fn_value}")
        if all(np.isnan(x) for x in re_end_times):
            fp_value = ai_end - ai_start  # AI 종료가 현재 끝보다 빠르면 FN
            fp_values.append(fp_value)
            print(f"FN None for interval ({ai_start}, {ai_end}): {fp_value}")

    print(f"FN values: {fn_values}")
    print(f"FP values: {fp_values}")

    total_fn = sum(fn_values)  # FN의 합계
    total_fp = sum(fp_values)  # FP의 합계
    
    print(f"Total FN for event {real_event}: {total_fn}, Total FP: {total_fp}\n")
    
    return total_fn, total_fp  # FN과 FP 반환

    
    # 마지막 구간의 FN 추가
    if last_end_time < total_length:  # total_length를 사용하여 마지막 구간의 길이를 추가
        final_fn = total_length - last_end_time  # 전체 길이에서 마지막 끝 시간을 빼서 FN 계산
        fn_values.append(final_fn)
        print(f"Final FN for last interval: {final_fn}")

    print(f"FN values: {fn_values}")
    print(f"FP values: {fp_values}")

    total_fn = sum(fn_values)  # FN의 합계
    total_fp = sum(fp_values)  # FP의 합계
    
    print(f"Total FN for event {real_event}: {total_fn}, Total FP: {total_fp}\n")
    
    return total_fn, total_fp  # FN과 FP 반환

# FN FP 컬럼 추가
grouped_data[['FN', 'FP']] = grouped_data.apply(calculate_fn_fp, axis=1, result_type='expand')

# 결과 출력
print(grouped_data)

# 결과를 CSV 파일로 저장 (선택 사항)
grouped_data.to_csv(f'/저장할경로.../{savename}/{filename}_fpfn.csv', index=False)

 

 

CODE  -2 ( TP, TN 구하기)

이 역시 시간차이를 통해서 TP, TN을 구합니다.
import pandas as pd
import numpy as np
import ast

filename = '파일이름'

# CSV 파일 경로
csv_file_path = f'..위에서 도출한 csv 파일 경로 입력'

# 데이터 읽기
data = pd.read_csv(csv_file_path)

# wav_length 행에서 total_length 가져오는 함수
def get_total_length(row, data):
    if row['real_event'] == 'wav_length':  # wav_length 행인 경우
        return row['total_length']  # total_length 값 반환
    else:
        # wav_length 행을 찾아서 total_length 가져오기
        wav_length_row = data[data['real_event'] == 'wav_length']
        if not wav_length_row.empty:
            return wav_length_row['total_length'].values[0]  # 첫 번째 값을 반환
        return 0.0  # wav_length가 없으면 0.0 반환

# TP와 TN 계산 함수
def calculate_tp_tn(row):
    real_event_name = row['real_event']
    total_length = get_total_length(row, data)
    print(f"Real Event: {real_event_name}, Total Length: {total_length}")

    try:
        ai_event_time_start = ast.literal_eval(row['ai_event_time_start'])
        ai_event_time_end = ast.literal_eval(row['ai_event_time_end'])
    except (ValueError, SyntaxError) as e:
        print(f"Error parsing AI event times: {row['ai_event_time_start']}, {row['ai_event_time_end']}")
        return 0, 0  # 오류가 발생하면 (0, 0) 반환

    re_start_times = []
    re_end_times = []
    
    if pd.notna(row['re_start_time']):
        try:
            re_start_times = ast.literal_eval(row['re_start_time'])
        except (ValueError, SyntaxError):
            print(f"Error parsing re_start_time: {row['re_start_time']}")
    
    if pd.notna(row['re_end_time']):
        try:
            re_end_times = ast.literal_eval(row['re_end_time'])
        except (ValueError, SyntaxError):
            print(f"Error parsing re_end_time: {row['re_end_time']}")

    ai_start_times = [float(start[0]) for start in ai_event_time_start if start]
    ai_end_times = [float(end[0]) for end in ai_event_time_end if end]
    
    tp_value = 0
    for start, end in zip(re_start_times, re_end_times):
        if not np.isnan(start) and not np.isnan(end):
            tp_value += end - start

    tn_values = []

    if real_event_name is None:
        print('real_event가 None일 때 TN 계산 시작')
        previous_end = 0.0
        for ai_start, ai_end in zip(ai_start_times, ai_end_times):
            if not np.isnan(ai_start) and not np.isnan(ai_end):
                if previous_end < ai_start:
                    tn_value = ai_start - previous_end
                    tn_values.append(tn_value)
                    print(f"  TN from {previous_end} to {ai_start}: {tn_value}")
                previous_end = ai_end

        if previous_end < total_length:
            tn_value = total_length - previous_end
            tn_values.append(tn_value)
            print(f"  TN from {previous_end} to {total_length}: {tn_value}")

    else:
        previous_end = 0.0
        for re_start, re_end in zip(re_start_times, re_end_times):
            if np.isnan(re_start) or np.isnan(re_end):
                continue
            
            relevant_ai_starts = [start for start in ai_start_times if re_start < start < re_end]
            relevant_ai_ends = [end for end in ai_end_times if re_start < end < re_end]

            current_start = re_start
            if relevant_ai_starts:
                current_start = min(current_start, min(relevant_ai_starts))
            
            current_end = re_end
            if relevant_ai_ends:
                current_end = max(current_end, max(relevant_ai_ends))
            
            if previous_end < current_start:
                tn_value = current_start - previous_end
                tn_values.append(tn_value)
                print(f"  TN from {previous_end} to {current_start}: {tn_value}")

            previous_end = current_end

        if previous_end < total_length:
            tn_value = total_length - previous_end
            tn_values.append(tn_value)
            print(f"  TN from {previous_end} to {total_length}: {tn_value}")

    tn_total = sum(tn_values)
    print(f"Total TN for {real_event_name}: {tn_total}")

    return tp_value, tn_total  # TP와 TN을 반환합니다.

 

Accuracy, Precision, Recall, Fall-out, F1-Score 구하기

마지막으로 앞에 포스팅했던 공식들을 그대로 차용해 값을 구합니다.
... 위에 코드와 이어집니다.

data['TP'], data['TN'] = zip(*data.apply(calculate_tp_tn, axis=1))

# Precision과 Accuracy, Recall, Fall-out, F1-score 계산
data['Precision'] = data.apply(lambda x: x['TP'] / (x['TP'] + x['FP']) if (x['TP'] + x['FP']) > 0 else 0, axis=1)
data['Accuracy'] = data.apply(lambda x: (x['TP'] + x['TN']) / (x['TP'] + x['TN'] + x['FP'] + x['FN']) if (x['TP'] + x['TN'] + x['FP'] + x['TN']) > 0 else 0, axis=1)
data['Recall'] = data.apply(lambda x: x['TP'] / (x['TP'] + x['FN']) if (x['TP'] + x['FN']) > 0 else 0, axis=1)
data['Fall-out'] = data.apply(lambda x: x['FP'] / (x['TN'] + x['FP']) if (x['TN'] + x['FP']) > 0 else 0, axis=1)
data['F1-score'] = data.apply(lambda x: (2 * x['Precision'] * x['Recall']) / (x['Precision'] + x['Recall']) if (x['Precision'] + x['Recall']) > 0 else 0, axis=1)

# wav_length 제외
final_data = data[data['real_event'] != 'wav_length']

# 결과를 CSV 파일로 저장
final_data.to_csv(f'..파일 저장 경로{filename}_merge.csv', index=False)

# 결과 출력
print(final_data[['real_event', 'TP', 'FP', 'FN', 'TN', 'Precision', 'Accuracy', 'Recall', 'F1-score']])

이렇게 모델 평가하는 방법을 음향에서는 어떻게 적용해볼까 했던

과제를 이렇게 마쳤습니다.

조금이나마 도움이 되셨길 바라겠습니다.

그럼 코딩이 쉬워지는 그날까지!!

728x90
300x250