callbackqueue.go
TLDR
The callbackqueue.go file in the server/utils package provides a CallbackQueue struct and several methods to enqueue callback functions and process them in a thread pool.
Methods
NewCallbackQueue
Creates a new CallbackQueue and starts a thread pool to service it.
Shutdown
Stops accepting enqueues and exits all pool threads. This method waits as long as the context allows for the threads to exit. Returns true if the pool exited, false on timeout.
Enqueue
Adds a callback function to the queue.
Classes
None
package utils
import (
"context"
"runtime/debug"
"sync/atomic"
"time"
"github.com/mattermost/mattermost-server/v6/shared/mlog"
)
// CallbackFunc is a func that can be enqueued in the callback queue and will be
// called when dequeued. A non-nil error returned by the callback is logged by
// the queue.
type CallbackFunc func() error
// CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will
// be executed in the order in which they are enqueued, but no guarantees are provided
// regarding the order in which they finish (unless poolSize == 1).
type CallbackQueue struct {
// name identifies this queue in its log messages.
name string
// poolSize is the number of worker goroutines started by NewCallbackQueue.
poolSize int
// queue is the buffered channel holding callbacks that are waiting to run.
queue chan CallbackFunc
// done is closed by Shutdown to tell the workers to exit.
done chan struct{}
// alive receives a worker's id as that worker exits; Shutdown counts these.
alive chan int
// idone is non-zero once Shutdown has been called; it is accessed atomically.
idone uint32
// logger receives the queue's diagnostics.
logger mlog.LoggerIFace
}
// NewCallbackQueue builds a CallbackQueue and launches the worker goroutines
// that service it.
//
// name is attached to the queue's log messages, queueSize is the capacity of
// the buffered callback channel, poolSize is the number of worker goroutines
// started, and logger receives the queue's diagnostics. The returned queue is
// already running.
func NewCallbackQueue(name string, queueSize int, poolSize int, logger mlog.LoggerIFace) *CallbackQueue {
	cq := new(CallbackQueue)
	cq.name = name
	cq.poolSize = poolSize
	cq.logger = logger

	cq.queue = make(chan CallbackFunc, queueSize)
	cq.done = make(chan struct{})
	// Buffered to poolSize so every worker can report its exit without blocking.
	cq.alive = make(chan int, poolSize)

	for workerID := 0; workerID < poolSize; workerID++ {
		go cq.loop(workerID)
	}

	return cq
}
// Shutdown stops accepting enqueues and exits all pool threads. This method waits
// as long as the context allows for the threads to exit, then runs any callbacks
// still sitting in the queue on the calling goroutine.
//
// Returns true if the pool exited and the queue was drained, false on timeout.
// Only the first call performs the shutdown; any later call returns true
// immediately without waiting.
func (cn *CallbackQueue) Shutdown(ctx context.Context) bool {
	if !atomic.CompareAndSwapUint32(&cn.idone, 0, 1) {
		// already shutdown
		return true
	}

	// signal threads to exit
	close(cn.done)

	// wait for the threads to exit or timeout; each worker sends its id on
	// alive as it returns, so poolSize receives means every worker is gone.
	for exited := 0; exited < cn.poolSize; exited++ {
		select {
		case <-cn.alive:
		case <-ctx.Done():
			return false
		}
	}

	// try to drain any remaining callbacks
	for {
		select {
		case f := <-cn.queue:
			cn.exec(f)
		case <-ctx.Done():
			return false
		default:
			// queue is empty
			return true
		}
	}
}
// Enqueue adds a callback to the queue.
//
// If the queue has already been shut down the callback is dropped and a debug
// message is logged. If the queue is full, Enqueue blocks until space becomes
// available and then logs a warning with the time spent waiting. If Shutdown is
// called while Enqueue is blocked, the wait is abandoned and the callback is
// dropped, rather than blocking forever on a queue that no worker is servicing
// anymore.
func (cn *CallbackQueue) Enqueue(f CallbackFunc) {
	if atomic.LoadUint32(&cn.idone) != 0 {
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
		return
	}

	// fast path: there is room in the queue
	select {
	case cn.queue <- f:
		return
	default:
	}

	// slow path: the queue is full. Wait for room, but also watch done so a
	// concurrent Shutdown cannot leave this caller blocked indefinitely.
	start := time.Now()
	select {
	case cn.queue <- f:
		dur := time.Since(start)
		cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur))
	case <-cn.done:
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
	}
}
// loop is the body of a single pool worker. It executes callbacks received from
// the queue until the done channel is closed. On the way out it logs a trace
// message and sends its id on the alive channel.
func (cn *CallbackQueue) loop(id int) {
	reportExit := func() {
		cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id))
		cn.alive <- id
	}
	defer reportExit()

	for {
		select {
		case <-cn.done:
			return
		case callback := <-cn.queue:
			cn.exec(callback)
		}
	}
}
// exec runs a single callback. An error returned by the callback is logged, and
// a panic raised by it is recovered and logged together with a stack trace, so
// that a misbehaving callback cannot take down the calling goroutine.
func (cn *CallbackQueue) exec(f CallbackFunc) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		// the callback panicked: report it instead of letting the thread die.
		trace := debug.Stack()
		cn.logger.Error("CallbackQueue callback panic",
			mlog.String("name", cn.name),
			mlog.Any("panic", recovered),
			mlog.String("stack", string(trace)),
		)
	}()

	err := f()
	if err == nil {
		return
	}
	cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err))
}