From 1adaba8322d6284c6647ed532b07b17dd834b27b Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Thu, 14 Jan 2021 19:51:42 +0000 Subject: [PATCH] retrying connections + logging prefix this is because some hosting providers throttle rapid new connections regex additions --- Makefile | 4 ++-- config.go | 7 +++--- ctl.go | 38 ++++++++++++++++---------------- db.go | 3 +-- fedilogue.go | 11 +++++----- instance.go | 62 ++++++++++++++++++++++++++++++++++++++++++---------- oauth.go | 42 +++++++++++++++-------------------- poll.go | 26 +++++++++------------- retrieve.go | 27 ++++++++++++++++------- stream.go | 36 +++++++++++++++++++++++------- 10 files changed, 156 insertions(+), 100 deletions(-) diff --git a/Makefile b/Makefile index 9a41436..9ea23a0 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ -FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go -FEDICTL_GOFILES = fedictl.go headers.go +FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go log.go +FEDICTL_GOFILES = fedictl.go headers.go log.go build: go build -o fedilogue $(FEDILOGUE_GOFILES) diff --git a/config.go b/config.go index a61b97a..b69d1bd 100644 --- a/config.go +++ b/config.go @@ -3,7 +3,6 @@ package main import ( "encoding/json" "io/ioutil" - "log" "muzzammil.xyz/jsonc" ) @@ -67,15 +66,15 @@ func stringexists(needle string, haystack []string) bool { func getSettings() { c, err := ioutil.ReadFile("config.jsonc") if err != nil { - log.Fatal("Unable to open config.jsonc, exiting: ", err) + logFatal.Fatal("Unable to open config.jsonc, exiting: ", err) } jsoncbin := jsonc.ToJSON(c) // Calling jsonc.ToJSON() to convert JSONC to JSON if jsonc.Valid(jsoncbin) == false { - log.Fatal("Invalid jsonc, exiting.") + logFatal.Fatal("Invalid jsonc, exiting.") } err = json.Unmarshal(jsoncbin, &settings) if err != nil { - log.Fatal("Unable to parse config.jsonc, exiting: ", err) + logFatal.Fatal("Unable to parse config.jsonc, exiting: ", err) } } diff --git a/ctl.go b/ctl.go index 1ff054c..e79dd59 100644 --- a/ctl.go +++ b/ctl.go @@ -3,17 +3,15 @@ package main import ( "encoding/binary" "encoding/json" - "fmt" "io" - "log" "net" ) func startctl() { - log.Print("Starting ctl listener on 127.0.0.1:5555") + logInfo.Print("Starting ctl listener on 127.0.0.1:5555") l, err := net.Listen("tcp", "127.0.0.1:5555") if err != nil { - log.Fatal("Unable to start listener:", err) + logFatal.Fatal("Unable to start listener:", err) } defer l.Close() @@ -23,7 +21,7 @@ func startctl() { for { c, err := l.Accept() if err != nil { - log.Fatal("Error on accept", err) + logFatal.Fatal("Error on accept", err) } commandClient <- c } @@ -43,35 +41,37 @@ func handleClient(commandClient net.Conn) { var responseback ResponseBack n, err := io.ReadFull(commandClient, sizebyte) if err != nil { - log.Fatal("Read error: ", err) + logFatal.Fatal("Read error: ", err) + return } if n != 4 { - log.Fatal("Did not read 4 bytes, failure.") + logFatal.Fatal("Did not read 4 bytes, failure.") + return } jsonsize := int(binary.LittleEndian.Uint32(sizebyte)) jsonbyte := make([]byte, jsonsize) n, err = io.ReadFull(commandClient, jsonbyte) if err != nil { - log.Fatal("Unable to unmarshal") + logFatal.Fatal("Unable to unmarshal") } if n != jsonsize { - log.Fatal("Failed to read json size of ", n) + logFatal.Fatal("Failed to read json size of ", n) } err = json.Unmarshal(jsonbyte, &commandmap) if err != nil { - log.Fatal("Unable to unmarshal") + logFatal.Fatal("Unable to unmarshal") } switch commandmap.Type { case "status": responseback.Message = "Ok" case "add": - log.Print("Manually added instance: " + commandmap.Endpoint) + logInfo.Print("Manually added instance: " + commandmap.Endpoint) ri_mutex.Lock() _, exists := runninginstances[commandmap.Endpoint] if exists == true { - log.Println("Already exists: " + commandmap.Endpoint) + logInfo.Println("Already exists: " + commandmap.Endpoint) responseback.Message = "Exists: " + commandmap.Endpoint } else { responseback.Message = "Added: " + commandmap.Endpoint @@ -80,11 +80,11 @@ func handleClient(commandClient net.Conn) { } ri_mutex.Unlock() case "suspend": - fmt.Println("Suspend") + logFatal.Fatal("Suspend") case "resume": - fmt.Println("Resume") + logFatal.Fatal("Resume") default: - fmt.Println("Something else") + logFatal.Fatal("Something else") } responseback.Type = "status" @@ -92,7 +92,7 @@ func handleClient(commandClient net.Conn) { responsebytes, err := json.Marshal(responseback) if err != nil { - log.Fatal("Error: ", err) + logErr.Fatal(err) } n = len(responsebytes) @@ -100,17 +100,17 @@ func handleClient(commandClient net.Conn) { _, err = commandClient.Write(sizebyte) if err != nil { - log.Fatal("Error on write:", err) + logFatal.Fatal("Error on write:", err) } responsebyte, err := json.Marshal(responseback) if err != nil { - log.Fatal("Error response back: ", err) + logFatal.Fatal("Error response back: ", err) } _, err = commandClient.Write(responsebyte) if err != nil { - log.Fatal("Error on write:", err) + logFatal.Fatal("Error on write:", err) } commandClient.Close() } diff --git a/db.go b/db.go index 4c2a7f7..069d0dd 100644 --- a/db.go +++ b/db.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "github.com/jackc/pgx/pgxpool" - "log" ) func getDbPool() *pgxpool.Pool { @@ -12,7 +11,7 @@ func getDbPool() *pgxpool.Pool { dbURI := fmt.Sprintf("postgres://%s:%s@%s:%d/fedilogue", settings.Database.Username, settings.Database.Password, settings.Database.Host, settings.Database.Port) pool, err := pgxpool.Connect(context.Background(), dbURI) if err != nil { - log.Fatal("Unable to connect to database:", err) + logFatal.Fatal("Unable to connect to database:", err) } return pool } diff --git a/fedilogue.go b/fedilogue.go index b08896e..913cfb3 100644 --- a/fedilogue.go +++ b/fedilogue.go @@ -3,7 +3,6 @@ package main import ( "github.com/jackc/pgx/pgxpool" "github.com/microcosm-cc/bluemonday" - "log" "net/http" _ "net/http/pprof" "regexp" @@ -17,12 +16,13 @@ var ri_mutex = &sync.Mutex{} var pool *pgxpool.Pool func startpprof() { - log.Print("Starting http/pprof on :7777") - log.Fatal(http.ListenAndServe("127.0.0.1:7777", nil)) + logInfo.Print("Starting http/pprof on :7777") + logFatal.Fatal(http.ListenAndServe("127.0.0.1:7777", nil)) } func main() { // Initial Setup + logInit() runninginstances = make(map[string]RunningInstance) getSettings() @@ -31,12 +31,11 @@ func main() { pool = getDbPool() p = bluemonday.NewPolicy() - spaceReg = regexp.MustCompile(`\s+`) -// re = regexp.MustCompile("^https?://(.*)/.*$") + spaceReg = regexp.MustCompile(`[\s\t\.]+`) re = regexp.MustCompile("^https?://([^/]*)/(.*)$") for _, endpoint := range settings.Autostart { - log.Print("Autostarting " + endpoint) + logInfo.Print("Autostarting " + endpoint) ri_mutex.Lock() _, exists := runninginstances[endpoint] if exists == false { diff --git a/instance.go b/instance.go index dbd2b13..6cd81fa 100644 --- a/instance.go +++ b/instance.go @@ -4,7 +4,6 @@ import ( "encoding/json" "github.com/microcosm-cc/bluemonday" "io/ioutil" - "log" "net/http" "regexp" "strings" @@ -16,17 +15,40 @@ var p *bluemonday.Policy var spaceReg *regexp.Regexp var re *regexp.Regexp +func DoTries(o *RunningInstance, req *http.Request) (*http.Response, error) { + var resp *http.Response + var err error + + for tries := 0; tries < 10; tries++ { + resp, err = o.client.Do(req) + if err != nil { + // URL.Scheme, Host, Path Opaque + logWarn.Print("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes: ", err) + time.Sleep(time.Minute * 5) + continue + } + break + } + return resp, err +} + func GetRunner(endpoint string) RunningInstance { ri_mutex.Lock() o, exists := runninginstances[endpoint] if exists == false { o := RunningInstance{} - //tr := &http.Transport{MaxIdleConns: 10, IdleConnTimeout: 7200 * time.Second} - tr := &http.Transport{MaxIdleConns: 10, IdleConnTimeout: 7200 * time.Second} + tr := &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 7200 * time.Second, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + } o.client = http.Client{Transport: tr} - //o.client = http.Client{} o.Status = KEEPALIVE runninginstances[endpoint] = o } @@ -72,11 +94,11 @@ func GetNodeInfo(endpoint string, o RunningInstance) RunningInstance { // Check the front page index_uri := "https://" + endpoint + "/" req, _ = http.NewRequest("GET", index_uri, nil) - resp_index, err := o.client.Do(req) + resp_index, err := DoTries(&o, req) o.LastRun = time.Now().Format(time.RFC3339) if err != nil { o.Status = UNSUPPORTED_INSTANCE - log.Print("Unable to connect to " + endpoint + ", giving up") + logWarn.Print("Unable to connect to " + endpoint + ", giving up") return o } defer resp_index.Body.Close() @@ -84,7 +106,7 @@ func GetNodeInfo(endpoint string, o RunningInstance) RunningInstance { indexbin, err := ioutil.ReadAll(resp_index.Body) if err != nil { o.Status = UNSUPPORTED_INSTANCE - log.Print("Unable to read index of " + endpoint + ", giving up") + logWarn.Print("Unable to read index of " + endpoint + ", giving up") return o } indexstr := string(indexbin) @@ -105,10 +127,26 @@ func GetNodeInfo(endpoint string, o RunningInstance) RunningInstance { func CheckInstance(newinstance string, callerEndpoint string) { if settings.Crawl == true && stringexists(newinstance, settings.Banned) == false { - _, err := net.LookupHost(newinstance) - if err != nil { // Bad hostname + // Skip over this if its the same as the endpoint or empty + if newinstance == callerEndpoint || newinstance == "" { return } + + var err error + for attempt := 0; attempt > 5; attempt = attempt + 1 { + _, err = net.LookupHost(newinstance) + if err != nil { + logDebug.Print("Unable to resolve " + newinstance + " attempt ", attempt, "/5. Sleeping for 30 seconds") + time.Sleep(time.Second * 30) + continue + } + break + } + if err != nil { + logWarn.Print("Unable to resolve ", newinstance, " after 5 attempts, giving up: ", err) + return + } + // Skip over this if its the same as the endpoint if newinstance == callerEndpoint { return @@ -138,14 +176,16 @@ func StartInstance(endpoint string) { } if o.Software == "pleroma" { - log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version) + logInfo.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Poll" UpdateRunner(endpoint, o) PollMastodonPleroma(endpoint, &o) } else if o.Software == "mastodon" { - log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version) + logInfo.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Stream" UpdateRunner(endpoint, o) StreamMastodon(endpoint, &o) + } else { + logWarn.Print("Unsupported endpoint " + endpoint) } } diff --git a/oauth.go b/oauth.go index 98599a1..a85c5c9 100644 --- a/oauth.go +++ b/oauth.go @@ -6,7 +6,6 @@ import ( "encoding/json" "io" "io/ioutil" - "log" "net/http" "os" ) @@ -38,12 +37,13 @@ func register_client(endpoint string, o *RunningInstance) error { resp, err := o.client.Post(api_base_apps, "application/json", requestBodybytes) if err != nil { - log.Fatal("Unable to connect to "+api_base_apps+" ", err) + logErr.Print("Unable to connect to "+api_base_apps+" ", err) + return err } body, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Fatal("Unable to read HTTP response: ", err) + logErr.Print("Unable to read HTTP response: ", err) o.client_id = "" o.client_secret = "" return err @@ -53,7 +53,7 @@ func register_client(endpoint string, o *RunningInstance) error { bodymap := make(map[string]string) err = json.Unmarshal(body, &bodymap) if err != nil { - log.Fatal("Unable to Unmarshal response: ", err) + logErr.Print("Unable to parse response from "+endpoint+": ", err) o.client_id = "" o.client_secret = "" return err @@ -63,24 +63,23 @@ func register_client(endpoint string, o *RunningInstance) error { f, err := os.Create("clients/" + endpoint) if err != nil { - log.Fatal("Unable to create "+client_file+": ", err) + logErr.Print("Unable to create "+client_file+": ", err) o.client_id = "" o.client_secret = "" return err - //return bodymap["client_id"], bodymap["client_secret"], nil } defer f.Close() _, err = io.WriteString(f, bodymap["client_id"]+"\n") if err != nil { - log.Fatal("Unable to write client_id line: ", err) + logErr.Print("Unable to write client_id line to file "+client_file+": ", err) o.client_id = bodymap["client_id"] o.client_secret = bodymap["client_secret"] return nil } _, err = io.WriteString(f, bodymap["client_secret"]+"\n") if err != nil { - log.Fatal("Unable to write client_secret line: ", err) + logErr.Print("Unable to write client_secret to file "+client_file+": ", err) o.client_id = bodymap["client_id"] o.client_secret = bodymap["client_secret"] return nil @@ -98,9 +97,8 @@ func get_client(endpoint string, o *RunningInstance) error { if os.IsNotExist(err) == false { // The file exists f, err := os.Open(client_file) if err != nil { - log.Print("Unable to open " + client_file + ", creating new client") + logErr.Print("Unable to open " + client_file + ", creating new client") return err - // return register_client(endpoint, o) } defer f.Close() @@ -109,17 +107,14 @@ func get_client(endpoint string, o *RunningInstance) error { client_id_bin, _, err := rd.ReadLine() o.client_id = string(client_id_bin) if err != nil { - log.Print("Unable to read client_id line of " + client_file + ", building new client") + logErr.Print("Unable to read client_id line of " + client_file + ", building new client") return err - // return register_client(endpoint, o) } client_secret_bin, _, err := rd.ReadLine() o.client_secret = string(client_secret_bin) if err != nil { - log.Print("Unable to read client_secret line of " + client_file + ", building new client") + logErr.Print("Unable to read client_secret line of " + client_file + ", building new client") return err - // return register_client(endpoint, o) - // return o } return nil @@ -143,7 +138,7 @@ func oauth_login(endpoint string, o *RunningInstance, username string, password }) if err != nil { - log.Print("Unable to create Authentication map") + logErr.Print("Unable to create Authentication map for "+endpoint) return OAuth{}, err } @@ -151,30 +146,29 @@ func oauth_login(endpoint string, o *RunningInstance, username string, password resp, err := http.Post("https://"+endpoint+"/oauth/token", "application/json", authMapbytes) if err != nil { - log.Print("Cannot connect to "+endpoint+": ", err) + logErr.Print("Cannot connect to "+endpoint+": ", err) return OAuth{}, err } body, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Print("Unable to read response data: ", err) + logErr.Print("Unable to read response data for "+endpoint+": ", err) return OAuth{}, err } if resp.StatusCode == 400 { - log.Print("Unable to authenticate") + logErr.Print("Unable to authenticate to " + endpoint) return OAuth{}, &authError{"Authentication error"} } oauthData := OAuth{} err = json.Unmarshal(body, &oauthData) if err != nil { - log.Print("Unable to Unmarshal json data: ", err) + logErr.Print("Unable to parse json data for "+endpoint+": ", err) return OAuth{}, err } return oauthData, nil - } func oauth_refresh(endpoint string, client_id string, client_secret string, refresh_token string) (OAuth, error) { @@ -191,20 +185,20 @@ func oauth_refresh(endpoint string, client_id string, client_secret string, refr resp, err := http.Post("https://"+endpoint+"/oauth/token", "application/json", authMapbytes) if err != nil { - log.Print("Cannot connect to "+endpoint+": ", err) + logErr.Print("Unable to connect to "+endpoint+": ", err) return OAuth{}, err } body, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Print("Unable to read response data: ", err) + logErr.Print("Unable to read response data for "+endpoint+": ", err) return OAuth{}, err } oauthData := OAuth{} err = json.Unmarshal(body, &oauthData) if err != nil { - log.Print("Unable to Unmarshal json data: ", err) + logErr.Print("Unable to parse json data for "+endpoint+": ", err) return oauthData, err } diff --git a/poll.go b/poll.go index 929bd77..98fdbfb 100644 --- a/poll.go +++ b/poll.go @@ -3,7 +3,6 @@ package main import ( "encoding/json" "io/ioutil" - "log" "net/http" "time" ) @@ -53,7 +52,6 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { min_id := "" parsing_error := 0 - unprocess_error := 0 use_auth := false var last_refresh int64 @@ -68,13 +66,13 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { o := GetRunner(endpoint) err = get_client(endpoint, &o) if err != nil { - log.Print("Unable to register client: ", err) + logErr.Print("Unable to register client for " + endpoint + ": ", err) return } oauthData, err = oauth_login(endpoint, &o, extaccount.Username, extaccount.Password) if err != nil { - log.Print("Unable to login: ", err) + logErr.Print("Unable to login to " + endpoint + ": ", err) return } last_refresh = time.Now().Unix() @@ -90,7 +88,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id req, err := http.NewRequest("GET", api_timeline, nil) if err != nil { - log.Print("Unable to create new request") + logFatal.Fatal("Unable to create new request for " + endpoint + ": ", err) return } @@ -98,7 +96,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { if time.Now().Unix() > last_refresh+oauthData.Expires_in { oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token) if err != nil { - log.Print("Unable to refresh: ", err) + logWarn.Fatal("Unable to refresh oauth token for " + endpoint + ": ", err) return } last_refresh = time.Now().Unix() @@ -107,31 +105,28 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { } m.LastRun = time.Now().Format(time.RFC3339) - resp, err := o.client.Do(req) + resp, err := DoTries(o, req) if err != nil { m.Status = CLIENT_ISSUE ri_mutex.Lock() runninginstances[endpoint] = m ri_mutex.Unlock() + logWarn.Print("Giving up on " + endpoint + ": ", err.Error()) return } if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds - log.Print("Delaying "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay") + logWarn.Print("Delaying "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done m.Status = resp.StatusCode ri_mutex.Lock() runninginstances[endpoint] = m ri_mutex.Unlock() - if unprocess_error > 5 { - log.Print("Exiting for " + endpoint) - } - unprocess_error = unprocess_error + 1 time.Sleep(time.Second * 30) continue } else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour - log.Print("Suspending "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay") + logWarn.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done m.Status = 765 @@ -141,7 +136,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { time.Sleep(time.Second * 3600) continue } else if resp.StatusCode != 200 { // Crash - log.Print("Terminating "+endpoint+", gave status ", resp.StatusCode) + logErr.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode) _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done m.Status = resp.StatusCode @@ -158,7 +153,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { ri_mutex.Lock() runninginstances[endpoint] = m ri_mutex.Unlock() - log.Print("Giving up on " + endpoint) + logErr.Print("Giving up on " + endpoint + " after 5 unmarshal errors.") return } parsing_error = parsing_error + 1 @@ -182,7 +177,6 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { min_id = newpost.Id } - // Only done if we are crawling go CheckInstance(newinstance, endpoint) } } diff --git a/retrieve.go b/retrieve.go index c3fcf96..00e2abd 100644 --- a/retrieve.go +++ b/retrieve.go @@ -6,7 +6,6 @@ import ( "errors" "html" "io/ioutil" - "log" "io" "net/http" "strings" @@ -139,7 +138,7 @@ func check_post(uri string) (PostJson, error) { } postjson.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(postjson.Content))) - spaceReg.ReplaceAllString(postjson.normalized, " ") + postjson.normalized = spaceReg.ReplaceAllString(postjson.normalized, " ") _, err = pool.Exec(context.Background(), "INSERT INTO posts (id, inreplyto, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance) if err != nil { @@ -182,15 +181,28 @@ func check_user(uri string) (UserJson, error) { req, _ := http.NewRequest("GET", uri, nil) req.Header.Add("Accept", "application/ld+json") - resp, err := o.client.Do(req) - if err != nil { - log.Print("Retrieval error: ", err) - return userjson, err + var resp *http.Response + tries := 0 + for { + resp, err = o.client.Do(req) + if err != nil { + if tries > 10 { + logErr.Print("Unable to connect to "+uri+" attempt 10/10, giving up.") + return userjson, err + } + logWarn.Print("Unable to connect to "+uri+", attempt ",tries+1,"+/10 sleeping for 30 seconds.") + time.Sleep(time.Second * 30) + tries = tries + 1 + continue + } + break } err = json.NewDecoder(resp.Body).Decode(&userjson) if err != nil { - log.Print("Retrieval error: ", err) + // Going forward, this might need to be double-checked, but for now just die +// log.Fatal("Retrieval error 2: ", err) + tries = tries + 1 return userjson, err } @@ -200,7 +212,6 @@ func check_user(uri string) (UserJson, error) { _, err = pool.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance) if err != nil { - // log.Print("INSERT accounts error: ", err) return userjson, err } diff --git a/stream.go b/stream.go index f60260e..1066ea9 100644 --- a/stream.go +++ b/stream.go @@ -3,14 +3,24 @@ package main import ( "bufio" "encoding/json" - "log" "net/http" "strings" "time" + "net" ) func StreamMastodon(endpoint string, o *RunningInstance) { - stream_client := http.Client{} + tr := &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 7200 * time.Second, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + } + + stream_client := http.Client{Transport: tr} var oauthData OAuth var retry bool @@ -20,7 +30,7 @@ func StreamMastodon(endpoint string, o *RunningInstance) { api_timeline := "https://" + endpoint + "/api/v1/streaming/public" req, err := http.NewRequest("GET", api_timeline, nil) if err != nil { - log.Print("Unable to create new request") + logFatal.Fatal("Unable to create new request for " + endpoint + ", exiting.") return } @@ -30,12 +40,12 @@ func StreamMastodon(endpoint string, o *RunningInstance) { err = get_client(endpoint, o) if err != nil { - log.Fatal("Unable to register client: ", err) + logWarn.Print("Unable to register client: ", err) } oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password) if err != nil { - log.Print("Unable to login: ", err) + logWarn.Print("Unable to login: ", err) return } @@ -44,9 +54,19 @@ func StreamMastodon(endpoint string, o *RunningInstance) { } } - resp, err := stream_client.Do(req) + var resp *http.Response + + for tries := 0; tries < 10; tries++ { + resp, err = stream_client.Do(req) + if err != nil { + time.Sleep(time.Minute * 5) + logWarn.Print("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes, ", err) + continue + } + break + } if err != nil { - log.Print("Unable to stream "+api_timeline+": ", err) + logErr.Print("Unable to stream " + api_timeline + ": ", err) return } defer resp.Body.Close() @@ -78,8 +98,8 @@ func StreamMastodon(endpoint string, o *RunningInstance) { jsondata := token[1][1:] err := json.Unmarshal([]byte(jsondata), &newpost) if err != nil { + logDebug.Print("Unable to parse data from "+endpoint+", but still connected.") continue - log.Fatal("Unable to unmarshal with error: ", err) } retry = true default: