0%

你好呀!

这里记录我的学习经历和内容

希望能够帮到你们

: )

我们已经学习了虚拟内存的作用和虚拟内存的基本使用过程,为了进一步的深入的理解虚拟内存的机制,我们需要要深入理解虚拟内存的基本原理。

地址翻译

现在我们将从底层除出发,理解硬件在虚拟内存中的角色。为了简化之后的说明,这里提前展示我们所要用到的符号以参考:

image.png

实际上,地址翻译就是一个N元素的虚拟地址空间(VAS)中的元素和一个M元素的物理地址空间(PAS)中元素的映射: $$ \begin{align*} MAP&:VAS \to PAS \cup \varnothing \\ MAP(A) &= \begin{cases} A' \text{ 如果虚拟地址A处的数据在PAS的物理地址A'处} \\ \varnothing \text{ 如果虚拟地址A处的数据不在物理内存中} \end{cases} \end{align*} $$ 下图演示了MMU如何利用页表来实现这种映射:

image.png

控制寄存器——页表基址寄存器(PTBR)指向当前页表。对于n位的虚拟地址,包含两个部分:

  • 一个p位的虚拟页面偏移(VPO)
  • 一个n-p位的虚拟页号(VPN),作为页表的索引

MMU通过VPN选择对应的PTE,同时PTE中的内容实际上就是物理页号(PPN),这样就可以通过虚拟页号访问到对应的物理页。至于偏移量,由于虚拟页和物理页的大小相同,所以虚拟页偏移量和物理页偏移量(PPO)是是相同的。

现在我们可以分析CPU硬件的执行步骤了:

当页面命中时:

image.png
  1. 处理器生成一个虚拟地址VA并发送到MMU中
  2. MMU生成PTE地址(将VA拆分成VPN和VPO,VPN作为PTE地址),从内存中请求它
  3. 内存向MMU返回PTE
  4. MMU构造物理地址(返回的PTE就是PPN,物理地址=PPN+(PPO=VPO))发送给内存
  5. 内存通过物理地址找到数据字节返回给CPU

当缺页时:

image.png
  1. 前三步同上
  2. 返回的PTE有效位为0,MMU触发异常,CPU控制传递到内核中的缺页异常处理程序
  3. 缺页处理程序调入新的页面,并更新内存中的PTE
  4. 缺页处理程序返回原来的进程,再次执行导致缺页的指令。此时页面命中

结合高速缓存和虚拟内存

我们的系统一般既有虚拟内存的机制又有SRAM高速缓存,那么对于SRAM高速缓存,我们是应该使用物理地址访问还是虚拟地址访问呢?下图是一个将其结合起来的方案:

image.png

我们使用物理寻址的方案,先将VA转换为PTEA然后在L1中检索,如果命中就直接返回PTE,未命中则再取一次。拿到PTE后构造出PA再对L1进行检索,如果命中就返回PA,未命中就再取一次。L1的存在可以帮我们节省数据传送的时间。页表条目本质上也是数据字,所以也可以被缓存在L1中。

利用TLB加速地址翻译

CPU每次产生一个虚拟地址都需要查阅一个PTE,以便于将虚拟地址进行翻译。每次从内存取数据都需要花费几十到几百的周期,为了解决这个问题,我们引入了高速缓存,在命中的情况下,将开销降到了1到2周期。为了尽可能的减少开销,我们在MMU中引入了一个关于PTE的小缓存,即翻译后备缓冲器(TLB)

TLB是一个小的缓存,其每一行都保存有一个由单个PTE组成的块。其结构如下:

image.png

用于组选择和行匹配的索引和标记字段都是从VPN提取出来的。如果TLB有T=2^t个组,那么TLB索引(TLBI)是由VPN的t个最低位组成的,而TLB标记(TLBT)是由VPN中剩余的位组成的(用来行匹配)。

下图展示了TLB的工作过程:

image.png

当TLB命中时:

  • CPU产生VA
  • MMU从TLB中取出相应的PTE
  • MMU构造物理地址,发送到缓存
  • 缓存返回数据字到CPU

当不命中时,MMU必须从L1中取出相应的PTE。新取出的PTE会覆盖TLB中的一个条目。

由于所有的地址翻译步骤都是在MMU中完成的,所以速度很快。

多级页表

到此为止,我们一直假设系统只使用一个单独的页表来进行地址翻译。如果我们有一个32位的地址空间、4KB的页面和一个4字节的PTE,那么即使我们只使用虚拟空间中很小的一部分,我们也需要一个4MB的页表驻留在内存中。对于64位的地址空间,这个问题更加明显,我们甚至需要4PB的页表常驻内存,这荒谬。为了解决这个问题,我们引入多级页表,用过层次结构来压缩。

我们以下图为例:

image.png

对于32位的地址空间,我们有4GB的地址空间,空间被分为4KB的页,每个页有一个4字节的页表条目。假设此时,虚拟空间有以下形式:内存的前2K个页面被分配给了代码和数据,接下来的6K个页面没有被分配,再接下里的1K个页面中有1023个未分配的页和一个分配作为栈的页面。

我们用一级页表中的每个PTE负责映射虚拟空间中一个4MB的片,这里的每一片都是由1024个连续的页面组成的。所以我们只需要1024个一级页表条目就可以指向4G大小的片。

如果片i中的每个页面都没有被分配,那么对应的一级页表条目i就是空的。反之,如果片i中至少有一个页是分配了的,那么一级PTEi就需要指向有一个二级页表的基址。每个二级页表的结构和一级页表都是一样的。

这种方法减少了内存的需求:

  • 如果一级页表中的一个PTE是空的,那么对应的二级页表都不需要要存在,这样极大的节省了内存空间,因为虚拟地址空间大部分时候都是未分配的
  • 只有一级页表才需要总是在主存中,VM系统在需要时创建、页面调入或调出二级表,进一步减少了主存的压力,只有常用的二级表才需要缓存在主存中

对于一个k级的页表层次结构的地址翻译。虚拟地址被分隔成k个VPN和一个VPO:

image.png

每个VPNi都是一个到第i个级页表的索引,第j级的每个页表中都指向第j+1级的某个页表的基址。第k级页表中的每个PTE包含某个物理页面的PPN,或一个磁盘块的地址。最终构造物理地址。

为了构造物理地址,MMU必须访问k个PTE。看上去开销很大,实际上TLB在这里会起到重要的作用,通过将不同层次上页表的PTE缓存起来,效率很不错。

端到端的地址翻译

讲了很多原理和过程,只有自己动手实践才是最真实的,我们用下面的的环境——在一个有TLB和L1 d-cache的系统上:

  • 内存是按字节寻址的
  • 内存访问是针对1字节的字
  • 虚拟地址是14为长的(n=14)
  • 物理地址是12位长的(m=12)
  • 页面大小为64字节(P=64)
  • TLB是四路组相联的,共有16个条目
  • L1 d-cache是物理寻址、直接映射的,行大小4个字节16个组
image.png

然后是对于TLB和高速缓存,访问这些设备时,我们通过以下方法对位进行划分:

  • TLB 因为TLB有四个组,所以VPN的低2位用来做TLBI,高6位作为TLBT
image.png
  • 页表 这是一个单级设计,一共有256个页面,这里我们只关注前16个。为了方便,我们直接令VPN来标识PTE
image.png
  • 高速缓存 每个块都是4字节,所以用低2位作为块偏移(CO)。因为有16组,所以接下来的4位做组索引(CI)。剩下的6位做标记(CT)
image.png

现在,假设CPU执行一条读地址0x3d4处字节的加载指令会发生什么?

首先MMU会对VA进行解析,并在TLB中查找是否有缓存的PTE:

image.png

我们根据索引找到,获取了TLB中缓存的PPN0x0D,从而和VPO构造出物理地址0x0354。现在MMU将物理地址发送到缓存。我们对地址进行解析,查看L1是否缓存了我们需要的数据:

image.png

我们根据标记,行索引,块偏移,得到了数据0x36,并将其返回到MMU,随后MMU又将数据返回到CPU。至此,我们就完成了一次虚拟内存的使用。

当然以上的演示,都是理想状态下简化的情况。实际上我们可能会遇到不命中的问题。如果TLB不命中,那么MMU必须从页表PTE中取出PPN。如果得到的PTE是无效的,那么就产生缺页,那么就调用缺页异常处理程序;如果是有效的,但是缓存不命中。那么则又需要进行取用…

总是听到诸如页表,分页机制一类的词汇,听起来让人感到十分的复杂。实际上这些都是涉及到虚拟内存相关的只是。一直以来对这一部分的知识都是望而生畏,现在好好来理解一下它:

物理和虚拟寻址

计算机系统的主存被组织成一个由M个连续的字节大小的单元组成的数组。每个字节都有一个唯一的物理地址,物理地址从0开始依次设置。这是最简单自然的结构,我们把CPU以这个结构用来访问地址的方式称为物理寻址。早期的计算机和一些数字处理器和嵌入式设备仍然使用这种方式,

现代处理器则是使用一种被称为虚拟寻址的方式来进行寻址。使用虚拟寻址,CPU会生成一个虚拟地址(VA)来访问主存,VA被送到内存之前会先转换为适当的物理地址,这个过程叫做地址翻译。有一个专门的硬件单元来完成这个任务——内存管理单元(MMU)。原理是利用存放在主存中的查询表来动态翻译虚拟地址,这个表中的内容由操作系统管理。

地址空间

地址空间是非负整数地址的有序集合。如果地址空间中的整数时连续的,我们说它是一个线性地址空间,我们假定之后用到的所有地址空间都是线性的。在一个带虚拟内存的系统中,CPU从有一个有N=2^n个地址的地址空间中生成虚拟地址,则这个地址口空间称为虚拟地址空间

一个地址空间的大小是由表示最大地址所需要的位数来描述的。对于一个包含N=2^n个地址的虚拟地址空间,我们可以将其叫做为一个n位的地址空间。一个系统还带有一个物理地址空间,对应于系统中的物理内存的M个字节。

地址空间的概念实际上区分了两个概念:

  • 数据对象(字节)
  • 属性(地址)

所以我们应该意识到数据对象实际上可以有多个地址,只不过每一个地址都选自一个不同的地址空间,这就是我们虚拟空间所用到的概念。例如主存中的每一个字节都有有一个选自虚拟地址空间的虚拟地址和一个选自物理地址空间的物理地址。

虚拟内存作为缓存工具

虚拟地址实际上就是一个由存放在磁盘上的N个连续的字节大小的单元组成数组,每一个字节都有着一个对应的虚拟地址,作为对这个数组的索引。磁盘上数组的内容被缓存在主存中。和其他的缓存一样,磁盘上的数据被分隔成块,这些快作为磁盘和主存之间的传输单元。

VM系统将虚拟内存分割为虚拟页的大小固定的块,每个虚拟页的大小为P=2^p字节。物理内存也被分隔为同样大小的物理页,也称页帧。

在任意时刻,虚拟页处于以下中的一种状态:

  • 未分配:VM系统还没有创建的页。未分配的块不会有任何数据关联,不占用磁盘空间
  • 已缓存:当前已缓存在物理内存中的已分配页
  • 已分配:未缓存在物理内存中的已分配页

页表

在这里需要用到DRAM和SRAM的关系,可以查看存储器层次架构进行回顾。

和任何缓存一样,VM系统需要要一种方法来判定一个虚拟页是否被缓存在物理内存中的某个地方。如果命中,怎么确定这个虚拟页被存放在那个物理页中。如果不命中,系统需要判断虚拟页存放在磁盘的哪个位置,并在物理内存中选择有一个牺牲页,将虚拟页复制到这里,替换这个牺牲页。

通过操作系统软件、MMU和存放在物理内存中的页表,软硬联合,从而将虚拟页映射到物理页。每次地址翻译硬件将一个虚拟地址转换为一个物理地址的时候,都会读取页表。操作系统则负责维护页表中的内容,在磁盘和主存间来回传送页。页表的结构大致如下:

image.png

我们认识一下页表的基本数据结构,页表就是一个页表条目(PTE)的数组。虚拟地址空间中的每个页在页表中一个固定的偏移量处都有一个PTE(也就是说PTE的大小是固定的)。根据上面的这个简化模型,每个PTE实际上是由一个有效位和一个n位地址字段组成的:

  • 有效位表明该虚拟也当前是否被缓存在主存中。
  • n位地址字段,在有效位被设置的情况下,表示主存中相应的物理页的起始位置,这个物理页中缓存了该虚拟页。如果没有设置有效位,那么这个地址指向该虚拟页在磁盘中的起始位置。

在上图中我们就可以看到虚拟页的三种状态:未分配、未缓存、已缓存。

页命中

当CPU想要读取包含在VP2中的虚拟内存的一个字时,地址翻译硬件会将虚拟地址作为一个索引来定位PTE2,然后再页表(内存)中读取它。因为设置了有效位,地址翻译硬件就会知道VP2被缓存在内存中,然后就会使用PTE中存储的物理内存地址,构造出这个字的物理地址。

image.png

缺页

缺页实际上就是缓存不命中,同上图。CPU引用了VP3中的一个字,地址翻译硬件根据有效位发现VP3并没有被缓存在内存中,于是触发一个缺页异常。这个异常调用内核中的缺页异常处理程序,该程序会选择一个牺牲页。程序将牺牲页复制回硬盘中,并将VP3覆盖牺牲页。并修改页表中它们的状态。然后返回,并将导致缺页的虚拟地址重新发送给地址翻译硬件,此时页命中,可以被正确处理:

image.png

这个在磁盘和内存之间传送页的活动叫做页面调度,仅在不命中的情况下才进行调度的策略是按需页面调度,我们之后都会使用这个策略。

分配页面

image.png

这个过程展示了当操作系统分配一个新的虚拟内存页时,对我们的示例页表产生的影响。在这个过程中,系统在磁盘上创建了一个空间并更新PTE5,使它指向磁盘上这个新创建的页面。

局部性分析

对于虚拟内存的策略,我们可能会认为这是一个效率极低的方案,因为它的不命中惩罚很大。但是实际上,它有着良好的局部性。局部性保证了,在任意时刻中,程序将趋于一个较小的活动页面上工作,例如空间局部性,较大的页空间确保了很好的空间局部性,因为对于数据结构,程序是按序访问的;对于时间局部性,一段内存往往会被反复利用,所以有着良好的时间局部性。

当然如果出现了工作集大小超出内存大小的情况时,程序可能会发生抖动,页面会不停的换进换出,带来严重的不命中开销。

虚拟内存作为内存管理的工具

虚拟内存不仅有着很好的缓存性能,同时它也很好的简化了内存管理,为我们提供了一个很好的内存保护机制。

实际上,操作系统为每个进程提供了一个独立的页表,也就是一个独立的虚拟空间,下图很好的展示了这一点:

image.png

注意,这里可以看到多个虚拟页面实际上是可以映射到同一个共享物理页面上。

通过按需页面调度和独立的虚拟地址空间的结合,系统对内存的使用和管理被极大的简化,VM系统简化了链接和加载、代码和数据共享、以及应用程序的内存分配…

简化链接

独立的地址空间也允许每个进程的内存映像使用相同的基本格式,而不用考虑代码和数据实际上被存储在哪里。这样的一致性简化了链接器的设计和实现,允许链接器生成完全链接的可执行文件,这些可执行文件是独立于物理内存中代码和数据的最终位置的。

image.png

简化加载

虚拟内存简化了向内存中加载可执行文件和共享对象文件的过程。要把目标文件中.text.data节加载到一个新创建的进程中,Linux加载器会为代码段和数据段分配虚拟页,然后将其标为无效的(即未缓存)。而不是将其进行缓存,只有当页被引用到时,虚拟内存会按需调度这些页面。

将一组连续的虚拟页映射到任意一个文件的任意位置的表示法叫做内存映射我们会在之后涉及这些内容。

简化共享

一般而言,每个进程都有自己私有的代码,数据,堆栈等区域,这个其他进程是不共享的。操作系统为每个进程提供页表,将相应的虚拟页映射到不同的物理页面。也就是说,对于不同进程来说,尽管是同一个虚拟地址,但是实际上映射的是不同的物理地址。极大程度上简化了进程间私有的问题。

当然有时候进程间有也需要共享代码和数据,例如每个进程都调用相同的操作系统内核代码,操作系统会将不同进程中适当的虚拟页面映射到相同的物理页面,从而安排多个进程共享这部分代码的一个副本,而不是为每个进程都分配一个副本。

简化内存分配

虚拟内存为用户进程提供了一个简单的分配额外内存的机制。当一个运行在用户进程的程序要求有一个额外的堆空间时,操作系统只需要分配适当的连续的虚拟内存页面,并将其映射到物理内存中的物理页面就行了。通过页表,操作系统也不用分配连续的物理页。使得页面可以随机的分布在物理内存中,提高了碎片空间的可用性。

虚拟内存作为内存保护的工具

操作系统需要有手段来控制对内存系统的访问,不应该允许用户进程对其只读代码段进行修改,也不应该允许它修改内核中的代码和数据结构,不应该允许它读写其他进程的私有内存或是修改和其他进程共享的虚拟原页面。而虚拟内存能够很好的实现这个机制:

当每次CPU生成一个地址时,地址翻译硬件都会读一个PTE,我们可以通过有效位来判断这个页面的状态。我们也可以通过添加额外的许可页来控制对一个虚拟页面内容的访问。

image.png

例如图中的,SUP位表示进程是否必须在内核模式下才能访问此页。READ和WRITE位则控制对原页面的读写访问。不同进程的页表中对同一个页的访问权是不同的,以此可以实现对进程内存访问的控制。

如果一条指令违反了这些许可条件,那么CPU就会触发保护故障,将控制传递给异常处理程序。LInux Shell一般将这个异常报告为Segmentation fault

我们先前实现了卷积神经网络的各层,以及基本的前向传播,现在我们要进一步的完善整个神经网络,通过反向传播实现对权重的更新,从而提高神经网络的准确性。

反向传播

现在我们已经完成了神经网络的前向传播,现在我们需要对每个层进行反向传播以更新权重,来寻来你神经网络。进行反向传播,我们需要注意两点:

  • 在前向传播的阶段,我们需要在每一层换从它需要用于反向传播的数据(如中间值等)。这也反映了,任意反向传播的阶段,都需要有着相应的前向阶段。
  • 在反向传播阶段,每一层都会接受一个梯度,并返回一个梯度。其接受其输出($\frac{\partial L}{\partial out}$)的损失梯度,并返回其输入($\frac{\partial L}{\partial in}$)的损失梯度

我们的训练过程应该是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 前向传播
out = conv.forward((image / 255) - 0.5)
out = pool.forward(out)
out = softmax.forward(out)

# 初始化梯度
gradient = np.zeros(10)
# ...

# 反向传播
gradient = softmax.backprop(gradient)
gradient = pool.backprop(gradient)
gradient = conv.backprop(gradient)

现在我们将逐步构建我们的反向传播函数:

Softmax层反向传播

我们的损失函数是: $$ \begin{align*} L = -ln(p_c) \end{align*} $$ 所以我们首先要计算的就是对于softmax层反向传播阶段的输入,其中out_s就是softmax层的输出。一个包含了10个概率的向量,我们只在乎正确类别的损失,所以我们的第一个梯度为: $$ \begin{align*} \frac{\partial L}{\partial out_s(i)} = \begin{cases} 0 \space\space\space\space\space \text{ if i!=c} \\ -\frac{1}{p_i} \text{ if i=c} \end{cases} \end{align*} $$ 所以我们正确的初始梯度应该是:

1
2
gradient = np.zeros(10)
gradient[label] = -1 / out[label]

然后我们对softmax层的前向传播阶段进行一个缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
def forward(self, input):
# 输入体积的形状
self.last_input_shape = input.shape
input = input.flatten()
# 展平后的输入向量
self.last_input = input

totals = np.dot(input,self.weights) + self.biases
# 输出结果(提供给激活函数)
self.last_totals = totals
exp = np.exp(totals)

return exp/np.sum(exp,axis=0)

现在我们可以开始准备softmax层的反向传播了:

计算

我们已经计算出,损失对于激活函数值的梯度,我们现在需要进一步的推导,最终我们希望得到$\frac{\partial L}{\partial input} \frac{\partial L}{\partial w} \frac{\partial L}{\partial b}$

的梯度,以用于对权重的梯度训练。根据链式法则,我们应该有: $$ \begin{align*} \frac{\partial L}{\partial w} &= \frac{\partial L}{\partial out} * \frac{\partial out}{\partial t} * \frac{\partial t}{\partial w} \\ \frac{\partial L}{\partial b} &= \frac{\partial L}{\partial out} * \frac{\partial out}{\partial t} * \frac{\partial t}{\partial b} \\ \frac{\partial L}{\partial input} &= \frac{\partial L}{\partial out} * \frac{\partial out}{\partial t} * \frac{\partial t}{\partial input} \end{align*} $$ 其中 t = w * input + bout则是softmax函数的输出值,我们可以依次求出。对于$\frac{\partial L}{\partial out}$我们有: $$ \begin{align*} out_s(c) &= \frac{e^{t_c}}{\sum_{i}e^{t_i}} = \frac{e^{t_c}}{S} \\ S &= \sum_{i}e^{t_i} \\ \to out_s(c) &= e^{t_c}S^{-1} \end{align*} $$ 现在我们求$\frac{\partial out_s(c)}{\partial t_k}$,需要分别考虑k=ck!=c的情况,我们依次进行求导: $$ \begin{align*} \frac{\partial out_s(c)}{\partial t_k} &= \frac{\partial out_s(c)}{\partial S} *\frac{\partial S}{\partial t_k} \\ &= -e^{t_c}S^{-2}\frac{\partial S}{\partial t_k} \\ &= -e^{t_c}S^{-2}(e^{t_k}) \\ &= \frac{-e^{t_c}e^{t_k}}{S^2} \\ \\ \frac{\partial out_s(c)}{\partial t_c} &= \frac{Se^{t_c}-e^{t_c}\frac{\partial S}{\partial t_c}}{S^2} \\ &= \frac{Se^{t_c}-e^{t_c}e^{t_c}}{S^2} \\ &= \frac{e^{t_c}(S-e^{t_c})}{S^2} \\ \to \frac{\partial out_s(k)}{\partial t} &= \begin{cases} \frac{-e^{t_c}e^{t_k}}{S^2} \space\space\space\space \text{ if k!=c} \\ \frac{e^{t_c}(S-e^{t_c})}{S^2} \text{ if k=c} \end{cases} \end{align*} $$ 最后我们根据公式t = w * input + b得到: $$ \begin{align*} \frac{\partial t}{\partial w}&=input \\ \frac{\partial t}{\partial b}&=1 \\ \frac{\partial t}{\partial input}&=w \end{align*} $$ 现在我们可以用代码实现这个过程了

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def backprop(self,d_L_d_out):
# d_L_d_out是这一层的输出梯度,作为参数
# 返回d_L_d_in作为下一层的参数
for i,gradient in enumerate(d_L_d_out):
if gradient == 0:
continue
# e^totals
t_exp = np.exp(self.last_totals)
# S = sum(e^totals)
S = np.sum(t_exp)
# total对out[i]的梯度关系
# 第一次是对所有的梯度进行更新
d_out_d_t = -t_exp[i]*t_exp / (S**2)
# 第二次是只对 =i 的梯度进行更新 从而使第一次的更新只针对 !=i 的梯度
d_out_d_t[i] = t_exp[i]*(S-t_exp[i]) / (S**2)
# 权重对total的梯度关系
d_t_d_w = self.last_input
d_t_d_b = 1
d_t_d_input = self.weights
# total对Loss的梯度关系
d_L_d_t = gradient * d_out_d_t
# 权重对Loss的梯度关系
d_L_d_w = d_t_d_w[np.newaxis].T @ d_L_d_t[np.newaxis]
d_L_d_b = d_L_d_t * d_t_d_b
d_L_d_input = d_t_d_input @ d_L_d_t
# 梯度训练
self.weights -= self.learn_rate * d_L_d_w
self.biases -= self.learn_rate * d_L_d_b
# 返回梯度
return d_L_d_input.reshape(self.last_input_shape)

由于softmax层的输入是一个输入体积,在一开始被我们展平处理了,但是我们返回的梯度也应该是一个同样大小的输入体积,所以我们需要通过reshape确保这层的返回的梯度和原始的输入格式相同。

我们可以测试一下softmax反向传播后的训练效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import numpy as np

from conv import Conv3x3
from maxpool import MaxPool2
from softmax import Softmax
import tensorflow as tf

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

test_images = x_test[:1000]
test_labels = y_test[:1000]

conv = Conv3x3(8)
pool = MaxPool2()
softmax = Softmax(13*13*8,10)

def forward(image,label):
out = conv.forward((image / 255) - 0.5)
out = pool.forward(out)
out = softmax.forward(out)

loss = -np.log(out[label])
acc = 1 if np.argmax(out) == label else 0

return out,loss,acc

def train(image,label):
out, loss, acc = forward(image,label)
gradient = np.zeros(10)
gradient[label] = -1 / out[label]
gradient = softmax.backprop(gradient)

return loss,acc

print("Start!")
loss = 0
num_correct = 0

for i,(im,label) in enumerate(zip(test_images,test_labels)):
_, l, acc = forward(im,label)
if i%100 == 99:
print(
'[Step %d] Past 100 steps :Average Loss %.3f | Accuracy %d%%' %
(i+1,loss/100,num_correct)
)
loss = 0
num_correct = 0
l,acc = train(im,label)
loss += l
num_correct += acc

可以看到准确率有明显的提升,说明我们softmax层的反向传播在很好的进行

1
2
3
4
5
6
7
8
9
10
11
Start!
[Step 100] Past 100 steps :Average Loss 2.112 | Accuracy 24%
[Step 200] Past 100 steps :Average Loss 1.940 | Accuracy 37%
[Step 300] Past 100 steps :Average Loss 1.686 | Accuracy 50%
[Step 400] Past 100 steps :Average Loss 1.606 | Accuracy 51%
[Step 500] Past 100 steps :Average Loss 1.451 | Accuracy 58%
[Step 600] Past 100 steps :Average Loss 1.362 | Accuracy 65%
[Step 700] Past 100 steps :Average Loss 1.264 | Accuracy 66%
[Step 800] Past 100 steps :Average Loss 1.057 | Accuracy 75%
[Step 900] Past 100 steps :Average Loss 0.978 | Accuracy 81%
[Step 1000] Past 100 steps :Average Loss 0.966 | Accuracy 78%

池化层传播

在前向传播的过程中,最大池化层接收一个输入体积,然后通过2x2区域的最大池化,将宽度和高度都减半。而在反向传播中,执行相反的操作:我们将损失梯度的宽度和高度都翻倍,通过将每个梯度值分配到对应的2x2区域的最大值位置:

image.png

每个梯度都被分配到原始最大值的位置,然后将其他梯度设置为0.

为什么是这这样的呢?在一个2x2区域中,由于我们只关注区域内的最大值,所以对于其他的非最大值,我们可以几乎忽略不计,因为它的改变对我们的输出结果没有影响,所以对于非最大像素,我们有$\frac{\partial L}{\partial inputs}=0$。另一方面来看,最大像素的$\frac{\partial output}{\partial input}=1$,这意味着$\frac{\partial L}{\partial output}=\frac{\partial L}{\partial input}$

所以对于这一层的反向传播,我们只需要简单的还原,并且填充梯度值到最大像素区域就行了

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def backprop(self,d_L_d_out):
# 这里的self.last_input是前向阶段的数据缓存
d_L_d_input = np.zeros(self.last_input.shape)
for im_region,i,j in self.iterate_regions(self.last_input):
h,w,f = im_region.shape
amax = np.amax(im_region,axis=(0,1))

for i2 in range(h):
for j2 in range(w):
for f2 in range(f):
# 搜寻区域内的最大值并赋梯度值
if im_region[i2,j2,f2] == amax[f2]:
d_L_d_input[i*2+i2, j*2+j2,f2] = d_L_d_out[i,j,f2]
return d_L_d_input

这一部分并没有什么权重用来训练,所以只是一个简单的数据还原。

卷积层反向传播

卷积层的反向传播,我们需要的是卷积层中的滤波器的损失梯度,因为我们需要利用损失梯度来更新我们滤波器的权重,我们现在已经有了$\frac{\partial L}{\partial output}$,我们现在只需要计算$\frac{\partial output}{\partial filters}$,所以我们需要知道,改变一个滤波器的权重会怎么影响到卷积层的输出?

实际上修改滤波器的任意权重都可能会导致滤波器输出的整个图像,下面便是很好的示例:

image.png
image.png

同样的对任何滤波器权重+1都会使输出增加相应图像像素的值,所以输出像素相对于特定滤波器权重的导数应该就是相应的图像元素。我们可以通过数学计算来论证这一点

计算

$$ \begin{align*} out(i,j) &= convolve(image,filiter) \\ &= \sum_{x=0}^3{}\sum_{y=0}^{3}image(i+x,j+y)*filiter(x,y) \\ \to \frac{\partial out(i,j)}{\partial filiter(x,y)} &=image(i+x,i+y) \end{align*} $$

我们将输出的损失梯度引进来,我们就可以获得特定滤波器权重的损失梯度了: $$ \begin{align*} \frac{\partial L}{\partial filiter(x,y)} = \sum_{i}\sum_{j}\frac{\partial L}{\partial out(i,j)} * \frac{\partial out(i,j)}{\partial filter(x,y)} \end{align*} $$ 现在我们可以实现我们卷积层的反向传播了:

实现

1
2
3
4
5
6
7
8
9
def backprop(self, d_L_d_out):
d_L_d_filters = np.zeros(self.filters.shape)

for im_region, i, j in self.iterate_regions(self.last_input):
for f in range(self.num_filters):
d_L_d_filters[f] += d_L_d_out[i, j, f] * im_region

self.filters -= self.learn_rate * d_L_d_filters
return None

现在我们可以对我们的神经网络进行一个完整的训练了,我们可以看到训练的结果如下:

image.png

效果还是非常不错的。

完善

和之前的网络不同,CNN的训练集比较庞大,如果每次启动都要训练遍参数就太麻烦了,所以我们可以再每次训练之后将参数保存下来。下次再要使用就可以直接加载而不用重复训练。所以我们可以编写保存模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pickle

class ModelSaver:
def __init__(self, model_name='MNIST_CNN'):
self.model_name = model_name

def save(self, conv, pool, softmax):
data = {
'conv_filters': conv.filters,
'softmax_weights': softmax.weights,
'softmax_biases': softmax.biases
}
filename = f'{self.model_name}.pkl'
with open(filename, 'wb') as f:
pickle.dump(data, f)
print(f"保存参数到{filename}")

def load(self, conv, pool, softmax):
filename = f'{self.model_name}.pkl'
try:
with open(filename, 'rb') as f:
data = pickle.load(f)

conv.filters = data['conv_filters']
softmax.weights = data['softmax_weights']
softmax.biases = data['softmax_biases']
print("模型参数加载成功")
return True

except FileNotFoundError:
print("无可用模型参数")
return False

如果我们想要自己尝试手写输入,来测试模型的效果,我们可能希望有个手写板,所以我们可以再写一个手写板的类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class DrawingBoard:
def __init__(self, root):
self.root = root
self.root.title("画板")

# 创建一个 Canvas 作为画板
self.canvas = tk.Canvas(root, width=280, height=280, bg='white')
self.canvas.pack()

# 绑定鼠标事件
self.canvas.bind("<B1-Motion>", self.paint)

# 初始化绘图工具
self.image = Image.new("RGB", (280, 280), "white")
self.draw = ImageDraw.Draw(self.image)

# 初始化画笔颜色和宽度
self.brush_color = "black"
self.brush_width = 5

# 添加输出按钮
self.output_button = tk.Button(root, text="输出", command=self.output_and_exit)
self.output_button.pack()

def paint(self, event):
x1, y1 = (event.x - self.brush_width), (event.y - self.brush_width)
x2, y2 = (event.x + self.brush_width), (event.y + self.brush_width)
self.canvas.create_oval(x1, y1, x2, y2, fill=self.brush_color, outline=self.brush_color)
self.draw.ellipse([x1, y1, x2, y2], fill=self.brush_color, outline=self.brush_color)

def process_image(self):
# 将图像调整为 28x28 像素
processed_image = self.image.resize((28, 28), Image.Resampling.LANCZOS)
processed_image = ImageOps.grayscale(processed_image)

# 将图像转换为 NumPy 数组
image_array = np.array(processed_image)

# 确保像素值是整数
image_array = image_array.astype(np.uint8)

return image_array

def output_and_exit(self):
# 处理图像并获取数组
self.image_array = self.process_image()

# 保存图像
processed_image = Image.fromarray(self.image_array)
processed_image.save("temp.png")
print("图片已保存为 temp.png")

# 退出程序
self.root.destroy()

现在我们就可以使用它了,我们先进行训练,然后用保存的参数,来进行手写数字识别,我把整个网络的源代码放在下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import numpy as np

class Conv3x3:
# 使用3x3滤波器的卷积层
def __init__(self, num_filters, learn_rate=0.01):
self.num_filters = num_filters
self.filters = np.random.randn(num_filters, 3, 3) / 9

self.last_input = None
self.learn_rate = learn_rate

def iterate_regions(self, image):
# 返回所有可以卷积的3x3的图像区域
h, w = image.shape
for i in range(h - 2):
for j in range(w - 2):
im_region = image[i:(i + 3), j:(j + 3)]
yield im_region, i, j

def forward(self, input):
# 执行卷积层的前向传播 输出一个26x26x8的三维输出数组
self.last_input = input
h, w = input.shape
output = np.zeros((h - 2, w - 2, self.num_filters))
for im_region, i, j in self.iterate_regions(input):
output[i, j] = np.sum(im_region * self.filters, axis=(1, 2))

return output

def backprop(self, d_L_d_out):
d_L_d_filters = np.zeros(self.filters.shape)

for im_region, i, j in self.iterate_regions(self.last_input):
for f in range(self.num_filters):
d_L_d_filters[f] += d_L_d_out[i, j, f] * im_region

self.filters -= self.learn_rate * d_L_d_filters

return None


class MaxPool2:
# 池化尺寸为2的最大池化层
def __init__(self):
self.last_input = None

def iterate_regions(self, image):
h, w, _ = image.shape
new_h = h // 2
new_w = w // 2

for i in range(new_h):
for j in range(new_w):
im_region = image[i * 2:(i + 1) * 2, j * 2:(j + 1) * 2]
yield im_region, i, j

def forward(self, input):
self.last_input = input
h, w, num_filters = input.shape
output = np.zeros((h // 2, w // 2, num_filters))

for im_region, i, j in self.iterate_regions(input):
output[i, j] = np.amax(im_region, axis=(0, 1))

return output

def backprop(self, d_L_d_out):
d_L_d_input = np.zeros(self.last_input.shape)
for im_region, i, j in self.iterate_regions(self.last_input):
h, w, f = im_region.shape
amax = np.amax(im_region, axis=(0, 1))

for i2 in range(h):
for j2 in range(w):
for f2 in range(f):
if im_region[i2, j2, f2] == amax[f2]:
d_L_d_input[i * 2 + i2, j * 2 + j2, f2] = d_L_d_out[i, j, f2]
return d_L_d_input


class Softmax:
# 全连接softmax激活层
def __init__(self, input_len, nodes, learn_rate=0.01):
self.weights = np.random.randn(input_len, nodes) / nodes
self.biases = np.zeros(nodes)
self.learn_rate = learn_rate

self.last_input_shape = None
self.last_input = None
self.last_totals = None

def forward(self, input):
self.last_input_shape = input.shape
input = input.flatten()
self.last_input = input

totals = np.dot(input, self.weights) + self.biases
self.last_totals = totals
exp = np.exp(totals)

return exp / np.sum(exp, axis=0)

def backprop(self, d_L_d_out):
# d_L_d_out是这一层的输出梯度,作为参数
# 返回d_L_d_in作为下一层的参数
d_L_d_w = np.zeros(self.weights.shape)
d_L_d_b = np.zeros(self.biases.shape)
d_L_d_input = np.zeros(self.last_input.shape)
for i, gradient in enumerate(d_L_d_out):
if gradient == 0:
continue
# e^totals
t_exp = np.exp(self.last_totals)
# S = sum(e^totals)
S = np.sum(t_exp)
# total对out[i]的梯度关系
# 第一次是对所有的梯度进行更新
d_out_d_t = -t_exp[i] * t_exp / (S ** 2)
# 第二次是只对 =i 的梯度进行更新 从而使第一次的更新只针对 !=i 的梯度
d_out_d_t[i] = t_exp[i] * (S - t_exp[i]) / (S ** 2)
# 权重对total的梯度关系
d_t_d_w = self.last_input
d_t_d_b = 1
d_t_d_input = self.weights
# total对Loss的梯度关系
d_L_d_t = gradient * d_out_d_t
# 权重对Loss的梯度关系
d_L_d_w += d_t_d_w[np.newaxis].T @ d_L_d_t[np.newaxis]
d_L_d_b += d_L_d_t * d_t_d_b
d_L_d_input += d_t_d_input @ d_L_d_t
# 梯度训练
self.weights -= self.learn_rate * d_L_d_w
self.biases -= self.learn_rate * d_L_d_b
# 返回梯度
return d_L_d_input.reshape(self.last_input_shape)


import pickle

class ModelSaver:
def __init__(self, model_name='MNIST_CNN'):
self.model_name = model_name

def save(self, conv, pool, softmax):
data = {
'conv_filters': conv.filters,
'softmax_weights': softmax.weights,
'softmax_biases': softmax.biases
}
filename = f'{self.model_name}.pkl'
with open(filename, 'wb') as f:
pickle.dump(data, f)
print(f"保存参数到{filename}")

def load(self, conv, pool, softmax):
filename = f'{self.model_name}.pkl'
try:
with open(filename, 'rb') as f:
data = pickle.load(f)

conv.filters = data['conv_filters']
softmax.weights = data['softmax_weights']
softmax.biases = data['softmax_biases']
print("模型参数加载成功")
return True

except FileNotFoundError:
print("无可用模型参数")
return False


import tkinter as tk
from PIL import Image,ImageDraw,ImageOps

class DrawingBoard:
def __init__(self, root):
self.root = root
self.root.title("画板")

# 创建一个 Canvas 作为画板
self.canvas = tk.Canvas(root, width=280, height=280, bg='white')
self.canvas.pack()

# 绑定鼠标事件
self.canvas.bind("<B1-Motion>", self.paint)

# 初始化绘图工具
self.image = Image.new("RGB", (280, 280), "white")
self.draw = ImageDraw.Draw(self.image)

# 初始化画笔颜色和宽度
self.brush_color = "black"
self.brush_width = 5

# 添加输出按钮
self.output_button = tk.Button(root, text="输出", command=self.output_and_exit)
self.output_button.pack()

def paint(self, event):
x1, y1 = (event.x - self.brush_width), (event.y - self.brush_width)
x2, y2 = (event.x + self.brush_width), (event.y + self.brush_width)
self.canvas.create_oval(x1, y1, x2, y2, fill=self.brush_color, outline=self.brush_color)
self.draw.ellipse([x1, y1, x2, y2], fill=self.brush_color, outline=self.brush_color)

def process_image(self):
# 将图像调整为 28x28 像素
processed_image = self.image.resize((28, 28), Image.Resampling.LANCZOS)
processed_image = ImageOps.grayscale(processed_image)

# 将图像转换为 NumPy 数组
image_array = np.array(processed_image)

# 确保像素值是整数
image_array = image_array.astype(np.uint8)

return image_array

def output_and_exit(self):
# 处理图像并获取数组
self.image_array = self.process_image()

# 保存图像
processed_image = Image.fromarray(self.image_array)
processed_image.save("temp.png")
print("图片已保存为 temp.png")

# 退出程序
self.root.destroy()

上一篇文章中我们学习了循环神经网络,我们现在已经基本理解了神经网络怎么去处理数据/序列。可是对于图片、音频、文件之类的数据,我们该怎么去处理呢?相较于数据、序列,对图片使用传统神经网络会导致更大的开销。其他的数据类型也是同理,所以接下来我们将要认识卷积神经网络

卷积神经网络简介

卷积神经网络的一个经典应用场景是对图像进行分类,可是我们可不可以使用普通的神经网络来实现呢?可以,但是没必要。对于图像数据处理,我们需要面临两个问题:

  • 图像数据很大 假如我们要处理的图像大小是100x100甚至更大。那么构建一个处理100x100的彩色图像的神经网络,我们需要100x100x3 = 30000个输入特征。我们用一个1024个节点的中间层,意味着我们在一层中就要训练30000x1024 = 30720000个权重。这样会导致我们的神经网络十分庞大
  • 图像特征的位置会改变 同一个特征可能是在图像中的不同位置,你可能可以训练出一个对于特定图像表现良好的网络。但是当你对图像进行一定的偏移,可能就会导致结果发生错误的改变

使用传统的神经网络来解决图像问题,无异于是浪费的。它忽视了图像中任意像素与其邻近像素的上下文关系,图像中的物体是由小范围的局部特征组成的,对每个像素都进行分析,是毫无意义的。

所以我们需要使用卷积神经网络来解决这些问题。

目标

这一次我们的目标是实现一个手写数字识别的卷积神经网络,用到的是MNIST的手写数字数据集。也就是给定一个图像,将其分类为一个数字。

image.png

MNIST数据集中的每张图片都是28*28的大小,包含一个居中的灰度数字。我们将根据这个数据集来对神经网络进行训练。

卷积

我们首先要理解卷积神经网络中的卷积是什么意思。卷积实际上是一种加权平均的操作。它的相当于一个滤波器,能够提取原始数据中的某种特定特征。我们往往使用卷积核来进行这个操作。

而神经网络中的卷积层则是根据过滤器实现对局部特征的处理,我们以下面这个操作为例:

对于一个垂直特征的卷积核,我们可以计算出这里的特征值

image.png
image.png

我们们可以通过对图像中的数据进行卷积操作从而实现对局部特征的提取。这就和我们将要用到的卷积核有关了。

卷积核

image.png

这是一个垂直sobel滤波器,通过它对图像进行卷积操作,我们可以提取出图像的垂直特征:

image.png

同样的,我们有对应的水平SObel卷积核,可以提取出图像的水平特征:

image.png
image.png

而Sobel滤波器,我们可以理解成边缘检测器。通过提取手写数字边缘的特征,有利于网络在后续更好的进行图像识别。

填充

对于卷积这一步,我们对一个4x4的输入图像使用一个3x3的滤波器,我们会得到一个2x2的输出图像。如果我们希望输出图像和输入图像保持相同的大小。我们则需要向周围添加0,使得滤波器可以在更多的位置上覆盖

image.png

这种操作,我们称之为相同填充。如果不适用任何填充,我们称之为有效填充。

卷积层的使用

我们现在知道卷积层通过使用一组滤波器将输入图像转换为输出图像的卷积层了。我们将使用一个具有8个滤波器的小卷积层作为我们网络中的起始层,意味着,它将28x28的输入图像转换为26x26x8的输出体积:

image.png

每个卷积层的8个过滤器产生一个26x26的输出,这是因为我们用到的是3x3的卷积核作为我们的滤波器,所以我们需要训练的权重有3x3x8 = 72个权重

实现

现在我们尝试用代码实现一个卷积层:

1
2
3
4
5
6
7
8
import numpy as np

class Conv3x3:
# 使用3x3滤波器的卷积层
def __init__(self,num_filters):
self.num_filters = num_filters
# 这里除以3是为了对权重进行初始化
self.filters = np.random.randn(num_filters,3,3) / 9

我们注意到我们对生成的卷积核中做了一个权重初始化的工作,这是因为:

  • 如果初始权重太大,那么输入数据经过卷积计算之后会变得很大,在反向传播的过程中梯度值也会变得很大,从而导致参数无法收敛,即梯度爆炸
  • 如果初始权重太小,由于激活函数的作用,输入的数据会层层缩小,导致反向传播过程中的梯度值变得绩效。难以实现对权重的有效更新,我们称之为梯度消失

这里我们用到Xavier初始化来解决这个问题,他指出,在保持网络层在初始化时,其输入核和输出的方差应该尽可能的相同。这样信号就可以在网络中稳定的传播。

我们设输入为y输出为x,权重矩阵为W。则有: $$ \begin{align*} Var(W) = \frac{1}{n_{in}} \end{align*} $$ 其中n_in是输入的节点数量,这里就是3x3,所以初始化时需要/9

接下来是实际的卷积部分的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Conv3x3:
# 使用3x3滤波器的卷积层
def __init__(self,num_filters):
self.num_filters = num_filters
self.filters = np.random.randn(num_filters,3,3) / 9

def iterate_regions(self,image):
# 返回所有可以卷积的3x3的图像区域
h,w = image.shape
for i in range(h-2):
for j in range(w-2):
im_region = image[i:(i+3),j:(j+3)]
yield im_region,i,j

def forward(self,input):
# 执行卷积层的前向传播 输出一个26x26x8的三维输出数组
h,w = input.shape
output = np.zeros((h-2,w-2,self.num_filters))
for im_region,i,j in self.iterate_regions(input):
# 这里用到的是numpy中隐藏的广播机制,详情参考numpy
# 这里im_region*self.filters的大小是(8,3,3),求和是对行列求和,所以axis=(1,2)
output[i,j] = np.sum(im_region * self.filters,axis=(1,2))
return output

这里我们很多用法涉及到numpy的一些高级使用,可以在这里参考NumPy现在我们可以检查我们的卷积层是否输出了我们理想的结果:

1
2
3
4
5
6
7
8
from conv import Conv3x3
import tensorflow as tf # 由于MNIST数据集URL地址有问题,所以这里使用keras库

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

conv = Conv3x3(8)
output = conv.forward(x_train[0])
print(output.shape) # (26,26,8)

池化

图像中的相邻元素往往是相似的,所以卷积层输出中,通常相邻元素产生相似的值。结果导致卷积层输出中包含了大量的冗余信息。为了解决这个问题我们需要对数据进行池化

它所做的事情很简单,往往是将输出中的值聚合称为更小的尺寸。池化往往是通过简单的操作,如max,min,average实现的。比如下面就是一个池化大小为2的最大池化操作

image.png

池化将输入的宽度和高度除以池化大小。在我们的卷积神经网络中,我们将在初始卷积层之后放置一个池化大小为2的最大池化层,池化层将26x26x8的输入转化为13x13x8的输出:

image.png

实现

我们现在用代码实现和conv类相似的MaxPool2类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import numpy as np
class MaxPool2:
# 池化尺寸为2的最大池化层
def iterate_regions(self,image):
h,w,_ = image.shape
new_h = h // 2
new_w = w // 2
for i in range(new_h):
for j in range(new_w):
im_region = image[i*2:(i+1)*2,j*2:(j+1)*2]
yield im_region,i,j

def forward(self,input):
h,w,num_filters = input.shape
output = np.zeros((h//2,w//2,num_filters))
for im_region,i,j in self.iterate_regions(input):
# 这里im_region的大小是(3,3,8)因此我们只需要对行列求最大,故axis=(0,1)
output[i,j] = np.amax(im_region,axis=(0,1))
return output

这个类和之前实现的Conv3x3类类似,关键在于从一个给定的图像区域中找到最大值,我们使用数组的最大值方法np.amax()来实现。我们来测试一下池化层:

1
2
3
4
5
6
7
8
9
10
11
12
from conv import Conv3x3
from maxpool import MaxPool2
import tensorflow as tf

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

conv = Conv3x3(8)
pool = MaxPool2()

output = conv.forward(x_train[0])
output = pool.forward(output)
print(output.shape) # (13,13,8)

Softmax层

现在我们通过前两层,已经提取出了数字特征,现在我们希望能够赋予其实际预测的能力。对于多分类问题,我们通常使用Softmax层作为最终层——这是一个使用Softmax函数作为激活函数的全连接层(全连接层就是每个节点都与前一层的每个输入相联)

我们将使用一个包含10个节点的Softmax层作为CNN的最后一层,每个节点代表一个数字。层中的每个节点都连接到之前的输出中。在Softmax变化之后,概率最高的数字就是我们的输出。

image.png

交叉熵损失

我们现在既然可以输出最终的预测结果了,它输出的结果是一个概率,用来量化神经网络的对其预测的信心。同样的,我们也需要一种方法来量化每次预测的损失。这里我们使用交叉熵损失来解决这个问题: $$ \begin{align*} L = -ln(p_c) \end{align*} $$ 其中c指的是正确的类别,即正确的数字。而pc代表类别c的预测概率。我们希望损失越低越好,对网络的损失进行量化,有利于后续的神经网络训练。

实现

我们同上步骤,实现一个Softmax层类:

1
2
3
4
5
6
7
8
9
10
11
12
13
import numpy as np
class Softmax:
# 全连接softmax激活层
def __init__(self, input_len, nodes):
self.weights = np.random.randn(input_len,nodes) / nodes
self.biases = np.zeros(nodes)

def forward(self, input):
# 由于输入是一个输入体积,我们用flatten将其展平,变成一个一维的输出向量
input = input.flatten()
totals = np.dot(input,self.weights) + self.biases
exp = np.exp(totals)
return exp/np.sum(exp,axis=0)

现在,我们已经完成了CNN的整个前向传播,我们可以简单的测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import numpy as np

from conv import Conv3x3
from maxpool import MaxPool2
from softmax import Softmax
import tensorflow as tf

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

test_images = x_test[:1000]
test_labels = y_test[:1000]

conv = Conv3x3(8)
pool = MaxPool2()
softmax = Softmax(13*13*8,10)

def forward(image,label):
out = conv.forward((image / 255) - 0.5)
out = pool.forward(out)
out = softmax.forward(out)

loss = -np.log(out[label])
acc = 1 if np.argmax(out) == label else 0

return out,loss,acc


print("Start!")
loss = 0
num_correct = 0

for i,(im,label) in enumerate(zip(test_images,test_labels)):
_, l, acc = forward(im,label)
loss += l
num_correct += acc

if i%100 == 99:
print(
'[Step %d] Past 100 steps :Average Loss %.3f | Accuracy %d%%' %
(i+1,loss/100,num_correct)
)
loss = 0
num_correct = 0

我们可以得到下面的输出:

1
2
3
4
5
6
7
8
9
10
11
Start!
[Step 100] Past 100 steps :Average Loss 2.566 | Accuracy 13%
[Step 200] Past 100 steps :Average Loss 2.447 | Accuracy 13%
[Step 300] Past 100 steps :Average Loss 2.500 | Accuracy 13%
[Step 400] Past 100 steps :Average Loss 2.520 | Accuracy 10%
[Step 500] Past 100 steps :Average Loss 2.431 | Accuracy 9%
[Step 600] Past 100 steps :Average Loss 2.477 | Accuracy 6%
[Step 700] Past 100 steps :Average Loss 2.491 | Accuracy 11%
[Step 800] Past 100 steps :Average Loss 2.598 | Accuracy 7%
[Step 900] Past 100 steps :Average Loss 2.545 | Accuracy 7%
[Step 1000] Past 100 steps :Average Loss 2.610 | Accuracy 10%

这是因为我们对权重进行了随机初始化,所以现在神经网络的表现更像是随机猜测,所以准确率趋近于10%

今天是8月24号,暑假已然过去了2/3,我打算明天就提前返校。我想对我的暑假做一个总结,这个暑假学了很多东西,相较于平时有更多的集中连续的时间可以用了学习,提升个人的能力。在校期间,我更多是用一些零散的时间来通过做项目来学习。但感觉难以长时间的专注在一个知识点上。始终是项目驱动学习,被动的去了解一些内容,暑假的话有大把的时间,可以去了解自己感兴趣的内容。我这个暑假主要学习了这些内容:

  • 处理器 及其 流水线机制
  • 程序性能优化简单了解
  • 存储器层次结构的认识
  • 简单算法入门 与 少量的刷题
  • 链接编译过程
  • 深度学习简单神经网络的构造 与 原理认识
  • 异常控制流机制
  • Linux shell编程 以及 简单指令的使用

这么一看学到的东西很多,但是大多数都是浅尝辄止,我打算多多接触之后再来明确自己感兴趣的方向。接下来是对我暑假的批评和反思。从7.3暑假开始到8.24今天,一共是52天,自由可支配(除去睡觉吃饭休息)的时间大概有600+个小时,我每次的学习时长可能也就4~5个小时,零零散散学了25天左右。剩下的时间浪费在打游戏,看动漫,看电影,看小说上面。

我暑假前定了很多目标,基本一个都没达到,超级灰心。但是,这也是意料之中的事情,取乎其上、得乎其中。所以我打算早点返校,好好用这最后的几天拯救一下我的暑假。但是怎么评价这个暑假呢,一直以来我都对自己有着很高的期望,但是通过这个暑假,我很好的认识清楚了自己的懒惰性,我也不太喜欢这样,但是自控力还是太差了,我也会好好的反思自己的行为。其次是这个暑假开拓了很多眼界,我意识到了和别人的差距,我总是把目光拘于学校,年级,班级,寝室,甚至是自己。我在网上看到了,认识到了很多优秀的人,感受到了差距,所以我打算继续前行,去学习各种感兴趣的内容。

对于下个学期的,我也是略有想法。下个学期课程十分繁重,但是大多是专业课程。这个是我的优势,专业课程内容我有很好的基础,我打算不跟着学校的计划走,我打算跟着南京大学的课程设计进行学习,计组方面打算跟着它们的PA学习,操作系统打算跟着JYY老师的课程深入了解一下。数据结构与算法,我打算制定一个刷题计划,要保证每天有一定的算法学习时间。还有科研方面,我打算联系LLF老师尝试写出我的第一篇论文。暑假我看着两位学长保研,我一直以为保研就是对绩点要求很高,但是实际上本科期间的科研成果是及其重要的,我这个暑假一直疏于对这方面的学习。下个学期,我打算花大部分的时间在这上面。

总结下来,下个学期的几个主要任务:

  • 利用专业知识的优势,提升绩点成绩,加深对专业知识的掌握理解
  • 提高算法能力,争取在下一学年中能够掌握常见的算法题型
  • 深入学习逆向工程和二进制漏洞审计的,多刷题,强化竞赛能力
  • 掌握最基本的科研能力,要开始入手自己的第一篇论文
  • 学习计算机网络,完善技术栈

不过我还是很期待9月份的到来,一个是9.3有大阅兵,我特别想看。还有一个是丝之歌发售,我特别想玩,我打算一出就开始玩,通宵玩,累了就睡,饿了就吃,醒了就玩。我初步估计可能通关需要20~30小时,也就是[9.3,9.6]好好玩四天。剩下的时间就对暑期的计划做一个收尾。

在上一篇博客中,我们完成了一个简单的前馈神经网络,完成了对根据身高体重对性别进行猜测的神经网络,以及对他的训练。但是我们不该止步于此,接下来我们将尝试编写一个RNN循环神经网络,并认识它背后的原理。

循环神经网络简介

循环神经网络是一种专门用于处理序列的神经网络,因此其对于处理文本方面十分有效。且对于前馈神经网络和卷积神经网络,我们发现:它们都只能处理预定义的尺寸——接受固定大小的输入并产生固定大小的输出。但是循环神经网络可以处理任意长度的序列,并返回。它可以是这样的:

image.png

这种处理序列的方式可以实现很多功能。例如,文本翻译,事件评价… 我们的目标是让它完成对一个评论的判断(是正面的还是负面的)。将待分析的文本输入神经网络然后,然后给出判断。

实现方式

我们考虑一个输入为x0,x1,x2,...,xn,输出为y0,y1,y2,..,yn的多对多循环神经网络。这些xi和yi是向量,可以是任意维度。RNNs通过迭代更新一个隐藏状态h,重复这些步骤:

  • 下一个隐藏状态ht是前一个状态ht-1和下一个输入xt计算得出的
  • 输出yt是由当前的隐藏状态ht计算得出的
image.png

这就是RNNs为什么是循环神经网络的原因,对于上面步骤的每一步中,都使用的是同一个权重。对于一个典型的RNNs,我们只需要使用3组权重就可以计算:

  • Wxh 用于所有xt -> ht的连接
  • Whh 用于所有ht-1 -> ht的连接
  • Why 用于所有ht -> yt的连接

同时我们还需要为两次输出设置偏置:

  • bh 计算ht时的偏置
  • by 计算yt时的偏置

我们将权重表示为矩阵,将偏置表示为向量,从而组合成整个RNNs。我们的输出是: $$ \begin{align*} h_t &= tanh(W_{xh}x_t + W_{hh}h_{t-1}+b_h) \\ y_t &= W_{hy}h_t + b_y \end{align*} $$ 我们使用tanh作为隐藏状态的激活函数,其图像函数如下:

image.png

目标与计划

我们要从头实现一个RNN,执行一个情感分析任务——判断给定的文本是正面消息还是负面的。

这是我们要用的训练集:data

下面是一些训练集的样例:

image.png

由于这是一个分类问题,所以我们使用多对一的循环神经网络,即最终只使用最终的隐藏状态来生成一个输出。每个xi都是一个代表文本中一个单词的向量。输出y是一个二维向量,分别代表正面和负面。我们最终使用softmax将其转换为概率。

image.png

数据集预处理

神经网络无法直接识别单词,我们需要处理数据集,让它变成能被神经网络使用的数据格式。首先我们需要收集一下数据集中所有单词的词汇表:

1
2
vocab = list(set([w for text in train_data.keys() for w in text.split(" ")]))
vocab_size = len(vocab)

vocab是一个包含训练集中出现的所有的单词的列表。接下来,我们将为每一个词汇中的单词都分配一个整数索引,因为神经网络无法理解单词,所以我们要创造一个单词和整数索引的关系:

1
2
word_to_idx = {w:i for i,w in enumerate(vocab)}
idx_to_word = {i:w for i,w in enumerate(vocab)}

我们还要注意循环神经网络接收的每个输入都是一个向量xi,我们需要使用one-hot编码,将我们的每一个输入都转换成一个向量。对于一个one-hot向量,每个词汇对应于一个唯一的向量,这种向量出了一个位置外,其他位置都是0,在这里我们将每个one-hot向量中的1的位置,对应于单词的整数索引位置。

也就是说,我们的词汇表中有n个单词,我们的每个输入xi就应该是一个n维的one-hot向量。我们写一个函数,以用来创建向量输入,将其作为神经网络的输入:

1
2
3
4
5
6
7
def createInputs(text):
inputs = []
for w in text.split(" "):
v = np.zeros((vocab_size,1)) # 创建一个vocab_size*1的全零向量
v[word_to_idx[w]] = 1
inputs.append(v)
return inputs

向前传播

现在我们开始实现我们的RNN,我们先初始化我们所需的3个权重和2个偏置:

1
2
3
4
5
6
7
8
9
10
from numpy.random import randn	# 正态分布随机函数
class RNN:
def __init__(self, input_size, output_size, hidden_size=64):
# weights
self.Whh = randn(hidden_size,hidden_size) / 1000
self.Wxh = randn(hidden_size,input_size) / 1000
self.Why = randn(output_size,hidden_size) / 1000
# biases
self.bh = np.zeros((hidden_size,1))
self.by = np.zeros((output_size,1))

我们通过np.random.randn()从标准正态分布中初始化我们的权重。接下来我们将根据公式: $$ \begin{align*} h_t &= tanh(W_{xh}x_t + W_{hh}h_{t-1}+b_h) \\ y_t &= W_{hy}h_t + b_y \end{align*} $$ 实现我们的向前传播函数:

1
2
3
4
5
6
def forward(self,inputs):
h = np.zeros((self.Whh.shape[0],1)) # 在刚开始我们的h是零向量,在此之前没有先前的h
for i,x in enumerate(inputs):
h = np.tanh(self.Wxh @ x + self.Whh @ h + self.bh) # @是numpy中的矩阵乘法符号
y = self.Why @ y + self.by
return y,h

现在我们的RNNs神经网络已经可以运行了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from data import *
import numpy as np
from numpy.random import randn

def createInputs(text):
inputs = []
for w in text.split(" "):
v = np.zeros((vocab_size,1))
v[word_to_idx[w]] = 1
inputs.append(v)
return inputs

def softmax(x):
return np.exp(x) / sum(np.exp(x))

class RNN:
def __init__(self, input_size, output_size, hidden_size=64):
# weights
self.Whh = randn(hidden_size,hidden_size) / 1000
self.Wxh = randn(hidden_size,input_size) / 1000
self.Why = randn(output_size,hidden_size) / 1000
# biases
self.bh = np.zeros((hidden_size,1))
self.by = np.zeros((output_size,1))

def forward(self,inputs):
h = np.zeros((self.Whh.shape[0],1))
for i,x in enumerate(inputs):
h = np.tanh(self.Wxh @ x + self.Whh @ h + self.bh)
y = self.Why @ h + self.by
return y,h

RNNs = RNN(vocab_size,2)
inputs = createInputs('i am very good')
out, h = RNNs.forward(inputs)
probs = softmax(out)
print(probs)
# [[0.50000228],[0.49999772]]

这里我们用到了softmax函数,softmax函数可以将任意的实值转换为概率(主要用于多分类任务)。它的核心作用是将网络的原始输出,转换为各类别的概率,使得所有概率之和为1。其公式如下 $$ \begin{align*} Softmax(z_i) = \frac{e^{z_i}}{\sum_{j=1}^{C}e^{z_j}} \end{align*} $$

反向传播

为了训练我们RNNs,我们首先需要选择一个损失函数。对于分类模型,Softmax函数经常和交叉熵损失函数配合使用。它的计算方式如下: $$ \begin{align*} L = -ln(p_c) \end{align*} $$ 其中pc是我们的RNNs对正确类别的预测概率(正面或负面)。例如,如果一个正面文本被我们的RNNs预测为90%的正面,那么可以计算出损失为: $$ \begin{align*} L = -ln(0.90) = 0.105 \end{align*} $$

既然有损失函数了,我们就可以使用梯度下降来训练我们的RNN以减小损失。

计算

首先从计算$\frac{\partial L}{\partial y}$开始,我们有: $$ \begin{align*} L &= -ln(p_c) = -ln(softmax(y_c)) \\ \frac{\partial L}{\partial y} &= \frac{\partial L}{\partial p_c}* \frac{\partial p_c}{\partial y_i} \\ \frac{\partial L}{\partial p_c} &= -\frac{1}{p_c} \\ \frac{\partial p_c}{\partial y_i} &= \begin{cases} \frac{\partial p_i}{\partial y_i} = \frac{e^{y_i}\sum_{j}e^{y_j}-(e^{y_i})^2}{(\sum_{j}e^{y_j})^2} = p_i(1-p_i)&\text{if c=i} \\ \frac{\partial p_c}{\partial y_i} = \frac{e^{y_i}\sum_{j}e^{y_j}-(e^{y_i})^2}{(\sum_{j}e^{y_j})^2} = -p_cp_i&\text{if c!=i} \end{cases} \\ \frac{\partial L}{\partial y} &= \begin{cases} -\frac{1}{p_i} * p_i(1-p_i) = p_i-1 & \text{if c=i} \\ -\frac{1}{p_c} * (-p_cp_i) = p_i & \text{if c!=i} \end{cases} \end{align*} $$ 接下来我们尝试对Whyby的梯度,它们将最终隐藏状态转换为RNNs的输出。我们有: $$ \begin{align*} \frac{\partial L}{\partial W_{hy}} &= \frac{\partial L}{\partial y} *\frac{\partial y}{\partial W_{hy}} \\ y &= W_{hy}h_n + b_y \\ \\ \frac{\partial y}{\partial W_{hy}} &= h_n \to \frac{\partial L}{\partial W_{hy}} = \frac{\partial L}{\partial y}h_n \\ \frac{\partial y}{\partial b_{y}} &= 1 \to \frac{\partial L}{\partial b_{y}} = \frac{\partial L}{\partial y} \end{align*} $$ 最后我们还需要Whh,Wxhbh的梯度。由于梯度在每一步中都会被使用,所以根据时间展开和链式法则,我们有: $$ \begin{align*} \frac{\partial L}{\partial W_{xh}} &= \frac{\partial L}{\partial y}\sum_{t=1}^{T}\frac{\partial y}{\partial h_t}*\frac{\partial h_t}{\partial W_{xh}} \\ \end{align*} $$ 这是因为L会被y所影响,而yhT所影响,而hT又依赖于h(T-1)直到递归到h1,因此Wxh通过所有中间状态影响到L,所以在任意时间t,Wxh的贡献为: $$ \begin{align*} \frac{\partial L}{\partial W_{xh}} \Big|_t &= \frac{\partial L}{\partial y}*\frac{\partial y}{\partial h_t}*\frac{\partial h_t}{\partial W_{xh}} \\ \end{align*} $$ 现在我们对其进行计算: $$ \begin{align*} h_t &= tanh(W_{xh}x_t + W_{hh}h_{t-1}+b_h) \\ \frac{dtanh(x)}{dx} &= 1-tanh^2(x) \\ \\ \frac{\partial h_t}{\partial W_{xh}} &= (1-h_t^2)x_t \\ \frac{\partial h_t}{\partial W_{hh}} &= (1-h_t^2)h_{t-1} \\ \frac{\partial h_t}{\partial b_h} &= (1-h_t^2) \\ \end{align*} $$ 最后我们需要计算出$\frac{\partial y}{\partial h_t}$。我们可以递归的计算它: $$ $$ 由于我们是反向训练的,$\frac{\partial y}{\partial h_{t+1}}$是已经计算的最后一步的梯度$\frac{\partial y}{\partial h_n}=W_{hh}$。至此为止我们的推导就结束了

实现

由于反向传播训练需要用到前向传播中的一些数据,所以我们将其进行存储:

1
2
3
4
5
6
7
8
9
10
def forward(self,inputs):
h = np.zeros((self.Whh.shape[0],1))
# 数据存储
self.last_inputs = inputs
self.last_hs = {0:h}
for i,x in enumerate(inputs):
h = np.tanh(self.Wxh @ x + self.Whh @ h + self.bh)
self.last_hs[i+1] = h # 更新存储
y = self.Why @ h + self.by
return y,h

现在我们可以开始实现backprop()了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def backprop(self,d_y,learn_rate=2e-2):
# d_y: 是损失函数对于输出的偏导数 d_L/d_y 的结果
n = len(self.last_inputs)
# 计算dL/dWhy,dL/dby
d_Why = d_y @ self.last_hs[n].T
d_by = d_y
# 初始化dL/dWhh,dL/dWxh,dL/dbh为0
d_Whh = np.zeros(self.Whh.shape)
d_Wxh = np.zeros(self.Wxh.shape)
d_bh = np.zeros(self.bh.shape)
# 计算dL/dh
d_h = self.Why.T @ d_y # 因为dy/dh = Why 所以 dL/dh = Why * dL/dy

# 随时间的反向传播
for t in reversed(range(n)):
# 通用数据 dL/dh * (1-h^2)
temp = (d_h * (1 - self.last_hs[t+1] ** 2))
# dL/db = dL/dh * (1-h^2)
d_bh += temp
# dL/dWhh = dL/dh * (1-h^2) * h_{t-1}
d_Whh += temp @ self.last_hs[t].T
# dL/dWxh = dL/dh * (1-h^2) * x
d_Wxh += temp @ self.last_inputs[t].T
# Next dL/dh = dL/dh * (1-h^2) * Whh
d_h = self.Whh @ temp

# 梯度剪裁(防止梯度过大导致梯度爆炸问题)
for d in [d_Wxh,d_Whh,d_Why,d_by,d_bh]:
np.clip(d,-1,1,out=d)

# 梯度下降训练
self.Whh -= learn_rate * d_Whh
self.Wxh -= learn_rate * d_Wxh
self.Why -= learn_rate * d_Why
self.bh -= learn_rate * d_bh
self.by -= learn_rate * d_by

由于这一部分的编写涉及到矩阵的变换,所以在编写时,一定要清楚每个变量的状态,以免造成数学错误。例如,以上程序中@的左乘右乘顺序不能随意改变。

训练

我们现在需要写一个接口,将我们的数据”喂”给神经网络,并量化损失和准确率,用于训练我们的神经网络。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def processData(data, backprop=True):
# 打乱数据集 避免顺序偏差
items = list(data.items())
random.shuffle(items)

loss = 0
num_correct =0
for x,y in items:
inputs = createInputs(x)
target = int(y)
# 前向传播计算
out,_ = RNN.forward(inputs)
probs = softmax(out)
# 计算损失与准确度
loss -= np.log(probs[target])
num_correct += int(np.argmax(probs) == target)

if backprop:
d_L_d_y = probs
d_L_d_y[target] -= 1
RNN.backprop(d_L_d_y)

return loss/len(data),num_correct /len(data)

这里对于$\frac{\partial L}{\partial y}$的初始化我们需要重点关注一下。由于我们使用的是,交叉熵损失+Softmax函数来进行处理。对于输出层,我们有一个简洁的表达式来进行处理: $$ \begin{align*} \frac{\partial L}{\partial y} = probs - onehot(target) \end{align*} $$ 这里我选用AI的解释来直观的感受为什么这么做:

image.png

我们在前面也推导过这个原因 $$ \begin{align*} \frac{\partial L}{\partial y} &= \begin{cases} -\frac{1}{p_i} * p_i(1-p_i) = p_i-1 & \text{if c=i} \\ -\frac{1}{p_c} * (-p_cp_i) = p_i & \text{if c!=i} \end{cases} \end{align*} $$ 最后我们编写训练循环,来对我们的内容进行训练:

1
2
3
4
5
6
7
8
9
for epoch in range(1000):
train_loss, train_acc = processData(train_data)

if epoch % 100 == 99:
print('--- Epoch %d' % (epoch + 1))
print('Train:\tLoss %.3f | Accuracy: %.3f' % (train_loss, train_acc))

test_loss, test_acc = processData(test_data, backprop=False)
print('Test:\tLoss %.3f | Accuracy: %.3f' % (test_loss, test_acc))

执行可以看到完整的训练过程。

使用

既然完成了训练,那么我们可以尝试与其进行沟通,我们可以写一个接口用于和它进行对话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def predict(probs, mid=0.5):
positive_prob = probs[1]
return "Yes,you are positive ^_^" if positive_prob > mid else "No,you are negative qwq"


print("please wait some time to train")

for epoch in range(1000):
processData(train_data)

while True:
text = input("please input a sentence: ").lower()
inputs = createInputs(text)
out, _ = rnn.forward(inputs)
probs = softmax(out)
print(predict(probs))

哈哈效果还可以,只不过只能检测到训练集中用过的单词。

在网上冲浪的时候我们经常会看到一些点开与不点开会呈现不同结果的图片,我们称这种图片为”幻影坦克图”。因为它利用到了光学欺骗的原理,与红警中的幻影坦克相似,故得其名。今天,尝试了解下背后的原理,并自己动手实现一下这个功能。

原理

由于幻影坦克图需要涉及到alpha混合,所以我们使用的图像格式必须带有alpha通道。所以我们的首选就是png图像格式,我们将利用它的透明度来实现。最为核心的部分就是alpha通道作用于图片的计算公式: $$ \begin{align*} Color_{合} = Color_{前}*Alpha + Color_{后}*(1-Alpha) \end{align*} $$ 其中Color是混合后所被看到的颜色,Color是前景色,Color是背景色

我们可以通过改变背景色从而得到图像在背景上呈现的颜色,为了更好的背景效果,我们选择黑底与白底,分别是:(r_w,g_w,b_w,a_w) = (1,1,1,1)(r_b,g_b,b_b,a_b) = (0,0,0,1)。现在我们可以计算出指定的带有透明度的图片分别在白底(r1,g1,b1)和黑底(r2,g2,b2)上呈现的颜色: $$ \begin{cases} r_1 = r*\alpha + (1-\alpha) \\ g_1 = g*\alpha + (1-\alpha) \\ b_1 = b*\alpha + (1-\alpha) \\ \\ r_2 = r*\alpha \\ g_2 = g*\alpha \\ b_2 = b*\alpha \end{cases} $$ 但是实际上我们想要把两张图片做成幻影坦克图,我们的(r1,g1,b1,1)(r2,g2,b2,1)都应该是已知的,实际上我们要求解的应该是幻影坦克图(r,g,b,a),所以我们调整方程组可得: $$ \begin{cases} \alpha = 1-r_1+r_2 \\ \alpha = 1-g_1+g_2 \\ \alpha = 1-b_1+b_2 \\ \\ r = \frac{r_2}{\alpha} = \frac{r_2}{1-r_1+r_2} \\ g = \frac{g_2}{1-g_1+g_2} \\ b = \frac{b_2}{1-b_1+b_2} \end{cases} $$ 现在我们发现透明度有三种不同的计算方式,此时我们该如何处理呢?在正常情况下,我们难以满足这三个式子相等。但是在灰度图下可以做到r = g = b,此时灰度图的亮度信息由像素亮度决定。同样的我们也需要注意到0 <= a <= 1的大小关系,因此我们必须有r1 >= r2。为了避免像素值也超出值域,我们在实现时,需要将r1压缩到[128,255],将r2压缩到[0,127]

现在我们可以计算出我们想要的幻影坦克图片的每一个像素信息了。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from PIL import Image
import numpy as np

def mix(img1, img2, output):
# 灰度图转化RGB图
img1 = Image.open(img1).convert("L").convert("RGB")
img2 = Image.open(img2).convert("L").convert("RGB")
# 调整图像大小
img2 = img2.resize(img1.size)
# 归一化
imgarr1 = np.array(img1,dtype=np.float32) / 255.0
imgarr2 = np.array(img2,dtype=np.float32) / 255.0
# 压制像素 r1>= r2
imgarr1 = 0.5 + 0.5 * imgarr1
imgarr2 = 0.5 * imgarr2
# 透明度计算
alpha = 1 - imgarr1 + imgarr2
alpha = np.clip(alpha,0,1)
# 计算幻影坦克图颜色值
rgb = imgarr2 / (alpha + 1e-6)
rgb = np.clip(rgb,0,1)
# 合并RGB与Alpha
rgba = np.dstack((rgb,alpha.mean(axis=2)))
# 输出图片
img = Image.fromarray((rgba * 255).astype(np.uint8),"RGBA")
img.save(output)

mix("D:/Photo/111.png","D:/Photo/123.png","D:/Micro/test.png")

我们可以看到以下效果:

表图
里图

优化

我们通过将图片转换成灰度图的形式从而实现了幻影坦克图,但是我们却丢失了RGB通道的颜色。有没有什么方式能够尽可能的保留原来的色彩信息呢?

我们回到这个公式: $$ \begin{cases} \alpha = 1-r_1+r_2 \\ \alpha = 1-g_1+g_2 \\ \alpha = 1-b_1+b_2 \\ \\ r = \frac{r_2}{\alpha} = \frac{r_2}{1-r_1+r_2} \\ g = \frac{g_2}{1-g_1+g_2} \\ b = \frac{b_2}{1-b_1+b_2} \end{cases} $$ 由于在RGBA模式下,一个像素只能有一个alpha通道,也就是说三个色彩通道只能使用一个alpha。所以我们怎么才能让$1-r_1+r_2-g_1+g_2-b_1+b_2rg b $呢?

我们可以通过两种方式来实现:

  • 调整图片整体亮度

    我们知道,图像的亮度调整通常是通过线性放缩实现的: $$ \begin{align*} RGB_{new} = RGB_{original} * k\space(0<k<1) \end{align*} $$ 所以RGB值之间的差值也是随着线性放缩变化的。当我们的亮度趋近于0时,r ≈ g ≈ b是成立的,我们可以通过减少亮度,从而提升幻影坦克图的视觉效果。

  • 设置插值平衡色彩与灰度的混合比例

    我们可以通过设置差值,来平衡色彩和灰度的混合比例,使得透明度的计算结果在RGB三个通道上趋于一致,,我们基于这个公式可以求出合适的插值: $$ \begin{align*} r_{new} &= r*lerp + gray*(1-lerp) \\ gray &= 0.299*r + 0.587*g + 0.114*b \end{align*} $$ 通过这种方式我们可以强制对齐alpha通道,并减少通道差异。

所以接下来,我们需要考虑合适的合适的亮度与色彩插值,以实现更好的效果。

对于亮度,我们有

image.png

由于人的眼睛对光的线性变化的感受是非线性的,所以即使我们把黑底图的亮度系数调暗到0.22,视觉上也只会认为颜色变暗了0.5。所以[0.18,0.22]是我们选取亮度最合适的区间。

对于色彩插值参数,参数越大,灰度平衡的比例就越小,图片能更好的保留色彩。我们希望白底图能够保留较多的色彩,且黑底图能够更加隐蔽,所以我们设置:

  • 表图 lerp = [0.6,0.8]
  • 里图 lerp = [0.2,0.4]

综上所述我们可以实现我们最终的效果了

最终实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from PIL import Image,ImageEnhance
import numpy as np

class Phantom:
def __init__(self):
self.default = {
'brightnessW': 1,
'brightnessB': 0.8,
'lerpW':0.8,
'lerpB':0.8
}

def loadImage(self,img1,img2): # 加载图片
img1 = Image.open(img1).convert("RGB")
img2 = Image.open(img2).convert("RGB")
img2 = img2.resize(img1.size)
return img1,img2

def adjustBrightnedd(self,img,brightness): # 亮度调节
if brightness == 1:
return img
return ImageEnhance.Brightness(img).enhance(brightness)

def blendColor(self,arr,lerp): # 色彩插值平衡
if lerp == 1.0:
return arr
gray = np.dot(arr,[0.299,0.587,0.114])
return arr * lerp + gray[...,None] * (1-lerp)

def generate(self,img1,img2,outpath,**kwargs):
params = {**self.default,**kwargs}
# 加载图片
img1, img2 = self.loadImage(img1,img2)
# 亮度调整
img1 = self.adjustBrightnedd(img1,params['brightnessW'])
img2 = self.adjustBrightnedd(img2,params['brightnessB'])
# 归一化
arr1 = np.array(img1, dtype=np.float32) / 255.0
arr2 = np.array(img2, dtype=np.float32) / 255.0
# 压制像素 值域区分
arr1 = 0.5 + 0.5 * arr1
arr2 = 0.5 * arr2
# 色彩差值平衡
arr1 = self.blendColor(arr1,params['lerpW'])
arr2 = self.blendColor(arr2,params['lerpB'])
# 透明度计算
alpha = np.clip(1-arr1+arr2,0,1)
# 幻影坦克图像素计算
rgb = np.clip(arr2/(alpha + 1e-6),0,1)
rgba = np.dstack((rgb, alpha.mean(axis=2)))
# 转化图片
img = Image.fromarray((rgba * 255).astype(np.uint8), "RGBA")
img.save(outpath)


if __name__ == '__main__':
ph = Phantom()
ph.generate("D:/Photo/q.jpg","D:/Photo/w.jpg","D:/Micro/test.png")

效果挺好滴,展示一下:

表图
里图

信号

我们已经认识到了操作系统怎么通过异常使得进程上下文切换,以实现异常控制流的实现。现在我们将尝试另一种实现——Linux信号,来允许进程和内核中断其他的进程。

我们可以将信号理解成一条消息,它通知进程系统中发生了某一个事件。每种信号类型都会对应于某种系统事件。然后由不同的处理程序去处理这个事件。我们可以通过man 7 signal来进一步的认识这些信号:

image.png

信号术语

传送一个信号到目的进程可以分作两个步骤:

  • 发送信号:内核通过更新目的进程的上下文中的某个状态,从而实现发送一个信号给目的进程。发送信号一般有以下两种原因:1)内核检测到了一个系统事件,2)一个进程调用了kill函数,显式的要求内核发送一个信号给目的进程
  • 接收信号:当目的进程被内核强迫对信号的发送做出反应时,我们就说它接受了信号。目的进程可以忽略这个信号,终止,或者通过执行信号处理程序的用户层来捕获这个信号。
image.png

一个发出但没有被接收的信号我们称之为待处理信号。在任何时刻一个类型只会有一个待处理信号。如果一个进程中有一个类型为k的待处理信号。接下来任何发送到这个进程的类型为k的信号都不会再排队等候,而是被丢弃。同时一个进程可以选择阻塞接受某种信号。如果一个信号被阻塞,它可以发送,但是不会再被接收,直到取消对它的阻塞。

这个实现通过内核为每个进程维护这一个信号处理集合实现。在pending向量中维护着一个待处理信号的集合,在blocked向量中维护着一个阻塞信号的集合。当一个信号类型为k的信号被发送时,目的进程会检查其pending位是否已被设置,若是则丢弃;不是则设置。然后检查其block位是否被阻塞,若是则丢弃;若不是则接受信号,并清除pending位。

发送信号

发送信号的机制,是基于进程组实现的。我们接下来进一步的理解信号的发送:

进程组

每个进程都只属于一个进程组。进程组是由一个正整数进程组ID来标识的。我们有以下函数可以认识并改变进程组:

1
2
3
#include <unistd.h>
pid_t getpgrp(void); //返回调用进程的进程组ID
int setpgid(pid_t pid,pid_t pgid); //成功则返回0,失败则返回-1

默认情况下,子进程和它的父进程是同属于一个进程组的。一个进程可以通过使用setpgid函数来改变自己或者其他进程的进程组。setpgid函数将pid(目标进程PID)进程加入到进程组pgid(目标进程组ID)。如果pid是0,就表示当前进程。如果pgid是0,就用pid的值作为新的pgid

kill程序发送信号

/bin/kill程序可以向其他程序发送任意的信号:

1
kill -<signalNumber> <pid>

也可以向指定的进程组中的所有进程发送信号:

1
kill -<signalNumber> -<pgid>

我们可以尝试一下:

image.png

此外我们也可以通过键盘发送信号

从键盘发送信号

Uinx shell 中通常使用job(作业)来表示对一条命令行求值而创建的进程。在任何时刻,只有一个前台作业和0或多个后台作业。其中每个作业的都属于一个独立的进程组。

在键盘上Ctrl+C会导致内核发送一个SIGINT信号到前台进程组中的每个进程,导致作业终止。在键盘上Ctrl+Z会导致内核发送一个SIGTSTP信号到前台进程组中的每个进程,导致作业被停止(挂起)。

用kill函数发送信号

1
2
3
#include <sys/types.h>
#include <signal.h>
int kill(pid_t pid,int sig) //成功则返回0,失败则返回-1

通过调用kill函数发送信号到其他进程。

  • 如果pid>0,那么发送信号sig给进程pid
  • pid=0,那么发送信号sig给调用进程所在进程组中的每个进程,包括自己。
  • pid<0,则发送信号sig给进程组|pid|中的每个进程

我们可以尝试编写一个程序来使用kill函数:

1
2
3
4
5
6
7
8
9
10
11
12
#include <stdio.h>
#include "csapp.h"

int main(){
pid_t pid;
if((pid = Fork())==0){
Pause(); //将进程挂起,直到接收到一个信号
printf("never get it\n");
exit(0);
}
kill(pid,SIGKILL);
}

这样就实现了父进程击杀自己的子进程。

用alarm函数发送信号

进程可以通过调用alarm函数向自己发送SIGALRM信号

1
2
#include <unistd.h>
unsigned int alarm(unsigned int secs); //返回前一次闹钟剩余的秒数,若之前没有设置闹钟,返回0

alarm函数安排内核在secs秒之后发送一个SIGALRM信号给调用进程。如果secs==0,那么不会安排调度新的闹钟。在任何情况下,alarm的调用都会取消之前的待处理的闹钟,并返回前一个闹钟的剩余的秒数。

接收信号

当内核把进程p从内核态切换到用户态时,它会检查进程p的未被阻塞的待处理信号的集合pending & ~blocked。如果这个集合为空,那么内核将控制传递到p的逻辑控制流中的下一条指令。如果集合使非空的,那么内核选择集合中的某个信号k(通常是最小的k),并强制p接收信号k。收到这个信号会触发某种行为。一旦进程完成这个行为,就会将控制传递回p的逻辑控制流中的下一条指令。

在上面展示信号类型的图中,也有每个信号类型相关联的默认行为。我们也可以通过signal函数修改和信号相关联的默认行为。唯一例外的是SIGSTOPSIGKILL。它们的默认行为是不能修改的。

1
2
3
4
#include <signal.h>
typedef void (*sighandler_t)(int);
sighandler_t signal(int signum,sighandler_t handler);
//如果成功则返回指向前次处理程序的指针;否则返回SIG_ERR,不设置errno

signal函数通过以下三种方式之一来改变和信号signum相关联的行为:

  • 如果handler是SIG_IGN,那么忽略这个类型的信号
  • 如果handler是SIG_DFL,那么恢复这个类型的信号的默认行为
  • 否则,handler就是用户定义的函数的地址,这个函数被称为信号处理程序。当接收到指定的信号类型时就会调用信号处理程序,我们称之为捕获信号。一个处理程序可以捕获不同的信号。

比如我们可以写一个程序用来改变SIGINT信号的默认行为(终止进程):

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <stdio.h>
#include "csapp.h"

void handler(int sig){
printf("\nOVER!\n");
exit(0);
}

int main(){
Signal(SIGINT,handler);
Pause();
return 0;
}

当然信号处理程序也可以被其他信号处理程序中断。例如在下图中演示了这个过程:

image.png

阻塞和解除阻塞信号

Linux提供了两种阻塞信号的机制:

  • 隐式阻塞机制 内核默认阻塞任何当前处理程序正在处理的信号类型的待处理信号。
  • 显式阻塞机制 使用sigprocmask函数和它的辅助函数,明确阻塞/解除指定的信号。
1
2
3
4
5
6
7
8
#include <signal.h>
int sigprocmask(int how, const sigset_t* set, sigset_t* oldset);
int sigemptyset(sigset_t* set);
int sigfillset(sigset_t* set);
int sigaddset(sigset_t* set, int signum);
int sigdelset(sigset_t* set, int signum); //成功则返回0,否则返回-1

int sigismember(const sigset_t* set, int signum); //若是则返回1,不是则返回0,出错返回-1

sigprocmask函数改变当前阻塞的信号集合(blocked位向量)。其具体的行为依赖于how值:

  • SIG_BLOCK 把set中的信号添加到blocked中 blocked = set|blocked
  • SIG_UNBLOCK 从blocked中删除set中的信号 blocked = blocked & ~set
  • SIG_SETMASKblocked = set

如果oldset非空,则将之前的blocked保存在其中。对于其他的几个辅助函数:

  • sigemptyset初始化set为空集合
  • sigfillset将每个信号都添加到set中
  • sigaddset将signum加入到set中
  • sigdelset从set中删除signum,如果signum是set的成员,返回1。不是则返回0

编写信号处理程序

这一部分太难了,我难以理解并接受。之后再来看看吧

显式地等待信号

和上面的关联度较高,涉及到竞争并发等内容,我暂时无法理解

非本地跳转

C语言提供了一种用户级的异常控制流形式,即非本地跳转(本地跳转是goto),它将控制从一个函数转移到另一个当前正在执行的函数,而不需要经过正常的调用-返回序列。其中非本地跳转是通过setjmplongjmp实现的。

1
2
#include <setjump.h>
int setjump(jmp_buf env); //setjmp返回0,longjmp返回非0

setjmp函数会在env缓冲区中保存当前的调用环境(相当于设置一个锚点,保存当前状态),以供后面的longjmp使用,并返回0。注意setjump由于其特殊的返回机制,不能被存储在变量之中,但是可以被switch使用。

1
2
#include <setjump.h>
int longjmp(jmp_buf env, int retval); //不返回

longjmp函数从env缓冲区中恢复调用环境,然后触发一个从最近一次初始化env的setjmp调用的返回。然后setjmp返回,并带有非零的返回值retval

注意到,setjmp只被执行一次,但是会返回多次:一次是第一次调用setjmp时,调用环境保存在缓冲区env中。一次时为每个相应的longjmp调用。另一方面,longjmp被调用一次,但是不返回。

通过非本地条状我们可以实现从一个深层嵌套的函数调用中立即返回,从而实现对错误的分析,而不用多次退出复杂的调用栈。我们以下面的程序为例,可以感受到非本地跳转的用途:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <setjmp.h>
#include "csapp.h"
jmp_buf buf;
int error1 = 0;
int error2 = 1;
void foo(),bar();
int main(){
switch(setjmp(buf)){
case 0:
foo();
break;
case 1:
printf("Detected error1 in foo\n");
break;
case 2:
printf("Detected error2 in foo\n");
break;
default:
printf("Unknown error in foo\n");
}
exit(0);
}
void foo(){
if(error1)
longjmp(buf,1);
bar();
}
void bar(){
if(error2)
longjmp(buf,2);
}

虽然C中并没有异常的捕获函数,但是我们可以通过这种方式去实现。当遇到一个错误是,从setjmp返回,并解析它的错误类型。

同时,也要注意。longjmp允许跳过中间调用机制的过程可能回导致许多意外的后果。比如没有释放一些数据结构,导致内存泄露….

写在最后

关于异常控制流我感觉还是比较抽象的。涉及到的函数很多,尤其是信号部分,牵连到许多并发相关的内容。对于现在的我而言还是太过超前,日后再来巩固。

系统调用错误处理

由于之后会用到大量的系统调用函数,我们需要做好错误处理,以便于查找问题。Uinx系统中的系统级函数遇到错误时会返回-1,并设置全局整数变量errno来表示错误类型。这个时候我们可以通过strerror()函数来返回和errno值相关联的错误。我们以处理fork()的错误为例:

1
2
3
4
if((pid = fork()) < 0){
fprintf(stderr,"fork error: %s\n",strerror(errno));
exit(0);
}

我们可以进一步的封装这个错误:

1
2
3
4
5
6
7
8
void unix_error(char *msg){
fprintf(stderr,"%s: %s\n",msg,strerror(errno));
exit(0)
}

if((pid = fork()) < 0){
unix_error("fork error");
}

通过错误处理包装函数,我们可以进一步的优化代码。对于错误处理包装函数,有一个约定成俗的规矩,对于基本函数foo,我们定义一个具有相同参数的包装函数Foo。包装函数调用基本函数,检查错误,如果有问题就终止。比如下面对fork的包装:

1
2
3
4
5
6
pid_t Fork(){
pid_t pid;
if((pid = fork()) < 0)
unix_error("fork error");
return pid;
}

我们之后的包装函数也会按照相同的处理模式,来进行编写。

进程调用

Unix提供了大量从C程序中操作进程的系统调用,我们来详细了解他们:

获取进程ID

每个进程都有一个唯一的正数非零进程PID

1
2
3
4
5
#include <sys/types.h>
#include <unistd.h>

pid_t getpid(); //返回调用进程的PID
pid_t getppid(); //返回调用进程的父进程的PID

这两个函数返回的类型为pid_t,在Linux中它们被types.h定义为int

创建和终止进程

我们可以认为进程总是处于下面三种状态:

  • 运行 进程要么在CPU上执行,要么在等待被执行且最终会被调度
  • 停止 进程的执行被挂起,且不会被调度。当收到SIGSTOP SIGTSTP SIGTTIN SIGTTOU信号时,进程就保持停止,直到它收到一个SIGCONT信号,这个时候,进程在次开始运行
  • 终止 进程永远地停止了。三种原因:1)收到终止进程信号,2)从主程序返回,3)调用exit()函数

下面我们了解进程的创建和终止过程:

1
2
#include <stdlib.h>
void exit(int status); //exit函数以status退出状态来终止进程
1
2
3
#include <sys/types.h>
#include <unistd.h>
pid_t fork(); //子进程返回0,父进程返回子进程的PID,如果出错返回-1

新创建的子进程会得到父进程用户级虚拟地址空间相同的一份副本,包括代码、数据、用户栈、堆、共享库。子进程也会得到与父进程任何打开文件描述符相同的副本,这意味着当父进程调用fork时,子进程可以读取父进程中打开的所有文件。父子进程最大的区别就在于他们的PID不同

fork函数被调用一次,却会返回两次,这是因为调用之后创建了一个新的进程。然而,在两个进程中的返回值会有所不同,因此我们根据fork()的返回值来判断父子进程

我们可以用下面这个程序来展示一个进程的创建:

1
2
3
4
5
6
7
8
9
10
11
int main(){
pid_t pid;
int x = 1;
pid = Fork();
if(pid == 0){ //子进程
printf("child: x = %d\n",x+1);
exit(0);
}
printf("parent: x = %d\n",x-1); //父进程
exit(0);
}

我们可以看到执行的结果:

1
2
3
ylin@Ylin:~/Program/test$ ./a.out
parent: x = 0
child: x = 2

实际的运行过程我们可以简化成流程图:

image.png

对于整个过程我们可以从中注意到一些关键点:

  • 调用一次,返回两次 fork函数被父进程调用一次,但是却返回两次——一次返回到父进程,一次返回到子进程。对于多个fork函数的情况我们之后会涉及
  • 并发执行 父子进程都是并发运行的独立进程。内核可能以任意方式交替执行它们的逻辑控制流的指令。因此我们不能对不同进程中指令的交替执行做出假设。不存在执行的先后关系
  • 相同但是独立的空间 通过观察局部变量x,我们可以看出,父子进程对x所作的改变都是独立的。说明它们之间的空间是独立的。根据数值可以判断,x的值是相同的。因此我们说父子进程的空间相同且独立。
  • 共享文件 printf将输出输入到stdout文件中,结果表明在子进程中的stdout也是打开的。子进程继承了这个文件,所以说父子进程中的文件描述符也是相同的

理解了这些我们就可以理解更复杂的情况,我们可以通过流程图来更好的分析复杂的嵌套情况:

1
2
3
4
5
6
int main(){
Fork();
Fork();
printf("hello");
exit(0);
}
image.png

回收子进程

当一个进程被终止时,内核不会立即将其清除。而是进程会处于一个终止的状态下,直到它的父进程将其回收。当父进程将终止的子进程回收时,内核将子进程的推出状态传递给了父进程,然后抛弃终止的进程。直到现在,这个进程才不存在了。对于处于终止状态,但没有被回收的进程,我们称之为僵死进程。

如果一个父进程终止了,内核会安排init进程作为它们的孤儿进程的养父。init进程的PID为1,是在系统启动时就由内核创建的,它不会终止,是所有内核的祖先,如果父进程没有回收它的僵死子进程就终止了。内核会安排init进程去回收它们。因为即使僵死子进程没有运行,也会消耗系统的内存资源

进程可以通过调用waitpid函数来等待它的子进程终止或停止。

1
2
3
4
#include <sys/types.h>
#include <sys/wait.h>
pid_t waitpid(pid_t pid, int* statusp, int options);
// 如果成功终止就返回子进程的PID;如果WNOHANG,则为0;其他错误,则为-1

waitpid函数比较复杂,我们需要仔细讨论一下。

默认情况下(options=0),waitpid挂起调用进程的执行,直到它的等待集合中的一个子进程终止。如果等待集合中的一个进程在刚调用的时候就已经终止了,那么waitpid就立即返回。在这两种情况中,waitpid会返回导致waitpid返回的已终止进程的PID。此时,已终止的子进程会被回收,内核会清理它的痕迹。

这个过程非常的抽象,我们需要深入去理解waitpid:

判定等待集合的成员

等待集合的成员由参数pid确定:

  • 如果pid>0,那么等待集合就是一个单独的子进程,它的进程ID等于pid
  • 如果pid=-1,那么等待集合就是由父进程所有的子进程组成的

修改默认行为

可以通过修改options为各个常量从而实现修改默认行为

  • WNOHANG

    如果等待集合中的任何子进程都没有终止,那么就立即返回。而默认的行为是挂起调用进程,直到有子进程终止。默认行为是等待的,会阻塞之后的操作。如果想要在等待子进程终止的同时,想要进行别的工作,我们就可以启用这个选项

  • WUNTRACED

    挂起调用进程的执行,直到等待集合中的一个进程变成已终止或者被停止。返回的PID为导致返回的已终止或者被停止的子进程的PID。默认是返回导致返回的已终止的子进程。如果想要检查已终止和被停止的子进程时,可以启用这个选项。

  • WCONTINUED

    挂起调用进程的执行,直到等待集合中一个正在运行的进程变成已终止或等待集合中一个被停止的进程收到SIGCONT信号重新开始执行

这些常量可以通过”|“来连接,从而更改行为

检查已回收子进程的退出状态

status是statusp指向的值,如果这个值不为NULL。waitpid就会在status中放上关于导致返回的子进程的状态信息。我们可以通过<wait.h>中定义的宏来解释status参数:

  • WIFEXITED() 如果子进程通过exit或return正常终止则返回真
  • WEXITSTATUS() 返回一个正常终止的子进程的退出状态。只有WIFEXITED()返回为真时,才有这个状态
  • WIFSIGNALED() 如果子进程是因为一个未被捕获的信号终止的,那么返回真
  • WTERMSIG() 返回导致子进程终止的信号的编号。只有WIFSIGNALED()返回为真时,才有这个状态
  • WIFSTOPPED() 如果子进程是停止的,就返回真
  • WSTOPSIG() 返回引起子进程停止的信号的编号。只有WITSTOPPED()返回为真时,才有这个状态
  • WIFCONTINUED() 如果子进程收到SIGCONT信号重新启动,则返回真

错误条件

如果调用进程没有子进程,则返回-1,设置errno=ECHILD

如果waitpid被信号中断,返回-1,设置errno=EINTR

wait函数

wait()waitpid()的简化版

1
2
3
4
5
#include <sys/types.h>
#include <sys/wait.h>

pid_t wait(int* statusp);
//如果成功,返回子进程PID;否则返回-1

wait(&status)等价于waitpid(-1,&status,0)

让进程休眠

sleep函数可以将一个进程挂起指定的时间

1
2
#include <unistd.h>
unsigned int sleep(unsigned int secs); //返回还要休眠的秒数

如果请求的时间到了,就返回0;否则返回还要休眠的秒数,这种情况是因为sleep可能会被信号中断而过早返回。

另一个函数是puase,该函数让进程休眠,直到收到信号。

1
2
#include <unistd.h>
int pause(); //总是返回-1

加载并运行程序

execve函数用于在当前进程的上下文中加载并运行一个新的程序。

1
2
3
4
5
#include <unistd.h>
int execve(const char* filename,
const char* argv[],
const char* envp[]);
//如果成功,则不返回;否则返回-1

execve函数加载并运行可执行目标文件filename,且待参数列表argv和环境变量列表envp。只有出现错误时,execve才会返回到调用程序。正常情况下调用一次不返回。

在execve加载了filename之哦胡。它会调用启动代码__libc_start_main。启动代码设置栈,并将控制传递给新程序的主函数:

1
int main(int argc,char* argv[],char*envp[]);

当main开始执行时,用户栈的组织结构如下:

image.png

我们从高地址往下看,先是存放了参数和环境的字符串。然后是以NULL结尾的指针数组,其中每个指针都指向栈中的一个字符串。其中全局变量environ指向这些指针中的第一个envp[0]。在栈的顶部是系统启动函数__libc_start_main的栈帧。

main的三个参数:

  • argc 给出argv[]数组中非空指针的数量
  • argv 指向argv[]数组中的第一个条目
  • envp 指向envp[]数组中的第一个条目

同时LInux还提供了几个函数用来操作环境数组:

1
2
3
4
#include <stdlib.h>
char* getenv(const char* name); //若存在则返回指向name的指针;否则返回NULL
int setenv(const char* name,const char* newvalue,int overwrite); //成功则返回0;否则返回-1
void unsetenv(const char* name); //不返回
  • getenv函数在环境数组中搜索字符串“name=value”。如果找到了,就返回指向value的指针。
  • unsetenv函数查找字符串”name=value”,并删除
  • setenv函数找到环境变量”name=value”后,会用新的value替换;否则则创建一个”name=new_value”的环境变量。overwrite用来控制是否覆盖已存在的同名环境变量,0则不覆盖。

使用fork和execve

我们写一个shell。shell打印一个命令行提示符,我们在stdin上输入命令行,然后对这个命令执行。于是我们可以搭建出一个简单的框架:

1
2
3
4
5
6
7
8
9
10
11
#include "shell.h"
int main(){
char cmdline[MAXLINE];
while(1){
printf(">>>");
Fgets(cmdline,MAXLINE,stdin);
if(feof(stdin))
exit(0);
eval(cmdline);
}
}

我们首先需要解析命令行参数,我们以空格作为分隔符,同时返回一个参数列表argv。如果命令的参数以&结尾,我们就把这个程序放在后台运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int parseline(char* buf,char** argv){
char* delim; // 指向分隔符的指针
int argc; // 参数数量
int bg; // 是否为后台程序

buf[strlen(buf)-1]=' '; // \0替换为空格
while(*buf && (*buf==' ')) // 忽略多余的空格
buf++;

//解析参数
argc=0;
while((delim = strchr(buf,' '))){
argv[argc++] = buf;
*delim = '\0';
buf = delim+1;
while(*buf && (*buf==' '))
buf++;
}
argv[argc] = NULL;

if(argc==0)
return 1;

//是否应该在后台运行
if((bg = (*argv[argc-1] == '&')) != 0)
argv[argc-1] = NULL;

return bg;
}

解析好命令参数后,我们也需要判断第一个参数是否为程序名,或者是shell的内置函数。如果是内置函数我们就执行该函数,并返回1;如果不是就返回0。

1
2
3
4
5
6
7
8
9
10
11
int builtin_command(char **argv){
if(!strcmp(argv[0],"quit"))
exit(0);
if(!strcmp(argv[0],"cd")){
if(argv[1]==NULL)
chdir("~");
chdir(argv[1]);
return 1;
}
return 0;
}

最后我们就可以写出我们的执行函数了,如果builtin_comand返回0,我们就需要创建一个新的子进程并加载程序运行。然后根据是否后台运行的需求,使用waitpid进行对前台程序的等待。当作业结束后,再进行一次迭代。我们的Shell就粗略的完成了。但是现在有一个问题,我们的shell不能回收已经结束的子进程,我们会在之后加以改进。程序的完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// csapp.h	
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>

void unix_error(const char *msg){
fprintf(stderr,"%s: %s\n",msg,strerror(errno));
exit(0);
}

pid_t Fork(){
pid_t pid;
if((pid = fork()) < 0)
unix_error("fork error");
return pid;
}

char* Fgets(char* str,int n,FILE* stream){
char* p = fgets(str,n,stream);
if(p == NULL)
unix_error("fgets error");
return p;
}


// shell.h
#include "csapp.h"

#define MAXARGS 32
#define MAXLINE 256

//执行命令行任务
void eval(char* cmdline);
//解析参数
int parseline(char* buf, char** argv);
//判断Shell内联函数
int builtin_command(char** argv);


int parseline(char* buf,char** argv){
char* delim; // 指向分隔符的指针
int argc; // 参数数量
int bg; // 是否为后台程序

buf[strlen(buf)-1]=' '; // \0替换为空格
while(*buf && (*buf==' ')) // 忽略多余的空格
buf++;

//解析参数
argc=0;
while((delim = strchr(buf,' '))){
argv[argc++] = buf;
*delim = '\0';
buf = delim+1;
while(*buf && (*buf==' '))
buf++;
}
argv[argc] = NULL;

if(argc==0)
return 1;

//是否应该在后台运行
if((bg = (*argv[argc-1] == '&')) != 0)
argv[argc-1] = NULL;

return bg;
}


int builtin_command(char **argv){
if(!strcmp(argv[0],"quit"))
exit(0);
if(!strcmp(argv[0],"cd")){
if(argv[1]==NULL)
chdir("~");
chdir(argv[1]);
return 1;
}
return 0;
}

void eval(char* cmdline){
char* argv[MAXARGS]; //参数列表
char buf[MAXLINE]; //命令存储区
int bg; //是否后台调用
pid_t pid;

strcpy(buf,cmdline);
bg = parseline(buf,argv);
if(argv[0]==NULL)
return;

if(!builtin_command(argv)){
if((pid = Fork()) == 0){
// printf("%s",argv[0]);
if(execvp(argv[0],argv)<0){
printf("Command not found\n");
exit(0);
}
}

if(!bg){
int status;
if(waitpid(pid,&status,0) < 0)
unix_error("waitpid error");
}
else
printf("%d %s\n",pid,cmdline);
}
return;
}

// main.c
#include "shell.h"
int main(){
char cmdline[MAXLINE];
while(1){
printf(">>>");
Fgets(cmdline,MAXLINE,stdin);
if(feof(stdin))
exit(0);
eval(cmdline);
}
}