package main import ( "context" "encoding/json" "net" "net/http" "time" "git.farhan.codes/farhan/fedilogue/shared" ) var staggeredStartChan chan bool func DoTries(o *shared.RunningInstance, req *http.Request) (*http.Response, error) { var resp *http.Response var err error for tries := 0; tries < 10; tries++ { resp, err = o.Client.Do(req) if err != nil { // URL.Scheme, Host, Path Opaque logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes: ", err) time.Sleep(time.Minute * 5) continue } break } return resp, err } func BuildClient(endpoint string) http.Client { logDebug("BuildClient for ", endpoint) // Test: TestBuildClient, TestBuildClientProxy /* The seemingly unused 'endpoint' variable is for proxying based on endpoint, ie for Tor */ tr := &http.Transport{ MaxIdleConns: 2, IdleConnTimeout: 3600 * time.Second, DialContext: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, DualStack: true, }).DialContext, } client := http.Client{Transport: tr} return client } func GetRunner(endpoint string) (shared.RunningInstance, bool) { // Tests: TestGetRunnerNonExist, TestGetRunnerExists ri_mutex.Lock() o, exists := runninginstances[endpoint] if exists == false { o = shared.RunningInstance{} selectRet := pool.QueryRow(context.Background(), "SELECT banned, alwaysbot FROM instances WHERE endpoint = $1", endpoint) err := selectRet.Scan(&o.Banned, &o.Alwaysbot) if err != nil { logDebug("Did not find instance in database: ", endpoint) } if o.Banned == true { logInfo("Banned instance: ", endpoint) } else { logDebug("Building runner for: ", endpoint) o.Client = BuildClient(endpoint) o.Status = shared.KEEPALIVE o.Recentactivities = shared.NewUniqueFifo(10) o.Recentactors = shared.NewUniqueFifo(10) } runninginstances[endpoint] = o } ri_mutex.Unlock() return o, exists } func UpdateRunner(endpoint string, o shared.RunningInstance) { // Tests: None necessary ri_mutex.Lock() runninginstances[endpoint] = o ri_mutex.Unlock() } func GetInstanceInfo(endpoint string, o shared.RunningInstance) shared.RunningInstance { /* Checking order * Mastodon/Pleroma/Misskey * Um..nothing else yet */ logDebug("GetInstanceInfo for ", endpoint) var nodeinfo shared.NodeInfo pleromastodon_nodeinfo_uri := "https://" + endpoint + "/nodeinfo/2.0.json" // Checking Mastodon and Pleroma (with .json) reqjson, _ := http.NewRequest("GET", pleromastodon_nodeinfo_uri, nil) reqjson.Header.Set("User-Agent", "Tusky") pleromastodon_api_resp, err := DoTries(&o, reqjson) if err != nil { o.Software = "Unsupported" return o } else { defer pleromastodon_api_resp.Body.Close() } if pleromastodon_api_resp.StatusCode == 200 { err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo) if err == nil { o.Software = nodeinfo.Software.Name o.Version = nodeinfo.Software.Version o.LastRun = time.Now().Format(time.RFC3339) defer pleromastodon_api_resp.Body.Close() return o } } // Checking for Misskey (without .json) misskey_nodeinfo_uri := "https://" + endpoint + "/nodeinfo/2.0" req, _ := http.NewRequest("GET", misskey_nodeinfo_uri, nil) req.Header.Set("User-Agent", "Tusky") misskey_api_resp, err := DoTries(&o, req) if err != nil { o.Software = "Unsupported" return o } else { defer misskey_api_resp.Body.Close() } if misskey_api_resp.StatusCode == 200 { err = json.NewDecoder(misskey_api_resp.Body).Decode(&nodeinfo) if err == nil { o.Software = nodeinfo.Software.Name o.Version = nodeinfo.Software.Version o.LastRun = time.Now().Format(time.RFC3339) defer misskey_api_resp.Body.Close() return o } } // Check for Lemmy (With Json) lemmy_nodeinfo_uri := "https://" + endpoint + "/nodeinfo/2.1" req, _ = http.NewRequest("GET", lemmy_nodeinfo_uri, nil) req.Header.Set("User-Agent", "Tusky") lemmy_api_resp, err := DoTries(&o, req) if err != nil { o.Software = "Unsupported" return o } else { defer lemmy_api_resp.Body.Close() } if lemmy_api_resp.StatusCode == 200 { err = json.NewDecoder(lemmy_api_resp.Body).Decode(&nodeinfo) if err == nil { logDebug("Found a new Lemmy instance: " + endpoint) o.Software = nodeinfo.Software.Name o.Version = nodeinfo.Software.Version o.LastRun = time.Now().Format(time.RFC3339) defer lemmy_api_resp.Body.Close() return o } } // Unsupported Software o.Software = "Unsupported" o.Version = "Unknown" return o } func LogInstance(endpoint string, o shared.RunningInstance) bool { logDebug("LogInstance: ", endpoint) selectRet := pool.QueryRow(context.Background(), "SELECT FROM instances WHERE endpoint = $1", endpoint) err := selectRet.Scan() if err == nil { return true // Endpoint already in database, continuing } _, err = pool.Exec(context.Background(), "INSERT INTO instances (endpoint, state, software) VALUES($1, $2, $3)", endpoint, "", o.Software) if err != nil { logWarn("Error inserting ", endpoint+" into `instances`: ", err) return true } return false } func CheckInstance(newinstance string, callerEndpoint string) { logDebug("checkInstance: ", newinstance) if settings.Crawl == true { // Skip over this if its the same as the endpoint or empty if newinstance == callerEndpoint || newinstance == "" { return } var err error for attempt := 0; attempt > 5; attempt = attempt + 1 { _, err = net.LookupHost(newinstance) if err != nil { logDebug("Unable to resolve "+newinstance+" attempt ", attempt, "/5. Sleeping for 30 seconds") time.Sleep(time.Second * 30) continue } break } if err != nil { logWarn("Unable to resolve ", newinstance, " after 5 attempts, giving up: ", err) return } // Skip over this if its the same as the endpoint if newinstance == callerEndpoint { return } // Going forward, this might be merged into GetRunner ri_mutex.Lock() o, exists := runninginstances[newinstance] if exists == false || o.Status == shared.KEEPALIVE { m := shared.RunningInstance{} m.Client = BuildClient(newinstance) m.Recentactivities = shared.NewUniqueFifo(10) m.Recentactors = shared.NewUniqueFifo(10) runninginstances[newinstance] = m go StartInstance(newinstance) } ri_mutex.Unlock() } } func staggeredStart() { for { _: <-staggeredStartChan time.Sleep(500 * time.Millisecond) } } func StartInstance(endpoint string) { staggeredStartChan <- true logInfo("Starting " + endpoint) // Check if exists. If so, get the object. If not, create it o, _ := GetRunner(endpoint) if o.Banned == true { logInfo("Ignoring banned instance: ", endpoint) return // banned instance } o = GetInstanceInfo(endpoint, o) UpdateRunner(endpoint, o) LogInstance(endpoint, o) if o.Software == "pleroma" { logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Stream" UpdateRunner(endpoint, o) // PollMastodonPleroma(endpoint, &o) StreamPleroma(endpoint) } else if o.Software == "mastodon" { logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Stream" UpdateRunner(endpoint, o) StreamMastodon(endpoint, &o) } else if o.Software == "misskey" { logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Stream" UpdateRunner(endpoint, o) StreamMisskey(endpoint) } else { o.Status = 605 UpdateRunner(endpoint, o) logConn("Unsupported endpoint " + endpoint) } }