99 lines
2.0 KiB
Go
99 lines
2.0 KiB
Go
package amqp
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"errors"
|
|
"log"
|
|
"sync"
|
|
|
|
"deevirt.fr/compute/pkg/config"
|
|
"github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
// Sentinel errors for the AMQP client. The entries below are currently
// disabled and kept only for reference:
//
//	errNotConnected = errors.New("not connected to a server")
//	errShutdown     = errors.New("client is shutting down")

// errAlreadyClosed is returned by (*Client).Close when the client is no
// longer marked as ready.
var errAlreadyClosed = errors.New("already closed: not connected to the server")
|
|
|
|
type Client struct {
|
|
m *sync.Mutex
|
|
connection *amqp091.Connection
|
|
channel *amqp091.Channel
|
|
done chan bool
|
|
isReady bool
|
|
}
|
|
|
|
func NewAMQP() (*Client, error) {
|
|
config, _ := config.New()
|
|
|
|
amqp_config := amqp091.Config{
|
|
Properties: amqp091.NewConnectionProperties(),
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
|
}
|
|
|
|
conn, err := amqp091.DialConfig(config.AmqpURI, amqp_config)
|
|
if err != nil {
|
|
log.Fatalf("producer: error in dial: %s", err)
|
|
}
|
|
//defer conn.Close()
|
|
|
|
log.Println("producer: got Connection, getting Channel")
|
|
channel, err := conn.Channel()
|
|
if err != nil {
|
|
log.Fatalf("error getting a channel: %s", err)
|
|
}
|
|
//defer channel.Close()
|
|
|
|
return &Client{
|
|
m: &sync.Mutex{},
|
|
connection: conn,
|
|
channel: channel,
|
|
done: make(chan bool),
|
|
isReady: true,
|
|
}, nil
|
|
}
|
|
|
|
func (client *Client) Publisher(exchange string, key string, body []byte) {
|
|
|
|
//log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
|
|
_, err := client.channel.PublishWithDeferredConfirm(
|
|
exchange,
|
|
key,
|
|
true,
|
|
false,
|
|
amqp091.Publishing{
|
|
ContentType: "application/json",
|
|
DeliveryMode: amqp091.Persistent,
|
|
Body: body,
|
|
},
|
|
)
|
|
if err != nil {
|
|
log.Fatalf("producer: error in publish: %s", err)
|
|
}
|
|
|
|
}
|
|
|
|
func (client *Client) Close() error {
|
|
client.m.Lock()
|
|
// we read and write isReady in two locations, so we grab the lock and hold onto
|
|
// it until we are finished
|
|
defer client.m.Unlock()
|
|
|
|
if !client.isReady {
|
|
return errAlreadyClosed
|
|
}
|
|
close(client.done)
|
|
err := client.channel.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = client.connection.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
client.isReady = false
|
|
return nil
|
|
}
|