admin管理员组

文章数量:1037775

k8s client

本文阅读代码链接:.30

下面代码全部在 util/workqueue 文件中:

一. workqueue基本概念

在 client-go 的 util/workqueue 包中,主要有三个队列:普通队列、延时队列和限速队列。每个队列都是在前一个队列的基础上实现的,逐层添加新功能。按照 Queue -> DelayingQueue -> RateLimitingQueue 的顺序来逐层分析限速队列的实现。

普通队列 ( Queue ) :

  • 普通队列的实现主要是通过一个简单的队列结构来管理待处理的任务。
  • 它提供了基本的队列操作,如添加任务 ( Add )、获取任务 ( Get )、完成任务 ( Done ) 等。
  • 通过 sync.Cond 来实现队列的同步操作,确保线程安全。

延时队列 ( DelayingQueue ) :

  • 延时队列在普通队列的基础上增加了延时功能。
  • 它允许任务在指定的时间后再加入队列进行处理。
  • 使用 heap 实现优先级队列,确保任务按照时间顺序进行处理。
  • 通过 clock 来管理时间,支持真实时间和模拟时间。

限速队列 ( RateLimitingQueue ) :

  • 限速队列在延时队列的基础上增加了限速功能。
  • 使用 RateLimiter 来控制任务的处理速度,避免任务过快地重新加入队列。
  • 提供了 AddRateLimited 方法,根据 RateLimiter 的策略来决定任务何时可以重新加入队列。
  • 通过 Forget 方法来清除任务的重试记录。
  • 通过这种层层递进的设计, RateLimitingQueue 能够在普通队列和延时队列的基础上,提供更复杂的任务处理能力,适用于需要限速的场景。

二. workqueue主要作用

workqueue 主要用于 Kubernetes 控制器中处理事件和任务的队列系统,它具有以下主要作用:

  1. 任务管理 :提供了一种机制来管理需要处理的任务,确保任务按照一定的顺序被处理。
  2. 并发控制 :通过队列机制控制并发处理的任务数量,避免系统过载。
  3. 延迟处理 :支持将任务延迟一段时间后再处理,适用于需要重试的场景。
  4. 限速处理 :控制任务重新入队的频率,避免因频繁重试导致的系统压力。
  5. 错误处理 :提供了一种机制来处理任务执行失败的情况,支持重试策略。

三. 普通队列 - Queue

3.1 接口和结构体

普通队列定义了基本的队列接口和实现:

代码语言:go复制
// Interface 定义了队列的基本操作
type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}

// Type 是工作队列的具体实现
type Type struct {
    // queue 定义了我们处理项目的顺序
    queue []t
    
    // dirty 定义了所有需要处理的项目
    dirty set
    
    // processing 包含当前正在处理的项目
    // 这些项目可能同时在 dirty 集合中
	processing set

	cond *sync.Cond

	shuttingDown bool
	drain        bool

	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.WithTicker
}

3.2 核心方法:

Add 方法

Add 方法用于将项目标记为需要处理

工作流程:

  1. 首先检查队列是否正在关闭,如果是则直接返回。
  2. 检查项目是否已经在 dirty 集合中,如果是则直接返回。
  3. 将项目添加到 dirty 集合中。
  4. 如果项目不在 processing 集合中,则将其添加到队列中。
  5. 通过 cond.Signal() 唤醒一个等待的 goroutine 来处理新添加的项目。
代码语言:go复制
func (q *Type) Add(item interface{}) {
    // 获取锁,确保线程安全
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    
    // 检查队列是否正在关闭,如果是则直接返回
    if q.shuttingDown {
        return
    }
    
    // 检查项目是否已经在 dirty 集合中,如果是则直接返回
    // 这避免了重复添加相同的项目
    if q.dirty.has(item) {
        return
    }

    // 更新指标,记录添加操作
    q.metrics.add(item)

    // 将项目添加到 dirty 集合中,标记为需要处理
    q.dirty.insert(item)
    
    // 如果项目已经在 processing 集合中(正在被处理),则不添加到队列
    // 当处理完成时,Done 方法会检查 dirty 集合并重新添加到队列
    if q.processing.has(item) {
        return
    }

    // 将项目添加到队列末尾
    q.queue = append(q.queue, item)
    
    // 通知一个等待的 goroutine 有新项目可处理
    q.cond.Signal()
}
Get方法

Get 方法用于从队列中获取待处理的项目

工作流程:

  1. 如果队列为空且未关闭,则阻塞等待。
  2. 如果队列为空且正在关闭,则返回 shutdown = true。
  3. 从队列头部取出一个项目。
  4. 将项目从 dirty 集合中删除,并添加到 processing 集合中。
  5. 返回项目和 shutdown = false。
代码语言:go复制
// Get 阻塞直到可以返回一个要处理的项目
// 如果 shutdown = true,调用者应该结束其 goroutine
// 处理完项目后必须调用 Done
func (q *Type) Get() (item interface{}, shutdown bool) {
    // 获取锁,确保线程安全
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    
    // 如果队列为空且未关闭,则阻塞等待
    // 这里使用 for 循环而不是 if 语句,是为了防止虚假唤醒
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    
    // 如果队列为空,说明队列正在关闭
    // 返回 nil 和 true,通知调用者结束其 goroutine
    if len(q.queue) == 0 {
        return nil, true
    }

    // 从队列头部取出一个项目
    item = q.queue[0]
    
    // 将队列头部元素设为 nil,避免内存泄漏
    // 注释解释了底层数组仍然存在并引用此对象,所以需要显式设为 nil
    q.queue[0] = nil
    
    // 更新队列,移除头部元素
    q.queue = q.queue[1:]

    // 更新指标,记录获取操作
    q.metrics.get(item)

    // 将项目添加到 processing 集合中,标记为正在处理
    q.processing.insert(item)
    
    // 将项目从 dirty 集合中删除,因为它已经被取出处理
    q.dirty.delete(item)

    // 返回项目和 false,表示队列未关闭
    return item, false
}
Done方法

Done 方法用于标记项目处理完成

工作流程:

  1. 将项目从 processing 集合中删除
  2. 如果项目在 dirty 集合中,则将其重新添加到队列中
  3. 如果 processing 集合为空,则通知等待的 goroutine
代码语言:go复制
// Done 标记项目处理完成
// 如果在处理过程中项目被再次标记为 dirty,则会重新添加到队列中进行重新处理
func (q *Type) Done(item interface{}) {
    // 获取锁,确保线程安全
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    // 更新指标,记录完成操作
    q.metrics.done(item)

    // 将项目从 processing 集合中删除,表示处理完成
    q.processing.delete(item)
    
    // 如果项目在 dirty 集合中(说明在处理过程中被再次标记为需要处理)
    // 则将其重新添加到队列中,以便再次处理
    if q.dirty.has(item) {
        q.queue = append(q.queue, item)
        q.cond.Signal()  // 通知一个等待的 goroutine 有新项目可处理
    } else if q.processing.len() == 0 {
        // 如果 processing 集合为空(没有正在处理的项目)
        // 也发送信号,用于唤醒等待队列排空的 goroutine
        q.cond.Signal()
    }
}

四. 延时队列 - DelayingQueue

4.1 接口和结构体

延时队列在普通队列的基础上增加了延时功能:

代码语言:go复制
// DelayingInterface 是一个可以在稍后时间添加项目的接口
// 这使得在失败后重新入队项目更容易,而不会陷入热循环
type DelayingInterface interface {
    Interface
    // AddAfter 在指定的持续时间过后将项目添加到工作队列
    AddAfter(item interface{}, duration time.Duration)
}

// delayingType 包装了一个 Interface 并提供延迟重新入队功能
type delayingType struct {
    Interface

    // clock 跟踪延迟触发的时间
    clock clock.Clock

    // stopCh 让我们向等待循环发送关闭信号
    stopCh chan struct{}
    // stopOnce 保证我们只发送一次关闭信号
    stopOnce sync.Once

    // heartbeat 确保我们等待不超过 maxWait 就触发
    heartbeat clock.Ticker

    // waitingForAddCh 是一个缓冲通道,用于提供 waitingForAdd
    waitingForAddCh chan *waitFor

    // metrics 计算重试次数
    metrics retryMetrics
}

// waitFor 保存要添加的数据和应该添加的时间
type waitFor struct {
    data    t
    readyAt time.Time
    // 在优先级队列中的索引
    index int
}

延时队列使用优先级队列来管理延时任务:

代码语言:go复制
// waitForPriorityQueue 实现了 waitFor 项目的优先级队列
//
// waitForPriorityQueue 实现了 heap.Interface。时间上最早发生的项目
//(即具有最小 readyAt 的项目)位于根部(索引 0)。
// Peek 返回索引 0 处的最小项目。Pop 返回最小项目,
// 该项目已从队列中移除并由 container/heap 放置在索引 Len()-1 处。
// Push 在索引 Len() 处添加一个项目,container/heap 将其渗透到正确的位置。
type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
    return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {
    return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
    pq[i].index = i
    pq[j].index = j
}

// Push 向队列添加一个项目。不应直接调用 Push;
// 而是使用 `heap.Push`。
func (pq *waitForPriorityQueue) Push(x interface{}) {
    n := len(*pq)
    item := x.(*waitFor)
    item.index = n
    *pq = append(*pq, item)
}

// Pop 从队列中移除一个项目。不应直接调用 Pop;
// 而是使用 `heap.Pop`。
func (pq *waitForPriorityQueue) Pop() interface{} {
    n := len(*pq)
    item := (*pq)[n-1]
    item.index = -1
    *pq = (*pq)[0:(n - 1)]
    return item
}

// Peek 返回队列开头的项目,而不移除项目或以其他方式改变队列。
// 可以安全地直接调用。
func (pq waitForPriorityQueue) Peek() interface{} {
    return pq[0]
}

4.2 核心方法:

AddAfter 方法

AddAfter 方法它允许在指定的延迟后将项目添加到队列中

工作流程:

  1. 首先检查队列是否正在关闭,如果是则直接返回
  2. 更新重试指标
  3. 如果延迟时间小于等于 0,则立即将项目添加到队列中
  4. 否则,创建一个 waitFor 结构体,包含项目数据和准备时间,并将其发送到 waitingForAddCh 通道
代码语言:go复制
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    // 首先检查队列是否正在关闭,如果是则直接返回
    if q.ShuttingDown() {
        return
    }

    // 更新重试指标,记录重试操作
    q.metrics.retry()

    // 如果延迟时间小于等于 0,则立即将项目添加到队列中
    if duration <= 0 {
        q.Add(item)
        return
    }

    // 使用 select 语句处理通道操作
    select {
    case <-q.stopCh:
        // 如果队列已关闭(stopCh 被关闭),则解除阻塞并返回
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
        // 创建一个 waitFor 结构体,包含项目数据和准备时间
        // 将其发送到 waitingForAddCh 通道,由 waitingLoop 协程处理
    }
}
waitingLoop 方法

waitingLoop 方法是延时队列的核心循环,它负责管理延时任务

工作流程:

初始化一个优先级队列 waitingForQueue 和一个映射 waitingEntryByData

无限循环中:

  1. 检查队列是否正在关闭,如果是则返回
  2. 获取当前时间
  3. 检查优先级队列中是否有准备好的项目,如果有则将其添加到队列中
  4. 为优先级队列中的第一个项目设置定时器
  5. 使用 select 语句等待以下事件之一:
    • 队列关闭信号
    • 心跳信号(确保不会等待太长时间)
    • 下一个项目准备好的信号
    • 新的延时任务到达
  6. 处理新的延时任务:如果已经准备好则立即添加到队列中,否则插入到优先级队列中
  7. 尝试排空通道,处理所有待处理的延时任务
代码语言:go复制
// waitingLoop 运行直到工作队列关闭,并保持检查要添加的项目列表
func (q *delayingType) waitingLoop() {
    // 注册 panic 处理函数,确保即使发生 panic 也能恢复
    defer utilruntime.HandleCrash()

    // 创建一个永不触发的通道,用于 select 语句中的占位符
    never := make(<-chan time.Time)

    // 声明一个定时器变量,用于等待下一个项目准备好的时间
    var nextReadyAtTimer clock.Timer

    // 初始化一个优先级队列,用于存储延时项目
    waitingForQueue := &waitForPriorityQueue{}
    // 使用 container/heap 包实现的优先级队列,确保最早准备好的项目在队列头部。
    heap.Init(waitingForQueue)

    // 创建一个映射,用于快速查找已存在的项目
    waitingEntryByData := map[t]*waitFor{}

    // 无限循环,直到队列关闭
    for {
        // 检查队列是否正在关闭,如果是则返回
        if q.Interface.ShuttingDown() {
            return
        }

        // 获取当前时间
        now := q.clock.Now()

        // 处理所有已准备好的项目
        for waitingForQueue.Len() > 0 {
            // 查看队列头部的项目
            entry := waitingForQueue.Peek().(*waitFor)
            // 如果项目还没准备好,则跳出循环
            if entry.readyAt.After(now) {
                break
            }

            // 从优先级队列中弹出项目
            entry = heap.Pop(waitingForQueue).(*waitFor)
            // 将项目添加到工作队列中
            q.Add(entry.data)
            // 从映射中删除项目
            delete(waitingEntryByData, entry.data)
        }

        // 设置下一个项目的定时器
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            // 如果已有定时器,先停止它
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            // 获取队列头部的项目
            entry := waitingForQueue.Peek().(*waitFor)
            // 创建一个新的定时器,等待到项目的准备时间
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            // 获取定时器的通道
            nextReadyAt = nextReadyAtTimer.C()
        }

        // 使用 select 语句等待多个事件
        select {
        case <-q.stopCh:
            // 如果收到停止信号,则返回
            return

        case <-q.heartbeat.C():
            // 如果收到心跳信号,继续循环
            // 心跳确保我们不会等待太长时间

        case <-nextReadyAt:
            // 如果下一个项目准备好了,继续循环

        case waitEntry := <-q.waitingForAddCh:
            // 如果收到新的延时项目
            if waitEntry.readyAt.After(q.clock.Now()) {
                // 如果项目还没准备好,插入到优先级队列中
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                // 如果项目已经准备好,直接添加到工作队列中
                q.Add(waitEntry.data)
            }

            // 尝试排空通道,处理所有待处理的延时任务
            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    // 处理更多的延时项目
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    // 如果通道为空,则标记为已排空
                    drained = true
                }
            }
        }
    }
}
insert 方法

insert 函数是延时队列中的辅助函数,用于将延时项目添加到优先级队列中,或者更新已存在项目的准备时间

工作流程:

  1. 检查项目是否已经存在于映射中
  2. 如果存在,则仅在新的准备时间早于现有准备时间的情况下更新时间
  3. 如果不存在,则将项目添加到优先级队列和映射中
代码语言:go复制
// insert 将条目添加到优先级队列,或者如果它已经存在于队列中,则更新 readyAt
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
    // 检查项目是否已经存在于映射中
    existing, exists := knownEntries[entry.data]
    
    // 如果项目已存在
    if exists {
        // 仅在新的准备时间早于现有准备时间的情况下更新时间
        // 这确保了项目总是以最早的准备时间被处理
        if existing.readyAt.After(entry.readyAt) {
            // 更新准备时间
            existing.readyAt = entry.readyAt
            // 修复堆,确保优先级队列的顺序正确
            heap.Fix(q, existing.index)
        }
        
        // 如果新的准备时间不早于现有准备时间,则不做任何更改
        return
    }
    
    // 如果项目不存在,则将其添加到优先级队列中
    heap.Push(q, entry)
    // 同时将项目添加到映射中,以便快速查找
    knownEntries[entry.data] = entry
}

五. 限速队列 - RateLimitingQueue

5.1 接口和结构体

限速队列在延时队列的基础上增加了限速功能,它使用 RateLimiter 来控制任务的处理速度,避免任务过快地重新加入队列。

代码语言:go复制
// RateLimitingInterface 是一个限制项目添加到队列速率的接口
type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited 在速率限制器允许的情况下将项目添加到工作队列
    AddRateLimited(item interface{})

    // Forget 表示项目已完成重试
    // 无论是永久失败还是成功,我们都将停止速率限制器跟踪它
    // 这只清除 `rateLimiter`,你仍然需要在队列上调用 `Done`
    Forget(item interface{})

    // NumRequeues 返回项目被重新入队的次数
    NumRequeues(item interface{}) int
}

// rateLimitingType 包装了一个 RateLimiter Interface 并提供限速重新入队功能
type rateLimitingType struct {
    DelayingInterface

    rateLimiter RateLimiter
}

RateLimiter 接口定义了限速器的基本操作:

代码语言:go复制
// RateLimiter 控制项目重新入队的速率
type RateLimiter interface {
    // When 获取一个项目并返回它应该在多长时间后重新入队
    When(item interface{}) time.Duration
    // NumRequeues 返回项目被重新入队的次数
    NumRequeues(item interface{}) int
    // Forget 表示项目已完成重试,不再需要跟踪
    Forget(item interface{})
}

5.2 核心方法:

AddRateLimited 方法

AddRateLimited 方法是限速队列的核心方法,它根据限速器的策略来决定任务何时可以重新加入队列

工作流程:

  1. 调用 q.rateLimiter.When(item) 获取项目应该在多长时间后重新入队
  2. 调用 q.DelayingInterface.AddAfter 方法,将项目延迟指定时间后添加到队列中
代码语言:go复制
// AddRateLimited 根据速率限制器允许的时间将项目添加到队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
Forget 方法

Forget 方法用于清除任务的重试记录

工作流程:

  1. 调用 rateLimiter.Forget(item) 清除项目的重试记录
代码语言:go复制
// Forget 表示项目已完成重试
func (q *rateLimitingType) Forget(item interface{}) {
    q.rateLimiter.Forget(item)
}
NumRequeues 方法

NumRequeues 方法用于获取任务被重新入队的次数

工作流程:

  1. 调用 rateLimiter.NumRequeues(item) 获取项目被重新入队的次数
代码语言:go复制
// NumRequeues 返回项目被重新入队的次数
func (q *rateLimitingType) NumRequeues(item interface{}) int {
    return q.rateLimiter.NumRequeues(item)
}

5.3 限速器实现

Kubernetes 提供了多种限速器实现,以适应不同的场景

限速器在util/workqueue/default_rate_limiters.go

ItemExponentialFailureRateLimiter

指数退避限速器,随着重试次数的增加,延迟时间呈指数增长

  • failuresLock :用于保护 failures 映射的互斥锁,确保线程安全。
  • failures :记录每个项目的重试次数。
  • baseDelay :基础延迟时间。
  • maxDelay :最大延迟时间,防止延迟时间过长。
代码语言:go复制
// ItemExponentialFailureRateLimiter 对每个项目进行指数退避
type ItemExponentialFailureRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    baseDelaySec int
    maxDelaySec  int
}

// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    // 获取项目的重试次数
    exp := r.failures[item]
    // 增加项目的重试次数
    r.failures[item] = r.failures[item] + 1

    // 计算指数退避延迟时间
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    // 防止延迟时间溢出
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    // 将计算的延迟时间转换为 Duration 类型
    calculated := time.Duration(backoff)
    // 如果计算的延迟时间超过最大延迟时间,则返回最大延迟时间
    if calculated > r.maxDelay {
        return r.maxDelay
    }

    // 返回计算的延迟时间
    return calculated
}
ItemFastSlowRateLimiter

快慢限速器,前几次重试使用较短的延迟,之后使用较长的延迟

  • failuresLock :用于保护 failures 映射的互斥锁,确保线程安全。
  • failures :记录每个项目的重试次数。
  • maxFastAttempts :最大快速重试次数。
  • fastDelay :快速重试的延迟时间。
  • slowDelay :慢速重试的延迟时间。
代码语言:go复制
// ItemFastSlowRateLimiter 对前 N 次失败使用快速重试,之后使用慢速重试
type ItemFastSlowRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    fastDelay time.Duration
    slowDelay time.Duration
    threshold int
}

// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    // 增加项目的重试次数
    r.failures[item] = r.failures[item] + 1

    // 如果重试次数小于或等于最大快速重试次数,返回快速重试延迟
    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }

    // 否则,返回慢速重试延迟
    return r.slowDelay
}
MaxOfRateLimiter

组合多个限速器,使用其中延迟最长的一个

  • limiters :存储多个 RateLimiter 实例的切片。
代码语言:go复制
// MaxOfRateLimiter 使用多个限速器中延迟最长的一个
type MaxOfRateLimiter struct {
    limiters []RateLimiter
}

// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    for _, limiter := range r.limiters {
        // 调用每个限速器的 When 方法
        curr := limiter.When(item)
        // 记录最长的延迟时间
        if curr > ret {
            ret = curr
        }
    }

    // 返回最长的延迟时间
    return ret
}

六. 使用示例

6.1 Queue

代码语言:go复制
// 创建一个普通队列
queue := workqueue.New()

// 添加一个项目
queue.Add("task1")

// 获取一个项目
item, shutdown := queue.Get()
if !shutdown {
    // 处理项目
    fmt.Println("处理项目:", item)
    
    // 标记项目处理完成
    queue.Done(item)
}

// 关闭队列
queue.ShutDown()

6.2 DelayingQueue

代码语言:go复制
// 创建一个延时队列
delayingQueue := workqueue.NewDelayingQueue()

// 添加一个延时项目,5秒后处理
delayingQueue.AddAfter("task2", 5*time.Second)

// 获取一个项目
item, shutdown := delayingQueue.Get()
if !shutdown {
    // 处理项目
    fmt.Println("处理项目:", item)
    
    // 标记项目处理完成
    delayingQueue.Done(item)
}

// 关闭队列
delayingQueue.ShutDown()

6.3 RateLimitingQueue

代码语言:go复制
// 创建一个指数退避限速器
rateLimiter := workqueue.DefaultControllerRateLimiter()

// 创建一个限速队列
rateLimitingQueue := workqueue.NewRateLimitingQueue(rateLimiter)

// 添加一个限速项目
rateLimitingQueue.AddRateLimited("task3")

// 获取一个项目
item, shutdown := rateLimitingQueue.Get()
if !shutdown {
    // 处理项目
    fmt.Println("处理项目:", item)
    
    // 如果处理成功,忘记项目的重试历史
    rateLimitingQueue.Forget(item)
    
    // 标记项目处理完成
    rateLimitingQueue.Done(item)
}

// 如果处理失败,重新添加到队列
if processFailed {
    rateLimitingQueue.AddRateLimited("task3")
}

// 关闭队列
rateLimitingQueue.ShutDown()

6.4 实际场景

在 Kubernetes 控制器中,通常会使用限速队列来处理资源的变更事件:

代码语言:go复制
// 创建一个限速队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// 添加事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(obj)
        if err == nil {
            queue.Add(key)
        }
    },
    UpdateFunc: func(old, new interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(new)
        if err == nil {
            queue.Add(key)
        }
    },
    DeleteFunc: func(obj interface{}) {
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err == nil {
            queue.Add(key)
        }
    },
})

// 处理队列中的项目
for {
    // 获取一个项目
    key, quit := queue.Get()
    if quit {
        return
    }
    
    // 处理完成后调用 Done
    defer queue.Done(key)
    
    // 处理项目
    err := processItem(key.(string))
    if err == nil {
        // 处理成功,忘记项目的重试历史
        queue.Forget(key)
    } else if queue.NumRequeues(key) < maxRetries {
        // 处理失败,重新添加到队列
        queue.AddRateLimited(key)
    } else {
        // 超过最大重试次数,放弃处理
        queue.Forget(key)
        utilruntime.HandleError(err)
    }
}

通过这种方式,Kubernetes 控制器可以有效地处理资源变更事件,并在处理失败时进行有限次数的重试,避免因频繁重试导致的系统压力。

k8s client

本文阅读代码链接:.30

下面代码全部在 util/workqueue 文件中:

一. workqueue基本概念

在 client-go 的 util/workqueue 包中,主要有三个队列:普通队列、延时队列和限速队列。每个队列都是在前一个队列的基础上实现的,逐层添加新功能。按照 Queue -> DelayingQueue -> RateLimitingQueue 的顺序来逐层分析限速队列的实现。

普通队列 ( Queue ) :

  • 普通队列的实现主要是通过一个简单的队列结构来管理待处理的任务。
  • 它提供了基本的队列操作,如添加任务 ( Add )、获取任务 ( Get )、完成任务 ( Done ) 等。
  • 通过 sync.Cond 来实现队列的同步操作,确保线程安全。

延时队列 ( DelayingQueue ) :

  • 延时队列在普通队列的基础上增加了延时功能。
  • 它允许任务在指定的时间后再加入队列进行处理。
  • 使用 heap 实现优先级队列,确保任务按照时间顺序进行处理。
  • 通过 clock 来管理时间,支持真实时间和模拟时间。

限速队列 ( RateLimitingQueue ) :

  • 限速队列在延时队列的基础上增加了限速功能。
  • 使用 RateLimiter 来控制任务的处理速度,避免任务过快地重新加入队列。
  • 提供了 AddRateLimited 方法,根据 RateLimiter 的策略来决定任务何时可以重新加入队列。
  • 通过 Forget 方法来清除任务的重试记录。
  • 通过这种层层递进的设计, RateLimitingQueue 能够在普通队列和延时队列的基础上,提供更复杂的任务处理能力,适用于需要限速的场景。

二. workqueue主要作用

workqueue 主要用于 Kubernetes 控制器中处理事件和任务的队列系统,它具有以下主要作用:

  1. 任务管理 :提供了一种机制来管理需要处理的任务,确保任务按照一定的顺序被处理。
  2. 并发控制 :通过队列机制控制并发处理的任务数量,避免系统过载。
  3. 延迟处理 :支持将任务延迟一段时间后再处理,适用于需要重试的场景。
  4. 限速处理 :控制任务重新入队的频率,避免因频繁重试导致的系统压力。
  5. 错误处理 :提供了一种机制来处理任务执行失败的情况,支持重试策略。

三. 普通队列 - Queue

3.1 接口和结构体

普通队列定义了基本的队列接口和实现:

代码语言:go复制
// Interface 定义了队列的基本操作
type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}

// Type 是工作队列的具体实现
type Type struct {
    // queue 定义了我们处理项目的顺序
    queue []t
    
    // dirty 定义了所有需要处理的项目
    dirty set
    
    // processing 包含当前正在处理的项目
    // 这些项目可能同时在 dirty 集合中
	processing set

	cond *sync.Cond

	shuttingDown bool
	drain        bool

	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.WithTicker
}

3.2 核心方法:

Add 方法

Add 方法用于将项目标记为需要处理

工作流程:

  1. 首先检查队列是否正在关闭,如果是则直接返回。
  2. 检查项目是否已经在 dirty 集合中,如果是则直接返回。
  3. 将项目添加到 dirty 集合中。
  4. 如果项目不在 processing 集合中,则将其添加到队列中。
  5. 通过 cond.Signal() 唤醒一个等待的 goroutine 来处理新添加的项目。
代码语言:go复制
func (q *Type) Add(item interface{}) {
    // 获取锁,确保线程安全
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    
    // 检查队列是否正在关闭,如果是则直接返回
    if q.shuttingDown {
        return
    }
    
    // 检查项目是否已经在 dirty 集合中,如果是则直接返回
    // 这避免了重复添加相同的项目
    if q.dirty.has(item) {
        return
    }

    // 更新指标,记录添加操作
    q.metrics.add(item)

    // 将项目添加到 dirty 集合中,标记为需要处理
    q.dirty.insert(item)
    
    // 如果项目已经在 processing 集合中(正在被处理),则不添加到队列
    // 当处理完成时,Done 方法会检查 dirty 集合并重新添加到队列
    if q.processing.has(item) {
        return
    }

    // 将项目添加到队列末尾
    q.queue = append(q.queue, item)
    
    // 通知一个等待的 goroutine 有新项目可处理
    q.cond.Signal()
}
Get方法

Get 方法用于从队列中获取待处理的项目

工作流程:

  1. 如果队列为空且未关闭,则阻塞等待。
  2. 如果队列为空且正在关闭,则返回 shutdown = true。
  3. 从队列头部取出一个项目。
  4. 将项目从 dirty 集合中删除,并添加到 processing 集合中。
  5. 返回项目和 shutdown = false。
代码语言:go复制
// Get 阻塞直到可以返回一个要处理的项目
// 如果 shutdown = true,调用者应该结束其 goroutine
// 处理完项目后必须调用 Done
func (q *Type) Get() (item interface{}, shutdown bool) {
    // 获取锁,确保线程安全
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    
    // 如果队列为空且未关闭,则阻塞等待
    // 这里使用 for 循环而不是 if 语句,是为了防止虚假唤醒
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    
    // 如果队列为空,说明队列正在关闭
    // 返回 nil 和 true,通知调用者结束其 goroutine
    if len(q.queue) == 0 {
        return nil, true
    }

    // 从队列头部取出一个项目
    item = q.queue[0]
    
    // 将队列头部元素设为 nil,避免内存泄漏
    // 注释解释了底层数组仍然存在并引用此对象,所以需要显式设为 nil
    q.queue[0] = nil
    
    // 更新队列,移除头部元素
    q.queue = q.queue[1:]

    // 更新指标,记录获取操作
    q.metrics.get(item)

    // 将项目添加到 processing 集合中,标记为正在处理
    q.processing.insert(item)
    
    // 将项目从 dirty 集合中删除,因为它已经被取出处理
    q.dirty.delete(item)

    // 返回项目和 false,表示队列未关闭
    return item, false
}
Done方法

Done 方法用于标记项目处理完成

工作流程:

  1. 将项目从 processing 集合中删除
  2. 如果项目在 dirty 集合中,则将其重新添加到队列中
  3. 如果 processing 集合为空,则通知等待的 goroutine
代码语言:go复制
// Done 标记项目处理完成
// 如果在处理过程中项目被再次标记为 dirty,则会重新添加到队列中进行重新处理
func (q *Type) Done(item interface{}) {
    // 获取锁,确保线程安全
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    // 更新指标,记录完成操作
    q.metrics.done(item)

    // 将项目从 processing 集合中删除,表示处理完成
    q.processing.delete(item)
    
    // 如果项目在 dirty 集合中(说明在处理过程中被再次标记为需要处理)
    // 则将其重新添加到队列中,以便再次处理
    if q.dirty.has(item) {
        q.queue = append(q.queue, item)
        q.cond.Signal()  // 通知一个等待的 goroutine 有新项目可处理
    } else if q.processing.len() == 0 {
        // 如果 processing 集合为空(没有正在处理的项目)
        // 也发送信号,用于唤醒等待队列排空的 goroutine
        q.cond.Signal()
    }
}

四. 延时队列 - DelayingQueue

4.1 接口和结构体

延时队列在普通队列的基础上增加了延时功能:

代码语言:go复制
// DelayingInterface 是一个可以在稍后时间添加项目的接口
// 这使得在失败后重新入队项目更容易,而不会陷入热循环
type DelayingInterface interface {
    Interface
    // AddAfter 在指定的持续时间过后将项目添加到工作队列
    AddAfter(item interface{}, duration time.Duration)
}

// delayingType 包装了一个 Interface 并提供延迟重新入队功能
type delayingType struct {
    Interface

    // clock 跟踪延迟触发的时间
    clock clock.Clock

    // stopCh 让我们向等待循环发送关闭信号
    stopCh chan struct{}
    // stopOnce 保证我们只发送一次关闭信号
    stopOnce sync.Once

    // heartbeat 确保我们等待不超过 maxWait 就触发
    heartbeat clock.Ticker

    // waitingForAddCh 是一个缓冲通道,用于提供 waitingForAdd
    waitingForAddCh chan *waitFor

    // metrics 计算重试次数
    metrics retryMetrics
}

// waitFor 保存要添加的数据和应该添加的时间
type waitFor struct {
    data    t
    readyAt time.Time
    // 在优先级队列中的索引
    index int
}

延时队列使用优先级队列来管理延时任务:

代码语言:go复制
// waitForPriorityQueue 实现了 waitFor 项目的优先级队列
//
// waitForPriorityQueue 实现了 heap.Interface。时间上最早发生的项目
//(即具有最小 readyAt 的项目)位于根部(索引 0)。
// Peek 返回索引 0 处的最小项目。Pop 返回最小项目,
// 该项目已从队列中移除并由 container/heap 放置在索引 Len()-1 处。
// Push 在索引 Len() 处添加一个项目,container/heap 将其渗透到正确的位置。
type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
    return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {
    return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
    pq[i].index = i
    pq[j].index = j
}

// Push 向队列添加一个项目。不应直接调用 Push;
// 而是使用 `heap.Push`。
func (pq *waitForPriorityQueue) Push(x interface{}) {
    n := len(*pq)
    item := x.(*waitFor)
    item.index = n
    *pq = append(*pq, item)
}

// Pop 从队列中移除一个项目。不应直接调用 Pop;
// 而是使用 `heap.Pop`。
func (pq *waitForPriorityQueue) Pop() interface{} {
    n := len(*pq)
    item := (*pq)[n-1]
    item.index = -1
    *pq = (*pq)[0:(n - 1)]
    return item
}

// Peek 返回队列开头的项目,而不移除项目或以其他方式改变队列。
// 可以安全地直接调用。
func (pq waitForPriorityQueue) Peek() interface{} {
    return pq[0]
}

4.2 核心方法:

AddAfter 方法

AddAfter 方法它允许在指定的延迟后将项目添加到队列中

工作流程:

  1. 首先检查队列是否正在关闭,如果是则直接返回
  2. 更新重试指标
  3. 如果延迟时间小于等于 0,则立即将项目添加到队列中
  4. 否则,创建一个 waitFor 结构体,包含项目数据和准备时间,并将其发送到 waitingForAddCh 通道
代码语言:go复制
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    // 首先检查队列是否正在关闭,如果是则直接返回
    if q.ShuttingDown() {
        return
    }

    // 更新重试指标,记录重试操作
    q.metrics.retry()

    // 如果延迟时间小于等于 0,则立即将项目添加到队列中
    if duration <= 0 {
        q.Add(item)
        return
    }

    // 使用 select 语句处理通道操作
    select {
    case <-q.stopCh:
        // 如果队列已关闭(stopCh 被关闭),则解除阻塞并返回
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
        // 创建一个 waitFor 结构体,包含项目数据和准备时间
        // 将其发送到 waitingForAddCh 通道,由 waitingLoop 协程处理
    }
}
waitingLoop 方法

waitingLoop 方法是延时队列的核心循环,它负责管理延时任务

工作流程:

初始化一个优先级队列 waitingForQueue 和一个映射 waitingEntryByData

无限循环中:

  1. 检查队列是否正在关闭,如果是则返回
  2. 获取当前时间
  3. 检查优先级队列中是否有准备好的项目,如果有则将其添加到队列中
  4. 为优先级队列中的第一个项目设置定时器
  5. 使用 select 语句等待以下事件之一:
    • 队列关闭信号
    • 心跳信号(确保不会等待太长时间)
    • 下一个项目准备好的信号
    • 新的延时任务到达
  6. 处理新的延时任务:如果已经准备好则立即添加到队列中,否则插入到优先级队列中
  7. 尝试排空通道,处理所有待处理的延时任务
代码语言:go复制
// waitingLoop 运行直到工作队列关闭,并保持检查要添加的项目列表
func (q *delayingType) waitingLoop() {
    // 注册 panic 处理函数,确保即使发生 panic 也能恢复
    defer utilruntime.HandleCrash()

    // 创建一个永不触发的通道,用于 select 语句中的占位符
    never := make(<-chan time.Time)

    // 声明一个定时器变量,用于等待下一个项目准备好的时间
    var nextReadyAtTimer clock.Timer

    // 初始化一个优先级队列,用于存储延时项目
    waitingForQueue := &waitForPriorityQueue{}
    // 使用 container/heap 包实现的优先级队列,确保最早准备好的项目在队列头部。
    heap.Init(waitingForQueue)

    // 创建一个映射,用于快速查找已存在的项目
    waitingEntryByData := map[t]*waitFor{}

    // 无限循环,直到队列关闭
    for {
        // 检查队列是否正在关闭,如果是则返回
        if q.Interface.ShuttingDown() {
            return
        }

        // 获取当前时间
        now := q.clock.Now()

        // 处理所有已准备好的项目
        for waitingForQueue.Len() > 0 {
            // 查看队列头部的项目
            entry := waitingForQueue.Peek().(*waitFor)
            // 如果项目还没准备好,则跳出循环
            if entry.readyAt.After(now) {
                break
            }

            // 从优先级队列中弹出项目
            entry = heap.Pop(waitingForQueue).(*waitFor)
            // 将项目添加到工作队列中
            q.Add(entry.data)
            // 从映射中删除项目
            delete(waitingEntryByData, entry.data)
        }

        // 设置下一个项目的定时器
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            // 如果已有定时器,先停止它
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            // 获取队列头部的项目
            entry := waitingForQueue.Peek().(*waitFor)
            // 创建一个新的定时器,等待到项目的准备时间
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            // 获取定时器的通道
            nextReadyAt = nextReadyAtTimer.C()
        }

        // 使用 select 语句等待多个事件
        select {
        case <-q.stopCh:
            // 如果收到停止信号,则返回
            return

        case <-q.heartbeat.C():
            // 如果收到心跳信号,继续循环
            // 心跳确保我们不会等待太长时间

        case <-nextReadyAt:
            // 如果下一个项目准备好了,继续循环

        case waitEntry := <-q.waitingForAddCh:
            // 如果收到新的延时项目
            if waitEntry.readyAt.After(q.clock.Now()) {
                // 如果项目还没准备好,插入到优先级队列中
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                // 如果项目已经准备好,直接添加到工作队列中
                q.Add(waitEntry.data)
            }

            // 尝试排空通道,处理所有待处理的延时任务
            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    // 处理更多的延时项目
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    // 如果通道为空,则标记为已排空
                    drained = true
                }
            }
        }
    }
}
insert 方法

insert 函数是延时队列中的辅助函数,用于将延时项目添加到优先级队列中,或者更新已存在项目的准备时间

工作流程:

  1. 检查项目是否已经存在于映射中
  2. 如果存在,则仅在新的准备时间早于现有准备时间的情况下更新时间
  3. 如果不存在,则将项目添加到优先级队列和映射中
代码语言:go复制
// insert 将条目添加到优先级队列,或者如果它已经存在于队列中,则更新 readyAt
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
    // 检查项目是否已经存在于映射中
    existing, exists := knownEntries[entry.data]
    
    // 如果项目已存在
    if exists {
        // 仅在新的准备时间早于现有准备时间的情况下更新时间
        // 这确保了项目总是以最早的准备时间被处理
        if existing.readyAt.After(entry.readyAt) {
            // 更新准备时间
            existing.readyAt = entry.readyAt
            // 修复堆,确保优先级队列的顺序正确
            heap.Fix(q, existing.index)
        }
        
        // 如果新的准备时间不早于现有准备时间,则不做任何更改
        return
    }
    
    // 如果项目不存在,则将其添加到优先级队列中
    heap.Push(q, entry)
    // 同时将项目添加到映射中,以便快速查找
    knownEntries[entry.data] = entry
}

五. 限速队列 - RateLimitingQueue

5.1 接口和结构体

限速队列在延时队列的基础上增加了限速功能,它使用 RateLimiter 来控制任务的处理速度,避免任务过快地重新加入队列。

代码语言:go复制
// RateLimitingInterface 是一个限制项目添加到队列速率的接口
type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited 在速率限制器允许的情况下将项目添加到工作队列
    AddRateLimited(item interface{})

    // Forget 表示项目已完成重试
    // 无论是永久失败还是成功,我们都将停止速率限制器跟踪它
    // 这只清除 `rateLimiter`,你仍然需要在队列上调用 `Done`
    Forget(item interface{})

    // NumRequeues 返回项目被重新入队的次数
    NumRequeues(item interface{}) int
}

// rateLimitingType 包装了一个 RateLimiter Interface 并提供限速重新入队功能
type rateLimitingType struct {
    DelayingInterface

    rateLimiter RateLimiter
}

RateLimiter 接口定义了限速器的基本操作:

代码语言:go复制
// RateLimiter 控制项目重新入队的速率
type RateLimiter interface {
    // When 获取一个项目并返回它应该在多长时间后重新入队
    When(item interface{}) time.Duration
    // NumRequeues 返回项目被重新入队的次数
    NumRequeues(item interface{}) int
    // Forget 表示项目已完成重试,不再需要跟踪
    Forget(item interface{})
}

5.2 核心方法:

AddRateLimited 方法

AddRateLimited 方法是限速队列的核心方法,它根据限速器的策略来决定任务何时可以重新加入队列

工作流程:

  1. 调用 q.rateLimiter.When(item) 获取项目应该在多长时间后重新入队
  2. 调用 q.DelayingInterface.AddAfter 方法,将项目延迟指定时间后添加到队列中
代码语言:go复制
// AddRateLimited 根据速率限制器允许的时间将项目添加到队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
Forget 方法

Forget 方法用于清除任务的重试记录

工作流程:

  1. 调用 rateLimiter.Forget(item) 清除项目的重试记录
代码语言:go复制
// Forget 表示项目已完成重试
func (q *rateLimitingType) Forget(item interface{}) {
    q.rateLimiter.Forget(item)
}
NumRequeues 方法

NumRequeues 方法用于获取任务被重新入队的次数

工作流程:

  1. 调用 rateLimiter.NumRequeues(item) 获取项目被重新入队的次数
代码语言:go复制
// NumRequeues 返回项目被重新入队的次数
func (q *rateLimitingType) NumRequeues(item interface{}) int {
    return q.rateLimiter.NumRequeues(item)
}

5.3 限速器实现

Kubernetes 提供了多种限速器实现,以适应不同的场景

限速器在util/workqueue/default_rate_limiters.go

ItemExponentialFailureRateLimiter

指数退避限速器,随着重试次数的增加,延迟时间呈指数增长

  • failuresLock :用于保护 failures 映射的互斥锁,确保线程安全。
  • failures :记录每个项目的重试次数。
  • baseDelay :基础延迟时间。
  • maxDelay :最大延迟时间,防止延迟时间过长。
代码语言:go复制
// ItemExponentialFailureRateLimiter 对每个项目进行指数退避
type ItemExponentialFailureRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    baseDelaySec int
    maxDelaySec  int
}

// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    // 获取项目的重试次数
    exp := r.failures[item]
    // 增加项目的重试次数
    r.failures[item] = r.failures[item] + 1

    // 计算指数退避延迟时间
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    // 防止延迟时间溢出
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    // 将计算的延迟时间转换为 Duration 类型
    calculated := time.Duration(backoff)
    // 如果计算的延迟时间超过最大延迟时间,则返回最大延迟时间
    if calculated > r.maxDelay {
        return r.maxDelay
    }

    // 返回计算的延迟时间
    return calculated
}
ItemFastSlowRateLimiter

快慢限速器,前几次重试使用较短的延迟,之后使用较长的延迟

  • failuresLock :用于保护 failures 映射的互斥锁,确保线程安全。
  • failures :记录每个项目的重试次数。
  • maxFastAttempts :最大快速重试次数。
  • fastDelay :快速重试的延迟时间。
  • slowDelay :慢速重试的延迟时间。
代码语言:go复制
// ItemFastSlowRateLimiter 对前 N 次失败使用快速重试,之后使用慢速重试
type ItemFastSlowRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    fastDelay time.Duration
    slowDelay time.Duration
    threshold int
}

// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    // 增加项目的重试次数
    r.failures[item] = r.failures[item] + 1

    // 如果重试次数小于或等于最大快速重试次数,返回快速重试延迟
    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }

    // 否则,返回慢速重试延迟
    return r.slowDelay
}
MaxOfRateLimiter

组合多个限速器,使用其中延迟最长的一个

  • limiters :存储多个 RateLimiter 实例的切片。
代码语言:go复制
// MaxOfRateLimiter 使用多个限速器中延迟最长的一个
type MaxOfRateLimiter struct {
    limiters []RateLimiter
}

// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    for _, limiter := range r.limiters {
        // 调用每个限速器的 When 方法
        curr := limiter.When(item)
        // 记录最长的延迟时间
        if curr > ret {
            ret = curr
        }
    }

    // 返回最长的延迟时间
    return ret
}

六. 使用示例

6.1 Queue

代码语言:go复制
// 创建一个普通队列
queue := workqueue.New()

// 添加一个项目
queue.Add("task1")

// 获取一个项目
item, shutdown := queue.Get()
if !shutdown {
    // 处理项目
    fmt.Println("处理项目:", item)
    
    // 标记项目处理完成
    queue.Done(item)
}

// 关闭队列
queue.ShutDown()

6.2 DelayingQueue

代码语言:go复制
// 创建一个延时队列
delayingQueue := workqueue.NewDelayingQueue()

// 添加一个延时项目,5秒后处理
delayingQueue.AddAfter("task2", 5*time.Second)

// 获取一个项目
item, shutdown := delayingQueue.Get()
if !shutdown {
    // 处理项目
    fmt.Println("处理项目:", item)
    
    // 标记项目处理完成
    delayingQueue.Done(item)
}

// 关闭队列
delayingQueue.ShutDown()

6.3 RateLimitingQueue

代码语言:go复制
// 创建一个指数退避限速器
rateLimiter := workqueue.DefaultControllerRateLimiter()

// 创建一个限速队列
rateLimitingQueue := workqueue.NewRateLimitingQueue(rateLimiter)

// 添加一个限速项目
rateLimitingQueue.AddRateLimited("task3")

// 获取一个项目
item, shutdown := rateLimitingQueue.Get()
if !shutdown {
    // 处理项目
    fmt.Println("处理项目:", item)
    
    // 如果处理成功,忘记项目的重试历史
    rateLimitingQueue.Forget(item)
    
    // 标记项目处理完成
    rateLimitingQueue.Done(item)
}

// 如果处理失败,重新添加到队列
if processFailed {
    rateLimitingQueue.AddRateLimited("task3")
}

// 关闭队列
rateLimitingQueue.ShutDown()

6.4 实际场景

在 Kubernetes 控制器中,通常会使用限速队列来处理资源的变更事件:

代码语言:go复制
// 创建一个限速队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// 添加事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(obj)
        if err == nil {
            queue.Add(key)
        }
    },
    UpdateFunc: func(old, new interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(new)
        if err == nil {
            queue.Add(key)
        }
    },
    DeleteFunc: func(obj interface{}) {
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err == nil {
            queue.Add(key)
        }
    },
})

// 处理队列中的项目
for {
    // 获取一个项目
    key, quit := queue.Get()
    if quit {
        return
    }
    
    // 处理完成后调用 Done
    defer queue.Done(key)
    
    // 处理项目
    err := processItem(key.(string))
    if err == nil {
        // 处理成功,忘记项目的重试历史
        queue.Forget(key)
    } else if queue.NumRequeues(key) < maxRetries {
        // 处理失败,重新添加到队列
        queue.AddRateLimited(key)
    } else {
        // 超过最大重试次数,放弃处理
        queue.Forget(key)
        utilruntime.HandleError(err)
    }
}

通过这种方式,Kubernetes 控制器可以有效地处理资源变更事件,并在处理失败时进行有限次数的重试,避免因频繁重试导致的系统压力。

本文标签: k8s client