feat: 新增周期性交易处理器及任务队列,实现周期性交易的自动化处理、并发安全与幂等性调度。

This commit is contained in:
2026-01-28 16:40:57 +08:00
parent 297d2715b8
commit 5a0ae912d5
2 changed files with 17 additions and 5 deletions

View File

@@ -49,7 +49,7 @@ func (h *RecurringTransactionHandler) Handle(ctx context.Context, task *DelayedT
// 幂等性检查:防止同一笔交易重复执行 // 幂等性检查:防止同一笔交易重复执行
// 使用 Redis SETNX 锁,锁 24 小时(足以覆盖重试窗口) // 使用 Redis SETNX 锁,锁 24 小时(足以覆盖重试窗口)
// TaskID 本身包含 recurringID 和执行时间戳,是天然的幂等 Key // TaskID 本身包含 recurringID 和执行时间戳,是天然的幂等 Key
lockKey := fmt.Sprintf("novault:lock:recurring:%s", task.ID) lockKey := GenerateLockKey(task.ID)
// 使用 TaskQueue 提供的公开方法获取锁 // 使用 TaskQueue 提供的公开方法获取锁
isNew, err := h.taskQueue.AcquireLock(ctx, lockKey, 24*time.Hour) isNew, err := h.taskQueue.AcquireLock(ctx, lockKey, 24*time.Hour)
if err != nil { if err != nil {

View File

@@ -18,6 +18,13 @@ const (
TaskTypeRecurringTransaction TaskType = "recurring_transaction" TaskTypeRecurringTransaction TaskType = "recurring_transaction"
// TaskTypeAllocationRule 分配规则执行任务 // TaskTypeAllocationRule 分配规则执行任务
TaskTypeAllocationRule TaskType = "allocation_rule" TaskTypeAllocationRule TaskType = "allocation_rule"
// Redis Key Prefixes
KeyPrefixDefault = "novault:tasks"
KeySuffixDelayed = ":delayed"
KeySuffixReady = ":ready"
KeySuffixProcessing = ":processing"
KeyPrefixLock = "novault:lock:"
) )
// DelayedTask 延迟任务结构 // DelayedTask 延迟任务结构
@@ -50,17 +57,22 @@ type TaskQueue struct {
// NewTaskQueue 创建任务队列实例 // NewTaskQueue 创建任务队列实例
func NewTaskQueue(client *redis.Client, keyPrefix string) *TaskQueue { func NewTaskQueue(client *redis.Client, keyPrefix string) *TaskQueue {
if keyPrefix == "" { if keyPrefix == "" {
keyPrefix = "novault:tasks" keyPrefix = KeyPrefixDefault
} }
return &TaskQueue{ return &TaskQueue{
client: client, client: client,
keyPrefix: keyPrefix, keyPrefix: keyPrefix,
delayedKey: keyPrefix + ":delayed", delayedKey: keyPrefix + KeySuffixDelayed,
readyKey: keyPrefix + ":ready", readyKey: keyPrefix + KeySuffixReady,
processingKey: keyPrefix + ":processing", processingKey: keyPrefix + KeySuffixProcessing,
} }
} }
// GenerateLockKey 生成分布式锁 Key
func GenerateLockKey(taskID string) string {
return KeyPrefixLock + "recurring:" + taskID
}
// Schedule 调度延迟任务 // Schedule 调度延迟任务
func (q *TaskQueue) Schedule(ctx context.Context, task *DelayedTask) error { func (q *TaskQueue) Schedule(ctx context.Context, task *DelayedTask) error {
// 序列化任务 // 序列化任务