登录
首页精彩阅读spark机器学习-聚类
spark机器学习-聚类
2018-04-05
收藏

spark机器学习-聚类

聚类算法是一种无监督学习任务,用于将对象分到具有高度相似性的聚类中,聚类算法的思想简单的说就是物以类聚的思想,相同性质的点在空间中表现的较为紧密和接近,主要用于数据探索与异常检测,最常用的一种聚类算法是K均值(K-means)聚类算法

算法原理
kmeans的计算方法如下:
1 选取k个中心点
2 遍历所有数据,将每个数据划分到最近的中心点中
3 计算每个聚类的平均值,并作为新的中心点
4 重复2-3,直到这k个中线点不再变化(收敛了),或执行了足够多的迭代
算法的时间复杂度上界为O(n*k*t), 其中k为输入的聚类个数,n为数据量,t为迭代次数。一般t,k,n均可认为是常量,时间和空间复杂度可以简化为O(n),即线性的

spark ml编码实践

可在spark-shell环境下修改参数调试以下代码,可以用实际的业务数据做测试评估,业务数据一般是多列,可以把维度列用VectorAssembler组装成向量列做为Kmeans算法的输入列,考虑现实的应用场景,比如做异常数据检测,正常数据分为一类,异常数据分为几类,分别统计正常数据与异常数据的数据量,求百分比等

<span style="font-size:18px;">import org.apache.spark.ml.clustering.KMeans  
    import org.apache.spark.mllib.linalg.Vectors  
      
    val dataset = sqlContext.createDataFrame(Seq(  
    (1, Vectors.dense(0.0, 0.0, 0.0)),  
    (2, Vectors.dense(0.1, 0.1, 0.1)),  
    (3, Vectors.dense(0.2, 0.2, 0.2)),  
    (4, Vectors.dense(9.0, 9.0, 9.0)),  
    (5, Vectors.dense(1.1, 1.1, 0.1)),  
    (6, Vectors.dense(12, 14, 100)),  
    (6, Vectors.dense(1.1, 0.1, 0.2)),  
    (6, Vectors.dense(-2, -3, -4)),  
    (6, Vectors.dense(1.6, 0.6, 0.2))  
    )).toDF("id", "features")  
      
    // Trains a k-means model  
    val kmeans = new KMeans().setK(3).setMaxIter(20).setFeaturesCol("features").setPredictionCol("prediction")  
    val model = kmeans.fit(dataset)  
      
    // Shows the result  
    println("Final Centers: ")  
    model.clusterCenters.foreach(println)  
    model.clusterCenters.zipWithIndex.foreach(println)  
      
    val myres = model.transform(dataset).select("features","prediction")  
    myres.show()</span>
聚类算法是一类无监督机器学习算法,聚类效果怎么评估,模型训练参数怎么调优,是否能用管道来训练模型来比较各种不同组合的参数的效果,即网格搜索法(grid search),先设置好待测试的参数,MLLib就会自动完成这些参数的不同组合,管道搭建了一条工作流,一次性完成了整个模型的调优,而不是独立对每个参数进行调优,这个还要再确认一下,查看SPARK-14516好像目前还没有一个聚类效果通用的自动的度量方法
像这种代码(不过现在这个代码有问题):

<span style="font-size:18px;">import org.apache.spark.ml.clustering.KMeans  
    import org.apache.spark.mllib.linalg.Vectors  
    import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator }  
    import org.apache.spark.ml.{ Pipeline, PipelineStage }  
      
    val dataset = sqlContext.createDataFrame(Seq(  
    (1, Vectors.dense(0.0, 0.0, 0.0)),  
    (2, Vectors.dense(0.1, 0.1, 0.1)),  
    (3, Vectors.dense(0.2, 0.2, 0.2)),  
    (4, Vectors.dense(9.0, 9.0, 9.0)),  
    (5, Vectors.dense(1.1, 1.1, 0.1)),  
    (6, Vectors.dense(12, 14, 100)),  
    (6, Vectors.dense(1.1, 0.1, 0.2)),  
    (6, Vectors.dense(-2, -3, -4)),  
    (6, Vectors.dense(1.6, 0.6, 0.2))  
    )).toDF("id", "features")  
      
    val kmeans = new KMeans().setK(2).setMaxIter(20).setFeaturesCol("features").setPredictionCol("prediction")  
    //主要问题在这里,没有可用的评估器与label列设置  
    val evaluator = new BinaryClassificationEvaluator().setLabelCol("prediction")  
    val paramGrid = new ParamGridBuilder().addGrid(kmeans.initMode, Array("random")).addGrid(kmeans.k, Array(3, 4)).addGrid(kmeans.maxIter, Array(20, 60)).addGrid(kmeans.seed, Array(1L, 2L)).build()  
    val steps: Array[PipelineStage] = Array(kmeans)  
    val pipeline = new Pipeline().setStages(steps)  
      
    val cv = new CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(10)  
    // Trains a model  
    val pipelineFittedModel = cv.fit(dataset)</span>


数据分析咨询请扫描二维码

客服在线
立即咨询