From 911c30b4090108f3c1b6080ec535fa364fce4138 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Sat, 28 Nov 2020 11:42:31 -0500 Subject: [PATCH] more stability and retries changed min_id to since_id due to bug in mastodon --- poll/instance.go | 95 ++++++++++++++++++++++-------------------------- poll/tables.sql | 2 +- 2 files changed, 44 insertions(+), 53 deletions(-) diff --git a/poll/instance.go b/poll/instance.go index 6ef7bc4..d9e62b4 100644 --- a/poll/instance.go +++ b/poll/instance.go @@ -21,72 +21,59 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { min_id := "" http_client := http.Client{} + parsing_error := 0 + unprocess_error := 0 for { ri_mutex.Lock() m := runninginstances[endpoint] ri_mutex.Unlock() - api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&min_id=" + min_id + api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id resp, err := http_client.Get(api_timeline) - m.LastRun = time.Now().Format("2006.01.02-15:04:05") + m.LastRun = time.Now().Format(time.RFC3339) if err != nil { ri_mutex.Lock() m.Status = CLIENT_ISSUE runninginstances[endpoint] = m ri_mutex.Unlock() - log.Fatal("Failure here", err.Error()) + log.Print("Failure here", err.Error()) return } - if resp.StatusCode == UNAUTHORIZED || resp.StatusCode == UNPROCESSABLE_ENTITY { - // Apparently you just need to do this to throw away the body + if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds + log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, " 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done ri_mutex.Lock() - m.Status = UNAUTHORIZED - runninginstances[endpoint] = m - ri_mutex.Unlock() - return - } else if resp.StatusCode == TOOMANYREQUESTS { - // Apparently you just need to do this to throw away the body - _, _ = ioutil.ReadAll(resp.Body) - resp.Body.Close() // Release as soon as done - ri_mutex.Lock() - m.Status = TOOMANYREQUESTS + m.Status = resp.StatusCode runninginstances[endpoint] = m ri_mutex.Unlock() + if unprocess_error > 5 { + log.Print("Exiting for " + endpoint) + } + unprocess_error = unprocess_error + 1 time.Sleep(time.Second * 30) continue - } else if resp.StatusCode == FORBIDDEN { - // Apparently you just need to do this to throw away the body + } else if resp.StatusCode == UNPROCESSABLE_ENTITY || resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour + log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, " 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done ri_mutex.Lock() - m.Status = FORBIDDEN + m.Status = 765 + runninginstances[endpoint] = m + ri_mutex.Unlock() + time.Sleep(time.Second * 3600) + continue + } else if resp.StatusCode != 200 { // Crash + log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode) + _, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() // Release as soon as done + ri_mutex.Lock() + m.Status = resp.StatusCode runninginstances[endpoint] = m ri_mutex.Unlock() return - } else if resp.StatusCode == INTERNAL_ERROR { - // Apparently you just need to do this to throw away the body - _, _ = ioutil.ReadAll(resp.Body) - resp.Body.Close() // Release as soon as done - ri_mutex.Lock() - m.Status = FORBIDDEN - runninginstances[endpoint] = m - ri_mutex.Unlock() - time.Sleep(time.Second * 3600) - continue - } else if resp.StatusCode == NOT_FOUND { // 404 - // Apparently you just need to do this to throw away the body - _, _ = ioutil.ReadAll(resp.Body) - resp.Body.Close() // Release as soon as done - ri_mutex.Lock() - m.Status = FORBIDDEN - runninginstances[endpoint] = m - ri_mutex.Unlock() - time.Sleep(time.Second * 3600) - continue } err = json.NewDecoder(resp.Body).Decode(&newposts) @@ -99,12 +86,16 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { fmt.Println(q) fmt.Println(err) resp.Body.Close() // Release as soon as done - ri_mutex.Lock() - m.Status = BAD_RESPONSE - runninginstances[endpoint] = m - ri_mutex.Unlock() - log.Fatal("Exiting") - continue + if parsing_error > 5 { + ri_mutex.Lock() + m.Status = BAD_RESPONSE + runninginstances[endpoint] = m + ri_mutex.Unlock() + log.Print("Giving up on " + endpoint) + return + } + parsing_error = parsing_error + 1 + time.Sleep(time.Second * 30) } resp.Body.Close() // Release as soon as done @@ -136,7 +127,6 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) // Validate time - //t, err := time.Parse("2006-01-02T15:04:05.999Z", newpost.Created_at) t, err := time.Parse(time.RFC3339, newpost.Created_at) if err != nil { log.Print("Time was: " + newpost.Created_at) @@ -149,8 +139,7 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { log.Print("Set to : " + newpost.Created_at) } - //t, err = time.Parse("2006-01-02T15:04:05.999Z", newpost.Account.Created_at) - t, err = time.Parse(time.RFC3339, newpost.Created_at) + t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) if err != nil { log.Print("Time was: " + newpost.Account.Created_at) newpost.Account.Created_at = time.Now().Format(time.RFC3339) @@ -202,22 +191,24 @@ func GetNodeInfo(endpoint string) (NodeInfo) { if pleromastodon_api_resp.StatusCode == 200 { var nodeinfo NodeInfo err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo) - if err != nil { - log.Println("Was not unmarshalled, continuing...") + if err == nil { + defer pleromastodon_api_resp.Body.Close() + return nodeinfo } - return nodeinfo } // Check the front page index_url := "https://" + endpoint + "/" resp_index, err := http_client.Get(index_url) if err != nil { - log.Fatal("Error Message 2b:", resp_index.StatusCode, err, endpoint, resp_index.Status) + log.Print("Unable to connect to " + endpoint + ", giving up") + return NodeInfo{} } defer resp_index.Body.Close() indexbin, err := ioutil.ReadAll(resp_index.Body) if err != nil { - log.Fatal("Error Message 2c:", resp_index.StatusCode, err, endpoint, resp_index.Status) + log.Print("Unable to read index of " + endpoint + ", giving up") + return NodeInfo{} } indexstr := string(indexbin) nodeinfo := NodeInfo{} diff --git a/poll/tables.sql b/poll/tables.sql index 3bf54d8..c32d6e7 100644 --- a/poll/tables.sql +++ b/poll/tables.sql @@ -19,7 +19,7 @@ CREATE TABLE posts ( normalized text, account_id int NOT NULL REFERENCES accounts (id), posthash bytea UNIQUE, - received_at timestamptz DEFAULT NOW(), + received_at timestamptz DEFAULT NOW() ); CREATE TABLE instances (