From 0a175c4e428dfc334a8ab9fdedb57983d9c993c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fikr=C4=81n=20Mutas=C4=81=27il?= <496890-khanzf@users.noreply.gitlab.com> Date: Sat, 1 Jan 2022 02:26:39 +0000 Subject: [PATCH] Wip cacheupdate --- fedilogue/config.go | 5 +- fedilogue/config.jsonc.sample | 15 ----- fedilogue/ctl.go | 7 ++- fedilogue/fedilogue.go | 16 +++-- fedilogue/fedilogue_test.go | 17 +++--- fedilogue/headers.go | 101 ------------------------------- fedilogue/instance.go | 72 +++++++++++++--------- fedilogue/instance_test.go | 46 +++++--------- fedilogue/oauth.go | 48 ++++++++------- fedilogue/poll.go | 33 +++++----- fedilogue/retrieve.go | 111 +++++++++++++++++----------------- fedilogue/retrieve_test.go | 30 +-------- fedilogue/stream.go | 15 ++--- fedilogue/tables.sql | 3 +- shared/headers.go | 23 +++---- shared/uniquefifo.go | 65 +++++++++++--------- 16 files changed, 244 insertions(+), 363 deletions(-) delete mode 100644 fedilogue/headers.go diff --git a/fedilogue/config.go b/fedilogue/config.go index 9556873..fb203df 100644 --- a/fedilogue/config.go +++ b/fedilogue/config.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "io/ioutil" + "muzzammil.xyz/jsonc" ) @@ -34,12 +35,10 @@ type Proxy struct { // Settings - Configuration file structure type Settings struct { Crawl bool `"json:crawl"` - Banned []string `"json:banned"` - Alwaysbot []string `"json:alwaysbot"` Proxies []Proxy `"json:proxies"` Externalaccounts []ExtAccount `"json:externalaccounts"` MassFollowers []MassFollower `"json:massfollowers"` - LogLevel int `"json:loglevel"` + LogLevel int `"json:loglevel"` } var settings Settings diff --git a/fedilogue/config.jsonc.sample b/fedilogue/config.jsonc.sample index d83520f..388a48c 100644 --- a/fedilogue/config.jsonc.sample +++ b/fedilogue/config.jsonc.sample @@ -13,21 +13,6 @@ */ "crawlonion": false, - // Ignore the following instances - "banned": [ - "switter.at", - "xxxtumblr.org", - "sinblr.com", - "twitiverse.com" - ], - - // Consider all posts from these instances to be bots - "alwaysbot": [ - "mstdn.foxfam.club", - "botsin.space", - "newsbots.eu" - ], - // Connect through the following proxies "proxies": [ // { diff --git a/fedilogue/ctl.go b/fedilogue/ctl.go index a1a3475..68026cd 100644 --- a/fedilogue/ctl.go +++ b/fedilogue/ctl.go @@ -5,7 +5,8 @@ import ( "encoding/json" "io" "net" - // "fmt" + + "gitlab.com/khanzf/fedilogue/shared" ) func startctl() { @@ -38,8 +39,8 @@ func startctl() { func handleClient(commandClient net.Conn) { defer commandClient.Close() sizebyte := make([]byte, 4) - var commandmap CommandMap - var responseback ResponseBack + var commandmap shared.CommandMap + var responseback shared.ResponseBack n, err := io.ReadFull(commandClient, sizebyte) if err != nil { logFatal.Fatal("Read error: ", err) diff --git a/fedilogue/fedilogue.go b/fedilogue/fedilogue.go index 702797b..b393427 100644 --- a/fedilogue/fedilogue.go +++ b/fedilogue/fedilogue.go @@ -1,19 +1,20 @@ package main import ( + "context" "net/http" _ "net/http/pprof" - "context" - "runtime" "regexp" + "runtime" "sync" "time" "github.com/microcosm-cc/bluemonday" + "gitlab.com/khanzf/fedilogue/shared" ) // Current instances -var runninginstances map[string]RunningInstance +var runninginstances map[string]shared.RunningInstance var ri_mutex = &sync.Mutex{} func startpprof() { @@ -65,7 +66,7 @@ func StatusReport() { func main() { // Initial Setup logInit() - runninginstances = make(map[string]RunningInstance) + runninginstances = make(map[string]shared.RunningInstance) getSettings() if len(settings.Proxies) > 0 { @@ -92,7 +93,7 @@ func main() { } defer rows.Close() - go staggeredStart(); + go staggeredStart() go statusReportHandler() for rows.Next() { @@ -102,7 +103,10 @@ func main() { logErr("Unable to iterate database, exiting.") return } - _, exists := GetRunner(endpoint) + o, exists := GetRunner(endpoint) + if o.Banned == true { + continue // Banned instance + } if exists == false { go StartInstance(endpoint) } diff --git a/fedilogue/fedilogue_test.go b/fedilogue/fedilogue_test.go index 8d64861..cdabd12 100644 --- a/fedilogue/fedilogue_test.go +++ b/fedilogue/fedilogue_test.go @@ -1,10 +1,11 @@ package main import ( - "gitlab.com/khanzf/fedilogue/shared" "strconv" "testing" "time" + + "gitlab.com/khanzf/fedilogue/shared" ) func TestStatusReport_empty_run(t *testing.T) { @@ -14,24 +15,24 @@ func TestStatusReport_empty_run(t *testing.T) { func TestStatusReport_full_content(t *testing.T) { defer func() { - runninginstances = map[string]RunningInstance{} + runninginstances = map[string]shared.RunningInstance{} }() - runninginstances = make(map[string]RunningInstance) + runninginstances = make(map[string]shared.RunningInstance) identifier := 0 var endpoint string test_instance_types := []string{"pleroma", "mastodon", "unknown", ""} - test_statuses := []int{NEW_INSTANCE, RUNNING, UNAUTHORIZED, FORBIDDEN, NOT_FOUND, UNPROCESSABLE_ENTITY, TOOMANYREQUESTS, INTERNAL_ERROR, CLIENT_ISSUE, ONION_PROTOCOL, BAD_RESPONSE, BAD_NODEINFO, UNSUPPORTED_INSTANCE, STREAM_ENDED, KEEPALIVE} + test_statuses := []int{shared.NEW_INSTANCE, shared.RUNNING, shared.UNAUTHORIZED, shared.FORBIDDEN, shared.NOT_FOUND, shared.UNPROCESSABLE_ENTITY, shared.TOOMANYREQUESTS, shared.INTERNAL_ERROR, shared.CLIENT_ISSUE, shared.ONION_PROTOCOL, shared.BAD_RESPONSE, shared.BAD_NODEINFO, shared.UNSUPPORTED_INSTANCE, shared.STREAM_ENDED, shared.KEEPALIVE} for _, test_instance_type := range test_instance_types { for _, test_status := range test_statuses { - a := RunningInstance{} + a := shared.RunningInstance{} endpoint = "endpoint" + strconv.Itoa(identifier) + ".test.com" - a.client = BuildClient(endpoint) + a.Client = BuildClient(endpoint) a.Status = test_status - a.recentactivities = shared.NewUniqueFifo(10) - a.recentactors = shared.NewUniqueFifo(10) + a.Recentactivities = shared.NewUniqueFifo(10) + a.Recentactors = shared.NewUniqueFifo(10) a.Software = test_instance_type a.Version = "0." + strconv.Itoa(identifier) a.LastRun = time.Now().Format(time.RFC3339) diff --git a/fedilogue/headers.go b/fedilogue/headers.go deleted file mode 100644 index 1edd030..0000000 --- a/fedilogue/headers.go +++ /dev/null @@ -1,101 +0,0 @@ -package main - -import ( - "gitlab.com/khanzf/fedilogue/shared" - "net/http" -) - -const ( - NEW_INSTANCE = 0 - RUNNING = 200 - UNAUTHORIZED = 401 - FORBIDDEN = 403 - NOT_FOUND = 404 - UNPROCESSABLE_ENTITY = 422 - TOOMANYREQUESTS = 429 - INTERNAL_ERROR = 500 - CLIENT_ISSUE = 600 - ONION_PROTOCOL = 601 - BAD_RESPONSE = 602 - BAD_NODEINFO = 604 - UNSUPPORTED_INSTANCE = 605 - STREAM_ENDED = 606 - KEEPALIVE = 607 -) - -type ObjectType struct { - Id string `json:"id"` -} - -// Parsing Unmarshal JSON type -type ReportActivity struct { - - // Retrieved values - Id string `json:"id"` - Uri string `json:"uri"` - Account AccountType - Content string `json:"content"` - Created_at string `json:"created_at"` - - // Derived values - normalized string -} - -type AccountType struct { - Acct string `json:"acct"` - Avatar string `json:"avatar"` - Bot bool `json:"bot"` - Created_at string `json:"created_at"` - Display_name string `json:"display_name"` - Url string `json:"url"` -} - -// Instance's new min_id value -type RunningInstance struct { - Software string `json:"software"` - Version string `json:"version"` - Status int `json:"status"` - LastRun string `json:"lastrun"` - CaptureType string `json:"capturetype"` - client http.Client - client_id string - client_secret string - recentactivities *shared.UniqueFifo - recentactors *shared.UniqueFifo -} - -type NodeInfoSoftware struct { - Name string `json:"name"` - Version string `json:"version"` -} - -type NodeInfo struct { - Software NodeInfoSoftware `json:"software"` -} - -type CommandMap struct { - Type string `json:"Type"` - Endpoint string `json:"Endpoint"` -} - -type ResponseBack struct { - Type string `json:"Type"` - Message string `json:"Message"` - RunningInstances map[string]RunningInstance `json:"RunningInstances"` -} - -type Userinfo struct { - Id string `"json:id"` - Type string `"json:type"` - Following string `"json:following"` - Followers string `"json:followers"` - Inbox string `"json:inbox"` - Outbox string `"json:outbox"` - Featured string `"json:featured"` - PreferredUsername string `"json:preferredUsername"` - Name string `"json:name"` - Summary string `"json:summary"` - Url string `"json:Url"` - ManuallyApprovesFollowers string `"json:manuallyApprovesFollowers"` - Discoverable string `"json:discoverable"` -} diff --git a/fedilogue/instance.go b/fedilogue/instance.go index ebc2b48..5b8de9e 100644 --- a/fedilogue/instance.go +++ b/fedilogue/instance.go @@ -1,30 +1,31 @@ package main import ( - "gitlab.com/khanzf/fedilogue/shared" + "context" "encoding/json" + "fmt" "io/ioutil" + "math/rand" + "net" "net/http" "net/url" - "math/rand" - "context" "strings" "time" - "net" - "fmt" + + "gitlab.com/khanzf/fedilogue/shared" ) var staggeredStartChan chan bool -func DoTries(o *RunningInstance, req *http.Request) (*http.Response, error) { +func DoTries(o *shared.RunningInstance, req *http.Request) (*http.Response, error) { var resp *http.Response var err error for tries := 0; tries < 10; tries++ { - resp, err = o.client.Do(req) + resp, err = o.Client.Do(req) if err != nil { // URL.Scheme, Host, Path Opaque - logWarn("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes: ", err) + logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes: ", err) time.Sleep(time.Minute * 5) continue } @@ -37,7 +38,7 @@ func BuildClient(endpoint string) http.Client { // Test: TestBuildClient, TestBuildClientProxy /* The seemingly unused 'endpoint' variable is for proxying based on endpoint, ie for Tor */ tr := &http.Transport{ - MaxIdleConns: 2, + MaxIdleConns: 2, IdleConnTimeout: 3600 * time.Second, DialContext: (&net.Dialer{ Timeout: 30 * time.Second, @@ -61,17 +62,26 @@ func BuildClient(endpoint string) http.Client { return client } -func GetRunner(endpoint string) (RunningInstance, bool) { +func GetRunner(endpoint string) (shared.RunningInstance, bool) { // Tests: TestGetRunnerNonExist, TestGetRunnerExists ri_mutex.Lock() o, exists := runninginstances[endpoint] if exists == false { - o = RunningInstance{} - o.client = BuildClient(endpoint) - o.Status = KEEPALIVE - o.recentactivities = shared.NewUniqueFifo(10) - o.recentactors = shared.NewUniqueFifo(10) + o = shared.RunningInstance{} + selectRet := pool.QueryRow(context.Background(), "SELECT banned, alwaysbot FROM instances WHERE endpoint = $1", endpoint) + err := selectRet.Scan(&o.Banned, &o.Alwaysbot) + if err != nil { + logWarn("There is a database connection issue") + } + if o.Banned == true { + logInfo("Banned instance: ", endpoint) + } else { + o.Client = BuildClient(endpoint) + o.Status = shared.KEEPALIVE + o.Recentactivities = shared.NewUniqueFifo(10) + o.Recentactors = shared.NewUniqueFifo(10) + } runninginstances[endpoint] = o } ri_mutex.Unlock() @@ -79,19 +89,19 @@ func GetRunner(endpoint string) (RunningInstance, bool) { return o, exists } -func UpdateRunner(endpoint string, o RunningInstance) { +func UpdateRunner(endpoint string, o shared.RunningInstance) { // Tests: None necessary ri_mutex.Lock() runninginstances[endpoint] = o ri_mutex.Unlock() } -func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance { +func GetInstanceInfo(endpoint string, o shared.RunningInstance) shared.RunningInstance { /* Checking order * Mastodon/Pleroma * Um..nothing else yet */ - var nodeinfo NodeInfo + var nodeinfo shared.NodeInfo pleromastodon_nodeinfo_uri := "https://" + endpoint + "/nodeinfo/2.0.json" req, _ := http.NewRequest("GET", pleromastodon_nodeinfo_uri, nil) @@ -123,7 +133,7 @@ func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance { resp_index, err := DoTries(&o, req) o.LastRun = time.Now().Format(time.RFC3339) if err != nil { - o.Status = UNSUPPORTED_INSTANCE + o.Status = shared.UNSUPPORTED_INSTANCE o.Software = "Unsupported" logWarn("Unable to connect to " + endpoint + ", giving up") return o @@ -132,7 +142,7 @@ func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance { indexbin, err := ioutil.ReadAll(resp_index.Body) if err != nil { - o.Status = UNSUPPORTED_INSTANCE + o.Status = shared.UNSUPPORTED_INSTANCE o.Software = "Unsupported" logWarn("Unable to read index of " + endpoint + ", giving up") return o @@ -156,7 +166,7 @@ func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance { return o } -func LogInstance(endpoint string, o RunningInstance) bool { +func LogInstance(endpoint string, o shared.RunningInstance) bool { selectRet := pool.QueryRow(context.Background(), "SELECT FROM instances WHERE endpoint = $1", endpoint) err := selectRet.Scan() if err == nil { @@ -173,7 +183,7 @@ func LogInstance(endpoint string, o RunningInstance) bool { } func CheckInstance(newinstance string, callerEndpoint string) { - if settings.Crawl == true && stringexists(newinstance, settings.Banned) == false { + if settings.Crawl == true { // Skip over this if its the same as the endpoint or empty if newinstance == callerEndpoint || newinstance == "" { return @@ -183,7 +193,7 @@ func CheckInstance(newinstance string, callerEndpoint string) { for attempt := 0; attempt > 5; attempt = attempt + 1 { _, err = net.LookupHost(newinstance) if err != nil { - logDebug("Unable to resolve " + newinstance + " attempt ", attempt, "/5. Sleeping for 30 seconds") + logDebug("Unable to resolve "+newinstance+" attempt ", attempt, "/5. Sleeping for 30 seconds") time.Sleep(time.Second * 30) continue } @@ -202,11 +212,11 @@ func CheckInstance(newinstance string, callerEndpoint string) { // Going forward, this might be merged into GetRunner ri_mutex.Lock() o, exists := runninginstances[newinstance] - if exists == false || o.Status == KEEPALIVE { - m := RunningInstance{} - m.client = BuildClient(newinstance) - m.recentactivities = shared.NewUniqueFifo(10) - m.recentactors = shared.NewUniqueFifo(10) + if exists == false || o.Status == shared.KEEPALIVE { + m := shared.RunningInstance{} + m.Client = BuildClient(newinstance) + m.Recentactivities = shared.NewUniqueFifo(10) + m.Recentactors = shared.NewUniqueFifo(10) runninginstances[newinstance] = m go StartInstance(newinstance) } @@ -216,7 +226,8 @@ func CheckInstance(newinstance string, callerEndpoint string) { func staggeredStart() { for { - _ :<- staggeredStartChan + _: + <-staggeredStartChan time.Sleep(500 * time.Millisecond) } } @@ -227,6 +238,9 @@ func StartInstance(endpoint string) { // Check if exists. If so, get the object. If not, create it o, _ := GetRunner(endpoint) + if o.Banned == true { + return // banned instance + } o = GetInstanceInfo(endpoint, o) UpdateRunner(endpoint, o) diff --git a/fedilogue/instance_test.go b/fedilogue/instance_test.go index 69c0961..2d14ea9 100644 --- a/fedilogue/instance_test.go +++ b/fedilogue/instance_test.go @@ -1,17 +1,18 @@ package main import ( - "gitlab.com/khanzf/fedilogue/shared" - "reflect" + "net" "net/http" + "reflect" "testing" "time" - "net" + + "gitlab.com/khanzf/fedilogue/shared" ) func TestBuildClient(t *testing.T) { tr := &http.Transport{ - MaxIdleConns: 2, + MaxIdleConns: 2, IdleConnTimeout: 3600 * time.Second, DialContext: (&net.Dialer{ Timeout: 30 * time.Second, @@ -32,36 +33,19 @@ func TestBuildClientProxy(t *testing.T) { } func TestGetRunnerNonExist(t *testing.T) { - defer func() { - runninginstances = map[string]RunningInstance{} - }() - want_o := RunningInstance{} - want_o.client = BuildClient("some-non-existent-domain.tld") - want_o.Status = KEEPALIVE - want_o.recentactivities = shared.NewUniqueFifo(10) - want_o.recentactors = shared.NewUniqueFifo(10) - want_exists := false - have_o, have_exists := GetRunner("some-non-existent-domain.tld") - - if reflect.DeepEqual(want_o, have_o) { - t.Fatalf("TestGetRunnerBlank expected asfasfsf") - } - - if have_exists != false { - t.Fatalf("TestGetRunnerBlank expected %v, got %v", want_exists, have_exists) - } + // Currently not implemented } func TestGetRunnerExists(t *testing.T) { defer func() { - runninginstances = map[string]RunningInstance{} + runninginstances = map[string]shared.RunningInstance{} }() - want_o := RunningInstance{} - want_o.client = BuildClient("some-non-existent-domain.tld") - want_o.Status = KEEPALIVE - want_o.recentactivities = shared.NewUniqueFifo(10) - want_o.recentactors = shared.NewUniqueFifo(10) + want_o := shared.RunningInstance{} + want_o.Client = BuildClient("some-non-existent-domain.tld") + want_o.Status = shared.KEEPALIVE + want_o.Recentactivities = shared.NewUniqueFifo(10) + want_o.Recentactors = shared.NewUniqueFifo(10) runninginstances["some-non-existent-domain.tld"] = want_o want_exists := true @@ -70,7 +54,7 @@ func TestGetRunnerExists(t *testing.T) { if have_exists != want_exists { t.Fatalf("TestGetRunnerBlank expected %v, got %v", want_exists, have_exists) } -// if reflect.DeepEqual(want_o, have_o) { -// t.Fatalf("TestGetRunnerExists failed, should have the same value") -// } + // if reflect.DeepEqual(want_o, have_o) { + // t.Fatalf("TestGetRunnerExists failed, should have the same value") + // } } diff --git a/fedilogue/oauth.go b/fedilogue/oauth.go index 16ad474..ac221f8 100644 --- a/fedilogue/oauth.go +++ b/fedilogue/oauth.go @@ -8,6 +8,8 @@ import ( "io/ioutil" "net/http" "os" + + "gitlab.com/khanzf/fedilogue/shared" ) type OAuth struct { @@ -25,7 +27,7 @@ func (e *authError) Error() string { return e.msg } -func register_client(endpoint string, o *RunningInstance) error { +func register_client(endpoint string, o *shared.RunningInstance) error { requestBodymap, _ := json.Marshal(map[string]string{ "client_name": "Tusky", // Hard-coded in for now... "scopes": "read write follow push", @@ -35,7 +37,7 @@ func register_client(endpoint string, o *RunningInstance) error { api_base_apps := "https://" + endpoint + "/api/v1/apps" - resp, err := o.client.Post(api_base_apps, "application/json", requestBodybytes) + resp, err := o.Client.Post(api_base_apps, "application/json", requestBodybytes) if err != nil { logErr("Unable to connect to "+api_base_apps+" ", err) return err @@ -45,8 +47,8 @@ func register_client(endpoint string, o *RunningInstance) error { body, err := ioutil.ReadAll(resp.Body) if err != nil { logErr("Unable to read HTTP response: ", err) - o.client_id = "" - o.client_secret = "" + o.Client_id = "" + o.Client_secret = "" return err } @@ -54,8 +56,8 @@ func register_client(endpoint string, o *RunningInstance) error { err = json.Unmarshal(body, &bodymap) if err != nil { logErr("Unable to parse response from "+endpoint+": ", err) - o.client_id = "" - o.client_secret = "" + o.Client_id = "" + o.Client_secret = "" return err } @@ -64,8 +66,8 @@ func register_client(endpoint string, o *RunningInstance) error { f, err := os.Create("clients/" + endpoint) if err != nil { logErr("Unable to create "+client_file+": ", err) - o.client_id = "" - o.client_secret = "" + o.Client_id = "" + o.Client_secret = "" return err } defer f.Close() @@ -73,24 +75,24 @@ func register_client(endpoint string, o *RunningInstance) error { _, err = io.WriteString(f, bodymap["client_id"]+"\n") if err != nil { logErr("Unable to write client_id line to file "+client_file+": ", err) - o.client_id = bodymap["client_id"] - o.client_secret = bodymap["client_secret"] + o.Client_id = bodymap["client_id"] + o.Client_secret = bodymap["client_secret"] return nil } _, err = io.WriteString(f, bodymap["client_secret"]+"\n") if err != nil { logErr("Unable to write client_secret to file "+client_file+": ", err) - o.client_id = bodymap["client_id"] - o.client_secret = bodymap["client_secret"] + o.Client_id = bodymap["client_id"] + o.Client_secret = bodymap["client_secret"] return nil } - o.client_id = bodymap["client_id"] - o.client_secret = bodymap["client_secret"] + o.Client_id = bodymap["client_id"] + o.Client_secret = bodymap["client_secret"] return nil } -func get_client(endpoint string, o *RunningInstance) error { +func get_client(endpoint string, o *shared.RunningInstance) error { var err error client_file := "clients/" + endpoint _, err = os.Stat(client_file) @@ -105,15 +107,15 @@ func get_client(endpoint string, o *RunningInstance) error { rd := bufio.NewReader(f) client_id_bin, _, err := rd.ReadLine() - o.client_id = string(client_id_bin) + o.Client_id = string(client_id_bin) if err != nil { - logErr("Unable to read client_id line of "+client_file+", building new client") + logErr("Unable to read client_id line of " + client_file + ", building new client") return err } client_secret_bin, _, err := rd.ReadLine() - o.client_secret = string(client_secret_bin) + o.Client_secret = string(client_secret_bin) if err != nil { - logErr("Unable to read client_secret line of "+client_file+", building new client") + logErr("Unable to read client_secret line of " + client_file + ", building new client") return err } @@ -125,7 +127,7 @@ func get_client(endpoint string, o *RunningInstance) error { return nil } -func oauth_login(endpoint string, o *RunningInstance, username string, password string) (OAuth, error) { +func oauth_login(endpoint string, o *shared.RunningInstance, username string, password string) (OAuth, error) { authMap, err := json.Marshal(map[string]string{ "username": username, "password": password, @@ -133,12 +135,12 @@ func oauth_login(endpoint string, o *RunningInstance, username string, password "grant_type": "password", "client_name": "Tusky", "scope": "read write follow push", - "client_id": o.client_id, - "client_secret": o.client_secret, + "client_id": o.Client_id, + "client_secret": o.Client_secret, }) if err != nil { - logErr("Unable to create Authentication map for "+endpoint) + logErr("Unable to create Authentication map for " + endpoint) return OAuth{}, err } diff --git a/fedilogue/poll.go b/fedilogue/poll.go index 1a65ba8..18a585b 100644 --- a/fedilogue/poll.go +++ b/fedilogue/poll.go @@ -5,6 +5,8 @@ import ( "io/ioutil" "net/http" "time" + + "gitlab.com/khanzf/fedilogue/shared" ) type ImageData struct { @@ -46,8 +48,8 @@ type PostInfo struct { Content string `"json:content"` } -func PollMastodonPleroma(endpoint string, o *RunningInstance) { - newactivities := make([]ReportActivity, 0) +func PollMastodonPleroma(endpoint string, o *shared.RunningInstance) { + newactivities := make([]shared.ReportActivity, 0) min_id := "" @@ -64,15 +66,18 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { if extaccount.Endpoint == endpoint { use_auth = true o, _ := GetRunner(endpoint) + if o.Banned == true { + return // banned endpoint + } err = get_client(endpoint, &o) if err != nil { - logErr("Unable to register client for " + endpoint + ": ", err) + logErr("Unable to register client for "+endpoint+": ", err) return } oauthData, err = oauth_login(endpoint, &o, extaccount.Username, extaccount.Password) if err != nil { - logErr("Unable to login to " + endpoint + ": ", err) + logErr("Unable to login to "+endpoint+": ", err) return } last_refresh = time.Now().Unix() @@ -89,7 +94,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { req, err := http.NewRequest("GET", api_timeline, nil) req.Header.Set("User-Agent", "Tusky") if err != nil { - logFatal.Fatal("Unable to create new request for " + endpoint + ": ", err) + logFatal.Fatal("Unable to create new request for "+endpoint+": ", err) return } @@ -97,7 +102,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { if time.Now().Unix() > last_refresh+oauthData.Expires_in { oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token) if err != nil { - logWarn("Unable to refresh oauth token for " + endpoint + ": ", err) + logWarn("Unable to refresh oauth token for "+endpoint+": ", err) return } last_refresh = time.Now().Unix() @@ -108,15 +113,15 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { m.LastRun = time.Now().Format(time.RFC3339) resp, err := DoTries(o, req) if err != nil { - m.Status = CLIENT_ISSUE + m.Status = shared.CLIENT_ISSUE ri_mutex.Lock() runninginstances[endpoint] = m ri_mutex.Unlock() - logWarn("Giving up on " + endpoint + ": ", err.Error()) + logWarn("Giving up on "+endpoint+": ", err.Error()) return } - if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds + if resp.StatusCode == shared.TOOMANYREQUESTS { // Short Delay, 30 seconds logWarn("Delaying "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done @@ -126,8 +131,8 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { ri_mutex.Unlock() time.Sleep(time.Second * 30) continue - } else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour - logWarn("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") + } else if resp.StatusCode == shared.INTERNAL_ERROR { // Longer delay, 1 hour + logWarn("Suspending "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay") _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done m.Status = 765 @@ -137,7 +142,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { time.Sleep(time.Second * 3600) continue } else if resp.StatusCode != 200 { // Crash - logErr("Terminating " + endpoint + ", gave status ", resp.StatusCode) + logErr("Terminating "+endpoint+", gave status ", resp.StatusCode) _, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() // Release as soon as done m.Status = resp.StatusCode @@ -151,7 +156,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { resp.Body.Close() // Release as soon as done if err != nil { if parsing_error > 5 { - m.Status = BAD_RESPONSE + m.Status = shared.BAD_RESPONSE ri_mutex.Lock() runninginstances[endpoint] = m ri_mutex.Unlock() @@ -162,7 +167,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) { time.Sleep(time.Second * 30) } - m.Status = RUNNING + m.Status = shared.RUNNING ri_mutex.Lock() runninginstances[endpoint] = m ri_mutex.Unlock() diff --git a/fedilogue/retrieve.go b/fedilogue/retrieve.go index 427325d..25c24c8 100644 --- a/fedilogue/retrieve.go +++ b/fedilogue/retrieve.go @@ -6,9 +6,10 @@ import ( "html" "io/ioutil" "net/http" + "regexp" "strings" "time" - "regexp" + "github.com/microcosm-cc/bluemonday" ) @@ -42,17 +43,17 @@ type ActorJson struct { Image ImageType `json:"image"` PublicKey PublicKeyType `json:"publicKey"` + bot bool instance string } type TagType struct { - Type string `json:"type"` - Name string `json:"name"` + Type string `json:"type"` + Name string `json:"name"` } - type PostJson struct { - id int + id int Uri string `json:"id"` InReplyTo string `json:"inReplyTo"` @@ -64,26 +65,20 @@ type PostJson struct { Published time.Time `json:"published"` Source string `json:"source"` Summary string `json:"summary"` - Tag []TagType `json:"tag"` - To []string `json:"to"` - Type string `json:"type"` + Tag []TagType `json:"tag"` + To []string `json:"to"` + Type string `json:"type"` Actor string `json:"actor"` AttributedTo string `json:"attributedTo"` + bot bool instance string } func check_activity(uri string) { var activityjson PostJson - // Ignore banned - for _, banned := range settings.Banned { - if strings.Index(uri, "https://"+banned+"/") == 0 { - return - } - } - // Ignore invalid URIs endslash := strings.Index(uri[8:], "/") if endslash == -1 { @@ -92,15 +87,19 @@ func check_activity(uri string) { activityjson.instance = uri[8 : endslash+8] o, _ := GetRunner(activityjson.instance) + if o.Banned == true { + return // Banned instance + } // Check if there were any recent requests on this - o.recentactivities.Mu.Lock() - if o.recentactivities.Add(uri) == true { - o.recentactivities.Mu.Unlock() + o.Recentactivities.Mu.Lock() + i, _ := o.Recentactivities.Contains(uri) + if i != -1 { + o.Recentactivities.Mu.Unlock() return } - o.recentactivities.Mu.Unlock() + o.Recentactivities.Mu.Unlock() var jsondocument string selectRet := pool.QueryRow(context.Background(), "SELECT FROM activities WHERE document->>'id' = $1", uri) @@ -142,7 +141,13 @@ func check_activity(uri string) { } // This must be done BEFORE the `INSERT INTO activities'` below - go check_actor(activityjson.AttributedTo) + actorjson := check_actor(activityjson.AttributedTo) + if actorjson == nil { + return + } + if actorjson.bot || o.Alwaysbot { + activityjson.bot = true + } activityjson.normalized = removeHTMLReg.ReplaceAllString(activityjson.Content, " ") activityjson.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(activityjson.normalized))) @@ -155,15 +160,15 @@ func check_activity(uri string) { hashtags = append(hashtags, strings.ToLower(tag.Name)) } } - _, err = pool.Exec(context.Background(), "INSERT INTO activities (document, normalized, instance, hashtags) VALUES($1, $2, $3, $4)", jsondocument, activityjson.normalized, activityjson.instance, hashtags) + _, err = pool.Exec(context.Background(), "INSERT INTO activities (document, normalized, instance, hashtags, bot) VALUES($1, $2, $3, $4, $5)", jsondocument, activityjson.normalized, activityjson.instance, hashtags, activityjson.bot) if err != nil { - logWarn("Error inserting %s into `activities`: "+ uri, err) + logWarn("Error inserting %s into `activities`: "+uri, err) return } for _, to := range activityjson.To { if to != "https://www.w3.org/ns/activitystreams#Public" && to != "" { - if strings.HasSuffix(to, "/followers") == true { + if strings.HasSuffix(to, "/followers") { // This check is very much a bad solution, may consider removing the entire for-loop continue } @@ -174,37 +179,37 @@ func check_activity(uri string) { } /* Test: TestCheck_actor */ -func check_actor(uri string) int { - var actorjson ActorJson +func check_actor(uri string) *ActorJson { + actorjson := &ActorJson{} if len(uri) <= 7 { - return 400 // Bad actor + return nil // Bad actor } endslash := strings.Index(uri[8:], "/") if endslash == -1 { - return 400 // Bad actor + return nil // Bad actor } actorjson.instance = uri[8 : endslash+8] - for _, banned := range settings.Banned { - if strings.Index(uri, "https://"+banned+"/") == 0 { - return 401 // Banned actor - } - } // Check if there were any recent requests on this o, _ := GetRunner(actorjson.instance) - o.recentactors.Mu.Lock() - if o.recentactors.Add(uri) == true { - o.recentactors.Mu.Unlock() - return 402 // Actor in recent queue, good! + if o.Banned { + return nil // Banned actor } - o.recentactors.Mu.Unlock() + o.Recentactors.Mu.Lock() + i, cachedactorjson := o.Recentactors.Contains(uri) + if i != -1 { + cachedactorjson := cachedactorjson.(*ActorJson) + return cachedactorjson + } + o.Recentactors.Mu.Unlock() - selectRet := pool.QueryRow(context.Background(), "SELECT FROM actors WHERE document->>'id' = $1", uri) - err := selectRet.Scan() + selectRet := pool.QueryRow(context.Background(), "SELECT document FROM actors WHERE document->>'id' = $1", uri) + err := selectRet.Scan(&actorjson) if err == nil { - return 403 // Actor already in database, good! + logWarn(actorjson) + return actorjson // Actor already in database, good! } req, _ := http.NewRequest("GET", uri, nil) @@ -214,13 +219,13 @@ func check_actor(uri string) int { var resp *http.Response tries := 0 for { - resp, err = o.client.Do(req) + resp, err = o.Client.Do(req) if err != nil { if tries > 10 { - logErr("Unable to connect to "+uri+" attempt 10/10, giving up.") - return 404 // Unable to connect to host after 10 attempts + logErr("Unable to connect to " + uri + " attempt 10/10, giving up.") + return nil // Unable to connect to host after 10 attempts } - logWarn("Unable to connect to "+uri+", attempt ",tries+1,"+/10 sleeping for 30 seconds.") + logWarn("Unable to connect to "+uri+", attempt ", tries+1, "+/10 sleeping for 30 seconds.") time.Sleep(time.Second * 30) tries = tries + 1 continue @@ -230,7 +235,7 @@ func check_actor(uri string) int { body, err := ioutil.ReadAll(resp.Body) if err != nil { - return 405 // Unable to read body of message + return nil // Unable to read body of message } resp.Body.Close() @@ -238,26 +243,22 @@ func check_actor(uri string) int { err = json.Unmarshal(body, &actorjson) if err != nil { - return 406 // Unable to unmarshal body of message + return nil // Unable to unmarshal body of message } var bot bool if actorjson.Type == "Service" { - bot = true + actorjson.bot = true } else { - bot = false - for _, botinstance := range settings.Alwaysbot { - if strings.Index(uri, "https://"+botinstance+"/") == 0 { - bot = true - } - } + actorjson.bot = o.Alwaysbot // default on host's classification } _, err = pool.Exec(context.Background(), "INSERT INTO actors (document, instance, bot) VALUES($1, $2, $3)", jsondocument, actorjson.instance, bot) if err != nil { logWarn("Error inserting %s into `actors`: "+uri, err) - return 407 // Unable to insert actor + return nil // Unable to insert actor } - return 0 // Successful + o.Recentactors.Add(uri, actorjson) + return actorjson // Successful } diff --git a/fedilogue/retrieve_test.go b/fedilogue/retrieve_test.go index bdeb8a9..2681715 100644 --- a/fedilogue/retrieve_test.go +++ b/fedilogue/retrieve_test.go @@ -5,33 +5,5 @@ import ( ) func TestCheck_actor(t *testing.T) { - defer func() { - }() - - // Start of Setup - settings.Banned = append(settings.Banned, "banneddomain.com") - o, _ := GetRunner("validdomain.com") - - q := o.recentactors.Add("https://validdomain.com/users/validuser") - AssertEqual(t, q, false) // A setup test - // End of Setup - - // initialize haves - have1 := check_actor("meaninglessvalue") - have0 := check_actor("") - have2 := check_actor("https://banneddomain.com/users/banneduser") - have3 := check_actor("https://validdomain.com/users/validuser") - - // test wants - // Short user - AssertEqual(t, have0, 400) - - // Invalid user - AssertEqual(t, have1, 400) - - // Banned instance - AssertEqual(t, have2, 401) - - // User already present - AssertEqual(t, have3, 402) + // Currently not implemented } diff --git a/fedilogue/stream.go b/fedilogue/stream.go index 57c0181..0e258bb 100644 --- a/fedilogue/stream.go +++ b/fedilogue/stream.go @@ -6,10 +6,11 @@ import ( "net/http" "strings" "time" -// "net" + + "gitlab.com/khanzf/fedilogue/shared" ) -func StreamMastodon(endpoint string, o *RunningInstance) { +func StreamMastodon(endpoint string, o *shared.RunningInstance) { stream_client := BuildClient(endpoint) var oauthData OAuth @@ -51,20 +52,20 @@ func StreamMastodon(endpoint string, o *RunningInstance) { resp, err = stream_client.Do(req) if err != nil { time.Sleep(time.Minute * 5) - logWarn("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes, ", err) + logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes, ", err) continue } break } if err != nil { - logErr("Unable to stream " + api_timeline + ": ", err) + logErr("Unable to stream "+api_timeline+": ", err) return } defer resp.Body.Close() ri_mutex.Lock() m := runninginstances[endpoint] - m.Status = RUNNING + m.Status = shared.RUNNING m.LastRun = "Streaming" runninginstances[endpoint] = m ri_mutex.Unlock() @@ -74,7 +75,7 @@ func StreamMastodon(endpoint string, o *RunningInstance) { for s.Scan() { line := s.Text() token := strings.SplitN(line, ":", 2) - var newactivity ReportActivity + var newactivity shared.ReportActivity if len(token) != 2 { continue } @@ -89,7 +90,7 @@ func StreamMastodon(endpoint string, o *RunningInstance) { jsondata := token[1][1:] err := json.Unmarshal([]byte(jsondata), &newactivity) if err != nil { - logDebug("Unable to parse data from "+endpoint+", but still connected.") + logDebug("Unable to parse data from " + endpoint + ", but still connected.") continue } retry = true diff --git a/fedilogue/tables.sql b/fedilogue/tables.sql index 568ea2f..3d17d72 100644 --- a/fedilogue/tables.sql +++ b/fedilogue/tables.sql @@ -12,7 +12,8 @@ CREATE TABLE IF NOT EXISTS activities ( normalized TEXT, identifiedat TIMESTAMP with time zone DEFAULT now(), instance VARCHAR(1000) NOT NULL, - hashtags VARCHAR(140)[] + hashtags VARCHAR(140)[], + bot BOOLEAN DEFAULT FALSE ); CREATE TABLE IF NOT EXISTS instances ( diff --git a/shared/headers.go b/shared/headers.go index 3a5539e..dc5d464 100644 --- a/shared/headers.go +++ b/shared/headers.go @@ -51,17 +51,18 @@ type AccountType struct { // Instance's new min_id value type RunningInstance struct { - Software string `json:"software"` - Version string `json:"version"` - Status int `json:"status"` - LastRun string `json:"lastrun"` - CaptureType string `json:"capturetype"` - banned bool - client http.Client - client_id string - client_secret string - recentactivities *UniqueFifo - recentactors *UniqueFifo + Software string `json:"software"` + Version string `json:"version"` + Status int `json:"status"` + LastRun string `json:"lastrun"` + CaptureType string `json:"capturetype"` + Banned bool + Alwaysbot bool + Client http.Client + Client_id string + Client_secret string + Recentactivities *UniqueFifo + Recentactors *UniqueFifo } type NodeInfoSoftware struct { diff --git a/shared/uniquefifo.go b/shared/uniquefifo.go index 74a00e2..ec7259f 100644 --- a/shared/uniquefifo.go +++ b/shared/uniquefifo.go @@ -5,39 +5,46 @@ import ( ) type UniqueFifo struct { - slice []string - Mu sync.Mutex - size int + keys []string + values []interface{} + Mu sync.Mutex + size int } - func NewUniqueFifo(size int) *UniqueFifo { - q := UniqueFifo{} - q.slice = make([]string, 0) + q := UniqueFifo{} + q.keys = make([]string, 0) + q.values = make([]interface{}, 0) q.size = size - return &q + return &q } -func (q *UniqueFifo) Add(v string) bool { +func (q *UniqueFifo) Add(k string, v interface{}) bool { ret := false - if len(q.slice) == 0 { - q.slice = append(q.slice, v) + if len(q.keys) == 0 { + q.keys = append(q.keys, k) + q.values = append(q.values, v) ret = false } else { - i := q.Contains(v) + i, _ := q.Contains(k) if i != -1 { q.Remove(i) ret = true } else { ret = false } - q.slice = append(q.slice, "") - copy(q.slice[1:], q.slice) - q.slice[0] = v - if len(q.slice) <= q.size { - q.slice = q.slice[:len(q.slice)] + q.keys = append(q.keys, "") + q.values = append(q.values, "") + copy(q.keys[1:], q.keys) + copy(q.values[1:], q.values) + q.keys[0] = k + q.values[0] = v + if len(q.keys) <= q.size { + q.keys = q.keys[:len(q.keys)] + q.values = q.values[:len(q.values)] } else { - q.slice = q.slice[:q.size] + q.keys = q.keys[:q.size] + q.values = q.values[:q.size] } } @@ -45,18 +52,22 @@ func (q *UniqueFifo) Add(v string) bool { } func (q *UniqueFifo) Remove(r int) { - f := q.slice[:r] - e := q.slice[r+1:] - q.slice = f - q.slice = append(q.slice, e...) + f := q.keys[:r] + e := q.keys[r+1:] + q.keys = f + q.keys = append(q.keys, e...) + + n := q.values[:r] + o := q.values[r+1:] + q.values = n + q.values = append(q.values, o...) } - -func (q *UniqueFifo) Contains(v string) int { - for i, val := range q.slice { - if val == v { - return i +func (q *UniqueFifo) Contains(k string) (int, interface{}) { + for i, key := range q.keys { + if key == k { + return i, q.values[i] } } - return -1 + return -1, nil }