Compare commits

...

7 Commits

10 changed files with 257 additions and 250 deletions

View File

@ -2,29 +2,27 @@ package events
import ( import (
"context" "context"
"encoding/json"
"encoding/xml"
"fmt" "fmt"
"log" "log"
"strconv"
"strings" "strings"
"time" "time"
"deevirt.fr/compute/pkg/amqp" "deevirt.fr/compute/pkg/amqp"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"libvirt.org/go/libvirt" "libvirt.org/go/libvirt"
) )
func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifecycle) { func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifecycle) {
var detail, event string var detail, event string
config, _ := config.NewConfig()
domainID, _ := d.GetUUIDString() domainID, _ := d.GetUUIDString()
// Read Etcd URI
config, err := Config()
if err != nil {
log.Fatalf("Fail to read file: %v", err)
}
etcd, err := clientv3.New(clientv3.Config{ etcd, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(config.Section("etcd").Key("uri").String(), ","), Endpoints: strings.Split(config.EtcdURI, ","),
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
}) })
if err != nil { if err != nil {
@ -37,7 +35,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
switch e.Event { switch e.Event {
case libvirt.DOMAIN_EVENT_DEFINED: case libvirt.DOMAIN_EVENT_DEFINED:
event = "defined" event = "defined"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/host/"+MachineID()+"/qemu/"+domainID, "") etcd.Put(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID, "")
switch libvirt.DomainEventDefinedDetailType(e.Detail) { switch libvirt.DomainEventDefinedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_DEFINED_ADDED: case libvirt.DOMAIN_EVENT_DEFINED_ADDED:
@ -54,7 +52,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_UNDEFINED: case libvirt.DOMAIN_EVENT_UNDEFINED:
event = "undefined" event = "undefined"
etcd.Delete(ctx, "/cluster/"+ClusterID()+"/host/"+MachineID()+"/qemu/"+domainID) etcd.Delete(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID)
switch libvirt.DomainEventUndefinedDetailType(e.Detail) { switch libvirt.DomainEventUndefinedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_UNDEFINED_REMOVED: case libvirt.DOMAIN_EVENT_UNDEFINED_REMOVED:
@ -67,7 +65,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_STARTED: case libvirt.DOMAIN_EVENT_STARTED:
event = "started" event = "started"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "2") etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "2")
switch libvirt.DomainEventStartedDetailType(e.Detail) { switch libvirt.DomainEventStartedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_STARTED_BOOTED: case libvirt.DOMAIN_EVENT_STARTED_BOOTED:
@ -86,7 +84,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_SUSPENDED: case libvirt.DOMAIN_EVENT_SUSPENDED:
event = "suspended" event = "suspended"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "3") etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "3")
switch libvirt.DomainEventSuspendedDetailType(e.Detail) { switch libvirt.DomainEventSuspendedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_SUSPENDED_PAUSED: case libvirt.DOMAIN_EVENT_SUSPENDED_PAUSED:
@ -113,7 +111,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_RESUMED: case libvirt.DOMAIN_EVENT_RESUMED:
event = "resumed" event = "resumed"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "4") etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "4")
switch libvirt.DomainEventResumedDetailType(e.Detail) { switch libvirt.DomainEventResumedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_RESUMED_UNPAUSED: case libvirt.DOMAIN_EVENT_RESUMED_UNPAUSED:
@ -132,7 +130,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_STOPPED: case libvirt.DOMAIN_EVENT_STOPPED:
event = "stopped" event = "stopped"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "5") etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "5")
switch libvirt.DomainEventStoppedDetailType(e.Detail) { switch libvirt.DomainEventStoppedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_STOPPED_SHUTDOWN: case libvirt.DOMAIN_EVENT_STOPPED_SHUTDOWN:
@ -155,7 +153,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_SHUTDOWN: case libvirt.DOMAIN_EVENT_SHUTDOWN:
event = "shutdown" event = "shutdown"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "6") etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "6")
switch libvirt.DomainEventShutdownDetailType(e.Detail) { switch libvirt.DomainEventShutdownDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_SHUTDOWN_FINISHED: case libvirt.DOMAIN_EVENT_SHUTDOWN_FINISHED:
@ -172,8 +170,33 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
event = "unknown" event = "unknown"
} }
// AMQP // Send event for all clients
amqp.Publisher([]byte(strconv.FormatInt(int64(e.Event), 10))) if e.Event != libvirt.DOMAIN_EVENT_DEFINED|libvirt.DOMAIN_EVENT_UNDEFINED {
xmlDesc, err := d.GetXMLDesc(0)
if err != nil {
log.Fatalln(err)
}
var desc schema.Domain
err = xml.Unmarshal([]byte(xmlDesc), &desc)
if err != nil {
log.Fatalln(err)
}
state, _ := json.Marshal(&schema.DomainStateJSON{
CompanyID: desc.Metadata.DeevirtInstance.DeevirtCompanyID,
DatacenterID: desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
DomainID: domainID,
State: int64(e.Event),
})
a, _ := amqp.NewAMQP()
a.Publisher("vmcenter",
"events."+desc.Metadata.DeevirtInstance.DeevirtCompanyID+
"."+desc.Metadata.DeevirtInstance.DeevirtDatacenterID+
"."+domainID,
state)
a.Close()
}
fmt.Printf("Domain event=%q detail=%q\n", event, detail) fmt.Printf("Domain event=%q detail=%q\n", event, detail)
} }

View File

@ -1,34 +1,12 @@
package events package events
import ( import (
"encoding/hex"
"fmt"
"log" "log"
"os" "os"
"github.com/denisbrodbeck/machineid"
"gopkg.in/ini.v1"
"libvirt.org/go/libvirt" "libvirt.org/go/libvirt"
) )
func Config() (*ini.File, error) {
return ini.Load("/etc/deevirt/config.ini")
}
var ClusterID = func() string {
config, _ := Config()
return config.Section("").Key("id").String()
}
var MachineID = func() string {
id, err := machineid.ID()
if err != nil {
log.Fatal(err)
}
u, _ := hex.DecodeString(id)
return fmt.Sprintf("%x-%x-%x-%x-%x\n", u[:4], u[4:6], u[6:8], u[8:10], u[10:])
}
func agentLifecycle(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventAgentLifecycle) { func agentLifecycle(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventAgentLifecycle) {
println(event.State) println(event.State)
println(event.Reason) println(event.Reason)

View File

@ -2,8 +2,8 @@ package metrics
import ( import (
"log" "log"
"strconv"
"deevirt.fr/compute/pkg/config"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"libvirt.org/go/libvirt" "libvirt.org/go/libvirt"
) )
@ -13,26 +13,20 @@ var (
libvirtNodeCPUUsage = prometheus.NewDesc( libvirtNodeCPUUsage = prometheus.NewDesc(
prometheus.BuildFQName("libvirt", "node", "cpu_time_seconds_total"), prometheus.BuildFQName("libvirt", "node", "cpu_time_seconds_total"),
"CPU usage of node", "CPU usage of node",
[]string{"cluster", "node", "thread"}, []string{"cluster_id", "node_id", "node"},
nil) nil)
libvirtNodeMemoryUsageBytes = prometheus.NewDesc( libvirtNodeMemoryUsageBytes = prometheus.NewDesc(
prometheus.BuildFQName("libvirt", "node", "memory_usage_bytes"), prometheus.BuildFQName("libvirt", "node", "memory_usage_bytes"),
"Memory usage of the node, in bytes.", "Memory usage of the node, in bytes.",
[]string{"cluster", "node", "total"}, []string{"cluster_id", "node_id", "node"},
nil) nil)
) )
func CollectNode(conn *libvirt.Connect, ch chan<- prometheus.Metric, hostname string) error { func CollectNode(conn *libvirt.Connect, ch chan<- prometheus.Metric, hostname string) error {
config, err := Config() config, err := config.NewConfig()
if err != nil { if err != nil {
log.Fatalf("Fail to read file: %v", err) log.Fatalln(err)
}
// Node
node_info, err := conn.GetNodeInfo()
if err != nil {
return err
} }
nodeCPU, _ := conn.GetCPUStats(int(libvirt.NODE_CPU_STATS_ALL_CPUS), 0) // rate(libvirt_node_cpu_time_seconds_total[10s]) * 100 nodeCPU, _ := conn.GetCPUStats(int(libvirt.NODE_CPU_STATS_ALL_CPUS), 0) // rate(libvirt_node_cpu_time_seconds_total[10s]) * 100
@ -42,18 +36,18 @@ func CollectNode(conn *libvirt.Connect, ch chan<- prometheus.Metric, hostname st
libvirtNodeCPUUsage, libvirtNodeCPUUsage,
prometheus.CounterValue, prometheus.CounterValue,
float64(nodeCPU.Kernel+nodeCPU.User+nodeCPU.Iowait)/1e9, // From nsec to sec float64(nodeCPU.Kernel+nodeCPU.User+nodeCPU.Iowait)/1e9, // From nsec to sec
config.Section("").Key("id").String(), config.ClusterID,
config.NodeID,
hostname, hostname,
strconv.FormatInt(int64(node_info.Sockets*node_info.Cores*node_info.Threads), 10),
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
libvirtNodeMemoryUsageBytes, libvirtNodeMemoryUsageBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(nodeMemory.Total-(nodeMemory.Buffers+nodeMemory.Free+nodeMemory.Cached))*1024, float64(nodeMemory.Total-(nodeMemory.Buffers+nodeMemory.Free+nodeMemory.Cached))*1024,
config.Section("").Key("id").String(), config.ClusterID,
config.NodeID,
hostname, hostname,
strconv.FormatInt(int64(nodeMemory.Total), 10),
) )
return nil return nil

View File

@ -5,9 +5,9 @@ import (
"log" "log"
"strconv" "strconv"
"deevirt.fr/compute/cmd/qemu/metrics/schema" "deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"gopkg.in/ini.v1"
"libvirt.org/go/libvirt" "libvirt.org/go/libvirt"
) )
@ -231,9 +231,9 @@ var (
) )
func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostname string) error { func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostname string) error {
config, err := Config() config, err := config.NewConfig()
if err != nil { if err != nil {
log.Fatalf("Fail to read file: %v", err) log.Fatalln(err)
} }
domainUUID, err := stat.Domain.GetUUIDString() domainUUID, err := stat.Domain.GetUUIDString()
@ -262,7 +262,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoMaxMemBytes, libvirtDomainInfoMaxMemBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(info.MaxMem)*1024, float64(info.MaxMem)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -270,7 +270,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoMemoryUsageBytes, libvirtDomainInfoMemoryUsageBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(info.Memory)*1024, float64(info.Memory)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -278,7 +278,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoNrVirtCPU, libvirtDomainInfoNrVirtCPU,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(info.NrVirtCpu), float64(info.NrVirtCpu),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -286,7 +286,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoCPUTime, libvirtDomainInfoCPUTime,
prometheus.CounterValue, prometheus.CounterValue,
float64(info.CpuTime)/1e9, // From nsec to sec float64(info.CpuTime)/1e9, // From nsec to sec
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -294,7 +294,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoVirDomainState, libvirtDomainInfoVirDomainState,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(info.State), float64(info.State),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -308,13 +308,13 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
return nil return nil
} }
func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVcpu, hostname string, domainUUID string, config *ini.File, desc schema.Domain) { func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVcpu, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
for idx, vcpu := range stat { for idx, vcpu := range stat {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
libvirtDomainVcpuState, libvirtDomainVcpuState,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(vcpu.State), float64(vcpu.State),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -323,7 +323,7 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
libvirtDomainVcpuTime, libvirtDomainVcpuTime,
prometheus.CounterValue, prometheus.CounterValue,
float64(vcpu.Time)/1000/1000/1000, // From nsec to sec float64(vcpu.Time)/1000/1000/1000, // From nsec to sec
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -332,7 +332,7 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
libvirtDomainVcpuWait, libvirtDomainVcpuWait,
prometheus.CounterValue, prometheus.CounterValue,
float64(vcpu.Wait)/1e9, // From nsec to sec float64(vcpu.Wait)/1e9, // From nsec to sec
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -341,7 +341,7 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
libvirtDomainVcpuDelay, libvirtDomainVcpuDelay,
prometheus.CounterValue, prometheus.CounterValue,
float64(vcpu.Delay)/1e9, // From nsec to sec float64(vcpu.Delay)/1e9, // From nsec to sec
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -350,12 +350,12 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
} }
} }
func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStatsBalloon, hostname string, domainUUID string, config *ini.File, desc schema.Domain) { func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStatsBalloon, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
libvirtDomainBalloonStatCurrentBytes, libvirtDomainBalloonStatCurrentBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.Current)*1024, float64(stat.Current)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -363,7 +363,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatMaximumBytes, libvirtDomainBalloonStatMaximumBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.Maximum)*1024, float64(stat.Maximum)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -371,7 +371,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatSwapInBytes, libvirtDomainBalloonStatSwapInBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.SwapIn)*1024, float64(stat.SwapIn)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -379,7 +379,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatSwapOutBytes, libvirtDomainBalloonStatSwapOutBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.SwapOut)*1024, float64(stat.SwapOut)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -387,7 +387,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatMajorFaultTotal, libvirtDomainBalloonStatMajorFaultTotal,
prometheus.CounterValue, prometheus.CounterValue,
float64(stat.MajorFault), float64(stat.MajorFault),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -395,7 +395,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatMinorFaultTotal, libvirtDomainBalloonStatMinorFaultTotal,
prometheus.CounterValue, prometheus.CounterValue,
float64(stat.MinorFault), float64(stat.MinorFault),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -403,7 +403,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatUnusedBytes, libvirtDomainBalloonStatUnusedBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.Unused)*1024, float64(stat.Unused)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -411,7 +411,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatAvailableBytes, libvirtDomainBalloonStatAvailableBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.Available)*1024, float64(stat.Available)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -419,7 +419,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatRssBytes, libvirtDomainBalloonStatRssBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.Rss)*1024, float64(stat.Rss)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -427,7 +427,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatUsableBytes, libvirtDomainBalloonStatUsableBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.Usable)*1024, float64(stat.Usable)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
@ -435,14 +435,14 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatDiskCachesBytes, libvirtDomainBalloonStatDiskCachesBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(stat.DiskCaches)*1024, float64(stat.DiskCaches)*1024,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID) domainUUID)
} }
func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsBlock, hostname string, domainUUID string, config *ini.File, desc schema.Domain) { func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsBlock, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
for _, block := range stat { for _, block := range stat {
if block.RdBytesSet { if block.RdBytesSet {
@ -450,7 +450,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockRdBytes, libvirtDomainBlockRdBytes,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.RdBytes), float64(block.RdBytes),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -461,7 +461,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockRdReq, libvirtDomainBlockRdReq,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.RdReqs), float64(block.RdReqs),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -472,7 +472,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockRdTotalTimeSeconds, libvirtDomainBlockRdTotalTimeSeconds,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.RdTimes)/1e9, float64(block.RdTimes)/1e9,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -483,7 +483,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockWrBytes, libvirtDomainBlockWrBytes,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.WrBytes), float64(block.WrBytes),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -494,7 +494,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockWrReq, libvirtDomainBlockWrReq,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.WrReqs), float64(block.WrReqs),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -505,7 +505,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockWrTotalTimes, libvirtDomainBlockWrTotalTimes,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.WrTimes)/1e9, float64(block.WrTimes)/1e9,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -516,7 +516,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockFlushReq, libvirtDomainBlockFlushReq,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.FlReqs), float64(block.FlReqs),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -527,7 +527,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockFlushTotalTimeSeconds, libvirtDomainBlockFlushTotalTimeSeconds,
prometheus.CounterValue, prometheus.CounterValue,
float64(block.FlTimes)/1e9, float64(block.FlTimes)/1e9,
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -538,7 +538,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockAllocation, libvirtDomainBlockAllocation,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(block.Allocation), float64(block.Allocation),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -549,7 +549,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockCapacityBytes, libvirtDomainBlockCapacityBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(block.Capacity), float64(block.Capacity),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -560,7 +560,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockPhysicalSizeBytes, libvirtDomainBlockPhysicalSizeBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(block.Physical), float64(block.Physical),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -569,7 +569,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
} }
} }
func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet, hostname string, domainUUID string, config *ini.File, desc schema.Domain) { func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
for _, iface := range stat { for _, iface := range stat {
if iface.RxBytesSet { if iface.RxBytesSet {
@ -577,7 +577,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxBytes, libvirtDomainInterfaceRxBytes,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.RxBytes), float64(iface.RxBytes),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -588,7 +588,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxPackets, libvirtDomainInterfaceRxPackets,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.RxPkts), float64(iface.RxPkts),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -599,7 +599,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxErrs, libvirtDomainInterfaceRxErrs,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.RxErrs), float64(iface.RxErrs),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -610,7 +610,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxDrop, libvirtDomainInterfaceRxDrop,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.RxDrop), float64(iface.RxDrop),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -621,7 +621,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxBytes, libvirtDomainInterfaceTxBytes,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.TxBytes), float64(iface.TxBytes),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -632,7 +632,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxPackets, libvirtDomainInterfaceTxPackets,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.TxPkts), float64(iface.TxPkts),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -643,7 +643,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxErrs, libvirtDomainInterfaceTxErrs,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.TxErrs), float64(iface.TxErrs),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,
@ -654,7 +654,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxDrop, libvirtDomainInterfaceTxDrop,
prometheus.CounterValue, prometheus.CounterValue,
float64(iface.TxDrop), float64(iface.TxDrop),
config.Section("").Key("id").String(), config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID, desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID, desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID, domainUUID,

View File

@ -1,92 +0,0 @@
package schema
type Domain struct {
Devices Devices `xml:"devices"`
Metadata Metadata `xml:"metadata"`
}
type Metadata struct {
DeevirtInstance Instance `xml:"instance"`
}
type Instance struct {
DeevirtCompanyID string `xml:"company_id"`
DeevirtDatacenterID string `xml:"datacenter_id"`
}
type User struct {
UserName string `xml:",chardata"`
UserUUID string `xml:"uuid,attr"`
}
type Project struct {
ProjectName string `xml:",chardata"`
ProjectUUID string `xml:"uuid,attr"`
}
type Root struct {
RootType string `xml:"type,attr"`
RootUUID string `xml:"uuid,attr"`
}
type Devices struct {
Disks []Disk `xml:"disk"`
Interfaces []Interface `xml:"interface"`
}
type Disk struct {
Device string `xml:"device,attr"`
Driver DiskDriver `xml:"driver"`
Source DiskSource `xml:"source"`
Target DiskTarget `xml:"target"`
DiskType string `xml:"type,attr"`
Serial string `xml:"serial"`
}
type DiskDriver struct {
Type string `xml:"type,attr"`
Cache string `xml:"cache,attr"`
Discard string `xml:"discard,attr"`
}
type DiskSource struct {
File string `xml:"file,attr"`
Name string `xml:"name,attr"`
}
type DiskTarget struct {
Device string `xml:"dev,attr"`
Bus string `xml:"bus,attr"`
}
type Interface struct {
Source InterfaceSource `xml:"source"`
Target InterfaceTarget `xml:"target"`
Virtualport InterfaceVirtualPort `xml:"virtualport"`
}
type InterfaceVirtualPort struct {
Parameters InterfaceVirtualPortParam `xml:"parameters"`
}
type InterfaceVirtualPortParam struct {
InterfaceID string `xml:"interfaceid,attr"`
}
type InterfaceSource struct {
Bridge string `xml:"bridge,attr"`
}
type InterfaceTarget struct {
Device string `xml:"dev,attr"`
}
type VirDomainMemoryStats struct {
MajorFault uint64
MinorFault uint64
Unused uint64
Available uint64
ActualBalloon uint64
Rss uint64
Usable uint64
DiskCaches uint64
}

View File

@ -8,7 +8,6 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/ini.v1"
"libvirt.org/go/libvirt" "libvirt.org/go/libvirt"
) )
@ -29,10 +28,6 @@ var (
errorsMap map[string]struct{} errorsMap map[string]struct{}
) )
func Config() (*ini.File, error) {
return ini.Load("/etc/deevirt/config.ini")
}
// WriteErrorOnce writes message to stdout only once // WriteErrorOnce writes message to stdout only once
// for the error // for the error
// "err" - an error message // "err" - an error message

98
pkg/amqp/client.go Normal file
View File

@ -0,0 +1,98 @@
package amqp
import (
"crypto/tls"
"errors"
"log"
"sync"
"deevirt.fr/compute/pkg/config"
"github.com/rabbitmq/amqp091-go"
)
var (
//errNotConnected = errors.New("not connected to a server")
errAlreadyClosed = errors.New("already closed: not connected to the server")
//errShutdown = errors.New("client is shutting down")
)
type Client struct {
m *sync.Mutex
connection *amqp091.Connection
channel *amqp091.Channel
done chan bool
isReady bool
}
func NewAMQP() (*Client, error) {
config, _ := config.NewConfig()
amqp_config := amqp091.Config{
Properties: amqp091.NewConnectionProperties(),
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
conn, err := amqp091.DialConfig(config.AmqpURI, amqp_config)
if err != nil {
log.Fatalf("producer: error in dial: %s", err)
}
//defer conn.Close()
log.Println("producer: got Connection, getting Channel")
channel, err := conn.Channel()
if err != nil {
log.Fatalf("error getting a channel: %s", err)
}
//defer channel.Close()
return &Client{
m: &sync.Mutex{},
connection: conn,
channel: channel,
done: make(chan bool),
isReady: true,
}, nil
}
func (client *Client) Publisher(exchange string, key string, body []byte) {
//log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
_, err := client.channel.PublishWithDeferredConfirm(
exchange,
key,
true,
false,
amqp091.Publishing{
ContentType: "application/json",
DeliveryMode: amqp091.Persistent,
Body: body,
},
)
if err != nil {
log.Fatalf("producer: error in publish: %s", err)
}
}
func (client *Client) Close() error {
client.m.Lock()
// we read and write isReady in two locations, so we grab the lock and hold onto
// it until we are finished
defer client.m.Unlock()
if !client.isReady {
return errAlreadyClosed
}
close(client.done)
err := client.channel.Close()
if err != nil {
return err
}
err = client.connection.Close()
if err != nil {
return err
}
client.isReady = false
return nil
}

View File

@ -1,52 +0,0 @@
package amqp
import (
"crypto/tls"
"log"
"github.com/rabbitmq/amqp091-go"
"gopkg.in/ini.v1"
)
func Config() (*ini.File, error) {
return ini.Load("/etc/deevirt/config.ini")
}
func Publisher(body []byte) {
config, _ := Config()
amqp_config := amqp091.Config{
Properties: amqp091.NewConnectionProperties(),
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
amqp_config.Properties.SetClientConnectionName("producer-with-confirms")
conn, err := amqp091.DialConfig(config.Section("broker").Key("uri").String(), amqp_config)
if err != nil {
log.Fatalf("producer: error in dial: %s", err)
}
defer conn.Close()
log.Println("producer: got Connection, getting Channel")
channel, err := conn.Channel()
if err != nil {
log.Fatalf("error getting a channel: %s", err)
}
defer channel.Close()
//log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
_, err = channel.PublishWithDeferredConfirm(
"vmcenter",
"cluster.f242b4bb-b6d0-415f-b3f9-9e9d439532b5.dom.add",
true,
false,
amqp091.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp091.Persistent,
Body: body,
},
)
if err != nil {
log.Fatalf("producer: error in publish: %s", err)
}
}

41
pkg/config/config.go Normal file
View File

@ -0,0 +1,41 @@
package config
import (
"encoding/hex"
"fmt"
"log"
"github.com/denisbrodbeck/machineid"
"gopkg.in/ini.v1"
)
type Config struct {
ClusterID string
NodeID string
AmqpURI string
EtcdURI string
LibvirtTLS bool
}
func NewConfig() (*Config, error) {
c, err := ini.Load("/etc/deevirt/config.ini")
if err != nil {
log.Fatal(err)
}
id, err := machineid.ID()
if err != nil {
log.Fatal(err)
}
mID, _ := hex.DecodeString(id)
libvirtTLS, _ := c.Section("libvirt").Key("tls").Bool()
return &Config{
ClusterID: c.Section("").Key("id").String(),
NodeID: fmt.Sprintf("%x-%x-%x-%x-%x", mID[:4], mID[4:6], mID[6:8], mID[8:10], mID[10:]),
AmqpURI: c.Section("broker").Key("uri").String(),
EtcdURI: c.Section("etcd").Key("uri").String(),
LibvirtTLS: libvirtTLS,
}, nil
}

22
pkg/schema/domain.go Normal file
View File

@ -0,0 +1,22 @@
package schema
type Domain struct {
Metadata Metadata `xml:"metadata"`
}
type Metadata struct {
DeevirtInstance Instance `xml:"instance"`
}
type Instance struct {
DeevirtCompanyID string `xml:"company_id"`
DeevirtDatacenterID string `xml:"datacenter_id"`
}
// JSON SCHEMA for AMQP
type DomainStateJSON struct {
CompanyID string `json:"company_id"`
DatacenterID string `json:"datacenter_id"`
DomainID string `json:"domain_id"`
State int64 `json:"state"`
}