介绍
准备工作
- 运行一台 redis 服务器(建议实用docker)
- 启动图形化 webui 监控端项目
在使用 asynq 往 redis 中放入异步任务之前, 先将图形化客户端跑起来, 这样方便我们看到效果
sh
mkdir asynqmon-webui && cd -
mkdir redis_data && touch docker-compose.yaml
go mod init asynqmon_webui
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynqmonyaml
services:
redis:
container_name: redis8
image: redis:8
restart: always
ports:
- 6379:6379
volumes:
- ./redis_data:/datago
package main
import (
"fmt"
"log"
"net/http"
"github.com/hibiken/asynq"
"github.com/hibiken/asynqmon"
)
func main() {
h := asynqmon.New(asynqmon.Options{
RedisConnOpt: asynq.RedisClientOpt{Addr: ":6379"},
})
http.Handle("/", h)
port := ":8080"
mess := fmt.Sprintf("asynqmon webui running on http://127.0.0.1%s", port)
fmt.Println(mess)
log.Fatal(http.ListenAndServe(port, nil))
}应用场景
你可能会遇到这样的场景
- 为了防止机器人注册, 用户表设计了一个
is_active字段 - 当用户注册成功后, 需要
发送一个激活账户的邮件, 邮件内部带有激活账号的链接 - 需要
人手动点击访问激活链接才能激活账号, 以此来过滤机器人账号 - 此时
发送激活账户邮件是一个比较耗费时间的任务, 我希望它异步的执行, 不要影响注册接口的快速响应
快速开始
sh
mkdir asynq-demo & cd -
go mod init asynq_demo
touch main.go
# 安装依赖
go get github.com/gofiber/fiber/v3
go get github.com/hibiken/asynq启动服务并测试
go
package main
import (
"fmt"
"time"
"github.com/gofiber/fiber/v3"
)
// 模拟发送邮件任务
// 1.查询数据库数据
// 2.判断这个账户今天是否到达最大发送邮件数量(防止盗刷)
// 3.生成 "激活链接"
// 4.调用邮件服务API/直接发送邮件
func MockSendActiveMail() {
time.Sleep(time.Second * 5)
fmt.Println("邮件发送成功")
}
func main() {
app := fiber.New()
// 1.模拟注册接口
app.Get("/register", func(c fiber.Ctx) error {
MockSendActiveMail() // 由于发送邮件比较慢, 所以导致这个接口的响应比较慢
return c.JSON(fiber.Map{
"success": true,
"message": "模拟注册接口",
"results": "注册成功",
})
})
// 2.模拟激活账户接口
app.Get("/active", func(c fiber.Ctx) error {
return c.JSON(fiber.Map{
"success": true,
"message": "模拟激活账户接口",
"results": "激活成功",
})
})
err := app.Listen(":3000")
if err != nil {
log.Fatal(err)
}
}将发送邮件变为异步任务保证接口响应速度
go
package main
import (
"encoding/json"
"fmt"
"log"
"github.com/gofiber/fiber/v3"
"github.com/hibiken/asynq"
)
// 模拟发送激活邮件
// 此时它并不会直接去执行, 而是将数据保存到 redis 中
// 等待空闲时, 由 hibiken/asynq 这个包的消费者去自动执行
func MockSendActiveMail() { // 生产者: 生成异步任务/放入任务队列
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
taskPayload, err := json.Marshal(map[string]string{
"userId": "10001", // 用户 id
"email": "test@example.com", // 目标邮箱地址
})
if err != nil {
fmt.Println("任务信息生成失败", err)
return
}
// 3.将任务放入队列
task := asynq.NewTask("send_active_mail", taskPayload)
info, err := client.Enqueue(task)
if err != nil {
fmt.Println("任务入列失败", err)
return
}
fmt.Println("任务入列成功", info)
}
func main() {
app := fiber.New()
// 1.模拟注册接口
app.Get("/register", func(c fiber.Ctx) error {
MockSendActiveMail()
// 每次响应都很慢
return c.JSON(fiber.Map{
"success": true,
"message": "模拟注册接口",
"results": "注册成功",
})
})
// 2.模拟激活账户接口
app.Get("/active", func(c fiber.Ctx) error {
return c.JSON(fiber.Map{
"success": true,
"message": "模拟激活账户接口",
"results": "激活成功",
})
})
err := app.Listen(":3000")
if err != nil {
log.Fatal(err)
}
}实现队列消费者
此时只是将任务放入了 redis 队列, 它并不会执行
go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
var TaskPayload struct {
UserID string `json:"userId"`
Email string `json:"email"`
}
// 发送激活邮件任务处理器
func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
return fmt.Errorf(">>> 反序列化任务负载失败: %w", err)
}
// 模拟发送邮件(实际项目中替换为真实邮件逻辑)
fmt.Printf(">>> 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
time.Sleep(5 * time.Second) // 模拟耗时操作
fmt.Println(">>> 邮件发送完成 <<<")
return nil
}
// 全局 worker server 实例
var workerServer *asynq.Server
// 启动 worker
func StartWorker() {
workerServer = asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379", // 链接 redis 服务器选项
},
asynq.Config{
Concurrency: 5, // 可扩展:Logger, RetryDelayFunc 等
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("send_active_mail", sendActiveMailHandler)
// 注: 这个 "send_active_mail" 必须和创建任务时传入的一致,
// 它表示任务类型, 可以有多个任务类型(比如发送重置密码邮件, 发送欢迎邮件)
// mux.HandleFunc("send_welcome_mail", SendWelcomeMailHandler)
// mux.HandleFunc("send_reset_password_mail", SendResetPasswdMailHandler)
if err := workerServer.Run(mux); err != nil {
log.Fatalf("[error] Worker 异常: %v", err)
}
}
// 停止 worker
func ShutdownWorker() {
if workerServer != nil {
workerServer.Shutdown()
}
}go
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/gofiber/fiber/v3"
"github.com/hibiken/asynq"
)
// 模拟发送激活邮件
// 此时它并不会直接去执行, 而是将数据保存到 redis 中
// 等待空闲时, 由 hibiken/asynq 这个包的消费者去自动执行
func MockSendActiveMail() { // 生产者: 生成异步任务/放入任务队列
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
taskPayload, err := json.Marshal(map[string]string{
"userId": "10001", // 用户 id
"email": "test@example.com", // 目标邮箱地址
})
if err != nil {
fmt.Println("任务信息生成失败", err)
return
}
// 3.将任务放入队列
task := asynq.NewTask("send_active_mail", taskPayload)
info, err := client.Enqueue(task)
if err != nil {
fmt.Println("任务入列失败", err)
return
}
fmt.Println("任务入列成功", info)
}
func main() {
app := fiber.New()
// 0.后台启动消费者
go StartWorker()
// 1.模拟注册接口
app.Get("/register", func(c fiber.Ctx) error {
MockSendActiveMail()
// 每次响应都很慢
return c.JSON(fiber.Map{
"success": true,
"message": "模拟注册接口",
"results": "注册成功",
})
})
// 2.模拟激活账户接口
app.Get("/active", func(c fiber.Ctx) error {
return c.JSON(fiber.Map{
"success": true,
"message": "模拟激活账户接口",
"results": "激活成功",
})
})
// 3.监听 fiber 服务器退出
app.Hooks().OnPreShutdown(func() error {
ShutdownWorker() // fiber 服务器退出的时候结束 worker 服务
fmt.Println("=== 服务已全部退出 ===")
return nil
})
// 4.后台启动 fiber 服务器
go func() {
if err := app.Listen(":3000"); err != nil {
log.Fatalf("[server error]: %v", err)
}
}()
// 5.必须阻塞主线程, 否则程序会直接结束
// 创建一个通道用于接收信号: 监听 SIGINT(Ctrl+C)和SIGTERM(kill 命令)
quitChan := make(chan os.Signal, 1)
signal.Notify(quitChan, os.Interrupt, syscall.SIGTERM)
<-quitChan // 它阻塞主线程,直到收到信号
fmt.Println("\n>>> 捕获到退出信号, 开始优雅关闭...")
// 调用 Shutdown 会触发 OnPreShutdown
if err := app.ShutdownWithTimeout(5 * time.Second); err != nil {
fmt.Printf("Error during shutdown: %v\n", err)
}
fmt.Println("Server gracefully stopped.")
}sh
go build -o server
chmod +x ./server
./server
# 测试
# curl -i -X GET http://127.0.0.1:3000/register
# 查看控制台输出超时与截止时间控制
有些任务可能会执行很久, 比如视频转码, 生成报表, 处理大文件, 调用第三方API, 如果任务一直卡住就会导致消费者被一直占用,影响其他任务的执行, 所以就需要在入列的时候添加一些限制
go
func MockSendActiveMail() {
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
taskPayload, err := json.Marshal(map[string]string{
"userId": "10001", // 用户 id
"email": "test@example.com", // 目标邮箱地址
})
if err != nil {
fmt.Println("任务信息生成失败", err)
return
}
// 3.将任务放入队列并且设置任务超时时间
task := asynq.NewTask("send_active_mail:timeout", taskPayload)
info, err := client.Enqueue(
task,
asynq.Timeout(3*time.Second), // 如果一个任务执行时间超过3秒就执行失败
asynq.MaxRetry(3), // 自动重试次数
)
if err != nil {
fmt.Println("任务入列失败", err)
return
}
fmt.Println("任务入列成功", info)
}go
func MockSendActiveMail() {
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
taskPayload, err := json.Marshal(map[string]string{
"userId": "10001", // 用户 id
"email": "test@example.com", // 目标邮箱地址
})
if err != nil {
fmt.Println("任务信息生成失败", err)
return
}
// 3.将任务放入队列并且设置任务超时时间
task := asynq.NewTask("send_active_mail:deadline", taskPayload)
deadline := time.Now().Add(time.Second * 10)
info, err := client.Enqueue(
task,
asynq.Deadline(deadline), // 如果任务到执行时间(比如10秒钟后)还未执行完就取消
asynq.MaxRetry(3), // 自动重试次数, 如果任务被取消, 会自动触发重试
)
if err != nil {
fmt.Println("任务入列失败", err)
return
}
fmt.Println("任务入列成功", info)
}go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
var TaskPayload struct {
UserID string `json:"userId"`
Email string `json:"email"`
}
func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
return fmt.Errorf("反序列化任务负载失败: %w", err)
}
// 模拟发送邮件(实际项目中替换为真实邮件逻辑)
fmt.Printf("[模拟] 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
time.Sleep(5 * time.Second) // 模拟耗时操作
fmt.Println("=== 邮件发送完成 ===")
return nil
}
func sendActiveMailWithTimeHandler(ctx context.Context, t *asynq.Task) error {
fmt.Printf("=== 开始执行带有 timeout/deadline 的任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
select {
case <-time.After(time.Second * 5): // 假设这个任务需要5s才能执行完成
fmt.Println("=== timeout/deadline 慢任务执行完成")
return nil
case <-ctx.Done():
fmt.Println("=== timeout/deadline 取消任务:时间超过了限制的时间")
return ctx.Err()
}
}
// 全局 worker server 实例
var workerServer *asynq.Server
// 启动 worker
func StartWorker() {
workerServer = asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379", // 链接 redis 服务器选项
},
asynq.Config{
Concurrency: 5, // 可扩展:Logger, RetryDelayFunc 等
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("send_active_mail", sendActiveMailHandler)
// 注意这两个不同类型的任务用的是同一个处理函数
mux.HandleFunc("send_active_mail:timeout", sendActiveMailWithTimeHandler)
mux.HandleFunc("send_active_mail:deadline", sendActiveMailWithTimeHandler)
if err := workerServer.Run(mux); err != nil {
log.Fatalf("[error] Worker 异常: %v", err)
}
}
// 停止 worker
func ShutdownWorker() {
if workerServer != nil {
workerServer.Shutdown()
}
}重试与失败任务
有些任务执行失败可能是暂时的, 比如查询订单状态是否是已经支付的状态, 第一次查询可能还没有支付, 但是等一会再查用户就已经支付了, 而这样的任务就不应该放弃, 而是应该让他重试执行
- 在 asynq 中, 消费者Handler返回 nil 表示任务执行成功, 返回 error 表示任务执行失败, 如果没有超过最大重试次数, 会被放入
retry队列中 - 在上面的例子中, 已经在生产者入列时通过
asynq.MaxRetry(3)设置了重试次数
go
func MockSendActiveMail() {
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
taskPayload, err := json.Marshal(map[string]string{
"userId": "10001", // 用户 id
"email": "test@example.com", // 目标邮箱地址
})
if err != nil {
fmt.Println("任务信息生成失败", err)
return
}
// 3.将任务放入队列
task := asynq.NewTask("send_active_mail_retry", taskPayload)
info, err := client.Enqueue(
task,
asynq.MaxRetry(3), // 只设置自动重试次数, 不设置时间限制
)
if err != nil {
fmt.Println("任务入列失败", err)
return
}
fmt.Println("任务入列成功", info)
}go
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
var TaskPayload struct {
UserID string `json:"userId"`
Email string `json:"email"`
}
func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
return fmt.Errorf("反序列化任务负载失败: %w", err)
}
// 模拟发送邮件(实际项目中替换为真实邮件逻辑)
fmt.Printf("[模拟] 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
time.Sleep(5 * time.Second) // 模拟耗时操作
fmt.Println("=== 邮件发送完成 ===")
return nil
}
func sendActiveMailWithRetryHandler(ctx context.Context, t *asynq.Task) error {
// 在这个任务处理器中, 必须要返回 error 才能将这个任务放到 retry 队列中
return errors.New("模拟任务失败")
}
// 全局 worker server 实例
var workerServer *asynq.Server
// 启动 worker
func StartWorker() {
workerServer = asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379", // 链接 redis 服务器选项
},
asynq.Config{
Concurrency: 5,
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
// 设置重试间隔时间(比如上一次执行失败后, 间隔5秒钟后再重试)
fmt.Printf("[retry]任务失败, 准备第 %d 次数重试, type=%s err=%v \n", n, t.Type(), e)
return time.Second * 5
},
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("send_active_mail", sendActiveMailHandler)
mux.HandleFunc("send_active_mail_retry", sendActiveMailWithRetryHandler)
if err := workerServer.Run(mux); err != nil {
log.Fatalf("[error] Worker 异常: %v", err)
}
}
// 停止 worker
func ShutdownWorker() {
if workerServer != nil {
workerServer.Shutdown()
}
}优先级任务
不是所有异步任务都一样重要, 比如"服务器宕机警告"就应该尽快处理, 而"每天生成财务报表"则可以不着急, 慢一点也没关系, 这时候就可以使用不同的队列, 给不同的队列设置 权重 权重越大被消费者获取到的几率越高
go
func MockSendActiveMail() {
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
tasks := []struct {
queue string // 这个字段必须是消费者 asynq.NewServer#asynq.Config#Queues 中有的
desc string
payload string
}{
{
queue: "important",
desc: "重要任务队列",
payload: "important-task-data",
},
{
queue: "high",
desc: "高优先级任务",
payload: "higi-task-data",
},
{
queue: "default",
desc: "普通任务队列",
payload: "default-task-data",
},
{
queue: "low",
desc: "低优先级任务",
payload: "low-task-data",
},
{
queue: "example", // 这个任务不会执行, 因为队列不存在
desc: "不存在的队列",
payload: "unknown-task-data",
},
}
// 3.将任务放入队列
for _, item := range tasks {
task := asynq.NewTask("task_with_priority", []byte(item.payload))
info, _ := client.Enqueue(task, asynq.Queue(item.queue))
fmt.Println("入列成功", item.desc, info)
}
}go
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
// var TaskPayload struct {
// UserID string `json:"userId"`
// Email string `json:"email"`
// }
// func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
// if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
// return fmt.Errorf("反序列化任务负载失败: %w", err)
// }
//
// // 模拟发送邮件(实际项目中替换为真实邮件逻辑)
// fmt.Printf("[模拟] 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
// time.Sleep(5 * time.Second) // 模拟耗时操作
//
// fmt.Println("=== 邮件发送完成 ===")
// return nil
// }
func sendActiveMailWithRetryHandler(ctx context.Context, t *asynq.Task) error {
// 在这个任务处理器中, 必须要返回 error 才能将这个任务放到 retry 队列中
return errors.New("模拟任务失败")
}
func taskWithPriorityHandler(ctx context.Context, t *asynq.Task) error {
fmt.Printf("=== 开始执行任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
fmt.Println("=== 任务执行完成 ===")
return nil
}
// 全局 worker server 实例
var workerServer *asynq.Server
// 启动 worker
func StartWorker() {
workerServer = asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
},
asynq.Config{
Concurrency: 5,
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
// 设置重试间隔时间
fmt.Printf("[retry]任务失败, 准备第 %d 次数重试, type=%s err=%v \n", n, t.Type(), e)
return time.Second * 5
},
Queues: map[string]int{
"important": 8,
"high": 6,
"default": 4,
"low": 2,
},
},
)
mux := asynq.NewServeMux()
// mux.HandleFunc("send_active_mail", sendActiveMailHandler)
// 优先级队列
mux.HandleFunc("task_with_priority", taskWithPriorityHandler)
if err := workerServer.Run(mux); err != nil {
log.Fatalf("[error] Worker 异常: %v", err)
}
}
// 停止 worker
func ShutdownWorker() {
if workerServer != nil {
workerServer.Shutdown()
}
}延迟任务
不需要立即执行的低优先级任务
go
func MockSendActiveMail() {
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.将任务放入队列
// 2.1 在10s钟之后执行这个任务
delayTask1 := asynq.NewTask("delay_task", []byte("execute after 10 seconds"))
info1, _ := client.Enqueue(delayTask1, asynq.ProcessIn(10*time.Second))
fmt.Println("入列成功", info1)
// 2.2 在指定的时间执行这个任务(比如当前时间的1分钟后)
delayTask2 := asynq.NewTask("delay_task", []byte("execute after 1 minute"))
info2, _ := client.Enqueue(delayTask2, asynq.ProcessAt(time.Now().Add(1*time.Minute)))
fmt.Println("入列成功", info2)
}
// 消费者直接正常执行即可, 需要添加什么参数唯一任务
用来解决 同一件事情不要重复入列 的问题
- Unique: 按照任务内容去重, 在一段时间内, 相同的任务类型, payload和队列只能入列一次
- TaskID: 手动指定一个id, 如果已经在队列中存在则不加入队列(推荐使用, 这个更直观)一般来说设计数据表的时候都会有一个唯一主键
go
func MockSendActiveMail() {
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
payload := map[string]string{
"userId": "10001", // 用户 id
"email": "test@example.com", // 目标邮箱地址
}
taskPayload, err := json.Marshal(payload)
if err != nil {
fmt.Println("任务信息生成失败", err)
return
}
// 3.将任务放入队列
taskID := fmt.Sprintf("%s-%s", payload["userId"], payload["email"])
task := asynq.NewTask("send_active_mail", taskPayload)
info, err := client.Enqueue(task, asynq.TaskID(taskID))
if err != nil {
fmt.Println("任务入列失败", err)
return
}
fmt.Println("任务入列成功", info)
}定时任务(周期性任务)
这个就有点类似于 linux 操作系统的 crontab 就是 每隔一段时间执行一次
于其他任务不太相同的是, 它不是有生产者来添加到队列中的, 而是由调度器注册到任务队列中的
go
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/gofiber/fiber/v3"
"github.com/hibiken/asynq"
)
func MockSendActiveMail() {
// 1.创建客户端(设置 redis 链接信息)
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
})
defer client.Close()
// 2.携带执行异步任务需要的信息
payload := map[string]string{
"userId": "10001", // 用户 id
"email": "test@example.com", // 目标邮箱地址
}
taskPayload, err := json.Marshal(payload)
if err != nil {
fmt.Println("任务信息生成失败", err)
return
}
// 3.将任务放入队列
taskID := fmt.Sprintf("%s-%s", payload["userId"], payload["email"])
task := asynq.NewTask("send_active_mail", taskPayload)
info, err := client.Enqueue(task, asynq.TaskID(taskID))
if err != nil {
fmt.Println("任务入列失败", err)
return
}
fmt.Println("任务入列成功", info)
}
func initScheduler() {
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{Addr: "127.0.0.1:6379"},
&asynq.SchedulerOpts{},
)
// 可以使用这种 @every 2s 这种写法: 每2秒钟执行一次
scheduler.Register("@every 2s", asynq.NewTask("print_tick", []byte("tick")))
// 也可以使用 crontab 表达式: 每2分钟执行一次
scheduler.Register("*/2 * * * *", asynq.NewTask("print_cron", []byte("cron")))
if err := scheduler.Run(); err != nil {
log.Fatalf("[error] Scheduler 异常: %v", err)
}
}
func main() {
app := fiber.New()
// 0.后台启动消费者 & 调度器
go initScheduler()
go StartWorker()
// 1.模拟注册接口
app.Get("/register", func(c fiber.Ctx) error {
MockSendActiveMail()
// 每次响应都很慢
return c.JSON(fiber.Map{
"success": true,
"message": "模拟注册接口",
"results": "注册成功",
})
})
// 2.模拟激活账户接口
app.Get("/active", func(c fiber.Ctx) error {
return c.JSON(fiber.Map{
"success": true,
"message": "模拟激活账户接口",
"results": "激活成功",
})
})
// 3.监听 fiber 服务器退出
app.Hooks().OnPreShutdown(func() error {
fmt.Println("=== 服务已全部退出 ===")
ShutdownWorker()
return nil
})
// 4. 启动 fiber 服务器
go func() {
if err := app.Listen(":3000"); err != nil {
log.Fatalf("[server error]: %v", err)
}
}()
// 5.阻塞主线程(直到 Ctrl-C 退出)
exitChan := make(chan os.Signal, 1)
signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)
<-exitChan
fmt.Println("\n === 捕获到退出信号, 开始优雅关闭 ===")
if err := app.ShutdownWithTimeout(5 * time.Second); err != nil {
fmt.Printf("=== Error during shutdown: %v === \n", err)
}
}go
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
// 对于消费者来说, 不管是普通异步任务还是周期性任务, 都是一样的
func scheduleTaskHandler(ctx context.Context, t *asynq.Task) error {
fmt.Printf("=== 开始执行计划任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
return nil
}
// 全局 worker server 实例
var workerServer *asynq.Server
// 启动 worker
func StartWorker() {
workerServer = asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
},
asynq.Config{
Concurrency: 10,
},
)
mux := asynq.NewServeMux()
// 处理周期性任务
mux.HandleFunc("print_tick", scheduleTaskHandler)
mux.HandleFunc("print_cron", scheduleTaskHandler)
if err := workerServer.Run(mux); err != nil {
log.Fatalf("[error] Worker 异常: %v", err)
}
}
// 停止 worker
func ShutdownWorker() {
if workerServer != nil {
workerServer.Shutdown()
}
}消费者中间件
go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
func scheduleTaskHandler(ctx context.Context, t *asynq.Task) error {
fmt.Printf("=== 开始执行计划任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
return nil
}
// 日志中间件于http的muxServer非常类似
func loggingMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(c context.Context, t *asynq.Task) error {
startTime := time.Now()
log.Printf("[info] 开始执行任务, task=%s payload=%s", t.Type(), t.Payload())
if err := next.ProcessTask(c, t); err != nil {
log.Printf("[error] 任务执行失败, task=%s payload=%s err=%v", t.Type(), t.Payload(), err)
return err
}
log.Printf("[info] 任务执行完成, task=%s payload=%s cost=%v", t.Type(), t.Payload(), time.Since(startTime))
return nil
})
}
// 全局 worker server 实例
var workerServer *asynq.Server
// 启动 worker
func StartWorker() {
workerServer = asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
},
asynq.Config{
Concurrency: 10,
},
)
mux := asynq.NewServeMux()
mux.Use(loggingMiddleware) // 应用日志中间件
mux.HandleFunc("print_tick", scheduleTaskHandler)
mux.HandleFunc("print_cron", scheduleTaskHandler)
if err := workerServer.Run(mux); err != nil {
log.Fatalf("[error] Worker 异常: %v", err)
}
}
// 停止 worker
func ShutdownWorker() {
if workerServer != nil {
workerServer.Shutdown()
}
}