Skip to content

介绍

  • asynq 是一个异步任务队列库
  • asynqmon 是一个图形化的 web 监控端(查看任务运行状态)

准备工作

  1. 运行一台 redis 服务器(建议实用docker)
  2. 启动图形化 webui 监控端项目

在使用 asynq 往 redis 中放入异步任务之前, 先将图形化客户端跑起来, 这样方便我们看到效果

sh
mkdir asynqmon-webui && cd -
mkdir redis_data && touch docker-compose.yaml
go mod init asynqmon_webui
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynqmon
yaml
services:
  redis:
    container_name: redis8
    image: redis:8
    restart: always
    ports:
      - 6379:6379
    volumes:
      - ./redis_data:/data
go
package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/hibiken/asynq"
	"github.com/hibiken/asynqmon"
)

func main() {
	h := asynqmon.New(asynqmon.Options{
		RedisConnOpt: asynq.RedisClientOpt{Addr: ":6379"},
	})
	http.Handle("/", h)

	port := ":8080"
	mess := fmt.Sprintf("asynqmon webui running on http://127.0.0.1%s", port)
	fmt.Println(mess)
	log.Fatal(http.ListenAndServe(port, nil))
}

应用场景

你可能会遇到这样的场景

  1. 为了防止机器人注册, 用户表设计了一个 is_active 字段
  2. 当用户注册成功后, 需要 发送一个激活账户的邮件, 邮件内部带有激活账号的链接
  3. 需要人手动点击访问激活链接才能激活账号, 以此来过滤机器人账号
  4. 此时 发送激活账户邮件 是一个比较耗费时间的任务, 我希望它异步的执行, 不要影响注册接口的快速响应

快速开始

sh
mkdir asynq-demo & cd -
go mod init asynq_demo
touch main.go

# 安装依赖
go get github.com/gofiber/fiber/v3
go get github.com/hibiken/asynq

启动服务并测试

go
package main

import (
	"fmt"
	"time"

	"github.com/gofiber/fiber/v3"
)

// 模拟发送邮件任务
// 1.查询数据库数据
// 2.判断这个账户今天是否到达最大发送邮件数量(防止盗刷)
// 3.生成 "激活链接"
// 4.调用邮件服务API/直接发送邮件
func MockSendActiveMail() {
	time.Sleep(time.Second * 5)
	fmt.Println("邮件发送成功")
}

func main() {
	app := fiber.New()

	// 1.模拟注册接口
	app.Get("/register", func(c fiber.Ctx) error {
		MockSendActiveMail() // 由于发送邮件比较慢, 所以导致这个接口的响应比较慢
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟注册接口",
			"results": "注册成功",
		})
	})

	// 2.模拟激活账户接口
	app.Get("/active", func(c fiber.Ctx) error {
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟激活账户接口",
			"results": "激活成功",
		})
	})

	err := app.Listen(":3000")
	if err != nil {
		log.Fatal(err)
	}
}

将发送邮件变为异步任务保证接口响应速度

go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/gofiber/fiber/v3"
	"github.com/hibiken/asynq"
)

// 模拟发送激活邮件
// 此时它并不会直接去执行, 而是将数据保存到 redis 中
// 等待空闲时, 由 hibiken/asynq 这个包的消费者去自动执行
func MockSendActiveMail() { // 生产者: 生成异步任务/放入任务队列
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	taskPayload, err := json.Marshal(map[string]string{
		"userId": "10001",            // 用户 id
		"email":  "test@example.com", // 目标邮箱地址
	})
	if err != nil {
		fmt.Println("任务信息生成失败", err)
		return
	}

	// 3.将任务放入队列
	task := asynq.NewTask("send_active_mail", taskPayload)
	info, err := client.Enqueue(task)
	if err != nil {
		fmt.Println("任务入列失败", err)
		return
	}
	fmt.Println("任务入列成功", info)
}

func main() {
	app := fiber.New()

	// 1.模拟注册接口
	app.Get("/register", func(c fiber.Ctx) error {
		MockSendActiveMail()

		// 每次响应都很慢
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟注册接口",
			"results": "注册成功",
		})
	})

	// 2.模拟激活账户接口
	app.Get("/active", func(c fiber.Ctx) error {
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟激活账户接口",
			"results": "激活成功",
		})
	})

	err := app.Listen(":3000")
	if err != nil {
		log.Fatal(err)
	}
}

实现队列消费者

此时只是将任务放入了 redis 队列, 它并不会执行

go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

var TaskPayload struct {
	UserID string `json:"userId"`
	Email  string `json:"email"`
}

// 发送激活邮件任务处理器
func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
	if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
		return fmt.Errorf(">>> 反序列化任务负载失败: %w", err)
	}

	// 模拟发送邮件(实际项目中替换为真实邮件逻辑)
	fmt.Printf(">>> 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
	time.Sleep(5 * time.Second) // 模拟耗时操作

	fmt.Println(">>> 邮件发送完成 <<<")
	return nil
}

// 全局 worker server 实例
var workerServer *asynq.Server

// 启动 worker
func StartWorker() {
	workerServer = asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "127.0.0.1:6379", // 链接 redis 服务器选项
		},
		asynq.Config{
			Concurrency: 5, // 可扩展:Logger, RetryDelayFunc 等
		},
	)

	mux := asynq.NewServeMux()

	mux.HandleFunc("send_active_mail", sendActiveMailHandler)
	// 注: 这个 "send_active_mail" 必须和创建任务时传入的一致,
	// 它表示任务类型, 可以有多个任务类型(比如发送重置密码邮件, 发送欢迎邮件)
	// mux.HandleFunc("send_welcome_mail", SendWelcomeMailHandler)
	// mux.HandleFunc("send_reset_password_mail", SendResetPasswdMailHandler)

	if err := workerServer.Run(mux); err != nil {
		log.Fatalf("[error] Worker 异常: %v", err)
	}
}

// 停止 worker
func ShutdownWorker() {
	if workerServer != nil {
		workerServer.Shutdown()
	}
}
go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gofiber/fiber/v3"
	"github.com/hibiken/asynq"
)

// 模拟发送激活邮件
// 此时它并不会直接去执行, 而是将数据保存到 redis 中
// 等待空闲时, 由 hibiken/asynq 这个包的消费者去自动执行
func MockSendActiveMail() { // 生产者: 生成异步任务/放入任务队列
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	taskPayload, err := json.Marshal(map[string]string{
		"userId": "10001",            // 用户 id
		"email":  "test@example.com", // 目标邮箱地址
	})
	if err != nil {
		fmt.Println("任务信息生成失败", err)
		return
	}

	// 3.将任务放入队列
	task := asynq.NewTask("send_active_mail", taskPayload)
	info, err := client.Enqueue(task)
	if err != nil {
		fmt.Println("任务入列失败", err)
		return
	}
	fmt.Println("任务入列成功", info)
}

func main() {
	app := fiber.New()

	// 0.后台启动消费者
	go StartWorker()

	// 1.模拟注册接口
	app.Get("/register", func(c fiber.Ctx) error {
		MockSendActiveMail()

		// 每次响应都很慢
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟注册接口",
			"results": "注册成功",
		})
	})

	// 2.模拟激活账户接口
	app.Get("/active", func(c fiber.Ctx) error {
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟激活账户接口",
			"results": "激活成功",
		})
	})

	// 3.监听 fiber 服务器退出
	app.Hooks().OnPreShutdown(func() error {
	  ShutdownWorker() // fiber 服务器退出的时候结束 worker 服务
		fmt.Println("=== 服务已全部退出 ===")
		return nil
	})

	// 4.后台启动 fiber 服务器
	go func() {
		if err := app.Listen(":3000"); err != nil {
			log.Fatalf("[server error]: %v", err)
		}
	}()

	// 5.必须阻塞主线程, 否则程序会直接结束
	// 创建一个通道用于接收信号: 监听 SIGINT(Ctrl+C)和SIGTERM(kill 命令)
	quitChan := make(chan os.Signal, 1)
	signal.Notify(quitChan, os.Interrupt, syscall.SIGTERM)

	<-quitChan // 它阻塞主线程,直到收到信号
	fmt.Println("\n>>> 捕获到退出信号, 开始优雅关闭...")

	// 调用 Shutdown 会触发 OnPreShutdown
	if err := app.ShutdownWithTimeout(5 * time.Second); err != nil {
		fmt.Printf("Error during shutdown: %v\n", err)
	}

	fmt.Println("Server gracefully stopped.")
}
sh
go build -o server
chmod +x ./server
./server

# 测试
# curl -i -X GET http://127.0.0.1:3000/register
# 查看控制台输出

超时与截止时间控制

有些任务可能会执行很久, 比如视频转码, 生成报表, 处理大文件, 调用第三方API, 如果任务一直卡住就会导致消费者被一直占用,影响其他任务的执行, 所以就需要在入列的时候添加一些限制

go
func MockSendActiveMail() {
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	taskPayload, err := json.Marshal(map[string]string{
		"userId": "10001",            // 用户 id
		"email":  "test@example.com", // 目标邮箱地址
	})
	if err != nil {
		fmt.Println("任务信息生成失败", err)
		return
	}

	// 3.将任务放入队列并且设置任务超时时间
	task := asynq.NewTask("send_active_mail:timeout", taskPayload)
	info, err := client.Enqueue(
		task,
		asynq.Timeout(3*time.Second), // 如果一个任务执行时间超过3秒就执行失败
		asynq.MaxRetry(3),            // 自动重试次数
	)
	if err != nil {
		fmt.Println("任务入列失败", err)
		return
	}
	fmt.Println("任务入列成功", info)
}
go
func MockSendActiveMail() {
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	taskPayload, err := json.Marshal(map[string]string{
		"userId": "10001",            // 用户 id
		"email":  "test@example.com", // 目标邮箱地址
	})
	if err != nil {
		fmt.Println("任务信息生成失败", err)
		return
	}

	// 3.将任务放入队列并且设置任务超时时间
	task := asynq.NewTask("send_active_mail:deadline", taskPayload)
	deadline := time.Now().Add(time.Second * 10)
	info, err := client.Enqueue(
		task,
		asynq.Deadline(deadline), // 如果任务到执行时间(比如10秒钟后)还未执行完就取消
		asynq.MaxRetry(3),        // 自动重试次数, 如果任务被取消, 会自动触发重试
	)
	if err != nil {
		fmt.Println("任务入列失败", err)
		return
	}
	fmt.Println("任务入列成功", info)
}
go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

var TaskPayload struct {
	UserID string `json:"userId"`
	Email  string `json:"email"`
}

func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
	if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
		return fmt.Errorf("反序列化任务负载失败: %w", err)
	}

	// 模拟发送邮件(实际项目中替换为真实邮件逻辑)
	fmt.Printf("[模拟] 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
	time.Sleep(5 * time.Second) // 模拟耗时操作

	fmt.Println("=== 邮件发送完成 ===")
	return nil
}

func sendActiveMailWithTimeHandler(ctx context.Context, t *asynq.Task) error {
	fmt.Printf("=== 开始执行带有 timeout/deadline 的任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
	select {
	case <-time.After(time.Second * 5): // 假设这个任务需要5s才能执行完成
		fmt.Println("=== timeout/deadline 慢任务执行完成")
		return nil
	case <-ctx.Done():
		fmt.Println("=== timeout/deadline 取消任务:时间超过了限制的时间")
		return ctx.Err()
	}
}

// 全局 worker server 实例
var workerServer *asynq.Server

// 启动 worker
func StartWorker() {
	workerServer = asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "127.0.0.1:6379", // 链接 redis 服务器选项
		},
		asynq.Config{
			Concurrency: 5, // 可扩展:Logger, RetryDelayFunc 等
		},
	)

	mux := asynq.NewServeMux()

	mux.HandleFunc("send_active_mail", sendActiveMailHandler)

	// 注意这两个不同类型的任务用的是同一个处理函数
	mux.HandleFunc("send_active_mail:timeout", sendActiveMailWithTimeHandler)
	mux.HandleFunc("send_active_mail:deadline", sendActiveMailWithTimeHandler)

	if err := workerServer.Run(mux); err != nil {
		log.Fatalf("[error] Worker 异常: %v", err)
	}
}

// 停止 worker
func ShutdownWorker() {
	if workerServer != nil {
		workerServer.Shutdown()
	}
}

重试与失败任务

有些任务执行失败可能是暂时的, 比如查询订单状态是否是已经支付的状态, 第一次查询可能还没有支付, 但是等一会再查用户就已经支付了, 而这样的任务就不应该放弃, 而是应该让他重试执行

  • 在 asynq 中, 消费者Handler返回 nil 表示任务执行成功, 返回 error 表示任务执行失败, 如果没有超过最大重试次数, 会被放入 retry 队列中
  • 在上面的例子中, 已经在生产者入列时通过 asynq.MaxRetry(3) 设置了重试次数
go
func MockSendActiveMail() {
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	taskPayload, err := json.Marshal(map[string]string{
		"userId": "10001",            // 用户 id
		"email":  "test@example.com", // 目标邮箱地址
	})
	if err != nil {
		fmt.Println("任务信息生成失败", err)
		return
	}

	// 3.将任务放入队列
	task := asynq.NewTask("send_active_mail_retry", taskPayload)
	info, err := client.Enqueue(
		task,
		asynq.MaxRetry(3), // 只设置自动重试次数, 不设置时间限制
	)
	if err != nil {
		fmt.Println("任务入列失败", err)
		return
	}
	fmt.Println("任务入列成功", info)
}
go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

var TaskPayload struct {
	UserID string `json:"userId"`
	Email  string `json:"email"`
}

func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
	if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
		return fmt.Errorf("反序列化任务负载失败: %w", err)
	}

	// 模拟发送邮件(实际项目中替换为真实邮件逻辑)
	fmt.Printf("[模拟] 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
	time.Sleep(5 * time.Second) // 模拟耗时操作

	fmt.Println("=== 邮件发送完成 ===")
	return nil
}

func sendActiveMailWithRetryHandler(ctx context.Context, t *asynq.Task) error {
	// 在这个任务处理器中, 必须要返回 error 才能将这个任务放到 retry 队列中
	return errors.New("模拟任务失败")
}

// 全局 worker server 实例
var workerServer *asynq.Server

// 启动 worker
func StartWorker() {
	workerServer = asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "127.0.0.1:6379", // 链接 redis 服务器选项
		},
		asynq.Config{
			Concurrency: 5,
			RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
				// 设置重试间隔时间(比如上一次执行失败后, 间隔5秒钟后再重试)
				fmt.Printf("[retry]任务失败, 准备第 %d 次数重试, type=%s err=%v \n", n, t.Type(), e)
				return time.Second * 5
			},
		},
	)

	mux := asynq.NewServeMux()

	mux.HandleFunc("send_active_mail", sendActiveMailHandler)
	mux.HandleFunc("send_active_mail_retry", sendActiveMailWithRetryHandler)

	if err := workerServer.Run(mux); err != nil {
		log.Fatalf("[error] Worker 异常: %v", err)
	}
}

// 停止 worker
func ShutdownWorker() {
	if workerServer != nil {
		workerServer.Shutdown()
	}
}

优先级任务

不是所有异步任务都一样重要, 比如"服务器宕机警告"就应该尽快处理, 而"每天生成财务报表"则可以不着急, 慢一点也没关系, 这时候就可以使用不同的队列, 给不同的队列设置 权重 权重越大被消费者获取到的几率越高

go
func MockSendActiveMail() {
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	tasks := []struct {
		queue   string // 这个字段必须是消费者 asynq.NewServer#asynq.Config#Queues 中有的
		desc    string
		payload string
	}{
		{
			queue:   "important",
			desc:    "重要任务队列",
			payload: "important-task-data",
		},
		{
			queue:   "high",
			desc:    "高优先级任务",
			payload: "higi-task-data",
		},
		{
			queue:   "default",
			desc:    "普通任务队列",
			payload: "default-task-data",
		},
		{
			queue:   "low",
			desc:    "低优先级任务",
			payload: "low-task-data",
		},
		{
			queue:   "example", // 这个任务不会执行, 因为队列不存在
			desc:    "不存在的队列",
			payload: "unknown-task-data",
		},
	}

	// 3.将任务放入队列
	for _, item := range tasks {
		task := asynq.NewTask("task_with_priority", []byte(item.payload))
		info, _ := client.Enqueue(task, asynq.Queue(item.queue))
		fmt.Println("入列成功", item.desc, info)
	}
}
go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

// var TaskPayload struct {
// 	UserID string `json:"userId"`
// 	Email  string `json:"email"`
// }
// func sendActiveMailHandler(ctx context.Context, t *asynq.Task) error {
// 	if err := json.Unmarshal(t.Payload(), &TaskPayload); err != nil {
// 		return fmt.Errorf("反序列化任务负载失败: %w", err)
// 	}
//
// 	// 模拟发送邮件(实际项目中替换为真实邮件逻辑)
// 	fmt.Printf("[模拟] 正在向 %s (用户ID: %s) 发送激活邮件...\n", TaskPayload.Email, TaskPayload.UserID)
// 	time.Sleep(5 * time.Second) // 模拟耗时操作
//
// 	fmt.Println("=== 邮件发送完成 ===")
// 	return nil
// }

func sendActiveMailWithRetryHandler(ctx context.Context, t *asynq.Task) error {
	// 在这个任务处理器中, 必须要返回 error 才能将这个任务放到 retry 队列中
	return errors.New("模拟任务失败")
}

func taskWithPriorityHandler(ctx context.Context, t *asynq.Task) error {
	fmt.Printf("=== 开始执行任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
	fmt.Println("=== 任务执行完成 ===")
	return nil
}

// 全局 worker server 实例
var workerServer *asynq.Server

// 启动 worker
func StartWorker() {
	workerServer = asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "127.0.0.1:6379",
		},
		asynq.Config{
			Concurrency: 5,
			RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
				// 设置重试间隔时间
				fmt.Printf("[retry]任务失败, 准备第 %d 次数重试, type=%s err=%v \n", n, t.Type(), e)
				return time.Second * 5
			},
			Queues: map[string]int{
				"important": 8,
				"high":      6,
				"default":   4,
				"low":       2,
			},
		},
	)

	mux := asynq.NewServeMux()

	// mux.HandleFunc("send_active_mail", sendActiveMailHandler)
	// 优先级队列
	mux.HandleFunc("task_with_priority", taskWithPriorityHandler)

	if err := workerServer.Run(mux); err != nil {
		log.Fatalf("[error] Worker 异常: %v", err)
	}
}

// 停止 worker
func ShutdownWorker() {
	if workerServer != nil {
		workerServer.Shutdown()
	}
}

延迟任务

不需要立即执行的低优先级任务

go
func MockSendActiveMail() {
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.将任务放入队列
	// 2.1 在10s钟之后执行这个任务
	delayTask1 := asynq.NewTask("delay_task", []byte("execute after 10 seconds"))
	info1, _ := client.Enqueue(delayTask1, asynq.ProcessIn(10*time.Second))
	fmt.Println("入列成功", info1)

	// 2.2 在指定的时间执行这个任务(比如当前时间的1分钟后)
	delayTask2 := asynq.NewTask("delay_task", []byte("execute after 1 minute"))
	info2, _ := client.Enqueue(delayTask2, asynq.ProcessAt(time.Now().Add(1*time.Minute)))
	fmt.Println("入列成功", info2)
}

// 消费者直接正常执行即可, 需要添加什么参数

唯一任务

用来解决 同一件事情不要重复入列 的问题

  • Unique: 按照任务内容去重, 在一段时间内, 相同的任务类型, payload和队列只能入列一次
  • TaskID: 手动指定一个id, 如果已经在队列中存在则不加入队列(推荐使用, 这个更直观)一般来说设计数据表的时候都会有一个唯一主键
go
func MockSendActiveMail() {
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	payload := map[string]string{
		"userId": "10001",            // 用户 id
		"email":  "test@example.com", // 目标邮箱地址
	}
	taskPayload, err := json.Marshal(payload)
	if err != nil {
		fmt.Println("任务信息生成失败", err)
		return
	}

	// 3.将任务放入队列
	taskID := fmt.Sprintf("%s-%s", payload["userId"], payload["email"])
	task := asynq.NewTask("send_active_mail", taskPayload)
	info, err := client.Enqueue(task, asynq.TaskID(taskID))
	if err != nil {
		fmt.Println("任务入列失败", err)
		return
	}
	fmt.Println("任务入列成功", info)
}

定时任务(周期性任务)

这个就有点类似于 linux 操作系统的 crontab 就是 每隔一段时间执行一次

于其他任务不太相同的是, 它不是有生产者来添加到队列中的, 而是由调度器注册到任务队列中的

go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gofiber/fiber/v3"
	"github.com/hibiken/asynq"
)

func MockSendActiveMail() {
	// 1.创建客户端(设置 redis 链接信息)
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: "127.0.0.1:6379",
	})
	defer client.Close()

	// 2.携带执行异步任务需要的信息
	payload := map[string]string{
		"userId": "10001",            // 用户 id
		"email":  "test@example.com", // 目标邮箱地址
	}
	taskPayload, err := json.Marshal(payload)
	if err != nil {
		fmt.Println("任务信息生成失败", err)
		return
	}

	// 3.将任务放入队列
	taskID := fmt.Sprintf("%s-%s", payload["userId"], payload["email"])
	task := asynq.NewTask("send_active_mail", taskPayload)
	info, err := client.Enqueue(task, asynq.TaskID(taskID))
	if err != nil {
		fmt.Println("任务入列失败", err)
		return
	}
	fmt.Println("任务入列成功", info)
}

func initScheduler() {
	scheduler := asynq.NewScheduler(
		asynq.RedisClientOpt{Addr: "127.0.0.1:6379"},
		&asynq.SchedulerOpts{},
	)

	// 可以使用这种 @every 2s 这种写法: 每2秒钟执行一次
	scheduler.Register("@every 2s", asynq.NewTask("print_tick", []byte("tick")))

	// 也可以使用 crontab 表达式: 每2分钟执行一次
	scheduler.Register("*/2 * * * *", asynq.NewTask("print_cron", []byte("cron")))

	if err := scheduler.Run(); err != nil {
		log.Fatalf("[error] Scheduler 异常: %v", err)
	}
}

func main() {
	app := fiber.New()

	// 0.后台启动消费者 & 调度器
	go initScheduler()
	go StartWorker()

	// 1.模拟注册接口
	app.Get("/register", func(c fiber.Ctx) error {
		MockSendActiveMail()

		// 每次响应都很慢
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟注册接口",
			"results": "注册成功",
		})
	})

	// 2.模拟激活账户接口
	app.Get("/active", func(c fiber.Ctx) error {
		return c.JSON(fiber.Map{
			"success": true,
			"message": "模拟激活账户接口",
			"results": "激活成功",
		})
	})

	// 3.监听 fiber 服务器退出
	app.Hooks().OnPreShutdown(func() error {
		fmt.Println("=== 服务已全部退出 ===")
		ShutdownWorker()
		return nil
	})

	// 4. 启动 fiber 服务器
	go func() {
		if err := app.Listen(":3000"); err != nil {
			log.Fatalf("[server error]: %v", err)
		}
	}()

	// 5.阻塞主线程(直到 Ctrl-C 退出)
	exitChan := make(chan os.Signal, 1)
	signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)

	<-exitChan
	fmt.Println("\n === 捕获到退出信号, 开始优雅关闭 ===")
	if err := app.ShutdownWithTimeout(5 * time.Second); err != nil {
		fmt.Printf("=== Error during shutdown: %v === \n", err)
	}
}
go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

// 对于消费者来说, 不管是普通异步任务还是周期性任务, 都是一样的
func scheduleTaskHandler(ctx context.Context, t *asynq.Task) error {
	fmt.Printf("=== 开始执行计划任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
	return nil
}

// 全局 worker server 实例
var workerServer *asynq.Server

// 启动 worker
func StartWorker() {
	workerServer = asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "127.0.0.1:6379",
		},
		asynq.Config{
			Concurrency: 10,
		},
	)

	mux := asynq.NewServeMux()

	// 处理周期性任务
	mux.HandleFunc("print_tick", scheduleTaskHandler)
	mux.HandleFunc("print_cron", scheduleTaskHandler)

	if err := workerServer.Run(mux); err != nil {
		log.Fatalf("[error] Worker 异常: %v", err)
	}
}

// 停止 worker
func ShutdownWorker() {
	if workerServer != nil {
		workerServer.Shutdown()
	}
}

消费者中间件

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

func scheduleTaskHandler(ctx context.Context, t *asynq.Task) error {
	fmt.Printf("=== 开始执行计划任务, task=%s payload=%s ===\n", t.Type(), t.Payload())
	return nil
}

// 日志中间件于http的muxServer非常类似
func loggingMiddleware(next asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(c context.Context, t *asynq.Task) error {
		startTime := time.Now()
		log.Printf("[info] 开始执行任务, task=%s payload=%s", t.Type(), t.Payload())

		if err := next.ProcessTask(c, t); err != nil {
			log.Printf("[error] 任务执行失败, task=%s payload=%s err=%v", t.Type(), t.Payload(), err)
			return err
		}

		log.Printf("[info] 任务执行完成, task=%s payload=%s cost=%v", t.Type(), t.Payload(), time.Since(startTime))
		return nil
	})
}

// 全局 worker server 实例
var workerServer *asynq.Server

// 启动 worker
func StartWorker() {
	workerServer = asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "127.0.0.1:6379",
		},
		asynq.Config{
			Concurrency: 10,
		},
	)

	mux := asynq.NewServeMux()

	mux.Use(loggingMiddleware) // 应用日志中间件

	mux.HandleFunc("print_tick", scheduleTaskHandler)
	mux.HandleFunc("print_cron", scheduleTaskHandler)

	if err := workerServer.Run(mux); err != nil {
		log.Fatalf("[error] Worker 异常: %v", err)
	}
}

// 停止 worker
func ShutdownWorker() {
	if workerServer != nil {
		workerServer.Shutdown()
	}
}

Released under the MIT License.