From afb17f51eb92fffe68f29843e422471a76669eb4 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Fri, 30 Oct 2020 05:10:04 +0000 Subject: [PATCH] added client structure functionality --- client/searchctl.go | 76 +++++++++++++++++++++++++++++++++ poll/engine.go | 100 +++++++++++++++++++++++++++++++++----------- 2 files changed, 151 insertions(+), 25 deletions(-) create mode 100644 client/searchctl.go diff --git a/client/searchctl.go b/client/searchctl.go new file mode 100644 index 0000000..f415f1b --- /dev/null +++ b/client/searchctl.go @@ -0,0 +1,76 @@ +package main + +import ( + "encoding/binary" + "encoding/json" + "flag" + "fmt" + "net" + "os" +) + +type CommandMap struct { + Type string `json:"Type"` + Endpoint string `json:"Endpoint"` +} + +func main() { + + shutdownPtr := flag.Bool("shutdown", false, "Shutdown server") + suspendinstancePtr := flag.String("suspend-instance", "", "Instance to Suspend") + resumeinstancePtr := flag.String("resume-instance", "", "Instance to Resume") + addinstancePtr := flag.String("add-instance", "", "Instance to add") + statusPtr := flag.Bool("status", false, "Check status") +// usernamePtr := flag.String("username", "", "Set username") +// passwordPtr := flag.String("password", "", "Set password") + flag.Parse() + + /* Condition verification */ + totalflags := 0 + var commandMap CommandMap + + if *shutdownPtr == true { + totalflags++ + commandMap.Type = "shutdown" + } + if *statusPtr == true { + totalflags++ + commandMap.Type = "status" + } + if *addinstancePtr != "" { + totalflags++ + commandMap.Type = "add" + commandMap.Endpoint = *addinstancePtr + } + if *suspendinstancePtr != "" { + totalflags++ + commandMap.Type = "suspend" + } + if *resumeinstancePtr != "" { + totalflags++ + commandMap.Type = "resume" + } + if totalflags > 1 { + fmt.Println("Incompatible arguments, exiting.") + os.Exit(1) + } else if totalflags == 0 { + fmt.Println("No options specified, exiting.") + os.Exit(1) + } + + commandByte, err := json.Marshal(commandMap) + if err != nil { + fmt.Println(err) + return + } + c, err := net.Dial("tcp", "127.0.0.1:5555") + if err != nil { + fmt.Println(err) + return + } + a := make([]byte, 4) + i := len(commandByte) + binary.LittleEndian.PutUint32(a, uint32(i)) + c.Write(a) + c.Write(commandByte) +} diff --git a/poll/engine.go b/poll/engine.go index 8602dbd..e6f9824 100644 --- a/poll/engine.go +++ b/poll/engine.go @@ -3,6 +3,7 @@ package main import ( "github.com/microcosm-cc/bluemonday" "github.com/jackc/pgx/pgxpool" + "encoding/binary" "encoding/json" "crypto/sha1" "io/ioutil" @@ -66,10 +67,10 @@ type ReportInstance struct { // Instance's new min_id value type RunningInstance struct { - endpoint string - software string + endpoint string `json:"endpoint"` + software string `json:"software"` min_id string - status int + status int `json:"status"` } type NodeInfoSoftware struct { @@ -81,17 +82,58 @@ type NodeInfo struct { Software NodeInfoSoftware `json:"software"` } -func handleClient(commandClient net.Conn) { - rawCommand := make([]byte, 20) +type CommandMap struct { + Type string `json:"Type"` + Endpoint string `json:"Endpoint"` +} - for { - _, err := io.ReadFull(commandClient, rawCommand) - if err != nil { - fmt.Println(err) - commandClient.Close() - } - fmt.Println(rawCommand) +type ResponseBack struct { + Type string `json:"Type"` + Message string `json:"Message"` + RunningInstances RunningInstance `json:"RunningInstances"` +} + +func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance) { + + sizebyte := make([]byte, 4) + var commandmap CommandMap + n, err := io.ReadFull(commandClient, sizebyte) + if n != 4 { + fmt.Println("Did not read 4 bytes, failure.") + os.Exit(1) } + jsonsize := int(binary.LittleEndian.Uint32(sizebyte)) + jsonbyte := make([]byte, jsonsize) + n, err = io.ReadFull(commandClient, jsonbyte) + if n != jsonsize { + fmt.Println("Failued to read json size of ", n) + os.Exit(1) + } + + err = json.Unmarshal(jsonbyte, &commandmap) + if err != nil { + fmt.Println("Unable to unmarshal") + os.Exit(1) + } + + switch commandmap.Type { + case "status": + fmt.Println("Status") + for _, runninginstance := range *runninginstances { + fmt.Println(runninginstance) + } + case "add": + fmt.Println("Add instance: " + commandmap.Endpoint) + var q ReportInstance + q.from = "" + q.endpoint = commandmap.Endpoint + q.status = NEW_INSTANCE + reportInstanceChan <- q + default: + fmt.Println("Something else") + } + + commandClient.Close() } /* @@ -189,8 +231,6 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor reportInstanceChan <- q } - fmt.Println(newinstances) - pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, numposts} } @@ -281,8 +321,6 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportIns newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE} *runninginstances = append(*runninginstances, newinstance) - fmt.Println(nodeinfo.Software) - if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan, reportInstanceChan) // fmt.Println("Temporarily disabled Peer Hunting") @@ -303,6 +341,17 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) { err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid) if err != nil { fmt.Println("First ", err) + fmt.Println("--------------------------") + fmt.Println("Reported error: ", err) + fmt.Println("Account Acct: ", reportpost.Account.Acct) + fmt.Println("Account Avatar: ", reportpost.Account.Avatar) + fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar)) + fmt.Println("Account Bot: ", reportpost.Account.Bot) + fmt.Println("Account Created_at: ", reportpost.Account.Created_at) + fmt.Println("Account Display: ", reportpost.Account.Display_name) + fmt.Println("Account URL: ", reportpost.Account.Url) + fmt.Println(reportpost) + fmt.Println("--------------------------") os.Exit(1) // For now I want this to die and learn why it failed return } @@ -311,13 +360,21 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) { _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash) if err != nil { // For now I want to know why this failed. fmt.Println("Second ", err) + fmt.Println("--------------------------") + fmt.Println("Reported error: ", err) + fmt.Println("Url: ", reportpost.Url) + fmt.Println("Content: ", reportpost.Content) + fmt.Println("Created_at: ", reportpost.Created_at) + fmt.Println("normalized: ", reportpost.normalized) + fmt.Println("account_id", accountid) + fmt.Println("posthash: ", reportpost.posthash) + fmt.Println("--------------------------") os.Exit(1) // For now I want this to die and learn why it failed return } } func SuspendInstance(suspendinstance ReportInstance, runninginstances *[]RunningInstance) { - fmt.Println("Suspend") for _, runninginstance := range *runninginstances { if runninginstance.endpoint == suspendinstance.endpoint { runninginstance.status = suspendinstance.status @@ -350,12 +407,6 @@ func main() { } defer l.Close() - var q ReportInstance - q.from = "" - q.endpoint = "mastodon.social" - q.status = NEW_INSTANCE - reportInstanceChan <- q - for { commandClient := make(chan net.Conn) @@ -373,7 +424,7 @@ func main() { for { select { case c := <-commandClient: // New client connection - go handleClient(c) + go handleClient(c, &runninginstances, reportInstanceChan) case p := <-pollMessageChan: // A poller ended for i, runninginstance := range runninginstances { if runninginstance.endpoint == p.from { @@ -385,7 +436,6 @@ func main() { go writePost(pool, v) case w := <-reportInstanceChan: // Start or suspend instance if w.status == NEW_INSTANCE { - fmt.Println("NEW INSTANCE: ", w.endpoint) NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) } else { SuspendInstance(w, &runninginstances)