
Implementation of a time wheel in Go

luozhiyun 2021-02-13 17:15:34

Please credit the source when reprinting. This article was originally published on luozhiyun's blog: https://www.luozhiyun.com/archives/444

Recently a requirement came up at work. In short, millions of timed tasks are created within a short period. When each task is created, the corresponding amount is added up to prevent overselling; half an hour later the data is checked, and if it is incorrect, the added amount has to be deducted again.

If this requirement were implemented with Go's built-in Timer, performance would be relatively poor: Timer is implemented with a min-heap, so creating and deleting a timer costs O(log n). With a time wheel, both operations are O(1), which performs much better.

I previously wrote an analysis of the time wheel algorithm in Java: https://www.luozhiyun.com/archives/59. This time let's look at a time wheel implemented in Go. If you are interested, you can compare the two, and also judge whether my writing has improved over the past year or so. Ha ha.

Time wheels are used very widely: Netty, Akka, Quartz, ZooKeeper and Kafka all contain one. The Go time wheel below is modeled on Kafka's implementation. Complete code: https://github.com/devYun/timingwheel.

Introduction

Simple time wheel

A time wheel stores its tasks in a circular queue backed by an array. Each element of the array holds a list of timed tasks. This list is a circular doubly linked list, and each entry in it is a task item that wraps the actual timed task.

A time wheel consists of a number of time slots, and each slot represents the wheel's basic time span (tickMs). The number of slots is fixed and is denoted wheelSize, so the total time span of the whole wheel (interval) is tickMs × wheelSize.
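To make the formula concrete, here is a minimal sketch; the helper names `interval` and `slotIndex` are hypothetical and not taken from the repository. It computes a wheel's total span and the slot an absolute expiration time maps to, using the same `expiration / tick % wheelSize` rule that the `add` method applies further down.

```go
package main

import "fmt"

// interval returns the total span of a single-level wheel: tickMs × wheelSize.
func interval(tickMs, wheelSize int64) int64 {
	return tickMs * wheelSize
}

// slotIndex maps an absolute expiration time (ms) to a slot index,
// i.e. (expiration / tickMs) mod wheelSize.
func slotIndex(expirationMs, tickMs, wheelSize int64) int64 {
	return (expirationMs / tickMs) % wheelSize
}

func main() {
	const tickMs, wheelSize = 1000, 10 // 1s per slot, 10 slots
	fmt.Println(interval(tickMs, wheelSize))        // 10000 (ms), i.e. 10s
	fmt.Println(slotIndex(2000, tickMs, wheelSize)) // 2
	fmt.Println(slotIndex(9999, tickMs, wheelSize)) // 9
}
```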

The time wheel also has a dial pointer (currentTime) that indicates the wheel's current time; currentTime is always an integer multiple of tickMs. The slot that currentTime points to is the one that is due: all tasks in that slot's list are ready to be processed.

The figure below shows a time wheel with tickMs = 1s and wheelSize = 10. Each slot holds a list of timed tasks, and the lists contain the actual task items:

[Figure: taskList]

Initially, the dial pointer currentTime points to slot 0. With tickMs = 1s and wheelSize = 10, interval equals 10s. As shown in the figure below, a task with a 2s timeout is inserted into the task list of slot 2 (marked in red). As time passes, currentTime keeps moving forward; after 2s it points to slot 2, and the task list of that slot is fetched and processed.

[Figure: timewheel]

If currentTime now points to slot 2 and a 9s task is inserted, the new task reuses an existing slot's list: it is stored in slot 1, because (2 + 9) mod 10 = 1.

[Figure: timewheelAdd9S]
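The two examples above (a 2s task landing in slot 2, then a 9s task added at time 2 landing in slot 1) can be traced with a small single-level wheel. This is a hypothetical, single-goroutine sketch (the type `simpleWheel` and its methods are not from the repository), with times in seconds and task names standing in for real tasks.

```go
package main

import "fmt"

// simpleWheel is a hypothetical single-level time wheel used only to show
// slot placement. Times are in seconds for readability.
type simpleWheel struct {
	tick        int64
	wheelSize   int64
	currentTime int64      // always a multiple of tick
	slots       [][]string // task names stored in each slot
}

func newSimpleWheel(tick, wheelSize int64) *simpleWheel {
	return &simpleWheel{tick: tick, wheelSize: wheelSize, slots: make([][]string, wheelSize)}
}

// add stores a task due delay seconds after currentTime (assumes delay >= tick)
// and returns its slot, or false if the delay exceeds one revolution (interval).
func (w *simpleWheel) add(name string, delay int64) (int64, bool) {
	if delay >= w.tick*w.wheelSize {
		return -1, false
	}
	idx := ((w.currentTime + delay) / w.tick) % w.wheelSize
	w.slots[idx] = append(w.slots[idx], name)
	return idx, true
}

// advance moves the dial pointer one tick forward and returns the tasks
// in the slot it now points to, clearing that slot.
func (w *simpleWheel) advance() []string {
	w.currentTime += w.tick
	idx := (w.currentTime / w.tick) % w.wheelSize
	due := w.slots[idx]
	w.slots[idx] = nil
	return due
}

func main() {
	w := newSimpleWheel(1, 10)         // tick = 1s, wheelSize = 10
	fmt.Println(w.add("task-2s", 2))   // 2 true
	w.advance()                        // currentTime = 1, nothing due
	fmt.Println(w.advance())           // [task-2s] (currentTime = 2)
	fmt.Println(w.add("task-9s", 9))   // 1 true, since (2+9) mod 10 = 1
	fmt.Println(w.add("task-15s", 15)) // -1 false: needs a larger wheel
}
```

The last call shows the limitation discussed next: a single-level wheel simply cannot hold a task whose delay exceeds its interval.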

All the time wheels described so far are simple, single-level time wheels whose overall time range lies between currentTime and currentTime + interval. To handle a 15s task, we would have to create another time wheel with a span of at least 15s. But there is no upper bound on this kind of expansion: a time wheel spanning 10,000 seconds would need an array that large, which not only consumes a lot of memory but is also inefficient, because such a large array has to be traversed.

This is why the concept of a hierarchical time wheel was introduced.
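A rough way to see the saving is to count slots. The sketch below (helper names are hypothetical) compares a single-level wheel covering a given span with stacked wheels of wheelSize slots each, where every level's tick is the previous level's interval. It ignores the offset of currentTime, so treat the numbers as an approximation.

```go
package main

import "fmt"

// flatSlots is the number of slots a single-level wheel needs to cover span.
func flatSlots(span, tick int64) int64 {
	return (span + tick - 1) / tick // ceiling division
}

// hierarchicalSlots counts levels (and total slots) when every level has
// wheelSize slots and each level's tick is the previous level's interval.
func hierarchicalSlots(span, tick, wheelSize int64) (levels, slots int64) {
	levels = 1
	for interval := tick * wheelSize; interval < span; interval *= wheelSize {
		levels++
	}
	return levels, levels * wheelSize
}

func main() {
	// Covering 10,000 seconds with a 1s tick:
	fmt.Println(flatSlots(10000, 1))             // 10000
	fmt.Println(hierarchicalSlots(10000, 1, 10)) // 4 40
}
```

Memory grows with the logarithm of the span instead of linearly, which is the motivation for the layered design below.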

Hierarchical time wheel

Consider a two-level time wheel. The second-level wheel also consists of 10 slots, each spanning 10s: the second level's tickMs equals the first level's interval, i.e. 10s. wheelSize is the same constant 10 at every level, so the overall time span (interval) of the second-level wheel is 100s.

The figure shows the expiration time range of each slot. Slot 0 of the second-level wheel covers expiration times [0,9]; in other words, a single slot of the second-level wheel covers all 10 slots of the first-level wheel.

If a 15s task is added, the first-level wheel cannot hold it, so it goes to the second-level wheel and is inserted into the slot whose expiration range is [10,19].

[Figure: timewheellevel2]

As time passes, when the original 15s task has 5s remaining, a downgrade takes place: the first-level wheel's span is now sufficient, so the task is re-added to the first-level slot whose expiration time is 5. After another 5s the task is truly due, and its expiration action is finally executed.
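Both the upgrade and the downgrade follow from one placement rule: try the finest level first and move up until the expiration fits within that level's currentTime + interval. The sketch below assumes every level has wheelSize slots, each level's tick is the lower level's interval, each level's currentTime is the current time truncated to its own tick, and the task is not yet due; `locate` is a hypothetical helper, not part of the repository.

```go
package main

import "fmt"

// locate returns the level (0 = finest) and slot a task lands in, given the
// current time now. Assumptions: every level has wheelSize slots, level 0
// ticks every baseTick, each higher level's tick is the lower level's
// interval, each level's currentTime is now truncated to its own tick, and
// the task is not yet due (expiration >= now + baseTick).
func locate(expiration, now, baseTick, wheelSize int64) (level int, slot int64) {
	tick := baseTick
	for {
		currentTime := now - now%tick
		if expiration < currentTime+tick*wheelSize {
			return level, (expiration / tick) % wheelSize
		}
		tick *= wheelSize
		level++
	}
}

func main() {
	// A task due at 15s, added at time 0, goes to level 1, slot 1 ([10,19]).
	fmt.Println(locate(15, 0, 1, 10)) // 1 1
	// At time 10 that bucket expires and the task is re-added:
	// it now fits in level 0, slot 5, with 5s remaining.
	fmt.Println(locate(15, 10, 1, 10)) // 0 5
}
```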

Code implementation

Because our Go time wheel is modeled on Kafka's, its TimingWheel implementation shares a few small details with Kafka's:

  • Each slot's linked list has a root node that simplifies boundary conditions. It is an extra sentinel node that acts as the first node and stores no task; it exists only to make list operations easier;
  • Except for the first-level wheel, the start time (startMs) of every higher-level wheel is set to the first-level wheel's currentTime at the moment the higher-level wheel is created. Each level's currentTime must be an integer multiple of its tickMs; if it is not, it is truncated to one: currentTime = startMs - (startMs % tickMs);
  • Kafka's timer holds a reference only to the first-level TimingWheel, not to the higher-level wheels directly; instead, each wheel holds a reference (overflowWheel) to the next higher level;
  • Kafka's timer uses a DelayQueue to help advance the time wheel. Every slot list (bucket) that receives a task is added to the DelayQueue, which orders the buckets by their expiration time; the bucket with the earliest expiration sits at the head of the queue, and a separate thread takes due buckets from the DelayQueue;
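The code below calls a `truncate` helper whose body this article does not show. A plausible sketch, assuming it simply implements the formula currentTime = startMs - (startMs % tickMs) and leaves the value unchanged for a non-positive multiple:

```go
package main

import "fmt"

// truncate rounds x down to a multiple of m, i.e. x - x%m.
// Sketch only: assumes x >= 0 and returns x unchanged when m <= 0.
func truncate(x, m int64) int64 {
	if m <= 0 {
		return x
	}
	return x - x%m
}

func main() {
	fmt.Println(truncate(1613207734567, 1000)) // 1613207734000
	fmt.Println(truncate(12345, 10000))        // 10000
}
```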

Structure

type TimingWheel struct {
	// Span of a single slot (tick), in milliseconds
	tick int64
	// Number of slots in the wheel
	wheelSize int64
	// Total span of the wheel (tick × wheelSize), in milliseconds
	interval int64
	// Time the dial pointer currently points to, in milliseconds
	currentTime int64
	// Slots of the wheel
	buckets []*bucket
	// Delay queue (shared by all levels)
	queue *delayqueue.DelayQueue
	// Reference to the next higher-level wheel
	overflowWheel unsafe.Pointer // type: *TimingWheel
	// Signals the goroutines started by Start to exit
	exitC     chan struct{}
	waitGroup waitGroupWrapper
}

tick, wheelSize, interval and currentTime are self-explanatory. buckets is the list of slots, queue is the delay queue that triggers all tasks, and overflowWheel is a reference to the next higher-level wheel.

type bucket struct {
	// Expiration time of the tasks in this bucket (accessed atomically)
	expiration int64
	mu         sync.Mutex
	// Timers that share this expiration time
	timers *list.List
}

bucket wraps the task list of a slot: it holds the tasks with the same expiration time, and once they expire, the timers list is taken out and processed. Note that a bucket is accessed concurrently by multiple goroutines, so its int64 expiration is read and written with atomic operations. To keep 64-bit values consistent on 32-bit systems, such a field must be 64-bit aligned; placing expiration first in the struct achieves this, because Go guarantees that the first word of an allocated struct is 64-bit aligned. For details see https://www.luozhiyun.com/archives/429, which discusses memory alignment.
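Here is a sketch of how the expiration accessors might look with `sync/atomic`. The type `bucketSketch` is hypothetical; the assumption is that `SetExpiration` reports whether the value changed, which matches how `add` uses it below to avoid offering the same bucket to the delay queue twice.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// bucketSketch keeps expiration as its first field so that, per the
// sync/atomic documentation, it is 64-bit aligned even on 32-bit platforms.
type bucketSketch struct {
	expiration int64
}

func (b *bucketSketch) Expiration() int64 {
	return atomic.LoadInt64(&b.expiration)
}

// SetExpiration stores the new value and reports whether it changed, so a
// caller only offers the bucket to the delay queue once per expiration.
func (b *bucketSketch) SetExpiration(expiration int64) bool {
	return atomic.SwapInt64(&b.expiration, expiration) != expiration
}

func main() {
	b := &bucketSketch{expiration: -1}
	fmt.Println(b.SetExpiration(2000)) // true: first timer for this slot
	fmt.Println(b.SetExpiration(2000)) // false: already queued
	fmt.Println(b.Expiration())        // 2000
}
```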

type Timer struct {
	// Expiration time, in milliseconds
	expiration int64
	// The task to run on expiration
	task func()
	// Pointer to the bucket that holds this Timer
	b unsafe.Pointer // type: *bucket
	// This Timer's element in the bucket's list
	element *list.Element
}

Timer is the smallest execution unit of the time wheel. It wraps a timed task and calls task when it expires.


Initialize the time wheel

For example, to initialize a time wheel with a tick of 1s and a wheelSize of 10:

func main() {
	tw := timingwheel.NewTimingWheel(time.Second, 10)
	tw.Start()
}

func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
	// Convert the incoming tick to milliseconds
	tickMs := int64(tick / time.Millisecond)
	// Panic if the tick is shorter than 1ms
	if tickMs <= 0 {
		panic(errors.New("tick must be greater than or equal to 1ms"))
	}
	// Set the start time
	startMs := timeToMs(time.Now().UTC())
	// Build the TimingWheel
	return newTimingWheel(
		tickMs,
		wheelSize,
		startMs,
		delayqueue.New(int(wheelSize)),
	)
}

func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
	// Allocate one bucket per slot
	buckets := make([]*bucket, wheelSize)
	for i := range buckets {
		buckets[i] = newBucket()
	}
	return &TimingWheel{
		tick:      tickMs,
		wheelSize: wheelSize,
		// currentTime must be a multiple of tickMs, so truncate it
		currentTime: truncate(startMs, tickMs),
		interval:    tickMs * wheelSize,
		buckets:     buckets,
		queue:       queue,
		exitC:       make(chan struct{}),
	}
}

Initialization is straightforward; see the comments in the code above.

Start the time wheel

Let's look at the Start method:

func (tw *TimingWheel) Start() {
	// Poll runs an endless loop that sends expired elements to queue.C
	tw.waitGroup.Wrap(func() {
		tw.queue.Poll(tw.exitC, func() int64 {
			return timeToMs(time.Now().UTC())
		})
	})
	// Endless loop that receives from queue.C
	tw.waitGroup.Wrap(func() {
		for {
			select {
			// Each value received from the queue is an expired bucket
			case elem := <-tw.queue.C:
				b := elem.(*bucket)
				// Move currentTime forward to the bucket's expiration time
				tw.advanceClock(b.Expiration())
				// Drain the bucket and pass each timer to addOrRun
				b.Flush(tw.addOrRun)
			case <-tw.exitC:
				return
			}
		}
	})
}

Here the Wrap method of waitGroupWrapper (from util) is used: it runs the passed-in function asynchronously in a new goroutine. See the source in the repository linked above for details.

Start launches two goroutines. The first calls the delay queue's Poll method, which loops forever fetching data from the queue and sends expired entries to the queue's C channel. The second loops forever receiving from the queue's C channel: a value arriving on C means a bucket has expired, so it calls advanceClock to move currentTime forward to the bucket's expiration time, then calls Flush to take out the bucket's list and passes each timer to addOrRun.

func (tw *TimingWheel) advanceClock(expiration int64) {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	// Only advance if expiration >= currentTime + tick
	if expiration >= currentTime+tw.tick {
		// Advance currentTime to expiration, truncated to a multiple of tick
		currentTime = truncate(expiration, tw.tick)
		atomic.StoreInt64(&tw.currentTime, currentTime)
		// If there is an overflow (higher-level) wheel, advance its clock recursively
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel != nil {
			(*TimingWheel)(overflowWheel).advanceClock(currentTime)
		}
	}
}

advanceClock resets currentTime based on the expiration time, thereby advancing the time wheel, and then does the same for each higher-level wheel.
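To see how the recursion cascades, here is a stripped-down, single-goroutine stand-in (the type `wheel` is hypothetical: no atomics and no buckets). The upper wheel only moves once the lower wheel's new time crosses one of the upper wheel's tick boundaries.

```go
package main

import "fmt"

// wheel is a hypothetical single-goroutine stand-in for TimingWheel that
// keeps only what advanceClock needs.
type wheel struct {
	tick, currentTime int64
	overflow          *wheel
}

func (w *wheel) advanceClock(expiration int64) {
	if expiration >= w.currentTime+w.tick {
		w.currentTime = expiration - expiration%w.tick // truncate to a tick multiple
		if w.overflow != nil {
			w.overflow.advanceClock(w.currentTime)
		}
	}
}

func main() {
	upper := &wheel{tick: 10000}                 // 10s per slot
	lower := &wheel{tick: 1000, overflow: upper} // 1s per slot
	lower.advanceClock(3500)
	fmt.Println(lower.currentTime, upper.currentTime) // 3000 0
	lower.advanceClock(12000)
	fmt.Println(lower.currentTime, upper.currentTime) // 12000 10000
}
```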

func (b *bucket) Flush(reinsert func(*Timer)) {
	var ts []*Timer
	b.mu.Lock()
	// Walk the bucket's list, removing every timer
	for e := b.timers.Front(); e != nil; {
		// Save the next element before removing the current one
		next := e.Next()
		t := e.Value.(*Timer)
		// Remove the timer from the bucket's list
		b.remove(t)
		ts = append(ts, t)
		e = next
	}
	b.mu.Unlock()
	b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()
	// Reinsert (or run) each timer outside the lock
	for _, t := range ts {
		reinsert(t)
	}
}

Flush walks the bucket's timers list, moving each timer into the ts slice, and then calls reinsert on each one; here reinsert is addOrRun.

func (tw *TimingWheel) addOrRun(t *Timer) {
	// add returns false if the timer has already expired
	if !tw.add(t) {
		// Run the expired task asynchronously
		go t.task()
	}
}

addOrRun calls add to check whether the incoming Timer has expired; if it has, its task is executed asynchronously right away. We'll analyze add below.

The overall execution flow of Start is shown in the figure:

[Figure: timewheel_start]

  1. Start launches one goroutine that calls Poll to process due data in the DelayQueue and sends it to channel C;
  2. Start launches a second goroutine that loops reading from the DelayQueue's channel C. Each value received from C is a bucket, whose timers list is then traversed: tasks that are due are executed asynchronously, and tasks that are not yet due are re-added to the time wheel (and, through their buckets, to the DelayQueue).

Add a task

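func main() {
	tw := timingwheel.NewTimingWheel(time.Second, 10)
	tw.Start()
	// exitC receives the fire time so main can wait for the task
	exitC := make(chan time.Time, 1)
	// Add a task that fires after 15s
	tw.AfterFunc(time.Second*15, func() {
		fmt.Println("The timer fires")
		exitC <- time.Now().UTC()
	})
	// Block until the task has run
	<-exitC
}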

We use AfterFunc to add a task that fires after 15s; when it expires, the passed-in function is executed.

func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
	t := &Timer{
		expiration: timeToMs(time.Now().UTC().Add(d)),
		task:       f,
	}
	tw.addOrRun(t)
	return t
}

AfterFunc wraps the task's expiration time and the function to run on expiry into a Timer, then calls addOrRun, which, as seen above, decides from the expiration time whether to run the task immediately.

Now let's look at the add method:

func (tw *TimingWheel) add(t *Timer) bool {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	if t.expiration < currentTime+tw.tick {
		// Already expired
		return false
	} else if t.expiration < currentTime+tw.interval {
		// The expiration falls within this wheel: put it into its own bucket
		virtualID := t.expiration / tw.tick
		b := tw.buckets[virtualID%tw.wheelSize]
		// Add the timer to the bucket's list
		b.Add(t)
		// SetExpiration returns false if the expiration is unchanged,
		// which prevents the bucket from being queued more than once
		if b.SetExpiration(virtualID * tw.tick) {
			// Add the bucket to the delay queue
			tw.queue.Offer(b, b.Expiration())
		}
		return true
	} else {
		// Out of the interval: put it into the overflow (higher-level) wheel,
		// creating that wheel on first use
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel == nil {
			atomic.CompareAndSwapPointer(
				&tw.overflowWheel,
				nil,
				// Note: the new wheel's tick is this wheel's interval
				unsafe.Pointer(newTimingWheel(
					tw.interval,
					tw.wheelSize,
					currentTime,
					tw.queue,
				)),
			)
			overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
		}
		// Recurse upward
		return (*TimingWheel)(overflowWheel).add(t)
	}
}

add handles three cases based on the expiration time. In the first, the expiration is earlier than currentTime + tick, which means the timer has already expired, so add returns false and the task is executed directly;

The second case checks whether the expiration lies within the wheel's span (earlier than currentTime + interval). If so, the timer fits in the current wheel: its bucket is found by taking virtualID modulo wheelSize, and the timer is added to that bucket's list. SetExpiration reports, based on the value passed in, whether the bucket's expiration changed and therefore whether Offer must be called, which prevents the same bucket from being inserted into the delay queue repeatedly;

The third case means the timer's delay exceeds the span of the current wheel, so it must move up to the next-level wheel. Note that the upper wheel's tick is the current wheel's interval and that it shares the same delay queue. The upper wheel is stored in the overflowWheel pointer, and add is then called recursively on it.
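The three cases can be traced without goroutines or atomics. The sketch below is a hypothetical simplification (`miniWheel` is not part of the repository): there is no delay queue, buckets just record expirations, and add returns the level the timer landed in so the recursion is visible.

```go
package main

import "fmt"

// miniWheel is a hypothetical, single-goroutine version of TimingWheel used
// to trace add's three cases: no atomics and no delay queue, and each
// bucket simply records the expirations placed in it.
type miniWheel struct {
	tick, wheelSize, interval, currentTime int64
	buckets                                [][]int64
	overflow                               *miniWheel
}

func newMiniWheel(tick, wheelSize, start int64) *miniWheel {
	return &miniWheel{
		tick:        tick,
		wheelSize:   wheelSize,
		interval:    tick * wheelSize,
		currentTime: start - start%tick, // truncate to a multiple of tick
		buckets:     make([][]int64, wheelSize),
	}
}

// add returns the level the timer was stored in, or -1 if it is already due.
func (w *miniWheel) add(expiration int64) int {
	switch {
	case expiration < w.currentTime+w.tick: // case 1: already expired
		return -1
	case expiration < w.currentTime+w.interval: // case 2: fits in this wheel
		idx := (expiration / w.tick) % w.wheelSize
		w.buckets[idx] = append(w.buckets[idx], expiration)
		return 0
	default: // case 3: defer to the overflow wheel, created on first use
		if w.overflow == nil {
			// The upper wheel's tick is this wheel's interval.
			w.overflow = newMiniWheel(w.interval, w.wheelSize, w.currentTime)
		}
		if level := w.overflow.add(expiration); level >= 0 {
			return level + 1
		}
		return -1
	}
}

func main() {
	w := newMiniWheel(1, 10, 0) // tick = 1s, wheelSize = 10, start at 0
	fmt.Println(w.add(0))       // -1: already due
	fmt.Println(w.add(5))       // 0
	fmt.Println(w.add(15))      // 1
	fmt.Println(w.add(250))     // 2
}
```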

That covers the time wheel, but one more point deserves attention: the implementation above combines a DelayQueue with a ring of buckets. Inserting and deleting timed task items in the TimingWheel is O(1), whereas the DelayQueue is backed by a priority queue with O(log n) operations. Because the queue holds buckets rather than individual timers, and the number of buckets is small, this does not noticeably affect performance.

Reference

https://github.com/RussellLuo/timingwheel

https://zhuanlan.zhihu.com/p/121483218

https://github.com/apache/kafka/tree/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/core/src/main/scala/kafka/utils/timer

Copyright statement
Author: luozhiyun. Please include a link to the original article when reprinting. Thank you.
