4.3 faas-nomad源码分析
faas-nomad
的分析按初始化、工作机制两部分。
初始化
初始化的过程要和faas-provider SDK的交互,联着分析。
组件依赖
身份认证
faas-provider定义的逻辑
具体Provider相关Handlers创建
faas-nomad通过makeDependencies
将需要的nomad、consul、statsd三个外部组件依赖的API客户端创建出来。
makeDependencies(
*statsdServer,
*nodeURI,
*nomadConfig,
*consulAddr,
*consulACL,
*nomadRegion,
)
调用faas-provider定义的初始化逻辑,将
bootstrap "github.com/openfaas/faas-provider"
...
bootstrap.Serve(handlers, config)
faas-provider的Serve
将路由与处理请求的Handlers绑定。faas-provider中规定好了这些FaaSHandlers
,这些Handlers在OpenFaaS API Gateway路由处也有设置,由网关转发给具体的Provider处理。
// FaaSHandlers provide handlers for OpenFaaS
type FaaSHandlers struct {
FunctionReader http.HandlerFunc
DeployHandler http.HandlerFunc
// FunctionProxy provides the function invocation proxy logic. Use proxy.NewHandlerFunc to
// use the standard OpenFaaS proxy implementation or provide completely custom proxy logic.
FunctionProxy http.HandlerFunc
DeleteHandler http.HandlerFunc
ReplicaReader http.HandlerFunc
ReplicaUpdater http.HandlerFunc
SecretHandler http.HandlerFunc
// LogHandler provides streaming json logs of functions
LogHandler http.HandlerFunc
// Optional: Update an existing function
UpdateHandler http.HandlerFunc
HealthHandler http.HandlerFunc
InfoHandler http.HandlerFunc
}
基于前述依赖的组件,开始createFaaSHandlers
handlers := createFaaSHandlers(nomadClient, consulResolver, stats, logger)
//createFaaSHandlers:
...
return &types.FaaSHandlers{
FunctionReader: handlers.MakeReader(nomadClient.Jobs(), logger, stats),
DeployHandler: handlers.MakeDeploy(nomadClient.Jobs(), *providerConfig, logger, stats),
DeleteHandler: handlers.MakeDelete(consulResolver, nomadClient.Jobs(), logger, stats),
ReplicaReader: makeReplicationReader(nomadClient.Jobs(), logger, stats),
ReplicaUpdater: makeReplicationUpdater(nomadClient.Jobs(), logger, stats),
FunctionProxy: makeFunctionProxyHandler(consulResolver, logger, stats, *functionTimeout),
UpdateHandler: handlers.MakeDeploy(nomadClient.Jobs(), *providerConfig, logger, stats),
InfoHandler: handlers.MakeInfo(logger, stats, version),
Health: handlers.MakeHealthHandler(),
SecretHandler: handlers.MakeSecretHandler(vs, logger.Named("secrets_handler")),
}
实例化这些Handlers,在handlers
包下。
这里面需要注意的是makeFunctionProxyHandler
。它创建了一个处理函数的中间件,这个中间件感觉就像切面的概念,在下一个next http.HandlerFunc
处理前验证了请求参数中函数名是否存在。
// MakeExtractFunctionMiddleWare returns a middleware handler which validates
// the presence of the function name in the URI query.
func MakeExtractFunctionMiddleWare(
getVars func(*http.Request) map[string]string,
next http.HandlerFunc) http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {
vars := getVars(r)
functionName := vars["name"]
if functionName == "" {
rw.WriteHeader(http.StatusBadRequest)
fmt.Fprint(rw, fmt.Errorf("No function name"))
return
}
r = r.WithContext(context.WithValue(r.Context(), FunctionNameCTXKey, functionName))
next(rw, r)
}
}
handlers包中的Proxy
是一个http.Handler
,是真正用于调用下游函数的方法。具体来说,对faas-nomad Provider,下游函数是Nomad的job方式组织运行的,而且当前特定的是打包成docker镜像(其中有watchdog,来启动、监控forked process),以nomad的docker驱动启动job。
工作机制
上面说到,faas-nomad的main.go main
方法最后调用了faas-provider的Serve
方法,这是一个阻塞方法,默认在8080
端口,启动了server,接受Gateway路由到Provider的调用请求,具体地,处理这些请求:CRUD函数、调用函数、设置副本数……。
faas-provider的功能在于调用nomad、consul、vault、statsd等客户端API,在调用过程中,有些缓存优化,如缓存函数名等。
以delete函数为例,Handler定义:
// main.go:150
DeleteHandler: handlers.MakeDelete(consulResolver, nomadClient.Jobs(), logger, stats),
调用nomad client的job API,删除函数,再删除函数名的缓存
// handlers/delete.go:41
// Delete job /v1/jobs
_, _, err = client.Deregister(nomad.JobPrefix+req.FunctionName, false, nil)
// client 经过本地的封装 定义是 client nomad.Job
// func MakeDelete(sr consul.ServiceResolver, client nomad.Job, logger hclog.Logger, stats metrics.StatsD) http.HandlerFunc
...
// handlers/delete.go:51
sr.RemoveCacheItem(req.FunctionName)
正是MakeDelete
方法传入的,实例化只赋值了Register
、Info
、List
、Deregister
、Allocations
需要的方法。
// main.go:150
nomadClient.Jobs() -> client nomad.Job
nomad.Job 中 Job接口定义的方法与完全相同,只是抽出来faas-nomad中需要的接口,屏蔽了真实的nomad jobs
API的细节。这是Go语法的特点,刚看到的时候有点奇怪(我没有完整地系统学GO语法),要是Java的话得声明相同的接口吧,其中不用的方法,还占用内存;这种赋值,也是Go语言中“实现接口的结构体中所有方法,即是接口定义的类型”的运用吧。
type Job interface {
// Register creates a new Nomad job
Register(*api.Job, *api.WriteOptions) (*api.JobRegisterResponse, *api.WriteMeta, error)
Info(jobID string, q *api.QueryOptions) (*api.Job, *api.QueryMeta, error)
List(q *api.QueryOptions) ([]*api.JobListStub, *api.QueryMeta, error)
Deregister(jobID string, purge bool, q *api.WriteOptions) (string, *api.WriteMeta, error)
Allocations(jobID string, allAllocs bool, q *api.QueryOptions) ([]*api.AllocationListStub, *api.QueryMeta, error)
}
Last updated
Was this helpful?