/**
 * OCR chat-history extraction (Node.js implementation).
 *
 * Extracts chat messages and their sender roles from a screenshot and returns
 * them as JSON:
 * - the chat text is recognised with OCR;
 * - the sender role (friend / me) is derived from bubble colour, avatar
 *   position or horizontal message position;
 * - every message carries a `sender` and a `text` field.
 *
 * Implementation: an inline Python script (OnnxOCR + OpenCV) is written to a
 * temporary file and executed with the project's virtual-env interpreter.
 */
import { exec, execFile } from 'child_process';
import { randomUUID } from 'crypto';
import { promisify } from 'util';
import { join, isAbsolute, dirname } from 'path';
import { fileURLToPath } from 'url';
import { readFile, writeFile, unlink } from 'fs/promises';

// execFile passes arguments as an argv array, so no shell quoting is involved.
const execFileAsync = promisify(execFile);

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

// Project root: two levels above this module. Used as cwd for the Python child.
const PROJECT_ROOT = join(__dirname, '..', '..');

// Interpreter inside the project venv. `Scripts/python.exe` is the Windows
// layout (the original behaviour); `bin/python` is the POSIX venv layout.
const PYTHON_EXE_PATH =
  process.platform === 'win32'
    ? join(PROJECT_ROOT, 'py', 'venv', 'Scripts', 'python.exe')
    : join(PROJECT_ROOT, 'py', 'venv', 'bin', 'python');

const ONNXOCR_PATH = join(PROJECT_ROOT, 'py', 'OnnxOCR');

const TIMEOUT_ERROR_MESSAGE = '提取聊天记录超时,请检查网络连接或稍后重试';

/**
 * Returns the Python source of `read_image_safe`, a loader that decodes an
 * image from raw bytes so that paths containing Unicode characters work.
 *
 * @param {string} [imagePath] - Unused; kept for signature compatibility.
 * @returns {string} Python source to be spliced into an inline script.
 */
function readImageSafe(imagePath) {
  return `
import cv2
import numpy as np
import os

def read_image_safe(image_path):
    abs_path = os.path.abspath(str(image_path))
    try:
        with open(abs_path, 'rb') as f:
            image_data = f.read()
        img_array = np.frombuffer(image_data, np.uint8)
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
        if img is None:
            raise ValueError(f"cv2.imdecode 无法解码图片: {abs_path}")
        return img
    except FileNotFoundError:
        raise FileNotFoundError(f"图片文件不存在: {abs_path}")
    except Exception as e:
        raise Exception(f"读取图片失败: {abs_path}, 错误: {str(e)}")
`;
}

/** Converts Windows back-slashes to forward slashes. */
function toPosix(path) {
  return path.replace(/\\/g, '/');
}

/** Resolves an optional path against `static/processing`; 'None' when absent. */
function resolveProcessingPath(path) {
  if (!path) {
    return 'None';
  }
  return toPosix(isAbsolute(path) ? path : join(PROJECT_ROOT, 'static', 'processing', path));
}

/** Normalises a device dimension to an integer string, using `fallback` for invalid or zero values. */
function toDimensionArg(value, fallback) {
  return String(Math.round(Number(value)) || fallback);
}

/** Normalises an RGB hint ("(r,g,b)" string or [r, g, b] array) to the CLI form; 'None' when absent. */
function toRgbArg(rgb) {
  if (typeof rgb === 'string' && rgb) {
    return rgb;
  }
  if (Array.isArray(rgb) && rgb.length === 3) {
    return `(${rgb.join(',')})`;
  }
  return 'None';
}

/** Normalises the region (JSON string or plain object) to a JSON string; 'None' when absent. */
function toRegionArg(regionJson) {
  if (!regionJson || regionJson === 'None') {
    return 'None';
  }
  return typeof regionJson === 'string' ? regionJson : JSON.stringify(regionJson);
}

/** Strips colour-code and DeprecationWarning noise from stdout and parses the JSON report. */
function parsePythonJson(stdout) {
  const cleaned = String(stdout ?? '')
    .replace(/\[33m.*?\[0m/g, '')
    .replace(/DeprecationWarning.*?\n/g, '')
    .trim();
  try {
    return JSON.parse(cleaned);
  } catch (parseError) {
    // Libraries may print extra lines to stdout; the script's own report is
    // always a single-line JSON object, so fall back to the last such line.
    const lines = cleaned.split(/\r?\n/).map((line) => line.trim());
    for (let i = lines.length - 1; i >= 0; i -= 1) {
      if (lines[i].startsWith('{')) {
        try {
          return JSON.parse(lines[i]);
        } catch {
          // Not the report line; keep scanning upwards.
        }
      }
    }
    throw parseError;
  }
}

/** True when the child process was killed by the `timeout` option of execFile. */
function isTimeoutError(error) {
  if (error?.killed === true) {
    return true;
  }
  return typeof error?.message === 'string' && error.message.includes('timeout');
}

/** Recovers the JSON report a script printed before exiting non-zero; null when there is none. */
function recoverReportedResult(error) {
  if (typeof error?.stdout !== 'string' || !error.stdout.trim()) {
    return null;
  }
  try {
    const reported = parsePythonJson(error.stdout);
    const isReport = reported !== null && typeof reported === 'object' && 'success' in reported;
    return isReport ? reported : null;
  } catch {
    return null;
  }
}

/** Writes `pythonCode` to a uniquely named temp script, runs it, and always removes the script. */
async function runInlinePython(scriptBaseName, pythonCode, args) {
  // A unique name keeps concurrent calls from overwriting each other's script.
  const tempScriptPath = join(PROJECT_ROOT, `${scriptBaseName}_${randomUUID()}.py`);
  await writeFile(tempScriptPath, pythonCode, 'utf8');
  try {
    return await execFileAsync(PYTHON_EXE_PATH, [tempScriptPath, ...args], {
      timeout: 60000,
      maxBuffer: 10 * 1024 * 1024,
      cwd: PROJECT_ROOT,
      encoding: 'utf8',
      env: {
        ...process.env,
        DISABLE_MODEL_SOURCE_CHECK: 'True',
        PYTHONIOENCODING: 'utf-8',
        PYTHONUTF8: '1',
      },
    });
  } finally {
    try {
      await unlink(tempScriptPath);
    } catch {
      // Best effort: a leftover temp script must not mask the real outcome.
    }
  }
}

/** Python prelude shared by both inline scripts: imports, OnnxOCR path, image loader. */
function buildPythonPrelude() {
  return `
import sys
import os
import cv2
import numpy as np
import json
from pathlib import Path

# Make the bundled OnnxOCR package importable
sys.path.insert(0, r"${toPosix(ONNXOCR_PATH)}")

from onnxocr.onnx_paddleocr import ONNXPaddleOcr

os.environ['DISABLE_MODEL_SOURCE_CHECK'] = 'True'

${readImageSafe()}
`;
}

/** Builds the inline Python script that performs the chat-history extraction. */
function buildChatHistoryScript() {
  return `${buildPythonPrelude()}
# Texts that are keyboard keys rather than chat content
KEYBOARD_KEYWORDS = {
    'ABC', 'DEF', 'GHI', 'JKL', 'MNO', 'PQRS', 'TUV', 'WXYZ',
    '分词', '重输', '换行', '符', '中/英', '中二', '123',
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    'Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I', 'O', 'P',
    'A', 'S', 'D', 'F', 'G', 'H', 'J', 'K', 'L',
    'Z', 'X', 'C', 'V', 'B', 'N', 'M', '△', '三', '-',
}


def find_avatar_positions(screenshot, friend_avatar_path, my_avatar_path):
    """Locate both avatars in an already decoded screenshot via template matching."""
    result = {'friend': [], 'my': []}

    if friend_avatar_path and friend_avatar_path != 'None':
        try:
            friend_avatar = read_image_safe(friend_avatar_path)
            result_friend = cv2.matchTemplate(screenshot, friend_avatar, cv2.TM_CCOEFF_NORMED)
            locations_friend = np.where(result_friend >= 0.8)
            for pt in zip(*locations_friend[::-1]):
                result['friend'].append([int(pt[0]), int(pt[1])])
        except Exception as e:
            print(f"查找好友头像失败: {e}", file=sys.stderr)

    if my_avatar_path and my_avatar_path != 'None':
        try:
            my_avatar = read_image_safe(my_avatar_path)
            result_my = cv2.matchTemplate(screenshot, my_avatar, cv2.TM_CCOEFF_NORMED)
            locations_my = np.where(result_my >= 0.8)
            for pt in zip(*locations_my[::-1]):
                result['my'].append([int(pt[0]), int(pt[1])])
        except Exception as e:
            print(f"查找我的头像失败: {e}", file=sys.stderr)

    return result


def detect_bubble_color(screenshot, box):
    """Return the mean colour (as [r, g, b]) around a text box, or None on failure."""
    try:
        # Bounding box of the OCR polygon
        x_coords = [point[0] for point in box]
        y_coords = [point[1] for point in box]
        x_min, x_max = int(min(x_coords)), int(max(x_coords))
        y_min, y_max = int(min(y_coords)), int(max(y_coords))

        # Clamp to the image
        x_min = max(0, x_min)
        y_min = max(0, y_min)
        x_max = min(screenshot.shape[1] - 1, x_max)
        y_max = min(screenshot.shape[0] - 1, y_max)

        if x_max <= x_min or y_max <= y_min:
            return None

        # Grow the box a little (10px vertically, 5px horizontally) so that the
        # bubble background is part of the sample
        expand_x = 5
        expand_y = 10
        x_start = max(0, x_min - expand_x)
        y_start = max(0, y_min - expand_y)
        x_end = min(screenshot.shape[1], x_max + expand_x)
        y_end = min(screenshot.shape[0], y_max + expand_y)

        bubble_region = screenshot[y_start:y_end, x_start:x_end]
        if bubble_region.size == 0:
            return None

        # OpenCV stores BGR; convert the mean to RGB
        avg_bgr = np.mean(bubble_region.reshape(-1, 3), axis=0)
        avg_rgb = [int(avg_bgr[2]), int(avg_bgr[1]), int(avg_bgr[0])]
        return avg_rgb
    except Exception as e:
        return None


def match_rgb_color(actual_rgb, target_rgb, tolerance=30):
    """True when every channel of actual_rgb is within tolerance of target_rgb."""
    if actual_rgb is None or target_rgb is None:
        return False
    return all(abs(actual_rgb[i] - target_rgb[i]) <= tolerance for i in range(3))


def extract_chat_history(screenshot_path, friend_avatar_path, my_avatar_path, device_width, device_height, workflow_folder, region_json=None, friend_rgb=None, my_rgb=None):
    """Extract the complete chat history from a screenshot."""
    try:
        original_screenshot = read_image_safe(screenshot_path)
        if original_screenshot is None:
            return {'success': False, 'error': '无法读取截图文件'}

        # When a region is given, crop first and run OCR on the crop only, so
        # that the keyboard and the navigation bar are not recognised.
        crop_offset_x = 0
        crop_offset_y = 0
        screenshot = original_screenshot
        original_height = original_screenshot.shape[0]
        original_width = original_screenshot.shape[1]

        if region_json and region_json != 'None':
            try:
                region = json.loads(region_json)
                # Region format: corners object with topLeft, topRight, bottomLeft, bottomRight
                if isinstance(region, dict) and 'topLeft' in region and 'bottomRight' in region:
                    top_left = region['topLeft']
                    bottom_right = region['bottomRight']
                    x1 = int(top_left.get('x', 0))
                    y1 = int(top_left.get('y', 0))
                    x2 = int(bottom_right.get('x', original_width))
                    y2 = int(bottom_right.get('y', original_height))

                    # Clamp to the image and force x2 > x1, y2 > y1
                    x1 = max(0, min(x1, original_width - 1))
                    y1 = max(0, min(y1, original_height - 1))
                    x2 = max(x1 + 1, min(x2, original_width))
                    y2 = max(y1 + 1, min(y2, original_height))

                    if x2 > x1 and y2 > y1:
                        # Remember the offset of the crop inside the full screenshot
                        crop_offset_x = x1
                        crop_offset_y = y1

                        screenshot = original_screenshot[y1:y2, x1:x2]

                        if screenshot is not None and screenshot.size > 0:
                            # Save the crop into the workflow's tmp directory for debugging
                            try:
                                import datetime
                                if workflow_folder and workflow_folder != 'None':
                                    # Option 1: tmp directory below the given workflow folder
                                    workflow_path = Path(workflow_folder)
                                    tmp_dir = workflow_path / 'tmp'
                                else:
                                    # Option 2 (backward compatible): infer it from the screenshot
                                    # path by walking up at most 5 levels
                                    screenshot_path_obj = Path(screenshot_path)
                                    current = screenshot_path_obj.parent
                                    tmp_dir = None
                                    for _ in range(5):
                                        if (current / 'tmp').exists():
                                            tmp_dir = current / 'tmp'
                                            break
                                        # A workflow directory contains processing.json
                                        if (current / 'processing.json').exists():
                                            tmp_dir = current / 'tmp'
                                            break
                                        current = current.parent

                                    if tmp_dir is None:
                                        # Nothing found: use tmp next to the screenshot
                                        tmp_dir = screenshot_path_obj.parent / 'tmp'

                                os.makedirs(str(tmp_dir), exist_ok=True)
                                timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')
                                cropped_image_path = tmp_dir / f'cropped_region_{timestamp}.png'
                                cv2.imwrite(str(cropped_image_path), screenshot)

                                # Keep tmp below 20MB by deleting the oldest files first
                                try:
                                    max_size = 20 * 1024 * 1024
                                    files = []
                                    if tmp_dir.exists():
                                        for file_path in tmp_dir.iterdir():
                                            if file_path.is_file():
                                                file_stat = file_path.stat()
                                                files.append({
                                                    'path': file_path,
                                                    'size': file_stat.st_size,
                                                    'mtime': file_stat.st_mtime
                                                })

                                    total_size = sum(f['size'] for f in files)

                                    if total_size > max_size:
                                        files.sort(key=lambda x: x['mtime'])
                                        for file_info in files:
                                            if total_size <= max_size:
                                                break
                                            try:
                                                file_info['path'].unlink()
                                                total_size -= file_info['size']
                                            except Exception as del_error:
                                                pass
                                except Exception as cleanup_error:
                                    pass  # a failed cleanup must not affect the main flow

                                print(f"Python: 裁剪后的图片已保存到: {cropped_image_path}", file=sys.stderr)
                                print(f"Python: 裁剪区域 - x1={x1}, y1={y1}, x2={x2}, y2={y2}, 原始尺寸={original_width}x{original_height}, 裁剪后尺寸={screenshot.shape[1]}x{screenshot.shape[0]}", file=sys.stderr)
                            except Exception as save_error:
                                print(f"Python: 保存裁剪图片失败: {save_error}", file=sys.stderr)
                                import traceback
                                traceback.print_exc(file=sys.stderr)
                        else:
                            screenshot = original_screenshot
                            crop_offset_x = 0
                            crop_offset_y = 0
            except Exception as e:
                print(f"Python: 区域裁剪异常: {e}", file=sys.stderr)
                import traceback
                traceback.print_exc(file=sys.stderr)

        # Avatars are searched in the same image OCR runs on (cropped or full),
        # so avatar and text coordinates share one coordinate system.
        avatar_positions = find_avatar_positions(screenshot, friend_avatar_path, my_avatar_path)

        ocr = ONNXPaddleOcr(use_angle_cls=False, use_gpu=True)

        # cls=False avoids the angle-classifier warning
        ocr_result = ocr.ocr(screenshot, cls=False)

        if not ocr_result or not ocr_result[0]:
            return {'success': False, 'error': 'OCR 识别失败'}

        messages = []
        friend_positions = avatar_positions.get('friend', [])
        my_positions = avatar_positions.get('my', [])

        # Size of the image OCR ran on (the crop when a region was given)
        screenshot_height = screenshot.shape[0]
        screenshot_width = screenshot.shape[1]

        # Keyboard filter: with an explicit region the caller already limited
        # the area, so only the lowest 10% is dropped; otherwise everything
        # below 65% of the height is treated as keyboard.
        if region_json and region_json != 'None':
            keyboard_threshold_y = int(screenshot_height * 0.90)
        else:
            keyboard_threshold_y = int(screenshot_height * 0.65)

        for line in ocr_result[0]:
            if not line:
                continue

            box = line[0]
            text = line[1][0] if len(line) > 1 and line[1] else ''
            confidence = line[1][1] if len(line) > 1 and len(line[1]) > 1 else 0.0

            if not text or confidence < 0.5:
                continue

            # Centre of the text box
            x_center = sum([point[0] for point in box]) / len(box)
            y_center = sum([point[1] for point in box]) / len(box)

            # Drop everything inside the keyboard area
            if y_center > keyboard_threshold_y:
                continue

            # Drop texts that are obviously keyboard keys
            if text.strip() in KEYBOARD_KEYWORDS:
                continue

            # Sender detection: bubble colour first, then avatar distance, then x position
            sender = 'unknown'

            # Method 1: bubble colour (highest priority)
            if friend_rgb and my_rgb:
                try:
                    bubble_rgb = detect_bubble_color(screenshot, box)
                    if bubble_rgb:
                        if match_rgb_color(bubble_rgb, friend_rgb, tolerance=40):
                            sender = 'friend'
                        elif match_rgb_color(bubble_rgb, my_rgb, tolerance=40):
                            sender = 'me'
                except Exception as e:
                    pass  # colour detection failed, fall through to the other methods

            # Method 2: vertical distance to the nearest avatar
            if sender == 'unknown':
                min_friend_dist = float('inf')
                min_my_dist = float('inf')

                for fx, fy in friend_positions:
                    dist = abs(y_center - (fy + 20))  # assumes an avatar height of about 40px
                    if dist < min_friend_dist:
                        min_friend_dist = dist

                for mx, my in my_positions:
                    dist = abs(y_center - (my + 20))
                    if dist < min_my_dist:
                        min_my_dist = dist

                if min_friend_dist < 100 and min_friend_dist < min_my_dist:
                    sender = 'friend'
                elif min_my_dist < 100 and min_my_dist < min_friend_dist:
                    sender = 'me'
                else:
                    # Method 3: left half = friend, right half = me.
                    # OCR boxes are relative to the cropped image, so shift x back
                    # into full-screen coordinates before comparing it with the
                    # centre of the device.
                    screen_center_x = device_width / 2
                    x_screen = x_center + crop_offset_x

                    if not friend_positions and not my_positions:
                        if x_screen < screen_center_x:
                            sender = 'friend'
                        else:
                            sender = 'me'
                    # Only the friend avatar was found: relax the threshold to 150px
                    elif friend_positions and not my_positions:
                        if min_friend_dist < 150:
                            sender = 'friend'
                        elif x_screen < screen_center_x:
                            sender = 'friend'
                        else:
                            sender = 'me'
                    # Only my avatar was found: relax the threshold to 150px
                    elif my_positions and not friend_positions:
                        if min_my_dist < 150:
                            sender = 'me'
                        elif x_screen >= screen_center_x:
                            sender = 'me'
                        else:
                            sender = 'friend'
                    # Both avatars found but no distance match
                    else:
                        if x_screen < screen_center_x:
                            sender = 'friend'
                        else:
                            sender = 'me'

            messages.append({
                'text': text,
                'sender': sender,
                'y': int(y_center),
                'confidence': float(confidence)
            })

        # Top to bottom
        messages.sort(key=lambda m: m['y'])

        messages_text = '\\n'.join([f"{'对方' if m['sender'] == 'friend' else '我' if m['sender'] == 'me' else '未知'}: {m['text']}" for m in messages])

        result = {
            'success': True,
            'messages': messages,
            'messagesText': messages_text,
            'count': len(messages)
        }

        return result
    except Exception as e:
        return {'success': False, 'error': f'提取聊天记录失败: {str(e)}'}


def parse_rgb(rgb_str, label):
    """Parse "(r,g,b)" / "(r, g, b)" into [r, g, b]; None when missing or invalid."""
    if not rgb_str or rgb_str == 'None':
        return None
    try:
        parts = [int(x.strip()) for x in rgb_str.strip().strip('()').split(',')]
        if len(parts) == 3:
            return parts
    except Exception as e:
        print(f"Python: 解析{label}RGB失败: {e}", file=sys.stderr)
    return None


if __name__ == '__main__':
    screenshot_path = sys.argv[1]
    friend_avatar_path = sys.argv[2] if len(sys.argv) > 2 and sys.argv[2] != 'None' else None
    my_avatar_path = sys.argv[3] if len(sys.argv) > 3 and sys.argv[3] != 'None' else None
    device_width = int(sys.argv[4]) if len(sys.argv) > 4 and sys.argv[4] else 1080
    device_height = int(sys.argv[5]) if len(sys.argv) > 5 and sys.argv[5] else 2400
    workflow_folder = sys.argv[6] if len(sys.argv) > 6 and sys.argv[6] != 'None' else None
    region_json = sys.argv[7] if len(sys.argv) > 7 and sys.argv[7] != 'None' else None
    friend_rgb_str = sys.argv[8] if len(sys.argv) > 8 and sys.argv[8] != 'None' else None
    my_rgb_str = sys.argv[9] if len(sys.argv) > 9 and sys.argv[9] != 'None' else None

    friend_rgb = parse_rgb(friend_rgb_str, '好友')
    my_rgb = parse_rgb(my_rgb_str, '我的')

    # Echo the received arguments for debugging
    print(f"Python: 接收到的参数 - region_json={'已提供' if region_json and region_json != 'None' else '未提供'}, friend_rgb={friend_rgb}, my_rgb={my_rgb}", file=sys.stderr)

    result = extract_chat_history(screenshot_path, friend_avatar_path, my_avatar_path, device_width, device_height, workflow_folder, region_json, friend_rgb, my_rgb)
    print(json.dumps(result, ensure_ascii=False))
`;
}

/** Builds the inline Python script that performs a plain full-screen OCR. */
function buildFullScreenOcrScript() {
  return `${buildPythonPrelude()}
if __name__ == '__main__':
    screenshot_path = sys.argv[1]

    try:
        screenshot = read_image_safe(screenshot_path)
        if screenshot is None:
            print(json.dumps({'success': False, 'error': '无法读取截图文件'}, ensure_ascii=False))
            sys.exit(1)

        ocr = ONNXPaddleOcr(use_angle_cls=False, use_gpu=True)
        ocr_result = ocr.ocr(screenshot, cls=False)

        if not ocr_result or not ocr_result[0]:
            print(json.dumps({'success': False, 'error': 'OCR 识别失败'}, ensure_ascii=False))
            sys.exit(1)

        # Collect every recognised text
        texts = []
        for line in ocr_result[0]:
            if line and len(line) > 1:
                text = line[1][0] if isinstance(line[1], (list, tuple)) else str(line[1])
                if text:
                    texts.append(text)

        full_text = '\\n'.join(texts)

        print(json.dumps({
            'success': True,
            'text': full_text,
            'position': None
        }, ensure_ascii=False))
    except Exception as e:
        print(json.dumps({'success': False, 'error': f'OCR 识别失败: {str(e)}'}, ensure_ascii=False))
        sys.exit(1)
`;
}

/**
 * Extracts the chat history and sender roles from a screenshot.
 *
 * Never rejects: every failure is reported as `{ success: false, error }`.
 *
 * @param {string} screenshotPath - Path of the screenshot file.
 * @param {string} [friendAvatarPath] - Friend avatar image; relative paths are resolved against `static/processing`.
 * @param {string} [myAvatarPath] - Own avatar image; relative paths are resolved against `static/processing`.
 * @param {number} [deviceWidth] - Device width used for the left/right decision (default 1080).
 * @param {number} [deviceHeight] - Device height (default 2400).
 * @param {string} [workflowFolder] - Workflow folder; debug crops are stored in its `tmp` sub-directory.
 * @param {string|object|null} [regionJson] - Recognition region: corners object (or its JSON string) with
 *   `topLeft` and `bottomRight` points.
 * @param {string|number[]|null} [friendRgb] - Bubble colour of the friend, "(r,g,b)" or [r, g, b].
 * @param {string|number[]|null} [myRgb] - Bubble colour of the own messages, "(r,g,b)" or [r, g, b].
 * @returns {Promise<{success: boolean, messages?: Array<{sender: 'friend'|'me'|'unknown', text: string, y: number, confidence: number}>, messagesText?: string, count?: number, error?: string}>}
 */
export async function extractChatHistory(screenshotPath, friendAvatarPath, myAvatarPath, deviceWidth, deviceHeight, workflowFolder, regionJson = null, friendRgb = null, myRgb = null) {
  try {
    const args = [
      toPosix(screenshotPath),
      resolveProcessingPath(friendAvatarPath),
      resolveProcessingPath(myAvatarPath),
      toDimensionArg(deviceWidth, 1080),
      toDimensionArg(deviceHeight, 2400),
      resolveProcessingPath(workflowFolder),
      toRegionArg(regionJson),
      toRgbArg(friendRgb),
      toRgbArg(myRgb),
    ];

    const { stdout, stderr } = await runInlinePython('temp_extract_chat_history', buildChatHistoryScript(), args);

    // Surface the diagnostics the Python script writes to stderr.
    if (stderr && stderr.trim()) {
      console.log(stderr.trim());
    }

    try {
      return parsePythonJson(stdout);
    } catch (parseError) {
      console.error('聊天记录解析失败:', parseError);
      console.error('原始输出:', stdout);
      return { success: false, error: `解析聊天记录失败: ${parseError.message}` };
    }
  } catch (error) {
    console.error('提取聊天记录失败:', error);
    if (isTimeoutError(error)) {
      return { success: false, error: TIMEOUT_ERROR_MESSAGE };
    }
    return recoverReportedResult(error) ?? { success: false, error: error.message };
  }
}

/**
 * Returns the bottom-most message of a screenshot.
 *
 * @param {string} screenshotPath - Path of the screenshot file.
 * @param {string} [friendAvatarPath] - Friend avatar image.
 * @param {string} [myAvatarPath] - Own avatar image.
 * @param {number} [deviceWidth] - Device width.
 * @param {number} [deviceHeight] - Device height.
 * @returns {Promise<{success: boolean, text?: string, sender?: string, position?: {y: number}, error?: string}>}
 */
export async function getLastMessage(screenshotPath, friendAvatarPath, myAvatarPath, deviceWidth, deviceHeight) {
  try {
    const result = await extractChatHistory(screenshotPath, friendAvatarPath, myAvatarPath, deviceWidth, deviceHeight, null);

    if (!result.success) {
      // Keep the real failure reason instead of masking it.
      return { success: false, error: result.error || '未找到消息' };
    }
    if (!result.messages || result.messages.length === 0) {
      return { success: false, error: '未找到消息' };
    }

    // The last message is the one with the largest y coordinate.
    const lastMessage = result.messages.reduce(
      (lowest, message) => (message.y > lowest.y ? message : lowest),
      result.messages[0],
    );

    return {
      success: true,
      text: lastMessage.text,
      sender: lastMessage.sender,
      position: { y: lastMessage.y },
    };
  } catch (error) {
    return { success: false, error: error.message };
  }
}

/**
 * Runs OCR on the whole screenshot and returns all recognised text joined by newlines.
 *
 * @param {string} screenshotPath - Path of the screenshot file.
 * @param {number} [deviceWidth] - Unused; kept for signature compatibility.
 * @param {number} [deviceHeight] - Unused; kept for signature compatibility.
 * @returns {Promise<{success: boolean, text?: string, position?: null, error?: string}>}
 */
export async function ocrFullScreen(screenshotPath, deviceWidth, deviceHeight) {
  try {
    const { stdout } = await runInlinePython('temp_ocr_full_screen', buildFullScreenOcrScript(), [
      toPosix(screenshotPath),
    ]);

    try {
      return parsePythonJson(stdout);
    } catch (parseError) {
      return { success: false, error: `解析失败: ${parseError.message}` };
    }
  } catch (error) {
    // The script prints a JSON error report before exiting non-zero; prefer
    // that report over the generic "Command failed" message.
    return recoverReportedResult(error) ?? { success: false, error: error.message };
  }
}