fedilogue/stream.go
Farhan Khan 1adaba8322 retrying connections + logging prefix
this is because some hosting providers throttle rapid new connections
regex additions
2021-01-14 19:51:42 +00:00

127 lines
2.7 KiB
Go

package main
import (
"bufio"
"encoding/json"
"net/http"
"strings"
"time"
"net"
)
func StreamMastodon(endpoint string, o *RunningInstance) {
tr := &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 7200 * time.Second,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
}
stream_client := http.Client{Transport: tr}
var oauthData OAuth
var retry bool
for {
api_timeline := "https://" + endpoint + "/api/v1/streaming/public"
req, err := http.NewRequest("GET", api_timeline, nil)
if err != nil {
logFatal.Fatal("Unable to create new request for " + endpoint + ", exiting.")
return
}
for _, extaccount := range settings.Externalaccounts {
if extaccount.Endpoint == endpoint {
get_client(endpoint, o)
err = get_client(endpoint, o)
if err != nil {
logWarn.Print("Unable to register client: ", err)
}
oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password)
if err != nil {
logWarn.Print("Unable to login: ", err)
return
}
req.Header.Add("Authorization", oauthData.Access_token)
}
}
var resp *http.Response
for tries := 0; tries < 10; tries++ {
resp, err = stream_client.Do(req)
if err != nil {
time.Sleep(time.Minute * 5)
logWarn.Print("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes, ", err)
continue
}
break
}
if err != nil {
logErr.Print("Unable to stream " + api_timeline + ": ", err)
return
}
defer resp.Body.Close()
ri_mutex.Lock()
m := runninginstances[endpoint]
m.Status = RUNNING
m.LastRun = "Streaming"
runninginstances[endpoint] = m
ri_mutex.Unlock()
s := bufio.NewScanner(resp.Body)
var name string
for s.Scan() {
line := s.Text()
token := strings.SplitN(line, ":", 2)
var newpost ReportPost
if len(token) != 2 {
continue
}
switch strings.TrimSpace(token[0]) {
case "event":
name = strings.TrimSpace(token[1])
continue
case "data":
switch name {
case "update":
jsondata := token[1][1:]
err := json.Unmarshal([]byte(jsondata), &newpost)
if err != nil {
logDebug.Print("Unable to parse data from "+endpoint+", but still connected.")
continue
}
retry = true
default:
continue
}
default:
continue
}
go check_post(newpost.Uri)
matchset := re.FindStringSubmatch(newpost.Uri)
if matchset != nil {
newinstance := matchset[1]
go CheckInstance(newinstance, endpoint)
}
}
if retry == true {
time.Sleep(time.Minute * 30)
} else {
break
}
}
}