268 lines
6.7 KiB
Go
268 lines
6.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net"
|
|
"net/http"
|
|
"time"
|
|
|
|
"os"
|
|
"fmt"
|
|
|
|
"git.farhan.codes/farhan/fedilogue/shared"
|
|
)
|
|
|
|
var staggeredStartChan chan bool
|
|
|
|
func DoTries(o *shared.RunningInstance, req *http.Request) (*http.Response, error) {
|
|
var resp *http.Response
|
|
var err error
|
|
|
|
for tries := 0; tries < 10; tries++ {
|
|
resp, err = o.Client.Do(req)
|
|
if err != nil {
|
|
// URL.Scheme, Host, Path Opaque
|
|
logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes: ", err)
|
|
time.Sleep(time.Minute * 5)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
func BuildClient(endpoint string) http.Client {
|
|
logDebug("BuildClient for ", endpoint)
|
|
// Test: TestBuildClient, TestBuildClientProxy
|
|
/* The seemingly unused 'endpoint' variable is for proxying based on endpoint, ie for Tor */
|
|
tr := &http.Transport{
|
|
MaxIdleConns: 2,
|
|
IdleConnTimeout: 3600 * time.Second,
|
|
DialContext: (&net.Dialer{
|
|
Timeout: 30 * time.Second,
|
|
KeepAlive: 30 * time.Second,
|
|
DualStack: true,
|
|
}).DialContext,
|
|
}
|
|
client := http.Client{Transport: tr}
|
|
|
|
return client
|
|
}
|
|
|
|
func GetRunner(endpoint string) (shared.RunningInstance, bool) {
|
|
// Tests: TestGetRunnerNonExist, TestGetRunnerExists
|
|
ri_mutex.Lock()
|
|
o, exists := runninginstances[endpoint]
|
|
|
|
if exists == false {
|
|
o = shared.RunningInstance{}
|
|
selectRet := pool.QueryRow(context.Background(), "SELECT banned, alwaysbot FROM instances WHERE endpoint = $1", endpoint)
|
|
err := selectRet.Scan(&o.Banned, &o.Alwaysbot)
|
|
if err != nil {
|
|
logDebug("Did not find instance in database: ", endpoint)
|
|
}
|
|
if o.Banned == true {
|
|
logInfo("Banned instance: ", endpoint)
|
|
} else {
|
|
logDebug("Building runner for: ", endpoint)
|
|
o.Client = BuildClient(endpoint)
|
|
o.Status = shared.KEEPALIVE
|
|
o.Recentactivities = shared.NewUniqueFifo(10)
|
|
o.Recentactors = shared.NewUniqueFifo(10)
|
|
}
|
|
runninginstances[endpoint] = o
|
|
}
|
|
ri_mutex.Unlock()
|
|
|
|
return o, exists
|
|
}
|
|
|
|
func UpdateRunner(endpoint string, o shared.RunningInstance) {
|
|
// Tests: None necessary
|
|
ri_mutex.Lock()
|
|
runninginstances[endpoint] = o
|
|
ri_mutex.Unlock()
|
|
}
|
|
|
|
|
|
func GetInstanceInfo(endpoint string, o shared.RunningInstance) shared.RunningInstance {
|
|
logDebug("GetInstanceInfo for ", endpoint)
|
|
wellknown_nodeinfo_uri := "https://" + endpoint + "/.well-known/nodeinfo"
|
|
|
|
req, _ := http.NewRequest("GET", wellknown_nodeinfo_uri, nil)
|
|
req.Header.Set("User-Agent", "Tusky")
|
|
|
|
resp, err := DoTries(&o, req)
|
|
if err != nil {
|
|
fmt.Println("Exit 1")
|
|
o.Software = "Unsupported"
|
|
return o
|
|
} else {
|
|
defer resp.Body.Close()
|
|
}
|
|
|
|
if resp.StatusCode != 200 {
|
|
fmt.Println("Exit 2")
|
|
o.Software = "Unsupported"
|
|
return o
|
|
}
|
|
|
|
var wellknownnodeinfo shared.WellKnownNodeInfo
|
|
err = json.NewDecoder(resp.Body).Decode(&wellknownnodeinfo)
|
|
if (err != nil) {
|
|
fmt.Println("Exit 3")
|
|
o.Software = "Unsupported"
|
|
return o
|
|
}
|
|
|
|
// There can be multiple entries of /.well-known/nodeinfo entries, we want the last
|
|
length := len(wellknownnodeinfo.Links)
|
|
if length == 0 {
|
|
fmt.Println("Exit 4")
|
|
o.Software = "Unsupported"
|
|
return o
|
|
}
|
|
|
|
actual_nodeinfo_uri := wellknownnodeinfo.Links[length-1].Href
|
|
|
|
req, _ = http.NewRequest("GET", actual_nodeinfo_uri, nil)
|
|
req.Header.Set("User-Agent", "Tusky")
|
|
|
|
respActual, err := DoTries(&o, req)
|
|
if err != nil {
|
|
fmt.Println("Exit 5")
|
|
o.Software = "Unsupported"
|
|
return o
|
|
} else {
|
|
defer respActual.Body.Close()
|
|
}
|
|
|
|
if respActual.StatusCode != 200 {
|
|
fmt.Println("Exit 6")
|
|
o.Software = "Unsupported"
|
|
return o
|
|
}
|
|
|
|
var actualnodeinfo shared.ActualNodeInfo
|
|
err = json.NewDecoder(respActual.Body).Decode(&actualnodeinfo)
|
|
if (err != nil) {
|
|
fmt.Println("Exit 7")
|
|
fmt.Println(err)
|
|
o.Software = "Unsupported"
|
|
return o
|
|
}
|
|
|
|
fmt.Println(actualnodeinfo)
|
|
fmt.Println("Final exit")
|
|
os.Exit(1)
|
|
|
|
return o
|
|
}
|
|
|
|
func LogInstance(endpoint string, o shared.RunningInstance) bool {
|
|
logDebug("LogInstance: ", endpoint)
|
|
selectRet := pool.QueryRow(context.Background(), "SELECT FROM instances WHERE endpoint = $1", endpoint)
|
|
err := selectRet.Scan()
|
|
if err == nil {
|
|
return true // Endpoint already in database, continuing
|
|
}
|
|
|
|
_, err = pool.Exec(context.Background(), "INSERT INTO instances (endpoint, state, software) VALUES($1, $2, $3)", endpoint, "", o.Software)
|
|
if err != nil {
|
|
logWarn("Error inserting ", endpoint+" into `instances`: ", err)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func CheckInstance(newinstance string, callerEndpoint string) {
|
|
logDebug("checkInstance: ", newinstance)
|
|
if settings.Crawl == true {
|
|
// Skip over this if its the same as the endpoint or empty
|
|
if newinstance == callerEndpoint || newinstance == "" {
|
|
return
|
|
}
|
|
|
|
var err error
|
|
for attempt := 0; attempt > 5; attempt = attempt + 1 {
|
|
_, err = net.LookupHost(newinstance)
|
|
if err != nil {
|
|
logDebug("Unable to resolve "+newinstance+" attempt ", attempt, "/5. Sleeping for 30 seconds")
|
|
time.Sleep(time.Second * 30)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
if err != nil {
|
|
logWarn("Unable to resolve ", newinstance, " after 5 attempts, giving up: ", err)
|
|
return
|
|
}
|
|
|
|
// Skip over this if its the same as the endpoint
|
|
if newinstance == callerEndpoint {
|
|
return
|
|
}
|
|
|
|
// Going forward, this might be merged into GetRunner
|
|
ri_mutex.Lock()
|
|
o, exists := runninginstances[newinstance]
|
|
if exists == false || o.Status == shared.KEEPALIVE {
|
|
m := shared.RunningInstance{}
|
|
m.Client = BuildClient(newinstance)
|
|
m.Recentactivities = shared.NewUniqueFifo(10)
|
|
m.Recentactors = shared.NewUniqueFifo(10)
|
|
runninginstances[newinstance] = m
|
|
go StartInstance(newinstance)
|
|
}
|
|
ri_mutex.Unlock()
|
|
}
|
|
}
|
|
|
|
func staggeredStart() {
|
|
for {
|
|
_:
|
|
<-staggeredStartChan
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func StartInstance(endpoint string) {
|
|
staggeredStartChan <- true
|
|
logInfo("Starting " + endpoint)
|
|
|
|
// Check if exists. If so, get the object. If not, create it
|
|
o, _ := GetRunner(endpoint)
|
|
if o.Banned == true {
|
|
logInfo("Ignoring banned instance: ", endpoint)
|
|
return // banned instance
|
|
}
|
|
|
|
o = GetInstanceInfo(endpoint, o)
|
|
UpdateRunner(endpoint, o)
|
|
LogInstance(endpoint, o)
|
|
|
|
if o.Software == "pleroma" {
|
|
logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version)
|
|
o.CaptureType = "Stream"
|
|
UpdateRunner(endpoint, o)
|
|
// PollMastodonPleroma(endpoint, &o)
|
|
StreamPleroma(endpoint)
|
|
} else if o.Software == "mastodon" {
|
|
logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version)
|
|
o.CaptureType = "Stream"
|
|
UpdateRunner(endpoint, o)
|
|
StreamMastodon(endpoint, &o)
|
|
} else if o.Software == "misskey" {
|
|
logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version)
|
|
o.CaptureType = "Stream"
|
|
UpdateRunner(endpoint, o)
|
|
StreamMisskey(endpoint)
|
|
} else {
|
|
o.Status = 605
|
|
UpdateRunner(endpoint, o)
|
|
logConn("Unsupported endpoint " + endpoint)
|
|
}
|
|
}
|