本文内容为《扩散模型从原理到实战(人民邮电出版社)》代码实践,本机为 macOS arm64 环境。
在 Notebook 中,Cell 是可以单独执行的一段 Python 代码块,类似于多个单独的代码文件,但每个 Cell 在运行时共享变量和内存,例如:
1x = 102y = x + 11print(y)虽然它们属于不同的“文件”,但它们共享同一个 Python 进程,因此 print(y) 不会报错。
在 PyCharm 中创建 conda 解释器,命名为 build_diffusion_model (建议小写+下划线)
在 build_diffusion_model 环境中执行
1python -m pip install -U pip2pip install torch torchvision # 若使用CUDA,需使用selector选择CUDA版本3pip install diffusers matplotlib1python -c "import torch; print(torch.backends.mps.is_available())"导出当前解释器环境以便复现
1conda env export --no-builds > environment_macosx.yml编写 Cell 实现模型的训练管线 mnist_baseline.ipynb
1"""2依赖自检3"""4
5import sys6import importlib.util7
8print(f"{'='*10} 执行依赖自检 {'='*10}")9
10required_packages = [11 ("torch", "PyTorch" ),12 ("torchvision", "TorchVision"),13 ("diffusers", "Diffusers"),14 ("matplotlib", "Matplotlib")15]16
17missing_packages = []18
19for package_name, display_name in required_packages:20 if importlib.util.find_spec(package_name) is None:21 missing_packages.append(package_name)22 else:23 module = __import__(package_name)24 version = getattr(module, '__version__', '未知版本')25 print(f"{display_name}: {version}")26
27if missing_packages:28 print(f"\n【ERROR】缺少以下依赖包: {', '.join(missing_packages)}")29 sys.exit(1)30else:31 try:32 import torch33 import torchvision34 from torch import nn35 from torch.nn import functional as f36 from torch.utils.data import DataLoader37 from diffusers import DDPMScheduler, UNet2DModel38 from matplotlib import pyplot as plt39 except Exception as e:40 print(f"【ERROR】导入发生未知错误")41 sys.exit(1)42
43print(f"{'='*10} 依赖自检完成 {'='*10}")1"""2Device 自检3Apple Silicon:优先 mps,否则 cpu4NVIDIA:优先 cuda,否则 cpu5"""6
7print(f"{'='*10} 执行硬件自检 {'='*10}")8
9try:10 if torch.backends.mps.is_available():11 device_name = "mps"12 if torch.backends.mps.is_built():13 print("【INFO】Use Apple Silicon (MPS)")14 elif torch.cuda.is_available():15 device_name = "cuda"16 print("【INFO】Use NVIDIA")17 else:18 device_name = "cpu"19 print("【INFO】Use CPU")20
21 device = torch.device(device_name)22 x = torch.ones(1).to(device) # 在CPU中创建张量,并将其移动至device,赋值给x23 print(f"{device} 可以使用")24
25except Exception as e:26 print(f"【ERROR】{e} 设备无法使用")27 device = torch.device("cpu")28
29print(f"{'='*10} 硬件自检完成 {'='*10}")1"""2数据集测试3"""4
5# 数据集6dataset = torchvision.datasets.MNIST(7 root="../data/datasets",8 train=True, # 使用训练集,False为测试集9 download=True, # 下载数据集10 transform=torchvision.transforms.ToTensor() # 将图像转换为张量11 )12
13# 为数据集创建数据加载器14dataset_loader = DataLoader(15 dataset, # 要加载的数据对象16 batch_size=16, # 每次迭代加载的样本数量17 shuffle=True # 打乱数据顺序18 )19
20# 从加载器中取出第一批数据21x, y = next(iter(dataset_loader))22print('Input shape:', x.shape)23print('Labels :', y)24plt.imshow(torchvision.utils.make_grid(x)[0], cmap = 'gray') # 以单通道取出所有图像,拼接成大图并用灰度显示编写此管线的目的是测试硬件是否可调用。
在扩散模型中,“退化过程”指的是数据信息逐渐丧失的过程。狭义上理解就是不断向图像中添加高斯噪声的过程,直到图像完全变成各向同性的高斯噪声。
退化过程是扩散模型训练流程的一部分。扩散模型的整个训练流程可以简单归纳为:
1准备数据 → 向数据中添加噪声 → 模型预测加进去了哪些噪声 → 计算与真实误差 → 调整权重模型的产出过程(反向过程)则大致与上面的流程相反。从纯噪声开始,根据学到的经验预测并一步步移除噪声,就得到了产物。
当然,前向和反向过程都涉及复杂的数学原理,这里不展开叙述(我也看不懂)。
前面我们已经准备好了数据集,为了训练模型,我们需要往训练集中人为添加噪声。
为了查看不同噪声程度下的图片,我们可以引入一个变量来手动控制内容“损坏”的程度。
如下方的代码所示。其中 amount 是我们引入的、人为控制的变量,x 为图像张量,noise 为噪声,noise_x 为加了噪声后的张量。
1noised_x = (1-amount) * x + amount * noise可以看出,当 amount = 0 时,noise_x 与原始张量相同;当 amount = 1 时,noise_x 则为纯粹的噪声。
这种方法在数学上叫做线性插值。
当然,amount 的值不是手动填写的,那样训练就太低效了。在深度学习中,我们通常不是一张一张地处理图片,而是成批(Batch)处理,我们通常会为这一批次里的每一张图随机生成一个不同的损坏程度。
x (图像张量) 的形状通常是 [Batch_size, Channels, Height, Width]。
例如在上面的代码中,输出的 Input shape 为 [16, 1, 28, 28],表示一共有 16 张 1 通道的 28x28 图片。
问题在于:如果 amount 是 1 维的随机数,那么它直接乘以 4 维的张量会发生错误。
为了让 amount 能和 x 相乘,我们需要把 amount 从 1 维变成 4 维,形状变为 [ , , , ]。
于是,我们就需要用到 PyTorch 的 .view() 方法来改变张量的形状。
.view(-1, 1, 1, 1) 内有几个参数,其中 -1 表示自动计算一个批次的数量,其余的 1 都表示自动适配图片规格。其实整个方法都是起自动适配的作用。
1"""2添加噪声,并对输出结果进行可视化3"""4
5def corrupt(x, amount):6 """7 根据给定的amount值,向输入张量x中添加噪声,返回添加噪声后的张量8 """9 amount = amount.view(-1, 1, 1, 1) # 调整amount形状以便广播10 noise = torch.rand_like(x) # 生成与x形状相同的噪声11 return x * (1 - amount) + noise * amount12
13# 绘制输入数据14fig, axs = plt.subplots(2, 1, figsize=(12, 5)) # 画布行数,画布列数,画布大小。plt.subplots返回两个方法,第一个是画布对象fig,第二个是子图对象axs15plt.subplots_adjust(hspace=0.4) # 扩大子图间距16axs[0].set_title('Input Images')17axs[0].imshow(torchvision.utils.make_grid(x)[0], cmap='gray') # 以单通道取出所有图像,拼接成大图并用灰度显示18
19# 加入噪声20amount = torch.linspace(0, 1, x.shape[0]) # 在指定的范围内,生成一组等距离的数字,数量与x的Batch_size相同21noised_x = corrupt(x, amount)22
23# 绘制加入噪声后的图像24axs[1].set_title('Corrupted Images (--- amount increases --->)')25axs[1].imshow(torchvision.utils.make_grid(noised_x)[0], cmap='gray')我们可以参考 UNet 神经网络的架构来预测噪声,最终输出剔除噪声后的结果。
UNet 架构预测噪声的流程如下:
下采样分为以下几个步骤:
卷积:一个数学运算动作。指卷积核在输入图像上“滑动”,并进行“乘加运算”的过程。
激活:将卷积生成的线性函数变为分段函数。
池化:下采样的一种方式。最常见的是最大池化,即保留一片区域中的最显眼特征。还有一种操作是平均池化。
设池化窗口为 2x2,采用最大池化,用数字演示就是最大的值:
| 1 | 2 | 3 | 4 |
|---|---|---|---|
| 0 | -1 | 8 | -6 |
| 6 | -3 | 7 | 0 |
| 3 | 2 | -1 | -9 |
池化后,我们可以得到一个新的 2x2 表格:
| 2 | 8 |
|---|---|
| 6 | 7 |
引入池化层可以提高模型的平移不变性,即当输入存在少量平移时,输出的结果不会产生很大影响。
步长卷积:卷积核滚动过程中,会跳过特定步长区域的值。
1class BasicUnet(nn.Module):2 """3 一个简单的UNet网络部署4 """5 def __init__(self, in_channels=1, out_channels=1):6 super().__init__() # 初始化7
8 # 下采样路径,包含三个卷积层9 self.down_layers = torch.nn.ModuleList([10 nn.Conv2d(in_channels, 32, kernel_size=5, padding=2), # 由输入通道数生成32个特征图,卷积核大小为5x5,填充为2以保持尺寸11 nn.Conv2d(32, 64, kernel_size=5, padding=2), # 由32个特征图生成64个特征图12 nn.Conv2d(64, 64, kernel_size=5, padding=2), # 不继续上升通道,防止过拟合13 ])14
15 # 上采样路径,包含三个转置卷积层16 self.up_layers = torch.nn.ModuleList([17 nn.ConvTranspose2d(64, 64, kernel_size=5, padding=2),18 nn.ConvTranspose2d(64, 32, kernel_size=5, padding=2),19 nn.ConvTranspose2d(32, out_channels, kernel_size=5, padding=2),20 ])21 self.act = nn.SiLU() # 激活函数22 self.downscale = nn.MaxPool2d(2) # 下采样使用最大池化法,窗口大小为2x223 self.upscale = nn.Upsample(scale_factor=2) # 上采样使用插值法24
25 def forward(self, x):26 h = [] # 创建一个空列表,保存下采样前的数据供上采样参考,以免上采样时丢失信息27 # 下采样循环 (Encoder)28 for i, l in enumerate(self.down_layers):29 x = self.act(l(x)) # 卷积 -> 激活30 if i < 2: # 前两层卷积层执行以下操作31 h.append(x) # 把当前特征图存入 h 列表32 x = self.downscale(x) # 池化33 # 上采样循环 (Decoder)34 for i, l in enumerate(self.up_layers):35 if i > 0: # 除了第一个上采样层外,其他层都执行以下操作36 x = self.upscale(x) # 插值37 x = x + h.pop() # 与对应的下采样特征图相加(跳跃连接)38 x = self.act(l(x)) # 转置卷积 -> 激活39 return x # 返回预测的噪声需要额外说明的几个点:
卷积核大小通常为奇数。偶数大小的效果不好。
padding 是额外填充的内容
完成一个简单的 UNet 网络后,我们可以验证输出结果的形状与输入的形状是否相同。同时查看整个网络的参数大小。
1"""2验证输入和输出的形状是否相同,并查看 UNet 网络的参数量3"""4
5net = BasicUnet()6x = torch.rand(8, 1, 28, 28) # 生成形状 (8,1,28,28) 的随机张量7net(x).shape # 将随机张量丢进网络,查看输出形状是否与输入相同8print(net(x).shape)9
10sum(p.numel() for p in net.parameters()) # 计算网络的参数量现在给定一个带噪的输入 noisy_x,扩散模型应该输出对原始输入 x 的最佳预测。同时,我们需要通过均方误差比较预测值与真实值。总体流程如下:
1"""2开始训练模型3"""4
5batch_size = 5126
7# 数据加载器8dataset_loader = DataLoader(9 dataset, # 要加载的数据对象10 batch_size = batch_size, # 每次迭代加载的样本数量11 shuffle=True # 打乱数据顺序12 )13
14# 运行周期15num_epochs = 316
17# 创建 UNet 网络18net = BasicUnet().to(device)19
20# 损失函数(均方误差)21loss_fn = nn.MSELoss()22
23# 优化器,根据损失函数结果调整网络权重24optimizer = torch.optim.AdamW(net.parameters(), lr=1e-3) # 学习率:1e-325
26# 记录训练损失27loss_history = []28
29# 训练循环30for epoch in range(num_epochs):31 for x, y in dataset_loader:32 # 加载数据并添加噪声33 x = x.to(device) # 加载数据34 noise_amount = torch.rand(x.shape[0]).to(device) # 为每个样本生成一个随机的噪声数量35 noisy_x = corrupt(x, noise_amount) # 向样本中添加噪声36
37 # 预测的噪声结果38 predicted_noise = net(noisy_x)39
40 # 计算损失41 loss = loss_fn(predicted_noise, x) # 对比预测噪声与原始图像42
43 # 反向传播并更新权重44 optimizer.zero_grad() # 清除之前的梯度45 loss.backward() # 反向传播计算新的梯度46 optimizer.step() # 更新权重47
48 # 记录损失49 loss_history.append(loss.item())50
51 # 输出每个 epoch 的损失均值52 avg_loss = sum(loss_history[-len(dataset_loader):]) / len(dataset_loader)53 print(f"Finished epoch {epoch}. Average loss: {avg_loss:05f}")54
55# 绘制损失曲线56plt.plot(loss_history)57plt.ylim(0, 0.1)接下来可以看看模型预测的结果,对比原始、加噪后的图像是什么样的。
1"""2观察训练结果3"""4
5x, y = next(iter(dataset_loader)) # 从数据集中取出一批数据6x = x[:8] # 取出前8个样本7
8amount = torch.linspace(0, 1, x.shape[0]) # 生成一组等距离的噪声数量9noised_x = corrupt(x, amount) # 向样本中添加噪10
11# 得到模型预测结果12with torch.no_grad(): # 在评估模式下,不计算梯度13 predicted_noise = net(noised_x.to(device)).cpu() # 预测噪声,将结果移回CPU(NumPy无法绘制GPU数据)14
15# 绘制结果16fig, axs = plt.subplots(3, 1, figsize=(12, 7))17axs[0].set_title('Input Images')18axs[0].imshow(torchvision.utils.make_grid(x)[0].clip(0, 1), cmap='gray')19axs[1].set_title('Corrupted Images (--- amount increases --->)')20axs[1].imshow(torchvision.utils.make_grid(noised_x)[0].clip(0, 1), cmap='gray')21axs[2].set_title('Predicted Noise')22axs[2].imshow(torchvision.utils.make_grid(predicted_noise)[0].clip(0, 1), cmap='gray')如上图所示,尽管模型在噪声量较少时的预测结果不错,但在噪声数量较高时结果几乎不准确。
我们可以通过拆解采样步骤来提高预测质量:例如将预测 20% 后的输出作为下一次预测的输入。
1"""2拆解采样步骤3"""4
5step = 56x = torch.rand(8, 1, 28, 28).to(device) # 随机初始化一个图像张量7step_history = [x.detach().cpu()] # 每个步骤的图像8predicted_output = [] # 每个步骤的预测输出9
10for i in range(step):11 with torch.no_grad():12 predicted_image = net(x) # 预测噪声13 predicted_output.append(predicted_image.detach().cpu()) # 记录预测输出14
15 mix_factor = 1/(step - i) # 朝预测方向移动的步骤16 x = x * (1 - mix_factor) + predicted_image * mix_factor # 更新图像17 step_history.append(x.detach().cpu()) # 记录当前步骤的图像18
19# 绘制每个步骤的图像和预测输出20fig, axs = plt.subplots(step, 2, figsize=(9, 4), sharex=True)21axs[0, 0].set_title('Input Image')22axs[0, 1].set_title('Predicted Noise')23for i in range(step):24 axs[i, 0].imshow(torchvision.utils.make_grid(step_history[i])[0].clip(0, 1), cmap='gray')25 axs[i, 1].imshow(torchvision.utils.make_grid(predicted_output[i])[0].clip(0, 1), cmap='gray')下面我们使用另一种神经网络架构:UNet2DModel 来训练模型。
UNet2DModel 使用了 DDPM 算法,同时引入了注意力机制,其架构和性能比 UNet 更加先进。
由于引入了两层注意力算子,UNet2DModel 的训练时间很长,我这里调整了一下代码,尝试使用 Metal API 加速(我也不知道有没有用)。
同样是 3 轮 epoch,训练完的损失率比经典 UNet 还高,这是怎么会事呢?
需要先注册 Hugging Face 并获取 Access Token,便于上传模型。
1"""2依赖自检3"""4
5import sys6import os7from pathlib import Path8
9project_root = Path.cwd().parent10sys.path.insert(0, str(project_root / 'src'))11
12env_path = project_root / '.env'13if env_path.exists():14 for line in env_path.read_text().splitlines():15 line = line.strip()16 if not line or line.startswith('#') or '=' not in line:17 continue18 key, value = line.split('=', 1)19 value = value.strip().strip("'").strip('\"')20 os.environ.setdefault(key.strip(), value)21
22from diffusion.env import ensure_dependencies23
24ensure_dependencies()25
26import torch27from torch.nn import functional as f28from torchvision import transforms29from datasets import load_dataset30
31"""32Device 自检33"""34
35from diffusion.env import select_device36
37device = select_device(torch)38
39"""40加载数据集41"""42from diffusion.data import create_dataloader43from diffusion.hf import login_hf44
45login_hf()46
47dataset = load_dataset('huggan/smithsonian_butterflies_subset', split='train')48
49image_size = 3250batch_size = 6451
52# 图像预处理53preprocess = transforms.Compose(54 [55 transforms.Resize((image_size, image_size)), # 统一图像大小(宽x高)56 transforms.RandomHorizontalFlip(), # 随机水平翻转图像57 transforms.ToTensor(),58 transforms.Normalize([0.5], [0.5]), # 归一化到 [-1, 1]59 ]60)61
62def transform(examples):63 """对数据集中的图像进行预处理"""64 images = [preprocess(image.convert("RGB")) for image in examples['image']]65 return {'images': images}66
67# 动态函数,获取数据集内容时,对数据集进行转换68dataset.set_transform(transform)69
70train_dataloader = create_dataloader(71 dataset,72 batch_size=batch_size73)74
75"""76可视化图像数据77"""78
79from PIL import Image80from diffusion.visualize import show_images81
82xbatch = next(iter(train_dataloader))['images'].to(device)[:8]83print(f'批量图像张量形状: {xbatch.shape}') # torch.Size([8, 3, 32, 32])84show_images(xbatch).resize((8 * 64, 64), resample=Image.NEAREST)在模型的训练阶段中,我们需要获取这些输入图像并为它们添加噪声,然后将“带噪”的图像输入模型;在推理阶段,我们将使用模型的预测结果逐步消除这些噪声。
在扩散模型中,这两个步骤是由调度器(scheduler)处理的。
下面为图像添加噪声:
1"""2为图像添加噪声3"""4
5from diffusers import DDPMScheduler6
7noise_scheduler = DDPMScheduler(8 num_train_timesteps=10009)10
11timesteps = torch.linspace(0, 999, 8).long().to(device) # 8 个时间步12noise = torch.randn_like(xbatch) # 生成随机噪声13noisy_xbatch = noise_scheduler.add_noise(xbatch, noise, timesteps) # 添加噪声14print(f'添加噪声后的图像张量形状: {noisy_xbatch.shape}') # torch.Size([8, 3, 32, 32])15show_images(noisy_xbatch).resize((8 * 64, 64), resample=Image.NEAREST)下一步应该是定义模型了。前面我们已经学习过 UNet 模型的基本结构。
1"""2创建扩散模型3"""4
5from diffusers import UNet2DModel6
7model = UNet2DModel(8 sample_size=image_size, # 输入图像的大小(宽和高)9 in_channels=3, # 输入图像的通道数(RGB 图像为 3)10 out_channels=3, # 输出图像的通道数11 layers_per_block=2, # 每个块中的层 ResNet 层数12 block_out_channels=(64, 128, 128, 256), # 每个块的输出通道数13 down_block_types=( # 下采样块类型14 "DownBlock2D",15 "DownBlock2D",16 "AttnDownBlock2D",17 "AttnDownBlock2D"18 ),19 up_block_types=( # 上采样块类型20 "AttnUpBlock2D",21 "AttnUpBlock2D",22 "UpBlock2D",23 "UpBlock2D"24 )25).to(device)1"""2创建训练循环3"""4
5import numpy as np6from matplotlib import pyplot as plt7
8noise_scheduler = DDPMScheduler(9 num_train_timesteps=1000,10 beta_schedule="squaredcos_cap_v2"11)12
13optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)14
15losses = []16
17num_epochs = 5018
19for epoch in range(num_epochs):20 for step, batch in enumerate(train_dataloader):21 clean_images = batch['images'].to(device)22 # 1. 生成噪声23 noise = torch.randn(clean_images.shape).to(clean_images.device)24 bsz = clean_images.shape[0]25 # 2. 为每张图像随机选择时间步26 timesteps = torch.randint(27 0, # 最小时间步28 noise_scheduler.num_train_timesteps, # 最大时间步29 (bsz,), # 生成 bsz 个时间步30 device=clean_images.device31 ).long()32 # 3. 根据每个时间步的噪声大小,添加噪声33 noisy_images = noise_scheduler.add_noise(34 clean_images,35 noise,36 timesteps37 )38 # 4. 预测噪声39 noise_pred = model(40 noisy_images,41 timesteps,42 return_dict=False43 )[0]44 # 5. 计算损失45 loss = f.mse_loss(noise_pred, noise)46 loss.backward(loss)47 losses.append(loss.item())48 # 6. 优化模型参数49 optimizer.step()50 optimizer.zero_grad()51 # 每 5 个周期打印一次损失52 if (epoch + 1) % 5 == 0:53 loss_last_epoch = sum(losses[-len(train_dataloader):]) / len(train_dataloader)54 print(f'Epoch {epoch+1}, Loss: {loss_last_epoch:.4f}')55
56# 绘制损失曲线57fig, axs = plt.subplots(1, 2, figsize=(12, 4))58axs[0].plot(losses)59axs[1].plot(np.log(losses))60plt.show()1"""2使用模型生成图像3"""4
5from diffusers import DDPMPipeline6
7# 创建图像生成管线8image_pipeline = DDPMPipeline(9 unet=model,10 scheduler=noise_scheduler11)12
13# 生成图像14pipeline_output = image_pipeline()15pipeline_output.images[0]生成的图像如下:
1"""2保存模型和管线3"""4
5image_pipeline.save_pretrained("../models/generate_butterflies")从上面的体验中可以得知,从头开始训练一个扩散模型需要很长的时间,且数据量也可能非常庞大。
对于这种情况,我们可以从一个已经训练过的模型开始训练——当你的新数据和原始模型的训练数据相似时。这种方法叫“微调”。例如若要生成卡通人脸,可以使用在人脸数据集上训练过的模型进行微调。
未施加生成条件的模型一般无法对生成的内容进行控制(例如控制图像的整体色调)。因此我们可以训练一个条件模型,使其接收额外输入,以此控制生成过程。对于没有生成条件的模型,可以使用引导函数。
将条件信息输入模型的方法有很多种:
将条件信息作为额外的通道输入
把条件信息直接和图像“捆”在一起。如果原图是 RGB 3 通道,条件图是 1通道,就把它们拼成一个 4 通道的输入送进模型第一层。这种方法受条件信息的影响很大,主要用于图生图、图像修复、超分辨率等任务。
特征融合
将条件转化成一个长向量,经变换后直接加到 UNet 中间每一层的特征图上。由于其为全局性指令,这种方法只能影响全局的风格、亮度和色调。
交叉注意力机制
上面已经解释过了。这种方法主要用于文生图。
如果这篇文章对你有帮助,欢迎分享给更多人!