验证码-循环神经网络&CRNN与不定长图文识别
目录
前言
TIP
循环神经网络
CRNN
不定长图文识别
循环神经网络
一、循环神经网络(RNN)是什么?
想象一下,你在听一个故事,要理解整个故事的意思,不能只听一句话,需要记住前面说了什么,才能明白后面在说什么。循环神经网络就是专门用来处理这种"有顺序"的信息的智能系统。
RNN的核心特点
有记忆
RNN最大的特点就是它有"记忆"能力。它处理信息时,不仅看当前输入,还会记住之前处理过的信息。就像你读文章时,看到"他"这个字,需要记住前面提到的人是谁,才能理解"他"指的是谁。
循环神经网络(RNN)的基本概念
循环神经网络(Recurrent Neural Network, RNN)是一种专门用于处理序列数据的神经网络结构。与前馈神经网络不同,RNN具有循环连接,使得隐藏层能够接收上一时刻的隐藏状态作为输入,从而形成"记忆"机制。这种设计使RNN能够捕捉序列数据中的时序依赖关系,适用于自然语言处理、时间序列预测、语音识别等任务。
二、RNN的核心结构与工作原理
RNN的基本结构
包括输入层、隐藏层和输出层。隐藏层之间存在循环连接,使得当前时刻的隐藏状态不仅取决于当前输入,还取决于上一时刻的隐藏状态。
RNN的数学表达式为:
隐藏状态计算:h_t = f(W{hh} * h{t-1} + W_{xh} * x_t + b_h)
输出计算:y_t = W_{hy} * h_t + b_y
其中:
- W{xh}、W{hh}、W_{hy}是权重矩阵,
- b_h、b_y是偏置向量,
- f是激活函数(通常为tanh或ReLU)。
RNN的工作方式:
按顺序处理:RNN会一个字一个字、一个词一个词地处理信息,就像你读书一样从左到右 记住过去:每处理一个信息,它都会把当前的理解和之前记住的内容结合起来 层层递进:随着处理的信息越来越多,它的理解也越来越深入
三、RNN的优缺点
优点: 序列建模能力强,适合处理时间序列数据 可处理变长输入序列 参数共享,减少模型复杂度 记忆能力(LSTM/GRU可记忆长期依赖)
缺点:
原始RNN存在梯度消失/爆炸问题
计算效率低,难以并行化
长程依赖处理能力有限
解释性差,黑箱性质强
长序列训练内存消耗大
四、RNN和CNN的区别
- 处理的信息类型不同
CNN:专门处理图片、图像这类"空间"信息,它关心的是"上下左右"的关系
RNN:专门处理文字、语音、时间序列这类"时间"信息,它关心的是"前后顺序"的关系 - 工作方式不同
CNN:像用放大镜看图片,同时看很多地方,找出特征
RNN:像听故事,必须按顺序听,边听边记 - 记忆能力不同
CNN:没有记忆,每次看图片都是独立的
RNN:有记忆,能记住之前的信息 - 应用场景不同
CNN:适合识别图片里的物体、人脸识别、自动驾驶看路
RNN:适合机器翻译、语音识别、写文章、预测股票走势
CNN和RNN都是重要工具,但它们各有所长:
- CNN是"空间专家":擅长处理图片、图像这类有空间结构的信息
- RNN是"时间专家":擅长处理文字、语音这类有时间顺序的信息
就像你有两个朋友,一个擅长看地图找路(CNN),一个擅长听故事记情节(RNN),遇到不同的问题,找不同的朋友帮忙就行。
五、RNN的改进模型
为解决梯度消失问题,研究者提出了两种重要变体:
- 长短期记忆网络(LSTM) LSTM通过引入门控机制(输入门、遗忘门、输出门)和细胞状态,有效缓解了梯度消失问题。遗忘门控制从上一时刻记忆单元中丢弃多少信息,输入门决定当前输入信息有多少要添加到记忆单元,输出门控制当前记忆单元状态有多少要输出。
- 门控循环单元(GRU) GRU是LSTM的简化版本,将输入门和遗忘门合并为更新门,并引入重置门。GRU参数量更少,训练速度更快,在许多任务中表现与LSTM相当。
LSTM(长短期记忆网络)详解
一、LSTM是什么?
LSTM(Long Short-Term Memory,长短期记忆网络)是一种特殊的循环神经网络,专门为了解决一般RNN存在的长期依赖问题而设计。简单来说,LSTM让计算机能够像人一样,在处理序列信息时记住重要的历史信息,忘记不重要的信息1,6。
想象一下,你读一本小说,读到后面章节时,还能记得前面重要的人物和情节吗?LSTM就是让计算机也能做到这一点4。
二、LSTM为什么需要"门"?
普通的RNN就像一个人记性不好,读文章时读到后面就忘了前面,难以处理长序列数据4,6。LSTM通过引入独特的门控机制来解决这个问题,主要包括三个门1,8:
- 遗忘门:决定忘记什么(像大脑过滤不重要的信息)
- 输入门:决定记住什么(像做笔记时只记重点)
- 输出门:决定输出什么(像回答问题时要筛选信息)
三、LSTM的工作过程
LSTM的工作过程可以概括为四个步骤6,8:
第一步:看新信息
- 接收新的输入(如一个词或一个数据点)
第二步:决定忘记什么(遗忘门工作)
- 判断之前记忆的哪些部分已经过时,可以遗忘
- 比如在阅读时,自动过滤掉"的"、"了"这类不重要的词
第三步:决定记住什么(输入门工作)
- 将新的重要信息与保留的历史信息结合,更新记忆
- 比如记住主角的名字、关键情节等核心内容
第四步:决定输出什么(输出门工作)
- 根据当前记忆和输入,给出最终的判断结果
四、LSTM的核心组件
LSTM的核心在于其细胞状态和三个门结构:
1. 细胞状态(记忆主线)
- 像一条"传送带",贯穿整个时间序列
- 主要负责长期信息的传递和保存
2. 三个门控机制:
- 遗忘门:控制哪些历史信息应该被丢弃
- 输入门:控制哪些新信息应该被加入到细胞状态中
- 输出门:控制当前时刻哪些信息应该被输出
五、LSTM的优势
与传统RNN相比,LSTM具有以下显著优势
| 优势 | 说明 |
|---|---|
| 长期记忆能力 | 能够记住很久以前的重要信息,解决长期依赖问题 |
| 选择性记忆 | 不是什么都记,只关注重要信息,提高效率 |
| 缓解梯度问题 | 有效缓解RNN中的梯度消失和梯度爆炸问题 |
| 适应性强 | 适用于各种序列数据处理任务 |
六、LSTM的应用领域
LSTM在众多领域都有广泛应用
- 机器翻译:翻译长句子时需要记住整个句子的语义信息
- 语音识别:识别连续语音时需要记住前面的语音内容
- 文本生成:根据前文内容预测下一个词或句子
- 时间序列预测:基于历史数据预测股票价格、天气变化等
- 视频分析:理解视频中动作的连续性
- 医疗诊断:分析连续的医疗监测数据
七、简单总结
LSTM就像一位有选择性的记忆大师,它通过精巧的门控机制,知道什么信息该记、什么信息该忘。这种设计使得LSTM特别擅长处理需要长期记忆的任务,无论是分析长篇文章、理解连续语音,还是处理时间序列数据,LSTM都能有效地捕捉和利用关键信息
CRNN(卷积循环神经网络)详解
一、CRNN是什么?
CRNN(Convolutional Recurrent Neural Network)是一种专为序列识别(如文字识别)设计的深度学习模型。它结合了卷积神经网络(CNN) 和循环神经网络(RNN),实现了端到端的图像序列识别,尤其擅长处理不定长文本图像
核心特点:无需预先切割单个字符,直接输入整张图片,即可输出识别出的文本内容。
二、CRNN的工作流程
CRNN的工作流程可以概括为三个核心步骤,下图直观展示了其整体架构与数据处理过程:
1. CNN卷积层:特征提取(视觉质检员)
- 任务:像“视觉质检员”一样扫描图片,提取出有用的视觉特征
- 具体工作:
- 输入图片首先被缩放到固定高度(如32像素),宽度按比例调整
- 通过多个卷积和池化层,将图像转换为一系列特征图。这个过程相当于把图片在高度上不断压缩,最后得到一条“特征带”
- 输出结果是一个特征序列,可以理解为图片被从左到右分割成了若干小块,每个小块都用一个特征向量表示
2. RNN循环层:序列建模(序列分析员)
- 任务:像“序列分析员”一样,按顺序分析CNN传来的特征序列,理解字符之间的上下文关系
- 具体工作:
- 通常采用双向LSTM,能同时考虑前文和后文信息。例如,看到“京”之后是“A”,能推断出这可能是一个北京车牌的开头
- 为特征序列中的每一个时间步(即每一小块特征)预测一个概率分布,表示该位置是某个字符的可能性
3. CTC转录层:对齐解码(格式校对员)
- 任务:解决RNN输出与最终文本标签之间的对齐问题。RNN可能会对同一个字符产生多个重复的预测,或者插入一些空白符。CTC层就像一位“格式校对员”,负责去掉这些冗余信息,整理出干净的文本
- 核心机制:
- 引入
blank(空白符) 来占位,帮助分隔字符 - 合并重复字符:将连续重复的字符合并为一个(如
"aaa"→"a")。 - 去除空白符:删除所有
blank符号,得到最终序列
- 引入
三、CRNN的优势
| 优势 | 说明 |
|---|---|
| 端到端训练 | 输入原始图像,直接输出文本,无需复杂的字符切割或预处理步骤,简化了流程 |
| 处理不定长序列 | 得益于RNN和CTC的设计,能够识别长度不一的文本,适应性强 |
| 高准确率 | CNN能提取鲁棒的特征,RNN能利用上下文信息(如“苹果”比“苹菓”更合理),两者结合大幅提升识别精度 |
| 模型相对轻量 | 参数共享机制使模型比一些纯CNN模型更小巧,便于部署 |
四、CRNN的常见应用
- 车牌识别:识别车辆牌照号码
- 文档数字化:扫描纸质文档或书籍,将其转换为可编辑的电子文本
- 身份证/银行卡识别:快速读取证件上的关键信息
- 场景文字识别:识别街景招牌、广告牌、商品标签等自然场景中的文字
- 手写体识别:识别手写笔记或表格
五、总结与比喻
可以将CRNN模型想象成一个高效的文字识别流水线
- CNN是眼睛:负责“看”图片,找出哪里是文字,并提取基本特征。
- RNN是大脑:负责“理解”这些特征的顺序和上下文关系,猜出可能是什么字。
- CTC是秘书:负责把大脑“碎碎念”般的初步猜测,整理成简洁、准确的最终文本。
这种分工协作的机制,使CRNN成为文字识别领域非常经典且实用的模型
不定长图文识别
识别
完整代码
计算
完整代码
import asyncio
import aiofiles
from PIL import Image, ImageDraw, ImageFont
import random
import time
import os
from io import BytesIO
from torchvision.utils import save_image
async def generate_single_captcha_async(params):
"""
异步生成单个验证码
"""
WIDTH = 150
HEIGHT = 50
num1, num2, operator, operator_symbol, save_path, i, is_name = params
# 构建验证码文本
text = f"{num1}{operator}{num2}"
# 创建图片
image = Image.new('RGB', (WIDTH, HEIGHT), color=(240, 240, 240))
draw = ImageDraw.Draw(image)
# 加载字体(同步操作,但很快)
try:
font_paths = [
"simhei.ttf", "msyh.ttc",
"/System/Library/Fonts/PingFang.ttc",
"C:/Windows/Fonts/simhei.ttf", "arial.ttf"
]
font = None
for font_path in font_paths:
try:
font = ImageFont.truetype(font_path, 32)
break
except:
continue
if font is None:
font = ImageFont.load_default()
except:
font = ImageFont.load_default()
# 计算文本位置
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
start_x = (WIDTH - text_width) // 2
start_y = (HEIGHT - text_height) // 2
# 设置文字颜色
colors = [(255, 0, 0), (128, 0, 128), (0, 0, 0),
(200, 0, 0), (0, 100, 0), (0, 0, 150)]
text_color = random.choice(colors)
# 绘制文本
draw.text((start_x, start_y), text, font=font, fill=text_color)
# 添加干扰线
for _ in range(random.randint(3, 5)):
x1, y1 = random.randint(0, WIDTH), random.randint(0, HEIGHT)
x2, y2 = random.randint(0, WIDTH), random.randint(0, HEIGHT)
line_color = (random.randint(150, 200), random.randint(150, 200), random.randint(150, 200))
draw.line([(x1, y1), (x2, y2)], fill=line_color, width=1)
# 添加噪点
for _ in range(random.randint(50, 150)):
x, y = random.randint(0, WIDTH - 1), random.randint(0, HEIGHT - 1)
noise_color = (random.randint(200, 250), random.randint(200, 250), random.randint(200, 250))
draw.point((x, y), fill=noise_color)
# 异步保存图片
timestamp = int(time.time() * 1000)
if is_name:
safe_operator = operator if operator != '*' else '乘'
filename = f"{num1}{safe_operator}{num2}_{timestamp}_{i}.png"
else:
filename = f"math_captcha_{i}_{timestamp}.png"
full_path = os.path.join(save_path, filename)
# 使用内存缓冲区保存,然后异步写入文件
img_buffer = BytesIO()
image.save(img_buffer, format='PNG')
img_buffer.seek(0)
async with aiofiles.open(full_path, 'wb') as f:
await f.write(img_buffer.getvalue())
# 计算结果
if operator in ['加', '+']:
result = num1 + num2
elif operator in ['减', '-']:
result = num1 - num2
else: # 乘
result = num1 * num2
calc_expr = f"{num1}{operator_symbol}{num2}={result}"
return full_path, calc_expr, str(result)
async def generate_math_captcha_async(batch_size=1, save_images=False, save_path="./math_captcha_async/",
is_name=True, operator_type="mixed", max_concurrent=10):
"""
异步生成计算题型验证码
"""
CHINESE_OPERATORS = ['加', '减', '乘']
ENGLISH_OPERATORS = ['+', '-', '*']
if save_images:
os.makedirs(save_path, exist_ok=True)
# 准备所有任务参数
tasks_params = []
for i in range(batch_size):
# 选择运算符类型
if operator_type == "chinese":
operator = random.choice(CHINESE_OPERATORS)
elif operator_type == "english":
operator = random.choice(ENGLISH_OPERATORS)
else: # mixed
operator = random.choice(CHINESE_OPERATORS) if random.choice([True, False]) else random.choice(
ENGLISH_OPERATORS)
operator_map = {'加': '+', '减': '-', '乘': '*', '+': '+', '-': '-', '*': '*'}
operator_symbol = operator_map[operator]
# 生成数字
if operator in ['乘', '*']:
num1, num2 = random.randint(100, 999), random.randint(10, 99)
else:
num1, num2 = random.randint(100, 999), random.randint(10, 999)
if operator in ['减', '-'] and num1 < num2:
num1, num2 = num2, num1
tasks_params.append((num1, num2, operator, operator_symbol, save_path, i, is_name))
# 使用信号量控制并发数量
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_task(params):
async with semaphore:
return await generate_single_captcha_async(params)
# 创建并执行所有任务
tasks = [bounded_task(params) for params in tasks_params]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
file_paths = []
calculation_expressions = []
labels = []
for result in results:
if isinstance(result, Exception):
print(f"生成验证码时出错: {result}")
continue
file_path, calc_expr, label = result
file_paths.append(file_path)
calculation_expressions.append(calc_expr)
labels.append(label)
return file_paths, labels, calculation_expressions
async def generate_dataset_async(num_samples=1000, save_path="./dataset_async/",
train_ratio=0.8, batch_size=50, max_concurrent=100):
"""
异步生成完整数据集
"""
train_dir = os.path.join(save_path, "train")
test_dir = os.path.join(save_path, "test")
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)
train_count = int(num_samples * train_ratio)
test_count = num_samples - train_count
print(f"开始异步生成数据集,共{num_samples}个样本")
print(f"训练集: {train_count}个样本,测试集: {test_count}个样本")
print(f"并发数: {max_concurrent},批次大小: {batch_size}")
print("-" * 50)
# 生成训练集
print("生成训练集...")
train_batches = train_count // batch_size
if train_count % batch_size > 0:
train_batches += 1
for batch_idx in range(train_batches):
current_batch_size = min(batch_size, train_count - batch_idx * batch_size)
if current_batch_size <= 0:
break
operator_type = random.choice(["chinese", "english", "mixed"])
file_paths, labels, exprs = await generate_math_captcha_async(
batch_size=current_batch_size,
save_images=True,
save_path=train_dir,
operator_type=operator_type,
max_concurrent=max_concurrent
)
print(f"训练集批次 {batch_idx + 1}/{train_batches}: 生成 {len(file_paths)} 个样本")
# 生成测试集
print("\n生成测试集...")
test_batches = test_count // batch_size
if test_count % batch_size > 0:
test_batches += 1
for batch_idx in range(test_batches):
current_batch_size = min(batch_size, test_count - batch_idx * batch_size)
if current_batch_size <= 0:
break
operator_type = random.choice(["chinese", "english", "mixed"])
file_paths, labels, exprs = await generate_math_captcha_async(
batch_size=current_batch_size,
save_images=True,
save_path=test_dir,
operator_type=operator_type,
max_concurrent=max_concurrent
)
print(f"测试集批次 {batch_idx + 1}/{test_batches}: 生成 {len(file_paths)} 个样本")
print(f"\n异步数据集生成完成!")
print(f"保存路径: {save_path}")
async def benchmark_comparison():
"""
性能对比测试
"""
import time
print("开始性能对比测试...")
# 测试同步版本(小批量)
print("\n1. 测试同步版本 (100个样本):")
start_time = time.time()
# 这里可以调用您原来的同步版本函数
sync_time = time.time() - start_time
print(f"同步版本耗时: {sync_time:.2f}秒")
# 测试异步版本
print("\n2. 测试异步版本 (100个样本):")
start_time = time.time()
await generate_math_captcha_async(batch_size=100, save_images=True,
save_path="./benchmark/", max_concurrent=5)
async_time = time.time() - start_time
print(f"异步版本耗时: {async_time:.2f}秒")
print(f"\n性能提升: {((sync_time / async_time) - 1) * 100:.1f}%")
# 使用示例
async def main():
print("异步验证码生成器")
print("=" * 50)
# 安装所需依赖
# pip install aiofiles pillow
# 生成小批量测试
# print("\n1. 生成测试样本:")
# file_paths, labels, exprs = await generate_math_captcha_async(
# batch_size=40000,
# save_images=True,
# save_path="./picture/train/",
# operator_type="mixed",
# max_concurrent=500
# )
#
# file_paths, labels, exprs = await generate_math_captcha_async(
# batch_size=2000,
# save_images=True,
# save_path="./picture/test/",
# operator_type="mixed",
# max_concurrent=500
# )
#
# file_paths, labels, exprs = await generate_math_captcha_async(
# batch_size=1,
# save_images=True,
# save_path="./",
# operator_type="mixed",
# max_concurrent=500
# )
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())
import torch
import torch.nn as nn
from torch.nn import functional as F
cate = 17
class RestNetBasicBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride):
super(RestNetBasicBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, x):
output = self.conv1(x)
output = F.relu(self.bn1(output))
output = self.conv2(output)
output = self.bn2(output)
return F.relu(x + output)
class RestNetDownBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride):
super(RestNetDownBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride[0], padding=1)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride[1], padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
self.extra = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride[0], padding=0),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
extra_x = self.extra(x)
output = self.conv1(x)
out = F.relu(self.bn1(output))
out = self.conv2(out)
out = self.bn2(out)
return F.relu(extra_x + out)
class ResNet_LSTM_Shape(nn.Module):
def __init__(self):
super(ResNet_LSTM_Shape, self).__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
self.bn1 = nn.BatchNorm2d(64)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = nn.Sequential(RestNetBasicBlock(64, 64, 1),
RestNetBasicBlock(64, 64, 1))
self.layer2 = nn.Sequential(RestNetDownBlock(64, 128, [2, 1]),
RestNetBasicBlock(128, 128, 1))
self.layer3 = nn.Sequential(RestNetDownBlock(128, 256, [2, 1]),
RestNetBasicBlock(256, 256, 1))
def forward(self, x):
out = self.conv1(x)
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
return out.shape
class ResNet_LSTM(nn.Module):
def __init__(self, image_shape):
super(ResNet_LSTM, self).__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
self.bn1 = nn.BatchNorm2d(64)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = nn.Sequential(RestNetBasicBlock(64, 64, 1),
RestNetBasicBlock(64, 64, 1))
self.layer2 = nn.Sequential(RestNetDownBlock(64, 128, [2, 1]),
RestNetBasicBlock(128, 128, 1))
self.layer3 = nn.Sequential(RestNetDownBlock(128, 256, [2, 1]),
RestNetBasicBlock(256, 256, 1))
x = torch.zeros((1, 3) + image_shape)
size = ResNet_LSTM_Shape()(x)
input_size = size[1] * size[2]
self.lstm = nn.LSTM(input_size=input_size, hidden_size=input_size, num_layers=1, bidirectional=True)
#
self.fcl = nn.Linear(input_size * 2, cate)
def forward(self, x):
out = self.conv1(x)
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out) # torch.Size([2, 256, 7, 19]) [batch,layer,h,w]
out = out.permute(3, 0, 1, 2) # torch.Size([19, 2, 256, 7])
out_shape = out.shape
out = out.view(out_shape[0], out_shape[1], out_shape[2] * out_shape[3]) # torch.Size([19, 2, 1792])
out, _ = self.lstm(out) # torch.Size([19, 2, 3584])
out_shape = out.shape
out = out.view(out_shape[0] * out_shape[1], out_shape[2])
out = self.fcl(out)
out = out.view(out_shape[0], out_shape[1], -1)
return outimport os
import pprint
import string
from pathlib import Path
import torch
from PIL import Image
from torch.utils.data import Dataset
from tqdm import tqdm
class LetterDataset(Dataset):
def __init__(self, root: str = "./picture", transform=None):
self.path = root
self.transform = transform
self.images = self._load_picture_path()
CHAR_SET = f"_0123456789加减乘+-*"
pprint.pp({
"12": "加",
"13": "减",
"14": "+",
"15": "-",
"16": "*",
})
self.mapper = [i for i in CHAR_SET]
def _load_picture_path(self):
# 使用 lambda 表达式过滤图片文件
image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
# 使用 os.walk 和 lambda 表达式获取所有图片路径
picture_paths = list(
map(lambda walk_result: [
[file, Path(walk_result[0]) / file]
for file in walk_result[2]
if os.path.splitext(file)[1].lower() in image_extensions
], os.walk(self.path))
)
# 将嵌套列表展平
picture_paths = [item for sublist in picture_paths for item in sublist]
return picture_paths
def __getitem__(self, index):
image = self.images[index]
image_path = image[1]
pic = Image.open(image_path)
if self.transform:
pic = self.transform(pic)
labels = [self.mapper.index(i) for i in image[0].split('_')[0]]
for i in range(9 - len(labels)):
labels.insert(0, 0)
labels = torch.as_tensor(labels, dtype=torch.int64)
return pic, labels,len(labels)
def __len__(self):
return len(self.images)
def compute_mean_std_pytorch(dataset, batch_size=8):
"""
使用PyTorch张量高效计算数据集的均值和标准差
"""
import torch
from torchvision import transforms as T # 重命名避免冲突
from tqdm import tqdm
original_transform = dataset.transform
dataset.transform = None
print("使用PyTorch方法计算...")
# 初始化统计变量
mean_accum = 0.0
sq_mean_accum = 0.0
num_pixels = 0
for i in tqdm(range(0, len(dataset), batch_size)):
batch_indices = range(i, min(i + batch_size, len(dataset)))
batch_pixels = []
for idx in batch_indices:
pic, labels, length = dataset[idx]
# 使用明确的模块引用
if isinstance(pic, Image.Image):
img_tensor = T.ToTensor()(pic) # 使用 T 而不是 transforms
else:
img_tensor = pic if torch.is_tensor(pic) else torch.tensor(pic)
batch_pixels.append(img_tensor)
if batch_pixels:
batch_tensor = torch.stack(batch_pixels)
batch_pixels_count = batch_tensor.numel()
# 累加统计量
pixel_sum = batch_tensor.sum()
pixel_sq_sum = (batch_tensor ** 2).sum()
# 更新全局统计(使用加权平均)
batch_mean = pixel_sum / batch_pixels_count
batch_sq_mean = pixel_sq_sum / batch_pixels_count
mean_accum = (num_pixels * mean_accum + batch_pixels_count * batch_mean) / (num_pixels + batch_pixels_count)
sq_mean_accum = (num_pixels * sq_mean_accum + batch_pixels_count * batch_sq_mean) / (
num_pixels + batch_pixels_count)
num_pixels += batch_pixels_count
# 计算最终标准差:sqrt(E[X²] - (E[X])²)
final_std = torch.sqrt(sq_mean_accum - mean_accum ** 2)
dataset.transform = original_transform
return [mean_accum.item()] * 3, [final_std.item()] * 3
if __name__ == '__main__':
from torchvision import transforms
transforms = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize(mean=[0.1307, ], std=[0.3081, ]),
]
)
dataset = LetterDataset(transform=transforms)
#
# # 2. 计算数据集的均值和标准差
# mean, std = compute_mean_std_pytorch(dataset, batch_size=32)
#
# # 3. 打印结果,并用于创建新的、包含标准化的数据加载流程
# print(f"计算得到的均值: {mean}")
# print(f"计算得到的标准差: {std}")import torch
from torch import save
from torchvision import transforms
from torch.utils.data import DataLoader
from torch import nn, optim
from tqdm import tqdm
import test
import numpy as np
from MyDataset import LetterDataset
from model_LSTM import ResNet_LSTM
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 模型
batch_size = 2
cate = 17
# 实例化模型
size = (50, 150)
model = ResNet_LSTM(size)
optimizer = optim.Adam(model.parameters())
# 加载以及训练好的模型和优化器继续训练
model = model.to(device)
loss_fn = nn.CTCLoss()
# transforms.Normalize(mean=[0.94326925, 0.94316506, 0.9447249], std=[0.19776213, 0.197599402, 0.19384143]),
transforms = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize(mean=[0.8605688810348511, 0.8605688810348511, 0.8605688810348511],
std=[0.22332395613193512, 0.22332395613193512, 0.22332395613193512]),
]
)
mine_train = LetterDataset(root='./picture/train/', transform=transforms)
def train(epoch):
total_loss = []
data_loader = DataLoader(mine_train, batch_size=batch_size, shuffle=True, drop_last=True)
data_loader = tqdm(data_loader, total=len(data_loader))
model.train()
# 三件套
for (img, label, label_length) in data_loader:
img = img.to(device)
label = label.to(device)
# 梯度置0
optimizer.zero_grad()
# 传播
output = model(img) # torch.Size([19, 2, 17])
# print(output.shape) # torch.Size([19, 2, 17])
# print(label.shape) # torch.Size([2, 9])
input_length = torch.IntTensor([output.shape[0]] * output.shape[1])
# 单次优化
loss = loss_fn(output, label, input_length, label_length)
total_loss.append(loss.item())
data_loader.set_description("loss: %.4f" % np.mean(total_loss))
# 反向传播
loss.backward()
# 优化器更新
optimizer.step()
save(model.state_dict(), './models/model.pkl')
save(optimizer.state_dict(), './models/optimizer.pkl')
loss = np.mean(total_loss)
print(f"\n第{epoch}轮epoch, 损失为:\t{loss} ", )
print("=====================================================================")
for i in range(3):
epoch = i + 1
train(epoch)
print(f"\n第{epoch}轮epoch, 成功率:\t {test.test_success(i)}")import itertools
import os
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
from MyDataset import LetterDataset
from model_LSTM import ResNet_LSTM
batch_size = 2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
cate = 17
def test_success(t):
total = 0
success = 0
# 实例化模型
size = (50 , 150)
model = ResNet_LSTM(image_shape=size)
model = model.to(device)
if os.path.exists('./models/model.pkl'):
model.load_state_dict(torch.load('./models/model.pkl'))
from torchvision import transforms
transforms = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize(mean=[0.8605688810348511, 0.8605688810348511, 0.8605688810348511],
std=[0.22332395613193512, 0.22332395613193512, 0.22332395613193512]),
]
)
mine_train = LetterDataset(root='./picture/test/', transform=transforms)
data_loader = DataLoader(mine_train, batch_size=batch_size, shuffle=True, drop_last=True)
data_loader = tqdm(data_loader, total=len(data_loader))
model.eval()
# 成功率列表
# 三件套
with torch.no_grad():
for (img, label, label_length) in data_loader:
img = img.to(device)
label = label.to(device)
# 获取结果
output = model(img)
output = output.permute(1, 0, 2)
for i in range(output.shape[0]):
output_result = output[i, :, :]
output_result = output_result.max(-1)[-1]
mapping = mine_train.mapper
output_s = [mapping[i[0]] for i in itertools.groupby(output_result.cpu().numpy()) if i[0] != 0]
label_s = [mapping[i] for i in label[i].cpu().numpy() if mapping[i] != '_']
if output_s == label_s:
success += 1
total += 1
return success / float(total)
# print(test_success(1))import itertools
import os
import torch
from PIL import Image
from torchvision import transforms
from model_LSTM import ResNet_LSTM
def predict_single_image(image_path='./test.png'):
"""
单张图片推理函数
Args:
image_path: 要预测的图片路径
Returns:
predicted_text: 预测的文本结果
confidence: 置信度(可选)
"""
# 设备设置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 模型初始化
size = (50, 150)
model = ResNet_LSTM(image_shape=size)
model = model.to(device)
# 加载训练好的模型权重
if os.path.exists('./models/model.pkl'):
model.load_state_dict(torch.load('./models/model.pkl', map_location=device))
# 数据预处理 - 需要与训练时保持一致[7](@ref)
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.8605688810348511, 0.8605688810348511, 0.8605688810348511],
std=[0.22332395613193512, 0.22332395613193512, 0.22332395613193512]),
])
# 设置模型为评估模式[6](@ref)
model.eval()
# 加载并预处理单张图片[1,5](@ref)
try:
image = Image.open(image_path).convert('RGB')
# 如果需要调整图片大小以匹配模型输入
# image = image.resize((150, 50)) # 根据实际情况调整
input_tensor = transform(image).unsqueeze(0) # 添加batch维度[5](@ref)
input_tensor = input_tensor.to(device)
except Exception as e:
print(f"图片加载失败: {e}")
return None
# 推理过程[6,7](@ref)
with torch.no_grad(): # 关闭梯度计算以提升效率
output = model(input_tensor)
output = output.permute(1, 0, 2) # 调整维度顺序
# 解码预测结果[2](@ref)
# 创建mapper实例(需要根据您的实际mapper实现调整)
from MyDataset import LetterDataset
mine_train = LetterDataset(root='./picture/test/', transform=transform)
mapping = mine_train.mapper
predicted_texts = []
for i in range(output.shape[0]):
output_result = output[i, :, :]
output_result = output_result.max(-1)[-1] # 获取最大概率的索引
# 使用groupby合并连续重复的字符,并过滤掉空白符[2](@ref)
output_s = [mapping[i[0]] for i in itertools.groupby(output_result.cpu().numpy()) if i[0] != 0]
predicted_text = ''.join(output_s)
predicted_texts.append(predicted_text)
# 返回第一个预测结果(如果是单张图片,通常只有一个结果)
return predicted_texts[0] if predicted_texts else ""
# 使用示例
if __name__ == "__main__":
result = predict_single_image('./test.png')
print(f"预测结果: {result}")