| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744 |
- /**
- * OCR 聊天记录提取功能(Node.js 实现)
- *
- * 功能:根据屏幕截图提取聊天记录和聊天角色,并输出为 JSON 格式
- * - 使用 OCR 识别聊天内容
- * - 根据头像位置或消息位置识别发送者角色(friend/me)
- * - 返回 JSON 格式的消息数组,每条消息包含 sender 和 text 字段
- *
- * 实现方式:直接调用 Python 的 OnnxOCR 和 OpenCV,通过内联 Python 代码实现
- */
- import { exec } from 'child_process';
- import { promisify } from 'util';
- import { join, isAbsolute } from 'path';
- import { fileURLToPath } from 'url';
- import { dirname } from 'path';
- import { readFile, writeFile } from 'fs/promises';
- const execAsync = promisify(exec);
- const __filename = fileURLToPath(import.meta.url);
- const __dirname = dirname(__filename);
- /**
- * 安全读取包含 Unicode 字符的图片
- */
- function readImageSafe(imagePath) {
- return `
- import cv2
- import numpy as np
- import os
- def read_image_safe(image_path):
- abs_path = os.path.abspath(str(image_path))
- try:
- with open(abs_path, 'rb') as f:
- image_data = f.read()
- img_array = np.frombuffer(image_data, np.uint8)
- img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
- if img is None:
- raise ValueError(f"cv2.imdecode 无法解码图片: {abs_path}")
- return img
- except FileNotFoundError:
- raise FileNotFoundError(f"图片文件不存在: {abs_path}")
- except Exception as e:
- raise Exception(f"读取图片失败: {abs_path}, 错误: {str(e)}")
- `;
- }
- /**
- * 根据屏幕截图提取聊天记录和聊天角色
- *
- * @param {string} screenshotPath - 截图路径(屏幕截图文件)
- * @param {string} friendAvatarPath - 好友头像路径(可选,用于识别发送者角色)
- * @param {string} myAvatarPath - 我的头像路径(可选,用于识别发送者角色)
- * @param {number} deviceWidth - 设备宽度(可选,用于位置判断)
- * @param {number} deviceHeight - 设备高度(可选,用于位置判断)
- * @param {string} workflowFolder - 工作流文件夹路径(可选)
- * @param {string} regionJson - 识别区域 JSON 字符串(可选,包含四个顶点坐标的 corners 对象)
- * @returns {Promise<{success: boolean, messages?: Array<{sender: 'friend'|'me'|'unknown', text: string}>, messagesText?: string, error?: string}>}
- *
- * messages: JSON 格式的消息数组,每条消息包含:
- * - sender: 发送者角色('friend' 表示好友,'me' 表示自己,'unknown' 表示无法识别)
- * - text: 消息文本内容
- */
- export async function extractChatHistory(screenshotPath, friendAvatarPath, myAvatarPath, deviceWidth, deviceHeight, workflowFolder, regionJson = null, friendRgb = null, myRgb = null) {
- try {
- const pythonExePath = join(__dirname, '..', '..', 'py', 'venv', 'Scripts', 'python.exe');
- const onnxocrPath = join(__dirname, '..', '..', 'py', 'OnnxOCR');
-
- // 构建内联 Python 脚本
- const pythonCode = `
- import sys
- import os
- import cv2
- import numpy as np
- import json
- from pathlib import Path
- # 添加 OnnxOCR 路径
- sys.path.insert(0, r"${onnxocrPath.replace(/\\/g, '/')}")
- from onnxocr.onnx_paddleocr import ONNXPaddleOcr
- # 设置环境变量
- os.environ['DISABLE_MODEL_SOURCE_CHECK'] = 'True'
- ${readImageSafe()}
- def find_avatar_positions(screenshot_path, friend_avatar_path, my_avatar_path):
- """在截图中查找头像位置"""
- screenshot = read_image_safe(screenshot_path)
- result = {'friend': [], 'my': []}
-
- if friend_avatar_path and friend_avatar_path != 'None':
- try:
- friend_avatar = read_image_safe(friend_avatar_path)
- result_friend = cv2.matchTemplate(screenshot, friend_avatar, cv2.TM_CCOEFF_NORMED)
- locations_friend = np.where(result_friend >= 0.8)
- for pt in zip(*locations_friend[::-1]):
- result['friend'].append([int(pt[0]), int(pt[1])])
- except Exception as e:
- print(f"查找好友头像失败: {e}", file=sys.stderr)
-
- if my_avatar_path and my_avatar_path != 'None':
- try:
- my_avatar = read_image_safe(my_avatar_path)
- result_my = cv2.matchTemplate(screenshot, my_avatar, cv2.TM_CCOEFF_NORMED)
- locations_my = np.where(result_my >= 0.8)
- for pt in zip(*locations_my[::-1]):
- result['my'].append([int(pt[0]), int(pt[1])])
- except Exception as e:
- print(f"查找我的头像失败: {e}", file=sys.stderr)
-
- return result
- def detect_bubble_color(screenshot, box):
- """检测文本框区域的主要颜色(RGB)"""
- try:
- # 获取文本框的边界框
- x_coords = [point[0] for point in box]
- y_coords = [point[1] for point in box]
- x_min, x_max = int(min(x_coords)), int(max(x_coords))
- y_min, y_max = int(min(y_coords)), int(max(y_coords))
-
- # 确保坐标在图片范围内
- x_min = max(0, x_min)
- y_min = max(0, y_min)
- x_max = min(screenshot.shape[1] - 1, x_max)
- y_max = min(screenshot.shape[0] - 1, y_max)
-
- if x_max <= x_min or y_max <= y_min:
- return None
-
- # 提取文本框区域(扩大一点范围以包含气泡背景)
- # 向上和向下各扩展10像素,向左和向右各扩展5像素
- expand_x = 5
- expand_y = 10
- x_start = max(0, x_min - expand_x)
- y_start = max(0, y_min - expand_y)
- x_end = min(screenshot.shape[1], x_max + expand_x)
- y_end = min(screenshot.shape[0], y_max + expand_y)
-
- bubble_region = screenshot[y_start:y_end, x_start:x_end]
-
- if bubble_region.size == 0:
- return None
-
- # 计算区域的平均RGB值
- # OpenCV使用BGR格式,需要转换为RGB
- avg_bgr = np.mean(bubble_region.reshape(-1, 3), axis=0)
- avg_rgb = [int(avg_bgr[2]), int(avg_bgr[1]), int(avg_bgr[0])] # BGR -> RGB
-
- return avg_rgb
- except Exception as e:
- return None
- def match_rgb_color(actual_rgb, target_rgb, tolerance=30):
- """判断实际RGB是否匹配目标RGB(允许容差)"""
- if actual_rgb is None or target_rgb is None:
- return False
- return all(abs(actual_rgb[i] - target_rgb[i]) <= tolerance for i in range(3))
- def extract_chat_history(screenshot_path, friend_avatar_path, my_avatar_path, device_width, device_height, workflow_folder, region_json=None, friend_rgb=None, my_rgb=None):
- """提取完整的聊天记录"""
- try:
- original_screenshot = read_image_safe(screenshot_path)
- if original_screenshot is None:
- return {'success': False, 'error': '无法读取截图文件'}
-
- # 如果提供了区域,先裁剪图片,然后再进行OCR识别
- # 这样可以确保只识别指定区域,避免识别到键盘和导航栏
- crop_offset_x = 0
- crop_offset_y = 0
- screenshot = original_screenshot
- original_height = original_screenshot.shape[0]
- original_width = original_screenshot.shape[1]
-
- if region_json and region_json != 'None':
- try:
- region = json.loads(region_json)
- # 区域格式:corners 对象,包含 topLeft, topRight, bottomLeft, bottomRight
- if isinstance(region, dict) and 'topLeft' in region and 'bottomRight' in region:
- top_left = region['topLeft']
- bottom_right = region['bottomRight']
- x1 = int(top_left.get('x', 0))
- y1 = int(top_left.get('y', 0))
- x2 = int(bottom_right.get('x', original_width))
- y2 = int(bottom_right.get('y', original_height))
-
- # 确保坐标在图片范围内,并且 x2 > x1, y2 > y1
- x1 = max(0, min(x1, original_width - 1))
- y1 = max(0, min(y1, original_height - 1))
- x2 = max(x1 + 1, min(x2, original_width))
- y2 = max(y1 + 1, min(y2, original_height))
-
- # 验证裁剪区域是否有效
- if x2 > x1 and y2 > y1:
- # 保存裁剪偏移量(用于调整头像位置)
- crop_offset_x = x1
- crop_offset_y = y1
- # 裁剪图片:使用 numpy 数组切片 [y1:y2, x1:x2]
- screenshot = original_screenshot[y1:y2, x1:x2]
-
- # 验证裁剪后的图片是否有效
- if screenshot is not None and screenshot.size > 0:
- # 保存裁剪后的图片到工作流目录下的 tmp 目录,用于调试
- try:
- import datetime
- # 获取工作流目录下的 tmp 目录路径
- # 方法1: 从 workflow_folder 推断(如果提供)
- if workflow_folder and workflow_folder != 'None':
- workflow_path = Path(workflow_folder)
- # workflow_folder 通常是 static/processing/xxx 格式的绝对路径
- # tmp 目录应该在工作流目录下:static/processing/xxx/tmp
- tmp_dir = workflow_path / 'tmp'
- else:
- # 方法2: 从截图路径推断(向后兼容)
- screenshot_path_obj = Path(screenshot_path)
- # 尝试向上查找工作流目录(包含 tmp 目录的父目录)
- current = screenshot_path_obj.parent
- tmp_dir = None
- for _ in range(5): # 最多向上查找5层
- if (current / 'tmp').exists():
- tmp_dir = current / 'tmp'
- break
- # 检查是否是工作流目录(包含 processing.json)
- if (current / 'processing.json').exists():
- tmp_dir = current / 'tmp'
- break
- current = current.parent
-
- if tmp_dir is None:
- # 如果找不到,使用截图目录的父目录下的 tmp
- tmp_dir = screenshot_path_obj.parent / 'tmp'
-
- os.makedirs(str(tmp_dir), exist_ok=True)
- timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')
- cropped_image_path = tmp_dir / f'cropped_region_{timestamp}.png'
- cv2.imwrite(str(cropped_image_path), screenshot)
-
- # 清理工作流目录下的 tmp 目录:如果总大小超过 20MB,删除时间最早的文件
- try:
- max_size = 20 * 1024 * 1024 # 20MB
- files = []
- if tmp_dir.exists():
- for file_path in tmp_dir.iterdir():
- if file_path.is_file():
- file_stat = file_path.stat()
- files.append({
- 'path': file_path,
- 'size': file_stat.st_size,
- 'mtime': file_stat.st_mtime
- })
-
- # 计算总大小
- total_size = sum(f['size'] for f in files)
-
- # 如果总大小超过 20MB,删除时间最早的文件
- if total_size > max_size:
- # 按修改时间排序(最早的在前)
- files.sort(key=lambda x: x['mtime'])
-
- # 删除最早的文件直到总大小小于 20MB
- for file_info in files:
- if total_size <= max_size:
- break
- try:
- file_info['path'].unlink()
- total_size -= file_info['size']
- except Exception as del_error:
- pass
- except Exception as cleanup_error:
- pass # 清理失败不影响主流程
-
- print(f"Python: 裁剪后的图片已保存到: {cropped_image_path}", file=sys.stderr)
- print(f"Python: 裁剪区域 - x1={x1}, y1={y1}, x2={x2}, y2={y2}, 原始尺寸={original_width}x{original_height}, 裁剪后尺寸={screenshot.shape[1]}x{screenshot.shape[0]}", file=sys.stderr)
- except Exception as save_error:
- print(f"Python: 保存裁剪图片失败: {save_error}", file=sys.stderr)
- import traceback
- traceback.print_exc(file=sys.stderr)
- else:
- screenshot = original_screenshot
- crop_offset_x = 0
- crop_offset_y = 0
- except Exception as e:
- print(f"Python: 区域裁剪异常: {e}", file=sys.stderr)
- import traceback
- traceback.print_exc(file=sys.stderr)
-
- # 查找头像位置
- # 如果图片被裁剪了,在裁剪后的图片上查找头像(需要临时保存)
- # 否则在原始截图上查找
- import tempfile
- temp_cropped_path = None
- try:
- if crop_offset_x > 0 or crop_offset_y > 0:
- # 如果图片被裁剪了,需要临时保存裁剪后的图片用于头像匹配
- temp_cropped_path = tempfile.mktemp(suffix='.png')
- cv2.imwrite(temp_cropped_path, screenshot)
- avatar_positions = find_avatar_positions(temp_cropped_path, friend_avatar_path, my_avatar_path)
- else:
- # 使用原始截图查找头像
- avatar_positions = find_avatar_positions(screenshot_path, friend_avatar_path, my_avatar_path)
- finally:
- # 清理临时文件
- if temp_cropped_path and os.path.exists(temp_cropped_path):
- try:
- os.remove(temp_cropped_path)
- except:
- pass
-
- # 获取 OCR 实例
- ocr = ONNXPaddleOcr(use_angle_cls=False, use_gpu=True)
-
- # 执行 OCR(cls=False 避免角度分类器警告)
- # 如果提供了区域,在裁剪后的图片上识别;否则全屏识别
- ocr_result = ocr.ocr(screenshot, cls=False)
-
- if not ocr_result or not ocr_result[0]:
- return {'success': False, 'error': 'OCR 识别失败'}
-
- # 解析 OCR 结果,按 y 坐标分组消息
- messages = []
- friend_positions = avatar_positions.get('friend', [])
- my_positions = avatar_positions.get('my', [])
-
- # 获取截图高度(如果被裁剪了,使用裁剪后的高度;否则使用原始高度)
- screenshot_height = screenshot.shape[0]
- screenshot_width = screenshot.shape[1]
-
- # 计算键盘区域的阈值:通常键盘在屏幕底部,占屏幕高度的30-40%
- # 如果提供了区域裁剪,说明用户已经指定了识别区域,应该信任这个区域,不进行键盘过滤
- # 或者使用更宽松的阈值(90%),只过滤最底部的内容
- if region_json and region_json != 'None':
- # 如果提供了识别区域,使用更宽松的阈值(90%),几乎不过滤
- # 因为用户已经通过区域限制了识别范围
- keyboard_threshold_y = int(screenshot_height * 0.90) # 从90%的位置开始过滤
- else:
- # 如果没有提供区域,使用原来的65%阈值
- keyboard_threshold_y = int(screenshot_height * 0.65) # 从65%的位置开始过滤
-
- # 简单的消息分组逻辑:根据 y 坐标和头像位置判断发送者
- for line in ocr_result[0]:
- if not line:
- continue
-
- box = line[0]
- text = line[1][0] if len(line) > 1 and line[1] else ''
- confidence = line[1][1] if len(line) > 1 and len(line[1]) > 1 else 0.0
-
- if not text or confidence < 0.5:
- continue
-
- # 计算消息框的中心坐标
- x_center = sum([point[0] for point in box]) / len(box)
- y_center = sum([point[1] for point in box]) / len(box)
-
- # 过滤键盘区域:如果y坐标超过阈值,直接跳过
- if y_center > keyboard_threshold_y:
- continue
-
- # 过滤明显是键盘按键的文本(即使y坐标在阈值内,也要过滤)
- keyboard_keywords = ['ABC', 'DEF', 'GHI', 'JKL', 'MNO', 'PQRS', 'TUV', 'WXYZ',
- '分词', '重输', '换行', '符', '中/英', '中二', '123', '0', '1', '2', '3',
- '4', '5', '6', '7', '8', '9', 'Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I',
- 'O', 'P', 'A', 'S', 'D', 'F', 'G', 'H', 'J', 'K', 'L', 'Z', 'X', 'C',
- 'V', 'B', 'N', 'M', '△', '三', '-']
- if text.strip() in keyboard_keywords:
- continue
-
- # 判断发送者(优先使用RGB颜色判断,如果失败则使用头像距离判断,最后使用 x 坐标判断)
- sender = 'unknown'
-
- # 方法1: 使用RGB颜色判断(最高优先级)
- if friend_rgb and my_rgb:
- try:
- bubble_rgb = detect_bubble_color(screenshot, box)
- if bubble_rgb:
- # 判断是否匹配好友颜色(白色/浅色)
- if match_rgb_color(bubble_rgb, friend_rgb, tolerance=40):
- sender = 'friend'
- # 判断是否匹配我的颜色(绿色)
- elif match_rgb_color(bubble_rgb, my_rgb, tolerance=40):
- sender = 'me'
- except Exception as e:
- pass # RGB检测失败,继续使用其他方法
-
- # 方法2: 如果RGB判断失败,使用头像位置距离判断
- if sender == 'unknown':
- min_friend_dist = float('inf')
- min_my_dist = float('inf')
-
- for fx, fy in friend_positions:
- dist = abs(y_center - (fy + 20)) # 假设头像高度约 40px
- if dist < min_friend_dist:
- min_friend_dist = dist
-
- for mx, my in my_positions:
- dist = abs(y_center - (my + 20))
- if dist < min_my_dist:
- min_my_dist = dist
-
- # 优先使用距离判断(阈值100像素)
- if min_friend_dist < 100 and min_friend_dist < min_my_dist:
- sender = 'friend'
- elif min_my_dist < 100 and min_my_dist < min_friend_dist:
- sender = 'me'
- else:
- # 距离判断失败,使用备选方法
- screen_center_x = device_width / 2
- # 如果两个头像都没找到,直接使用 x 坐标判断
- if not friend_positions and not my_positions:
- if x_center < screen_center_x:
- sender = 'friend' # 左侧通常是好友
- else:
- sender = 'me' # 右侧通常是"我"
- # 如果只找到好友头像,放宽阈值到150像素
- elif friend_positions and not my_positions:
- if min_friend_dist < 150:
- sender = 'friend'
- elif x_center < screen_center_x:
- sender = 'friend'
- else:
- sender = 'me'
- # 如果只找到我的头像,放宽阈值到150像素
- elif my_positions and not friend_positions:
- if min_my_dist < 150:
- sender = 'me'
- elif x_center >= screen_center_x:
- sender = 'me'
- else:
- sender = 'friend'
- # 如果两个头像都找到了但距离判断失败,使用 x 坐标判断
- else:
- if x_center < screen_center_x:
- sender = 'friend'
- else:
- sender = 'me'
-
- messages.append({
- 'text': text,
- 'sender': sender,
- 'y': int(y_center),
- 'confidence': float(confidence)
- })
-
- # 按 y 坐标排序(从上到下)
- messages.sort(key=lambda m: m['y'])
-
- # 格式化消息文本
- messages_text = '\\n'.join([f"{'对方' if m['sender'] == 'friend' else '我' if m['sender'] == 'me' else '未知'}: {m['text']}" for m in messages])
-
- result = {
- 'success': True,
- 'messages': messages,
- 'messagesText': messages_text,
- 'count': len(messages)
- }
-
- return result
-
- except Exception as e:
- return {'success': False, 'error': f'提取聊天记录失败: {str(e)}'}
- # 主逻辑
- if __name__ == '__main__':
- import sys
- screenshot_path = sys.argv[1]
- friend_avatar_path = sys.argv[2] if len(sys.argv) > 2 and sys.argv[2] != 'None' else None
- my_avatar_path = sys.argv[3] if len(sys.argv) > 3 and sys.argv[3] != 'None' else None
- device_width = int(sys.argv[4]) if len(sys.argv) > 4 and sys.argv[4] else 1080
- device_height = int(sys.argv[5]) if len(sys.argv) > 5 and sys.argv[5] else 2400
- workflow_folder = sys.argv[6] if len(sys.argv) > 6 and sys.argv[6] != 'None' else None
- region_json = sys.argv[7] if len(sys.argv) > 7 and sys.argv[7] != 'None' else None
- friend_rgb_str = sys.argv[8] if len(sys.argv) > 8 and sys.argv[8] != 'None' else None
- my_rgb_str = sys.argv[9] if len(sys.argv) > 9 and sys.argv[9] != 'None' else None
-
- # 解析RGB字符串(格式:"(r,g,b)")
- friend_rgb = None
- my_rgb = None
-
- if friend_rgb_str and friend_rgb_str != 'None':
- try:
- # 解析格式 "(r,g,b)" 或 "(r, g, b)"
- friend_rgb_str = friend_rgb_str.strip().strip('()')
- parts = [int(x.strip()) for x in friend_rgb_str.split(',')]
- if len(parts) == 3:
- friend_rgb = parts
- except Exception as e:
- print(f"Python: 解析好友RGB失败: {e}", file=sys.stderr)
-
- if my_rgb_str and my_rgb_str != 'None':
- try:
- # 解析格式 "(r,g,b)" 或 "(r, g, b)"
- my_rgb_str = my_rgb_str.strip().strip('()')
- parts = [int(x.strip()) for x in my_rgb_str.split(',')]
- if len(parts) == 3:
- my_rgb = parts
- except Exception as e:
- print(f"Python: 解析我的RGB失败: {e}", file=sys.stderr)
-
- # 打印接收到的参数(用于调试)
- print(f"Python: 接收到的参数 - region_json={'已提供' if region_json and region_json != 'None' else '未提供'}, friend_rgb={friend_rgb}, my_rgb={my_rgb}", file=sys.stderr)
-
- result = extract_chat_history(screenshot_path, friend_avatar_path, my_avatar_path, device_width, device_height, workflow_folder, region_json, friend_rgb, my_rgb)
- print(json.dumps(result, ensure_ascii=False))
- `;
- // 将 Python 代码写入临时文件
- const tempScriptPath = join(__dirname, '..', '..', 'temp_extract_chat_history.py');
- await writeFile(tempScriptPath, pythonCode, 'utf8');
- // 构建命令
- const normalizedScreenshotPath = screenshotPath.replace(/\\/g, '/');
- let friendAvatarArg = 'None';
- if (friendAvatarPath) {
- friendAvatarArg = isAbsolute(friendAvatarPath)
- ? friendAvatarPath.replace(/\\/g, '/')
- : join(__dirname, '..', '..', 'static', 'processing', friendAvatarPath).replace(/\\/g, '/');
- }
-
- let myAvatarArg = 'None';
- if (myAvatarPath) {
- myAvatarArg = isAbsolute(myAvatarPath)
- ? myAvatarPath.replace(/\\/g, '/')
- : join(__dirname, '..', '..', 'static', 'processing', myAvatarPath).replace(/\\/g, '/');
- }
-
- let workflowFolderArg = 'None';
- if (workflowFolder) {
- workflowFolderArg = isAbsolute(workflowFolder)
- ? workflowFolder.replace(/\\/g, '/')
- : join(__dirname, '..', '..', 'static', 'processing', workflowFolder).replace(/\\/g, '/');
- }
- // 传递区域参数(如果提供)
- let regionArg = 'None';
- if (regionJson && regionJson !== 'None') {
- regionArg = regionJson.replace(/"/g, '\\"');
- }
-
- // 传递RGB参数(如果提供)
- let friendRgbArg = 'None';
- if (friendRgb && typeof friendRgb === 'string') {
- friendRgbArg = friendRgb;
- }
-
- let myRgbArg = 'None';
- if (myRgb && typeof myRgb === 'string') {
- myRgbArg = myRgb;
- }
-
- const command = `"${pythonExePath}" "${tempScriptPath}" "${normalizedScreenshotPath}" "${friendAvatarArg}" "${myAvatarArg}" ${deviceWidth || 1080} ${deviceHeight || 2400} "${workflowFolderArg}" "${regionArg}" "${friendRgbArg}" "${myRgbArg}"`;
- const env = {
- ...process.env,
- DISABLE_MODEL_SOURCE_CHECK: 'True'
- };
- const { stdout, stderr } = await execAsync(command, {
- timeout: 60000,
- maxBuffer: 10 * 1024 * 1024,
- cwd: join(__dirname, '..', '..'),
- encoding: 'utf8',
- env: { ...env, PYTHONIOENCODING: 'utf-8', PYTHONUTF8: '1' }
- });
- // 打印 Python 脚本的 stderr 输出
- if (stderr && stderr.trim()) {
- try {
- const decodedStderr = Buffer.from(stderr, 'utf8').toString('utf8');
- console.log(decodedStderr.trim());
- } catch (e) {
- console.log(stderr.trim());
- }
- }
- // 清理临时文件
- try {
- await import('fs/promises').then(fs => fs.unlink(tempScriptPath));
- } catch (e) {
- // 忽略删除失败
- }
- // 解析输出
- const cleanStdout = stdout.replace(/\[33m.*?\[0m/g, '').replace(/DeprecationWarning.*?\n/g, '');
-
- try {
- const result = JSON.parse(cleanStdout.trim());
- return result;
- } catch (parseError) {
- console.error('聊天记录解析失败:', parseError);
- console.error('原始输出:', cleanStdout);
- return { success: false, error: `解析聊天记录失败: ${parseError.message}` };
- }
- } catch (error) {
- console.error('提取聊天记录失败:', error);
- if (error.message && error.message.includes('timeout')) {
- return { success: false, error: '提取聊天记录超时,请检查网络连接或稍后重试' };
- }
- return { success: false, error: error.message };
- }
- }
- /**
- * 获取最后一条消息
- */
- export async function getLastMessage(screenshotPath, friendAvatarPath, myAvatarPath, deviceWidth, deviceHeight) {
- try {
- // 先提取完整聊天记录
- const result = await extractChatHistory(screenshotPath, friendAvatarPath, myAvatarPath, deviceWidth, deviceHeight, null);
-
- if (!result.success || !result.messages || result.messages.length === 0) {
- return { success: false, error: '未找到消息' };
- }
-
- // 获取最后一条消息(y 坐标最大的)
- const lastMessage = result.messages.reduce((max, msg) => msg.y > max.y ? msg : max, result.messages[0]);
-
- return {
- success: true,
- text: lastMessage.text,
- sender: lastMessage.sender,
- position: { y: lastMessage.y }
- };
- } catch (error) {
- return { success: false, error: error.message };
- }
- }
- /**
- * 全屏 OCR 识别
- */
- export async function ocrFullScreen(screenshotPath, deviceWidth, deviceHeight) {
- try {
- const pythonExePath = join(__dirname, '..', '..', 'py', 'venv', 'Scripts', 'python.exe');
- const onnxocrPath = join(__dirname, '..', '..', 'py', 'OnnxOCR');
-
- const pythonCode = `
- import sys
- import os
- import cv2
- import numpy as np
- import json
- # 添加 OnnxOCR 路径
- sys.path.insert(0, r"${onnxocrPath.replace(/\\/g, '/')}")
- from onnxocr.onnx_paddleocr import ONNXPaddleOcr
- # 设置环境变量
- os.environ['DISABLE_MODEL_SOURCE_CHECK'] = 'True'
- def read_image_safe(image_path):
- abs_path = os.path.abspath(str(image_path))
- try:
- with open(abs_path, 'rb') as f:
- image_data = f.read()
- img_array = np.frombuffer(image_data, np.uint8)
- img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
- if img is None:
- raise ValueError(f"cv2.imdecode 无法解码图片: {abs_path}")
- return img
- except FileNotFoundError:
- raise FileNotFoundError(f"图片文件不存在: {abs_path}")
- except Exception as e:
- raise Exception(f"读取图片失败: {abs_path}, 错误: {str(e)}")
- # 主逻辑
- if __name__ == '__main__':
- screenshot_path = sys.argv[1]
-
- try:
- screenshot = read_image_safe(screenshot_path)
- if screenshot is None:
- print(json.dumps({'success': False, 'error': '无法读取截图文件'}, ensure_ascii=False))
- sys.exit(1)
-
- ocr = ONNXPaddleOcr(use_angle_cls=False, use_gpu=True)
- ocr_result = ocr.ocr(screenshot, cls=False)
-
- if not ocr_result or not ocr_result[0]:
- print(json.dumps({'success': False, 'error': 'OCR 识别失败'}, ensure_ascii=False))
- sys.exit(1)
-
- # 提取所有文本
- texts = []
- for line in ocr_result[0]:
- if line and len(line) > 1:
- text = line[1][0] if isinstance(line[1], (list, tuple)) else str(line[1])
- if text:
- texts.append(text)
-
- full_text = '\\n'.join(texts)
-
- print(json.dumps({
- 'success': True,
- 'text': full_text,
- 'position': None
- }, ensure_ascii=False))
- except Exception as e:
- print(json.dumps({'success': False, 'error': f'OCR 识别失败: {str(e)}'}, ensure_ascii=False))
- sys.exit(1)
- `;
- // 将 Python 代码写入临时文件
- const tempScriptPath = join(__dirname, '..', '..', 'temp_ocr_full_screen.py');
- await writeFile(tempScriptPath, pythonCode, 'utf8');
- const normalizedScreenshotPath = screenshotPath.replace(/\\/g, '/');
- const command = `"${pythonExePath}" "${tempScriptPath}" "${normalizedScreenshotPath}"`;
- const env = {
- ...process.env,
- DISABLE_MODEL_SOURCE_CHECK: 'True'
- };
- const { stdout, stderr } = await execAsync(command, {
- timeout: 60000,
- maxBuffer: 10 * 1024 * 1024,
- cwd: join(__dirname, '..', '..'),
- encoding: 'utf8',
- env: { ...env, PYTHONIOENCODING: 'utf-8', PYTHONUTF8: '1' }
- });
- // 清理临时文件
- try {
- await import('fs/promises').then(fs => fs.unlink(tempScriptPath));
- } catch (e) {
- // 忽略删除失败
- }
- const cleanStdout = stdout.replace(/\[33m.*?\[0m/g, '').replace(/DeprecationWarning.*?\n/g, '');
- try {
- const result = JSON.parse(cleanStdout.trim());
- return result;
- } catch (parseError) {
- return { success: false, error: `解析失败: ${parseError.message}` };
- }
- } catch (error) {
- return { success: false, error: error.message };
- }
- }
|