more stability and retries

changed min_id to since_id due to bug in mastodon
This commit is contained in:
farhan 2020-11-28 11:42:31 -05:00
parent 53279ff649
commit 911c30b409
2 changed files with 44 additions and 53 deletions

View File

@ -21,72 +21,59 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
min_id := "" min_id := ""
http_client := http.Client{} http_client := http.Client{}
parsing_error := 0
unprocess_error := 0
for { for {
ri_mutex.Lock() ri_mutex.Lock()
m := runninginstances[endpoint] m := runninginstances[endpoint]
ri_mutex.Unlock() ri_mutex.Unlock()
api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&min_id=" + min_id api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id
resp, err := http_client.Get(api_timeline) resp, err := http_client.Get(api_timeline)
m.LastRun = time.Now().Format("2006.01.02-15:04:05") m.LastRun = time.Now().Format(time.RFC3339)
if err != nil { if err != nil {
ri_mutex.Lock() ri_mutex.Lock()
m.Status = CLIENT_ISSUE m.Status = CLIENT_ISSUE
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
log.Fatal("Failure here", err.Error()) log.Print("Failure here", err.Error())
return return
} }
if resp.StatusCode == UNAUTHORIZED || resp.StatusCode == UNPROCESSABLE_ENTITY { if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds
// Apparently you just need to do this to throw away the body log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, " 1 hour delay")
_, _ = ioutil.ReadAll(resp.Body) _, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
ri_mutex.Lock() ri_mutex.Lock()
m.Status = UNAUTHORIZED m.Status = resp.StatusCode
runninginstances[endpoint] = m
ri_mutex.Unlock()
return
} else if resp.StatusCode == TOOMANYREQUESTS {
// Apparently you just need to do this to throw away the body
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = TOOMANYREQUESTS
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
if unprocess_error > 5 {
log.Print("Exiting for " + endpoint)
}
unprocess_error = unprocess_error + 1
time.Sleep(time.Second * 30) time.Sleep(time.Second * 30)
continue continue
} else if resp.StatusCode == FORBIDDEN { } else if resp.StatusCode == UNPROCESSABLE_ENTITY || resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour
// Apparently you just need to do this to throw away the body log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, " 1 hour delay")
_, _ = ioutil.ReadAll(resp.Body) _, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
ri_mutex.Lock() ri_mutex.Lock()
m.Status = FORBIDDEN m.Status = 765
runninginstances[endpoint] = m
ri_mutex.Unlock()
time.Sleep(time.Second * 3600)
continue
} else if resp.StatusCode != 200 { // Crash
log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode)
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = resp.StatusCode
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
return return
} else if resp.StatusCode == INTERNAL_ERROR {
// Apparently you just need to do this to throw away the body
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = FORBIDDEN
runninginstances[endpoint] = m
ri_mutex.Unlock()
time.Sleep(time.Second * 3600)
continue
} else if resp.StatusCode == NOT_FOUND { // 404
// Apparently you just need to do this to throw away the body
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = FORBIDDEN
runninginstances[endpoint] = m
ri_mutex.Unlock()
time.Sleep(time.Second * 3600)
continue
} }
err = json.NewDecoder(resp.Body).Decode(&newposts) err = json.NewDecoder(resp.Body).Decode(&newposts)
@ -99,12 +86,16 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
fmt.Println(q) fmt.Println(q)
fmt.Println(err) fmt.Println(err)
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
if parsing_error > 5 {
ri_mutex.Lock() ri_mutex.Lock()
m.Status = BAD_RESPONSE m.Status = BAD_RESPONSE
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
log.Fatal("Exiting") log.Print("Giving up on " + endpoint)
continue return
}
parsing_error = parsing_error + 1
time.Sleep(time.Second * 30)
} }
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
@ -136,7 +127,6 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
// Validate time // Validate time
//t, err := time.Parse("2006-01-02T15:04:05.999Z", newpost.Created_at)
t, err := time.Parse(time.RFC3339, newpost.Created_at) t, err := time.Parse(time.RFC3339, newpost.Created_at)
if err != nil { if err != nil {
log.Print("Time was: " + newpost.Created_at) log.Print("Time was: " + newpost.Created_at)
@ -149,8 +139,7 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
log.Print("Set to : " + newpost.Created_at) log.Print("Set to : " + newpost.Created_at)
} }
//t, err = time.Parse("2006-01-02T15:04:05.999Z", newpost.Account.Created_at) t, err = time.Parse(time.RFC3339, newpost.Account.Created_at)
t, err = time.Parse(time.RFC3339, newpost.Created_at)
if err != nil { if err != nil {
log.Print("Time was: " + newpost.Account.Created_at) log.Print("Time was: " + newpost.Account.Created_at)
newpost.Account.Created_at = time.Now().Format(time.RFC3339) newpost.Account.Created_at = time.Now().Format(time.RFC3339)
@ -202,22 +191,24 @@ func GetNodeInfo(endpoint string) (NodeInfo) {
if pleromastodon_api_resp.StatusCode == 200 { if pleromastodon_api_resp.StatusCode == 200 {
var nodeinfo NodeInfo var nodeinfo NodeInfo
err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo) err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo)
if err != nil { if err == nil {
log.Println("Was not unmarshalled, continuing...") defer pleromastodon_api_resp.Body.Close()
}
return nodeinfo return nodeinfo
} }
}
// Check the front page // Check the front page
index_url := "https://" + endpoint + "/" index_url := "https://" + endpoint + "/"
resp_index, err := http_client.Get(index_url) resp_index, err := http_client.Get(index_url)
if err != nil { if err != nil {
log.Fatal("Error Message 2b:", resp_index.StatusCode, err, endpoint, resp_index.Status) log.Print("Unable to connect to " + endpoint + ", giving up")
return NodeInfo{}
} }
defer resp_index.Body.Close() defer resp_index.Body.Close()
indexbin, err := ioutil.ReadAll(resp_index.Body) indexbin, err := ioutil.ReadAll(resp_index.Body)
if err != nil { if err != nil {
log.Fatal("Error Message 2c:", resp_index.StatusCode, err, endpoint, resp_index.Status) log.Print("Unable to read index of " + endpoint + ", giving up")
return NodeInfo{}
} }
indexstr := string(indexbin) indexstr := string(indexbin)
nodeinfo := NodeInfo{} nodeinfo := NodeInfo{}

View File

@ -19,7 +19,7 @@ CREATE TABLE posts (
normalized text, normalized text,
account_id int NOT NULL REFERENCES accounts (id), account_id int NOT NULL REFERENCES accounts (id),
posthash bytea UNIQUE, posthash bytea UNIQUE,
received_at timestamptz DEFAULT NOW(), received_at timestamptz DEFAULT NOW()
); );
CREATE TABLE instances ( CREATE TABLE instances (