大数据常用算法 Spark mlib(3)GradientDescent梯度下降算法之Spark实现
沉沙 2018-10-11 来源 : 阅读 1309 评论 0

摘要:本篇教程介绍了大数据常用算法 Spark mlib(3)GradientDescent梯度下降算法之Spark实现,希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据采集的理解更加深入。

本篇教程介绍了大数据常用算法 Spark mlib(3)GradientDescent梯度下降算法之Spark实现,希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据采集的理解更加深入。

<

1. 什么是梯度下降?
梯度下降法(英语:Gradient descent)是一个一阶最优化算法,通常也称为最速下降法。 要使用梯度下降法找到一个函数的局部极小值,必须向函数上当前点对应梯度(或者是近似梯度)的反方向的规定步长距离点进行迭代搜索。
先来看两个函数:
1.  拟合函数:为参数向量,h(θ)就是通过参数向量计算的值,n为参数的总个数,j代表的是一条记录里的一个参数

h(θ)=∑j=0nθjxj

2. 损失函数:


J(θ)=12m∑i=1m(hθ(x(i))−y(i))2


m为训练的集合数,i代表的是一条记录,hθ(xi)代表的是第i条的h(θ)
在监督学习模型中,需要对原始的模型构建损失函数J(θ),  接着就是最小化损失函数,用以求的最优参数θ
对损失函数θ进行求偏导,获取每个θ的梯度

∂J(θ)∂θ=−1m∑i=1m(yi−hθ(xi))xij


,
xij参数代表着x在第i行里的j列的值,算法如下


1. 我们定义一行diff = yi−hθ(xi).      2. 对一行记录里的每一列都乘以diff.    3. 然后计算下一行重复1,2步骤

最小化损失函数,需要按照梯度的负方向更新θ

θ‘j=θj+1m∑i=1m(yi−hθ(xi))xij)


这种被称为梯度下降
2. 梯度下降的几种方式
2.1 批量梯度下降(BGD)
在前面的方式,我们采样部分数据,就称为批量梯度下降
在公式:θ‘j=θj+1m∑i=1m(yi−hθ(xi))xij)

中我们会发现随着计算θ的梯度下降,需要计算所有的采样数据m,计算量会比较大。
2.2  随机梯度下降 (SGD)

在上面2.1的批量梯度下降,采样的是批量数据,那么随机采样一个数据,进行θ梯度下降,就被称为随机梯度下降。

损失函数:J(θ)=1m∑i=1m12(yi−hθ(xi))2

那么单样本的损失函数:m=1 的情况:

12(yi−hθ(xi))2

对单样本的损失函数进行求偏导,计算梯度下降

θ‘j=θj+(yi−hθ(xi))xij

为了控制梯度下降的速度,引入步长

αα

θ‘j=θj+α(yi−hθ(xi))xij


3. Spark 实现的梯度下降
在代码GradientDescent.scala中

    def runMiniBatchSGD(
          data: RDD[(Double, Vector)],
          gradient: Gradient,
          updater: Updater,
          stepSize: Double,
          numIterations: Int,
          regParam: Double,
          miniBatchFraction: Double,
          initialWeights: Vector,
          convergenceTol: Double): (Vector, Array[Double]) = {
     
        // convergenceTol should be set with non minibatch settings
        if (miniBatchFraction < 1.0 && convergenceTol > 0.0) {
          logWarning("Testing against a convergenceTol when using miniBatchFraction " +
            "< 1.0 can be unstable because of the stochasticity in sampling.")
        }
     
        if (numIterations * miniBatchFraction < 1.0) {
          logWarning("Not all examples will be used if numIterations * miniBatchFraction < 1.0: " +
            s"numIterations=$numIterations and miniBatchFraction=$miniBatchFraction")
        }
     
        val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
        // Record previous weight and current one to calculate solution vector difference
     
        var previousWeights: Option[Vector] = None
        var currentWeights: Option[Vector] = None
     
        val numExamples = data.count()
     
        // if no data, return initial weights to avoid NaNs
        if (numExamples == 0) {
          logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
          return (initialWeights, stochasticLossHistory.toArray)
        }
     
        if (numExamples * miniBatchFraction < 1) {
          logWarning("The miniBatchFraction is too small")
        }
     
        // Initialize weights as a column vector
        var weights = Vectors.dense(initialWeights.toArray)
        val n = weights.size
     
        /**
         * For the first iteration, the regVal will be initialized as sum of weight squares
         * if it's L2 updater; for L1 updater, the same logic is followed.
         */
        var regVal = updater.compute(
          weights, Vectors.zeros(weights.size), 0, 1, regParam)._2
     
        var converged = false // indicates whether converged based on convergenceTol
        var i = 1
        while (!converged && i <= numIterations) {
          val bcWeights = data.context.broadcast(weights)
          // Sample a subset (fraction miniBatchFraction) of the total data
          // compute and sum up the subgradients on this subset (this is one map-reduce)
          val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
            .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
              seqOp = (c, v) => {
                // c: (grad, loss, count), v: (label, features)
                val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
                (c._1, c._2 + l, c._3 + 1)
              },
              combOp = (c1, c2) => {
                // c: (grad, loss, count)
                (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
              })
     
          if (miniBatchSize > 0) {
            /**
             * lossSum is computed using the weights from the previous iteration
             * and regVal is the regularization value computed in the previous iteration as well.
             */
            stochasticLossHistory += lossSum / miniBatchSize + regVal
            val update = updater.compute(
              weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
              stepSize, i, regParam)
            weights = update._1
            regVal = update._2
     
            previousWeights = currentWeights
            currentWeights = Some(weights)
            if (previousWeights != None && currentWeights != None) {
              converged = isConverged(previousWeights.get,
                currentWeights.get, convergenceTol)
            }
          } else {
            logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
          }
          i += 1
        }
     
        logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
          stochasticLossHistory.takeRight(10).mkString(", ")))
     
        (weights, stochasticLossHistory.toArray)
     
      }


3.1 随机梯度?
看函数名字叫做SGD,会以为是随机梯度下降,实际上Spark里实现的是随机批量的梯度下降
我们去看梯度下降的批量算法公式:



θ‘j=θj+1m∑i=1m(yi−hθ(xi))xij)


这个公式可以拆分成两部分

    计算数据的梯度
    根据梯度计算新的权重

3.2 计算梯度
在前面的章节里描述过随机和批量的主要区别就是在计算梯度上,随机采样只是随机采用单一样本,而批量采样如果采样所有数据,涉及到采样的样本、计算量大的问题,Spark采用了择中的策略,随机采样部分数据

    先随机采样部分数据

data.sample(false, miniBatchFraction, 42 + i)

    对部分数据样本进行聚合计算

    treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
              seqOp = (c, v) => {
                // c: (grad, loss, count), v: (label, features)
                val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
                (c._1, c._2 + l, c._3 + 1)
              },
              combOp = (c1, c2) => {
                // c: (grad, loss, count)
                (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
              })


使用treeAggregate,而没有使用Aggregate,是因为treeAggregate比aggregate更高效,combOp会在executor上执行
在聚合计算的seqOp里我们看到了gradient.compute来计算梯度

3.2.1 Spark 提供的计算梯度的方式

    LeastSquaresGradient 梯度,主要用于线型回归
    HingeGradient 梯度,用于SVM分类
    LogisticGradient 梯度,用于逻辑回归

前面章节里描述的就是基于线性回归模型的计算梯度的方式,也就是如下公式:


∑i=1m(yi−hθ(xi))xij)



代码如下:

    class LeastSquaresGradient extends Gradient {
      override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
        val diff = dot(data, weights) - label
        val loss = diff * diff / 2.0
        val gradient = data.copy
        scal(diff, gradient)
        (gradient, loss)
      }
      override def compute(
          data: Vector,
          label: Double,
          weights: Vector,
          cumGradient: Vector): Double = {
        val diff = dot(data, weights) - label
        axpy(diff, data, cumGradient)
        diff * diff / 2.0
      }
    }


3.3 跟新权重theta θ
在梯度下降计算中,计算新的theta(也叫权重的更新),更新的算法由你采用的模型来决定

    val update = updater.compute(
              weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
              stepSize, i, regParam)



目前Spark默认提供了3种算法跟新theta

    SimpleUpdater  
    L1Updater   
    SquaredL2Updater

3.3.1 SimpleUpdater
以SimpleUpdater来说:

    class SimpleUpdater extends Updater {
      override def compute(
          weightsOld: Vector,
          gradient: Vector,
          stepSize: Double,
          iter: Int,
          regParam: Double): (Vector, Double) = {
        val thisIterStepSize = stepSize / math.sqrt(iter)
        val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
        brzAxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
        (Vectors.fromBreeze(brzWeights), 0)
      }
    }


也就是上面提到的公式:θ‘j=θj+α∗
相对来说simpleupdater算法比较简单,在这里没有使用正则参数regParam,只是使用了每个迭代的步长作为相同的因子,计算每一个theta,也就是权重。
迭代的步长=总步长/math.sqrt(迭代的次数)

关于正则化参数,在这篇博客里就不叙述了。

3.3.2 其它的正则参数化算法
L1Updater: 正则化算法

    和SimpleUpdater一样更新权重
    将正则化参数乘以迭代步长的到比较参数:shrinkage
    如果权重大于shrinkage,设置权重-shrinkage
    如果权重小于-shrinkage,设置权重+shrinkage
    其它的,设置权重为0

SquaredL2Updater:正则化算法

w' = w - thisIterStepSize * (gradient + regParam * w)

和SimpleUpdater比较,补偿了regParam*w ,这也是逻辑回归所采用的梯度下降算法的更新算法

4.  梯度下降收敛条件
如何判定梯度下降权重值收敛不在需要计算,通常会有两个约束条件

    迭代次数,当达到一定的迭代次数后,权重的值会被收敛到极值点,并且不会受到次数的影响
    筏值:当两次迭代的权重之间的差小于指定的筏值的时候,就认为已经收敛

在Spark里使用了L2范数来比较筏值

      private def isConverged(
          previousWeights: Vector,
          currentWeights: Vector,
          convergenceTol: Double): Boolean = {
        // To compare with convergence tolerance.
        val previousBDV = previousWeights.asBreeze.toDenseVector
        val currentBDV = currentWeights.asBreeze.toDenseVector
     
        // This represents the difference of updated weights in the iteration.
        val solutionVecDiff: Double = norm(previousBDV - currentBDV)
     
        solutionVecDiff < convergenceTol * Math.max(norm(currentBDV), 1.0)
      }


当前后权重的差的L2,小于筏值*当前权重的L2和1的最大值,就认为下降结束。    

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据采集频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程