From e17eb453ef8f20da734a219c7853a980195ddc10 Mon Sep 17 00:00:00 2001 From: Mickael BOURNEUF Date: Wed, 12 Feb 2025 21:21:19 +0100 Subject: [PATCH] Modification du client AMQP --- pkg/amqp/client.go | 98 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 pkg/amqp/client.go diff --git a/pkg/amqp/client.go b/pkg/amqp/client.go new file mode 100644 index 0000000..7066aaa --- /dev/null +++ b/pkg/amqp/client.go @@ -0,0 +1,98 @@ +package amqp + +import ( + "crypto/tls" + "errors" + "log" + "sync" + + "deevirt.fr/compute/pkg/config" + "github.com/rabbitmq/amqp091-go" +) + +var ( + //errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + //errShutdown = errors.New("client is shutting down") +) + +type Client struct { + m *sync.Mutex + connection *amqp091.Connection + channel *amqp091.Channel + done chan bool + isReady bool +} + +func NewAMQP() (*Client, error) { + config, _ := config.NewConfig() + + amqp_config := amqp091.Config{ + Properties: amqp091.NewConnectionProperties(), + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + + conn, err := amqp091.DialConfig(config.AmqpURI, amqp_config) + if err != nil { + log.Fatalf("producer: error in dial: %s", err) + } + //defer conn.Close() + + log.Println("producer: got Connection, getting Channel") + channel, err := conn.Channel() + if err != nil { + log.Fatalf("error getting a channel: %s", err) + } + //defer channel.Close() + + return &Client{ + m: &sync.Mutex{}, + connection: conn, + channel: channel, + done: make(chan bool), + isReady: true, + }, nil +} + +func (client *Client) Publisher(exchange string, key string, body []byte) { + + //log.Printf("producer: publishing %dB body (%q)", len(*body), *body) + _, err := client.channel.PublishWithDeferredConfirm( + exchange, + key, + true, + false, + amqp091.Publishing{ + ContentType: "application/json", + DeliveryMode: amqp091.Persistent, + Body: body, + }, + ) + if err != nil { + log.Fatalf("producer: error in publish: %s", err) + } + +} + +func (client *Client) Close() error { + client.m.Lock() + // we read and write isReady in two locations, so we grab the lock and hold onto + // it until we are finished + defer client.m.Unlock() + + if !client.isReady { + return errAlreadyClosed + } + close(client.done) + err := client.channel.Close() + if err != nil { + return err + } + err = client.connection.Close() + if err != nil { + return err + } + + client.isReady = false + return nil +}