retrying connections + logging prefix
this is because some hosting providers throttle rapid new connections regex additions
This commit is contained in:
parent
e9bd9b67cf
commit
1adaba8322
4
Makefile
4
Makefile
@ -1,5 +1,5 @@
|
||||
FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go
|
||||
FEDICTL_GOFILES = fedictl.go headers.go
|
||||
FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go log.go
|
||||
FEDICTL_GOFILES = fedictl.go headers.go log.go
|
||||
|
||||
build:
|
||||
go build -o fedilogue $(FEDILOGUE_GOFILES)
|
||||
|
@ -3,7 +3,6 @@ package main
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"muzzammil.xyz/jsonc"
|
||||
)
|
||||
|
||||
@ -67,15 +66,15 @@ func stringexists(needle string, haystack []string) bool {
|
||||
func getSettings() {
|
||||
c, err := ioutil.ReadFile("config.jsonc")
|
||||
if err != nil {
|
||||
log.Fatal("Unable to open config.jsonc, exiting: ", err)
|
||||
logFatal.Fatal("Unable to open config.jsonc, exiting: ", err)
|
||||
}
|
||||
jsoncbin := jsonc.ToJSON(c) // Calling jsonc.ToJSON() to convert JSONC to JSON
|
||||
if jsonc.Valid(jsoncbin) == false {
|
||||
log.Fatal("Invalid jsonc, exiting.")
|
||||
logFatal.Fatal("Invalid jsonc, exiting.")
|
||||
}
|
||||
|
||||
err = json.Unmarshal(jsoncbin, &settings)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to parse config.jsonc, exiting: ", err)
|
||||
logFatal.Fatal("Unable to parse config.jsonc, exiting: ", err)
|
||||
}
|
||||
}
|
||||
|
38
ctl.go
38
ctl.go
@ -3,17 +3,15 @@ package main
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
)
|
||||
|
||||
func startctl() {
|
||||
log.Print("Starting ctl listener on 127.0.0.1:5555")
|
||||
logInfo.Print("Starting ctl listener on 127.0.0.1:5555")
|
||||
l, err := net.Listen("tcp", "127.0.0.1:5555")
|
||||
if err != nil {
|
||||
log.Fatal("Unable to start listener:", err)
|
||||
logFatal.Fatal("Unable to start listener:", err)
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
@ -23,7 +21,7 @@ func startctl() {
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
log.Fatal("Error on accept", err)
|
||||
logFatal.Fatal("Error on accept", err)
|
||||
}
|
||||
commandClient <- c
|
||||
}
|
||||
@ -43,35 +41,37 @@ func handleClient(commandClient net.Conn) {
|
||||
var responseback ResponseBack
|
||||
n, err := io.ReadFull(commandClient, sizebyte)
|
||||
if err != nil {
|
||||
log.Fatal("Read error: ", err)
|
||||
logFatal.Fatal("Read error: ", err)
|
||||
return
|
||||
}
|
||||
if n != 4 {
|
||||
log.Fatal("Did not read 4 bytes, failure.")
|
||||
logFatal.Fatal("Did not read 4 bytes, failure.")
|
||||
return
|
||||
}
|
||||
jsonsize := int(binary.LittleEndian.Uint32(sizebyte))
|
||||
jsonbyte := make([]byte, jsonsize)
|
||||
n, err = io.ReadFull(commandClient, jsonbyte)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to unmarshal")
|
||||
logFatal.Fatal("Unable to unmarshal")
|
||||
}
|
||||
if n != jsonsize {
|
||||
log.Fatal("Failed to read json size of ", n)
|
||||
logFatal.Fatal("Failed to read json size of ", n)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(jsonbyte, &commandmap)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to unmarshal")
|
||||
logFatal.Fatal("Unable to unmarshal")
|
||||
}
|
||||
|
||||
switch commandmap.Type {
|
||||
case "status":
|
||||
responseback.Message = "Ok"
|
||||
case "add":
|
||||
log.Print("Manually added instance: " + commandmap.Endpoint)
|
||||
logInfo.Print("Manually added instance: " + commandmap.Endpoint)
|
||||
ri_mutex.Lock()
|
||||
_, exists := runninginstances[commandmap.Endpoint]
|
||||
if exists == true {
|
||||
log.Println("Already exists: " + commandmap.Endpoint)
|
||||
logInfo.Println("Already exists: " + commandmap.Endpoint)
|
||||
responseback.Message = "Exists: " + commandmap.Endpoint
|
||||
} else {
|
||||
responseback.Message = "Added: " + commandmap.Endpoint
|
||||
@ -80,11 +80,11 @@ func handleClient(commandClient net.Conn) {
|
||||
}
|
||||
ri_mutex.Unlock()
|
||||
case "suspend":
|
||||
fmt.Println("Suspend")
|
||||
logFatal.Fatal("Suspend")
|
||||
case "resume":
|
||||
fmt.Println("Resume")
|
||||
logFatal.Fatal("Resume")
|
||||
default:
|
||||
fmt.Println("Something else")
|
||||
logFatal.Fatal("Something else")
|
||||
}
|
||||
|
||||
responseback.Type = "status"
|
||||
@ -92,7 +92,7 @@ func handleClient(commandClient net.Conn) {
|
||||
|
||||
responsebytes, err := json.Marshal(responseback)
|
||||
if err != nil {
|
||||
log.Fatal("Error: ", err)
|
||||
logErr.Fatal(err)
|
||||
}
|
||||
|
||||
n = len(responsebytes)
|
||||
@ -100,17 +100,17 @@ func handleClient(commandClient net.Conn) {
|
||||
|
||||
_, err = commandClient.Write(sizebyte)
|
||||
if err != nil {
|
||||
log.Fatal("Error on write:", err)
|
||||
logFatal.Fatal("Error on write:", err)
|
||||
}
|
||||
|
||||
responsebyte, err := json.Marshal(responseback)
|
||||
if err != nil {
|
||||
log.Fatal("Error response back: ", err)
|
||||
logFatal.Fatal("Error response back: ", err)
|
||||
}
|
||||
|
||||
_, err = commandClient.Write(responsebyte)
|
||||
if err != nil {
|
||||
log.Fatal("Error on write:", err)
|
||||
logFatal.Fatal("Error on write:", err)
|
||||
}
|
||||
commandClient.Close()
|
||||
}
|
||||
|
3
db.go
3
db.go
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/jackc/pgx/pgxpool"
|
||||
"log"
|
||||
)
|
||||
|
||||
func getDbPool() *pgxpool.Pool {
|
||||
@ -12,7 +11,7 @@ func getDbPool() *pgxpool.Pool {
|
||||
dbURI := fmt.Sprintf("postgres://%s:%s@%s:%d/fedilogue", settings.Database.Username, settings.Database.Password, settings.Database.Host, settings.Database.Port)
|
||||
pool, err := pgxpool.Connect(context.Background(), dbURI)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to connect to database:", err)
|
||||
logFatal.Fatal("Unable to connect to database:", err)
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
11
fedilogue.go
11
fedilogue.go
@ -3,7 +3,6 @@ package main
|
||||
import (
|
||||
"github.com/jackc/pgx/pgxpool"
|
||||
"github.com/microcosm-cc/bluemonday"
|
||||
"log"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"regexp"
|
||||
@ -17,12 +16,13 @@ var ri_mutex = &sync.Mutex{}
|
||||
var pool *pgxpool.Pool
|
||||
|
||||
func startpprof() {
|
||||
log.Print("Starting http/pprof on :7777")
|
||||
log.Fatal(http.ListenAndServe("127.0.0.1:7777", nil))
|
||||
logInfo.Print("Starting http/pprof on :7777")
|
||||
logFatal.Fatal(http.ListenAndServe("127.0.0.1:7777", nil))
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Initial Setup
|
||||
logInit()
|
||||
runninginstances = make(map[string]RunningInstance)
|
||||
|
||||
getSettings()
|
||||
@ -31,12 +31,11 @@ func main() {
|
||||
pool = getDbPool()
|
||||
|
||||
p = bluemonday.NewPolicy()
|
||||
spaceReg = regexp.MustCompile(`\s+`)
|
||||
// re = regexp.MustCompile("^https?://(.*)/.*$")
|
||||
spaceReg = regexp.MustCompile(`[\s\t\.]+`)
|
||||
re = regexp.MustCompile("^https?://([^/]*)/(.*)$")
|
||||
|
||||
for _, endpoint := range settings.Autostart {
|
||||
log.Print("Autostarting " + endpoint)
|
||||
logInfo.Print("Autostarting " + endpoint)
|
||||
ri_mutex.Lock()
|
||||
_, exists := runninginstances[endpoint]
|
||||
if exists == false {
|
||||
|
62
instance.go
62
instance.go
@ -4,7 +4,6 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/microcosm-cc/bluemonday"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
@ -16,17 +15,40 @@ var p *bluemonday.Policy
|
||||
var spaceReg *regexp.Regexp
|
||||
var re *regexp.Regexp
|
||||
|
||||
func DoTries(o *RunningInstance, req *http.Request) (*http.Response, error) {
|
||||
var resp *http.Response
|
||||
var err error
|
||||
|
||||
for tries := 0; tries < 10; tries++ {
|
||||
resp, err = o.client.Do(req)
|
||||
if err != nil {
|
||||
// URL.Scheme, Host, Path Opaque
|
||||
logWarn.Print("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes: ", err)
|
||||
time.Sleep(time.Minute * 5)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func GetRunner(endpoint string) RunningInstance {
|
||||
ri_mutex.Lock()
|
||||
o, exists := runninginstances[endpoint]
|
||||
if exists == false {
|
||||
o := RunningInstance{}
|
||||
|
||||
//tr := &http.Transport{MaxIdleConns: 10, IdleConnTimeout: 7200 * time.Second}
|
||||
tr := &http.Transport{MaxIdleConns: 10, IdleConnTimeout: 7200 * time.Second}
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: 10,
|
||||
IdleConnTimeout: 7200 * time.Second,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
}
|
||||
|
||||
o.client = http.Client{Transport: tr}
|
||||
//o.client = http.Client{}
|
||||
o.Status = KEEPALIVE
|
||||
runninginstances[endpoint] = o
|
||||
}
|
||||
@ -72,11 +94,11 @@ func GetNodeInfo(endpoint string, o RunningInstance) RunningInstance {
|
||||
// Check the front page
|
||||
index_uri := "https://" + endpoint + "/"
|
||||
req, _ = http.NewRequest("GET", index_uri, nil)
|
||||
resp_index, err := o.client.Do(req)
|
||||
resp_index, err := DoTries(&o, req)
|
||||
o.LastRun = time.Now().Format(time.RFC3339)
|
||||
if err != nil {
|
||||
o.Status = UNSUPPORTED_INSTANCE
|
||||
log.Print("Unable to connect to " + endpoint + ", giving up")
|
||||
logWarn.Print("Unable to connect to " + endpoint + ", giving up")
|
||||
return o
|
||||
}
|
||||
defer resp_index.Body.Close()
|
||||
@ -84,7 +106,7 @@ func GetNodeInfo(endpoint string, o RunningInstance) RunningInstance {
|
||||
indexbin, err := ioutil.ReadAll(resp_index.Body)
|
||||
if err != nil {
|
||||
o.Status = UNSUPPORTED_INSTANCE
|
||||
log.Print("Unable to read index of " + endpoint + ", giving up")
|
||||
logWarn.Print("Unable to read index of " + endpoint + ", giving up")
|
||||
return o
|
||||
}
|
||||
indexstr := string(indexbin)
|
||||
@ -105,10 +127,26 @@ func GetNodeInfo(endpoint string, o RunningInstance) RunningInstance {
|
||||
|
||||
func CheckInstance(newinstance string, callerEndpoint string) {
|
||||
if settings.Crawl == true && stringexists(newinstance, settings.Banned) == false {
|
||||
_, err := net.LookupHost(newinstance)
|
||||
if err != nil { // Bad hostname
|
||||
// Skip over this if its the same as the endpoint or empty
|
||||
if newinstance == callerEndpoint || newinstance == "" {
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
for attempt := 0; attempt > 5; attempt = attempt + 1 {
|
||||
_, err = net.LookupHost(newinstance)
|
||||
if err != nil {
|
||||
logDebug.Print("Unable to resolve " + newinstance + " attempt ", attempt, "/5. Sleeping for 30 seconds")
|
||||
time.Sleep(time.Second * 30)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
logWarn.Print("Unable to resolve ", newinstance, " after 5 attempts, giving up: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Skip over this if its the same as the endpoint
|
||||
if newinstance == callerEndpoint {
|
||||
return
|
||||
@ -138,14 +176,16 @@ func StartInstance(endpoint string) {
|
||||
}
|
||||
|
||||
if o.Software == "pleroma" {
|
||||
log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version)
|
||||
logInfo.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version)
|
||||
o.CaptureType = "Poll"
|
||||
UpdateRunner(endpoint, o)
|
||||
PollMastodonPleroma(endpoint, &o)
|
||||
} else if o.Software == "mastodon" {
|
||||
log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version)
|
||||
logInfo.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version)
|
||||
o.CaptureType = "Stream"
|
||||
UpdateRunner(endpoint, o)
|
||||
StreamMastodon(endpoint, &o)
|
||||
} else {
|
||||
logWarn.Print("Unsupported endpoint " + endpoint)
|
||||
}
|
||||
}
|
||||
|
42
oauth.go
42
oauth.go
@ -6,7 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
)
|
||||
@ -38,12 +37,13 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
|
||||
resp, err := o.client.Post(api_base_apps, "application/json", requestBodybytes)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to connect to "+api_base_apps+" ", err)
|
||||
logErr.Print("Unable to connect to "+api_base_apps+" ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to read HTTP response: ", err)
|
||||
logErr.Print("Unable to read HTTP response: ", err)
|
||||
o.client_id = ""
|
||||
o.client_secret = ""
|
||||
return err
|
||||
@ -53,7 +53,7 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
bodymap := make(map[string]string)
|
||||
err = json.Unmarshal(body, &bodymap)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to Unmarshal response: ", err)
|
||||
logErr.Print("Unable to parse response from "+endpoint+": ", err)
|
||||
o.client_id = ""
|
||||
o.client_secret = ""
|
||||
return err
|
||||
@ -63,24 +63,23 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
|
||||
f, err := os.Create("clients/" + endpoint)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create "+client_file+": ", err)
|
||||
logErr.Print("Unable to create "+client_file+": ", err)
|
||||
o.client_id = ""
|
||||
o.client_secret = ""
|
||||
return err
|
||||
//return bodymap["client_id"], bodymap["client_secret"], nil
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
_, err = io.WriteString(f, bodymap["client_id"]+"\n")
|
||||
if err != nil {
|
||||
log.Fatal("Unable to write client_id line: ", err)
|
||||
logErr.Print("Unable to write client_id line to file "+client_file+": ", err)
|
||||
o.client_id = bodymap["client_id"]
|
||||
o.client_secret = bodymap["client_secret"]
|
||||
return nil
|
||||
}
|
||||
_, err = io.WriteString(f, bodymap["client_secret"]+"\n")
|
||||
if err != nil {
|
||||
log.Fatal("Unable to write client_secret line: ", err)
|
||||
logErr.Print("Unable to write client_secret to file "+client_file+": ", err)
|
||||
o.client_id = bodymap["client_id"]
|
||||
o.client_secret = bodymap["client_secret"]
|
||||
return nil
|
||||
@ -98,9 +97,8 @@ func get_client(endpoint string, o *RunningInstance) error {
|
||||
if os.IsNotExist(err) == false { // The file exists
|
||||
f, err := os.Open(client_file)
|
||||
if err != nil {
|
||||
log.Print("Unable to open " + client_file + ", creating new client")
|
||||
logErr.Print("Unable to open " + client_file + ", creating new client")
|
||||
return err
|
||||
// return register_client(endpoint, o)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
@ -109,17 +107,14 @@ func get_client(endpoint string, o *RunningInstance) error {
|
||||
client_id_bin, _, err := rd.ReadLine()
|
||||
o.client_id = string(client_id_bin)
|
||||
if err != nil {
|
||||
log.Print("Unable to read client_id line of " + client_file + ", building new client")
|
||||
logErr.Print("Unable to read client_id line of " + client_file + ", building new client")
|
||||
return err
|
||||
// return register_client(endpoint, o)
|
||||
}
|
||||
client_secret_bin, _, err := rd.ReadLine()
|
||||
o.client_secret = string(client_secret_bin)
|
||||
if err != nil {
|
||||
log.Print("Unable to read client_secret line of " + client_file + ", building new client")
|
||||
logErr.Print("Unable to read client_secret line of " + client_file + ", building new client")
|
||||
return err
|
||||
// return register_client(endpoint, o)
|
||||
// return o
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -143,7 +138,7 @@ func oauth_login(endpoint string, o *RunningInstance, username string, password
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Print("Unable to create Authentication map")
|
||||
logErr.Print("Unable to create Authentication map for "+endpoint)
|
||||
return OAuth{}, err
|
||||
}
|
||||
|
||||
@ -151,30 +146,29 @@ func oauth_login(endpoint string, o *RunningInstance, username string, password
|
||||
|
||||
resp, err := http.Post("https://"+endpoint+"/oauth/token", "application/json", authMapbytes)
|
||||
if err != nil {
|
||||
log.Print("Cannot connect to "+endpoint+": ", err)
|
||||
logErr.Print("Cannot connect to "+endpoint+": ", err)
|
||||
return OAuth{}, err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Print("Unable to read response data: ", err)
|
||||
logErr.Print("Unable to read response data for "+endpoint+": ", err)
|
||||
return OAuth{}, err
|
||||
}
|
||||
|
||||
if resp.StatusCode == 400 {
|
||||
log.Print("Unable to authenticate")
|
||||
logErr.Print("Unable to authenticate to " + endpoint)
|
||||
return OAuth{}, &authError{"Authentication error"}
|
||||
}
|
||||
|
||||
oauthData := OAuth{}
|
||||
err = json.Unmarshal(body, &oauthData)
|
||||
if err != nil {
|
||||
log.Print("Unable to Unmarshal json data: ", err)
|
||||
logErr.Print("Unable to parse json data for "+endpoint+": ", err)
|
||||
return OAuth{}, err
|
||||
}
|
||||
|
||||
return oauthData, nil
|
||||
|
||||
}
|
||||
|
||||
func oauth_refresh(endpoint string, client_id string, client_secret string, refresh_token string) (OAuth, error) {
|
||||
@ -191,20 +185,20 @@ func oauth_refresh(endpoint string, client_id string, client_secret string, refr
|
||||
|
||||
resp, err := http.Post("https://"+endpoint+"/oauth/token", "application/json", authMapbytes)
|
||||
if err != nil {
|
||||
log.Print("Cannot connect to "+endpoint+": ", err)
|
||||
logErr.Print("Unable to connect to "+endpoint+": ", err)
|
||||
return OAuth{}, err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Print("Unable to read response data: ", err)
|
||||
logErr.Print("Unable to read response data for "+endpoint+": ", err)
|
||||
return OAuth{}, err
|
||||
}
|
||||
|
||||
oauthData := OAuth{}
|
||||
err = json.Unmarshal(body, &oauthData)
|
||||
if err != nil {
|
||||
log.Print("Unable to Unmarshal json data: ", err)
|
||||
logErr.Print("Unable to parse json data for "+endpoint+": ", err)
|
||||
return oauthData, err
|
||||
}
|
||||
|
||||
|
26
poll.go
26
poll.go
@ -3,7 +3,6 @@ package main
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
@ -53,7 +52,6 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
min_id := ""
|
||||
|
||||
parsing_error := 0
|
||||
unprocess_error := 0
|
||||
use_auth := false
|
||||
|
||||
var last_refresh int64
|
||||
@ -68,13 +66,13 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
o := GetRunner(endpoint)
|
||||
err = get_client(endpoint, &o)
|
||||
if err != nil {
|
||||
log.Print("Unable to register client: ", err)
|
||||
logErr.Print("Unable to register client for " + endpoint + ": ", err)
|
||||
return
|
||||
}
|
||||
|
||||
oauthData, err = oauth_login(endpoint, &o, extaccount.Username, extaccount.Password)
|
||||
if err != nil {
|
||||
log.Print("Unable to login: ", err)
|
||||
logErr.Print("Unable to login to " + endpoint + ": ", err)
|
||||
return
|
||||
}
|
||||
last_refresh = time.Now().Unix()
|
||||
@ -90,7 +88,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id
|
||||
req, err := http.NewRequest("GET", api_timeline, nil)
|
||||
if err != nil {
|
||||
log.Print("Unable to create new request")
|
||||
logFatal.Fatal("Unable to create new request for " + endpoint + ": ", err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -98,7 +96,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
if time.Now().Unix() > last_refresh+oauthData.Expires_in {
|
||||
oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token)
|
||||
if err != nil {
|
||||
log.Print("Unable to refresh: ", err)
|
||||
logWarn.Fatal("Unable to refresh oauth token for " + endpoint + ": ", err)
|
||||
return
|
||||
}
|
||||
last_refresh = time.Now().Unix()
|
||||
@ -107,31 +105,28 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
}
|
||||
|
||||
m.LastRun = time.Now().Format(time.RFC3339)
|
||||
resp, err := o.client.Do(req)
|
||||
resp, err := DoTries(o, req)
|
||||
if err != nil {
|
||||
m.Status = CLIENT_ISSUE
|
||||
ri_mutex.Lock()
|
||||
runninginstances[endpoint] = m
|
||||
ri_mutex.Unlock()
|
||||
logWarn.Print("Giving up on " + endpoint + ": ", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds
|
||||
log.Print("Delaying "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay")
|
||||
logWarn.Print("Delaying "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay")
|
||||
_, _ = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close() // Release as soon as done
|
||||
m.Status = resp.StatusCode
|
||||
ri_mutex.Lock()
|
||||
runninginstances[endpoint] = m
|
||||
ri_mutex.Unlock()
|
||||
if unprocess_error > 5 {
|
||||
log.Print("Exiting for " + endpoint)
|
||||
}
|
||||
unprocess_error = unprocess_error + 1
|
||||
time.Sleep(time.Second * 30)
|
||||
continue
|
||||
} else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour
|
||||
log.Print("Suspending "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay")
|
||||
logWarn.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
|
||||
_, _ = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close() // Release as soon as done
|
||||
m.Status = 765
|
||||
@ -141,7 +136,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
time.Sleep(time.Second * 3600)
|
||||
continue
|
||||
} else if resp.StatusCode != 200 { // Crash
|
||||
log.Print("Terminating "+endpoint+", gave status ", resp.StatusCode)
|
||||
logErr.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode)
|
||||
_, _ = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close() // Release as soon as done
|
||||
m.Status = resp.StatusCode
|
||||
@ -158,7 +153,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
ri_mutex.Lock()
|
||||
runninginstances[endpoint] = m
|
||||
ri_mutex.Unlock()
|
||||
log.Print("Giving up on " + endpoint)
|
||||
logErr.Print("Giving up on " + endpoint + " after 5 unmarshal errors.")
|
||||
return
|
||||
}
|
||||
parsing_error = parsing_error + 1
|
||||
@ -182,7 +177,6 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
min_id = newpost.Id
|
||||
}
|
||||
|
||||
// Only done if we are crawling
|
||||
go CheckInstance(newinstance, endpoint)
|
||||
}
|
||||
}
|
||||
|
23
retrieve.go
23
retrieve.go
@ -6,7 +6,6 @@ import (
|
||||
"errors"
|
||||
"html"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
@ -139,7 +138,7 @@ func check_post(uri string) (PostJson, error) {
|
||||
}
|
||||
|
||||
postjson.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(postjson.Content)))
|
||||
spaceReg.ReplaceAllString(postjson.normalized, " ")
|
||||
postjson.normalized = spaceReg.ReplaceAllString(postjson.normalized, " ")
|
||||
|
||||
_, err = pool.Exec(context.Background(), "INSERT INTO posts (id, inreplyto, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance)
|
||||
if err != nil {
|
||||
@ -182,15 +181,28 @@ func check_user(uri string) (UserJson, error) {
|
||||
req, _ := http.NewRequest("GET", uri, nil)
|
||||
req.Header.Add("Accept", "application/ld+json")
|
||||
|
||||
resp, err := o.client.Do(req)
|
||||
var resp *http.Response
|
||||
tries := 0
|
||||
for {
|
||||
resp, err = o.client.Do(req)
|
||||
if err != nil {
|
||||
log.Print("Retrieval error: ", err)
|
||||
if tries > 10 {
|
||||
logErr.Print("Unable to connect to "+uri+" attempt 10/10, giving up.")
|
||||
return userjson, err
|
||||
}
|
||||
logWarn.Print("Unable to connect to "+uri+", attempt ",tries+1,"+/10 sleeping for 30 seconds.")
|
||||
time.Sleep(time.Second * 30)
|
||||
tries = tries + 1
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(&userjson)
|
||||
if err != nil {
|
||||
log.Print("Retrieval error: ", err)
|
||||
// Going forward, this might need to be double-checked, but for now just die
|
||||
// log.Fatal("Retrieval error 2: ", err)
|
||||
tries = tries + 1
|
||||
return userjson, err
|
||||
}
|
||||
|
||||
@ -200,7 +212,6 @@ func check_user(uri string) (UserJson, error) {
|
||||
|
||||
_, err = pool.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance)
|
||||
if err != nil {
|
||||
// log.Print("INSERT accounts error: ", err)
|
||||
return userjson, err
|
||||
}
|
||||
|
||||
|
36
stream.go
36
stream.go
@ -3,14 +3,24 @@ package main
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
"net"
|
||||
)
|
||||
|
||||
func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
stream_client := http.Client{}
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: 10,
|
||||
IdleConnTimeout: 7200 * time.Second,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
}
|
||||
|
||||
stream_client := http.Client{Transport: tr}
|
||||
|
||||
var oauthData OAuth
|
||||
var retry bool
|
||||
@ -20,7 +30,7 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
api_timeline := "https://" + endpoint + "/api/v1/streaming/public"
|
||||
req, err := http.NewRequest("GET", api_timeline, nil)
|
||||
if err != nil {
|
||||
log.Print("Unable to create new request")
|
||||
logFatal.Fatal("Unable to create new request for " + endpoint + ", exiting.")
|
||||
return
|
||||
}
|
||||
|
||||
@ -30,12 +40,12 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
|
||||
err = get_client(endpoint, o)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to register client: ", err)
|
||||
logWarn.Print("Unable to register client: ", err)
|
||||
}
|
||||
|
||||
oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password)
|
||||
if err != nil {
|
||||
log.Print("Unable to login: ", err)
|
||||
logWarn.Print("Unable to login: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -44,9 +54,19 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := stream_client.Do(req)
|
||||
var resp *http.Response
|
||||
|
||||
for tries := 0; tries < 10; tries++ {
|
||||
resp, err = stream_client.Do(req)
|
||||
if err != nil {
|
||||
log.Print("Unable to stream "+api_timeline+": ", err)
|
||||
time.Sleep(time.Minute * 5)
|
||||
logWarn.Print("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes, ", err)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
logErr.Print("Unable to stream " + api_timeline + ": ", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@ -78,8 +98,8 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
jsondata := token[1][1:]
|
||||
err := json.Unmarshal([]byte(jsondata), &newpost)
|
||||
if err != nil {
|
||||
logDebug.Print("Unable to parse data from "+endpoint+", but still connected.")
|
||||
continue
|
||||
log.Fatal("Unable to unmarshal with error: ", err)
|
||||
}
|
||||
retry = true
|
||||
default:
|
||||
|
Loading…
x
Reference in New Issue
Block a user