Modification du client AMQP
This commit is contained in:
parent
a8fb718465
commit
e17eb453ef
98
pkg/amqp/client.go
Normal file
98
pkg/amqp/client.go
Normal file
@ -0,0 +1,98 @@
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"deevirt.fr/compute/pkg/config"
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
var (
|
||||
//errNotConnected = errors.New("not connected to a server")
|
||||
errAlreadyClosed = errors.New("already closed: not connected to the server")
|
||||
//errShutdown = errors.New("client is shutting down")
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
m *sync.Mutex
|
||||
connection *amqp091.Connection
|
||||
channel *amqp091.Channel
|
||||
done chan bool
|
||||
isReady bool
|
||||
}
|
||||
|
||||
func NewAMQP() (*Client, error) {
|
||||
config, _ := config.NewConfig()
|
||||
|
||||
amqp_config := amqp091.Config{
|
||||
Properties: amqp091.NewConnectionProperties(),
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
}
|
||||
|
||||
conn, err := amqp091.DialConfig(config.AmqpURI, amqp_config)
|
||||
if err != nil {
|
||||
log.Fatalf("producer: error in dial: %s", err)
|
||||
}
|
||||
//defer conn.Close()
|
||||
|
||||
log.Println("producer: got Connection, getting Channel")
|
||||
channel, err := conn.Channel()
|
||||
if err != nil {
|
||||
log.Fatalf("error getting a channel: %s", err)
|
||||
}
|
||||
//defer channel.Close()
|
||||
|
||||
return &Client{
|
||||
m: &sync.Mutex{},
|
||||
connection: conn,
|
||||
channel: channel,
|
||||
done: make(chan bool),
|
||||
isReady: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (client *Client) Publisher(exchange string, key string, body []byte) {
|
||||
|
||||
//log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
|
||||
_, err := client.channel.PublishWithDeferredConfirm(
|
||||
exchange,
|
||||
key,
|
||||
true,
|
||||
false,
|
||||
amqp091.Publishing{
|
||||
ContentType: "application/json",
|
||||
DeliveryMode: amqp091.Persistent,
|
||||
Body: body,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("producer: error in publish: %s", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (client *Client) Close() error {
|
||||
client.m.Lock()
|
||||
// we read and write isReady in two locations, so we grab the lock and hold onto
|
||||
// it until we are finished
|
||||
defer client.m.Unlock()
|
||||
|
||||
if !client.isReady {
|
||||
return errAlreadyClosed
|
||||
}
|
||||
close(client.done)
|
||||
err := client.channel.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = client.connection.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client.isReady = false
|
||||
return nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user