fedilogue/engine/poll.go
Farhan Khan a50916eb36 using the same http client, adding it to running instances
some web work
lost track of what's going on and I'm not accountable to anyone...so screw it
2020-12-16 02:41:45 +00:00

204 lines
5.3 KiB
Go

package main
import (
"encoding/json"
"crypto/sha1"
"io/ioutil"
"net/http"
"strings"
"html"
"time"
"fmt"
"log"
)
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) {
newposts := make([]ReportPost, 0)
min_id := ""
// http_client := http.Client{}
parsing_error := 0
unprocess_error := 0
use_auth := false
var last_refresh int64
var client_id string
var client_secret string
var oauthData OAuth
var err error
for _, extaccount := range settings.Externalaccounts {
if extaccount.Endpoint == endpoint {
use_auth = true
client_id, client_secret, err = get_client(endpoint, &http_client);
if err != nil {
log.Fatal("Unable to register client: ", err)
}
oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret)
if err != nil {
log.Print("Unable to login: ", err)
return
}
last_refresh = time.Now().Unix()
}
}
for {
ri_mutex.Lock()
m := runninginstances[endpoint]
ri_mutex.Unlock()
api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id
req, err := http.NewRequest("GET", api_timeline, nil)
if err != nil {
log.Print("Unable to create new request")
return
}
if use_auth == true {
if time.Now().Unix() > last_refresh + oauthData.Expires_in {
oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token)
if err != nil {
log.Print("Unable to refresh: ", err)
return
}
last_refresh = time.Now().Unix()
}
req.Header.Add("Authorization", oauthData.Access_token)
}
m.LastRun = time.Now().Format(time.RFC3339)
resp, err := http_client.Do(req)
if err != nil {
ri_mutex.Lock()
m.Status = CLIENT_ISSUE
runninginstances[endpoint] = m
ri_mutex.Unlock()
log.Fatal("Failure here", err.Error())
return
}
if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds
log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = resp.StatusCode
runninginstances[endpoint] = m
ri_mutex.Unlock()
if unprocess_error > 5 {
log.Print("Exiting for " + endpoint)
}
unprocess_error = unprocess_error + 1
time.Sleep(time.Second * 30)
continue
} else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour
log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = 765
runninginstances[endpoint] = m
ri_mutex.Unlock()
time.Sleep(time.Second * 3600)
continue
} else if resp.StatusCode != 200 { // Crash
log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode)
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = resp.StatusCode
runninginstances[endpoint] = m
ri_mutex.Unlock()
return
}
err = json.NewDecoder(resp.Body).Decode(&newposts)
if err != nil {
if parsing_error > 5 {
ri_mutex.Lock()
m.Status = BAD_RESPONSE
runninginstances[endpoint] = m
ri_mutex.Unlock()
log.Print("Giving up on " + endpoint)
return
}
parsing_error = parsing_error + 1
time.Sleep(time.Second * 30)
}
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = RUNNING
runninginstances[endpoint] = m
ri_mutex.Unlock()
for _, newpost := range newposts {
if newpost.Account.Acct == "" {
continue
}
posthash := sha1.New()
at_sign := strings.Index(newpost.Account.Acct, "@")
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + endpoint
}
// Calculate the post hash
fmt.Fprint(posthash, newpost.Uri)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
// Validate time
t, err := time.Parse(time.RFC3339, newpost.Created_at)
if err != nil {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
t, err = time.Parse(time.RFC3339, newpost.Account.Created_at)
if err != nil {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
reportPostChan <- newpost
// Check min_id
if newpost.Id > min_id {
min_id = newpost.Id
}
// Only done if we are crawling
if settings.Crawl == true && StringExists(endpoint, settings.Banned) == false {
newinstance := newpost.Account.Acct[at_sign+1:]
ri_mutex.Lock()
_, exists := runninginstances[newinstance]
if exists == false {
m := RunningInstance{}
runninginstances[newinstance] = m
go StartInstance(newinstance, reportPostChan)
}
ri_mutex.Unlock()
}
}
time.Sleep(time.Second * 10)
}
}