imap: monitor the logout channel with an observer

Untangle the observer functionality from the message handling routine.
Observe the imap client's logout channel and trigger a connection error
when necessary to start the reconnect cycle.

Signed-off-by: Koni Marti <koni.marti@gmail.com>
Acked-by: Robin Jarry <robin@jarry.cc>
This commit is contained in:
Koni Marti 2022-04-30 01:08:56 +02:00 committed by Robin Jarry
parent 397a6f267f
commit e5b339702a
3 changed files with 186 additions and 86 deletions

View File

@ -50,6 +50,9 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
w.config.keepalive_period = 0 * time.Second w.config.keepalive_period = 0 * time.Second
w.config.keepalive_probes = 3 w.config.keepalive_probes = 3
w.config.keepalive_interval = 3 w.config.keepalive_interval = 3
w.config.reconnect_maxwait = 30 * time.Second
for key, value := range msg.Config.Params { for key, value := range msg.Config.Params {
switch key { switch key {
case "idle-timeout": case "idle-timeout":
@ -60,6 +63,14 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
value, err) value, err)
} }
w.config.idle_timeout = val w.config.idle_timeout = val
case "reconnect-maxwait":
val, err := time.ParseDuration(value)
if err != nil || val < 0 {
return fmt.Errorf(
"invalid reconnect-maxwait value %v: %v",
value, err)
}
w.config.reconnect_maxwait = val
case "connection-timeout": case "connection-timeout":
val, err := time.ParseDuration(value) val, err := time.ParseDuration(value)
if err != nil || val < 0 { if err != nil || val < 0 {
@ -96,6 +107,7 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
} }
w.idler = newIdler(w.config, w.worker) w.idler = newIdler(w.config, w.worker)
w.observer = newObserver(w.config, w.worker)
return nil return nil
} }

152
worker/imap/observer.go Normal file
View File

@ -0,0 +1,152 @@
package imap
import (
"fmt"
"math"
"sync"
"time"
"git.sr.ht/~rjarry/aerc/worker/types"
"github.com/emersion/go-imap"
)
// observer monitors the loggedOut channel of the imap client. If the logout
// signal is received, the observer will emit a connection error to the ui in
// order to start the reconnect cycle.
type observer struct {
sync.Mutex
config imapConfig
client *imapClient
worker *types.Worker
done chan struct{}
autoReconnect bool
retries int
running bool
}
func newObserver(cfg imapConfig, w *types.Worker) *observer {
return &observer{config: cfg, worker: w, done: make(chan struct{})}
}
func (o *observer) SetClient(c *imapClient) {
o.Stop()
o.Lock()
o.client = c
o.Unlock()
o.Start()
o.retries = 0
}
func (o *observer) SetAutoReconnect(auto bool) {
o.autoReconnect = auto
}
func (o *observer) AutoReconnect() bool {
return o.autoReconnect
}
func (o *observer) isClientConnected() bool {
o.Lock()
defer o.Unlock()
return o.client != nil && o.client.State() == imap.SelectedState
}
func (o *observer) EmitIfNotConnected() bool {
if !o.isClientConnected() {
o.emit("imap client not connected: attempt reconnect")
return true
}
return false
}
func (o *observer) IsRunning() bool {
return o.running
}
func (o *observer) Start() {
if o.running {
o.log("runs already")
return
}
if o.client == nil {
return
}
if o.EmitIfNotConnected() {
return
}
go func() {
select {
case <-o.client.LoggedOut():
o.log("<-logout")
if o.autoReconnect {
o.emit("logged out")
} else {
o.log("ignore logout (auto-reconnect off)")
}
case <-o.done:
o.log("<-done")
}
o.running = false
o.log("stopped")
}()
o.running = true
o.log("started")
}
func (o *observer) Stop() {
if o.client == nil {
return
}
if o.done != nil {
close(o.done)
}
o.done = make(chan struct{})
o.running = false
}
func (o *observer) DelayedReconnect() error {
if o.client == nil {
return nil
}
var wait time.Duration
var reterr error
if o.retries > 0 {
backoff := int(math.Pow(1.8, float64(o.retries)))
var err error
wait, err = time.ParseDuration(fmt.Sprintf("%ds", backoff))
if err != nil {
return err
}
if wait > o.config.reconnect_maxwait {
wait = o.config.reconnect_maxwait
}
reterr = fmt.Errorf("reconnect in %v", wait)
} else {
reterr = fmt.Errorf("reconnect")
}
go func() {
<-time.After(wait)
o.emit(reterr.Error())
}()
o.retries++
return reterr
}
func (o *observer) emit(errMsg string) {
o.log("disconnect done->")
o.worker.PostMessage(&types.Done{
Message: types.RespondTo(&types.Disconnect{})}, nil)
o.log("connection error->")
o.worker.PostMessage(&types.ConnError{
Error: fmt.Errorf(errMsg),
}, nil)
}
func (o *observer) log(args ...interface{}) {
header := fmt.Sprintf("observer (%p) [running:%t]", o, o.running)
o.worker.Logger.Println(append([]interface{}{header}, args...)...)
}

View File

@ -3,7 +3,6 @@ package imap
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"math"
"net" "net"
"net/url" "net/url"
"time" "time"
@ -44,6 +43,7 @@ type imapConfig struct {
folders []string folders []string
oauthBearer lib.OAuthBearer oauthBearer lib.OAuthBearer
idle_timeout time.Duration idle_timeout time.Duration
reconnect_maxwait time.Duration
// tcp connection parameters // tcp connection parameters
connection_timeout time.Duration connection_timeout time.Duration
keepalive_period time.Duration keepalive_period time.Duration
@ -61,11 +61,8 @@ type IMAPWorker struct {
// Map of sequence numbers to UIDs, index 0 is seq number 1 // Map of sequence numbers to UIDs, index 0 is seq number 1
seqMap []uint32 seqMap []uint32
done chan struct{}
autoReconnect bool
retries int
idler *idler idler *idler
observer *observer
} }
func NewIMAPWorker(worker *types.Worker) (types.Backend, error) { func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
@ -74,6 +71,7 @@ func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
worker: worker, worker: worker,
selected: &imap.MailboxStatus{}, selected: &imap.MailboxStatus{},
idler: newIdler(imapConfig{}, worker), idler: newIdler(imapConfig{}, worker),
observer: newObserver(imapConfig{}, worker),
}, nil }, nil
} }
@ -81,6 +79,7 @@ func (w *IMAPWorker) newClient(c *client.Client) {
c.Updates = w.updates c.Updates = w.updates
w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)} w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)}
w.idler.SetClient(w.client) w.idler.SetClient(w.client)
w.observer.SetClient(w.client)
} }
func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error { func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
@ -93,12 +92,6 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
var reterr error // will be returned at the end, needed to support idle var reterr error // will be returned at the end, needed to support idle
checkConn := func(wait time.Duration) {
time.Sleep(wait)
w.stopConnectionObserver()
w.startConnectionObserver()
}
// set connection timeout for calls to imap server // set connection timeout for calls to imap server
if w.client != nil { if w.client != nil {
w.client.Timeout = w.config.connection_timeout w.client.Timeout = w.config.connection_timeout
@ -111,53 +104,43 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
reterr = w.handleConfigure(msg) reterr = w.handleConfigure(msg)
case *types.Connect: case *types.Connect:
if w.client != nil && w.client.State() == imap.SelectedState { if w.client != nil && w.client.State() == imap.SelectedState {
if !w.autoReconnect { if !w.observer.AutoReconnect() {
w.autoReconnect = true w.observer.SetAutoReconnect(true)
checkConn(0) w.observer.EmitIfNotConnected()
} }
reterr = errAlreadyConnected reterr = errAlreadyConnected
break break
} }
w.autoReconnect = true w.observer.SetAutoReconnect(true)
c, err := w.connect() c, err := w.connect()
if err != nil { if err != nil {
checkConn(0) w.observer.EmitIfNotConnected()
reterr = err reterr = err
break break
} }
w.stopConnectionObserver()
w.newClient(c) w.newClient(c)
w.startConnectionObserver()
w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil) w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil)
case *types.Reconnect: case *types.Reconnect:
if !w.autoReconnect { if !w.observer.AutoReconnect() {
reterr = fmt.Errorf("auto-reconnect is disabled; run connect to enable it") reterr = fmt.Errorf("auto-reconnect is disabled; run connect to enable it")
break break
} }
c, err := w.connect() c, err := w.connect()
if err != nil { if err != nil {
wait, msg := w.exponentialBackoff() errReconnect := w.observer.DelayedReconnect()
go checkConn(wait) reterr = errors.Wrap(errReconnect, err.Error())
w.retries++
reterr = errors.Wrap(err, msg)
break break
} }
w.stopConnectionObserver()
w.newClient(c) w.newClient(c)
w.startConnectionObserver()
w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil) w.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil)
case *types.Disconnect: case *types.Disconnect:
w.autoReconnect = false w.observer.SetAutoReconnect(false)
w.stopConnectionObserver() w.observer.Stop()
if w.client == nil || w.client.State() != imap.SelectedState { if w.client == nil || w.client.State() != imap.SelectedState {
reterr = errNotConnected reterr = errNotConnected
break break
@ -267,51 +250,6 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) {
} }
} }
func (w *IMAPWorker) exponentialBackoff() (time.Duration, string) {
maxWait := 16
if w.retries > 0 {
backoff := int(math.Pow(2.0, float64(w.retries)))
if backoff > maxWait {
backoff = maxWait
}
waitStr := fmt.Sprintf("%ds", backoff)
wait, err := time.ParseDuration(waitStr)
if err == nil {
return wait, fmt.Sprintf("wait %s before reconnect", waitStr)
}
}
return 0 * time.Second, ""
}
func (w *IMAPWorker) startConnectionObserver() {
emitConnErr := func(errMsg string) {
w.worker.PostMessage(&types.ConnError{
Error: fmt.Errorf(errMsg),
}, nil)
}
if w.client == nil {
emitConnErr("imap client not connected")
return
}
go func() {
select {
case <-w.client.LoggedOut():
if w.autoReconnect {
emitConnErr("imap: logged out")
}
case <-w.done:
return
}
}()
}
func (w *IMAPWorker) stopConnectionObserver() {
if w.done != nil {
close(w.done)
}
w.done = make(chan struct{})
}
func (w *IMAPWorker) connect() (*client.Client, error) { func (w *IMAPWorker) connect() (*client.Client, error) {
var ( var (
conn *net.TCPConn conn *net.TCPConn
@ -391,8 +329,6 @@ func (w *IMAPWorker) connect() (*client.Client, error) {
return nil, err return nil, err
} }
w.retries = 0
return c, nil return c, nil
} }