Skip to content
Open

V1.4.6 #1313

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 127 additions & 35 deletions airtest/aircv/screen_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,58 @@ class FfmpegVidWriter:
"""
Generate a video using FFMPEG.
"""
def __init__(self, outfile, width, height, fps=10, orientation=0, timetag=True):
def __init__(self, outfile, width, height, fps=10, orientation=0, timetag=True, max_size=1280):
self.fps = fps
self.original_width = width
self.original_height = height

# 三种横竖屏录屏模式 1 竖屏 2 横屏 0 方形居中
# 三种横竖屏录屏模式 1 竖屏 2 横屏 0 保持原始宽高比
self.orientation = RECORDER_ORI.get(str(orientation).upper(), orientation)
if self.orientation == 1:
# 竖屏模式:确保高度>=宽度
self.height = max(width, height)
self.width = min(width, height)
elif self.orientation == 2:
# 横屏模式:确保宽度>=高度
self.width = max(width, height)
self.height = min(width, height)
else:
self.width = self.height = max(width, height)
# orientation=0: 自适应模式,支持横竖屏切换
# 使用横竖屏的最大尺寸作为画布,确保无论横竖屏都能完整显示
max_dimension = max(width, height) # 长边
min_dimension = min(width, height) # 短边

# 使用正方形画布,边长为max(width, height)
# 这样无论横竖屏,都能完整显示,只是会有黑边
self.width = max_dimension
self.height = max_dimension
LOGGING.info(f"Adaptive mode: using square canvas {self.width}x{self.height} to support rotation")

# 满足视频宽高条件
self.height = height = self.height - (self.height % 32) + 32
self.width = width = self.width - (self.width % 32) + 32
self.cache_frame = np.zeros((height, width, 3), dtype=np.uint8)
# 降低编码分辨率(保持宽高比)
max_encode_width = max_size
if self.width > max_encode_width:
scale = max_encode_width / self.width
self.width = max_encode_width
self.height = int(self.height * scale)
LOGGING.info(f"Reduced encoding resolution: {self.width}x{self.height}, scale: {scale:.3f}")

# 满足视频宽高条件(32的倍数)
self.encode_width = self.width - (self.width % 32) + 32
self.encode_height = self.height - (self.height % 32) + 32

# 创建缓存帧
self.cache_frame = np.zeros((self.encode_height, self.encode_width, 3), dtype=np.uint8)

# 添加时间戳
self.timetag = timetag
if self.timetag:
scale = self.height*0.001
scale = self.encode_height * 0.001
self.tag_scale = max(0.5, min(scale, 1.5))
thickness = int(self.height*0.002)
# 指定时间戳的位置和粗细
thickness = int(self.encode_height * 0.002)
self.tag_thickness = max(1, min(thickness+1, 4))
self.tag_pos = (0, int(self.height*0.035))
self.tag_pos = (0, int(self.encode_height * 0.035))

# 生成时区信息(UTC+08:00)
# 生成时区信息
timezone_offset = time.timezone / 3600
timezone_offset_hours = int(abs(timezone_offset))
self.timezone_str = f"(UTC{'+' if timezone_offset <= 0 else '-'}{timezone_offset_hours:02d}:00)"
Expand All @@ -93,33 +115,87 @@ def __init__(self, outfile, width, height, fps=10, orientation=0, timetag=True):
self.process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24',
s='{}x{}'.format(width, height), framerate=self.fps)
.output(outfile, pix_fmt='yuv420p', vcodec='libx264', crf=25, threads=1,
preset="veryfast", framerate=self.fps)
.global_args("-loglevel", "error")
s='{}x{}'.format(self.encode_width, self.encode_height), framerate=self.fps)
.output(outfile, pix_fmt='yuv420p', vcodec='libx264', crf=28, threads=0,
preset="ultrafast", framerate=self.fps, tune="zerolatency")
.global_args("-loglevel", "error", "-x264-params",
"keyint=30:min-keyint=30:scenecut=0:ref=1:bframes=0:subme=1:trellis=0:fast-pskip=1:no-mbtree=1:weightb=0:mixed-refs=0:me=dia:merange=4:rc-lookahead=0"
)
.overwrite_output()
.run_async(pipe_stdin=True)
)
self.writer = self.process.stdin

self.write_count = 0
self.avg_write_time = 0

def process_frame(self, frame):
assert len(frame.shape) == 3
frame = frame[..., ::-1]
if self.orientation == 1 and frame.shape[1] > frame.shape[0]:
frame = cv2.resize(frame, (self.width, int(self.width*self.width/self.height)))
elif self.orientation == 2 and frame.shape[1] < frame.shape[0]:
frame = cv2.resize(frame, (int(self.height*self.height/self.width), self.height))
h_st = max(self.cache_frame.shape[0]//2 - frame.shape[0]//2, 0)
w_st = max(self.cache_frame.shape[1]//2 - frame.shape[1]//2, 0)
h_ed = min(h_st+frame.shape[0], self.cache_frame.shape[0])
w_ed = min(w_st+frame.shape[1], self.cache_frame.shape[1])
frame = frame[..., ::-1] # BGR to RGB

# 获取原始帧尺寸
frame_h, frame_w = frame.shape[:2]

# 计算目标尺寸,保持原始宽高比
if self.orientation == 1:
# 竖屏模式
if frame_w > frame_h:
target_w = self.encode_width
target_h = int(target_w * frame_h / frame_w)
frame = cv2.resize(frame, (target_w, target_h))
else:
target_w = self.encode_width
target_h = int(target_w * frame_h / frame_w)
frame = cv2.resize(frame, (target_w, target_h))
elif self.orientation == 2:
# 横屏模式
if frame_h > frame_w:
target_h = self.encode_height
target_w = int(target_h * frame_w / frame_h)
frame = cv2.resize(frame, (target_w, target_h))
else:
target_h = self.encode_height
target_w = int(target_h * frame_w / frame_h)
frame = cv2.resize(frame, (target_w, target_h))
else:
# orientation=0: 自适应模式,支持横竖屏切换
# 计算缩放比例,确保画面完整显示(不被裁剪)
scale_w = self.encode_width / frame_w
scale_h = self.encode_height / frame_h
scale = min(scale_w, scale_h) # 使用较小的缩放比例,确保完整显示

target_w = int(frame_w * scale)
target_h = int(frame_h * scale)

frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR)

# 居中放置(无论尺寸是否匹配都居中,确保横竖屏切换时画面居中)
frame_h, frame_w = frame.shape[:2]

# 清空画布
self.cache_frame[:] = 0
self.cache_frame[h_st:h_ed, w_st:w_ed, :] = frame[:(h_ed-h_st), :(w_ed-w_st)]

# 计算居中位置
h_st = max((self.encode_height - frame_h) // 2, 0)
w_st = max((self.encode_width - frame_w) // 2, 0)
h_ed = min(h_st + frame_h, self.encode_height)
w_ed = min(w_st + frame_w, self.encode_width)

# 确保不越界
copy_h = min(h_ed - h_st, frame_h)
copy_w = min(w_ed - w_st, frame_w)

# 将缩放后的帧居中放置到画布上
self.cache_frame[h_st:h_st+copy_h, w_st:w_st+copy_w, :] = frame[:copy_h, :copy_w]
result = self.cache_frame.copy()

# 添加时间戳
if self.timetag:
cv2.putText(self.cache_frame, time.strftime("%Y-%m-%d %H:%M:%S" + self.timezone_str),
cv2.putText(result, time.strftime("%Y-%m-%d %H:%M:%S" + self.timezone_str),
self.tag_pos, cv2.FONT_HERSHEY_SIMPLEX, self.tag_scale,
(0, 255, 0), self.tag_thickness)
return self.cache_frame.copy()

return result

def write(self, frame):
if frame.dtype != np.uint8:
Expand All @@ -140,17 +216,19 @@ def close(self):


class ScreenRecorder:
def __init__(self, outfile, get_frame_func, fps=10, snapshot_sleep=0.001, orientation=0, timetag=True):
def __init__(self, outfile, get_frame_func, fps=10, snapshot_sleep=0.001, orientation=0, timetag=True, max_size=1280):
self.get_frame_func = get_frame_func
self.tmp_frame = self.get_frame_func()
self.snapshot_sleep = snapshot_sleep

width, height = self.tmp_frame.shape[1], self.tmp_frame.shape[0]
self.writer = FfmpegVidWriter(outfile, width, height, fps, orientation, timetag)
self.writer = FfmpegVidWriter(outfile, width, height, fps, orientation, timetag, max_size)
self.tmp_frame = self.writer.process_frame(self.tmp_frame)
self.frame_queue = deque(maxlen=100)
self.frame_queue.append((time.time(), self.tmp_frame))

self.process_frame_queue = deque(maxlen=100)

self._is_running = False
self._stop_flag = False
self._stop_time = 0
Expand Down Expand Up @@ -184,6 +262,9 @@ def start(self):
self.t_stream = threading.Thread(target=self.get_frame_loop)
self.t_stream.setDaemon(True)
self.t_stream.start()
self.t_process_frame = threading.Thread(target=self.process_frame_loop)
self.t_process_frame.setDaemon(True)
self.t_process_frame.start()
self.t_write = threading.Thread(target=self.write_frame_loop)
self.t_write.setDaemon(True)
self.t_write.start()
Expand Down Expand Up @@ -218,9 +299,7 @@ def get_frame_loop(self):
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1)
time.sleep(1)

self.tmp_frame = self.writer.process_frame(tmp_frame)
self.frame_queue.append((time.time(), self.tmp_frame))
time.sleep(0.5/self.writer.fps)
self.frame_queue.append((time.time(), tmp_frame))
if self.is_stop():
break
self._stop_flag = True
Expand All @@ -229,6 +308,19 @@ def get_frame_loop(self):
self._stop_flag = True
raise

def process_frame_loop(self):
while not self.is_stop() or len(self.frame_queue) > 0:
if len(self.frame_queue) > 0:
frames_to_process = []
while len(self.frame_queue) > 0 and len(frames_to_process) < 5:
frames_to_process.append(self.frame_queue.popleft())

for t, tmp_frame in frames_to_process:
processed = self.writer.process_frame(tmp_frame)
self.process_frame_queue.append((t, processed))
else:
time.sleep(0.001)

def write_frame_loop(self):
try:
duration = 1.0/self.writer.fps
Expand All @@ -240,8 +332,8 @@ def write_frame_loop(self):
if self.writer.process.poll() is not None: # 检查 FFmpeg 进程状态
LOGGING.error("FFmpeg process has terminated unexpectedly. Exiting write loop.")
break
if len(self.frame_queue) > 0:
t, frame = self.frame_queue.popleft()
if len(self.process_frame_queue) > 0:
t, frame = self.process_frame_queue.popleft()
if last_frame is None:
try:
self.writer.write(frame)
Expand Down
8 changes: 4 additions & 4 deletions airtest/core/android/android.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,10 +953,10 @@ def start_recording(self, max_time=1800, output=None, fps=10, mode="yosemite",
max_time=max_time, bit_rate=bit_rate, bool_is_vertical=bool_is_vertical)
return save_path

if fps > 10 or fps < 1:
LOGGING.warning("fps should be between 1 and 10, becuase of the recording effiency")
if fps > 10:
fps = 10
if fps > 24 or fps < 1:
LOGGING.warning("fps should be between 1 and 24, becuase of the recording effiency")
if fps > 24:
fps = 24
if fps < 1:
fps = 1

Expand Down
Binary file modified airtest/core/android/static/apks/Yosemite.apk
Binary file not shown.
34 changes: 30 additions & 4 deletions airtest/core/ios/ios.py
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,27 @@ def home_interface(self):
return True
return False

def get_mjpeg_fps(self):
try:
settings = self.driver._session_http.get("/appium/settings")
fps = settings.value.get("mjpegServerFramerate")
return fps
except:
return None

def set_mjpeg_fps(self, fps):
data = {
"settings": {
"mjpegServerFramerate": fps
}
}
try:
self.driver._session_http.post("/appium/settings", data=data)
return True
except Exception as e:
LOGGING.error(f"set mjpeg fps to {fps} failed: {e}")
return False

def disconnect(self):
"""Disconnected mjpeg and rotation_watcher.
"""
Expand Down Expand Up @@ -1105,13 +1126,18 @@ def start_recording(self, max_time=1800, output=None, fps=10,
>>> dev.start_recording(output="test.mp4", max_size=800)

"""
if fps > 10 or fps < 1:
LOGGING.warning("fps should be between 1 and 10, becuase of the recording effiency")
if fps > 10:
fps = 10
if fps > 24 or fps < 1:
LOGGING.warning("fps should be between 1 and 24, becuase of the recording effiency")
if fps > 24:
fps = 24
if fps < 1:
fps = 1

origin_fps = self.get_mjpeg_fps()
if origin_fps != fps:
ret = self.set_mjpeg_fps(fps)
LOGGING.info(f"set mjpeg fps to {fps}. result: {ret}")

if self.recorder and self.recorder.is_running():
LOGGING.warning("recording is already running, please don't call again")
return None
Expand Down
Loading
Loading