更多关于kubernetes的深入文章,请看我csdn或者oschina的博客主页。
本文主要描述Clair架构、编译、部署、源码分析等内容。
Clair主要包括以下模块:
-endpoint配置clair部署的主机IP
通过docker-compose部署clair的yaml文件内容如下:
# docker-compose file that runs clair together with its PostgreSQL database.
version: '2'
services:
  postgresql:
    # NOTE(review): the image reference starts with "/" - the registry host
    # appears to be missing; confirm the full image name before deploying.
    image: /libary/postgres:0.1
    restart: always
    ports:
      # Port mappings are quoted so YAML never parses HOST:CONTAINER as a number.
      - "5432:5432"
    volumes:
      # Keep the database files on the host so they survive container restarts.
      - /docker/postgresql/data:/var/lib/postgresql/data
  clair:
    image: libary/clair:0.2
    depends_on:
      - postgresql
    ports:
      - "6060:6060"
      - "6061:6061"
    environment:
      # Points at the postgresql service defined above.
      - POSTGRESQL_HOST=postgresql
Clair内部各个模块之间的关系如下:
以Rest API请求为入口,相关模块的流程大致如下:
下面将具体对入口和Post Layer接口的源码进行分析。
/cmd/clair/main.go
// main is the clair entry point: it loads the configuration from the path in
// flagConfigPath, optionally enables CPU profiling, and then boots clair.
func main() {
...
// Load the configuration file.
config, err := config.Load(*flagConfigPath)
...
// Enable CPU Profiling if specified
if *flagCPUProfilePath != "" {
defer stopCPUProfiling(startCPUProfiling(*flagCPUProfilePath))
}
// Start clair.
clair.Boot(config)
}
/clair.go
// Boot opens the database described by config and launches clair's services
// (notifier, REST API, health API and updater), each in its own goroutine.
// st.Begin() is called before every goroutine is started, and the same st is
// handed to each service.
func Boot(config *config.Config) {
...
// Open the backend database (pgsql in the default configuration).
db, err := database.Open(config.Database)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Start the Notifier service; clair implements a webhook notifier.
st.Begin()
go notifier.Run(config.Notifier, db, st)
// Start the clair REST API service.
st.Begin()
go api.Run(config.API, &context.RouteContext{db, config.API}, st)
// Start the clair health-check endpoint.
st.Begin()
go api.RunHealth(config.API, &context.RouteContext{db, config.API}, st)
// Start the periodic Updater service; clair implements fetcher-based updaters.
st.Begin()
go updater.Run(config.Updater, db, st)
...
}
从上面的Boot方法可见,clair在启动时启动的主体服务有:Notifier服务、Rest API服务、健康检查服务和Updater服务。
细心的你,可能发现,怎么没有启动Worker ?Worker其实只是Post Layer API的后端处理封装而已。下面就以Post Layer API请求为例,走读一下代码。
从上面的main方法分析可知,Boot方法会调用api.Run启动服务:
/api/api.go
// Run builds the API HTTP server and serves it through
// listenAndServeWithStopper.
//
// The server listens on ":" + config.Port, uses tlsConfig (built in the elided
// code above), and wraps the handler returned by newAPIHandler(ctx) in
// http.TimeoutHandler with config.Timeout. config.CertFile and config.KeyFile
// are passed on to listenAndServeWithStopper together with st.
func Run(config *config.APIConfig, ctx *context.RouteContext, st *utils.Stopper) {
...
srv := &graceful.Server{
Timeout: 0, // Already handled by our TimeOut middleware
NoSignalHandling: true, // We want to use our own Stopper
Server: &http.Server{
Addr: ":" + strconv.Itoa(config.Port),
TLSConfig: tlsConfig,
Handler: http.TimeoutHandler(newAPIHandler(ctx), config.Timeout, timeoutResponse),
},
}
listenAndServeWithStopper(srv, st, config.CertFile, config.KeyFile)
...
}
api.Run中调用api.newAPIHandler生成了一个API Handler来处理所有的API请求。 /api/router.go
// newAPIHandler returns the top-level API handler: a router value whose "/v1"
// entry is the version 1 router built from ctx.
func newAPIHandler(ctx *context.RouteContext) http.Handler {
	return router{
		"/v1": v1.NewRouter(ctx),
	}
}
所有的Router对应Handler配置在: /api/v1/router.go
// NewRouter creates an HTTP router for version 1 of the Clair API.
//
// The routes are declared in a table and registered in declaration order;
// every handler is bound to ctx through context.HTTPHandler.
func NewRouter(ctx *context.RouteContext) *httprouter.Router {
	r := httprouter.New()

	// register is the shape shared by the router's per-method helpers.
	type register func(path string, handle httprouter.Handle)

	routes := []struct {
		add    register
		path   string
		handle httprouter.Handle
	}{
		// Layers
		{r.POST, "/layers", context.HTTPHandler(postLayer, ctx)},
		{r.GET, "/layers/:layerName", context.HTTPHandler(getLayer, ctx)},
		{r.DELETE, "/layers/:layerName", context.HTTPHandler(deleteLayer, ctx)},

		// Namespaces
		{r.GET, "/namespaces", context.HTTPHandler(getNamespaces, ctx)},

		// Vulnerabilities
		{r.GET, "/namespaces/:namespaceName/vulnerabilities", context.HTTPHandler(getVulnerabilities, ctx)},
		{r.POST, "/namespaces/:namespaceName/vulnerabilities", context.HTTPHandler(postVulnerability, ctx)},
		{r.GET, "/namespaces/:namespaceName/vulnerabilities/:vulnerabilityName", context.HTTPHandler(getVulnerability, ctx)},
		{r.PUT, "/namespaces/:namespaceName/vulnerabilities/:vulnerabilityName", context.HTTPHandler(putVulnerability, ctx)},
		{r.DELETE, "/namespaces/:namespaceName/vulnerabilities/:vulnerabilityName", context.HTTPHandler(deleteVulnerability, ctx)},

		// Fixes
		{r.GET, "/namespaces/:namespaceName/vulnerabilities/:vulnerabilityName/fixes", context.HTTPHandler(getFixes, ctx)},
		{r.PUT, "/namespaces/:namespaceName/vulnerabilities/:vulnerabilityName/fixes/:fixName", context.HTTPHandler(putFix, ctx)},
		{r.DELETE, "/namespaces/:namespaceName/vulnerabilities/:vulnerabilityName/fixes/:fixName", context.HTTPHandler(deleteFix, ctx)},

		// Notifications
		{r.GET, "/notifications/:notificationName", context.HTTPHandler(getNotification, ctx)},
		{r.DELETE, "/notifications/:notificationName", context.HTTPHandler(deleteNotification, ctx)},

		// Metrics
		{r.GET, "/metrics", context.HTTPHandler(getMetrics, ctx)},
	}

	for _, route := range routes {
		route.add(route.path, route.handle)
	}

	return r
}
可见,Post Layer API的Handler为: /api/v1/routes.go
// postLayer is the handler registered for POST /layers. It hands the layer
// described by request.Layer (format, name, parent name, path and headers) to
// worker.Process together with ctx.Store.
// NOTE(review): request is populated in the elided code - presumably decoded
// from the HTTP request body; confirm against the full source.
func postLayer(w http.ResponseWriter, r *http.Request, p httprouter.Params, ctx *context.RouteContext) (string, int) {
...
err = worker.Process(ctx.Store, request.Layer.Format, request.Layer.Name, request.Layer.ParentName, request.Layer.Path, request.Layer.Headers)
...
}
流程交给了worker.Process进行处理: /worker/worker.go
// Process analyzes the layer called name and stores the result in datastore.
//
// When the layer is not yet in the database a new record is created; if
// parentName is non-empty the parent is looked up as well, and ErrParentUnknown
// is returned when the parent is not found. When the layer already exists and
// its EngineVersion is >= Version, nothing is done and nil is returned.
// In every other case the content at path is analyzed with detectContent and
// the layer is written with datastore.InsertLayer, whose error is returned.
func Process(datastore database.Datastore, imageFormat, name, parentName, path string, headers map[string]string) error {
...
// Check to see if the layer is already in the database.
layer, err := datastore.FindLayer(name, false, false)
// ErrNotFound is not a failure here: it selects the "new layer" branch below.
if err != nil && err != cerrors.ErrNotFound {
return err
}
if err == cerrors.ErrNotFound {
// New layer case.
layer = database.Layer{Name: name, EngineVersion: Version}
// Retrieve the parent if it has one.
// We need to get it with its Features in order to diff them.
if parentName != "" {
// This err is scoped to the if block and shadows the outer err.
parent, err := datastore.FindLayer(parentName, true, false)
if err != nil && err != cerrors.ErrNotFound {
return err
}
if err == cerrors.ErrNotFound {
log.Warningf("layer %s: the parent layer (%s) is unknown. it must be processed first", name,
parentName)
return ErrParentUnknown
}
layer.Parent = &parent
}
} else {
// The layer is already in the database, check if we need to update it.
if layer.EngineVersion >= Version {
log.Debugf(`layer %s: layer content has already been processed in the past with engine %d.
Current engine is %d. skipping analysis`, name, layer.EngineVersion, Version)
return nil
}
log.Debugf(`layer %s: layer content has been analyzed in the past with engine %d. Current
engine is %d. analyzing again`, name, layer.EngineVersion, Version)
}
// Analyze the content.
layer.Namespace, layer.Features, err = detectContent(imageFormat, name, path, headers, layer.Parent)
if err != nil {
return err
}
return datastore.InsertLayer(layer)
}
// detectContent downloads a layer's archive and extracts its Namespace and Features.
//
// It asks detectors.DetectData for the files listed by
// detectors.GetRequiredFilesFeatures and detectors.GetRequiredFilesNamespace
// (limited by maxFileSize), then derives the namespace with detectNamespace and
// the feature versions with detectFeatureVersions, passing parent to both.
func detectContent(imageFormat, name, path string, headers map[string]string, parent *database.Layer) (namespace *database.Namespace, featureVersions []database.FeatureVersion, err error) {
// Detect Data
data, err := detectors.DetectData(imageFormat, path, headers, append(detectors.GetRequiredFilesFeatures(), detectors.GetRequiredFilesNamespace()...), maxFileSize)
...
// Detect namespace.
namespace = detectNamespace(name, data, parent)
// Detect features.
featureVersions, err = detectFeatureVersions(name, data, namespace, parent)
...
}
POST Layer API首先去DB中查询该layer的记录,如果存在并且DB中记录的Engine Version大于等于当前worker的Version(目前最大的worker version为3),则表明已经detect过这个layer,直接结束返回。否则就调用detector将layer的data封装成map数据,然后再根据这个map数据对namespaces和features分别进行扫描检测。
其中,对data的检测是关键的部分: /worker/detectors/data.go
// DetectData obtains a reader for the layer at path and runs the registered
// data detectors on it.
//
// A path starting with "http://" or "https://" is fetched with an HTTP GET
// request and the response body is read; any other path is opened as a local
// file. The reader is closed when the function returns. Each detector in
// dataDetectors that reports Supported(path, format) is asked to Detect the
// files named in toExtract, subject to maxFileSize.
func DetectData(format, path string, headers map[string]string, toExtract []string, maxFileSize int64) (data map[string][]byte, err error) {
...
if strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") {
// Create a new HTTP request object.
request, err := http.NewRequest("GET", path, nil)
...
// NOTE(review): r is presumably the HTTP response obtained in the elided code.
layerReader = r.Body
} else {
layerReader, err = os.Open(path)
...
}
defer layerReader.Close()
for _, detector := range dataDetectors {
if detector.Supported(path, format) {
data, err = detector.Detect(layerReader, toExtract, maxFileSize)
...
}
}
...
}
detector检测data时,如果layer path是https/http开头的,则会调用GET请求将blob下载下来;否则就认为在本地,直接打开path定义的文件读取blob内容。之后,根据该image的格式,调用对应的detector.Detect接口实现,完成对应的工作。目前支持aci和docker两种image格式。
/worker/detectors/data/docker/docker.go
// Detect extracts the files named in toExtract from the Docker layer archive
// read from layerReader by delegating to utils.SelectivelyExtractArchive with
// an empty prefix.
func (detector *DockerDataDetector) Detect(layerReader io.ReadCloser, toExtract []string, maxFileSize int64) (map[string][]byte, error) {
	const archivePrefix = ""

	files, err := utils.SelectivelyExtractArchive(layerReader, archivePrefix, toExtract, maxFileSize)
	return files, err
}
/worker/detectors/data/aci/aci.go
// Detect extracts the files named in toExtract from the ACI archive read from
// layerReader by delegating to utils.SelectivelyExtractArchive with the
// "rootfs/" prefix.
func (detector *ACIDataDetector) Detect(layerReader io.ReadCloser, toExtract []string, maxFileSize int64) (map[string][]byte, error) {
	const archivePrefix = "rootfs/"

	files, err := utils.SelectivelyExtractArchive(layerReader, archivePrefix, toExtract, maxFileSize)
	return files, err
}
无论是docker还是aci格式的image,最终都是交给utils.SelectivelyExtractArchive进行处理:
/utils/tar.go
// SelectivelyExtractArchive extracts the specified files and folders
// from targz data read from the given reader and stores them in a map indexed by file paths.
//
// The extraction logic itself is elided in this excerpt; the visible code
// allocates the result map and returns it with a nil error.
// NOTE(review): prefix and maxFileSize are consumed by the elided code -
// presumably a path prefix applied to archive entries and a per-file size
// limit; confirm against the full source.
func SelectivelyExtractArchive(r io.Reader, prefix string, toExtract []string, maxFileSize int64) (map[string][]byte, error) {
data := make(map[string][]byte)
...
return data, nil
}
worker.detectNamespace和worker.detectFeatureVersions交给读者自行分析。