package main import ( "bufio" // "crypto/sha1" "encoding/json" // "fmt" // "html" "log" "net/http" "strings" "time" ) func StreamMastodon(endpoint string, reportPostChan chan ReportPost) { http_client := http.Client{} var client_id string var client_secret string var oauthData OAuth var err error api_timeline := "https://" + endpoint + "/api/v1/streaming/public" req, err := http.NewRequest("GET", api_timeline, nil) if err != nil { log.Print("Unable to create new request") return } for _, extaccount := range settings.Externalaccounts { if extaccount.Endpoint == endpoint { // use_auth = true get_client(endpoint, &http_client) client_id, client_secret, err = get_client(endpoint, &http_client) if err != nil { log.Fatal("Unable to register client: ", err) } oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret) if err != nil { log.Print("Unable to login: ", err) return } // This needs to updated with the time // last_refresh := time.Now().Unix() _ = time.Now().Unix() req.Header.Add("Authorization", oauthData.Access_token) } } resp, err := http_client.Do(req) if err != nil { log.Fatal("Error occured for " + api_timeline) } defer resp.Body.Close() ri_mutex.Lock() m := runninginstances[endpoint] m.Status = RUNNING m.LastRun = "Streaming" runninginstances[endpoint] = m ri_mutex.Unlock() s := bufio.NewScanner(resp.Body) var name string for s.Scan() { line := s.Text() token := strings.SplitN(line, ":", 2) var newpost ReportPost if len(token) != 2 { continue } switch strings.TrimSpace(token[0]) { case "event": name = strings.TrimSpace(token[1]) continue case "data": switch name { case "update": jsoner := token[1][1:] err := json.Unmarshal([]byte(jsoner), &newpost) if err != nil { log.Fatal("Unable to unmarshal with error: ", err) } default: continue } default: continue } log.Print("----------> " + newpost.Uri) go check_post(newpost.Uri) at_sign := strings.Index(newpost.Account.Acct, "@") newinstance := newpost.Account.Acct[at_sign+1:] if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false { ri_mutex.Lock() o, exists := runninginstances[newinstance] if exists == false || o.Status == KEEPALIVE { m := RunningInstance{} runninginstances[newinstance] = m go StartInstance(newinstance, reportPostChan) } ri_mutex.Unlock() } } ri_mutex.Lock() m = runninginstances[endpoint] m.LastRun = time.Now().Format(time.RFC3339) m.Status = STREAM_ENDED runninginstances[endpoint] = m ri_mutex.Unlock() }