跳到主要内容

定时任务

引言

很多时候我们需要定时处理一些任务,我们今天来学习一下接入到gin框架 选用github.com/robfig/cron/v3的原因 github star 6.7k 作者一直在维护 常见写法 源码解读 如何保证关闭/重启的时候,正在运行的任务要完成后才能继续执行关闭/重启 案例代码地址

1、快速使用 go mod init github.com/18211167516/go-lib/cron/rebfig_cron 复制代码

新建main.go文件

package main

import (
"fmt"
"time"

"github.com/robfig/cron/v3"
)

type testJob struct{}

//实现了 type Job interface {Run()}
func (t testJob) Run() {
fmt.Println("i.m test job")
}

func main() {
c := cron.New()

c.AddFunc("@every 1s", func() {
fmt.Println("tick every 1 second")
})

c.AddJob("* * * * *", testJob{})

c.Start()

select {}
}

复制代码

创建corn对象,用于管理定时任务 使用addFunc添加定时任务 Start方法启动定时任务。启动一个新goroutine select 防止主goroutine 退出

2、New()的选项

实际上是返回 type Option func(*Cron)

目前有内置5个选项 2.1 WithLocation 指定时区 loc,_ := time.LoadLocation("America/Los_Angeles") c := cron.New(cron.WithLocation(loc)) 复制代码 2.2 WithSeconds 支持粒度到秒级(默认是和crontab一样分钟级)

实际上WithSeconds就是使用WithParser实现

c:=cron.New(cron.WithSeconds())
//每2秒执行一次
c.AddFunc("*/2 * * * * *", func() {
file, _ := os.OpenFile("log.txt", os.O_APPEND|os.O_CREATE, 0755)
defer file.Close()
fmt.Println("test 11")
file.Write([]byte("test 111\r\n"))
})
复制代码

2.3 WithParser 使用自定义解析器

实现接口

type ScheduleParser interface {

Parse(spec string) (Schedule, error)
复制代码
}
paeser:= cron.NewParser(
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)

c := cron.New(cron.WithParser(paeser))
c.AddFunc("1 * * * * *", func () {
fmt.Println("every 1 second")
})
复制代码

2.4 WithChain Job包装器(Job中间件)

默认3个中间件

Recover 捕获内部Job产生的 panic; DelayIfStillRunning 触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行 SkipIfStillRunning 触发时,如果上一次任务还未完成,则跳过此次执行 复制代码

使用

type testJob struct{}

func (t testJob) Run() {
panic("test job")
//fmt.Println("i.m test job")
}

logger := cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))
//全局
c := cron.New(cron.WithChain(cron.Recover(logger)))

c.addFunc("* * * * * *",func(){
panic("1232132")
})

//局部中间件
c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(testJob{}))

复制代码

主要实现

//先执行Job中间件在执行具体Job
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
}
return j
}

复制代码 2.5 WithLogger 自定义日志Logger

先看默认的Logger

var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))

func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
return printfLogger{l, false}
}

func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
return printfLogger{l, true}
}

type printfLogger struct {
logger interface{ Printf(string, ...interface{}) }
logInfo bool
}

func (pl printfLogger) Info(msg string, keysAndValues ...interface{}) {
if pl.logInfo {
keysAndValues = formatTimes(keysAndValues)
pl.logger.Printf(
formatString(len(keysAndValues)),
append([]interface{}{msg}, keysAndValues...)...)
}
}

func (pl printfLogger) Error(err error, msg string, keysAndValues ...interface{}) {
keysAndValues = formatTimes(keysAndValues)
pl.logger.Printf(
formatString(len(keysAndValues)+2),
append([]interface{}{msg, "error", err}, keysAndValues...)...)
}
复制代码

只要实现Logger接口就可以自定义日志Logger

  type Logger interface {
// Info logs routine messages about cron's operation.
Info(msg string, keysAndValues ...interface{})
// Error logs an error condition.
Error(err error, msg string, keysAndValues ...interface{})
}
复制代码
//实现控制台和文件日志双写
f, _ := os.Create("cron.log")

c := cron.New(cron.WithSeconds(), cron.WithLogger(
cron.VerbosePrintfLogger(log.New(io.MultiWriter(f, os.Stdout), "cron: ", log.LstdFlags))))

c.AddFunc("*/2 * * * * *", func() {
fmt.Println("test 11")
})
复制代码
3、创建任务
3.1 addFunc()
c := cron.New()

c.AddFunc("@every 1s", func() {
fmt.Println("tick every 1 second")
})

c.Start()
复制代码
3.2 addJob()
type testJob struct{}

//实现了 type Job interface {Run()}
func (t testJob) Run() {
fmt.Println("i.m test job")
}

c.AddJob("* * * * *", testJob{})

c.Start()
复制代码
3.3 源码解读

step1 AddFunc基于addJob实现


func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
//解析时间格式
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}
复制代码

step2

func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
//并发锁
c.runningMu.Lock()
defer c.runningMu.Unlock()
//自增ID
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
//在服务已经启动的情况下新增任务(不知道为什么有这种操作)
if !c.running {
c.entries = append(c.entries, entry)
} else {
c.add <- entry
}
return entry.ID
}
复制代码

step3 最终运行 c.Start(),核心是cron,run()方法


遍历任务,获取每个任务下次执行时间

for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
复制代码

排序启动定时器

//无线循环
for {
// 任务排序
sort.Sort(byTime(c.entries))

var timer *time.Timer
//设置定时器(多少时间后执行)
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
timer = time.NewTimer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
}
复制代码

监听channel

for {
select {
//定时器触发
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)

// Run every entry whose next time was less than now
//遍历全部任务
for _, e := range c.entries {
// 判断如果第一个执行时间少于当前时间或者时间零点,则跳出不执行
if e.Next.After(now) || e.Next.IsZero() {
break
}
//执行Job包装器
c.startJob(e.WrappedJob)
//更新
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
// 运行后新增任务
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
//
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue

//服务停止

case <-c.stop:
//停止服务
timer.Stop()
c.logger.Info("stop")
return
//移除任务会影响服务停止
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}

break
}

复制代码

time.Stop()


使用 sync.WaitGroup来实现

func (c *Cron) Stop() context.Context {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
c.stop <- struct{}{}
c.running = false
}
//后台阻塞阻塞代码
ctx, cancel := context.WithCancel(context.Background())
go func() {
//计数器为0
c.jobWaiter.Wait()
//取消阻塞
cancel()
}()
return ctx
}
复制代码

执行任务

func (c *Cron) startJob(j Job) {
//计数器加一
c.jobWaiter.Add(1)
go func() {
defer c.jobWaiter.Done()
j.Run()
}()
}
复制代码
4. 时间格式

预定义格式

@yearly:也可以写作@annually,表示每年第一天的 0 点。等价于0 0 1 1 *;
@monthly:表示每月第一天的 0 点。等价于0 0 1 * *;
@weekly:表示每周第一天的 0 点,注意第一天为周日,即周六结束,周日开始的那个 0 点。等价于0 0 * * 0;
@daily:也可以写作@midnight,表示每天 0 点。等价于0 0 * * *;
@hourly:表示每小时的开始。等价于0 * * * *。
复制代码

固定间隔格式

@every <duration> 每个duration触发
c.AddFunc("@every 1s", func() {
fmt.Println("test 111")
})

c.AddFunc(fmt.Sprint("@every ", time.Duration(1)*time.Second), func() {
fmt.Println("test 222")
})
复制代码

自定义时间格式


默认支持5位到分钟级(等同于crontab)

c.AddFunc("* * * * *", func () {
fmt.Println("every 1 分钟")
})

//支持秒
c := cron.New(cron.WithSeconds())
c.AddFunc("* * * * * *", func () {
fmt.Println("every 1 秒钟")
})
  1. 总结

麻雀虽小五脏俱全,因为golang特性,天生支持线程安全,保证任务执行的完整性 如果考虑将任务持久化层也抽离,可能更利于日后扩展分布式,3个中间件个人认为应该默认就加上(我想没有人愿意因为某个任务报错导致全部任务终止)

作者:Rocket 链接:https://juejin.cn/post/6885649228824051726 来源:掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。