KNN(K-Nearest Neighbor)最邻近分类算法是数据挖掘分类(classification)技术中常用算法之一,其指导思想是"近朱者赤,近墨者黑",即由你的邻居来推断出你的类别。
KNN最邻近分类算法的实现原理:为了判断未知样本的类别,以所有已知类别的样本作为参照,计算未知样本与所有已知样本的距离,从中选取与未知样本距离最近的K个已知样本,再根据少数服从多数的投票法则(majority-voting),将未知样本与K个最邻近样本中所属类别占比较多的归为一类。
KNN算法的核心思想:寻找最近的k个数据,推测新数据的分类
KNN算法的关键:
以计算二维空间中的A(x1,y1)、B(x2,y2)两点之间的距离为例,常用的欧氏距离的计算方法如下图所示:
要自己动手用 Python 实现 KNN 算法,主要有以下三个步骤:
数据来源:http://www.tianqihoubao.com/aqi/chengdu-201901.html
对于这种 Table 表格型数据,可以直接用 pandas 的 read_html() 大法,将数据保存到csv,也就不用再写爬虫去解析网页和提取数据了。
# -*- coding: UTF-8 -*-
"""
@File :spider.py
@Author :叶庭云
@CSDN :https://yetingyun.blog.csdn.net/
@http://www.tianqihoubao.com/aqi/beijing-201901.html
"""
import pandas as pd
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
for page in range(1, 13): # 12个月
if page < 10:
url = f'http://www.tianqihoubao.com/aqi/guangzhou-20190{page}.html'
df = pd.read_html(url, encoding='gbk')[0]
if page == 1:
df.to_csv('2019年广州空气质量数据.csv', mode='a+', index=False, header=False)
else:
df.iloc[1:,::].to_csv('2019年广州空气质量数据.csv', mode='a+', index=False, header=False)
else:
url = f'http://www.tianqihoubao.com/aqi/guangzhou-2019{page}.html'
df = pd.read_html(url, encoding='gbk')[0]
df.iloc[1:,::].to_csv('2019年广州空气质量数据.csv', mode='a+', index=False, header=False)
logging.info(f'{page}月空气质量数据下载完成!')
多爬取几个城市 2019 年历史空气质量数据保存到本地
import pandas as pd
# 将2019年成都空气质量数据作为测试集
df = pd.read_csv('2019年成都空气质量数据.csv')
# 取质量等级 AQI指数 当天AQI排名 PM2.5 。。。8列数据
# SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame 解决方法
df1 = df[['AQI指数', '当天AQI排名', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']].copy()
air_quality = []
# print(df['质量等级'].value_counts())
# 质量等级列数据为字符串 转为为标签 便于判断预测
for i in df['质量等级']:
if i == "优":
air_quality.append('1')
elif i == "良":
air_quality.append('2')
elif i == "轻度污染":
air_quality.append('3')
elif i == "中度污染":
air_quality.append('4')
elif i == "重度污染":
air_quality.append('5')
elif i == "严重污染":
air_quality.append('6')
print(air_quality)
df1['空气质量'] = air_quality
# 将数据写入test.txt
# print(df1.values, type(df1.values)) # <class 'numpy.ndarray'>
with open('test.txt', 'w') as f:
for x in df1.values:
print(x)
s = ','.join([str(i) for i in x])
# print(s, type(s))
f.write(s + '\n')
import pandas as pd
# 自定义其他几个城市空气质量数据作为训练集
df = pd.read_csv('2019年天津空气质量数据.csv', encoding='utf-8')
# 取质量等级 AQI指数 当天AQI排名 PM2.5 。。。8列数据
# SettingWithCopyWarning: A value is trying to be set on a copy of a slice from a DataFrame 解决方法
df1 = df[['AQI指数', '当天AQI排名', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']].copy()
air_quality = []
# print(df['质量等级'].value_counts())
# 质量等级列数据为字符串 转为为数字标识
for i in df['质量等级']:
if i == "优":
air_quality.append('1')
elif i == "良":
air_quality.append('2')
elif i == "轻度污染":
air_quality.append('3')
elif i == "中度污染":
air_quality.append('4')
elif i == "重度污染":
air_quality.append('5')
elif i == "严重污染":
air_quality.append('6')
print(air_quality)
df1['空气质量'] = air_quality
# 将数据写入追加写入到train.txt
# print(df1.values, type(df1.values)) # <class 'numpy.ndarray'>
with open('train.txt', 'a+') as f:
for x in df1.values:
print(x)
s = ','.join([str(i) for i in x])
# print(s, type(s))
f.write(s + '\n')
读取数据集
def read_dataset(filename1, filename2, trainingSet, testSet):
with open(filename1, 'r') as csvfile:
lines = csv.reader(csvfile) # 读取所有的行
dataset1 = list(lines) # 转化成列表
for x in range(len(dataset1)): # 每一行数据
for y in range(8):
dataset1[x][y] = float(dataset1[x][y]) # 8个参数转换为浮点数
testSet.append(dataset1[x]) # 生成测试集
with open(filename2, 'r') as csvfile:
lines = csv.reader(csvfile) # 读取所有的行
dataset2 = list(lines) # 转化成列表
for x in range(len(dataset2)): # 每一行数据
for y in range(8):
dataset2[x][y] = float(dataset2[x][y]) # 8个参数转换为浮点数
trainingSet.append(dataset2[x]) # 生成训练集
计算欧氏距离
def calculateDistance(testdata, traindata, length): # 计算距离
distance = 0 # length表示维度 数据共有几维
for x in range(length):
distance += pow((int(testdata[x]) - int(traindata[x])), 2)
return round(math.sqrt(distance), 3) # 保留3位小数
找K个相邻最近的邻居
def getNeighbors(self, trainingSet, test_instance, k): # 返回最近的k个边距
distances = []
length = len(test_instance)
# 对训练集的每一个数计算其到测试集的实际距离
for x in range(len(trainingSet)):
dist = self.calculateDistance(test_instance, trainingSet[x], length)
print('训练集:{} --- 距离:{}'.format(trainingSet[x], dist))
distances.append((trainingSet[x], dist))
distances.sort(key=operator.itemgetter(1)) # 按距离从小到大排列
# print(distances)
neighbors = []
# 排序完成后取距离最小的前k个
for x in range(k):
neighbors.append(distances[x][0])
print(neighbors)
return neighbors
计算比例最大的分类
def getResponse(neighbors): # 根据少数服从多数,决定归类到哪一类
class_votes = {}
for x in range(len(neighbors)):
response = neighbors[x][-1] # 统计每一个分类的多少 空气质量的数字标识
if response in class_votes:
class_votes[response] += 1
else:
class_votes[response] = 1
print(class_votes.items())
sortedVotes = sorted(class_votes.items(), key=operator.itemgetter(1), reverse=True) # 按分类大小排序 降序
return sortedVotes[0][0] # 分类最大的 少数服从多数 为预测结果
预测准确率计算
def getAccuracy(test_set, predictions):
correct = 0
for x in range(len(test_set)):
# predictions预测的与testset实际的比对 计算预测的准确率
if test_set[x][-1] == predictions[x]:
correct += 1
else:
# 查看错误预测
print(test_set[x], predictions[x])
print('有{}个预测正确,共有{}个测试数据'.format(correct, len(test_set)))
return (correct / (len(test_set))) * 100.0
run函数调用
def run(self):
training_set = [] # 训练集
test_set = [] # 测试集
self.read_dataset('./train_4/test.txt', './train_4/train.txt', training_set, test_set) # 数据划分
print('Train set: ' + str(len(training_set)))
print('Test set: ' + str(len(test_set)))
# generate predictions
predictions = []
k = 7 # 取最近的6个数据
for x in range(len(test_set)): # 对所有的测试集进行测试
neighbors = self.getNeighbors(training_set, test_set[x], k) # 找到8个最近的邻居
result = self.getResponse(neighbors) # 找这7个邻居归类到哪一类
predictions.append(result)
accuracy = self.getAccuracy(test_set, predictions)
print('预测准确度为: {:.2f}%'.format(accuracy)) # 保留2位小数
运行效果如下:
可以通过增加训练集城市空气质量数据量,调节找邻居的数量k,提高预测准确率。
作者:叶庭云 微信公众号:修炼Python CSDN:https://yetingyun.blog.csdn.net/ 本文仅用于交流学习,未经作者允许,禁止转载,更勿做其他用途,违者必究。 觉得文章对你有帮助、让你有所收获的话,期待你的点赞呀,不足之处,也可以在评论区多多指正。