From ab8e0182bb020b63f38aee0d5ba63e4fe3a9489c Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Tue, 17 Nov 2020 18:28:59 -0500 Subject: [PATCH] blood sweat and tears later, memory leak is gone restructured how the engine works some communication is through a shared object, perhaps I will revisit this problem in the future. Until then, this is stable and works... --- client/searchctl.go | 10 +-- poll/cli.go | 21 +++-- poll/headers.go | 12 +-- poll/instance.go | 209 ++++++++++++++++++++++++++++---------------- poll/main.go | 41 +++++---- 5 files changed, 174 insertions(+), 119 deletions(-) diff --git a/client/searchctl.go b/client/searchctl.go index 6186bea..ff60c9e 100644 --- a/client/searchctl.go +++ b/client/searchctl.go @@ -15,18 +15,18 @@ type CommandMap struct { Endpoint string `json:"Endpoint"` } +// Instance's new min_id value type RunningInstance struct { - Endpoint string `json:"endpoint"` Software string `json:"software"` Min_id string - Status int `json:"status"` + Status int `json:"status"` LastRun string `json:"lastrun"` } type ResponseBack struct { Type string `json:"Type"` Message string `json:"Message"` - RunningInstances []RunningInstance `json:"RunningInstances"` + RunningInstances map[string]RunningInstance `json:"RunningInstances"` } func main() { @@ -120,7 +120,7 @@ func main() { case "add": fmt.Println("Add instance") case "status": - for _, runninginstance := range responseback.RunningInstances { + for endpoint, runninginstance := range responseback.RunningInstances { if runninginstance.Status == 0 { fmt.Fprintf(os.Stdout, "New\t") } else if runninginstance.Status == 200 { @@ -134,7 +134,7 @@ func main() { if runninginstance.LastRun == "Queued" { fmt.Fprintf(os.Stdout, "\t\t") } - fmt.Fprintf(os.Stdout, "%s\n", runninginstance.Endpoint) + fmt.Fprintf(os.Stdout, "%s\n", endpoint) } } } diff --git a/poll/cli.go b/poll/cli.go index 21291b6..ff72c8f 100644 --- a/poll/cli.go +++ b/poll/cli.go @@ -10,7 +10,7 @@ import ( "os" ) -func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) { +func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) { sizebyte := make([]byte, 4) var commandmap CommandMap @@ -47,13 +47,16 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, i responseback.Message = "Ok" case "add": log.Print("Manually added instance: " + commandmap.Endpoint) - var q InstanceReport - q.endpoint = commandmap.Endpoint - q.status = NEW_INSTANCE - - instanceReportChan <- q - - responseback.Message = "Added " + commandmap.Endpoint + ri_mutex.Lock() + _, exists := runninginstances[commandmap.Endpoint] + if exists == true { + log.Println("Already exists: " + commandmap.Endpoint) + } else { + runninginstances[commandmap.Endpoint] = RunningInstance{} + } + ri_mutex.Unlock() + go StartInstance(commandmap.Endpoint, reportPostChan) + responseback.Message = "Already exists: " + commandmap.Endpoint case "suspend": fmt.Println("Suspend") case "resume": @@ -64,7 +67,7 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, i responseback.Type = "status" - responseback.RunningInstances = *runninginstances + responseback.RunningInstances = runninginstances responsebytes, err := json.Marshal(responseback) if err != nil { diff --git a/poll/headers.go b/poll/headers.go index 848e928..bb8eba5 100644 --- a/poll/headers.go +++ b/poll/headers.go @@ -34,18 +34,8 @@ type AccountType struct { Url string `json:"url"` } -// Used to report a new instance to main -type InstanceReport struct { - endpoint string - status int - - min_id string - numposts int -} - // Instance's new min_id value type RunningInstance struct { - Endpoint string `json:"endpoint"` Software string `json:"software"` Min_id string Status int `json:"status"` @@ -69,5 +59,5 @@ type CommandMap struct { type ResponseBack struct { Type string `json:"Type"` Message string `json:"Message"` - RunningInstances []RunningInstance `json:"RunningInstances"` + RunningInstances map[string]RunningInstance `json:"RunningInstances"` } diff --git a/poll/instance.go b/poll/instance.go index c7e3a08..682896a 100644 --- a/poll/instance.go +++ b/poll/instance.go @@ -10,10 +10,10 @@ import ( "html" "time" "fmt" - "os" ) -func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { +/* +func DeferPollRun(instancereport InstanceReport, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { delay := 10 if instancereport.status == RUNNING && instancereport.numposts <= 10 { @@ -30,106 +30,157 @@ func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInst StartInstancePoll(instancereport, reportPostChan, instanceReportChan) } +*/ -func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) { +func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { + fmt.Println("Arrived at PollMastodonPleroma for " + endpoint) + // Make this a global variable p := bluemonday.NewPolicy() newposts := make([]ReportPost, 0) - // Only placing this here to later have the option of using - // an HTTP client via a SOCKS5 Tor proxy - if strings.Contains(instancereport.endpoint, ".onion") { - instanceReportChan <- InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0} - return - } - - api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?limit=40&min_id=" + instancereport.min_id - resp, err := http.Get(api_timeline) - if err != nil { - instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0} - return - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0} - return - } - err = json.Unmarshal(body, &newposts) - if err != nil { - instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0} - return - } - - newinstances := make([]string, 0) min_id := "" - numposts := 0 - for _, newpost := range newposts { - if newpost.Account.Acct == "" { - continue + + + http_client := http.Client{Timeout: 5 * time.Second} + + for { + + api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&min_id=" + min_id + fmt.Println(api_timeline) + numposts := 0 + // newinstances := make([]string, 0) + + resp, err := http_client.Get(api_timeline) + if err != nil { + ri_mutex.Lock() + var m = runninginstances[endpoint] + m.Status = CLIENT_ISSUE + runninginstances[endpoint] = m + ri_mutex.Unlock() + return } - sendpost := newpost - - posthash := sha1.New() - - at_sign := strings.Index(sendpost.Account.Acct, "@") - - if at_sign == -1 { - at_sign = len(sendpost.Account.Acct) - sendpost.Account.Acct += "@" + instancereport.endpoint + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + ri_mutex.Lock() + var m = runninginstances[endpoint] + m.Status = BAD_RESPONSE + runninginstances[endpoint] = m + ri_mutex.Unlock() + return + } + err = json.Unmarshal(body, &newposts) + if err != nil { + ri_mutex.Lock() + var m = runninginstances[endpoint] + m.Status = BAD_RESPONSE + runninginstances[endpoint] = m + ri_mutex.Unlock() + return } - // Calculate the post hash - fmt.Fprint(posthash, sendpost.Url) - fmt.Fprint(posthash, sendpost.normalized) - fmt.Fprint(posthash, sendpost.Account.Acct) - fmt.Fprint(posthash, sendpost.Account.Display_name) - sendpost.posthash = posthash.Sum(nil) + resp.Body.Close() - sendpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(sendpost.Content))) - reportPostChan <- sendpost + for _, newpost := range newposts { + if newpost.Account.Acct == "" { + continue + } + posthash := sha1.New() - // Check min_id - if sendpost.Id > min_id { - min_id = sendpost.Id + at_sign := strings.Index(newpost.Account.Acct, "@") + + if at_sign == -1 { + at_sign = len(newpost.Account.Acct) + newpost.Account.Acct += "@" + endpoint + } + + // Calculate the post hash + fmt.Fprint(posthash, newpost.Url) + fmt.Fprint(posthash, newpost.normalized) + fmt.Fprint(posthash, newpost.Account.Acct) + fmt.Fprint(posthash, newpost.Account.Display_name) + newpost.posthash = posthash.Sum(nil) + + newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) + + reportPostChan <- newpost + + // Check min_id + if newpost.Id > min_id { + min_id = newpost.Id + } + numposts = numposts + 1 + + newinstance := newpost.Account.Acct[at_sign+1:] + ri_mutex.Lock() + _, exists := runninginstances[newinstance] + if exists == false { + m := RunningInstance{} + runninginstances[newinstance] = m + go StartInstance(newinstance, reportPostChan) + } + + ri_mutex.Unlock() } - numposts = numposts + 1 - newinstance := sendpost.Account.Acct[at_sign+1:] - newinstances = AppendIfMissing(newinstances, newinstance) + + time.Sleep(time.Second * 10) } - for _, newinstance := range newinstances { - var q InstanceReport - q.endpoint = newinstance - q.status = NEW_INSTANCE - q.min_id = "" - q.numposts = 0 - instanceReportChan <- q - } - - instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts} } // Change this to return a proper "err" -func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) { +func GetNodeInfo(endpoint string) (NodeInfo) { + var nodeinfo NodeInfo api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json" http_client := http.Client{Timeout: 5 * time.Second} resp, err := http_client.Get(api_nodeinfo) if err != nil { - return + fmt.Println("Make a legit error here") + return NodeInfo{} } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) err = json.Unmarshal(body, &nodeinfo) if err != nil { -// fmt.Println("Unmarshal 2"); - return + fmt.Println("Make a legit error here") + fmt.Println("Unmarshal 2"); + return NodeInfo{} } + + return nodeinfo } -func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { +func StartInstance(endpoint string, reportPostChan chan ReportPost) { + + // This might not be necessary... +// ri_mutex.Lock() +// _, exists := runninginstances[endpoint] +// fmt.Println("The exists is", exists) +// ri_mutex.Unlock() +// if exists == true { +// return +// } + + nodeinfo := GetNodeInfo(endpoint) + if nodeinfo.Software.Name == "" { + ri_mutex.Lock() + var m = runninginstances[endpoint] + m.Software = "" + runninginstances[endpoint] = m + ri_mutex.Unlock() + return + } + + if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { + go PollMastodonPleroma(endpoint, reportPostChan) + } + +} + +/* +func NewInstance(endpoint string, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { var nodeinfo NodeInfo if endpoint == "" { @@ -137,7 +188,8 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceR } // No repeats - for _, runninginstance := range *runninginstances { + + for _, runninginstance := range runninginstances { if runninginstance.Endpoint == endpoint { return } @@ -155,7 +207,7 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceR } newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE, "Queued"} - *runninginstances = append(*runninginstances, newinstance) + runninginstances = append(runninginstances, newinstance) if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { var newinstancereport InstanceReport @@ -166,13 +218,16 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceR go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan) } } +*/ -func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) { - for i, runninginstance := range *runninginstances { +/* +func SuspendInstance(suspendinstance InstanceReport) { + for i, runninginstance := range runninginstances { if runninginstance.Endpoint == suspendinstance.endpoint { - (*runninginstances)[i].Status = suspendinstance.status - (*runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05") + (runninginstances)[i].Status = suspendinstance.status + (runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05") return } } } +*/ diff --git a/poll/main.go b/poll/main.go index bff43b1..0c0ce9f 100644 --- a/poll/main.go +++ b/poll/main.go @@ -5,7 +5,7 @@ import ( _ "net/http/pprof" "net/http" "context" - "time" + "sync" "fmt" "net" "log" @@ -21,7 +21,12 @@ func AppendIfMissing(hay []string, needle string) []string { return append(hay, needle) } +func imdone() { + // fmt.Println("I am done") +} + func writePost(pool *pgxpool.Pool, reportpost ReportPost) { + defer imdone() conn, err := pool.Acquire(context.Background()) if err != nil { log.Fatal("Error connecting to database:", err) @@ -67,14 +72,15 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) { } } -func engine() { +// Current instances +var runninginstances map[string]RunningInstance +var ri_mutex = &sync.Mutex{} - // Current instances - runninginstances := make([]RunningInstance, 0) +func engine() { + runninginstances = make(map[string]RunningInstance) // Initial Setup reportPostChan := make(chan ReportPost) - instanceReportChan := make(chan InstanceReport, 200) // Setup Database pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue") @@ -88,9 +94,7 @@ func engine() { fmt.Println(err) return } - defer l.Close() - - x := 0 +// defer l.Close() commandClient := make(chan net.Conn) @@ -98,8 +102,9 @@ func engine() { for { c, err := l.Accept() if err != nil { - fmt.Println("Error on accept") - commandClient <- nil + fmt.Println("Error on accept", err) + os.Exit(0) +// commandClient <- nil return } commandClient <- c @@ -108,25 +113,26 @@ func engine() { go func() { for{ - v := <-reportPostChan // New Post - fmt.Println(v) + v := <-reportPostChan // New Post +// fmt.Println(v) go writePost(pool, v) } }() go func() { for { - c := <-commandClient // New client connection - go handleClient(c, &runninginstances, instanceReportChan) + c := <-commandClient // New client connection + go handleClient(c, reportPostChan) } }() +/* for { select { case w := <-instanceReportChan: // Start or suspend instance if w.status == NEW_INSTANCE { x = x + 1 - NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan) + NewInstance(w.endpoint, instanceReportChan, reportPostChan) } else if w.status == RUNNING || w.status == TOOMANYREQUESTS { for i, runninginstance := range runninginstances { if runninginstance.Endpoint == w.endpoint { @@ -135,12 +141,13 @@ func engine() { runninginstances[i].LastRun = time.Now().Format("2006.01.02-15:04:05") } } - go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan) + go DeferPollRun(w, instanceReportChan, reportPostChan) } else { - SuspendInstance(w, &runninginstances) + SuspendInstance(w) } } } +*/ } func main() {