package main import ( "github.com/microcosm-cc/bluemonday" "encoding/json" "crypto/sha1" "io/ioutil" "net/http" "strings" "regexp" "html" "time" "fmt" "log" ) var p *bluemonday.Policy var spaceReg *regexp.Regexp func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { newposts := make([]ReportPost, 0) min_id := "" http_client := http.Client{} parsing_error := 0 unprocess_error := 0 use_auth := false var last_refresh int64 var client_id string var client_secret string var oauthData OAuth var err error for _, extaccount := range settings.Externalaccounts { if extaccount.Endpoint == endpoint { use_auth = true register_client(endpoint, &http_client) client_id, client_secret, err = register_client(endpoint, &http_client); if err != nil { log.Fatal("Unable to register client: ", err) } oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret) if err != nil { log.Print("Unable to login: ", err) return } last_refresh = time.Now().Unix() } } for { ri_mutex.Lock() m := runninginstances[endpoint] ri_mutex.Unlock() api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id req, err := http.NewRequest("GET", api_timeline, nil) if err != nil { log.Print("Unable to create new request") return } if use_auth == true { if time.Now().Unix() > last_refresh + oauthData.Expires_in { oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token) if err != nil { log.Print("Unable to refresh: ", err) return } last_refresh = time.Now().Unix() } req.Header.Add("Authorization", oauthData.Access_token) } m.LastRun = time.Now().Format(time.RFC3339) resp, err := http_client.Do(req) if err != nil { ri_mutex.Lock() m.Status = CLIENT_ISSUE runninginstances[endpoint] = m ri_mutex.Unlock() log.Fatal("Failure here", err.Error()) return } if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done ri_mutex.Lock() m.Status = resp.StatusCode runninginstances[endpoint] = m ri_mutex.Unlock() if unprocess_error > 5 { log.Print("Exiting for " + endpoint) } unprocess_error = unprocess_error + 1 time.Sleep(time.Second * 30) continue } else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done ri_mutex.Lock() m.Status = 765 runninginstances[endpoint] = m ri_mutex.Unlock() time.Sleep(time.Second * 3600) continue } else if resp.StatusCode != 200 { // Crash log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode) _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done ri_mutex.Lock() m.Status = resp.StatusCode runninginstances[endpoint] = m ri_mutex.Unlock() return } err = json.NewDecoder(resp.Body).Decode(&newposts) if err != nil { if parsing_error > 5 { ri_mutex.Lock() m.Status = BAD_RESPONSE runninginstances[endpoint] = m ri_mutex.Unlock() log.Print("Giving up on " + endpoint) return } parsing_error = parsing_error + 1 time.Sleep(time.Second * 30) } resp.Body.Close() // Release as soon as done ri_mutex.Lock() m.Status = RUNNING runninginstances[endpoint] = m ri_mutex.Unlock() for _, newpost := range newposts { if newpost.Account.Acct == "" { continue } posthash := sha1.New() at_sign := strings.Index(newpost.Account.Acct, "@") if at_sign == -1 { at_sign = len(newpost.Account.Acct) newpost.Account.Acct += "@" + endpoint } // Calculate the post hash fmt.Fprint(posthash, newpost.Url) fmt.Fprint(posthash, newpost.normalized) fmt.Fprint(posthash, newpost.Account.Acct) fmt.Fprint(posthash, newpost.Account.Display_name) newpost.posthash = posthash.Sum(nil) newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") // Validate time t, err := time.Parse(time.RFC3339, newpost.Created_at) if err != nil { newpost.Created_at = time.Now().Format(time.RFC3339) } if t.Unix() < 0 { newpost.Created_at = time.Now().Format(time.RFC3339) } t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) if err != nil { newpost.Account.Created_at = time.Now().Format(time.RFC3339) } if t.Unix() < 0 { newpost.Account.Created_at = time.Now().Format(time.RFC3339) } reportPostChan <- newpost // Check min_id if newpost.Id > min_id { min_id = newpost.Id } // Only done if we are crawling if settings.Crawl == true || StringExists(endpoint, settings.Banned) == false { newinstance := newpost.Account.Acct[at_sign+1:] ri_mutex.Lock() _, exists := runninginstances[newinstance] if exists == false { m := RunningInstance{} runninginstances[newinstance] = m go StartInstance(newinstance, reportPostChan) } ri_mutex.Unlock() } } time.Sleep(time.Second * 10) } } // Change this to return a proper "err" func GetNodeInfo(endpoint string) (NodeInfo) { /* Checking order * Mastodon/Pleroma * Um..nothing else yet */ pleromastodon_nodeinfo_uri := "https://" + endpoint + "/nodeinfo/2.0.json" //http_client := http.Client{Timeout: 10 * time.Second} http_client := http.Client{} pleromastodon_api_resp, err := http_client.Get(pleromastodon_nodeinfo_uri) if err != nil { return NodeInfo{} } else { defer pleromastodon_api_resp.Body.Close() } if pleromastodon_api_resp.StatusCode == 200 { var nodeinfo NodeInfo err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo) if err == nil { defer pleromastodon_api_resp.Body.Close() return nodeinfo } } // Check the front page index_uri := "https://" + endpoint + "/" resp_index, err := http_client.Get(index_uri) if err != nil { log.Print("Unable to connect to " + endpoint + ", giving up") return NodeInfo{} } defer resp_index.Body.Close() indexbin, err := ioutil.ReadAll(resp_index.Body) if err != nil { log.Print("Unable to read index of " + endpoint + ", giving up") return NodeInfo{} } indexstr := string(indexbin) nodeinfo := NodeInfo{} if strings.Contains(indexstr, "Pleroma") || strings.Contains(indexstr, "Soapbox") { log.Print("Manual view: Pleroma" + endpoint) nodeinfo.Software.Name = "pleroma" nodeinfo.Software.Version = "guess" } else if strings.Contains(indexstr, "Mastodon") { log.Print("Manual view: Mastodon" + endpoint) nodeinfo.Software.Name = "mastodon" nodeinfo.Software.Version = "guess" } else if strings.Contains(indexstr, "Gab") { log.Print("Manual view: Gab" + endpoint) nodeinfo.Software.Name = "gab" nodeinfo.Software.Version = "guess" } return nodeinfo } func StartInstance(endpoint string, reportPostChan chan ReportPost) { nodeinfo := GetNodeInfo(endpoint) if nodeinfo.Software.Name == "" { var m = runninginstances[endpoint] m.Software = "" m.LastRun = time.Now().Format(time.RFC3339) m.Status = UNSUPPORTED_INSTANCE ri_mutex.Lock() runninginstances[endpoint] = m ri_mutex.Unlock() return } if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { log.Print("Starting " + endpoint + " as Mastodon/Pleroma instance") go PollMastodonPleroma(endpoint, reportPostChan) } }