Statistical-Learning-Method.../Logistic_and_maximum_entropy_models/maxEntropy.py

300 lines
12 KiB
Python
Raw Normal View History

2018-11-30 11:24:29 +08:00
# coding=utf-8
# Author:Dodo
# Date:2018-11-30
# Email:lvtengchao@pku.edu.cn
# Blog:www.pkudodo.com
'''
数据集Mnist
训练集数量60000(实际使用:20000)
测试集数量10000
------------------------------
运行结果
正确率96.9%
运行时长8.8h
备注对于mnist而言李航的统计学习方法中有一些关键细节没有阐述
建议先阅读我的个人博客其中有详细阐述阅读结束后再看该程序
Blogwww.pkudodo.com
'''
import time
import numpy as np
from collections import defaultdict
def loadData(fileName):
'''
加载Mnist数据集
:param fileName:要加载的数据集路径
:return: list形式的数据集及标记
'''
# 存放数据及标记的list
dataList = []; labelList = []
# 打开文件
fr = open(fileName, 'r')
# 将文件按行读取
for line in fr.readlines():
# 对每一行数据按切割福','进行切割,返回字段列表
curLine = line.strip().split(',')
2019-02-01 22:35:02 +08:00
#二分类list中放置标签
2018-11-30 11:24:29 +08:00
if int(curLine[0]) == 0:
labelList.append(1)
else:
labelList.append(0)
#二值化
dataList.append([int(int(num) > 128) for num in curLine[1:]])
#返回data和label
return dataList, labelList
class maxEnt:
'''
最大熵类
'''
def __init__(self, trainDataList, trainLabelList, testDataList, testLabelList):
'''
各参数初始化
'''
self.trainDataList = trainDataList #训练数据集
self.trainLabelList = trainLabelList #训练标签集
self.testDataList = testDataList #测试数据集
self.testLabelList = testLabelList #测试标签集
self.featureNum = len(trainDataList[0]) #特征数量
self.N = len(trainDataList) #总训练集长度
self.n = 0 #训练集中xiy对数量
self.M = 10000 #
self.fixy = self.calc_fixy() #所有(x, y)对出现的次数
self.w = [0] * self.n #Pw(y|x)中的w
self.xy2idDict, self.id2xyDict = self.createSearchDict() #(x, y)->id和id->(x, y)的搜索字典
self.Ep_xy = self.calcEp_xy() #Ep_xy期望值
def calcEpxy(self):
'''
计算特征函数f(x, y)关于模型P(Y|X)与经验分布P_(X, Y)的期望值P后带下划线_表示P上方的横线
程序中部分下划线表示|部分表示上方横线请根据具体公式自行判断,
6.2.2 最大熵模型的定义中第二个期望83页最上方的期望
:return:
'''
#初始化期望存放列表对于每一个xy对都有一个期望
#这里的x是单个的特征不是一个样本的全部特征。例如x={x1x2x3.....xk}实际上是x1yx2y。。。
#但是在存放过程中需要将不同特诊的分开存放,李航的书可能是为了公式的泛化性高一点,所以没有对这部分提及
#具体可以看我的博客,里面有详细介绍 www.pkudodo.com
Epxy = [0] * self.n
#对于每一个样本进行遍历
for i in range(self.N):
#初始化公式中的P(y|x)列表
Pwxy = [0] * 2
#计算P(y = 0 } X)
#注程序中X表示是一个样本的全部特征x表示单个特征这里是全部特征的一个样本
Pwxy[0] = self.calcPwy_x(self.trainDataList[i], 0)
#计算P(y = 1 } X)
Pwxy[1] = self.calcPwy_x(self.trainDataList[i], 1)
for feature in range(self.featureNum):
for y in range(2):
if (self.trainDataList[i][feature], y) in self.fixy[feature]:
id = self.xy2idDict[feature][(self.trainDataList[i][feature], y)]
Epxy[id] += (1 / self.N) * Pwxy[y]
return Epxy
def calcEp_xy(self):
'''
计算特征函数f(x, y)关于经验分布P_(x, y)的期望值下划线表示P上方的横线
同理Ep_xy中的_也表示p上方的横线
6.2.2 最大熵的定义中第一个期望82页最下方那个式子
:return: 计算得到的Ep_xy
'''
#初始化Ep_xy列表长度为n
Ep_xy = [0] * self.n
#遍历每一个特征
for feature in range(self.featureNum):
#遍历每个特征中的(x, y)对
for (x, y) in self.fixy[feature]:
#获得其id
id = self.xy2idDict[feature][(x, y)]
#将计算得到的Ep_xy写入对应的位置中
#fixy中存放所有对在训练集中出现过的次数处于训练集总长度N就是概率了
Ep_xy[id] = self.fixy[feature][(x, y)] / self.N
#返回期望
return Ep_xy
def createSearchDict(self):
'''
创建查询字典
xy2idDict通过(x,y)对找到其id,所有出现过的xy对都有一个id
id2xyDict通过id找到对应的(x,y)
'''
#设置xy搜多id字典
#这里的x指的是单个的特征而不是某个样本因此将特征存入字典时也需要存入这是第几个特征
#这一信息,这是为了后续的方便,否则会乱套。
#比如说一个样本X = (0, 1, 1) label =(1)
#生成的标签对有(0, 1), (1, 1), (1, 1),三个(xy)对并不能判断属于哪个特征的,后续就没法往下写
#不可能通过(1, 1)就能找到对应的id因为对于(1, 1),字典中有多重映射
#所以在生成字典的时总共生成了特征数个字典例如在mnist中样本有784维特征所以生成784个字典属于
#不同特征的xy存入不同特征内的字典中使其不会混淆
xy2idDict = [{} for i in range(self.featureNum)]
#初始化id到xy对的字典。因为id与(xy)的指向是唯一的,所以可以使用一个字典
id2xyDict = {}
#设置缩影其实就是最后的id
index = 0
#对特征进行遍历
for feature in range(self.featureNum):
#对出现过的每一个(x, y)对进行遍历
#fixy内部存放特征数目个字典对于遍历的每一个特征单独读取对应字典内的(x, y)对
for (x, y) in self.fixy[feature]:
#将该(x, y)对存入字典中,要注意存入时通过[feature]指定了存入哪个特征内部的字典
#同时将index作为该对的id号
xy2idDict[feature][(x, y)] = index
#同时在id->xy字典中写入id号val为(x, y)对
id2xyDict[index] = (x, y)
#id加一
index += 1
#返回创建的两个字典
return xy2idDict, id2xyDict
def calc_fixy(self):
'''
计算(x, y)在训练集中出现过的次数
:return:
'''
#建立特征数目个字典,属于不同特征的(x, y)对存入不同的字典中,保证不被混淆
fixyDict = [defaultdict(int) for i in range(self.featureNum)]
#遍历训练集中所有样本
for i in range(len(self.trainDataList)):
#遍历样本中所有特征
for j in range(self.featureNum):
#将出现过的(x, y)对放入字典中并计数值加1
fixyDict[j][(self.trainDataList[i][j], self.trainLabelList[i])] += 1
#对整个大字典进行计数,判断去重后还有多少(x, y)对写入n
for i in fixyDict:
self.n += len(i)
#返回大字典
return fixyDict
def calcPwy_x(self, X, y):
'''
计算6.23 最大熵模型的学习 式6.22
:param X: 要计算的样本X一个包含全部特征的样本
:param y: 该样本的标签
:return: 计算得到的Pw(Y|X)
'''
#分子
numerator = 0
#分母
Z = 0
#对每个特征进行遍历
for i in range(self.featureNum):
#如果该(xi,y)对在训练集中出现过
if (X[i], y) in self.xy2idDict[i]:
#在xy->id字典中指定当前特征i以及(x, y)对:(X[i], y)读取其id
index = self.xy2idDict[i][(X[i], y)]
#分子是wi和fi(xy)的连乘再求和,最后指数
#由于当(x, y)存在时fi(xy)为1因为xy对肯定存在所以直接就是1
#对于分子来说就是n个wi累加最后再指数就可以了
#因为有n个w所以通过id将w与xy绑定前文的两个搜索字典中的id就是用在这里
numerator += self.w[index]
#同时计算其他一种标签y时候的分子下面的z并不是全部的分母再加上上式的分子以后
#才是完整的分母即z = z + numerator
if (X[i], 1-y) in self.xy2idDict[i]:
#原理与上式相同
index = self.xy2idDict[i][(X[i], 1-y)]
Z += self.w[index]
#计算分子的指数
numerator = np.exp(numerator)
#计算分母的z
Z = np.exp(Z) + numerator
#返回Pw(y|x)
return numerator / Z
def maxEntropyTrain(self, iter = 500):
#设置迭代次数寻找最优解
for i in range(iter):
#单次迭代起始时间点
iterStart = time.time()
#计算“6.2.3 最大熵模型的学习”中的第二个期望83页最上方哪个
Epxy = self.calcEpxy()
#使用的是IIS所以设置sigma列表
sigmaList = [0] * self.n
#对于所有的n进行一次遍历
for j in range(self.n):
#依据“6.3.1 改进的迭代尺度法” 式6.34计算
sigmaList[j] = (1 / self.M) * np.log(self.Ep_xy[j] / Epxy[j])
#按照算法6.1步骤二中的b更新w
self.w = [self.w[i] + sigmaList[i] for i in range(self.n)]
#单次迭代结束
iterEnd = time.time()
#打印运行时长信息
print('iter:%d:%d, time:%d'%(i, iter, iterStart - iterEnd))
def predict(self, X):
'''
预测标签
:param X:要预测的样本
:return: 预测值
'''
#因为y只有0和1所有建立两个长度的概率列表
result = [0] * 2
#循环计算两个概率
for i in range(2):
#计算样本x的标签为i的概率
result[i] = self.calcPwy_x(X, i)
#返回标签
#max(result)找到result中最大的那个概率值
#result.index(max(result))通过最大的那个概率值再找到其索引索引是0就返回01就返回1
return result.index(max(result))
def test(self):
'''
对测试集进行测试
:return:
'''
#错误值计数
errorCnt = 0
#对测试集中所有样本进行遍历
for i in range(len(self.testDataList)):
#预测该样本对应的标签
result = self.predict(self.testDataList[i])
#如果错误计数值加1
if result != self.testLabelList[i]: errorCnt += 1
#返回准确率
return 1 - errorCnt / len(self.testDataList)
if __name__ == '__main__':
start = time.time()
# 获取训练集及标签
print('start read transSet')
trainData, trainLabel = loadData('../Mnist/mnist_train.csv')
# 获取测试集及标签
print('start read testSet')
testData, testLabel = loadData('../Mnist/mnist_test.csv')
#初始化最大熵类
maxEnt = maxEnt(trainData[:20000], trainLabel[:20000], testData, testLabel)
#开始训练
print('start to train')
maxEnt.maxEntropyTrain()
#开始测试
print('start to test')
accuracy = maxEnt.test()
print('the accuracy is:', accuracy)
# 打印时间
2019-02-01 22:35:02 +08:00
print('time span:', time.time() - start)