Spark中的管道pipeline
ML Pipelines
管道中的主要概念
MLlib对机器学习算法的API进行了标准化,使得将多种算法合并成一个流水线或工作流变得更加容易。本部分涵盖了Pipelines API引入的关键概念,其中流水线概念主要受scikit-learn项目的启发。
DataFrame:这个ML API使用Spark SQL中的DataFrame作为一个ML数据集,它可以容纳各种数据类型。例如,一个DataFrame可以具有存储文本,特征向量,真实标签和预测的不同列。
Transformer:一个Transformer是可以将一个DataFrame变换成成另一个DataFrame的算法。例如,一个ML模型是一个Transformer将一个DataFrame特征转化为一个DataFrame预测的模型。
Estimator:一个 Estimator是一个可以被应用在DataFrame上来产生一个Transformer的算法。例如,一个学习算法是一种Estimator,它可以在DataFrame上训练并生成模型。
Pipeline:Pipeline将多个Transformers和Estimators连接起来以指定ML工作流程。
Parameter:所有Transformers和Estimators现在对于指定参数共享通用API。
DataFrame(数据帧)
机器学习可以应用于各种数据类型,如向量,文本,图像和结构化数据。这个API采用DataFrameSpark SQL来支持各种数据类型。
DataFrame支持许多基本和结构化的类型; 请参阅Spark SQL数据类型参考以获取受支持类型的列表。除了Spark SQL指南中列出的类型以外,DataFrame还可以使用ML Vector类型。
A DataFrame可以隐式地或显式地从常规创建RDD。有关示例,请参阅下面的代码示例和Spark SQL编程指南。
a DataFrame中的列被命名。下面的代码示例使用“text”,“feature”和“label”等名称。
Pipeline components(管道组件)
Transformers
A Transformer是包含特征变换器和学习模型的抽象。从技术上来说,a Transformer实现了一种方法(transform()),将一个DataFrame转换为另一个的方法,通常通过附加一列或多列。例如:
- 特征转换器选取一个DataFrame,读取列(例如文本),将其映射到新的列(例如特征向量),并且输出具有附加映射列的新DataFrame
- 学习模型可以选取一个DataFrame,读取包含特征向量的列,预测每个特征向量的标签,并输出带有预测标签的新列的DataFrame。
Estimators
一个Estimator是在数据集上训练的学习算法的抽象概念。从技术上讲,一个Estimator实现了一个方法 fit(),它接受DataFrame并生成一个 Model,这是一个Transformer。例如,一个学习算法,如LogisticRegression(它是一个Estimator),调用 fit() 函数来训练一个LogisticRegressionModel模型,它是一个Model也是一个Transformer。
Properties o pipeline components
Transformer.transform()s和Estimator.fit()s都是无状态的。将来,有状态算法可以通过替代概念来支持。
每个Transformer或者Estimator的实例具有唯一的ID,这在指定参数(在下面讨论)中是有用的。
Pipeline
在机器学习中,通常运行一系列算法来处理和学习数据。例如,简单的文本文档处理工作流程可能包括几个阶段:
- 将每个文档的文本分词。
- 将每个文档的单词转换为数字特征向量。
- 使用特征向量和标签来学习预测模型。
MLlib表示这样一个工作流程Pipeline,它由一系列 PipelineStages(Transformers和Estimators)组成,并以特定顺序运行。我们将使用这个简单的工作流程作为本节中的一个运行示例。
How it works
A Pipeline是一个阶段序列,每个阶段是一个Transformer或一个Estimator。这些阶段是按顺序运行的,输入的DataFrame在每个阶段都经过转换。对于Transformer阶段,transform() 方法被调用作用于DataFrame上。对于Estimator阶段,fit()方法被调用,以产生Transformer(它成为PipelineModel或合适的Pipeline的一部分),以及Transformer的transform()方法也被调用作用于DataFrame。
我们用简单的文本文档工作流来说明这一点。下图是a 。训练时间的使用情况Pipeline。
在上面,最上面一行代表一个Pipeline有三个阶段。前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。最下面一行代表流经管道的数据,其中圆柱表示DataFrames。这个Pipeline.fit()方法在原始DataFrame文档和标签上被调用。Tokenizer.transform()方法将原始文本文档分词,分词后的words作为一个新列添加到DataFrame中。HashingTF.transform()方法将单词列转换为特征向量,并向这些向量作为一个新列添加到DataFrame中。现在,既然LogisticRegression是一个Estimator,Pipeline首先调用LogisticRegression.fit()方法就生成一个LogisticRegressionModel。如果Pipeline有更多Estimators,它就会在DataFrame传送到下个阶段之前调用LogisticRegressionModel的transform() 方法。
一个Pipeline是一个Estimator。因此,在Pipeline的fit()方法运行后,它产生一个PipelineModel,这是一个 Transformer。这PipelineModel是在测试时使用 ; 下图说明了这种用法。
在上面的图中,PipelineModel具有和原始的Pipeline相同数量的阶段,但所有EstimatorS在原始Pipeline中已变成TransformerS。当PipelineModel的transform()方法在测试数据集被调用,数据在管道上按序传递。每个阶段的transform()方法都会更新数据集并将其传递到下一个阶段。
Pipelines和PipelineModels有助于确保训练和测试数据经过相同的特征处理步骤。
Details
DAG Pipelines:A Pipeline的阶段被指定为一个有序数组。这里给出的例子都是线性Pipeline的,即Pipeline每个阶段使用前一阶段产生的数据。Pipeline只要数据流图形成有向无环图(DAG),就可以创建非线性的PipelineS。该图当前是基于每个阶段的输入和输出列名(通常指定为参数)隐含指定的。如果Pipeline形式为DAG,那么阶段必须按拓扑顺序指定。
Runtime checking:由于Pipelines可以在不同类型的DataFrames上运行,所以不能使用compile-time类型检查。 Pipelines和PipelineModels,而是在实际运行Pipeline之前进行runtime checking检查。这种类型的检查是通过使用DataFrame schema来完成的,schema是对DataFrame的列的数据类型的描述。
Unique Pipeline stages:A Pipeline的阶段应该是独一无二的实例。例如,同一个实例 myHashingTF不应该插入Pipeline两次,因为Pipeline阶段必须有唯一的ID。然而,不同的实例myHashingTF1和myHashingTF2(两个类型HashingTF)可以放在一起,Pipeline因为创建不同的实例使用不同的ID。
Parameters
MLlib Estimators和Transformers使用统一的API来指定参数。
A Param是一个带有自包含文档的命名参数。A ParamMap是一组(参数,值)对。
将参数传递给算法有两种主要方法:
- 为实例设置参数。例如,如果lr是的一个实例LogisticRegression,它可以调用lr.setMaxIter(10)让lr.fit()至多10次迭代使用。这个API类似于spark.mllib包中使用的API 。
- 传递ParamMap给fit()或transform()。任何在ParamMap中额参数将覆盖以前通过setter方法指定的参数。
参数属于Estimators和Transformers的特定实例。例如,如果我们有两个LogisticRegression实例lr1和lr2,然后我们可以建立一个ParamMap与两个maxIter指定的参数:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。如果一个Pipeline里有两个包含maxIter参数的算法,那么这很有用。
Saving and LoadingPipelines
通常情况下,会将模型或管道保存到磁盘供以后使用。在Spark 1.6中,模型导入/导出功能被添加到管道API中。大多数基本的Transformers都和一些更加基本的ML模型一样被支持。请参阅算法的API文档以查看是否支持保存和加载。
Code examples
本节给出了说明上述功能的代码示例。有关更多信息,请参阅API文档(Scala, Java和Python)。
Example: Estimator, Transformer, and Param
这个例子涉及的概念Estimator,Transformer和Param。
请参阅EstimatorPython文档,TransformerPython文档和ParamsPython文档以获取有关API的更多详细信息。
1 | from pyspark.ml.linalg import Vectors |
Find full example code at “examples/src/main/python/ml/estimator_transformer_param_example.py” in the Spark repo.
Example: Pipeline
本示例遵循Pipeline上图中所示的简单文本文档。
有关APi的更多详细信息,请参阅PipelinePython文档
1 | from pyspark.ml import Pipeline |
Find full example code at “examples/src/main/python/ml/pipeline_example.py” in the Spark repo.
Model selection (hyperparameter tuning)- 模型选择(超参数调整)
使用ML管道的一大好处是超参数优化。有关自动模型选择的更多信息,请参阅ML调整指南。