main

mattermost/focalboard

Last updated at: 29/12/2023 09:46

callbackqueue.go

TLDR

The callbackqueue.go file in the server/utils package provides a CallbackQueue struct and several methods to enqueue and process callback functions in a thread pool.

Methods

NewCallbackQueue

Creates a new CallbackQueue and starts a thread pool to service it.

Shutdown

Stops accepting enqueues and exits all pool threads. This method waits as long as the context allows for the threads to exit. Returns true if the pool exited, false on timeout.

Enqueue

Adds a callback function to the queue.

Classes

None

package utils

import (
	"context"
	"runtime/debug"
	"sync/atomic"
	"time"

	"github.com/mattermost/mattermost-server/v6/shared/mlog"
)

// CallbackFunc is a func that can enqueued in the callback queue and will be
// called when dequeued.
type CallbackFunc func() error

// CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will
// be executed in the order in which they are enqueued, but no guarantees are provided
// regarding the order in which they finish (unless poolSize == 1).
type CallbackQueue struct {
	name     string
	poolSize int

	queue chan CallbackFunc
	done  chan struct{}
	alive chan int

	idone uint32

	logger mlog.LoggerIFace
}

// NewCallbackQueue creates a new CallbackQueue and starts a thread pool to service it.
func NewCallbackQueue(name string, queueSize int, poolSize int, logger mlog.LoggerIFace) *CallbackQueue {
	cn := &CallbackQueue{
		name:     name,
		poolSize: poolSize,
		queue:    make(chan CallbackFunc, queueSize),
		done:     make(chan struct{}),
		alive:    make(chan int, poolSize),
		logger:   logger,
	}

	for i := 0; i < poolSize; i++ {
		go cn.loop(i)
	}

	return cn
}

// Shutdown stops accepting enqueues and exits all pool threads. This method waits
// as long as the context allows for the threads to exit.
// Returns true if the pool exited, false on timeout.
func (cn *CallbackQueue) Shutdown(context context.Context) bool {
	if !atomic.CompareAndSwapUint32(&cn.idone, 0, 1) {
		// already shutdown
		return true
	}

	// signal threads to exit
	close(cn.done)

	// wait for the threads to exit or timeout
	count := 0
	for count < cn.poolSize {
		select {
		case <-cn.alive:
			count++
		case <-context.Done():
			return false
		}
	}

	// try to drain any remaining callbacks
	for {
		select {
		case f := <-cn.queue:
			cn.exec(f)
		case <-context.Done():
			return false
		default:
			return true
		}
	}
}

// Enqueue adds a callback to the queue.
func (cn *CallbackQueue) Enqueue(f CallbackFunc) {
	if atomic.LoadUint32(&cn.idone) != 0 {
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
		return
	}

	select {
	case cn.queue <- f:
	default:
		start := time.Now()
		cn.queue <- f
		dur := time.Since(start)
		cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur))
	}
}

func (cn *CallbackQueue) loop(id int) {
	defer func() {
		cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id))
		cn.alive <- id
	}()

	for {
		select {
		case f := <-cn.queue:
			cn.exec(f)
		case <-cn.done:
			return
		}
	}
}

func (cn *CallbackQueue) exec(f CallbackFunc) {
	// don't let a panic in the callback exit the thread.
	defer func() {
		if r := recover(); r != nil {
			stack := debug.Stack()
			cn.logger.Error("CallbackQueue callback panic",
				mlog.String("name", cn.name),
				mlog.Any("panic", r),
				mlog.String("stack", string(stack)),
			)
		}
	}()

	if err := f(); err != nil {
		cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err))
	}
}