博客
关于我
spark之MLLIB
阅读量:558 次
发布时间:2019-03-09

本文共 2682 字,大约阅读时间需要 8 分钟。

1、MLlib简介

MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容:

  1. 算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;
  2. 特征化工具:特征提取、转化、降维,和选择工具;
  3. 管道(Pipeline):用于构建、评估和调整机器学习管道的工具;
  4. 持久性:保存和加载算法,模型和管道;
  5. 实用工具:线性代数,统计,数据处理等工具。

2、MLlib支持的主要机器学习算法

Spark在机器学习方面的发展非常快,目前已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的开源机器学习库,MLlib可以算是计算效率最高的。MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤。下表列出了目前MLlib支持的主要的机器学习算法:

3、机器学习工作流(PipeLine)

3.1、工作流的组成

  • DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。 较之 RDD,包含了 schema 信息,更类似传统数据库中的二维表格。它被 ML Pipeline 用来存储源数据。例如,DataFrame中的列可以是存储的文本,特征向量,真实标签和预测的标签等。
  • Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个 Transformer。它可以把 一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。

     

  • Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

  • Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

  • PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

3.2、如何构建一个工作流

定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。

pipeline =Pipeline(stages=[stage1,stage2,stage3])

在训练阶段,管道如下,以DataFrame存储的行形式的文本(Raw text)经过Tokenizer转化变成了词(Words),词经HashingTF转化变成了特征(Feature vectors),特征经LR得到了回归模型。

测试过程的管道如下:

3.3、构建管道实例 

设置sparkSession

from pyspark.sql import SparkSessionspark = SparkSession.builder.master("local").appName("spark ML").getOrCreate()

设置机器学习相关包

from pyspark.ml import Pipelinefrom pyspark.ml.classification import LogisticRegressionfrom pyspark.ml.feature import HashingTF,Tokenizer

创建训练集

#创建DataFrame训练集#训练集包括字段id,text,labeldf_train = spark.createDataFrame([    (0, "a b c d e spark", 1.0),    (1, "b d", 0.0),    (2, "spark f g h", 1.0),    (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

定义工作流,构建训练管道,得到训练模型

#构建转化器和评估器#定义分词器,spark自带的Tokenizer以空格分词;inputCol为输入的列名,outputCol为转化输出的列名tokenizer=Tokenizer(inputCol="text", outputCol="words")hashTf=HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")lr = LogisticRegression(maxIter=10, regParam=0.001)#创建训练管道pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])#训练模型model=pipeline.fit(df_train)

构建测试集及预测

#测试DataFrame构建df_test=spark.createDataFrame([    (4, "spark i j k"),    (5, "l m n"),    (6, "spark hadoop spark"),    (7, "apache hadoop")], ["id", "text"])#测试predict=model.transform(df_test)#显示预测结果id|text|words|features|rawPrediction|probability|predictionpredict.show()

参考

 

转载地址:http://oubpz.baihongyu.com/

你可能感兴趣的文章
mysql 创建表,不能包含关键字values 以及 表id自增问题
查看>>
mysql 删除日志文件详解
查看>>
mysql 判断表字段是否存在,然后修改
查看>>
MySQL 到底能不能放到 Docker 里跑?
查看>>
mysql 前缀索引 命令_11 | Mysql怎么给字符串字段加索引?
查看>>
MySQL 加锁处理分析
查看>>
mysql 协议的退出命令包及解析
查看>>
mysql 参数 innodb_flush_log_at_trx_commit
查看>>
mysql 取表中分组之后最新一条数据 分组最新数据 分组取最新数据 分组数据 获取每个分类的最新数据
查看>>
MySQL 命令和内置函数
查看>>
mysql 四种存储引擎
查看>>
MySQL 在并发场景下的问题及解决思路
查看>>
MySQL 基础架构
查看>>
MySQL 基础模块的面试题总结
查看>>
MySQL 备份 Xtrabackup
查看>>
mYSQL 外键约束
查看>>
mysql 多个表关联查询查询时间长的问题
查看>>
mySQL 多个表求多个count
查看>>
mysql 多字段删除重复数据,保留最小id数据
查看>>
MySQL 多表联合查询:UNION 和 JOIN 分析
查看>>