Compare commits

...

7 Commits

10 changed files with 257 additions and 250 deletions

View File

@ -2,29 +2,27 @@ package events
import (
"context"
"encoding/json"
"encoding/xml"
"fmt"
"log"
"strconv"
"strings"
"time"
"deevirt.fr/compute/pkg/amqp"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
clientv3 "go.etcd.io/etcd/client/v3"
"libvirt.org/go/libvirt"
)
func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifecycle) {
var detail, event string
config, _ := config.NewConfig()
domainID, _ := d.GetUUIDString()
// Read Etcd URI
config, err := Config()
if err != nil {
log.Fatalf("Fail to read file: %v", err)
}
etcd, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(config.Section("etcd").Key("uri").String(), ","),
Endpoints: strings.Split(config.EtcdURI, ","),
DialTimeout: 5 * time.Second,
})
if err != nil {
@ -37,7 +35,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
switch e.Event {
case libvirt.DOMAIN_EVENT_DEFINED:
event = "defined"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/host/"+MachineID()+"/qemu/"+domainID, "")
etcd.Put(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID, "")
switch libvirt.DomainEventDefinedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_DEFINED_ADDED:
@ -54,7 +52,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_UNDEFINED:
event = "undefined"
etcd.Delete(ctx, "/cluster/"+ClusterID()+"/host/"+MachineID()+"/qemu/"+domainID)
etcd.Delete(ctx, "/cluster/"+config.ClusterID+"/host/"+config.NodeID+"/qemu/"+domainID)
switch libvirt.DomainEventUndefinedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_UNDEFINED_REMOVED:
@ -67,7 +65,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_STARTED:
event = "started"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "2")
etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "2")
switch libvirt.DomainEventStartedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_STARTED_BOOTED:
@ -86,7 +84,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_SUSPENDED:
event = "suspended"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "3")
etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "3")
switch libvirt.DomainEventSuspendedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_SUSPENDED_PAUSED:
@ -113,7 +111,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_RESUMED:
event = "resumed"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "4")
etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "4")
switch libvirt.DomainEventResumedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_RESUMED_UNPAUSED:
@ -132,7 +130,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_STOPPED:
event = "stopped"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "5")
etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "5")
switch libvirt.DomainEventStoppedDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_STOPPED_SHUTDOWN:
@ -155,7 +153,7 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
case libvirt.DOMAIN_EVENT_SHUTDOWN:
event = "shutdown"
etcd.Put(ctx, "/cluster/"+ClusterID()+"/domain/"+domainID+"/state", "6")
etcd.Put(ctx, "/cluster/"+config.ClusterID+"/domain/"+domainID+"/state", "6")
switch libvirt.DomainEventShutdownDetailType(e.Detail) {
case libvirt.DOMAIN_EVENT_SHUTDOWN_FINISHED:
@ -172,8 +170,33 @@ func Lifecyle(c *libvirt.Connect, d *libvirt.Domain, e *libvirt.DomainEventLifec
event = "unknown"
}
// AMQP
amqp.Publisher([]byte(strconv.FormatInt(int64(e.Event), 10)))
// Send event for all clients
if e.Event != libvirt.DOMAIN_EVENT_DEFINED|libvirt.DOMAIN_EVENT_UNDEFINED {
xmlDesc, err := d.GetXMLDesc(0)
if err != nil {
log.Fatalln(err)
}
var desc schema.Domain
err = xml.Unmarshal([]byte(xmlDesc), &desc)
if err != nil {
log.Fatalln(err)
}
state, _ := json.Marshal(&schema.DomainStateJSON{
CompanyID: desc.Metadata.DeevirtInstance.DeevirtCompanyID,
DatacenterID: desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
DomainID: domainID,
State: int64(e.Event),
})
a, _ := amqp.NewAMQP()
a.Publisher("vmcenter",
"events."+desc.Metadata.DeevirtInstance.DeevirtCompanyID+
"."+desc.Metadata.DeevirtInstance.DeevirtDatacenterID+
"."+domainID,
state)
a.Close()
}
fmt.Printf("Domain event=%q detail=%q\n", event, detail)
}

View File

@ -1,34 +1,12 @@
package events
import (
"encoding/hex"
"fmt"
"log"
"os"
"github.com/denisbrodbeck/machineid"
"gopkg.in/ini.v1"
"libvirt.org/go/libvirt"
)
func Config() (*ini.File, error) {
return ini.Load("/etc/deevirt/config.ini")
}
var ClusterID = func() string {
config, _ := Config()
return config.Section("").Key("id").String()
}
var MachineID = func() string {
id, err := machineid.ID()
if err != nil {
log.Fatal(err)
}
u, _ := hex.DecodeString(id)
return fmt.Sprintf("%x-%x-%x-%x-%x\n", u[:4], u[4:6], u[6:8], u[8:10], u[10:])
}
func agentLifecycle(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventAgentLifecycle) {
println(event.State)
println(event.Reason)

View File

@ -2,8 +2,8 @@ package metrics
import (
"log"
"strconv"
"deevirt.fr/compute/pkg/config"
"github.com/prometheus/client_golang/prometheus"
"libvirt.org/go/libvirt"
)
@ -13,26 +13,20 @@ var (
libvirtNodeCPUUsage = prometheus.NewDesc(
prometheus.BuildFQName("libvirt", "node", "cpu_time_seconds_total"),
"CPU usage of node",
[]string{"cluster", "node", "thread"},
[]string{"cluster_id", "node_id", "node"},
nil)
libvirtNodeMemoryUsageBytes = prometheus.NewDesc(
prometheus.BuildFQName("libvirt", "node", "memory_usage_bytes"),
"Memory usage of the node, in bytes.",
[]string{"cluster", "node", "total"},
[]string{"cluster_id", "node_id", "node"},
nil)
)
func CollectNode(conn *libvirt.Connect, ch chan<- prometheus.Metric, hostname string) error {
config, err := Config()
config, err := config.NewConfig()
if err != nil {
log.Fatalf("Fail to read file: %v", err)
}
// Node
node_info, err := conn.GetNodeInfo()
if err != nil {
return err
log.Fatalln(err)
}
nodeCPU, _ := conn.GetCPUStats(int(libvirt.NODE_CPU_STATS_ALL_CPUS), 0) // rate(libvirt_node_cpu_time_seconds_total[10s]) * 100
@ -42,18 +36,18 @@ func CollectNode(conn *libvirt.Connect, ch chan<- prometheus.Metric, hostname st
libvirtNodeCPUUsage,
prometheus.CounterValue,
float64(nodeCPU.Kernel+nodeCPU.User+nodeCPU.Iowait)/1e9, // From nsec to sec
config.Section("").Key("id").String(),
config.ClusterID,
config.NodeID,
hostname,
strconv.FormatInt(int64(node_info.Sockets*node_info.Cores*node_info.Threads), 10),
)
ch <- prometheus.MustNewConstMetric(
libvirtNodeMemoryUsageBytes,
prometheus.GaugeValue,
float64(nodeMemory.Total-(nodeMemory.Buffers+nodeMemory.Free+nodeMemory.Cached))*1024,
config.Section("").Key("id").String(),
config.ClusterID,
config.NodeID,
hostname,
strconv.FormatInt(int64(nodeMemory.Total), 10),
)
return nil

View File

@ -5,9 +5,9 @@ import (
"log"
"strconv"
"deevirt.fr/compute/cmd/qemu/metrics/schema"
"deevirt.fr/compute/pkg/config"
"deevirt.fr/compute/pkg/schema"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/ini.v1"
"libvirt.org/go/libvirt"
)
@ -231,9 +231,9 @@ var (
)
func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostname string) error {
config, err := Config()
config, err := config.NewConfig()
if err != nil {
log.Fatalf("Fail to read file: %v", err)
log.Fatalln(err)
}
domainUUID, err := stat.Domain.GetUUIDString()
@ -262,7 +262,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoMaxMemBytes,
prometheus.GaugeValue,
float64(info.MaxMem)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -270,7 +270,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoMemoryUsageBytes,
prometheus.GaugeValue,
float64(info.Memory)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -278,7 +278,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoNrVirtCPU,
prometheus.GaugeValue,
float64(info.NrVirtCpu),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -286,7 +286,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoCPUTime,
prometheus.CounterValue,
float64(info.CpuTime)/1e9, // From nsec to sec
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -294,7 +294,7 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
libvirtDomainInfoVirDomainState,
prometheus.GaugeValue,
float64(info.State),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -308,13 +308,13 @@ func CollectDomain(ch chan<- prometheus.Metric, stat libvirt.DomainStats, hostna
return nil
}
func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVcpu, hostname string, domainUUID string, config *ini.File, desc schema.Domain) {
func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVcpu, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
for idx, vcpu := range stat {
ch <- prometheus.MustNewConstMetric(
libvirtDomainVcpuState,
prometheus.GaugeValue,
float64(vcpu.State),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -323,7 +323,7 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
libvirtDomainVcpuTime,
prometheus.CounterValue,
float64(vcpu.Time)/1000/1000/1000, // From nsec to sec
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -332,7 +332,7 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
libvirtDomainVcpuWait,
prometheus.CounterValue,
float64(vcpu.Wait)/1e9, // From nsec to sec
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -341,7 +341,7 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
libvirtDomainVcpuDelay,
prometheus.CounterValue,
float64(vcpu.Delay)/1e9, // From nsec to sec
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -350,12 +350,12 @@ func CollectDomainVCPU(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsVc
}
}
func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStatsBalloon, hostname string, domainUUID string, config *ini.File, desc schema.Domain) {
func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStatsBalloon, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
ch <- prometheus.MustNewConstMetric(
libvirtDomainBalloonStatCurrentBytes,
prometheus.GaugeValue,
float64(stat.Current)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -363,7 +363,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatMaximumBytes,
prometheus.GaugeValue,
float64(stat.Maximum)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -371,7 +371,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatSwapInBytes,
prometheus.GaugeValue,
float64(stat.SwapIn)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -379,7 +379,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatSwapOutBytes,
prometheus.GaugeValue,
float64(stat.SwapOut)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -387,7 +387,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatMajorFaultTotal,
prometheus.CounterValue,
float64(stat.MajorFault),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -395,7 +395,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatMinorFaultTotal,
prometheus.CounterValue,
float64(stat.MinorFault),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -403,7 +403,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatUnusedBytes,
prometheus.GaugeValue,
float64(stat.Unused)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -411,7 +411,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatAvailableBytes,
prometheus.GaugeValue,
float64(stat.Available)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -419,7 +419,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatRssBytes,
prometheus.GaugeValue,
float64(stat.Rss)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -427,7 +427,7 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatUsableBytes,
prometheus.GaugeValue,
float64(stat.Usable)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
@ -435,14 +435,14 @@ func CollectDomainBalloon(ch chan<- prometheus.Metric, stat *libvirt.DomainStats
libvirtDomainBalloonStatDiskCachesBytes,
prometheus.GaugeValue,
float64(stat.DiskCaches)*1024,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID)
}
func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsBlock, hostname string, domainUUID string, config *ini.File, desc schema.Domain) {
func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsBlock, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
for _, block := range stat {
if block.RdBytesSet {
@ -450,7 +450,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockRdBytes,
prometheus.CounterValue,
float64(block.RdBytes),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -461,7 +461,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockRdReq,
prometheus.CounterValue,
float64(block.RdReqs),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -472,7 +472,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockRdTotalTimeSeconds,
prometheus.CounterValue,
float64(block.RdTimes)/1e9,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -483,7 +483,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockWrBytes,
prometheus.CounterValue,
float64(block.WrBytes),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -494,7 +494,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockWrReq,
prometheus.CounterValue,
float64(block.WrReqs),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -505,7 +505,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockWrTotalTimes,
prometheus.CounterValue,
float64(block.WrTimes)/1e9,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -516,7 +516,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockFlushReq,
prometheus.CounterValue,
float64(block.FlReqs),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -527,7 +527,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockFlushTotalTimeSeconds,
prometheus.CounterValue,
float64(block.FlTimes)/1e9,
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -538,7 +538,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockAllocation,
prometheus.GaugeValue,
float64(block.Allocation),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -549,7 +549,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockCapacityBytes,
prometheus.GaugeValue,
float64(block.Capacity),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -560,7 +560,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
libvirtDomainBlockPhysicalSizeBytes,
prometheus.GaugeValue,
float64(block.Physical),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -569,7 +569,7 @@ func CollectDomainBlock(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsB
}
}
func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet, hostname string, domainUUID string, config *ini.File, desc schema.Domain) {
func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet, hostname string, domainUUID string, config *config.Config, desc schema.Domain) {
for _, iface := range stat {
if iface.RxBytesSet {
@ -577,7 +577,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxBytes,
prometheus.CounterValue,
float64(iface.RxBytes),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -588,7 +588,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxPackets,
prometheus.CounterValue,
float64(iface.RxPkts),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -599,7 +599,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxErrs,
prometheus.CounterValue,
float64(iface.RxErrs),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -610,7 +610,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceRxDrop,
prometheus.CounterValue,
float64(iface.RxDrop),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -621,7 +621,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxBytes,
prometheus.CounterValue,
float64(iface.TxBytes),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -632,7 +632,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxPackets,
prometheus.CounterValue,
float64(iface.TxPkts),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -643,7 +643,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxErrs,
prometheus.CounterValue,
float64(iface.TxErrs),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,
@ -654,7 +654,7 @@ func CollectDomainNet(ch chan<- prometheus.Metric, stat []libvirt.DomainStatsNet
libvirtDomainInterfaceTxDrop,
prometheus.CounterValue,
float64(iface.TxDrop),
config.Section("").Key("id").String(),
config.ClusterID,
desc.Metadata.DeevirtInstance.DeevirtCompanyID,
desc.Metadata.DeevirtInstance.DeevirtDatacenterID,
domainUUID,

View File

@ -1,92 +0,0 @@
package schema
type Domain struct {
Devices Devices `xml:"devices"`
Metadata Metadata `xml:"metadata"`
}
type Metadata struct {
DeevirtInstance Instance `xml:"instance"`
}
type Instance struct {
DeevirtCompanyID string `xml:"company_id"`
DeevirtDatacenterID string `xml:"datacenter_id"`
}
type User struct {
UserName string `xml:",chardata"`
UserUUID string `xml:"uuid,attr"`
}
type Project struct {
ProjectName string `xml:",chardata"`
ProjectUUID string `xml:"uuid,attr"`
}
type Root struct {
RootType string `xml:"type,attr"`
RootUUID string `xml:"uuid,attr"`
}
type Devices struct {
Disks []Disk `xml:"disk"`
Interfaces []Interface `xml:"interface"`
}
type Disk struct {
Device string `xml:"device,attr"`
Driver DiskDriver `xml:"driver"`
Source DiskSource `xml:"source"`
Target DiskTarget `xml:"target"`
DiskType string `xml:"type,attr"`
Serial string `xml:"serial"`
}
type DiskDriver struct {
Type string `xml:"type,attr"`
Cache string `xml:"cache,attr"`
Discard string `xml:"discard,attr"`
}
type DiskSource struct {
File string `xml:"file,attr"`
Name string `xml:"name,attr"`
}
type DiskTarget struct {
Device string `xml:"dev,attr"`
Bus string `xml:"bus,attr"`
}
type Interface struct {
Source InterfaceSource `xml:"source"`
Target InterfaceTarget `xml:"target"`
Virtualport InterfaceVirtualPort `xml:"virtualport"`
}
type InterfaceVirtualPort struct {
Parameters InterfaceVirtualPortParam `xml:"parameters"`
}
type InterfaceVirtualPortParam struct {
InterfaceID string `xml:"interfaceid,attr"`
}
type InterfaceSource struct {
Bridge string `xml:"bridge,attr"`
}
type InterfaceTarget struct {
Device string `xml:"dev,attr"`
}
type VirDomainMemoryStats struct {
MajorFault uint64
MinorFault uint64
Unused uint64
Available uint64
ActualBalloon uint64
Rss uint64
Usable uint64
DiskCaches uint64
}

View File

@ -8,7 +8,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/ini.v1"
"libvirt.org/go/libvirt"
)
@ -29,10 +28,6 @@ var (
errorsMap map[string]struct{}
)
func Config() (*ini.File, error) {
return ini.Load("/etc/deevirt/config.ini")
}
// WriteErrorOnce writes message to stdout only once
// for the error
// "err" - an error message

98
pkg/amqp/client.go Normal file
View File

@ -0,0 +1,98 @@
package amqp
import (
"crypto/tls"
"errors"
"log"
"sync"
"deevirt.fr/compute/pkg/config"
"github.com/rabbitmq/amqp091-go"
)
var (
//errNotConnected = errors.New("not connected to a server")
errAlreadyClosed = errors.New("already closed: not connected to the server")
//errShutdown = errors.New("client is shutting down")
)
type Client struct {
m *sync.Mutex
connection *amqp091.Connection
channel *amqp091.Channel
done chan bool
isReady bool
}
func NewAMQP() (*Client, error) {
config, _ := config.NewConfig()
amqp_config := amqp091.Config{
Properties: amqp091.NewConnectionProperties(),
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
conn, err := amqp091.DialConfig(config.AmqpURI, amqp_config)
if err != nil {
log.Fatalf("producer: error in dial: %s", err)
}
//defer conn.Close()
log.Println("producer: got Connection, getting Channel")
channel, err := conn.Channel()
if err != nil {
log.Fatalf("error getting a channel: %s", err)
}
//defer channel.Close()
return &Client{
m: &sync.Mutex{},
connection: conn,
channel: channel,
done: make(chan bool),
isReady: true,
}, nil
}
func (client *Client) Publisher(exchange string, key string, body []byte) {
//log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
_, err := client.channel.PublishWithDeferredConfirm(
exchange,
key,
true,
false,
amqp091.Publishing{
ContentType: "application/json",
DeliveryMode: amqp091.Persistent,
Body: body,
},
)
if err != nil {
log.Fatalf("producer: error in publish: %s", err)
}
}
func (client *Client) Close() error {
client.m.Lock()
// we read and write isReady in two locations, so we grab the lock and hold onto
// it until we are finished
defer client.m.Unlock()
if !client.isReady {
return errAlreadyClosed
}
close(client.done)
err := client.channel.Close()
if err != nil {
return err
}
err = client.connection.Close()
if err != nil {
return err
}
client.isReady = false
return nil
}

View File

@ -1,52 +0,0 @@
package amqp
import (
"crypto/tls"
"log"
"github.com/rabbitmq/amqp091-go"
"gopkg.in/ini.v1"
)
func Config() (*ini.File, error) {
return ini.Load("/etc/deevirt/config.ini")
}
func Publisher(body []byte) {
config, _ := Config()
amqp_config := amqp091.Config{
Properties: amqp091.NewConnectionProperties(),
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
amqp_config.Properties.SetClientConnectionName("producer-with-confirms")
conn, err := amqp091.DialConfig(config.Section("broker").Key("uri").String(), amqp_config)
if err != nil {
log.Fatalf("producer: error in dial: %s", err)
}
defer conn.Close()
log.Println("producer: got Connection, getting Channel")
channel, err := conn.Channel()
if err != nil {
log.Fatalf("error getting a channel: %s", err)
}
defer channel.Close()
//log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
_, err = channel.PublishWithDeferredConfirm(
"vmcenter",
"cluster.f242b4bb-b6d0-415f-b3f9-9e9d439532b5.dom.add",
true,
false,
amqp091.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp091.Persistent,
Body: body,
},
)
if err != nil {
log.Fatalf("producer: error in publish: %s", err)
}
}

41
pkg/config/config.go Normal file
View File

@ -0,0 +1,41 @@
package config
import (
"encoding/hex"
"fmt"
"log"
"github.com/denisbrodbeck/machineid"
"gopkg.in/ini.v1"
)
type Config struct {
ClusterID string
NodeID string
AmqpURI string
EtcdURI string
LibvirtTLS bool
}
func NewConfig() (*Config, error) {
c, err := ini.Load("/etc/deevirt/config.ini")
if err != nil {
log.Fatal(err)
}
id, err := machineid.ID()
if err != nil {
log.Fatal(err)
}
mID, _ := hex.DecodeString(id)
libvirtTLS, _ := c.Section("libvirt").Key("tls").Bool()
return &Config{
ClusterID: c.Section("").Key("id").String(),
NodeID: fmt.Sprintf("%x-%x-%x-%x-%x", mID[:4], mID[4:6], mID[6:8], mID[8:10], mID[10:]),
AmqpURI: c.Section("broker").Key("uri").String(),
EtcdURI: c.Section("etcd").Key("uri").String(),
LibvirtTLS: libvirtTLS,
}, nil
}

22
pkg/schema/domain.go Normal file
View File

@ -0,0 +1,22 @@
package schema
type Domain struct {
Metadata Metadata `xml:"metadata"`
}
type Metadata struct {
DeevirtInstance Instance `xml:"instance"`
}
type Instance struct {
DeevirtCompanyID string `xml:"company_id"`
DeevirtDatacenterID string `xml:"datacenter_id"`
}
// JSON SCHEMA for AMQP
type DomainStateJSON struct {
CompanyID string `json:"company_id"`
DatacenterID string `json:"datacenter_id"`
DomainID string `json:"domain_id"`
State int64 `json:"state"`
}