依赖
查看一下github.com/openfaas/faas/gateway
外部依赖的包
Copy $ godepq - from github.com / openfaas / faas / gateway
Packages:
...
github.com / openfaas / nats - queue - worker / handler
github.com / openfaas / nats - queue - worker / nats
...
github.com / nats - io /go- nats - streaming
github.com / nats - io /go- nats
...
github.com / gorilla / mux
...
github.com / prometheus / client_golang
github.com / prometheus / common / model
github.com / prometheus / client_model /go
...
github.com/gorilla/mux
是执行http请求路由和分发的第三方扩展包。
github.com/nats-io/go-nats-streaming
github.com/nats-io/go-nats
github.com/openfaas/nats-queue-worker
是函数异步调用所需要的包。
github.com/prometheus/client_golang
是prometheus的客户端。
项目结构
https://github.com/openfaas/faas/tree/master/gateway
从server.go的main函数入口,中我们可以看到,其实有如下几个模块:
基本的安全验证
如果配置了开启基本安全验证,会从磁盘中读取密钥:
Copy var credentials *types.BasicAuthCredentials
if config.UseBasicAuth {
var readErr error
reader := types.ReadBasicAuthFromDisk{
SecretMountPath: config.SecretMountPath,
}
credentials, readErr = reader.Read()
if readErr != nil {
log.Panicf(readErr.Error())
}
}
如果credentials被赋值之后,就会对一些要加密的API handler进行一个修饰decorateExternalAuth()
,配置了路由的所有的API都要被修饰,也就是访问Gateway API网关时的所有请求都要进行身份认证:
Copy if credentials != nil {
faasHandlers.Alert =
decorateExternalAuth(faasHandlers.Alert, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.UpdateFunction =
decorateExternalAuth(faasHandlers.UpdateFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.DeleteFunction =
decorateExternalAuth(faasHandlers.DeleteFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.DeployFunction =
decorateExternalAuth(faasHandlers.DeployFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.ListFunctions =
decorateExternalAuth(faasHandlers.ListFunctions, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.ScaleFunction =
decorateExternalAuth(faasHandlers.ScaleFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.QueryFunction =
decorateExternalAuth(faasHandlers.QueryFunction, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.InfoHandler =
decorateExternalAuth(faasHandlers.InfoHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.AsyncReport =
decorateExternalAuth(faasHandlers.AsyncReport, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.SecretHandler =
decorateExternalAuth(faasHandlers.SecretHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
faasHandlers.LogProxyHandler =
decorateExternalAuth(faasHandlers.LogProxyHandler, config.UpstreamTimeout, config.AuthProxyURL, config.AuthProxyPassBody)
}
DecorateWithBasicAuth
是原先的认证支持,现在支持外部的认证方式配置,变为了decorateExternalAuth
。MakeExternalAuthHandler
也是一个路由中间件:
将请求头复制,将认证请求转发到提供外部认证的 upstreamURL
如果外部认证成功,调用next方法继续进入下一个handler;验证失败返回,非200状态码也属于验证失败;
Copy // MakeExternalAuthHandler make an authentication proxy handler
func MakeExternalAuthHandler(next http.HandlerFunc, upstreamTimeout time.Duration, upstreamURL string, passBody bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
req, _ := http.NewRequest(http.MethodGet, upstreamURL, nil)
copyHeaders(req.Header, &r.Header)
deadlineContext, cancel := context.WithTimeout(
context.Background(),
upstreamTimeout)
defer cancel()
res, err := http.DefaultClient.Do(req.WithContext(deadlineContext))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Printf("ExternalAuthHandler: %s", err.Error())
return
}
if res.Body != nil {
defer res.Body.Close()
}
if res.StatusCode == http.StatusOK {
next.ServeHTTP(w, r)
return
}
copyHeaders(w.Header(), &res.Header)
w.WriteHeader(res.StatusCode)
if res.Body != nil {
io.Copy(w, res.Body)
}
}
}
代理转发
Gateway本身不做任何和部署发布函数的事情,它只是作为一个代理,把请求转发给相应的Provider去处理,所有的请求都要通过这个网关。
同步函数转发
主要转发的API有:
Copy faasHandlers.Proxy = handlers.MakeForwardingProxyHandler(reverseProxy, functionNotifiers, functionURLResolver, functionURLTransformer, nil)
faasHandlers.ListFunctions = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.DeployFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.DeleteFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.UpdateFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.QueryFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
faasHandlers.InfoHandler = handlers.MakeInfoHandler(handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector))
faasHandlers.SecretHandler = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver, nilURLTransformer, serviceAuthInjector)
MakeForwardingProxyHandler()有五个参数:
proxy
这是一个http的客户端,作者把这个客户端抽成一个类,然后使用该类的NewHTTPClientReverseProxy
方法创建实例,这样就简化了代码,不用每次都得写一堆相同的配置。
notifiers
这个其实是要打印的日志,这里是一个HTTPNotifier的接口。而在这个MakeForwardingProxyHandler
中其实有三个实现类,一个是LoggingNotifier
,另两个是PrometheusFunctionNotifier
、PrometheusServiceNotifier
,分别用来打印和函数http请求相关的日志以及和Prometheus监控相关的日志。
baseURLResolver
这个就是Provider的url地址。
urlPathTransformer
(不理解?)
serviceAuthInjector
做服务的认证
Copy // MakeForwardingProxyHandler create a handler which forwards HTTP requests
func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
notifiers []HTTPNotifier,
baseURLResolver BaseURLResolver,
urlPathTransformer URLPathTransformer,
serviceAuthInjector AuthInjector) http.HandlerFunc {
writeRequestURI := false
if _, exists := os.LookupEnv("write_request_uri"); exists {
writeRequestURI = exists
}
return func(w http.ResponseWriter, r *http.Request) {
baseURL := baseURLResolver.Resolve(r)
originalURL := r.URL.String()
requestURL := urlPathTransformer.Transform(r)
start := time.Now()
statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector)
seconds := time.Since(start)
if err != nil {
log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error())
}
// defer func() {
for _, notifier := range notifiers {
notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
}
// }()
}
}
异步函数转发
前面说过,如果是异步函数,Gateway就作为一个发布者,将函数放到队列里。MakeQueuedProxy
方法就是做这件事儿的:
Copy // MakeQueuedProxy accepts work onto a queue
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
vars := mux.Vars(r)
name := vars["name"]
callbackURLHeader := r.Header.Get("X-Callback-Url")
var callbackURL *url.URL
if len(callbackURLHeader) > 0 {
urlVal, urlErr := url.Parse(callbackURLHeader)
if urlErr != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(urlErr.Error()))
return
}
callbackURL = urlVal
}
req := &queue.Request{
Function: name,
Body: body,
Method: r.Method,
QueryString: r.URL.RawQuery,
Path: pathTransformer.Transform(r),
Header: r.Header,
Host: r.Host,
CallbackURL: callbackURL,
}
if err = canQueueRequests.Queue(req); err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
fmt.Println(err)
return
}
w.WriteHeader(http.StatusAccepted)
}
}
将X-Callback-Url
参数从参数中http的header中读出来
调用canQueueRequests.Queue(req)
,将请求发布到队列中
自动伸缩
伸缩性其实有两种,一种是可以通过调用API接口,来将函数进行缩放。另外一种就是通过AlertManager。
自动伸缩是OpenFaaS的一大特点,触发自动伸缩主要是根据不同的指标需求。
根据每秒请求数QPS来做伸缩
OpenFaaS附带了一个自动伸缩的规则,这个规则是在AlertManager配置文件中定义。AlertManager从Prometheus中读取使用情况(每秒请求数),然后在满足一定条件时向Gateway发送警报。
可以通过删除AlertManager,或者将部署扩展的环境变量设置为0,来禁用此方式。
最小/最大副本数
通过向函数添加标签, 可以在部署时设置最小 (初始) 和最大副本数。
com.openfaas.scale.min
默认是 1
com.openfaas.scale.max
默认是 20
com.openfaas.scale.factor
默认是 20%
,在0-100之间,这是每次扩容的时候,新增实例的百分比,若是100的话,会瞬间飙升到副本数的最大值。
com.openfaas.scale.min
和 com.openfaas.scale.max
值一样的时候,可以关闭自动伸缩。
com.openfaas.scale.factor
是0时,也会关闭自动伸缩。
通过内存和CPU的使用量。
使用k8s内置的HPA,也可以触发AlertManager。
在当前版本的faas-provider
项目中,可以看见扩缩容的路由转发。
Copy r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", handlers.ReplicaUpdater).Methods("POST")
处理AlertManager的伸缩请求
Prometheus将监控指标发给AlertManager之后,会触发AlterManager调用/system/alert
接口,这个接口的handler是由handlers.MakeAlertHandler
方法生成。
MakeAlertHandler
方法接收的参数是ServiceQuery
。ServiceQuery
是一个接口,它有两个函数,用来get或者set最大的副本数。Gateway中实现这个接口的类是ExternalServiceQuery,这个实现类是在plugin包中,我们也可以直接定制这个实现类,用来实现满足特定条件。
Copy // ServiceQuery provides interface for replica querying/setting
type ServiceQuery interface {
GetReplicas(service string) (response ServiceQueryResponse, err error)
SetReplicas(service string, count uint64) error
}
// ExternalServiceQuery proxies service queries to external plugin via HTTP
type ExternalServiceQuery struct {
URL url.URL
ProxyClient http.Client
AuthInjector handlers.AuthInjector
}
有一个NewExternalServiceQuery
方法,这个方法也是一个工厂方法,用来创建ServiceQuery
实例。
GetReplicas
方法
从system/function/:name
接口获取到函数的信息,组装一个ServiceQueryResponse
对象即可。
SetReplicas
方法
调用system/scale-function/:name
接口,设置副本数。
MakeAlertHandler
的函数主要是从http.Request
中读取body,然后反序列化成PrometheusAlert
对象请求:
Copy // PrometheusAlert as produced by AlertManager
type PrometheusAlert struct {
Status string `json:"status"`
Receiver string `json:"receiver"`
Alerts []PrometheusInnerAlert `json:"alerts"`
}
可以发现,这个Alerts是一个数组对象,所以可以是对多个函数进行缩放。反序列化之后,调用handleAlerts
方法,而handleAlerts
对Alerts进行遍历,针对每个Alerts调用了scaleService
方法。scaleService
才是真正处理伸缩服务的函数。
Copy func scaleService(alert requests.PrometheusInnerAlert, service scaling.ServiceQuery) error {
var err error
serviceName := alert.Labels.FunctionName
if len(serviceName) > 0 {
queryResponse, getErr := service.GetReplicas(serviceName)
if getErr == nil {
status := alert.Status
newReplicas := CalculateReplicas(status, queryResponse.Replicas, uint64(queryResponse.MaxReplicas), queryResponse.MinReplicas, queryResponse.ScalingFactor)
log.Printf("[Scale] function=%s %d => %d.\n", serviceName, queryResponse.Replicas, newReplicas)
if newReplicas == queryResponse.Replicas {
return nil
}
updateErr := service.SetReplicas(serviceName, newReplicas)
if updateErr != nil {
err = updateErr
}
}
}
return err
}
从代码总就可以看到,scaleService做了三件事儿:
计算新的副本数
新副本数的计算方法是根据com.openfaas.scale.factor
计算步长:
Copy step := uint64(math.Ceil(float64(maxReplicas) / 100 * float64(scalingFactor)))
从0增加副本到最小值
我们在调用函数的时候,用的路由是:/function/:name
。如果环境变量里有配置scale_from_zero
为true,先用MakeScalingHandler()
方法对proxyHandler进行一次包装。
MakeScalingHandler
接受参数主要是:
next
:就是下一个httpHandlerFunc
,中间件都会有这样一个参数
Copy // ScalingConfig for scaling behaviours
type ScalingConfig struct {
MaxPollCount uint // 查到的最大数量
FunctionPollInterval time.Duration // 函数调用时间间隔
CacheExpiry time.Duration // 缓存过期时间
ServiceQuery ServiceQuery // 外部服务调用的一个接口
}
这个MakeScalingHandler
中间件主要做了如下的事情:
先从FunctionCache缓存中获取该函数的基本信息,从这个缓存可以拿到每个函数的副本数量。
为了加快函数的启动速度,如果缓存中可以获该得函数,且函数的副本数大于0,满足条件,return即可。
如果不满足上一步,就会调用SetReplicas
方法设置副本数,并更新FunctionCache的缓存。
Copy / MakeScalingHandler creates handler which can scale a function from
// zero to N replica(s). After scaling the next http.HandlerFunc will
// be called. If the function is not ready after the configured
// amount of attempts / queries then next will not be invoked and a status
// will be returned to the client.
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig) http.HandlerFunc {
scaler := scaling.NewFunctionScaler(config)
return func(w http.ResponseWriter, r *http.Request) {
functionName := getServiceName(r.URL.String())
res := scaler.Scale(functionName)
if !res.Found {
errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
log.Printf("Scaling: %s", errStr)
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(errStr))
return
}
if res.Error != nil {
errStr := fmt.Sprintf("error finding function %s: %s", functionName, res.Error.Error())
log.Printf("Scaling: %s", errStr)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(errStr))
return
}
if res.Available {
next.ServeHTTP(w, r)
return
}
log.Printf("[Scale] function=%s 0=>N timed-out after %f seconds", functionName, res.Duration.Seconds())
}
}
监控
监控是一个定时任务,开启了一个新协程,利用go的ticker.C的间隔不停的去调用/system/functions
接口。反序列化到MetricOptions对象中。
Copy // StartServiceWatcher starts a ticker and collects service replica counts to expose to prometheus
func (e *Exporter) StartServiceWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) {
ticker := time.NewTicker(interval)
quit := make(chan struct{})
timeout := 3 * time.Second
proxyClient := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: timeout,
KeepAlive: 0,
}).DialContext,
MaxIdleConns: 1,
DisableKeepAlives: true,
IdleConnTimeout: 120 * time.Millisecond,
ExpectContinueTimeout: 1500 * time.Millisecond,
},
}
go func() {
for {
select {
case <-ticker.C:
get, _ := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil)
if e.credentials != nil {
get.SetBasicAuth(e.credentials.User, e.credentials.Password)
}
services := []requests.Function{}
res, err := proxyClient.Do(get)
if err != nil {
log.Println(err)
continue
}
bytesOut, readErr := ioutil.ReadAll(res.Body)
if readErr != nil {
log.Println(err)
continue
}
unmarshalErr := json.Unmarshal(bytesOut, &services)
if unmarshalErr != nil {
log.Println(err)
continue
}
e.services = services
break
case <-quit:
return
}
}
}()
}
Gateway是OpenFaaS最为重要的一个组件。回过头看整个项目的结构,Gateway就是一个rest转发服务,一个一个的handler,每个模块之间的耦合性不是很高,可以很容易的去拆卸,自定义实现相应的模块。