Statistical-Learning-Method.../SVM/SVM.py

430 lines
19 KiB
Python
Raw Normal View History

2018-12-03 13:27:50 +08:00
#coding=utf-8
#Author:Dodo
#Date:2018-12-03
#Email:lvtengchao@pku.edu.cn
#Blog:www.pkudodo.com
'''
数据集Mnist
训练集数量60000(实际使用1000)
测试集数量10000实际使用100)
------------------------------
运行结果
正确率99%
运行时长50s
'''
import time
import numpy as np
import math
import random
def loadData(fileName):
'''
加载文件
:param fileName:要加载的文件路径
:return: 数据集和标签集
'''
#存放数据及标记
dataArr = []; labelArr = []
#读取文件
fr = open(fileName)
#遍历文件中的每一行
for line in fr.readlines():
#获取当前行,并按“,”切割成字段放入列表中
#strip去掉每行字符串首尾指定的字符默认空格或换行符
#split按照指定的字符将字符串切割成每个字段返回列表形式
curLine = line.strip().split(',')
#将每行中除标记外的数据放入数据集中curLine[0]为标记信息)
#在放入的同时将原先字符串形式的数据转换为0-1的浮点型
dataArr.append([int(num) / 255 for num in curLine[1:]])
#将标记信息放入标记集中
#放入的同时将标记转换为整型
#数字0标记为1 其余标记为-1
if int(curLine[0]) == 0:
labelArr.append(1)
else:
labelArr.append(-1)
#返回数据集和标记
return dataArr, labelArr
class SVM:
'''
SVM类
'''
def __init__(self, trainDataList, trainLabelList, sigma = 10, C = 200, toler = 0.001):
'''
SVM相关参数初始化
:param trainDataList:训练数据集
:param trainLabelList: 训练测试集
:param sigma: 高斯核中分母的σ
:param C:软间隔中的惩罚参数
:param toler:松弛变量
关于这些参数的初始值参数的初始值大部分没有强要求请参照书中给的参考例如C是调和间隔与误分类点的系数
在选值时通过经验法依据结果来动态调整本程序中的初始值参考于机器学习实战中SVM章节因为书中也
使用了该数据集只不过抽取了很少的数据测试参数在一定程度上有参考性
如果使用的是其他数据集且结果不太好强烈建议重新通读所有参数所在的公式进行修改例如在核函数中σ的值
高度依赖样本特征值范围特征值范围较大时若不相应增大σ会导致所有计算得到的核函数均为0
'''
self.trainDataMat = np.mat(trainDataList) #训练数据集
self.trainLabelMat = np.mat(trainLabelList).T #训练标签集,为了方便后续运算提前做了转置,变为列向量
self.m, self.n = np.shape(self.trainDataMat) #m训练集数量 n样本特征数目
self.sigma = sigma #高斯核分母中的σ
self.C = C #惩罚参数
self.toler = toler #松弛变量
self.k = self.calcKernel() #核函数(初始化时提前计算)
self.b = 0 #SVM中的偏置b
self.alpha = [0] * self.trainDataMat.shape[0] # α 长度为训练集数目
self.E = [0 * self.trainLabelMat[i, 0] for i in range(self.trainLabelMat.shape[0])] #SMO运算过程中的Ei
self.supportVecIndex = []
def calcKernel(self):
'''
计算核函数
使用的是高斯核 详见7.3.3 常用核函数 式7.90
:return: 高斯核矩阵
'''
#初始化高斯核结果矩阵 大小 = 训练集长度m * 训练集长度m
#k[i][j] = Xi * Xj
k = [[0 for i in range(self.m)] for j in range(self.m)]
#大循环遍历XiXi为式7.90中的x
for i in range(self.m):
#每100个打印一次
#不能每次都打印,会极大拖慢程序运行速度
#因为print是比较慢的
if i % 100 == 0:
print('construct the kernel:', i, self.m)
#得到式7.90中的X
X = self.trainDataMat[i, :]
#小循环遍历XjXj为式7.90中的Z
# 由于 Xi * Xj 等于 Xj * Xi一次计算得到的结果可以
# 同时放在k[i][j]和k[j][i]中,这样一个矩阵只需要计算一半即可
#所以小循环直接从i开始
for j in range(i, self.m):
#获得Z
Z = self.trainDataMat[j, :]
#先计算||X - Z||^2
result = (X - Z) * (X - Z).T
#分子除以分母后去指数,得到的即为高斯核结果
result = np.exp(-1 * result / (2 * self.sigma**2))
#将Xi*Xj的结果存放入k[i][j]和k[j][i]中
k[i][j] = result
k[j][i] = result
#返回高斯核矩阵
return k
def isSatisfyKKT(self, i):
'''
查看第i个α是否满足KKT条件
:param i:α的下标
:return:
True满足
False不满足
'''
gxi =self.calc_gxi(i)
yi = self.trainLabelMat[i]
#判断依据参照“7.4.2 变量的选择方法”中“1.第1个变量的选择”
#式7.111到7.113
#--------------------
#依据7.111
if (math.fabs(self.alpha[i]) < self.toler) and (yi * gxi >= 1):
return True
#依据7.113
elif (math.fabs(self.alpha[i] - self.C) < self.toler) and (yi * gxi <= 1):
return True
#依据7.112
elif (self.alpha[i] > -self.toler) and (self.alpha[i] < (self.C + self.toler)) \
and (math.fabs(yi * gxi - 1) < self.toler):
return True
return False
def calc_gxi(self, i):
'''
计算g(xi)
依据7.101 两个变量二次规划的求解方法式7.104
:param i:x的下标
:return: g(xi)的值
'''
#初始化g(xi)
gxi = 0
#因为g(xi)是一个求和式+b的形式普通做法应该是直接求出求和式中的每一项再相加即可
#但是读者应该有发现在“7.2.3 支持向量”开头第一句话有说到“对应于α>0的样本点
#(xi, yi)的实例xi称为支持向量”。也就是说只有支持向量的α是大于0的在求和式内的
#对应的αi*yi*K(xi, xj)不为0非支持向量的αi*yi*K(xi, xj)必为0也就不需要参与
#到计算中。也就是说在g(xi)内部求和式的运算中,只需要计算α>0的部分其余部分可
#忽略。因为支持向量的数量是比较少的,这样可以再很大程度上节约时间
#从另一角度看抛掉支持向量的概念如果α为0αi*yi*K(xi, xj)本身也必为0从数学
#角度上将也可以扔掉不算
#index获得非零α的下标并做成列表形式方便后续遍历
index = [i for i, alpha in enumerate(self.alpha) if alpha != 0]
#遍历每一个非零αi为非零α的下标
for j in index:
#计算g(xi)
gxi += self.alpha[j] * self.trainLabelMat[j] * self.k[j][i]
#求和结束后再单独加上偏置b
gxi += self.b
#返回
return gxi
def calcEi(self, i):
'''
计算Ei
根据7.4.1 两个变量二次规划的求解方法式7.105
:param i: E的下标
:return:
'''
#计算g(xi)
gxi = self.calc_gxi(i)
#Ei = g(xi) - yi,直接将结果作为Ei返回
return gxi - self.trainLabelMat[i]
def getAlphaJ(self, E1, i):
'''
SMO中选择第二个变量
:param E1: 第一个变量的E1
:param i: 第一个变量α的下标
:return: E2α2的下标
'''
#初始化E2
E2 = 0
#初始化|E1-E2|为-1
maxE1_E2 = -1
#初始化第二个变量的下标
maxIndex = -1
#这一步是一个优化性的算法
#实际上书上算法中初始时每一个Ei应当都为-yi因为g(xi)由于初始α为0必然为0
#然后每次按照书中第二步去计算不同的E2来使得|E1-E2|最大,但是时间耗费太长了
#作者最初是全部按照书中缩写但是本函数在需要3秒左右所以进行了一些优化措施
#--------------------------------------------------
#在Ei的初始化中由于所有α为0所以一开始是设置Ei初始值为-yi。这里修改为与α
#一致初始状态所有Ei为0在运行过程中再逐步更新
#因此在挑选第二个变量时只考虑更新过Ei的变量但是存在问题
#1.当程序刚开始运行时所有Ei都是0那挑谁呢
# 当程序检测到并没有Ei为非0时将会使用随机函数随机挑选一个
#2.怎么保证能和书中的方法保持一样的有效性呢?
# 在挑选第一个变量时是有一个大循环的它能保证遍历到每一个xi并更新xi的值
#在程序运行后期后其实绝大部分Ei都已经更新完毕了。下方优化算法只不过是在程序运行
#的前半程进行了时间的加速,在程序后期其实与未优化的情况无异
#------------------------------------------------------
#获得Ei非0的对应索引组成的列表列表内容为非0Ei的下标i
nozeroE = [i for i, Ei in enumerate(self.E) if Ei != 0]
#对每个非零Ei的下标i进行遍历
for j in nozeroE:
#计算E2
E2_tmp = self.calcEi(j)
#如果|E1-E2|大于目前最大值
if math.fabs(E1 - E2_tmp) > maxE1_E2:
#更新最大值
maxE1_E2 = math.fabs(E1 - E2_tmp)
#更新最大值E2
E2 = E2_tmp
#更新最大值E2的索引j
maxIndex = j
#如果列表中没有非0元素了对应程序最开始运行时的情况
if maxIndex == -1:
maxIndex = i
while maxIndex == i:
#获得随机数如果随机数与第一个变量的下标i一致则重新随机
maxIndex = int(random.uniform(0, self.m))
#获得E2
E2 = self.calcEi(maxIndex)
#返回第二个变量的E2值以及其索引
return E2, maxIndex
def train(self, iter = 100):
#iterStep迭代次数超过设置次数还未收敛则强制停止
#parameterChanged单次迭代中有参数改变则增加1
iterStep = 0; parameterChanged = 1
#如果没有达到限制的迭代次数以及上次迭代中有参数改变则继续迭代
#parameterChanged==0时表示上次迭代没有参数改变如果遍历了一遍都没有参数改变说明
#达到了收敛状态,可以停止了
while (iterStep < iter) and (parameterChanged > 0):
#打印当前迭代轮数
print('iter:%d:%d'%( iterStep, iter))
#迭代步数加1
iterStep += 1
#新的一轮将参数改变标志位重新置0
parameterChanged = 0
#大循环遍历所有样本用于找SMO中第一个变量
for i in range(self.m):
#查看第一个遍历是否满足KKT条件如果不满足则作为SMO中第一个变量从而进行优化
if self.isSatisfyKKT(i) == False:
#如果下标为i的α不满足KKT条件则进行优化
#第一个变量α的下标i已经确定接下来按照“7.4.2 变量的选择方法”第二步
#选择变量2。由于变量2的选择中涉及到|E1 - E2|因此先计算E1
E1 = self.calcEi(i)
#选择第2个变量
E2, j = self.getAlphaJ(E1, i)
#参考“7.4.1两个变量二次规划的求解方法” P126 下半部分
#获得两个变量的标签
y1 = self.trainLabelMat[i]
y2 = self.trainLabelMat[j]
#复制α值作为old值
alphaOld_1 = self.alpha[i]
alphaOld_2 = self.alpha[j]
#依据标签是否一致来生成不同的L和H
if y1 != y2:
L = max(0, alphaOld_2 - alphaOld_1)
H = min(self.C, self.C + alphaOld_2 - alphaOld_1)
else:
L = max(0, alphaOld_2 + alphaOld_1 - self.C)
H = min(self.C, alphaOld_2 + alphaOld_1)
#如果两者相等,说明该变量无法再优化,直接跳到下一次循环
if L == H: continue
#计算α的新值
#依据“7.4.1两个变量二次规划的求解方法”式7.106更新α2值
#先获得几个k值用来计算事7.106中的分母η
k11 = self.k[i][i]
k22 = self.k[j][j]
k21 = self.k[j][i]
k12 = self.k[i][j]
#依据式7.106更新α2α2还未经剪切
alphaNew_2 = alphaOld_2 + y2 * (E1 - E2) / (k11 + k22 - 2 * k12)
#剪切α2
if alphaNew_2 < L: alphaNew_2 = L
elif alphaNew_2 > H: alphaNew_2 = H
#更新α1依据式7.109
alphaNew_1 = alphaOld_1 + y1 * y2 * (alphaOld_2 - alphaNew_2)
#依据“7.4.2 变量的选择方法”第三步式7.115和7.116计算b1和b2
b1New = -1 * E1 - y1 * k11 * (alphaNew_1 - alphaOld_1) \
- y2 * k21 * (alphaNew_2 - alphaOld_2) + self.b
b2New = -1 * E2 - y1 * k12 * (alphaNew_1 - alphaOld_1) \
- y2 * k22 * (alphaNew_2 - alphaOld_2) + self.b
#依据α1和α2的值范围确定新b
if (alphaNew_1 > 0) and (alphaNew_1 < self.C):
bNew = b1New
elif (alphaNew_2 > 0) and (alphaNew_2 < self.C):
bNew = b2New
else:
bNew = (b1New + b2New) / 2
#将更新后的各类值写入,进行更新
self.alpha[i] = alphaNew_1
self.alpha[j] = alphaNew_2
self.b = bNew
self.E[i] = self.calcEi(i)
self.E[j] = self.calcEi(j)
#如果α2的改变量过于小就认为该参数未改变不增加parameterChanged值
#反之则自增1
if math.fabs(alphaNew_2 - alphaOld_2) >= 0.00001:
parameterChanged += 1
#打印迭代轮数i值该迭代轮数修改α数目
print("iter: %d i:%d, pairs changed %d" % (iterStep, i, parameterChanged))
#全部计算结束后,重新遍历一遍α,查找里面的支持向量
for i in range(self.m):
#如果α>0说明是支持向量
if self.alpha[i] > 0:
#将支持向量的索引保存起来
self.supportVecIndex.append(i)
def calcSinglKernel(self, x1, x2):
'''
单独计算核函数
:param x1:向量1
:param x2: 向量2
:return: 核函数结果
'''
#按照“7.3.3 常用核函数”式7.90计算高斯核
result = (x1 - x2) * (x1 - x2).T
result = np.exp(-1 * result / (2 * self.sigma ** 2))
#返回结果
return np.exp(result)
def predict(self, x):
'''
对样本的标签进行预测
公式依据7.3.4 非线性支持向量分类机中的式7.94
:param x: 要预测的样本x
:return: 预测结果
'''
result = 0
for i in self.supportVecIndex:
#遍历所有支持向量,计算求和式
#如果是非支持向量求和子式必为0没有必须进行计算
#这也是为什么在SVM最后只有支持向量起作用
#------------------
#先单独将核函数计算出来
tmp = self.calcSinglKernel(self.trainDataMat[i, :], np.mat(x))
#对每一项子式进行求和,最终计算得到求和项的值
result += self.alpha[i] * self.trainLabelMat[i] * tmp
#求和项计算结束后加上偏置b
result += self.b
#使用sign函数返回预测结果
return np.sign(result)
def test(self, testDataList, testLabelList):
'''
测试
:param testDataList:测试数据集
:param testLabelList: 测试标签集
:return: 正确率
'''
#错误计数值
errorCnt = 0
#遍历测试集所有样本
for i in range(len(testDataList)):
#打印目前进度
print('test:%d:%d'%(i, len(testDataList)))
#获取预测结果
result = self.predict(testDataList[i])
#如果预测与标签不一致,错误计数值加一
if result != testLabelList[i]:
errorCnt += 1
#返回正确率
return 1 - errorCnt / len(testDataList)
if __name__ == '__main__':
start = time.time()
# 获取训练集及标签
print('start read transSet')
trainDataList, trainLabelList = loadData('../Mnist/mnist_train.csv')
# 获取测试集及标签
print('start read testSet')
testDataList, testLabelList = loadData('../Mnist/mnist_test.csv')
#初始化SVM类
print('start init SVM')
svm = SVM(trainDataList[:1000], trainLabelList[:1000], 10, 200, 0.001)
# 开始训练
print('start to train')
svm.train()
# 开始测试
print('start to test')
accuracy = svm.test(testDataList[:100], testLabelList[:100])
print('the accuracy is:%d'%(accuracy * 100), '%')
# 打印时间
print('time span:', time.time() - start)