Suppression de AMQP des moniteurs, et renvoi de tous les évènements vers le manager

This commit is contained in:
Mickael BOURNEUF 2025-02-20 12:16:19 +01:00
parent 8ead0d1bf6
commit dac1de28b6
8 changed files with 239 additions and 275 deletions

View File

@ -1,240 +0,0 @@
package events
import (
"context"
"encoding/json"
"encoding/xml"
"fmt"
"log"
"strings"
"time"
"deevirt.fr/compute/pkg/amqp"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
clientv3 "go.etcd.io/etcd/client/v3"
"libvirt.org/go/libvirt"
)
// AgentLifecycle is the callback for guest-agent lifecycle events. It prints
// the event's State and Reason fields, one per line, using the println
// builtin. c and d are not used.
func AgentLifecycle(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventAgentLifecycle) {
	state, reason := event.State, event.Reason
	println(state)
	println(reason)
}
// Graphics is the callback for domain graphics events. It prints the event's
// String() rendering using the println builtin. c and d are not used.
func Graphics(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventGraphics) {
	summary := event.String()
	println(summary)
}
// JobCompleted is the callback for job-completed events. It prints the
// DataRemaining field of the event's job info using the println builtin.
// c and d are not used.
func JobCompleted(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventJobCompleted) {
	remaining := e.Info.DataRemaining
	println(remaining)
}
// MigrationIteration is the callback for migration-iteration events. It
// prints the event's Iteration field using the println builtin. c and d are
// not used.
func MigrationIteration(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventMigrationIteration) {
	iteration := e.Iteration
	println(iteration)
}
// Lifecyle is the callback for domain lifecycle events. (The misspelled name
// is kept because it is the identifier callers register.)
//
// For each event it:
//  1. mirrors the change into etcd: a "defined" event creates the domain key
//     under this host, an "undefined" event deletes it, and started,
//     suspended, resumed, stopped and shutdown events store a numeric state
//     under the domain's "state" key;
//  2. for every event other than defined/undefined, publishes a JSON-encoded
//     schema.DomainStateJSON through the AMQP publisher, using "vmcenter" and
//     the routing key "events.<company>.<datacenter>.<domain>";
//  3. prints the event and detail names on standard output.
//
// c is not used. d is the domain the event refers to; e carries the event
// type and its detail code.
//
// Failures that were already fatal (etcd client creation, reading or parsing
// the domain XML) still terminate the process. Errors that used to be
// silently discarded are now reported: a configuration error is fatal, an
// unreadable domain UUID or an encoding failure aborts the callback, and
// failed etcd writes are logged.
func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifecycle) {
	var detail, event string

	config, err := config.New()
	if err != nil {
		// The configuration is dereferenced right below; continuing with an
		// unusable value would only crash less legibly.
		log.Fatalln(err)
	}

	domainID, err := d.GetUUIDString()
	if err != nil {
		// Without the UUID every etcd key and the routing key would be malformed.
		log.Printf("reading domain UUID: %v", err)
		return
	}

	etcd, err := clientv3.New(clientv3.Config{
		Endpoints:   strings.Split(config.EtcdURI, ","),
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Error connexion to etcd: %v", err)
	}
	defer etcd.Close()

	hostKey := "/cluster/" + config.ClusterID + "/host/" + config.NodeID + "/qemu/" + domainID
	stateKey := "/cluster/" + config.ClusterID + "/domain/" + domainID + "/state"

	// put writes value under key with a 5-second deadline. A failed write is
	// logged instead of ignored, and does not prevent the event from being
	// published afterwards.
	put := func(key, value string) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if _, err := etcd.Put(ctx, key, value); err != nil {
			log.Printf("etcd put %s: %v", key, err)
		}
	}

	switch e.Event {
	case libvirt.DOMAIN_EVENT_DEFINED:
		event = "defined"
		put(hostKey, "")
		switch libvirt.DomainEventDefinedDetailType(e.Detail) {
		case libvirt.DOMAIN_EVENT_DEFINED_ADDED:
			detail = "added"
		case libvirt.DOMAIN_EVENT_DEFINED_UPDATED:
			detail = "updated"
		case libvirt.DOMAIN_EVENT_DEFINED_RENAMED:
			detail = "renamed"
		case libvirt.DOMAIN_EVENT_DEFINED_FROM_SNAPSHOT:
			detail = "snapshot"
		default:
			detail = "unknown"
		}

	case libvirt.DOMAIN_EVENT_UNDEFINED:
		event = "undefined"
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if _, err := etcd.Delete(ctx, hostKey); err != nil {
			log.Printf("etcd delete %s: %v", hostKey, err)
		}
		cancel()
		switch libvirt.DomainEventUndefinedDetailType(e.Detail) {
		case libvirt.DOMAIN_EVENT_UNDEFINED_REMOVED:
			detail = "removed"
		case libvirt.DOMAIN_EVENT_UNDEFINED_RENAMED:
			detail = "renamed"
		default:
			detail = "unknown"
		}

	case libvirt.DOMAIN_EVENT_STARTED:
		event = "started"
		put(stateKey, "2")
		switch libvirt.DomainEventStartedDetailType(e.Detail) {
		case libvirt.DOMAIN_EVENT_STARTED_BOOTED:
			detail = "booted"
		case libvirt.DOMAIN_EVENT_STARTED_MIGRATED:
			detail = "migrated"
		case libvirt.DOMAIN_EVENT_STARTED_RESTORED:
			detail = "restored"
		case libvirt.DOMAIN_EVENT_STARTED_FROM_SNAPSHOT:
			detail = "snapshot"
		case libvirt.DOMAIN_EVENT_STARTED_WAKEUP:
			detail = "wakeup"
		default:
			detail = "unknown"
		}

	case libvirt.DOMAIN_EVENT_SUSPENDED:
		event = "suspended"
		put(stateKey, "3")
		switch libvirt.DomainEventSuspendedDetailType(e.Detail) {
		case libvirt.DOMAIN_EVENT_SUSPENDED_PAUSED:
			detail = "paused"
		case libvirt.DOMAIN_EVENT_SUSPENDED_MIGRATED:
			detail = "migrated"
		case libvirt.DOMAIN_EVENT_SUSPENDED_IOERROR:
			detail = "I/O error"
		case libvirt.DOMAIN_EVENT_SUSPENDED_WATCHDOG:
			detail = "watchdog"
		case libvirt.DOMAIN_EVENT_SUSPENDED_RESTORED:
			detail = "restored"
		case libvirt.DOMAIN_EVENT_SUSPENDED_FROM_SNAPSHOT:
			detail = "snapshot"
		case libvirt.DOMAIN_EVENT_SUSPENDED_API_ERROR:
			detail = "api error"
		case libvirt.DOMAIN_EVENT_SUSPENDED_POSTCOPY:
			detail = "postcopy"
		case libvirt.DOMAIN_EVENT_SUSPENDED_POSTCOPY_FAILED:
			detail = "postcopy failed"
		default:
			detail = "unknown"
		}

	case libvirt.DOMAIN_EVENT_RESUMED:
		event = "resumed"
		put(stateKey, "4")
		switch libvirt.DomainEventResumedDetailType(e.Detail) {
		case libvirt.DOMAIN_EVENT_RESUMED_UNPAUSED:
			detail = "unpaused"
		case libvirt.DOMAIN_EVENT_RESUMED_MIGRATED:
			detail = "migrated"
		case libvirt.DOMAIN_EVENT_RESUMED_FROM_SNAPSHOT:
			detail = "snapshot"
		case libvirt.DOMAIN_EVENT_RESUMED_POSTCOPY:
			detail = "postcopy"
		case libvirt.DOMAIN_EVENT_RESUMED_POSTCOPY_FAILED:
			detail = "postcopy failed"
		default:
			detail = "unknown"
		}

	case libvirt.DOMAIN_EVENT_STOPPED:
		event = "stopped"
		put(stateKey, "5")
		switch libvirt.DomainEventStoppedDetailType(e.Detail) {
		case libvirt.DOMAIN_EVENT_STOPPED_SHUTDOWN:
			detail = "shutdown"
		case libvirt.DOMAIN_EVENT_STOPPED_DESTROYED:
			detail = "destroyed"
		case libvirt.DOMAIN_EVENT_STOPPED_CRASHED:
			detail = "crashed"
		case libvirt.DOMAIN_EVENT_STOPPED_MIGRATED:
			detail = "migrated"
		case libvirt.DOMAIN_EVENT_STOPPED_SAVED:
			detail = "saved"
		case libvirt.DOMAIN_EVENT_STOPPED_FAILED:
			detail = "failed"
		case libvirt.DOMAIN_EVENT_STOPPED_FROM_SNAPSHOT:
			detail = "snapshot"
		default:
			detail = "unknown"
		}

	case libvirt.DOMAIN_EVENT_SHUTDOWN:
		event = "shutdown"
		put(stateKey, "6")
		switch libvirt.DomainEventShutdownDetailType(e.Detail) {
		case libvirt.DOMAIN_EVENT_SHUTDOWN_FINISHED:
			detail = "finished"
		case libvirt.DOMAIN_EVENT_SHUTDOWN_GUEST:
			detail = "guest"
		case libvirt.DOMAIN_EVENT_SHUTDOWN_HOST:
			detail = "host"
		default:
			detail = "unknown"
		}

	default:
		event = "unknown"
	}

	// Send event for all clients, except for defined/undefined events.
	// The two comparisons must be separate: `|` binds tighter than `!=`, so
	// `e.Event != DEFINED|UNDEFINED` compared against the bitwise OR of the
	// two constants and let "defined" events through.
	if e.Event != libvirt.DOMAIN_EVENT_DEFINED && e.Event != libvirt.DOMAIN_EVENT_UNDEFINED {
		xmlDesc, err := d.GetXMLDesc(0)
		if err != nil {
			log.Fatalln(err)
		}

		var desc schema.Domain
		if err = xml.Unmarshal([]byte(xmlDesc), &desc); err != nil {
			log.Fatalln(err)
		}
		companyID := desc.Metadata.DeevirtInstance.DeevirtCompanyID
		datacenterID := desc.Metadata.DeevirtInstance.DeevirtDatacenterID

		state, err := json.Marshal(&schema.DomainStateJSON{
			CompanyID:    companyID,
			DatacenterID: datacenterID,
			DomainID:     domainID,
			State:        int64(e.Event),
		})
		if err != nil {
			log.Printf("encoding state of domain %s: %v", domainID, err)
			return
		}

		// NOTE(review): the second value returned by NewAMQP is still
		// discarded, as before; if it is an error, a failed connection goes
		// unnoticed here — confirm against pkg/amqp and handle it.
		a, _ := amqp.NewAMQP()
		a.Publisher("vmcenter",
			"events."+companyID+"."+datacenterID+"."+domainID,
			state)
		a.Close()
	}

	fmt.Printf("Domain event=%q detail=%q\n", event, detail)
}
// Reboot is the callback for domain reboot events. Its body is empty: the
// event is received and nothing is done with it. c and d are not used.
func Reboot(c *libvirt.Connect, d *libvirt.Domain) {
}
// Watchdog is the callback for domain watchdog events. It prints the event's
// String() rendering using the println builtin. c and d are not used.
func Watchdog(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventWatchdog) {
	summary := event.String()
	println(summary)
}

View File

@ -0,0 +1,60 @@
package events
import (
"context"
"encoding/json"
"encoding/xml"
"log"
"strings"
"time"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"libvirt.org/go/libvirt"
pb "deevirt.fr/compute/pkg/api/proto"
)
// Qemu is the envelope that QemuEvents JSON-encodes and sends to the manager.
// The fields carry no struct tags, so they are encoded under their Go names;
// renaming a field changes the encoded message.
type Qemu struct {
// NodeID is the NodeID taken from the local configuration.
NodeID string
// CompanyID is the DeevirtCompanyID read from the domain XML metadata.
CompanyID string
// DatacenterID is the DeevirtDatacenterID read from the domain XML metadata.
DatacenterID string
// DomainID is the domain UUID string reported by libvirt.
DomainID string
// Event is the QEMU monitor event as delivered to the callback.
Event *libvirt.DomainQemuMonitorEvent
}
// QemuEvents is the callback for QEMU monitor events. It wraps the event,
// together with the node, company, datacenter and domain identifiers, in a
// JSON-encoded Qemu value and forwards it to the manager through the Event
// call of the Domain gRPC client.
//
// c is not used. d is the domain the event refers to; its XML description is
// parsed to obtain the company and datacenter identifiers.
//
// Failures that were already fatal (unparsable domain XML, gRPC client
// creation) still terminate the process. Errors that used to be discarded are
// now handled: a configuration error is fatal, a failed XML fetch is fatal
// with its own error (it previously surfaced as an XML parse error on empty
// input), an unreadable UUID or an encoding failure drops the event, and a
// failed RPC is logged.
func QemuEvents(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainQemuMonitorEvent) {
	config, err := config.New()
	if err != nil {
		// The configuration is dereferenced below; there is nothing useful
		// to do without it.
		log.Fatalln(err)
	}

	domainID, err := d.GetUUIDString()
	if err != nil {
		// An event without a domain identifier is of no use to the manager.
		log.Printf("reading domain UUID: %v", err)
		return
	}

	xmlDesc, err := d.GetXMLDesc(0)
	if err != nil {
		log.Fatalln(err)
	}

	var desc schema.Domain
	if err := xml.Unmarshal([]byte(xmlDesc), &desc); err != nil {
		log.Fatalln(err)
	}

	payload, err := json.Marshal(&Qemu{
		NodeID:       config.NodeID,
		CompanyID:    desc.Metadata.DeevirtInstance.DeevirtCompanyID,
		DatacenterID: desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
		DomainID:     domainID,
		Event:        event,
	})
	if err != nil {
		log.Printf("encoding QEMU event for domain %s: %v", domainID, err)
		return
	}

	// NOTE(review): every peer is joined with "," into one target string and
	// a new client is created per event; confirm the resolver in use accepts
	// a comma-separated list, and consider sharing one connection.
	conn, err := grpc.NewClient(strings.Join(config.Manager.Peers, ","), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Erreur de connexion : %v", err)
	}
	defer conn.Close()

	// The deadline is started here so the whole 5 seconds is available to
	// the RPC rather than being partly spent on the libvirt calls above.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	client := pb.NewDomainClient(conn)
	if _, err := client.Event(ctx, &pb.DomainEventRequest{Event: payload}); err != nil {
		log.Printf("sending QEMU event for domain %s to the manager: %v", domainID, err)
	}
}

View File

@ -22,13 +22,7 @@ func Server() {
conn.SetKeepAlive(5, 3)
conn.DomainEventAgentLifecycleRegister(nil, AgentLifecycle)
conn.DomainEventGraphicsRegister(nil, Graphics)
conn.DomainEventJobCompletedRegister(nil, JobCompleted)
conn.DomainEventMigrationIterationRegister(nil, MigrationIteration)
conn.DomainEventLifecycleRegister(nil, Lifecyle)
conn.DomainEventRebootRegister(nil, Reboot)
conn.DomainEventWatchdogRegister(nil, Watchdog)
conn.DomainQemuMonitorEventRegister(nil, "(.*)", QemuEvents, libvirt.CONNECT_DOMAIN_QEMU_MONITOR_EVENT_REGISTER_REGEX)
for {
libvirt.EventRunDefaultImpl()

View File

@ -1,13 +1,23 @@
package main
import (
"time"
"deevirt.fr/compute/cmd/monitor/events"
"deevirt.fr/compute/cmd/monitor/metrics"
"github.com/coreos/go-systemd/v22/daemon"
)
// main starts the metrics and events servers in background goroutines,
// reports readiness to the service manager, and then sends a watchdog
// keep-alive every five seconds for the lifetime of the process.
//
// The keep-alive loop is what keeps main from returning; a `select {}` placed
// before it would block forever and make the notifications unreachable.
func main() {
	go metrics.Server()
	go events.Server()

	// The results of SdNotify are ignored on purpose: it returns (false, nil)
	// when NOTIFY_SOCKET is unset, i.e. when not running under a service
	// manager, and there is nothing to do about a failed notification here.
	daemon.SdNotify(false, daemon.SdNotifyReady)
	for {
		daemon.SdNotify(false, daemon.SdNotifyWatchdog)
		time.Sleep(5 * time.Second)
	}
}

View File

@ -5,7 +5,6 @@ import (
"log"
"strconv"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
"github.com/prometheus/client_golang/prometheus"
"libvirt.org/go/libvirt"
@ -231,11 +230,6 @@ var (
)
func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostname string) error {
config, err := config.New()
if err != nil {
log.Fatalln(err)
}
domainUUID, err := stat.Domain.GetUUIDString()
if err != nil {
return err
@ -295,15 +289,15 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
domainUUID)
// Block Stats
CollectDomainVCPU(ch, stat.Vcpu, hostname, domainUUID, config, desc)
CollectDomainBlock(ch, stat.Block, hostname, domainUUID, config, desc)
CollectDomainNet(ch, stat.Net, hostname, domainUUID, config, desc)
CollectDomainBalloon(ch, stat.Balloon, hostname, domainUUID, config, desc)
CollectDomainVCPU(ch, stat.Vcpu, hostname, domainUUID, desc)
CollectDomainBlock(ch, stat.Block, hostname, domainUUID, desc)
CollectDomainNet(ch, stat.Net, hostname, domainUUID, desc)
CollectDomainBalloon(ch, stat.Balloon, hostname, domainUUID, desc)
return nil
}
func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVcpu, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVcpu, hostname string, domainUUID string, desc schema.Domain) {
for idx, vcpu := range stat {
ch <- prometheus.MustNewConstMetric(
libvirtDomainVcpuState,
@ -341,7 +335,7 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
}
}
func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStatsBalloon, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStatsBalloon, hostname string, domainUUID string, desc schema.Domain) {
ch <- prometheus.MustNewConstMetric(
libvirtDomainBalloonStatCurrentBytes,
prometheus.GaugeValue,
@ -422,7 +416,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
}
func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsBlock, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsBlock, hostname string, domainUUID string, desc schema.Domain) {
for _, block := range stat {
if block.RdBytesSet {
@ -538,7 +532,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
}
}
func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet, hostname string, domainUUID string, desc schema.Domain) {
for _, iface := range stat {
if iface.RxBytesSet {

View File

@ -0,0 +1,84 @@
// Copyright 2014 Docker, Inc.
// Copyright 2015-2018 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package daemon provides a Go implementation of the sd_notify protocol.
// It can be used to inform systemd of service start-up completion, watchdog
// events, and other status changes.
//
// https://www.freedesktop.org/software/systemd/man/sd_notify.html#Description
package daemon
import (
"net"
"os"
)
// State strings intended to be passed as the state argument of SdNotify.
const (
// SdNotifyReady tells the service manager that service startup is finished
// or that the service has finished loading its configuration.
SdNotifyReady = "READY=1"
// SdNotifyStopping tells the service manager that the service is beginning
// its shutdown.
SdNotifyStopping = "STOPPING=1"
// SdNotifyReloading tells the service manager that this service is
// reloading its configuration. SdNotifyReady must be sent once reloading
// has completed.
SdNotifyReloading = "RELOADING=1"
// SdNotifyWatchdog tells the service manager to update the watchdog
// timestamp for the service.
SdNotifyWatchdog = "WATCHDOG=1"
)
// SdNotify writes state to the datagram socket named by the NOTIFY_SOCKET
// environment variable. Callers commonly ignore the returned error.
//
// When unsetEnvironment is true and NOTIFY_SOCKET is set, the variable is
// removed from the environment before the socket is dialed.
//
// The result is one of:
//
//	(false, nil) - NOTIFY_SOCKET is unset, so notification is not supported
//	(false, err) - NOTIFY_SOCKET is set but unsetting it, connecting to it or
//	               writing to it failed
//	(true, nil)  - the state was written to the socket
func SdNotify(unsetEnvironment bool, state string) (bool, error) {
	path := os.Getenv("NOTIFY_SOCKET")
	if path == "" {
		// Nothing to notify.
		return false, nil
	}

	if unsetEnvironment {
		if err := os.Unsetenv("NOTIFY_SOCKET"); err != nil {
			return false, err
		}
	}

	addr := net.UnixAddr{Name: path, Net: "unixgram"}
	sock, err := net.DialUnix(addr.Net, nil, &addr)
	if err != nil {
		return false, err
	}
	defer sock.Close()

	_, err = sock.Write([]byte(state))
	if err != nil {
		return false, err
	}
	return true, nil
}

View File

@ -0,0 +1,73 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package daemon
import (
"fmt"
"os"
"strconv"
"time"
)
// SdWatchdogEnabled reports the watchdog timeout configured for this process
// through the WATCHDOG_USEC and WATCHDOG_PID environment variables.
// A process with the watchdog enabled should call
// daemon.SdNotify(false, daemon.SdNotifyWatchdog) at intervals of half the
// returned duration.
//
// When unsetEnvironment is true, both variables are removed from the
// environment regardless of their values.
//
// The result is one of:
//
//	(0, nil)    - the watchdog is not enabled, or WATCHDOG_PID names another
//	              process
//	(0, err)    - unsetting a variable failed, a value could not be parsed,
//	              or WATCHDOG_USEC is not positive
//	(time, nil) - the watchdog is enabled; time is the delay after which an
//	              inactive service will be killed
func SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error) {
	usecRaw, pidRaw := os.Getenv("WATCHDOG_USEC"), os.Getenv("WATCHDOG_PID")

	if unsetEnvironment {
		// Attempt both removals before reporting either failure.
		usecUnsetErr := os.Unsetenv("WATCHDOG_USEC")
		pidUnsetErr := os.Unsetenv("WATCHDOG_PID")
		switch {
		case usecUnsetErr != nil:
			return 0, usecUnsetErr
		case pidUnsetErr != nil:
			return 0, pidUnsetErr
		}
	}

	if usecRaw == "" {
		return 0, nil
	}

	usec, err := strconv.Atoi(usecRaw)
	switch {
	case err != nil:
		return 0, fmt.Errorf("error converting WATCHDOG_USEC: %s", err)
	case usec <= 0:
		return 0, fmt.Errorf("error WATCHDOG_USEC must be a positive number")
	}
	timeout := time.Duration(usec) * time.Microsecond

	// An empty WATCHDOG_PID means the timeout applies without a PID check.
	if pidRaw != "" {
		pid, err := strconv.Atoi(pidRaw)
		if err != nil {
			return 0, fmt.Errorf("error converting WATCHDOG_PID: %s", err)
		}
		if pid != os.Getpid() {
			return 0, nil
		}
	}
	return timeout, nil
}

15
vendor/modules.txt vendored
View File

@ -15,6 +15,7 @@ github.com/cespare/xxhash/v2
github.com/coreos/go-semver/semver
# github.com/coreos/go-systemd/v22 v22.5.0
## explicit; go 1.12
github.com/coreos/go-systemd/v22/daemon
github.com/coreos/go-systemd/v22/journal
# github.com/denisbrodbeck/machineid v1.0.1
## explicit
@ -30,10 +31,6 @@ github.com/gogo/protobuf/protoc-gen-gogo/descriptor
# github.com/golang/protobuf v1.5.4
## explicit; go 1.17
github.com/golang/protobuf/proto
# github.com/google/licensecheck v0.3.1
## explicit; go 1.12
# github.com/google/safehtml v0.0.3-0.20211026203422-d6f0e11a5516
## explicit; go 1.14
# github.com/hashicorp/errwrap v1.1.0
## explicit
github.com/hashicorp/errwrap
@ -157,8 +154,6 @@ go.uber.org/zap/internal/pool
go.uber.org/zap/internal/stacktrace
go.uber.org/zap/zapcore
go.uber.org/zap/zapgrpc
# golang.org/x/mod v0.23.0
## explicit; go 1.22.0
# golang.org/x/net v0.35.0
## explicit; go 1.18
golang.org/x/net/http/httpguts
@ -168,9 +163,7 @@ golang.org/x/net/idna
golang.org/x/net/internal/httpcommon
golang.org/x/net/internal/timeseries
golang.org/x/net/trace
# golang.org/x/pkgsite v0.0.0-20250214205047-dd488e5da97a
## explicit; go 1.23
# golang.org/x/sync v0.11.0
# golang.org/x/oauth2 v0.26.0
## explicit; go 1.18
# golang.org/x/sys v0.30.0
## explicit; go 1.18
@ -183,8 +176,6 @@ golang.org/x/text/secure/bidirule
golang.org/x/text/transform
golang.org/x/text/unicode/bidi
golang.org/x/text/unicode/norm
# golang.org/x/tools v0.30.0
## explicit; go 1.22.0
# google.golang.org/genproto/googleapis/api v0.0.0-20250207221924-e9438ea467c6
## explicit; go 1.22
google.golang.org/genproto/googleapis/api
@ -302,5 +293,3 @@ gopkg.in/ini.v1
# libvirt.org/go/libvirt v1.11001.0
## explicit; go 1.11
libvirt.org/go/libvirt
# rsc.io/markdown v0.0.0-20231214224604-88bb533a6020
## explicit; go 1.20