diff --git a/airtest/aircv/screen_recorder.py b/airtest/aircv/screen_recorder.py index 50266788..da7cfd6e 100644 --- a/airtest/aircv/screen_recorder.py +++ b/airtest/aircv/screen_recorder.py @@ -46,36 +46,58 @@ class FfmpegVidWriter: """ Generate a video using FFMPEG. """ - def __init__(self, outfile, width, height, fps=10, orientation=0, timetag=True): + def __init__(self, outfile, width, height, fps=10, orientation=0, timetag=True, max_size=1280): self.fps = fps + self.original_width = width + self.original_height = height - # 三种横竖屏录屏模式 1 竖屏 2 横屏 0 方形居中 + # 三种横竖屏录屏模式 1 竖屏 2 横屏 0 保持原始宽高比 self.orientation = RECORDER_ORI.get(str(orientation).upper(), orientation) if self.orientation == 1: + # 竖屏模式:确保高度>=宽度 self.height = max(width, height) self.width = min(width, height) elif self.orientation == 2: + # 横屏模式:确保宽度>=高度 self.width = max(width, height) self.height = min(width, height) else: - self.width = self.height = max(width, height) + # orientation=0: 自适应模式,支持横竖屏切换 + # 使用横竖屏的最大尺寸作为画布,确保无论横竖屏都能完整显示 + max_dimension = max(width, height) # 长边 + min_dimension = min(width, height) # 短边 + + # 使用正方形画布,边长为max(width, height) + # 这样无论横竖屏,都能完整显示,只是会有黑边 + self.width = max_dimension + self.height = max_dimension + LOGGING.info(f"Adaptive mode: using square canvas {self.width}x{self.height} to support rotation") - # 满足视频宽高条件 - self.height = height = self.height - (self.height % 32) + 32 - self.width = width = self.width - (self.width % 32) + 32 - self.cache_frame = np.zeros((height, width, 3), dtype=np.uint8) + # 降低编码分辨率(保持宽高比) + max_encode_width = max_size + if self.width > max_encode_width: + scale = max_encode_width / self.width + self.width = max_encode_width + self.height = int(self.height * scale) + LOGGING.info(f"Reduced encoding resolution: {self.width}x{self.height}, scale: {scale:.3f}") + + # 满足视频宽高条件(32的倍数) + self.encode_width = self.width - (self.width % 32) + 32 + self.encode_height = self.height - (self.height % 32) + 32 + + # 创建缓存帧 + self.cache_frame = np.zeros((self.encode_height, self.encode_width, 3), dtype=np.uint8) # 添加时间戳 self.timetag = timetag if self.timetag: - scale = self.height*0.001 + scale = self.encode_height * 0.001 self.tag_scale = max(0.5, min(scale, 1.5)) - thickness = int(self.height*0.002) - # 指定时间戳的位置和粗细 + thickness = int(self.encode_height * 0.002) self.tag_thickness = max(1, min(thickness+1, 4)) - self.tag_pos = (0, int(self.height*0.035)) + self.tag_pos = (0, int(self.encode_height * 0.035)) - # 生成时区信息(UTC+08:00) + # 生成时区信息 timezone_offset = time.timezone / 3600 timezone_offset_hours = int(abs(timezone_offset)) self.timezone_str = f"(UTC{'+' if timezone_offset <= 0 else '-'}{timezone_offset_hours:02d}:00)" @@ -93,33 +115,87 @@ def __init__(self, outfile, width, height, fps=10, orientation=0, timetag=True): self.process = ( ffmpeg .input('pipe:', format='rawvideo', pix_fmt='rgb24', - s='{}x{}'.format(width, height), framerate=self.fps) - .output(outfile, pix_fmt='yuv420p', vcodec='libx264', crf=25, threads=1, - preset="veryfast", framerate=self.fps) - .global_args("-loglevel", "error") + s='{}x{}'.format(self.encode_width, self.encode_height), framerate=self.fps) + .output(outfile, pix_fmt='yuv420p', vcodec='libx264', crf=28, threads=0, + preset="ultrafast", framerate=self.fps, tune="zerolatency") + .global_args("-loglevel", "error", "-x264-params", + "keyint=30:min-keyint=30:scenecut=0:ref=1:bframes=0:subme=1:trellis=0:fast-pskip=1:no-mbtree=1:weightb=0:mixed-refs=0:me=dia:merange=4:rc-lookahead=0" + ) .overwrite_output() .run_async(pipe_stdin=True) ) self.writer = self.process.stdin + self.write_count = 0 + self.avg_write_time = 0 + def process_frame(self, frame): assert len(frame.shape) == 3 - frame = frame[..., ::-1] - if self.orientation == 1 and frame.shape[1] > frame.shape[0]: - frame = cv2.resize(frame, (self.width, int(self.width*self.width/self.height))) - elif self.orientation == 2 and frame.shape[1] < frame.shape[0]: - frame = cv2.resize(frame, (int(self.height*self.height/self.width), self.height)) - h_st = max(self.cache_frame.shape[0]//2 - frame.shape[0]//2, 0) - w_st = max(self.cache_frame.shape[1]//2 - frame.shape[1]//2, 0) - h_ed = min(h_st+frame.shape[0], self.cache_frame.shape[0]) - w_ed = min(w_st+frame.shape[1], self.cache_frame.shape[1]) + frame = frame[..., ::-1] # BGR to RGB + + # 获取原始帧尺寸 + frame_h, frame_w = frame.shape[:2] + + # 计算目标尺寸,保持原始宽高比 + if self.orientation == 1: + # 竖屏模式 + if frame_w > frame_h: + target_w = self.encode_width + target_h = int(target_w * frame_h / frame_w) + frame = cv2.resize(frame, (target_w, target_h)) + else: + target_w = self.encode_width + target_h = int(target_w * frame_h / frame_w) + frame = cv2.resize(frame, (target_w, target_h)) + elif self.orientation == 2: + # 横屏模式 + if frame_h > frame_w: + target_h = self.encode_height + target_w = int(target_h * frame_w / frame_h) + frame = cv2.resize(frame, (target_w, target_h)) + else: + target_h = self.encode_height + target_w = int(target_h * frame_w / frame_h) + frame = cv2.resize(frame, (target_w, target_h)) + else: + # orientation=0: 自适应模式,支持横竖屏切换 + # 计算缩放比例,确保画面完整显示(不被裁剪) + scale_w = self.encode_width / frame_w + scale_h = self.encode_height / frame_h + scale = min(scale_w, scale_h) # 使用较小的缩放比例,确保完整显示 + + target_w = int(frame_w * scale) + target_h = int(frame_h * scale) + + frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR) + + # 居中放置(无论尺寸是否匹配都居中,确保横竖屏切换时画面居中) + frame_h, frame_w = frame.shape[:2] + + # 清空画布 self.cache_frame[:] = 0 - self.cache_frame[h_st:h_ed, w_st:w_ed, :] = frame[:(h_ed-h_st), :(w_ed-w_st)] + + # 计算居中位置 + h_st = max((self.encode_height - frame_h) // 2, 0) + w_st = max((self.encode_width - frame_w) // 2, 0) + h_ed = min(h_st + frame_h, self.encode_height) + w_ed = min(w_st + frame_w, self.encode_width) + + # 确保不越界 + copy_h = min(h_ed - h_st, frame_h) + copy_w = min(w_ed - w_st, frame_w) + + # 将缩放后的帧居中放置到画布上 + self.cache_frame[h_st:h_st+copy_h, w_st:w_st+copy_w, :] = frame[:copy_h, :copy_w] + result = self.cache_frame.copy() + + # 添加时间戳 if self.timetag: - cv2.putText(self.cache_frame, time.strftime("%Y-%m-%d %H:%M:%S" + self.timezone_str), + cv2.putText(result, time.strftime("%Y-%m-%d %H:%M:%S" + self.timezone_str), self.tag_pos, cv2.FONT_HERSHEY_SIMPLEX, self.tag_scale, (0, 255, 0), self.tag_thickness) - return self.cache_frame.copy() + + return result def write(self, frame): if frame.dtype != np.uint8: @@ -140,17 +216,19 @@ def close(self): class ScreenRecorder: - def __init__(self, outfile, get_frame_func, fps=10, snapshot_sleep=0.001, orientation=0, timetag=True): + def __init__(self, outfile, get_frame_func, fps=10, snapshot_sleep=0.001, orientation=0, timetag=True, max_size=1280): self.get_frame_func = get_frame_func self.tmp_frame = self.get_frame_func() self.snapshot_sleep = snapshot_sleep width, height = self.tmp_frame.shape[1], self.tmp_frame.shape[0] - self.writer = FfmpegVidWriter(outfile, width, height, fps, orientation, timetag) + self.writer = FfmpegVidWriter(outfile, width, height, fps, orientation, timetag, max_size) self.tmp_frame = self.writer.process_frame(self.tmp_frame) self.frame_queue = deque(maxlen=100) self.frame_queue.append((time.time(), self.tmp_frame)) + self.process_frame_queue = deque(maxlen=100) + self._is_running = False self._stop_flag = False self._stop_time = 0 @@ -184,6 +262,9 @@ def start(self): self.t_stream = threading.Thread(target=self.get_frame_loop) self.t_stream.setDaemon(True) self.t_stream.start() + self.t_process_frame = threading.Thread(target=self.process_frame_loop) + self.t_process_frame.setDaemon(True) + self.t_process_frame.start() self.t_write = threading.Thread(target=self.write_frame_loop) self.t_write.setDaemon(True) self.t_write.start() @@ -218,9 +299,7 @@ def get_frame_loop(self): cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1) time.sleep(1) - self.tmp_frame = self.writer.process_frame(tmp_frame) - self.frame_queue.append((time.time(), self.tmp_frame)) - time.sleep(0.5/self.writer.fps) + self.frame_queue.append((time.time(), tmp_frame)) if self.is_stop(): break self._stop_flag = True @@ -229,6 +308,19 @@ def get_frame_loop(self): self._stop_flag = True raise + def process_frame_loop(self): + while not self.is_stop() or len(self.frame_queue) > 0: + if len(self.frame_queue) > 0: + frames_to_process = [] + while len(self.frame_queue) > 0 and len(frames_to_process) < 5: + frames_to_process.append(self.frame_queue.popleft()) + + for t, tmp_frame in frames_to_process: + processed = self.writer.process_frame(tmp_frame) + self.process_frame_queue.append((t, processed)) + else: + time.sleep(0.001) + def write_frame_loop(self): try: duration = 1.0/self.writer.fps @@ -240,8 +332,8 @@ def write_frame_loop(self): if self.writer.process.poll() is not None: # 检查 FFmpeg 进程状态 LOGGING.error("FFmpeg process has terminated unexpectedly. Exiting write loop.") break - if len(self.frame_queue) > 0: - t, frame = self.frame_queue.popleft() + if len(self.process_frame_queue) > 0: + t, frame = self.process_frame_queue.popleft() if last_frame is None: try: self.writer.write(frame) diff --git a/airtest/core/android/android.py b/airtest/core/android/android.py index c427d572..10259644 100644 --- a/airtest/core/android/android.py +++ b/airtest/core/android/android.py @@ -953,10 +953,10 @@ def start_recording(self, max_time=1800, output=None, fps=10, mode="yosemite", max_time=max_time, bit_rate=bit_rate, bool_is_vertical=bool_is_vertical) return save_path - if fps > 10 or fps < 1: - LOGGING.warning("fps should be between 1 and 10, becuase of the recording effiency") - if fps > 10: - fps = 10 + if fps > 24 or fps < 1: + LOGGING.warning("fps should be between 1 and 24, becuase of the recording effiency") + if fps > 24: + fps = 24 if fps < 1: fps = 1 diff --git a/airtest/core/android/static/apks/Yosemite.apk b/airtest/core/android/static/apks/Yosemite.apk index bb2cfd60..cf74a38e 100644 Binary files a/airtest/core/android/static/apks/Yosemite.apk and b/airtest/core/android/static/apks/Yosemite.apk differ diff --git a/airtest/core/ios/ios.py b/airtest/core/ios/ios.py index d7b7c227..6e9892cb 100644 --- a/airtest/core/ios/ios.py +++ b/airtest/core/ios/ios.py @@ -1058,6 +1058,27 @@ def home_interface(self): return True return False + def get_mjpeg_fps(self): + try: + settings = self.driver._session_http.get("/appium/settings") + fps = settings.value.get("mjpegServerFramerate") + return fps + except: + return None + + def set_mjpeg_fps(self, fps): + data = { + "settings": { + "mjpegServerFramerate": fps + } + } + try: + self.driver._session_http.post("/appium/settings", data=data) + return True + except Exception as e: + LOGGING.error(f"set mjpeg fps to {fps} failed: {e}") + return False + def disconnect(self): """Disconnected mjpeg and rotation_watcher. """ @@ -1105,13 +1126,18 @@ def start_recording(self, max_time=1800, output=None, fps=10, >>> dev.start_recording(output="test.mp4", max_size=800) """ - if fps > 10 or fps < 1: - LOGGING.warning("fps should be between 1 and 10, becuase of the recording effiency") - if fps > 10: - fps = 10 + if fps > 24 or fps < 1: + LOGGING.warning("fps should be between 1 and 24, becuase of the recording effiency") + if fps > 24: + fps = 24 if fps < 1: fps = 1 + origin_fps = self.get_mjpeg_fps() + if origin_fps != fps: + ret = self.set_mjpeg_fps(fps) + LOGGING.info(f"set mjpeg fps to {fps}. result: {ret}") + if self.recorder and self.recorder.is_running(): LOGGING.warning("recording is already running, please don't call again") return None diff --git a/airtest/core/win/win.py b/airtest/core/win/win.py index a48d5fdd..280aa03c 100644 --- a/airtest/core/win/win.py +++ b/airtest/core/win/win.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import ctypes import time import socket import subprocess @@ -164,9 +165,9 @@ def snapshot(self, filename=None, quality=10, max_size=None): """ if self.app: rect = self.get_rect() - rect = self._fix_image_rect(rect) - monitor = {"top": rect.top, "left": rect.left, "width": rect.right - rect.left - abs(self.monitor["left"]), - "height": rect.bottom - rect.top, "monitor": 1} + monitor = {"top": rect.top, "left": rect.left, + "width": rect.right - rect.left, + "height": rect.bottom - rect.top} else: monitor = self.screen.monitors[0] try: @@ -222,13 +223,63 @@ def text(self, text, **kwargs): def _fix_op_pos(self, pos): """Fix operation position.""" # 如果是全屏的话,就进行双屏修正,否则就正常即可 - if not self.handle: + if not self.app: pos = list(pos) pos[0] = pos[0] + self.monitor["left"] pos[1] = pos[1] + self.monitor["top"] return pos + @staticmethod + def _safe_move(coords): + """Move mouse cursor using SetCursorPos (virtual screen coordinates, multi-monitor safe).""" + win32api.SetCursorPos((int(coords[0]), int(coords[1]))) + + @staticmethod + def _safe_mouse_event(coords, button="left", button_down=True, button_up=True): + """Send mouse button events at coords using SendInput (multi-monitor safe). + + Uses SetCursorPos for positioning and SendInput for button events, + avoiding pywinauto's mouse_event which incorrectly normalizes coordinates + with SM_CXSCREEN instead of SM_CXVIRTUALSCREEN on multi-monitor setups. + """ + win32api.SetCursorPos((int(coords[0]), int(coords[1]))) + + MOUSEEVENTF_LEFTDOWN = 0x0002 + MOUSEEVENTF_LEFTUP = 0x0004 + MOUSEEVENTF_RIGHTDOWN = 0x0008 + MOUSEEVENTF_RIGHTUP = 0x0010 + MOUSEEVENTF_MIDDLEDOWN = 0x0020 + MOUSEEVENTF_MIDDLEUP = 0x0040 + + button_map = { + "left": (MOUSEEVENTF_LEFTDOWN, MOUSEEVENTF_LEFTUP), + "right": (MOUSEEVENTF_RIGHTDOWN, MOUSEEVENTF_RIGHTUP), + "middle": (MOUSEEVENTF_MIDDLEDOWN, MOUSEEVENTF_MIDDLEUP), + } + down_flag, up_flag = button_map.get(button, button_map["left"]) + + class MOUSEINPUT(ctypes.Structure): + _fields_ = [("dx", ctypes.c_long), ("dy", ctypes.c_long), + ("mouseData", ctypes.c_ulong), ("dwFlags", ctypes.c_ulong), + ("time", ctypes.c_ulong), ("dwExtraInfo", ctypes.POINTER(ctypes.c_ulong))] + + class INPUT(ctypes.Structure): + class _INPUT(ctypes.Union): + _fields_ = [("mi", MOUSEINPUT)] + _fields_ = [("type", ctypes.c_ulong), ("ii", _INPUT)] + + def _send_mouse_input(flags): + mi = MOUSEINPUT(0, 0, 0, flags, 0, None) + inp = INPUT(type=0) # INPUT_MOUSE = 0 + inp.ii.mi = mi + ctypes.windll.user32.SendInput(1, ctypes.byref(inp), ctypes.sizeof(INPUT)) + + if button_down: + _send_mouse_input(down_flag) + if button_up: + _send_mouse_input(up_flag) + def key_press(self, key): """Simulates a key press event. @@ -309,28 +360,76 @@ def touch(self, pos, **kwargs): for i in range(1, steps): x = int(start_x + (end_x - start_x) * i / steps) y = int(start_y + (end_y - start_y) * i / steps) - self.mouse.move(coords=(x, y)) + self._safe_move((x, y)) time.sleep(interval) - self.mouse.move(coords=(end_x, end_y)) + self._safe_move((end_x, end_y)) for i in range(1, offset + 1): - self.mouse.move(coords=(end_x + i, end_y + i)) + self._safe_move((end_x + i, end_y + i)) time.sleep(0.01) for i in range(offset): - self.mouse.move(coords=(end_x + offset - i, end_y + offset - i)) + self._safe_move((end_x + offset - i, end_y + offset - i)) time.sleep(0.01) - self.mouse.press(button=button, coords=(end_x, end_y)) + self._safe_mouse_event((end_x, end_y), button=button, button_down=True, button_up=False) time.sleep(duration) - self.mouse.release(button=button, coords=(end_x, end_y)) + self._safe_mouse_event((end_x, end_y), button=button, button_down=False, button_up=True) return ori_end def double_click(self, pos): ori_pos = get_absolute_coordinate(pos, self) coords = self._fix_op_pos(self._action_pos(ori_pos)) - self.mouse.double_click(coords=coords) + self._safe_mouse_event(coords, button="left", button_down=True, button_up=True) + time.sleep(0.05) + self._safe_mouse_event(coords, button="left", button_down=True, button_up=True) + return ori_pos + + def scroll(self, pos, direction="down", clicks=None, percent=None): + """ + Perform mouse wheel scroll at given position + + Two ways to specify scroll amount (mutually exclusive): + - clicks: exact number of wheel clicks (Windows-native, precise) + - percent: scroll amount as a fraction of screen height (cross-platform compatible) + + If neither is specified, defaults to percent=0.3. + + Args: + pos: coordinates where to scroll, supports absolute (pixels) or relative ([0,1]) coordinates + direction: scroll direction, 'up' or 'down', default is 'down' + clicks: number of scroll wheel clicks, e.g. 3 means 3 wheel notches + percent: scroll amount as screen height fraction, e.g. 0.3 means 30% of screen height + + Examples: + >>> from airtest.core.api import connect_device + >>> dev = connect_device("Windows:///") + >>> dev.scroll((500, 300), direction="down", clicks=3) + >>> dev.scroll((0.5, 0.5), direction="up", percent=0.5) + + Returns: + None + + """ + if clicks is not None and percent is not None: + raise ValueError("Cannot specify both 'clicks' and 'percent', use one or the other") + + if clicks is not None: + wheel_clicks = abs(int(clicks)) + elif percent is not None: + # 将 percent 转换为滚轮格数: 屏幕高度 * percent / 120 (Windows 默认每格 120 像素) + _, screen_h = self.get_current_resolution() + wheel_clicks = max(1, round(abs(percent) * screen_h / 120)) + else: + # 默认 percent=0.3 + _, screen_h = self.get_current_resolution() + wheel_clicks = max(1, round(0.3 * screen_h / 120)) + + ori_pos = get_absolute_coordinate(pos, self) + coords = self._fix_op_pos(self._action_pos(ori_pos)) + wheel_dist = wheel_clicks if direction == "up" else -wheel_clicks + self.mouse.scroll(coords=(int(coords[0]), int(coords[1])), wheel_dist=wheel_dist) return ori_pos def swipe(self, p1, p2, duration=0.8, steps=5, button="left"): @@ -361,35 +460,30 @@ def swipe(self, p1, p2, duration=0.8, steps=5, button="left"): to_x, to_y = self._fix_op_pos(self._action_pos(ori_to)) interval = float(duration) / (steps + 1) - self.mouse.press(coords=(from_x, from_y), button=button) + self._safe_mouse_event((from_x, from_y), button=button, button_down=True, button_up=False) time.sleep(interval) for i in range(1, steps): - self.mouse.move(coords=( + self._safe_move(( int(from_x + (to_x - from_x) * i / steps), int(from_y + (to_y - from_y) * i / steps), )) time.sleep(interval) for i in range(10): - self.mouse.move(coords=(to_x, to_y)) + self._safe_move((to_x, to_y)) time.sleep(interval) - self.mouse.release(coords=(to_x, to_y), button=button) + self._safe_mouse_event((to_x, to_y), button=button, button_down=False, button_up=True) return ori_from, ori_to def mouse_move(self, pos): """Simulates a `mousemove` event. - Known bug: - Due to a bug in the pywinauto module, users might experience \ - off-by-one errors when it comes to the exact coordinates of \ - the position on screen. - :param pos: A tuple (x, y), where x and y are x and y coordinates of the screen to move the mouse to, respectively. """ if not isinstance(pos, tuple) or len(pos) != 2: # pos is not a 2-tuple raise ValueError('invalid literal for mouse_move: {}'.format(pos)) try: - self.mouse.move(coords=self._action_pos(pos)) + self._safe_move(self._action_pos(pos)) except ValueError: # in case where x, y are not numbers raise ValueError('invalid literal for mouse_move: {}'.format(pos)) @@ -405,7 +499,7 @@ def mouse_down(self, button='left'): raise ValueError('invalid literal for mouse_down(): {}'.format(button)) else: coords = self._action_pos(win32api.GetCursorPos()) - self.mouse.press(button=button, coords=coords) + self._safe_mouse_event(coords, button=button, button_down=True, button_up=False) def mouse_up(self, button='left'): """Simulates a `mouseup` event. @@ -420,7 +514,7 @@ def mouse_up(self, button='left'): raise ValueError('invalid literal for mouse_up(): {}'.format(button)) else: coords = self._action_pos(win32api.GetCursorPos()) - self.mouse.release(button=button, coords=coords) + self._safe_mouse_event(coords, button=button, button_down=False, button_up=True) def start_app(self, path, *args, **kwargs): """ diff --git a/airtest/utils/version.py b/airtest/utils/version.py index edf8b05e..806665ac 100644 --- a/airtest/utils/version.py +++ b/airtest/utils/version.py @@ -1,4 +1,4 @@ -__version__ = "1.4.3" +__version__ = "1.4.6" import os import sys