ingress
主要逻辑
nginx controller 入口函数
// file:k8s.io/ingress-nginx/nginx/main.gofunc main() { // step1: 初始化日志组件 klog.InitFlags(nil) ...... // step2:创建必要的目录 err = file.CreateRequiredDirectories() ...... // step 3 :初始化ApiserverClient kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile) ...... // step4: 检查service配置 if len(conf.DefaultService) > 0 { err := checkService(conf.DefaultService, kubeClient) ...... klog.Infof('Validated %v as the default backend.', conf.DefaultService) } if len(conf.PublishService) > 0 { err := checkService(conf.PublishService, kubeClient) ...... } // step5:获取namespace if conf.Namespace != '' { _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{}) if err != nil { klog.Fatalf('No namespace with name %v found: %v', conf.Namespace, err) } } // step6: 创建默认证书 conf.FakeCertificate = ssl.GetFakeSSLCert() klog.Infof('SSL fake certificate created %v', conf.FakeCertificate.PemFileName) // step7: 检查是否支持v1beta API 、k8s 版本是否高于1.18.0 k8s.IsNetworkingIngressAvailable, k8s.IsIngressV1Ready = k8s.NetworkingIngressAvailable(kubeClient) if !k8s.IsNetworkingIngressAvailable { klog.Warningf('Using deprecated 'k8s.io/api/extensions/v1beta1' package because Kubernetes version is < v1.14.0') } if k8s.IsIngressV1Ready { ...... } conf.Client = kubeClient // step8: 注册prometheus reg := prometheus.NewRegistry() reg.MustRegister(prometheus.NewGoCollector()) reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{ PidFn: func() (int, error) { return os.Getpid(), nil }, ReportErrors: true, })) ...... // step9:启动profile if conf.EnableProfiling { go registerProfiler() } // step10: 实例化nginxcontroller (*) ngx := controller.NewNGINXController(conf, mc) // step11: 启动健康探测和metrics API mux := http.NewServeMux() registerHealthz(nginx.HealthPath, ngx, mux) registerMetrics(reg, mux) go startHTTPServer(conf.ListenPorts.Health, mux) // step12: 启动nginx master进程 go ngx.Start() ......}
nginx controller 初始化
// NewNGINXController creates a new NGINX Ingress controller.func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController { // 初始化 event broadcaster eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ Interface: config.Client.CoreV1().Events(config.Namespace), }) // 获取/etc/resolv.conf 中的nameserver 列表 h, err := dns.GetSystemNameServers() if err != nil { klog.Warningf('Error reading system nameservers: %v', err) } // 实例化NGINXController n := &NGINXController{ isIPV6Enabled: ing_net.IsIPv6Enabled(), resolver: h, cfg: config, syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ Component: 'nginx-ingress-controller', }), stopCh: make(chan struct{}), updateCh: channels.NewRingChannel(1024), ngxErrCh: make(chan error), stopLock: &sync.Mutex{}, runningConfig: new(ingress.Configuration), Proxy: &TCPProxy{}, metricCollector: mc, command: NewNginxCommand(), } // 启动webhook 服务 if n.cfg.ValidationWebhook != '' { n.validationWebhookServer = &http.Server{ Addr: config.ValidationWebhook, Handler: adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}), TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(), } } // 获取pod runtime信息 pod, err := k8s.GetPodDetails(config.Client) if err != nil { klog.Fatalf('unexpected error obtaining pod information: %v', err) } n.podInfo = pod // 实例化store(本地缓存) n.store = store.New( config.Namespace, config.ConfigMapName, config.TCPConfigMapName, config.UDPConfigMapName, config.DefaultSSLCertificate, config.ResyncPeriod, config.Client, n.updateCh, pod, config.DisableCatchAll) // 创建同步队列 n.syncQueue = task.NewTaskQueue(n.syncIngress) ... ... 
// 格式化template配置模板 onTemplateChange := func() { template, err := ngx_template.NewTemplate(nginx.TemplatePath) if err != nil { // this error is different from the rest because it must be clear why nginx is not working klog.Errorf(`-------------------------------------------------------------------------------Error loading new template: %v-------------------------------------------------------------------------------`, err) return } // 若模板格式化正确,则更新到nginxcontroller 对象中,并往同步队列发送一个template-change事件 n.t = template klog.Info('New NGINX configuration template loaded.') n.syncQueue.EnqueueTask(task.GetDummyObject('template-change')) } // 首次启动加载配置模板文件 ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath) ...... n.t = ngxTpl // 监听模板文件变化 // 监听 /etc/nginx/template/nginx.tmpl 模板文件是否有变化,有变化则调用onTemplateChange _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange) ... ... // 监听/etc/nginx/geoip/ 目录下配置文件变化 filesToWatch := []string{} err = filepath.Walk('/etc/nginx/geoip/', func(path string, info os.FileInfo, err error) error { ...... filesToWatch = append(filesToWatch, path) ...... }) ...... for _, f := range filesToWatch { _, err = watch.NewFileWatcher(f, func() { klog.Infof('File %v changed. Reloading NGINX', f) // 配置文件有变化则往同步队列发送一个file-change 事件 n.syncQueue.EnqueueTask(task.GetDummyObject('file-change')) }) ...... } return n}
ingress controller 结构体
type NGINXController struct { // pod runtime 信息 podInfo *k8s.PodInfo // 配置信息 cfg *Configuration // 事件通知器 recorder record.EventRecorder // 同步队列 syncQueue *task.Queue // 同步状态 syncStatus status.Syncer // 同步限流器 syncRateLimiter flowcontrol.RateLimiter stopLock *sync.Mutex stopCh chan struct{} // 更新环状channel updateCh *channels.RingChannel // 接受nginx 错误信息channel ngxErrCh chan error // 当前配置文件 runningConfig *ingress.Configuration // nginx 配置模板文件 t ngx_template.TemplateWriter // nameserver 列表 resolver []net.IP // 是否启用ipv6 isIPV6Enabled bool // 是否关闭 isShuttingDown bool // TCP代理 Proxy *TCPProxy // 本地缓存 store store.Storer // metrics 收集器 metricCollector metric.Collector // webhook validationWebhookServer *http.Server // nginx 二进制命令 command NginxExecTester}
ngx.Start()
ngx.Start() 主要做 3 件事情:(1)启动 store 协程;(2)启动 syncQueue 协程;(3)监听 updateCh。
当从 updateCh 收到变化事件时,向 syncQueue 发送一个 task。
// file:internal/ingress/controller/nginx.go// Start starts a new NGINX master process running in the foreground.func (n *NGINXController) Start() { klog.Info('Starting NGINX Ingress controller') // 初始化同步informers 及secret n.store.Run(n.stopCh) // we need to use the defined ingress class to allow multiple leaders // in order to update information about ingress status // 定义节点选举ID (ingress class 用于区分不同集群) // 使用定义的ingress class 来允许多个leader节点更新ingress状态 electionID := fmt.Sprintf('%v-%v', n.cfg.ElectionID, class.DefaultClass) if class.IngressClass != '' { electionID = fmt.Sprintf('%v-%v', n.cfg.ElectionID, class.IngressClass) } // leader节点选举 setupLeaderElection(&leaderElectionConfig{ ...... }) cmd := n.command.ExecCommand() ...... if n.cfg.EnableSSLPassthrough { n.setupSSLProxy() } // 启动nginx klog.Info('Starting NGINX process') n.start(cmd) // 启动同步队列 go n.syncQueue.Run(time.Second, n.stopCh) // force initial sync // 发送initial-sync 事件 n.syncQueue.EnqueueTask(task.GetDummyObject('initial-sync')) // In case of error the temporal configuration file will // be available up to five minutes after the error // 每隔5分钟删除临时配置文件 go func() { for { time.Sleep(5 * time.Minute) err := cleanTempNginxCfg() ...... } }() ...... for { select { case err := <-n.ngxErrCh: if n.isShuttingDown { return } // if the nginx master process dies, the workers continue to process requests // until the failure of the configured livenessProbe and restart of the pod. // master 进程挂掉时,workerInc进程将继续处理请求,直到配置的liveness探针探测失败 if process.IsRespawnIfRequired(err) { return } // 循环从updateCh里面获取事件 case event := <-n.updateCh.Out(): if n.isShuttingDown { break } if evt, ok := event.(store.Event); ok { klog.V(3).Infof('Event %v received - object %v', evt.Type, evt.Obj) if evt.Type == store.ConfigurationEvent { // TODO: is this necessary? 
Consider removing this special case n.syncQueue.EnqueueTask(task.GetDummyObject('configmap-change')) continue } // 放入可忽略的同步队列 n.syncQueue.EnqueueSkippableTask(evt.Obj) } else { klog.Warningf('Unexpected event type received %T', event) } case <-n.stopCh: return } }}
事件类型
const ( // CreateEvent event associated with new objects in an informer CreateEvent EventType = 'CREATE' // UpdateEvent event associated with an object update in an informer UpdateEvent EventType = 'UPDATE' // DeleteEvent event associated when an object is removed from an informer DeleteEvent EventType = 'DELETE' // ConfigurationEvent event associated when a controller configuration object is created or updated ConfigurationEvent EventType = 'CONFIGURATION')
同步队列结构体
// Queue manages a time work queue through an independent worker that invokes the// given sync function for every work item inserted.// The queue uses an internal timestamp that allows the removal of certain elements// which timestamp is older than the last successful get operation.type Queue struct { // queue is the work queue the worker polls queue workqueue.RateLimitingInterface // sync is called for each item in the queue sync func(interface{}) error // workerDone is closed when the worker exits workerDone chan bool // fn makes a key for an API object fn func(obj interface{}) (interface{}, error) // lastSync is the Unix epoch time of the last execution of 'sync' lastSync int64}
队列类型:(1)可忽略任务,通过 EnqueueSkippableTask 入队;(2)不可忽略任务,通过 EnqueueTask 入队。
// EnqueueTask enqueues ns/name of the given api object in the task queue.func (t *Queue) EnqueueTask(obj interface{}) { t.enqueue(obj, false)}// EnqueueSkippableTask enqueues ns/name of the given api object in// the task queue that can be skippedfunc (t *Queue) EnqueueSkippableTask(obj interface{}) { t.enqueue(obj, true)}// 入队列// enqueue enqueues ns/name of the given api object in the task queue.func (t *Queue) enqueue(obj interface{}, skippable bool) { if t.IsShuttingDown() { klog.Errorf('queue has been shutdown, failed to enqueue: %v', obj) return } ts := time.Now().UnixNano() if !skippable { // make sure the timestamp is bigger than lastSync ts = time.Now().Add(24 * time.Hour).UnixNano() } klog.V(3).Infof('queuing item %v', obj) key, err := t.fn(obj) if err != nil { klog.Errorf('%v', err) return } t.queue.Add(Element{ Key: key, Timestamp: ts, })}
store 协程
// file : k8s.io/ingress-nginx/internal/controller/store/store.go// Run initiates the synchronization of the informers and the initial// synchronization of the secrets.func (s *k8sStore) Run(stopCh chan struct{}) { // start informers s.informers.Run(stopCh)}
调用了informers.Run()方法起多个协程去监听ingress、secret、endpoint、service、configmap、pod 的变化
// Run initiates the synchronization of the informers against the API server.func (i *Informer) Run(stopCh chan struct{}) { // 启动secret、endpoint、service、configmap、pod 的informer go i.Secret.Run(stopCh) go i.Endpoint.Run(stopCh) go i.Service.Run(stopCh) go i.ConfigMap.Run(stopCh) go i.Pod.Run(stopCh) ...... time.Sleep(1 * time.Second) go i.Ingress.Run(stopCh) ......}
这里以监听 ingress 变化为例,接着分析具体实现
// New creates a new object store to be used in the ingress controllerfunc New( namespace, configmap, tcp, udp, defaultSSLCertificate string, resyncPeriod time.Duration, client clientset.Interface, updateCh *channels.RingChannel, pod *k8s.PodInfo, disableCatchAll bool) Storer { store := &k8sStore{ informers: &Informer{}, listers: &Lister{}, sslStore: NewSSLCertTracker(), updateCh: updateCh, backendConfig: ngx_config.NewDefault(), syncSecretMu: &sync.Mutex{}, backendConfigMu: &sync.RWMutex{}, secretIngressMap: NewObjectRefMap(), defaultSSLCertificate: defaultSSLCertificate, pod: pod, } ...... // k8sStore fulfills resolver.Resolver interface // 格式化annotation store.annotations = annotations.NewAnnotationExtractor(store) store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) ...... // create informers factory, enable and assign required informers // informer 工厂函数 infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod, informers.WithNamespace(namespace), informers.WithTweakListOptions(tweakListOptionsFunc)) if k8s.IsNetworkingIngressAvailable { store.informers.Ingress = infFactory.Networking().V1beta1().Ingresses().Informer() } else { store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer() } store.listers.Ingress.Store = store.informers.Ingress.GetStore() store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer() store.listers.Endpoint.Store = store.informers.Endpoint.GetStore() store.informers.Secret = infFactory.Core().V1().Secrets().Informer() store.listers.Secret.Store = store.informers.Secret.GetStore() store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer() store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore() store.informers.Service = infFactory.Core().V1().Services().Informer() store.listers.Service.Store = store.informers.Service.GetStore() labelSelector := labels.SelectorFromSet(store.pod.Labels) // list and watch 
机制 store.informers.Pod = cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) { options.LabelSelector = labelSelector.String() return client.CoreV1().Pods(store.pod.Namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { options.LabelSelector = labelSelector.String() return client.CoreV1().Pods(store.pod.Namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, cache.Indexers{}, ) store.listers.Pod.Store = store.informers.Pod.GetStore() ingDeleteHandler := func(obj interface{}) { ing, ok := toIngress(obj) if !ok { // If we reached here it means the ingress was deleted but its final state is unrecorded. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { klog.Errorf('couldn't get object from tombstone %#v', obj) return } ing, ok = tombstone.Obj.(*networkingv1beta1.Ingress) if !ok { klog.Errorf('Tombstone contained object that is not an Ingress: %#v', obj) return } } if !class.IsValid(ing) { klog.Infof('ignoring delete for ingress %v based on annotation %v', ing.Name, class.IngressKey) return } if isCatchAllIngress(ing.Spec) && disableCatchAll { klog.Infof('ignoring delete for catch-all ingress %v/%v because of --disable-catch-all', ing.Namespace, ing.Name) return } recorder.Eventf(ing, corev1.EventTypeNormal, 'DELETE', fmt.Sprintf('Ingress %s/%s', ing.Namespace, ing.Name)) store.listers.IngressWithAnnotation.Delete(ing) key := k8s.MetaNamespaceKey(ing) store.secretIngressMap.Delete(key) updateCh.In() <- Event{ Type: DeleteEvent, Obj: obj, } } ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ing, _ := toIngress(obj) if !class.IsValid(ing) { a, _ := parser.GetStringAnnotation(class.IngressKey, ing) klog.Infof('ignoring add for ingress %v based on annotation %v with value %v', ing.Name, class.IngressKey, a) return } if isCatchAllIngress(ing.Spec) && disableCatchAll { 
klog.Infof('ignoring add for catch-all ingress %v/%v because of --disable-catch-all', ing.Namespace, ing.Name) return } recorder.Eventf(ing, corev1.EventTypeNormal, 'CREATE', fmt.Sprintf('Ingress %s/%s', ing.Namespace, ing.Name)) store.syncIngress(ing) store.updateSecretIngressMap(ing) store.syncSecrets(ing) updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, } }, DeleteFunc: ingDeleteHandler, UpdateFunc: func(old, cur interface{}) { oldIng, _ := toIngress(old) curIng, _ := toIngress(cur) validOld := class.IsValid(oldIng) validCur := class.IsValid(curIng) if !validOld && validCur { if isCatchAllIngress(curIng.Spec) && disableCatchAll { klog.Infof('ignoring update for catch-all ingress %v/%v because of --disable-catch-all', curIng.Namespace, curIng.Name) return } klog.Infof('creating ingress %v based on annotation %v', curIng.Name, class.IngressKey) recorder.Eventf(curIng, corev1.EventTypeNormal, 'CREATE', fmt.Sprintf('Ingress %s/%s', curIng.Namespace, curIng.Name)) } else if validOld && !validCur { klog.Infof('removing ingress %v based on annotation %v', curIng.Name, class.IngressKey) ingDeleteHandler(old) return } else if validCur && !reflect.DeepEqual(old, cur) { if isCatchAllIngress(curIng.Spec) && disableCatchAll { klog.Infof('ignoring update for catch-all ingress %v/%v and delete old one because of --disable-catch-all', curIng.Namespace, curIng.Name) ingDeleteHandler(old) return } recorder.Eventf(curIng, corev1.EventTypeNormal, 'UPDATE', fmt.Sprintf('Ingress %s/%s', curIng.Namespace, curIng.Name)) } else { klog.V(3).Infof('No changes on ingress %v/%v. 
Skipping update', curIng.Namespace, curIng.Name) return } store.syncIngress(curIng) store.updateSecretIngressMap(curIng) store.syncSecrets(curIng) updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, } }, } secrEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) {...}, UpdateFunc: func(old, cur interface{}) {...}, DeleteFunc: func(obj interface{}) {...}, } epEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) {...}, DeleteFunc: func(obj interface{}) {...}, UpdateFunc: func(old, cur interface{}) {...}, } ...... cmEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) {...}, UpdateFunc: func(old, cur interface{}) {...}, } podEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) {...}, UpdateFunc: func(old, cur interface{}) {...}, DeleteFunc: func(obj interface{}) {...}, } serviceHandler := cache.ResourceEventHandlerFuncs{ UpdateFunc: func(old, cur interface{}) {...}, } // 注册各种类型的eventHandler store.informers.Ingress.AddEventHandler(ingEventHandler) store.informers.Endpoint.AddEventHandler(epEventHandler) store.informers.Secret.AddEventHandler(secrEventHandler) store.informers.ConfigMap.AddEventHandler(cmEventHandler) store.informers.Service.AddEventHandler(serviceHandler) store.informers.Pod.AddEventHandler(podEventHandler) // do not wait for informers to read the configmap configuration ns, name, _ := k8s.ParseNameNS(configmap) cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{}) if err != nil { klog.Warningf('Unexpected error reading configuration configmap: %v', err) } store.setConfig(cm) return store}
可以看到,每种类型的informer 基本都有相关的回调方法,包括:AddFunc: func(obj interface{}) {...},UpdateFunc: func(old, cur interface{}) {...},DeleteFunc: func(obj interface{}) {...},
每个回调方法里面都会往 updateCh 写入对应类型的事件(CreateEvent、DeleteEvent、UpdateEvent)。这一步与 ngx.Start() 中的事件循环协同工作:informer 通过 list & watch 机制监听资源变化,一旦资源有变化则向 updateCh 写入事件;ngx.Start() 的主循环监听 updateCh,一旦收到事件则往 syncQueue 写入一个 task。
队列消费
// file : k8s.io/ingress-controller/internal/ingress/controller/nginx.go// 初始化Queuen.syncQueue = task.NewTaskQueue(n.syncIngress)// NewTaskQueue creates a new task queue with the given sync function.// The sync function is called for every element inserted into the queue.// 对于每个插入进来的项目都会调用sync functionfunc NewTaskQueue(syncFn func(interface{}) error) *Queue { return NewCustomTaskQueue(syncFn, nil)}// NewCustomTaskQueue func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue { // syncFn(也就是syncIngress)被赋值到Queue.sync q := &Queue{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, workerDone: make(chan bool), fn: fn, } if fn == nil { q.fn = q.defaultKeyFunc } return q}
消费Queue队列核心方法:t.queue.Get() -> t.sync()
// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.gofunc (n *NGINXController) Start() { ...... go n.syncQueue.Run(time.Second, n.stopCh) ......}// file: k8s.io/ingress-nginx/internal/task/queue.go// Run starts processing elements in the queuefunc (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) { wait.Until(t.worker, period, stopCh)}// worker processes work in the queue through sync.// 消费Queue队列func (t *Queue) worker() { for { key, quit := t.queue.Get() ...... ts := time.Now().UnixNano() item := key.(Element) // 比对最后一次同步的时间戳与Queue中取出item里面带的时间戳,如果小于最后一次同步时间戳则忽略改变更 if t.lastSync > item.Timestamp { klog.V(3).Infof('skipping %v sync (%v > %v)', item.Key, t.lastSync, item.Timestamp) t.queue.Forget(key) t.queue.Done(key) continue } klog.V(3).Infof('syncing %v', item.Key) // 调用syncIngress if err := t.sync(key); err != nil { klog.Warningf('requeuing %v, err %v', item.Key, err) t.queue.AddRateLimited(Element{ Key: item.Key, Timestamp: time.Now().UnixNano(), }) } else { t.queue.Forget(key) t.lastSync = ts } t.queue.Done(key) }}
syncIngress 工作原理
比对线上正在运行的配置与新生成的配置是否相同,并判断是否能够动态更新配置(仅更新 endpoint),以减少 nginx 频繁 reload 带来的性能损耗。
- pcfg:本次新生成的配置
- n.runningConfig:当前线上环境运行的配置
比对 pcfg 和 n.runningConfig 配置,判断是否可以动态更新配置(仅 endpoint 列表变化):
(1)支持动态更新配置:调用 n.configureDynamically(pcfg),将 backend 列表以 JSON 格式 POST 到 /configuration/backends 这个 Lua handler,动态更新 endpoint 列表。
(2)不支持动态更新配置:调用 n.OnUpdate(*pcfg),依次执行以下步骤:生成临时配置文件;检测临时配置文件语法;diff 临时配置文件与当前线上配置文件;删除临时配置文件;将新生成的配置写入线上配置文件;执行 nginx -s reload 重载配置。
// file: k8s.io/ingress-nginx/internal/ingress/controller/controller.go// syncIngress collects all the pieces required to assemble the NGINX// configuration file and passes the resulting data structures to the backend// (OnUpdate) when a reload is deemed necessary.// 组装nginx 配置文件// 需要reload 时,调用OnUpdatefunc (n *NGINXController) syncIngress(interface{}) error { ...... ings := n.store.ListIngresses(nil) // 格式化新配置 hosts, servers, pcfg := n.getConfiguration(ings) ...... // 判断配置是否有变化 if n.runningConfig.Equal(pcfg) { klog.V(3).Infof('No configuration change detected, skipping backend reload.') return nil } ...... // 配置有变化,则判断是否需要reload nginx if !n.IsDynamicConfigurationEnough(pcfg) { klog.Infof('Configuration changes detected, backend reload required.') // 生成checksum hash值 hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{ TagName: 'json', }) pcfg.ConfigurationChecksum = fmt.Sprintf('%v', hash) //调用onUpdate 方法 err := n.OnUpdate(*pcfg) ...... klog.Infof('Backend successfully reloaded.') ...... } // 是否首次同步(ingress.Configuration 结构体是否为空) isFirstSync := n.runningConfig.Equal(&ingress.Configuration{}) if isFirstSync { // For the initial sync it always takes some time for NGINX to start listening // For large configurations it might take a while so we loop and back off // 首次初始化需要耗费一定的时间,睡眠1秒 klog.Info('Initial sync, sleeping for 1 second.') time.Sleep(1 * time.Second) } // 重试机制 retry := wait.Backoff{ Steps: 15, Duration: 1 * time.Second, Factor: 0.8, Jitter: 0.1, } err := wait.ExponentialBackoff(retry, func() (bool, error) { // 动态更新nginx 配置 err := n.configureDynamically(pcfg) if err == nil { klog.V(2).Infof('Dynamic reconfiguration succeeded.') return true, nil } klog.Warningf('Dynamic reconfiguration failed: %v', err) return false, err }) ...... n.runningConfig = pcfg return nil}
判断是否可以动态更新配置
不需要 reload 的场景
- endpoint 变化
需要reload的场景
- 新增ingress
- 新增证书配置
- ingress 增加/删除 PATH
- 删除ingress、service、secret
- Secret 更新
- 部分annotation变更,造成上述状态更新
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go// IsDynamicConfigurationEnough returns whether a Configuration can be// dynamically applied, without reloading the backend.// 判断是否nginx 可以动态重载,不需要执行reloadfunc (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool { copyOfRunningConfig := *n.runningConfig copyOfPcfg := *pcfg copyOfRunningConfig.Backends = []*ingress.Backend{} copyOfPcfg.Backends = []*ingress.Backend{} clearL4serviceEndpoints(©OfRunningConfig) clearL4serviceEndpoints(©OfPcfg) copyOfRunningConfig.ControllerPodsCount = 0 copyOfPcfg.ControllerPodsCount = 0 clearCertificates(©OfRunningConfig) clearCertificates(©OfPcfg) return copyOfRunningConfig.Equal(©OfPcfg)}
不能动态更新,调用nginx reload 重载配置
// OnUpdate is called by the synchronization loop whenever configuration// changes were detected. The received backend Configuration is merged with the// configuration ConfigMap before generating the final configuration file.// Returns nil in case the backend was successfully reloaded.// 当监听到配置发生变化,同步循环将调用OnUdate// 接收到的backend 配置会跟当前配置的configmap 进行合并func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { cfg := n.store.GetBackendConfiguration() cfg.Resolver = n.resolver // 生成临时配置 content, err := n.generateTemplate(cfg, ingressCfg) ...... // 检查配置是否正确 err = n.testTemplate(content) ...... if klog.V(2) { src, _ := ioutil.ReadFile(cfgPath) if !bytes.Equal(src, content) { tmpfile, err := ioutil.TempFile('', 'new-nginx-cfg') if err != nil { return err } defer tmpfile.Close() // 创建临时配置文件 err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser) ...... // diff 比对生成的临时配置跟当前生效配置 diffOutput, err := exec.Command('diff', '-I', ''# Configuration.*'', '-u', cfgPath, tmpfile.Name()).CombinedOutput() ...... klog.Infof('NGINX configuration diff:n%v', string(diffOutput)) // 删除临时配置文件 os.Remove(tmpfile.Name()) } } // 将新配置写入cfgPath err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser) ...... // reload nginx o, err := n.command.ExecCommand('-s', 'reload').CombinedOutput() ...... return nil}
动态更新
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go// configureDynamically encodes new Backends in JSON format and POSTs the// payload to an internal HTTP endpoint handled by Lua.// 以json 的格式封装backend 列表并post 到lua APIfunc (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error { backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends) if backendsChanged { // 更新endpoint 列表 err := configureBackends(pcfg.Backends) ...... } // 比对TCP/UDP endpoint 列表 streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints) if streamConfigurationChanged { err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints) ...... } if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount { // post pod 数目 statusCode, _, err := nginx.NewPostStatusRequest('/configuration/general', 'application/json', ingress.GeneralConfig{ ControllerPodsCount: pcfg.ControllerPodsCount, }) ...... } // 比对servers 变化 serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers) if serversChanged { err := configureCertificates(pcfg.Servers) ...... } return nil}
以JSON 格式 POST 调用LUA Handler /configuration/backends
// file: k8s.io/ingress-nginx/internal/controller/nginx.gofunc configureBackends(rawBackends []*ingress.Backend) error { backends := make([]*ingress.Backend, len(rawBackends)) for i, backend := range rawBackends { var service *apiv1.Service if backend.Service != nil { service = &apiv1.Service{Spec: backend.Service.Spec} } luaBackend := &ingress.Backend{ Name: backend.Name, Port: backend.Port, SSLPassthrough: backend.SSLPassthrough, SessionAffinity: backend.SessionAffinity, UpstreamHashBy: backend.UpstreamHashBy, LoadBalancing: backend.LoadBalancing, Service: service, NoServer: backend.NoServer, TrafficShapingPolicy: backend.TrafficShapingPolicy, AlternativeBackends: backend.AlternativeBackends, } var endpoints []ingress.Endpoint for _, endpoint := range backend.Endpoints { endpoints = append(endpoints, ingress.Endpoint{ Address: endpoint.Address, Port: endpoint.Port, }) } luaBackend.Endpoints = endpoints backends[i] = luaBackend } // 更新endpoint 列表 statusCode, _, err := nginx.NewPostStatusRequest('/configuration/backends', 'application/json', backends) if err != nil { return err } if statusCode != http.StatusCreated { return fmt.Errorf('unexpected error code: %d', statusCode) } return nil}