package main import ( "bufio" "crypto/sha1" "encoding/json" "fmt" "html" "log" "net/http" "strings" "time" ) func StreamMastodon(endpoint string, reportPostChan chan ReportPost) { http_client := http.Client{} var client_id string var client_secret string var oauthData OAuth var err error api_timeline := "https://" + endpoint + "/api/v1/streaming/public" req, err := http.NewRequest("GET", api_timeline, nil) if err != nil { log.Print("Unable to create new request") return } for _, extaccount := range settings.Externalaccounts { if extaccount.Endpoint == endpoint { // use_auth = true get_client(endpoint, &http_client) client_id, client_secret, err = get_client(endpoint, &http_client) if err != nil { log.Fatal("Unable to register client: ", err) } oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret) if err != nil { log.Print("Unable to login: ", err) return } // This needs to updated with the time // last_refresh := time.Now().Unix() _ = time.Now().Unix() req.Header.Add("Authorization", oauthData.Access_token) } } resp, err := http_client.Do(req) if err != nil { log.Fatal("Error occured for " + api_timeline) } defer resp.Body.Close() ri_mutex.Lock() m := runninginstances[endpoint] m.Status = RUNNING m.LastRun = "Streaming" runninginstances[endpoint] = m ri_mutex.Unlock() s := bufio.NewScanner(resp.Body) var name string for s.Scan() { line := s.Text() token := strings.SplitN(line, ":", 2) var newpost ReportPost if len(token) != 2 { continue } switch strings.TrimSpace(token[0]) { case "event": name = strings.TrimSpace(token[1]) continue case "data": switch name { case "update": jsoner := token[1][1:] err := json.Unmarshal([]byte(jsoner), &newpost) if err != nil { log.Fatal("Unable to unmarshal with error: ", err) } default: continue } default: continue } if newpost.Account.Acct == "" { continue } at_sign := strings.Index(newpost.Account.Acct, "@") newinstance := newpost.Account.Acct[at_sign+1:] // Trust the post if it comes from the same source if newinstance != endpoint { ri_mutex.Lock() o, exist := runninginstances[newinstance] ri_mutex.Unlock() if exist == false { o := RunningInstance{} new_client := http.Client{} o.client = new_client o.Status = KEEPALIVE ri_mutex.Lock() runninginstances[newinstance] = o ri_mutex.Unlock() } realuser, err := fetch_user_info(o.client, newpost.Account.Url) if err != nil { continue } realpost, err := fetch_post(o.client, newpost.Uri) if err != nil { continue } // Minor verification for now... newpost.Account.Display_name = realuser.Name newpost.Content = realpost.Content newpost.Created_at = realpost.Published } posthash := sha1.New() if at_sign == -1 { at_sign = len(newpost.Account.Acct) newpost.Account.Acct += "@" + endpoint } // Calculate the post hash fmt.Fprint(posthash, newpost.Uri) fmt.Fprint(posthash, newpost.normalized) fmt.Fprint(posthash, newpost.Account.Acct) fmt.Fprint(posthash, newpost.Account.Display_name) newpost.posthash = posthash.Sum(nil) newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") // Validate time t, err := time.Parse(time.RFC3339, newpost.Created_at) if err != nil { newpost.Created_at = time.Now().Format(time.RFC3339) } if t.Unix() < 0 { newpost.Created_at = time.Now().Format(time.RFC3339) } t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) if err != nil { newpost.Account.Created_at = time.Now().Format(time.RFC3339) } if t.Unix() < 0 { newpost.Account.Created_at = time.Now().Format(time.RFC3339) } // Reporting post reportPostChan <- newpost if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false { ri_mutex.Lock() o, exists := runninginstances[newinstance] if exists == false || o.Status == KEEPALIVE { m := RunningInstance{} runninginstances[newinstance] = m go StartInstance(newinstance, reportPostChan) } ri_mutex.Unlock() } } ri_mutex.Lock() m = runninginstances[endpoint] m.LastRun = time.Now().Format(time.RFC3339) m.Status = STREAM_ENDED runninginstances[endpoint] = m ri_mutex.Unlock() }