4.1 Gateway源码分析

依赖

查看一下github.com/openfaas/faas/gateway外部依赖的包

$ godepq -from github.com/openfaas/faas/gateway                                    
Packages:
  ...
  github.com/openfaas/nats-queue-worker/handler
  github.com/openfaas/nats-queue-worker/nats
  ...
  github.com/nats-io/go-nats-streaming
  github.com/nats-io/go-nats
  ...
  github.com/gorilla/mux
  ...
  github.com/prometheus/client_golang
  github.com/prometheus/common/model
  github.com/prometheus/client_model/go
  ...

github.com/gorilla/mux 是执行http请求路由和分发的第三方扩展包。

github.com/nats-io/go-nats-streaming github.com/nats-io/go-nats github.com/openfaas/nats-queue-worker 是函数异步调用所需要的包。

github.com/prometheus/client_golang 是prometheus的客户端。

项目结构

https://github.com/openfaas/faas/tree/master/gateway

从server.go的main函数入口,中我们可以看到,其实有如下几个模块:

  • 基本的身份认证,服务与服务之间的验证

  • 和函数相关的代理转发

    • List

    • Deploy

    • Delete

    • Update

    • Query

    • Info

    • Secret

  • 异步函数启用NATS

  • 设置Prometheus监控

  • 设置AlertManager自动扩缩容

基本的安全验证

如果配置了开启基本安全验证,会从磁盘中读取密钥:

var credentials *types.BasicAuthCredentials

if config.UseBasicAuth {
    var readErr error
    reader := types.ReadBasicAuthFromDisk{
        SecretMountPath: config.SecretMountPath,
    }
    credentials, readErr = reader.Read()

    if readErr != nil {
        log.Panicf(readErr.Error())
    }
}

如果credentials被赋值之后,就会对一些要加密的API handler进行一个修饰decorateExternalAuth(),配置了路由的所有的API都要被修饰,也就是访问Gateway API网关时的所有请求都要进行身份认证:

  • Alert

  • UpdateFunction

  • DeleteFunction

  • DeployFunction

  • ListFunctions

  • ScaleFunction

  • QueryFunction

  • InfoHandler

  • AsyncReport

  • SecretHandler

  • LogProxyHandler

if credentials != nil {
		faasHandlers.Alert =
			decorateExternalAuth(faasHandlers.Alert, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.UpdateFunction =
			decorateExternalAuth(faasHandlers.UpdateFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.DeleteFunction =
			decorateExternalAuth(faasHandlers.DeleteFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.DeployFunction =
			decorateExternalAuth(faasHandlers.DeployFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.ListFunctions =
			decorateExternalAuth(faasHandlers.ListFunctions, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.ScaleFunction =
			decorateExternalAuth(faasHandlers.ScaleFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.QueryFunction =
			decorateExternalAuth(faasHandlers.QueryFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.InfoHandler =
			decorateExternalAuth(faasHandlers.InfoHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.AsyncReport =
			decorateExternalAuth(faasHandlers.AsyncReport, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.SecretHandler =
			decorateExternalAuth(faasHandlers.SecretHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.LogProxyHandler =
			decorateExternalAuth(faasHandlers.LogProxyHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
	}

DecorateWithBasicAuth 是原先的认证支持,现在支持外部的认证方式配置,变为了decorateExternalAuthMakeExternalAuthHandler 也是一个路由中间件:

  • 将请求头复制,将认证请求转发到提供外部认证的 upstreamURL

  • 认证过程有超时限制

  • 如果外部认证成功,调用next方法继续进入下一个handler;验证失败返回,非200状态码也属于验证失败;

// MakeExternalAuthHandler make an authentication proxy handler
func MakeExternalAuthHandler(next http.HandlerFunc, upstreamTimeout time.Duration, upstreamURL string, passBody bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		req, _ := http.NewRequest(http.MethodGet, upstreamURL, nil)

		copyHeaders(req.Header, &r.Header)

		deadlineContext, cancel := context.WithTimeout(
			context.Background(),
			upstreamTimeout)

		defer cancel()

		res, err := http.DefaultClient.Do(req.WithContext(deadlineContext))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			log.Printf("ExternalAuthHandler: %s", err.Error())
			return
		}

		if res.Body != nil {
			defer res.Body.Close()
		}

		if res.StatusCode == http.StatusOK {
			next.ServeHTTP(w, r)
			return
		}

		copyHeaders(w.Header(), &res.Header)
		w.WriteHeader(res.StatusCode)

		if res.Body != nil {
			io.Copy(w, res.Body)
		}
	}
}

代理转发

Gateway本身不做任何和部署发布函数的事情,它只是作为一个代理,把请求转发给相应的Provider去处理,所有的请求都要通过这个网关。

同步函数转发

主要转发的API有:

  • Proxy

  • ListFunctions

  • DeployFunction

  • DeleteFunction

  • UpdateFunction

  • QueryFunction

faasHandlers.Proxy = handlers.MakeForwardingProxyHandler(reverseProxy, functionNotifiers, functionURLResolver, functionURLTransformer, nil)

faasHandlers.ListFunctions = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.DeployFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.DeleteFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.UpdateFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.QueryFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.InfoHandler = handlers.MakeInfoHandler(handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector))
faasHandlers.SecretHandler = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)

MakeForwardingProxyHandler()有五个参数:

  • proxy这是一个http的客户端,作者把这个客户端抽成一个类,然后使用该类的NewHTTPClientReverseProxy方法创建实例,这样就简化了代码,不用每次都得写一堆相同的配置。

  • notifiers这个其实是要打印的日志,这里是一个HTTPNotifier的接口。而在这个MakeForwardingProxyHandler中其实有三个实现类,一个是LoggingNotifier,另两个是PrometheusFunctionNotifierPrometheusServiceNotifier,分别用来打印和函数http请求相关的日志以及和Prometheus监控相关的日志。

  • baseURLResolver这个就是Provider的url地址。

  • urlPathTransformer (不理解?)

  • serviceAuthInjector 做服务的认证

// MakeForwardingProxyHandler create a handler which forwards HTTP requests
func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
	notifiers []HTTPNotifier,
	baseURLResolver BaseURLResolver,
	urlPathTransformer URLPathTransformer,
	serviceAuthInjector AuthInjector) http.HandlerFunc {

	writeRequestURI := false
	if _, exists := os.LookupEnv("write_request_uri"); exists {
		writeRequestURI = exists
	}

	return func(w http.ResponseWriter, r *http.Request) {
		baseURL := baseURLResolver.Resolve(r)
		originalURL := r.URL.String()

		requestURL := urlPathTransformer.Transform(r)

		start := time.Now()

		statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector)

		seconds := time.Since(start)
		if err != nil {
			log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error())
		}

		// defer func() {
		for _, notifier := range notifiers {
			notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
		}
		// }()

	}
}

异步函数转发

前面说过,如果是异步函数,Gateway就作为一个发布者,将函数放到队列里。MakeQueuedProxy方法就是做这件事儿的:

// MakeQueuedProxy accepts work onto a queue
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Body != nil {
			defer r.Body.Close()
		}

		body, err := ioutil.ReadAll(r.Body)

		if err != nil {
			w.WriteHeader(http.StatusBadRequest)

			w.Write([]byte(err.Error()))
			return
		}

		vars := mux.Vars(r)
		name := vars["name"]

		callbackURLHeader := r.Header.Get("X-Callback-Url")
		var callbackURL *url.URL

		if len(callbackURLHeader) > 0 {
			urlVal, urlErr := url.Parse(callbackURLHeader)
			if urlErr != nil {
				w.WriteHeader(http.StatusBadRequest)

				w.Write([]byte(urlErr.Error()))
				return
			}

			callbackURL = urlVal
		}

		req := &queue.Request{
			Function:    name,
			Body:        body,
			Method:      r.Method,
			QueryString: r.URL.RawQuery,
			Path:        pathTransformer.Transform(r),
			Header:      r.Header,
			Host:        r.Host,
			CallbackURL: callbackURL,
		}

		if err = canQueueRequests.Queue(req); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			fmt.Println(err)
			return
		}

		w.WriteHeader(http.StatusAccepted)
	}
}
  1. 读取请求体

  2. X-Callback-Url参数从参数中http的header中读出来

  3. 实例化用于异步处理的Request对象

  4. 调用canQueueRequests.Queue(req),将请求发布到队列中

自动伸缩

伸缩性其实有两种,一种是可以通过调用API接口,来将函数进行缩放。另外一种就是通过AlertManager。

自动伸缩是OpenFaaS的一大特点,触发自动伸缩主要是根据不同的指标需求。

  • 根据每秒请求数QPS来做伸缩 OpenFaaS附带了一个自动伸缩的规则,这个规则是在AlertManager配置文件中定义。AlertManager从Prometheus中读取使用情况(每秒请求数),然后在满足一定条件时向Gateway发送警报。 可以通过删除AlertManager,或者将部署扩展的环境变量设置为0,来禁用此方式。

  • 最小/最大副本数 通过向函数添加标签, 可以在部署时设置最小 (初始) 和最大副本数。

    • com.openfaas.scale.min 默认是 1

    • com.openfaas.scale.max 默认是 20

    • com.openfaas.scale.factor 默认是 20% ,在0-100之间,这是每次扩容的时候,新增实例的百分比,若是100的话,会瞬间飙升到副本数的最大值。

com.openfaas.scale.mincom.openfaas.scale.max值一样的时候,可以关闭自动伸缩。 com.openfaas.scale.factor是0时,也会关闭自动伸缩。

  • 通过内存和CPU的使用量。 使用k8s内置的HPA,也可以触发AlertManager。

在当前版本的faas-provider项目中,可以看见扩缩容的路由转发。

r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", handlers.ReplicaUpdater).Methods("POST")

处理AlertManager的伸缩请求

Prometheus将监控指标发给AlertManager之后,会触发AlterManager调用/system/alert接口,这个接口的handler是由handlers.MakeAlertHandler方法生成。

MakeAlertHandler方法接收的参数是ServiceQueryServiceQuery是一个接口,它有两个函数,用来get或者set最大的副本数。Gateway中实现这个接口的类是ExternalServiceQuery,这个实现类是在plugin包中,我们也可以直接定制这个实现类,用来实现满足特定条件。

// ServiceQuery provides interface for replica querying/setting
type ServiceQuery interface {
	GetReplicas(service string) (response ServiceQueryResponse, err error)
	SetReplicas(service string, count uint64) error
}

// ExternalServiceQuery proxies service queries to external plugin via HTTP
type ExternalServiceQuery struct {
	URL          url.URL
	ProxyClient  http.Client
	AuthInjector handlers.AuthInjector
}

有一个NewExternalServiceQuery方法,这个方法也是一个工厂方法,用来创建ServiceQuery实例。

  • GetReplicas方法 从system/function/:name接口获取到函数的信息,组装一个ServiceQueryResponse对象即可。

  • SetReplicas方法 调用system/scale-function/:name接口,设置副本数。

MakeAlertHandler的函数主要是从http.Request中读取body,然后反序列化成PrometheusAlert对象请求:

// PrometheusAlert as produced by AlertManager
type PrometheusAlert struct {
   Status   string                 `json:"status"`
   Receiver string                 `json:"receiver"`
   Alerts   []PrometheusInnerAlert `json:"alerts"`
}

可以发现,这个Alerts是一个数组对象,所以可以是对多个函数进行缩放。反序列化之后,调用handleAlerts方法,而handleAlerts对Alerts进行遍历,针对每个Alerts调用了scaleService方法。scaleService才是真正处理伸缩服务的函数。

func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery) error {
	var err error
	serviceName := alert.Labels.FunctionName

	if len(serviceName) > 0 {
		queryResponse, getErr := service.GetReplicas(serviceName)
		if getErr == nil {
			status := alert.Status

			newReplicas := CalculateReplicas(status, queryResponse.Replicas, uint64(queryResponse.MaxReplicas), queryResponse.MinReplicas, queryResponse.ScalingFactor)

			log.Printf("[Scale] function=%s %d => %d.\n", serviceName, queryResponse.Replicas, newReplicas)
			if newReplicas == queryResponse.Replicas {
				return nil
			}

			updateErr := service.SetReplicas(serviceName, newReplicas)
			if updateErr != nil {
				err = updateErr
			}
		}
	}
	return err
}

从代码总就可以看到,scaleService做了三件事儿:

  • 获取现在的副本数

  • 计算新的副本数 新副本数的计算方法是根据com.openfaas.scale.factor计算步长:

    step := uint64(math.Ceil(float64(maxReplicas) / 100 * float64(scalingFactor)))
  • 设置为新的副本数

从0增加副本到最小值

我们在调用函数的时候,用的路由是:/function/:name。如果环境变量里有配置scale_from_zero为true,先用MakeScalingHandler()方法对proxyHandler进行一次包装。

MakeScalingHandler接受参数主要是:

  • next:就是下一个httpHandlerFunc,中间件都会有这样一个参数

  • configScalingConfig的对象:

// ScalingConfig for scaling behaviours
type ScalingConfig struct {
    MaxPollCount uint // 查到的最大数量
    FunctionPollInterval time.Duration // 函数调用时间间隔
    CacheExpiry time.Duration // 缓存过期时间
    ServiceQuery ServiceQuery // 外部服务调用的一个接口
}

这个MakeScalingHandler中间件主要做了如下的事情:

  • 先从FunctionCache缓存中获取该函数的基本信息,从这个缓存可以拿到每个函数的副本数量。

  • 为了加快函数的启动速度,如果缓存中可以获该得函数,且函数的副本数大于0,满足条件,return即可。

  • 如果不满足上一步,就会调用SetReplicas方法设置副本数,并更新FunctionCache的缓存。

/ MakeScalingHandler creates handler which can scale a function from
// zero to N replica(s). After scaling the next http.HandlerFunc will
// be called. If the function is not ready after the configured
// amount of attempts / queries then next will not be invoked and a status
// will be returned to the client.
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) http.HandlerFunc {

	scaler := scaling.NewFunctionScaler(config)

	return func(w http.ResponseWriter, r *http.Request) {

		functionName := getServiceName(r.URL.String())
		res := scaler.Scale(functionName)

		if !res.Found {
			errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
			log.Printf("Scaling: %s", errStr)

			w.WriteHeader(http.StatusNotFound)
			w.Write([]byte(errStr))
			return
		}

		if res.Error != nil {
			errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
			log.Printf("Scaling: %s", errStr)

			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(errStr))
			return
		}

		if res.Available {
			next.ServeHTTP(w, r)
			return
		}

		log.Printf("[Scale] function=%s 0=>N timed-out after %f seconds", functionName, res.Duration.Seconds())
	}
}

监控

监控是一个定时任务,开启了一个新协程,利用go的ticker.C的间隔不停的去调用/system/functions接口。反序列化到MetricOptions对象中。

// StartServiceWatcher starts a ticker and collects service replica counts to expose to prometheus
func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) {
	ticker := time.NewTicker(interval)
	quit := make(chan struct{})

	timeout := 3 * time.Second

	proxyClient := http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   timeout,
				KeepAlive: 0,
			}).DialContext,
			MaxIdleConns:          1,
			DisableKeepAlives:     true,
			IdleConnTimeout:       120 * time.Millisecond,
			ExpectContinueTimeout: 1500 * time.Millisecond,
		},
	}

	go func() {
		for {
			select {
			case <-ticker.C:

				get, _ := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil)
				if e.credentials != nil {
					get.SetBasicAuth(e.credentials.User, e.credentials.Password)
				}

				services := []requests.Function{}
				res, err := proxyClient.Do(get)
				if err != nil {
					log.Println(err)
					continue
				}
				bytesOut, readErr := ioutil.ReadAll(res.Body)
				if readErr != nil {
					log.Println(err)
					continue
				}
				unmarshalErr := json.Unmarshal(bytesOut, &services)
				if unmarshalErr != nil {
					log.Println(err)
					continue
				}

				e.services = services

				break
			case <-quit:
				return
			}
		}
	}()
}

Gateway是OpenFaaS最为重要的一个组件。回过头看整个项目的结构,Gateway就是一个rest转发服务,一个一个的handler,每个模块之间的耦合性不是很高,可以很容易的去拆卸,自定义实现相应的模块。

Last updated