package main

import (
	"crypto/sha1"
	"encoding/json"
	"fmt"
	"html"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
	"time"
)

// PollMastodonPleroma repeatedly fetches the public timeline of endpoint
// (https://<endpoint>/api/v1/timelines/public) and sends every post that has
// a non-empty account name to reportPostChan.
//
// If settings.Externalaccounts contains an entry for endpoint, the poller
// registers a client, logs in, and refreshes the token once Expires_in
// seconds have passed since the last login/refresh.
//
// The poller records its state in runninginstances[endpoint] (guarded by
// ri_mutex) and reacts to the response as follows:
//   - transport error: status CLIENT_ISSUE, poller returns;
//   - TOOMANYREQUESTS: 30 second pause, poller returns after more than 5 hits;
//   - INTERNAL_ERROR: one hour pause;
//   - any other non-200 status: poller returns;
//   - undecodable body: 30 second pause, status BAD_RESPONSE and return after
//     more than 5 failures.
//
// Both error counters are cumulative over the life of the poller. The
// function only returns on a failure; otherwise it loops forever with a
// 10 second pause between polls.
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
	minID := ""
	// NOTE(review): the client has no timeout, so a stalled server blocks
	// this poller indefinitely — consider setting http.Client.Timeout.
	httpClient := http.Client{}
	parsingErrors := 0
	throttleErrors := 0
	useAuth := false
	var lastRefresh int64
	var clientID string
	var clientSecret string
	var oauthData OAuth
	var err error

	for _, extaccount := range settings.Externalaccounts {
		if extaccount.Endpoint != endpoint {
			continue
		}
		useAuth = true

		// Register exactly once; a failure only stops this poller instead
		// of terminating the whole process.
		clientID, clientSecret, err = register_client(endpoint, &httpClient)
		if err != nil {
			log.Print("Unable to register client: ", err)
			return
		}

		oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, clientID, clientSecret)
		if err != nil {
			log.Print("Unable to login: ", err)
			return
		}
		lastRefresh = time.Now().Unix()
	}

	for {
		ri_mutex.Lock()
		m := runninginstances[endpoint]
		ri_mutex.Unlock()

		// saveState publishes the local copy of this instance's state.
		saveState := func() {
			ri_mutex.Lock()
			runninginstances[endpoint] = m
			ri_mutex.Unlock()
		}

		apiTimeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + minID
		req, err := http.NewRequest("GET", apiTimeline, nil)
		if err != nil {
			log.Print("Unable to create new request")
			return
		}

		if useAuth {
			if time.Now().Unix() > lastRefresh+oauthData.Expires_in {
				oauthData, err = oauth_refresh(endpoint, clientID, clientSecret, oauthData.Refresh_token)
				if err != nil {
					log.Print("Unable to refresh: ", err)
					return
				}
				lastRefresh = time.Now().Unix()
			}
			// NOTE(review): the token is sent verbatim. The Mastodon API
			// expects "Bearer <token>"; confirm whether Access_token
			// already carries that prefix.
			req.Header.Add("Authorization", oauthData.Access_token)
		}

		m.LastRun = time.Now().Format(time.RFC3339)
		resp, err := httpClient.Do(req)
		if err != nil {
			m.Status = CLIENT_ISSUE
			saveState()
			log.Print("Failure polling "+endpoint+": ", err)
			return
		}

		switch {
		case resp.StatusCode == TOOMANYREQUESTS:
			// Short delay, 30 seconds
			log.Print("Delaying "+endpoint+", gave status ", resp.StatusCode, ", 30 second delay")
			_, _ = ioutil.ReadAll(resp.Body) // drain so the connection can be reused
			resp.Body.Close()
			m.Status = resp.StatusCode
			saveState()
			if throttleErrors > 5 {
				log.Print("Exiting for " + endpoint)
				return
			}
			throttleErrors++
			time.Sleep(time.Second * 30)
			continue
		case resp.StatusCode == INTERNAL_ERROR:
			// Longer delay, 1 hour
			log.Print("Suspending "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay")
			_, _ = ioutil.ReadAll(resp.Body) // drain so the connection can be reused
			resp.Body.Close()
			// NOTE(review): 765 is an unexplained literal status — confirm
			// its meaning and consider a named constant.
			m.Status = 765
			saveState()
			time.Sleep(time.Second * 3600)
			continue
		case resp.StatusCode != 200:
			// Any other failure ends this poller.
			log.Print("Terminating "+endpoint+", gave status ", resp.StatusCode)
			_, _ = ioutil.ReadAll(resp.Body) // drain so the connection can be reused
			resp.Body.Close()
			m.Status = resp.StatusCode
			saveState()
			return
		}

		// Decode into a fresh slice so nothing from a previous poll can
		// leak into this one, and close the body on every path.
		var newposts []ReportPost
		err = json.NewDecoder(resp.Body).Decode(&newposts)
		resp.Body.Close()
		if err != nil {
			if parsingErrors > 5 {
				m.Status = BAD_RESPONSE
				saveState()
				log.Print("Giving up on " + endpoint)
				return
			}
			parsingErrors++
			saveState() // keep LastRun current; status is left as it was
			time.Sleep(time.Second * 30)
			// Do not process a partially decoded response.
			continue
		}

		m.Status = RUNNING
		saveState()

		for _, newpost := range newposts {
			if newpost.Account.Acct == "" {
				continue
			}

			// Local accounts come without a host part; append the endpoint
			// so every account is of the form user@host.
			atSign := strings.Index(newpost.Account.Acct, "@")
			if atSign == -1 {
				atSign = len(newpost.Account.Acct)
				newpost.Account.Acct += "@" + endpoint
			}

			// Normalize first: the hash below includes the normalized text.
			newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
			newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
			newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")

			// Calculate the post hash
			posthash := sha1.New()
			fmt.Fprint(posthash, newpost.Url)
			fmt.Fprint(posthash, newpost.normalized)
			fmt.Fprint(posthash, newpost.Account.Acct)
			fmt.Fprint(posthash, newpost.Account.Display_name)
			newpost.posthash = posthash.Sum(nil)

			// Validate time: unparsable or pre-epoch timestamps are
			// replaced with the current time.
			if t, err := time.Parse(time.RFC3339, newpost.Created_at); err != nil || t.Unix() < 0 {
				newpost.Created_at = time.Now().Format(time.RFC3339)
			}
			if t, err := time.Parse(time.RFC3339, newpost.Account.Created_at); err != nil || t.Unix() < 0 {
				newpost.Account.Created_at = time.Now().Format(time.RFC3339)
			}

			reportPostChan <- newpost

			// Check min_id. IDs are strings, so compare by length first and
			// lexically second; a plain string comparison would rank "9"
			// above "10".
			if len(newpost.Id) > len(minID) || (len(newpost.Id) == len(minID) && newpost.Id > minID) {
				minID = newpost.Id
			}

			// Only done if we are crawling.
			// NOTE(review): the ban check tests the polled endpoint, not the
			// newly discovered host — confirm that is intended.
			if settings.Crawl && !StringExists(endpoint, settings.Banned) {
				newinstance := newpost.Account.Acct[atSign+1:]
				ri_mutex.Lock()
				if _, exists := runninginstances[newinstance]; !exists {
					runninginstances[newinstance] = RunningInstance{}
					go StartInstance(newinstance, reportPostChan)
				}
				ri_mutex.Unlock()
			}
		}

		time.Sleep(time.Second * 10)
	}
}