# 4.1 Gateway源码分析

### 依赖

查看一下`github.com/openfaas/faas/gateway`外部依赖的包

```go
$ godepq -from github.com/openfaas/faas/gateway                                    
Packages:
  ...
  github.com/openfaas/nats-queue-worker/handler
  github.com/openfaas/nats-queue-worker/nats
  ...
  github.com/nats-io/go-nats-streaming
  github.com/nats-io/go-nats
  ...
  github.com/gorilla/mux
  ...
  github.com/prometheus/client_golang
  github.com/prometheus/common/model
  github.com/prometheus/client_model/go
  ...

```

`github.com/gorilla/mux` 是执行http请求路由和分发的第三方扩展包。

`github.com/nats-io/go-nats-streaming` `github.com/nats-io/go-nats`  `github.com/openfaas/nats-queue-worker` 是函数异步调用所需要的包。

`github.com/prometheus/client_golang` 是prometheus的客户端。

### 项目结构

<https://github.com/openfaas/faas/tree/master/gateway>

从server.go的main函数入口，中我们可以看到，其实有如下几个模块：

* 基本的身份认证，服务与服务之间的验证
* 和函数相关的代理转发
  * List
  * Deploy
  * Delete
  * Update
  * Query
  * Info
  * Secret
* 异步函数启用NATS
* 设置Prometheus监控
* 设置AlertManager自动扩缩容

#### **基本的安全验证**

如果配置了开启基本安全验证，会从磁盘中读取密钥：

```go
var credentials *types.BasicAuthCredentials
​
if config.UseBasicAuth {
    var readErr error
    reader := types.ReadBasicAuthFromDisk{
        SecretMountPath: config.SecretMountPath,
    }
    credentials, readErr = reader.Read()
​
    if readErr != nil {
        log.Panicf(readErr.Error())
    }
}
```

如果credentials被赋值之后，就会对一些要加密的API handler进行一个修饰`decorateExternalAuth()`，配置了路由的所有的API都要被修饰，也就是访问Gateway API网关时的所有请求都要进行身份认证：

* Alert
* UpdateFunction
* DeleteFunction
* DeployFunction
* ListFunctions
* ScaleFunction
* QueryFunction
* InfoHandler
* AsyncReport
* SecretHandler
* LogProxyHandler

```go
if credentials != nil {
		faasHandlers.Alert =
			decorateExternalAuth(faasHandlers.Alert, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.UpdateFunction =
			decorateExternalAuth(faasHandlers.UpdateFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.DeleteFunction =
			decorateExternalAuth(faasHandlers.DeleteFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.DeployFunction =
			decorateExternalAuth(faasHandlers.DeployFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.ListFunctions =
			decorateExternalAuth(faasHandlers.ListFunctions, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.ScaleFunction =
			decorateExternalAuth(faasHandlers.ScaleFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.QueryFunction =
			decorateExternalAuth(faasHandlers.QueryFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.InfoHandler =
			decorateExternalAuth(faasHandlers.InfoHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.AsyncReport =
			decorateExternalAuth(faasHandlers.AsyncReport, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.SecretHandler =
			decorateExternalAuth(faasHandlers.SecretHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
		faasHandlers.LogProxyHandler =
			decorateExternalAuth(faasHandlers.LogProxyHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
	}
```

`DecorateWithBasicAuth` 是原先的认证支持，现在支持外部的认证方式配置，变为了`decorateExternalAuth`。`MakeExternalAuthHandler` 也是一个路由中间件：

* 将请求头复制，将认证请求转发到提供外部认证的 `upstreamURL`&#x20;
* 认证过程有超时限制
* 如果外部认证成功，调用next方法继续进入下一个handler；验证失败返回，非200状态码也属于验证失败；

```go
// MakeExternalAuthHandler make an authentication proxy handler
func MakeExternalAuthHandler(next http.HandlerFunc, upstreamTimeout time.Duration, upstreamURL string, passBody bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		req, _ := http.NewRequest(http.MethodGet, upstreamURL, nil)

		copyHeaders(req.Header, &r.Header)

		deadlineContext, cancel := context.WithTimeout(
			context.Background(),
			upstreamTimeout)

		defer cancel()

		res, err := http.DefaultClient.Do(req.WithContext(deadlineContext))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			log.Printf("ExternalAuthHandler: %s", err.Error())
			return
		}

		if res.Body != nil {
			defer res.Body.Close()
		}

		if res.StatusCode == http.StatusOK {
			next.ServeHTTP(w, r)
			return
		}

		copyHeaders(w.Header(), &res.Header)
		w.WriteHeader(res.StatusCode)

		if res.Body != nil {
			io.Copy(w, res.Body)
		}
	}
}
```

### **代理转发**

Gateway本身不做任何和部署发布函数的事情，它只是作为一个代理，把请求转发给相应的Provider去处理，所有的请求都要通过这个网关。

### **同步函数转发**

主要转发的API有：

* Proxy
* ListFunctions
* DeployFunction
* DeleteFunction
* UpdateFunction
* QueryFunction

```go
faasHandlers.Proxy = handlers.MakeForwardingProxyHandler(reverseProxy, functionNotifiers, functionURLResolver, functionURLTransformer, nil)

faasHandlers.ListFunctions = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.DeployFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.DeleteFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.UpdateFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.QueryFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.InfoHandler = handlers.MakeInfoHandler(handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector))
faasHandlers.SecretHandler = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
```

MakeForwardingProxyHandler()有五个参数：

* `proxy`这是一个http的客户端，作者把这个客户端抽成一个类，然后使用该类的`NewHTTPClientReverseProxy`方法创建实例，这样就简化了代码，不用每次都得写一堆相同的配置。
* `notifiers`这个其实是要打印的日志，这里是一个HTTPNotifier的接口。而在这个`MakeForwardingProxyHandler`中其实有三个实现类，一个是`LoggingNotifier`，另两个是`PrometheusFunctionNotifier`、`PrometheusServiceNotifier`，分别用来打印和函数http请求相关的日志以及和Prometheus监控相关的日志。
* `baseURLResolver`这个就是Provider的url地址。
* `urlPathTransformer` (不理解?)
* `serviceAuthInjector` 做服务的认证

```go
// MakeForwardingProxyHandler create a handler which forwards HTTP requests
func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
	notifiers []HTTPNotifier,
	baseURLResolver BaseURLResolver,
	urlPathTransformer URLPathTransformer,
	serviceAuthInjector AuthInjector) http.HandlerFunc {

	writeRequestURI := false
	if _, exists := os.LookupEnv("write_request_uri"); exists {
		writeRequestURI = exists
	}

	return func(w http.ResponseWriter, r *http.Request) {
		baseURL := baseURLResolver.Resolve(r)
		originalURL := r.URL.String()

		requestURL := urlPathTransformer.Transform(r)

		start := time.Now()

		statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector)

		seconds := time.Since(start)
		if err != nil {
			log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error())
		}

		// defer func() {
		for _, notifier := range notifiers {
			notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
		}
		// }()

	}
}
```

### **异步函数转发**

前面说过，如果是异步函数，Gateway就作为一个发布者，将函数放到队列里。`MakeQueuedProxy`方法就是做这件事儿的：

```go
// MakeQueuedProxy accepts work onto a queue
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Body != nil {
			defer r.Body.Close()
		}

		body, err := ioutil.ReadAll(r.Body)

		if err != nil {
			w.WriteHeader(http.StatusBadRequest)

			w.Write([]byte(err.Error()))
			return
		}

		vars := mux.Vars(r)
		name := vars["name"]

		callbackURLHeader := r.Header.Get("X-Callback-Url")
		var callbackURL *url.URL

		if len(callbackURLHeader) > 0 {
			urlVal, urlErr := url.Parse(callbackURLHeader)
			if urlErr != nil {
				w.WriteHeader(http.StatusBadRequest)

				w.Write([]byte(urlErr.Error()))
				return
			}

			callbackURL = urlVal
		}

		req := &queue.Request{
			Function:    name,
			Body:        body,
			Method:      r.Method,
			QueryString: r.URL.RawQuery,
			Path:        pathTransformer.Transform(r),
			Header:      r.Header,
			Host:        r.Host,
			CallbackURL: callbackURL,
		}

		if err = canQueueRequests.Queue(req); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
			fmt.Println(err)
			return
		}

		w.WriteHeader(http.StatusAccepted)
	}
}
```

1. 读取请求体
2. 将`X-Callback-Url`参数从参数中http的header中读出来
3. 实例化用于异步处理的Request对象
4. 调用`canQueueRequests.Queue(req)`，将请求发布到队列中

### **自动伸缩**

伸缩性其实有两种，一种是可以通过调用API接口，来将函数进行缩放。另外一种就是通过AlertManager。

自动伸缩是OpenFaaS的一大特点，触发自动伸缩主要是根据不同的指标需求。

* 根据每秒请求数QPS来做伸缩\
  OpenFaaS附带了一个自动伸缩的规则，这个规则是在AlertManager配置文件中定义。AlertManager从Prometheus中读取使用情况（每秒请求数），然后在满足一定条件时向Gateway发送警报。\
  可以通过删除AlertManager，或者将部署扩展的环境变量设置为0，来禁用此方式。
* 最小/最大副本数\
  通过向函数添加标签, 可以在部署时设置最小 (初始) 和最大副本数。<br>
  * `com.openfaas.scale.min` 默认是 `1`
  * `com.openfaas.scale.max` 默认是 `20`
  * `com.openfaas.scale.factor` 默认是 `20%` ，在0-100之间，这是每次扩容的时候，新增实例的百分比，若是100的话，会瞬间飙升到副本数的最大值。

`com.openfaas.scale.min` 和 `com.openfaas.scale.max`值一样的时候，可以关闭自动伸缩。\
`com.openfaas.scale.factor`是0时，也会关闭自动伸缩。

* 通过内存和CPU的使用量。\
  使用k8s内置的HPA，也可以触发AlertManager。

在当前版本的`faas-provider`项目中，可以看见扩缩容的路由转发。

```go
r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", handlers.ReplicaUpdater).Methods("POST")
```

### **处理AlertManager的伸缩请求**

Prometheus将监控指标发给AlertManager之后，会触发AlterManager调用`/system/alert`接口，这个接口的handler是由`handlers.MakeAlertHandler`方法生成。

`MakeAlertHandler`方法接收的参数是`ServiceQuery`。`ServiceQuery`是一个接口，它有两个函数，用来get或者set最大的副本数。Gateway中实现这个接口的类是ExternalServiceQuery，这个实现类是在plugin包中，我们也可以直接定制这个实现类，用来实现满足特定条件。

```go
// ServiceQuery provides interface for replica querying/setting
type ServiceQuery interface {
	GetReplicas(service string) (response ServiceQueryResponse, err error)
	SetReplicas(service string, count uint64) error
}

// ExternalServiceQuery proxies service queries to external plugin via HTTP
type ExternalServiceQuery struct {
	URL          url.URL
	ProxyClient  http.Client
	AuthInjector handlers.AuthInjector
}
```

有一个`NewExternalServiceQuery`方法，这个方法也是一个工厂方法，用来创建`ServiceQuery`实例。

* `GetReplicas`方法\
  从`system/function/:name`接口获取到函数的信息，组装一个`ServiceQueryResponse`对象即可。
* `SetReplicas`方法\
  调用`system/scale-function/:name`接口，设置副本数。

`MakeAlertHandler`的函数主要是从`http.Request`中读取body，然后反序列化成`PrometheusAlert`对象请求：

```
// PrometheusAlert as produced by AlertManager
type PrometheusAlert struct {
   Status   string                 `json:"status"`
   Receiver string                 `json:"receiver"`
   Alerts   []PrometheusInnerAlert `json:"alerts"`
}
```

可以发现，这个Alerts是一个数组对象，所以可以是对多个函数进行缩放。反序列化之后，调用`handleAlerts`方法，而`handleAlerts`对Alerts进行遍历，针对每个Alerts调用了`scaleService`方法。`scaleService`才是真正处理伸缩服务的函数。

```go
func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery) error {
	var err error
	serviceName := alert.Labels.FunctionName

	if len(serviceName) > 0 {
		queryResponse, getErr := service.GetReplicas(serviceName)
		if getErr == nil {
			status := alert.Status

			newReplicas := CalculateReplicas(status, queryResponse.Replicas, uint64(queryResponse.MaxReplicas), queryResponse.MinReplicas, queryResponse.ScalingFactor)

			log.Printf("[Scale] function=%s %d => %d.\n", serviceName, queryResponse.Replicas, newReplicas)
			if newReplicas == queryResponse.Replicas {
				return nil
			}

			updateErr := service.SetReplicas(serviceName, newReplicas)
			if updateErr != nil {
				err = updateErr
			}
		}
	}
	return err
}
```

从代码总就可以看到，scaleService做了三件事儿：

* 获取现在的副本数
* 计算新的副本数\
  新副本数的计算方法是根据`com.openfaas.scale.factor`计算步长：<br>

  ```
  step := uint64(math.Ceil(float64(maxReplicas) / 100 * float64(scalingFactor)))
  ```
* 设置为新的副本数

### **从0增加副本到最小值**

我们在调用函数的时候，用的路由是：`/function/:name`。如果环境变量里有配置`scale_from_zero`为true，先用`MakeScalingHandler()`方法对proxyHandler进行一次包装。

`MakeScalingHandler`接受参数主要是：

* `next`：就是下一个`httpHandlerFunc`，中间件都会有这样一个参数
* `config`：`ScalingConfig`的对象：

```go
// ScalingConfig for scaling behaviours
type ScalingConfig struct {
    MaxPollCount uint // 查到的最大数量
    FunctionPollInterval time.Duration // 函数调用时间间隔
    CacheExpiry time.Duration // 缓存过期时间
    ServiceQuery ServiceQuery // 外部服务调用的一个接口
}
```

这个`MakeScalingHandler`中间件主要做了如下的事情：

* 先从FunctionCache缓存中获取该函数的基本信息，从这个缓存可以拿到每个函数的副本数量。
* 为了加快函数的启动速度，如果缓存中可以获该得函数，且函数的副本数大于0，满足条件，return即可。
* 如果不满足上一步，就会调用`SetReplicas`方法设置副本数，并更新FunctionCache的缓存。

```go
/ MakeScalingHandler creates handler which can scale a function from
// zero to N replica(s). After scaling the next http.HandlerFunc will
// be called. If the function is not ready after the configured
// amount of attempts / queries then next will not be invoked and a status
// will be returned to the client.
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) http.HandlerFunc {

	scaler := scaling.NewFunctionScaler(config)

	return func(w http.ResponseWriter, r *http.Request) {

		functionName := getServiceName(r.URL.String())
		res := scaler.Scale(functionName)

		if !res.Found {
			errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
			log.Printf("Scaling: %s", errStr)

			w.WriteHeader(http.StatusNotFound)
			w.Write([]byte(errStr))
			return
		}

		if res.Error != nil {
			errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
			log.Printf("Scaling: %s", errStr)

			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(errStr))
			return
		}

		if res.Available {
			next.ServeHTTP(w, r)
			return
		}

		log.Printf("[Scale] function=%s 0=>N timed-out after %f seconds", functionName, res.Duration.Seconds())
	}
}
```

### **监控**

监控是一个定时任务，开启了一个新协程，利用go的ticker.C的间隔不停的去调用`/system/functions`接口。反序列化到MetricOptions对象中。

```go
// StartServiceWatcher starts a ticker and collects service replica counts to expose to prometheus
func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) {
	ticker := time.NewTicker(interval)
	quit := make(chan struct{})

	timeout := 3 * time.Second

	proxyClient := http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   timeout,
				KeepAlive: 0,
			}).DialContext,
			MaxIdleConns:          1,
			DisableKeepAlives:     true,
			IdleConnTimeout:       120 * time.Millisecond,
			ExpectContinueTimeout: 1500 * time.Millisecond,
		},
	}

	go func() {
		for {
			select {
			case <-ticker.C:

				get, _ := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil)
				if e.credentials != nil {
					get.SetBasicAuth(e.credentials.User, e.credentials.Password)
				}

				services := []requests.Function{}
				res, err := proxyClient.Do(get)
				if err != nil {
					log.Println(err)
					continue
				}
				bytesOut, readErr := ioutil.ReadAll(res.Body)
				if readErr != nil {
					log.Println(err)
					continue
				}
				unmarshalErr := json.Unmarshal(bytesOut, &services)
				if unmarshalErr != nil {
					log.Println(err)
					continue
				}

				e.services = services

				break
			case <-quit:
				return
			}
		}
	}()
}
```

Gateway是OpenFaaS最为重要的一个组件。回过头看整个项目的结构，Gateway就是一个rest转发服务，一个一个的handler，每个模块之间的耦合性不是很高，可以很容易的去拆卸，自定义实现相应的模块。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://jahentao.gitbook.io/openfass/4.1-gateway-yuan-ma-fen-xi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
