// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package amqp091 import ( "context" "sync" ) // confirms resequences and notifies one or multiple publisher confirmation listeners type confirms struct { m sync.Mutex listeners []chan Confirmation sequencer map[uint64]Confirmation deferredConfirmations *deferredConfirmations published uint64 publishedMut sync.Mutex expecting uint64 } // newConfirms allocates a confirms func newConfirms() *confirms { return &confirms{ sequencer: map[uint64]Confirmation{}, deferredConfirmations: newDeferredConfirmations(), published: 0, expecting: 1, } } func (c *confirms) Listen(l chan Confirmation) { c.m.Lock() defer c.m.Unlock() c.listeners = append(c.listeners, l) } // Publish increments the publishing counter func (c *confirms) publish() *DeferredConfirmation { c.publishedMut.Lock() defer c.publishedMut.Unlock() c.published++ return c.deferredConfirmations.Add(c.published) } // unpublish decrements the publishing counter and removes the // DeferredConfirmation. It must be called immediately after a publish fails. func (c *confirms) unpublish() { c.publishedMut.Lock() defer c.publishedMut.Unlock() c.deferredConfirmations.remove(c.published) c.published-- } // confirm confirms one publishing, increments the expecting delivery tag, and // removes bookkeeping for that delivery tag. func (c *confirms) confirm(confirmation Confirmation) { delete(c.sequencer, c.expecting) c.expecting++ for _, l := range c.listeners { l <- confirmation } } // resequence confirms any out of order delivered confirmations func (c *confirms) resequence() { c.publishedMut.Lock() defer c.publishedMut.Unlock() for c.expecting <= c.published { sequenced, found := c.sequencer[c.expecting] if !found { return } c.confirm(sequenced) } } // One confirms one publishing and all following in the publishing sequence func (c *confirms) One(confirmed Confirmation) { c.m.Lock() defer c.m.Unlock() c.deferredConfirmations.Confirm(confirmed) if c.expecting == confirmed.DeliveryTag { c.confirm(confirmed) } else { c.sequencer[confirmed.DeliveryTag] = confirmed } c.resequence() } // Multiple confirms all publishings up until the delivery tag func (c *confirms) Multiple(confirmed Confirmation) { c.m.Lock() defer c.m.Unlock() c.deferredConfirmations.ConfirmMultiple(confirmed) for c.expecting <= confirmed.DeliveryTag { c.confirm(Confirmation{c.expecting, confirmed.Ack}) } c.resequence() } // Cleans up the confirms struct and its dependencies. // Closes all listeners, discarding any out of sequence confirmations func (c *confirms) Close() error { c.m.Lock() defer c.m.Unlock() c.deferredConfirmations.Close() for _, l := range c.listeners { close(l) } c.listeners = nil return nil } type deferredConfirmations struct { m sync.Mutex confirmations map[uint64]*DeferredConfirmation } func newDeferredConfirmations() *deferredConfirmations { return &deferredConfirmations{ confirmations: map[uint64]*DeferredConfirmation{}, } } func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { d.m.Lock() defer d.m.Unlock() dc := &DeferredConfirmation{DeliveryTag: tag} dc.done = make(chan struct{}) d.confirmations[tag] = dc return dc } // remove is only used to drop a tag whose publish failed func (d *deferredConfirmations) remove(tag uint64) { d.m.Lock() defer d.m.Unlock() dc, found := d.confirmations[tag] if !found { return } close(dc.done) delete(d.confirmations, tag) } func (d *deferredConfirmations) Confirm(confirmation Confirmation) { d.m.Lock() defer d.m.Unlock() dc, found := d.confirmations[confirmation.DeliveryTag] if !found { // We should never receive a confirmation for a tag that hasn't // been published, but a test causes this to happen. return } dc.setAck(confirmation.Ack) delete(d.confirmations, confirmation.DeliveryTag) } func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { d.m.Lock() defer d.m.Unlock() for k, v := range d.confirmations { if k <= confirmation.DeliveryTag { v.setAck(confirmation.Ack) delete(d.confirmations, k) } } } // Close nacks all pending DeferredConfirmations being blocked by dc.Wait(). func (d *deferredConfirmations) Close() { d.m.Lock() defer d.m.Unlock() for k, v := range d.confirmations { v.setAck(false) delete(d.confirmations, k) } } // setAck sets the acknowledgement status of the confirmation. Note that it must // not be called more than once. func (d *DeferredConfirmation) setAck(ack bool) { d.ack = ack close(d.done) } // Done returns the channel that can be used to wait for the publisher // confirmation. func (d *DeferredConfirmation) Done() <-chan struct{} { return d.done } // Acked returns the publisher confirmation in a non-blocking manner. It returns // false if the confirmation was not acknowledged yet or received negative // acknowledgement. func (d *DeferredConfirmation) Acked() bool { select { case <-d.done: default: return false } return d.ack } // Wait blocks until the publisher confirmation. It returns true if the server // successfully received the publishing. func (d *DeferredConfirmation) Wait() bool { <-d.done return d.ack } // WaitContext waits until the publisher confirmation. It returns true if the // server successfully received the publishing. If the context expires before // that, ctx.Err() is returned. func (d *DeferredConfirmation) WaitContext(ctx context.Context) (bool, error) { select { case <-ctx.Done(): return false, ctx.Err() case <-d.done: } return d.ack, nil }