Knative 驾驭篇:带你 '纵横驰骋' Knative 自动扩缩容实现

Knative 中提供了自动扩缩容灵活的实现机制,本文从 三横两纵 的维度带你深入了解 KPA 自动扩缩容的实现机制。让你轻松驾驭 Knative 自动扩缩容。

注:本文基于最新 Knative v0.11.0 版本代码解读

KPA 实现流程图

在 Knative 中,创建一个 Revision 会相应的创建 PodAutoScaler 资源。在KPA中通过操作 PodAutoScaler 资源,对当前的 Revision 中的 POD 进行扩缩容。

针对上面的流程实现,我们从三横两纵的维度进行剖析其实现机制。

三横

  • KPA 控制器
  • 根据指标定时计算 POD 数
  • 指标采集

KPA 控制器

通过Revision 创建PodAutoScaler, 在 KPA 控制器中主要包括两个资源(Decider 和 Metric)和一个操作(Scale)。主要代码如下


func (c *Reconciler) reconcile(ctx context.Context, pa *pav1alpha1.PodAutoscaler) error {
    ......
    decider, err := c.reconcileDecider(ctx, pa, pa.Status.MetricsServiceName)
    if err != nil {
        return fmt.Errorf("error reconciling Decider: %w", err)
    }

    if err := c.ReconcileMetric(ctx, pa, pa.Status.MetricsServiceName); err != nil {
        return fmt.Errorf("error reconciling Metric: %w", err)
    }

    // Metrics services are no longer needed as we use the private services now.
    if err := c.DeleteMetricsServices(ctx, pa); err != nil {
        return err
    }

    // Get the appropriate current scale from the metric, and right size
    // the scaleTargetRef based on it.
    want, err := c.scaler.Scale(ctx, pa, sks, decider.Status.DesiredScale)
    if err != nil {
        return fmt.Errorf("error scaling target: %w", err)
    }
......
}

这里先介绍一下两个资源:

  • Decider : 扩缩容决策的资源,通过Decider获取扩缩容POD数: DesiredScale。
  • Metric:采集指标的资源,通过Metric会采集当前Revision下的POD指标。

再看一下Scale操作,在Scale方法中,根据扩缩容POD数、最小实例数和最大实例数确定最终需要扩容的POD实例数,然后修改deployment的Replicas值,最终实现POD的扩缩容, 代码实现如下:


// Scale attempts to scale the given PA's target reference to the desired scale.
func (ks *scaler) Scale(ctx context.Context, pa *pav1alpha1.PodAutoscaler, sks *nv1a1.ServerlessService, desiredScale int32) (int32, error) {
......
    min, max := pa.ScaleBounds()
    if newScale := applyBounds(min, max, desiredScale); newScale != desiredScale {
        logger.Debugf("Adjusting desiredScale to meet the min and max bounds before applying: %d -> %d", desiredScale, newScale)
        desiredScale = newScale
    }

    desiredScale, shouldApplyScale := ks.handleScaleToZero(ctx, pa, sks, desiredScale)
    if !shouldApplyScale {
        return desiredScale, nil
    }

    ps, err := resources.GetScaleResource(pa.Namespace, pa.Spec.ScaleTargetRef, ks.psInformerFactory)
    if err != nil {
        return desiredScale, fmt.Errorf("failed to get scale target %v: %w", pa.Spec.ScaleTargetRef, err)
    }

    currentScale := int32(1)
    if ps.Spec.Replicas != nil {
        currentScale = *ps.Spec.Replicas
    }
    if desiredScale == currentScale {
        return desiredScale, nil
    }

    logger.Infof("Scaling from %d to %d", currentScale, desiredScale)
    return ks.applyScale(ctx, pa, desiredScale, ps)
}

根据指标定时计算 POD 数

这是一个关于Decider的故事。Decider创建之后会同时创建出来一个定时器,该定时器默认每隔 2 秒(可以通过TickInterval 参数配置)会调用Scale方法,该Scale方法实现如下:

func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, excessBC int32, validScale bool) {
    ......
    metricName := spec.ScalingMetric
    var observedStableValue, observedPanicValue float64
    switch spec.ScalingMetric {
    case autoscaling.RPS:
        observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)
        a.reporter.ReportStableRPS(observedStableValue)
        a.reporter.ReportPanicRPS(observedPanicValue)
        a.reporter.ReportTargetRPS(spec.TargetValue)
    default:
        metricName = autoscaling.Concurrency // concurrency is used by default
        observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)
        a.reporter.ReportStableRequestConcurrency(observedStableValue)
        a.reporter.ReportPanicRequestConcurrency(observedPanicValue)
        a.reporter.ReportTargetRequestConcurrency(spec.TargetValue)
    }

    // Put the scaling metric to logs.
    logger = logger.With(zap.String("metric", metricName))

    if err != nil {
        if err == ErrNoData {
            logger.Debug("No data to scale on yet")
        } else {
            logger.Errorw("Failed to obtain metrics", zap.Error(err))
        }
        return 0, 0, false
    }

    // Make sure we don't get stuck with the same number of pods, if the scale up rate
    // is too conservative and MaxScaleUp*RPC==RPC, so this permits us to grow at least by a single
    // pod if we need to scale up.
    // E.g. MSUR=1.1, OCC=3, RPC=2, TV=1 => OCC/TV=3, MSU=2.2 => DSPC=2, while we definitely, need
    // 3 pods. See the unit test for this scenario in action.
    maxScaleUp := math.Ceil(spec.MaxScaleUpRate * readyPodsCount)
    // Same logic, opposite math applies here.
    maxScaleDown := math.Floor(readyPodsCount / spec.MaxScaleDownRate)

    dspc := math.Ceil(observedStableValue / spec.TargetValue)
    dppc := math.Ceil(observedPanicValue / spec.TargetValue)
    logger.Debugf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",
        dspc, dppc, maxScaleUp, maxScaleDown)

    // We want to keep desired pod count in the  [maxScaleDown, maxScaleUp] range.
    desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
    desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))
......
    return desiredPodCount, excessBC, true
}

该方法主要是从 MetricCollector 中获取指标信息,根据指标信息计算出需要扩缩的POD数。然后设置在 Decider 中。另外当 Decider 中 POD 期望值发生变化时会触发 PodAutoscaler 重新调和的操作,关键代码如下:

......
if runner.updateLatestScale(desiredScale, excessBC) {
        m.Inform(metricKey)
    }
......    

在KPA controller中设置调和Watch操作:

......
    // Have the Deciders enqueue the PAs whose decisions have changed.
    deciders.Watch(impl.EnqueueKey)
......    

指标采集

通过两种方式收集POD指标:

  • PUSH 收集指标:通过暴露指标接口,外部服务(如Activitor)可以调用该接口推送 metric 信息
  • PULL 收集指标:通过调用 Queue Proxy 服务接口收集指标。

PUSH 收集指标实现比较简单,在main.go中 暴露服务,将接收到的 metric 推送到 MetricCollector 中:

// Set up a statserver.
    statsServer := statserver.New(statsServerAddr, statsCh, logger)
....
go func() {
        for sm := range statsCh {
            collector.Record(sm.Key, sm.Stat)
            multiScaler.Poke(sm.Key, sm.Stat)
        }
    }()

PULL 收集指标是如何收集的呢? 还记得上面提到的Metric资源吧,这里接收到Metric资源又会创建出一个定时器,这个定时器每隔 1 秒会访问 queue-proxy 9090 端口采集指标信息。关键代码如下:

// newCollection creates a new collection, which uses the given scraper to
// collect stats every scrapeTickInterval.
func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, logger *zap.SugaredLogger) *collection {
    c := &collection{
        metric:             metric,
        concurrencyBuckets: aggregation.NewTimedFloat64Buckets(BucketSize),
        rpsBuckets:         aggregation.NewTimedFloat64Buckets(BucketSize),
        scraper:            scraper,

        stopCh: make(chan struct{}),
    }

    logger = logger.Named("collector").With(
        zap.String(logkey.Key, fmt.Sprintf("%s/%s", metric.Namespace, metric.Name)))

    c.grp.Add(1)
    go func() {
        defer c.grp.Done()

        scrapeTicker := time.NewTicker(scrapeTickInterval)
        for {
            select {
            case <-c.stopCh:
                scrapeTicker.Stop()
                return
            case <-scrapeTicker.C:
                stat, err := c.getScraper().Scrape()
                if err != nil {
                    copy := metric.DeepCopy()
                    switch {
                    case err == ErrFailedGetEndpoints:
                        copy.Status.MarkMetricNotReady("NoEndpoints", ErrFailedGetEndpoints.Error())
                    case err == ErrDidNotReceiveStat:
                        copy.Status.MarkMetricFailed("DidNotReceiveStat", ErrDidNotReceiveStat.Error())
                    default:
                        copy.Status.MarkMetricNotReady("CreateOrUpdateFailed", "Collector has failed.")
                    }
                    logger.Errorw("Failed to scrape metrics", zap.Error(err))
                    c.updateMetric(copy)
                }
                if stat != emptyStat {
                    c.record(stat)
                }
            }
        }
    }()
    return c
}

两纵

  • 0-1 扩容
  • 1-N 扩缩容

上面从KPA实现的 3个横向角度进行了分析,KPA 实现了0-1扩容以及1-N 扩缩容,下面我们从这两个纵向的角度进一步分析。

我们知道,在 Knative 中,流量通过两种模式到达POD: Serve 模式和 Proxy 模式。

Proxy 模式: POD数为 0 时(另外针对突发流量的场景也会切换到 Proxy 模式,这里先不做详细解读),切换到 Proxy 模式。

Serve 模式:POD数不为 0 时,切换成 Serve 模式。

那么在什么时候进行模式的切换呢?在KPA中的代码实现如下:

mode := nv1alpha1.SKSOperationModeServe
    // We put activator in the serving path in the following cases:
    // 1\. The revision is scaled to 0:
    //   a. want == 0
    //   b. want == -1 && PA is inactive (Autoscaler has no previous knowledge of
    //            this revision, e.g. after a restart) but PA status is inactive (it was
    //            already scaled to 0).
    // 2\. The excess burst capacity is negative.
    if want == 0 || decider.Status.ExcessBurstCapacity < 0 || want == -1 && pa.Status.IsInactive() {
        logger.Infof("SKS should be in proxy mode: want = %d, ebc = %d, PA Inactive? = %v",
            want, decider.Status.ExcessBurstCapacity, pa.Status.IsInactive())
        mode = nv1alpha1.SKSOperationModeProxy
    }

0-1 扩容

第一步:指标采集

在POD数为0时,流量请求模式为Proxy 模式。这时候流量是通过 Activitor 接管的,在 Activitor 中,会根据请求数的指标信息,通过WebSockt调用 KPA中提供的指标接口,将指标信息发送给 KPA 中的 MetricCollector。

在 Activitor 中 main 函数中,访问 KPA 服务 代码实现如下

    // Open a WebSocket connection to the autoscaler.
    autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.%s%s", "autoscaler", system.Namespace(), pkgnet.GetClusterDomainName(), autoscalerPort)
    logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)
    statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)
    go statReporter(statSink, ctx.Done(), statCh, logger)

通过 WebSockt 发送请求指标代码实现:

func statReporter(statSink *websocket.ManagedConnection, stopCh <-chan struct{},
    statChan <-chan []autoscaler.StatMessage, logger *zap.SugaredLogger) {
    for {
        select {
        case sm := <-statChan:
            go func() {
                for _, msg := range sm {
                    if err := statSink.Send(msg); err != nil {
                        logger.Errorw("Error while sending stat", zap.Error(err))
                    }
                }
            }()
        case <-stopCh:
            // It's a sending connection, so no drainage required.
            statSink.Shutdown()
            return
        }
    }
}

第二步:根据指标计算 POD 数

在 Scale 方法中,根据 PUSH 获取的指标信息,计算出期望的POD数。修改 Decider 期望 POD 值,触发 PodAutoScaler 重新调和。

第三步:扩容

在KPA controller中,重新执行 reconcile 方法,执行 scaler 对当前Revision进行扩容操作。然后将流量模式切换成 Server 模式。最终实现 0-1 的扩容操作。

1-N 扩缩容

第一步:指标采集

在 POD 数不为0时,流量请求模式为 Server 模式。这时候会通过PULL 的方式访问当前 revision 中所有 POD queue proxy 9090 端口,拉取业务指标信息, 访问服务 URL 代码实现如下:

...
func urlFromTarget(t, ns string) string {
    return fmt.Sprintf(
        "http://%s.%s:%d/metrics",
        t, ns, networking.AutoscalingQueueMetricsPort)
}

第二步:根据指标计算 POD 数

在 Scale 方法中,根据 PULL 获取的指标信息,计算出期望的POD数。修改 Decider 期望 POD 值,触发 PodAutoScaler 重新调和。

第三步: 扩缩容

在 KPA controller中,重新执行 reconcile 方法,执行 scaler 对当前Revision进行扩缩容操作。如果缩容为 0 或者触发突发流量场景,则将流量模式切换成 Proxy 模式。最终实现 1-N 扩缩容操作。

总结

相信通过上面的介绍,对Knative KPA的实现有了更深入的理解,了解了其实现原理不仅有助于我们排查相关的问题,更在于我们可以基于这样的扩缩容机制实现自定义的扩缩容组件,这也正是 Knative 自动扩缩容可扩展性灵魂所在。


本文作者:元毅

阅读原文

本文为阿里云内容,未经允许不得转载。

Image placeholder
alexlis
未设置
  66人点赞

没有讨论,发表一下自己的看法吧

推荐文章
老司机带你用 PHP 实现 Websocket 协议

我为什么会写这篇文章? 当初作为编程小白的我,刚刚从事后台工作,觉得http是个很牛逼的东西,然而后面随着自己深入学习并实践之后,觉得原来和我所想的天壤之别,没大家想象的那么复杂,仅仅是个协议嘛!。后

Knative 实战:基于 Kafka 实现消息推送

导读:当前在Knative中已经提供了对Kafka事件源的支持,那么如何基于Kafka实现消息推送呢?本文作者将以阿里云Kafka产品为例,给大家解锁这一新的姿势。作者| 元毅 阿里云智能事业群高级开

老司机带你深入理解 Laravel 之 Facade

前言 时间真的过的很快啊,今天都2019年12月2号了,准确的说,写这篇博客的时间是晚上21点40分,刚从公司加班回来,洗完澡就坐下来写这篇文章了,不知不觉除这篇博客外,我已经写了11篇了,要讲的东西

核心业务“瘦身”进行时!手把手带你搭建海量数据实时处理架构

01背景 在线交易服务平台目的是减轻核心系统计算压力和核心性能负荷压力,通过该平台可以将核心系统的交易数据实时捕获、实时计算加工、计算结果保存于SequoiaDB中。并能实时的为用户提供在线交易查询服

5分钟带你了解浪潮商用机器FP5466G2服务器

海量数据时代,传统的存储架构已经难以满足大规模高并发下系统稳定性,存储设备的弹性扩展和异构存储资源整合等诸多挑战。浪潮商用机器正是针对复杂而多样化的应用需求和大数据、人工智能等新兴应用场景,全新推出企

华为“鸿蒙”所涉及的微内核到底是什么?一文带你认识微内核

微内核最近微内核的概念常常被大家提及,同时还有GoogleFuchisa这样的微内核新星,这里让我们一起来认识下微内核吧。背景庞大的UNIX家族计算机技术在二战后快速发展,构成计算机的主要基本单元从电

阿里大佬带你,深入理解线程池底层原理

为什么要使用线程池在实际使用中,线程是很占用系统资源的,如果对线程管理不善很容易导致系统问题。因此,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有如下好处:(1)降低资源消耗。通

一文带你掌握常见的Pandas性能优化方法,让你的pandas飞起来!

微信公众号:「Python读财」如有问题或建议,请公众号留言Pandas是Python中用于数据处理与分析的屠龙刀,想必大家也都不陌生,但Pandas在使用上有一些技巧和需要注意的地方,尤其是对于较大

Python数据分析实战 | 爬遍拉勾网,带你看看数据分析师还吃香吗?

微信公众号:「Python读财」如有问题或建议,请公众号留言伴随着移动互联网的飞速发展,越来越多用户被互联网连接在一起,用户所积累下来的数据越来越多,市场对数据方面人才的需求也越来越大,由此也带火了如

[随笔] ios连接charles提示unknown

前提:ios移动端和charles处于同一局域网,且可以互相访问。charles配置点击Proxy->sslproxyingsettings->sslproxying.enablesslproxyin

我们做了大量工作,可自动化 UI 测试依旧实现不了

对开发者而言,测试的重要性不言而喻。在发布新功能前,开发者需要确保已有功能有效,这就需要将每个发布版本给到QA团队执行人工回归测试。然后,测试人员或QA团队花费数天时间执行脚本以寻找Bug。本文是S

css能实现自动更换图片吗

css能实现自动更换图片吗css能实现自动更换图片,具体的做法是为每一张图片创建一个animation动画,设置不同的duration持续时间,再通过keyframes实现不透明度的变化,来实现更换图

css如何实现文字颜色渐变?3种实现方法

基础样式:.gradient-text{ text-align:left; text-indent:30px; line-height:50px; font-size:40px; font-

自动驾驶思考:仿真系统构建

如何构建自动驾驶仿真系统? 仿真最主要的目的是:通过模拟真实环境和构建汽车模型,找出自动驾驶过程中可能出现的问题。 那么如何构建自动驾驶仿真系统呢?目前主流的实现方式是通过游戏引擎来模拟真实环境,通

巧用自动化测试组合拳保证产品质量

一、背景 我们的测试工作经历了以下四个阶段。第一阶段,产品需求评审完成,开发团队实现功能开发,然后草草提测,不写单元测试。测试人员进行人工测试,没有工具或系统做辅助,测试用例编写是在excel或脑图中

CentOS7 下使用 rsync+sersync 配置文件自动同步

为什么需要文件自动同步功能? 我们平时上传代码,可以通过ftp、sftp等将文件上传至服务器,耗时耗力,而且很容易出错。如果服务器数量少还好,一但服务器数量增加,压力可想而知。 这个时候我们可以使用各

你知道如何自动保存 Spring Boot 应用进程号吗

1.前言 欢迎阅读SpringBoot2实战系列文章PID对于系统运维来说并不陌生,但是对于一些开发者特别是新手还是要简单介绍一下的。它是ProcessID的简称,是系统分配给一个进程的唯一标识符,

手摸手教你搭建简单的 Git 的代码自动发布

1.为什么我要弄这个? emmmm,因为有个自己的项目每次发布到线上,都要登录一下服务器,然后pull一下代码,执行一些项目初始化的命令(诸如:gitsubmoudleupdate,phpartisa

Spring Boot自动装配整理

首先写一个我们自己的HelloWorld配置类 1、基于"注解驱动"实现@Enable模块 @ConfigurationpublicclassHelloWorldConfiguration{@Bean

beego 使用 coding 的 webhook 2.0 进行自动部署

beego使用coding的webhook2.0进行自动部署本文介绍beego在coding上如果使用webhook2.0进行自动部署。coding的webhook1.0教程coding平台端的设置这

甲骨文将自动化的安全特性植入云中!

在一年一度的OpenWorld大会上,甲骨文公司重点介绍了自治Linux、合作伙伴生态,以及在云计算领域的新功能扩展,包括云的可访问性,更严格的安全选项,虚拟基础设施层面的改进,向系统注入更多的智能化

浙江小学生戴上“金箍”:售价3千5的脑机接口头环,实时监测上课走神,自动报送老师家长

当最新“黑科技”被用在教育上,一件引起热议的怪事就发生了:小学生上课,要带上孙悟空的“金箍”了。这个硬件是一种脑机接口头环,可以检测脑电波,评判学生上课、写作业时是否集中了注意力,并给学生的集中注意力

欧洲最大笔融资,骗过软银!印度AI公司被曝造假,自动开发背后是真人码农

大数据文摘编辑部出品AI融资有泡沫,这大家都知道。但是,这泡沫能有多大呢?一家名叫Engineer.ai的明星AI初创公司刚刚刷新了这一纪录。这家以ai作为域名的公司由两名印度创始人创建,号称可以通过

可自动生成代码,5款基于AI的顶级开发工具

如今,对机器学习潜力感兴趣的程序员都在讨论,如何使用人工智能和基于人工智能的软件开发工具构建应用程序。例如PyTorch和TensorFlow之类的解决方案。除此之外,机器学习技术正以另一种有趣的方式

面向DevOps的企业自动化运维体系如何构建?

随着软件交付速度的加快,过去那种研发、测试、部署和运维各自为政的模式,已经无法满足用户需求。越来越多的企业希望通过更高效、更敏捷的方式,快速交付和部署相关应用。所以,DevOps顺势而生!那么,什么是