-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata
113 lines (91 loc) · 3.9 KB
/
data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import cv2
import mediapipe as mp
import numpy as np
from logging import getLogger, StreamHandler, FileHandler, Formatter
# 로거 설정
logger = getLogger(__name__)
logger.setLevel("DEBUG")
# 핸들러가 중복되지 않도록 설정
if not logger.handlers:
stream_handler = StreamHandler()
file_handler = FileHandler("preprocess_data.log")
formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.addHandler(file_handler)
# 비디오 전처리 함수
def preprocess_video(video_path, output_dir, resize_dim=(640, 480), normalize=True, gray_scale=False, frame_rate=30):
"""
비디오 파일을 전처리하고 프레임을 저장합니다.
Args:
video_path (str): 입력 비디오 파일 경로.
output_dir (str): 처리된 프레임이 저장될 디렉토리.
resize_dim (tuple): 프레임 크기 (가로, 세로).
normalize (bool): True일 경우, 프레임 픽셀값을 0~1로 정규화.
gray_scale (bool): True일 경우, 프레임을 흑백으로 변환.
frame_rate (int): 초당 프레임 수 (FPS).
"""
# 비디오 파일 확인
if not os.path.exists(video_path):
logger.error(f"비디오 파일을 찾을 수 없습니다: {video_path}")
return
# 출력 디렉토리 생성
os.makedirs(output_dir, exist_ok=True)
# 비디오 캡처 객체 생성
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logger.error(f"비디오 파일을 열 수 없습니다: {video_path}")
return
frame_idx = 0
fps = cap.get(cv2.CAP_PROP_FPS)
if fps == 0: # fps가 유효하지 않다면 기본값을 설정
fps = 30
frame_interval = int(fps // frame_rate)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# 설정된 간격에 따라 프레임 처리
if frame_idx % frame_interval == 0:
# 이미지 크기 조정
frame = cv2.resize(frame, resize_dim)
# 흑백 변환
if gray_scale:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 정규화 (float32 → uint8로 변환하여 저장)
if normalize:
frame = (frame * 255).astype(np.uint8)
# 프레임 저장
output_filename = os.path.join(output_dir, f"frame_{frame_idx}.jpg")
cv2.imwrite(output_filename, frame)
logger.debug(f"프레임 {frame_idx} 저장 완료: {output_filename}")
frame_idx += 1
cap.release()
cv2.destroyAllWindows()
logger.info(f"프레임 저장이 {output_dir}에 완료되었습니다.")
# MediaPipe를 사용한 추가 프레임 분석 함수
def analyze_frame_with_mediapipe(video_path):
if not os.path.exists(video_path):
logger.error(f"비디오 파일을 찾을 수 없습니다: {video_path}")
return
cap = cv2.VideoCapture(video_path)
mp_holistic = mp.solutions.holistic
with mp_holistic.Holistic() as holistic:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# MediaPipe 처리를 수행하여 포즈 분석
results = holistic.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
logger.debug("MediaPipe 결과 처리 완료")
# 필요 시 추가 처리나 특징 추출 수행 가능
cap.release()
cv2.destroyAllWindows()
logger.info("MediaPipe 기반 분석 완료.")
if __name__ == "__main__":
video_path = "data/input_videos/sample_video.mp4"
output_dir = "data/processed/frames"
preprocess_video(video_path, output_dir)
analyze_frame_with_mediapipe(video_path)