// Package amqp provides a small client for publishing messages to an AMQP
// broker through github.com/rabbitmq/amqp091-go.
package amqp

import (
	"crypto/tls"
	"errors"
	"fmt"
	"log"
	"sync"

	"deevirt.fr/compute/pkg/config"
	"github.com/rabbitmq/amqp091-go"
)

var (
	//errNotConnected  = errors.New("not connected to a server")
	errAlreadyClosed = errors.New("already closed: not connected to the server")
	//errShutdown      = errors.New("client is shutting down")
)

// Client bundles one AMQP connection with one channel opened on it.
type Client struct {
	// m guards isReady (and the one-shot close of done) in Close.
	m          *sync.Mutex
	connection *amqp091.Connection
	channel    *amqp091.Channel
	// done is closed exactly once, by the first successful entry into Close.
	done chan bool
	// isReady is true from construction until Close has been called.
	isReady bool
}

// NewAMQP loads the application configuration, dials the broker at the
// configured AmqpURI and opens a channel on the resulting connection.
//
// It returns a ready Client, or a nil Client and a non-nil error when the
// configuration cannot be loaded, the dial fails, or the channel cannot be
// opened. On a channel failure the freshly dialed connection is closed before
// returning so that it is not leaked.
func NewAMQP() (*Client, error) {
	// NOTE(review): assumes the second result of config.New is an error.
	cfg, err := config.New()
	if err != nil {
		return nil, fmt.Errorf("amqp: loading config: %w", err)
	}

	amqpConfig := amqp091.Config{
		Properties: amqp091.NewConnectionProperties(),
		// NOTE(review): InsecureSkipVerify disables verification of the
		// broker's TLS certificate. Kept as-is for compatibility; confirm
		// this is intended outside of development setups.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}

	conn, err := amqp091.DialConfig(cfg.AmqpURI, amqpConfig)
	if err != nil {
		return nil, fmt.Errorf("amqp: dial: %w", err)
	}

	log.Println("producer: got Connection, getting Channel")
	channel, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup: the channel error is the one worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("amqp: getting a channel: %w", err)
	}

	return &Client{
		m:          &sync.Mutex{},
		connection: conn,
		channel:    channel,
		done:       make(chan bool),
		isReady:    true,
	}, nil
}

// Publisher publishes body to exchange with routing key key as a persistent
// "application/json" message, with the mandatory flag set and the immediate
// flag cleared.
//
// The deferred confirmation returned by the library is discarded. A publish
// error terminates the process via log.Fatalf; this method has no error
// return, so that behaviour is kept for existing callers.
func (c *Client) Publisher(exchange string, key string, body []byte) {
	//log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
	msg := amqp091.Publishing{
		ContentType:  "application/json",
		DeliveryMode: amqp091.Persistent,
		Body:         body,
	}

	// NOTE(review): nothing in this file enables confirm mode or registers a
	// return listener, so neither the confirmation nor mandatory-returned
	// messages appear to be observed here; verify against the rest of the
	// package.
	_, err := c.channel.PublishWithDeferredConfirm(
		exchange,
		key,
		true,  // mandatory
		false, // immediate
		msg,
	)
	if err != nil {
		log.Fatalf("producer: error in publish: %s", err)
	}
}

// Close shuts the client down: it closes the done channel, then the AMQP
// channel, then the connection.
//
// It returns errAlreadyClosed when the client has already been closed.
// Otherwise both the channel and the connection are always asked to close,
// and the first error encountered (channel before connection) is returned.
// The client is marked closed even when an error is returned, so that a
// repeated call reports errAlreadyClosed instead of closing done twice.
func (c *Client) Close() error {
	// isReady is read and written here, so hold the lock for the whole call.
	c.m.Lock()
	defer c.m.Unlock()

	if !c.isReady {
		return errAlreadyClosed
	}

	// Flip the flag before any fallible step: done can only be closed once,
	// and a second close would panic.
	c.isReady = false
	close(c.done)

	channelErr := c.channel.Close()
	// Close the connection regardless of the channel result so it is never
	// leaked.
	connectionErr := c.connection.Close()

	if channelErr != nil {
		return channelErr
	}
	return connectionErr
}