diff --git a/client/searchctl.go b/client/searchctl.go index f415f1b..b084c13 100644 --- a/client/searchctl.go +++ b/client/searchctl.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "os" + "io" ) type CommandMap struct { @@ -14,6 +15,19 @@ type CommandMap struct { Endpoint string `json:"Endpoint"` } +type RunningInstance struct { + Endpoint string `json:"endpoint"` + Software string `json:"software"` + Min_id string + Status int `json:"status"` +} + +type ResponseBack struct { + Type string `json:"Type"` + Message string `json:"Message"` + RunningInstances []RunningInstance `json:"RunningInstances"` +} + func main() { shutdownPtr := flag.Bool("shutdown", false, "Shutdown server") @@ -28,6 +42,7 @@ func main() { /* Condition verification */ totalflags := 0 var commandMap CommandMap + var responseback ResponseBack if *shutdownPtr == true { totalflags++ @@ -68,9 +83,34 @@ func main() { fmt.Println(err) return } - a := make([]byte, 4) - i := len(commandByte) - binary.LittleEndian.PutUint32(a, uint32(i)) - c.Write(a) + sizebytes := make([]byte, 4) + b := len(commandByte) + + // Send the request + binary.LittleEndian.PutUint32(sizebytes, uint32(b)) + c.Write(sizebytes) c.Write(commandByte) + + // Read the response + n, err := io.ReadFull(c, sizebytes) + if err != nil || n != 4 { + fmt.Println("err", err, n) + } + jsonsize := int(binary.LittleEndian.Uint32(sizebytes)) + responsebytes := make([]byte, jsonsize) + n, err = io.ReadFull(c, responsebytes) + err = json.Unmarshal(responsebytes, &responseback) + if err != nil { + fmt.Println("Unmarshal error", err) + } + + switch commandMap.Type { + case "add": + fmt.Println("Add instance") + case "status": + fmt.Println("Status:" + responseback.Message) + for x, runninginstance := range responseback.RunningInstances { + fmt.Println("ID:", x, runninginstance.Endpoint, runninginstance.Status) + } + } } diff --git a/poll/engine.go b/poll/engine.go index e6f9824..43d42b2 100644 --- a/poll/engine.go +++ b/poll/engine.go @@ -21,6 +21,7 @@ import ( const ( NEW_INSTANCE = 0 + INSTANCE_ERROR = -1 CLIENT_ISSUE = 600 ONION_PROTOCOL = 601 UNMARSHAL_ERROR = 602 @@ -67,10 +68,10 @@ type ReportInstance struct { // Instance's new min_id value type RunningInstance struct { - endpoint string `json:"endpoint"` - software string `json:"software"` - min_id string - status int `json:"status"` + Endpoint string `json:"endpoint"` + Software string `json:"software"` + Min_id string + Status int `json:"status"` } type NodeInfoSoftware struct { @@ -90,13 +91,14 @@ type CommandMap struct { type ResponseBack struct { Type string `json:"Type"` Message string `json:"Message"` - RunningInstances RunningInstance `json:"RunningInstances"` + RunningInstances []RunningInstance `json:"RunningInstances"` } func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance) { sizebyte := make([]byte, 4) var commandmap CommandMap + var responseback ResponseBack n, err := io.ReadFull(commandClient, sizebyte) if n != 4 { fmt.Println("Did not read 4 bytes, failure.") @@ -119,20 +121,51 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r switch commandmap.Type { case "status": fmt.Println("Status") - for _, runninginstance := range *runninginstances { - fmt.Println(runninginstance) - } + responseback.Message = "Ok" case "add": fmt.Println("Add instance: " + commandmap.Endpoint) var q ReportInstance q.from = "" q.endpoint = commandmap.Endpoint q.status = NEW_INSTANCE + reportInstanceChan <- q + + responseback.Message = "Added " + commandmap.Endpoint + case "suspend": + fmt.Println("Suspend") + case "resume": + fmt.Println("Resume") default: fmt.Println("Something else") } + + responseback.Type = "status" + responseback.RunningInstances = *runninginstances + + fmt.Println(responseback) + + responsebytes, err := json.Marshal(responseback) + if err != nil { + fmt.Println("Error: ", err) + os.Exit(1) + } + + n = len(responsebytes) + binary.LittleEndian.PutUint32(sizebyte, uint32(n)) + + fmt.Println(sizebyte) + commandClient.Write(sizebyte) + + responsebyte, err := json.Marshal(responseback) + if err != nil { + fmt.Println("Error response back: ", err) + os.Exit(1) + } + + commandClient.Write(responsebyte) + commandClient.Close() } @@ -268,8 +301,8 @@ func DeferPollRun(pollmessage PollMessage, runninginstances *[]RunningInstance, var min_id string for _, runninginstance := range *runninginstances { - if runninginstance.endpoint == pollmessage.from { - min_id = runninginstance.min_id + if runninginstance.Endpoint == pollmessage.from { + min_id = runninginstance.Min_id break } } @@ -309,12 +342,20 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportIns var nodeinfo NodeInfo // Check node type GetNodeInfo(endpoint, &nodeinfo) + if nodeinfo.Software.Name == "" { + fmt.Println("was blank: ", nodeinfo.Software.Name) + var q ReportInstance + q.from = "" + q.endpoint = endpoint + q.status = INSTANCE_ERROR + reportInstanceChan <- q + } if endpoint == "" { return } for _, runninginstance := range *runninginstances { - if runninginstance.endpoint == endpoint { + if runninginstance.Endpoint == endpoint { return } } @@ -375,9 +416,10 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) { } func SuspendInstance(suspendinstance ReportInstance, runninginstances *[]RunningInstance) { - for _, runninginstance := range *runninginstances { - if runninginstance.endpoint == suspendinstance.endpoint { - runninginstance.status = suspendinstance.status + for i, runninginstance := range *runninginstances { + if runninginstance.Endpoint == suspendinstance.endpoint { + fmt.Println("Updated status to ", suspendinstance.status) + (*runninginstances)[i].Status = suspendinstance.status return } } @@ -427,8 +469,8 @@ func main() { go handleClient(c, &runninginstances, reportInstanceChan) case p := <-pollMessageChan: // A poller ended for i, runninginstance := range runninginstances { - if runninginstance.endpoint == p.from { - runninginstances[i].min_id = p.min_id + if runninginstance.Endpoint == p.from { + runninginstances[i].Min_id = p.min_id } } go DeferPollRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) @@ -438,6 +480,7 @@ func main() { if w.status == NEW_INSTANCE { NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) } else { + fmt.Println("Error here failure ", w.status) SuspendInstance(w, &runninginstances) } }