From 4050e336c3a96d720daeb8e446fafe677f4c9549 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Sat, 31 Oct 2020 04:24:53 +0000 Subject: [PATCH] Removed Poll channel, merged into InstanceReport channel Found a bug where the main routine was sending to a channel it also read from. If the channel was full, this resulted in the main routine blocking. Added pprof, removed Peer "hunting" code. --- poll/engine.go | 217 ++++++++++++++++++++++++------------------------- 1 file changed, 106 insertions(+), 111 deletions(-) diff --git a/poll/engine.go b/poll/engine.go index 43d42b2..954f9fd 100644 --- a/poll/engine.go +++ b/poll/engine.go @@ -3,6 +3,7 @@ package main import ( "github.com/microcosm-cc/bluemonday" "github.com/jackc/pgx/pgxpool" + _ "net/http/pprof" "encoding/binary" "encoding/json" "crypto/sha1" @@ -20,21 +21,16 @@ import ( ) const ( - NEW_INSTANCE = 0 INSTANCE_ERROR = -1 + NEW_INSTANCE = 0 + RUNNING = 200 + TOOMANYREQUESTS = 429 CLIENT_ISSUE = 600 ONION_PROTOCOL = 601 UNMARSHAL_ERROR = 602 NO_CONNECTION = 603 ) -type PollMessage struct { - from string - status int - min_id string - numposts int -} - // Parsing Unmarshal JSON type type ReportPost struct { @@ -60,10 +56,12 @@ type AccountType struct { } // Used to report a new instance to main -type ReportInstance struct { - from string +type InstanceReport struct { endpoint string status int + + min_id string + numposts int } // Instance's new min_id value @@ -94,7 +92,7 @@ type ResponseBack struct { RunningInstances []RunningInstance `json:"RunningInstances"` } -func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance) { +func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) { sizebyte := make([]byte, 4) var commandmap CommandMap @@ -120,16 +118,14 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r switch commandmap.Type { case "status": - fmt.Println("Status") 
responseback.Message = "Ok" case "add": fmt.Println("Add instance: " + commandmap.Endpoint) - var q ReportInstance - q.from = "" + var q InstanceReport q.endpoint = commandmap.Endpoint q.status = NEW_INSTANCE - reportInstanceChan <- q + instanceReportChan <- q responseback.Message = "Added " + commandmap.Endpoint case "suspend": @@ -144,8 +140,6 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r responseback.Type = "status" responseback.RunningInstances = *runninginstances - fmt.Println(responseback) - responsebytes, err := json.Marshal(responseback) if err != nil { fmt.Println("Error: ", err) @@ -155,7 +149,6 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r n = len(responsebytes) binary.LittleEndian.PutUint32(sizebyte, uint32(n)) - fmt.Println(sizebyte) commandClient.Write(sizebyte) responsebyte, err := json.Marshal(responseback) @@ -165,7 +158,6 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r } commandClient.Write(responsebyte) - commandClient.Close() } @@ -189,40 +181,34 @@ func AppendIfMissing(hay []string, needle string) []string { return append(hay, needle) } -func StartInstancePoll(endpoint string, min_id string, reportPostChan chan ReportPost, pollMessageChan chan PollMessage, reportInstanceChan chan ReportInstance) { +func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) { p := bluemonday.NewPolicy() newposts := make([]ReportPost, 0) - // Only placing this here to later have the option of using an HTTP client via a SOCKS5 Tor proxy - if strings.Contains(endpoint, ".onion") == true { - reportInstanceChan <- ReportInstance{endpoint, endpoint, ONION_PROTOCOL} + // Only placing this here to later have the option of using + // an HTTP client via a SOCKS5 Tor proxy + if strings.Contains(instancereport.endpoint, ".onion") == true { + instanceReportChan <- 
InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0} return } - api_timeline := "https://" + endpoint + "/api/v1/timelines/public?min_id=" + min_id + api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?min_id=" + instancereport.min_id resp, err := http.Get(api_timeline) if err != nil { - reportInstanceChan <- ReportInstance{endpoint, endpoint, CLIENT_ISSUE} + instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0} return } body, err := ioutil.ReadAll(resp.Body) err = json.Unmarshal(body, &newposts) if err != nil { - fmt.Println("Unmarshal 3"); - // Perhaps get rid of this if-condition? - if resp.StatusCode >= 400 && resp.StatusCode < 500 { - reportInstanceChan <- ReportInstance{endpoint, endpoint, resp.StatusCode} - } else if resp.StatusCode >= 500 && resp.StatusCode < 600 { - reportInstanceChan <- ReportInstance{endpoint, endpoint, resp.StatusCode} - } else { - reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR} - } - //log.Fatal(err) +// instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, "", 0} + instanceReportChan <- InstanceReport{instancereport.endpoint, 999, "", 0} return } newinstances := make([]string, 0) + min_id := "" numposts := 0 for _, newpost := range newposts { posthash := sha1.New() @@ -231,7 +217,7 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor if at_sign == -1 { at_sign = len(newpost.Account.Acct) - newpost.Account.Acct += "@" + endpoint + newpost.Account.Acct += "@" + instancereport.endpoint } // Calculate the post hash @@ -252,28 +238,29 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor numposts = numposts + 1 newinstance := newpost.Account.Acct[at_sign+1:] - newinstances = AppendIfMissing(newinstances, newinstance) } for _, newinstance := range newinstances { - var q ReportInstance - q.from = endpoint + var q InstanceReport q.endpoint = newinstance q.status = 
NEW_INSTANCE - reportInstanceChan <- q + q.min_id = "" + q.numposts = 0 + instanceReportChan <- q } - pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, numposts} + instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts} } -func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) { +/* +func StartGetPeers(endpoint string, instanceReportChan chan InstanceReport) { var newpeers []string api_peers := "https://" + endpoint + "/api/v1/instance/peers" resp, err := http.Get(api_peers) if err != nil { - reportInstanceChan <- ReportInstance{endpoint, endpoint, NO_CONNECTION} + instanceReportChan <- InstanceReport{endpoint, NO_CONNECTION, "", 0} return // os.Exit(1) } @@ -284,48 +271,42 @@ func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) { if err != nil { fmt.Println("Unmarshal 1"); log.Fatal(err) - reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR} + instanceReportChan <- InstanceReport{endpoint, endpoint, UNMARSHAL_ERROR, "", 0} return } for _, newpeer := range newpeers { - var q ReportInstance - q.from = endpoint + var q InstanceReport q.endpoint = newpeer q.status = NEW_INSTANCE - reportInstanceChan <- q + instanceReportChan <- q } } +*/ -func DeferPollRun(pollmessage PollMessage, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) { - var min_id string - - for _, runninginstance := range *runninginstances { - if runninginstance.Endpoint == pollmessage.from { - min_id = runninginstance.Min_id - break - } - } +func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { delay := 10 - if pollmessage.status == 200 && pollmessage.numposts <= 10 { + if instancereport.status == 200 && instancereport.numposts <= 10 { delay = 10 - } else if pollmessage.status == 200 && 
pollmessage.numposts > 10 { + } else if instancereport.status == 200 && instancereport.numposts > 10 { delay = 15 - } else if pollmessage.status == 429 { + } else if instancereport.status == 429 { delay = 30 } else { - fmt.Println("error, status code is: ", pollmessage.status) + fmt.Println("error, status code is ------------->: ", instancereport.status) + os.Exit(1) } time.Sleep(time.Second * time.Duration(delay)) - go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan, reportInstanceChan) + go StartInstancePoll(instancereport, reportPostChan, instanceReportChan) } // Change this to return a proper "err" func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) { api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json" - resp, err := http.Get(api_nodeinfo) + http_client := http.Client{Timeout: 5 * time.Second} + resp, err := http_client.Get(api_nodeinfo) if err != nil { return } @@ -333,39 +314,49 @@ func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) { body, err := ioutil.ReadAll(resp.Body) err = json.Unmarshal(body, &nodeinfo) if err != nil { - fmt.Println("Unmarshal 2"); +// fmt.Println("Unmarshal 2"); return } } -func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) { +func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { var nodeinfo NodeInfo - // Check node type - GetNodeInfo(endpoint, &nodeinfo) - if nodeinfo.Software.Name == "" { - fmt.Println("was blank: ", nodeinfo.Software.Name) - var q ReportInstance - q.from = "" - q.endpoint = endpoint - q.status = INSTANCE_ERROR - reportInstanceChan <- q - } if endpoint == "" { return } + + // No repeats for _, runninginstance := range *runninginstances { if runninginstance.Endpoint == endpoint { return } } + + // Check node type + GetNodeInfo(endpoint, &nodeinfo) + if 
nodeinfo.Software.Name == "" { + go func() { + var q InstanceReport + q.endpoint = endpoint + q.status = INSTANCE_ERROR + instanceReportChan <- q + return + }() + } + newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE} *runninginstances = append(*runninginstances, newinstance) if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { - go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan, reportInstanceChan) + var newinstancereport InstanceReport + newinstancereport.endpoint = endpoint + newinstancereport.status = 0 + newinstancereport.min_id = "" + newinstancereport.numposts = 0 + go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan) // fmt.Println("Temporarily disabled Peer Hunting") -// go StartGetPeers(endpoint, reportInstanceChan) +// go StartGetPeers(endpoint, instanceReportChan) } } @@ -415,25 +406,24 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) { } } -func SuspendInstance(suspendinstance ReportInstance, runninginstances *[]RunningInstance) { +func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) { for i, runninginstance := range *runninginstances { if runninginstance.Endpoint == suspendinstance.endpoint { - fmt.Println("Updated status to ", suspendinstance.status) (*runninginstances)[i].Status = suspendinstance.status return } } } -func main() { + +func engine() { // Current instances runninginstances := make([]RunningInstance, 0) // Initial Setup - reportPostChan := make(chan ReportPost, 100) - reportInstanceChan := make(chan ReportInstance, 100) - pollMessageChan := make (chan PollMessage, 100) + reportPostChan := make(chan ReportPost, 2000) + instanceReportChan := make(chan InstanceReport, 100) // Setup Database pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue") @@ -449,41 +439,46 @@ func main() { } defer l.Close() - for { - commandClient := make(chan net.Conn) - - go func(l net.Listener) { - 
for { - c, err := l.Accept() - if err != nil { - commandClient <- nil - return - } - commandClient <- c - } - }(l) + commandClient := make(chan net.Conn) + go func(l net.Listener) { for { - select { - case c := <-commandClient: // New client connection - go handleClient(c, &runninginstances, reportInstanceChan) - case p := <-pollMessageChan: // A poller ended + c, err := l.Accept() + if err != nil { + fmt.Println("Error on accept") + commandClient <- nil + return + } + commandClient <- c + } + }(l) + + for { + select { + case c := <-commandClient: // New client connection + go handleClient(c, &runninginstances, instanceReportChan) + case v := <-reportPostChan: // New Post + go writePost(pool, v) + case w := <-instanceReportChan: // Start or suspend instance + if w.status == NEW_INSTANCE { + NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan) + } else if w.status == RUNNING || w.status == TOOMANYREQUESTS { for i, runninginstance := range runninginstances { - if runninginstance.Endpoint == p.from { - runninginstances[i].Min_id = p.min_id + if runninginstance.Endpoint == w.endpoint { + runninginstances[i].Min_id = w.min_id + runninginstances[i].Status = w.status } } - go DeferPollRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) - case v := <-reportPostChan: // New Post - go writePost(pool, v) - case w := <-reportInstanceChan: // Start or suspend instance - if w.status == NEW_INSTANCE { - NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) - } else { - fmt.Println("Error here failure ", w.status) - SuspendInstance(w, &runninginstances) - } + go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan) + } else { + SuspendInstance(w, &runninginstances) } } } } + +func main() { + go engine() + log.Println("serving on port 8080") + log.Fatal(http.ListenAndServe(":8080", nil)) +}