From a122f72af75af1826cacfb8d3a1cfeddc0200e51 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Tue, 2 Feb 2021 23:34:43 +0000 Subject: [PATCH] Fully migrated to jsonb, adjusted uniquefifo to keep cache fresh * Added fifo mechanism to actors table * Increased fifo size to 10 * Still getting some database insert duplicate errors, but only for very active instances when they are newly identified --- Makefile | 4 ++-- ctl.go | 3 ++- fifo.go | 45 ----------------------------------------- headers.go | 3 ++- instance.go | 6 ++++-- retrieve.go | 58 ++++++++++++++++++++++++++++++++--------------------- 6 files changed, 45 insertions(+), 74 deletions(-) delete mode 100644 fifo.go diff --git a/Makefile b/Makefile index ee2f26c..e7fcf9e 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ -FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go log.go fifo.go -FEDICTL_GOFILES = fedictl.go headers.go log.go fifo.go +FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go log.go uniquefifo.go +FEDICTL_GOFILES = fedictl.go headers.go log.go uniquefifo.go build: go build -o fedilogue $(FEDILOGUE_GOFILES) diff --git a/ctl.go b/ctl.go index fc2274d..a6a8b6b 100644 --- a/ctl.go +++ b/ctl.go @@ -76,7 +76,8 @@ func handleClient(commandClient net.Conn) { } else { responseback.Message = "Adding: " + commandmap.Endpoint o := RunningInstance{} - o.recenturis = newfifo(5) + o.recentactivities = newUniqueFifo(10) + o.recentactors = newUniqueFifo(10) o.client = BuildClient(commandmap.Endpoint) runninginstances[commandmap.Endpoint] = o go StartInstance(commandmap.Endpoint) diff --git a/fifo.go b/fifo.go deleted file mode 100644 index 5290ed5..0000000 --- a/fifo.go +++ /dev/null @@ -1,45 +0,0 @@ -package main - -import ( - "sync" -) - -type Fifo struct { - slice []string - mu sync.Mutex - size int -} - -func newfifo(size int) *Fifo { - q := Fifo{} - q.slice = make([]string, 0) - q.size = size - return &q -} -func (q *Fifo) Add(v string) int { - q.mu.Lock() - i := q.Contains(v) - if i == -1 { - if len(q.slice) >= q.size { - q.DeleteFirst() - } - q.slice = append(q.slice, v) - } - q.mu.Unlock() - return i -} - -func (q *Fifo) Contains(v string) int { - for i, val := range q.slice { - if val == v { - return i - } - } - return -1 -} - -func (q *Fifo) DeleteFirst() string { - a := q.slice[0] - q.slice = q.slice[1:] - return a -} diff --git a/headers.go b/headers.go index 827ab13..01a1710 100644 --- a/headers.go +++ b/headers.go @@ -59,7 +59,8 @@ type RunningInstance struct { client http.Client client_id string client_secret string - recenturis *Fifo + recentactivities *UniqueFifo + recentactors *UniqueFifo } type NodeInfoSoftware struct { diff --git a/instance.go b/instance.go index 50ed1bd..427ac18 100644 --- a/instance.go +++ b/instance.go @@ -63,7 +63,8 @@ func GetRunner(endpoint string) (RunningInstance, bool) { o = RunningInstance{} o.client = BuildClient(endpoint) o.Status = KEEPALIVE - o.recenturis = newfifo(5) + o.recentactivities = newUniqueFifo(10) + o.recentactors = newUniqueFifo(10) runninginstances[endpoint] = o } ri_mutex.Unlock() @@ -174,7 +175,8 @@ func CheckInstance(newinstance string, callerEndpoint string) { if exists == false || o.Status == KEEPALIVE { m := RunningInstance{} m.client = BuildClient(newinstance) - m.recenturis = newfifo(5) + m.recentactivities = newUniqueFifo(10) + m.recentactors = newUniqueFifo(10) runninginstances[newinstance] = m go StartInstance(newinstance) } diff --git a/retrieve.go b/retrieve.go index 4c1fa91..e5607ea 100644 --- a/retrieve.go +++ b/retrieve.go @@ -28,7 +28,8 @@ type PublicKeyType struct { } type ActorJson struct { - ID string `json:"id"` + id int + uri string `json:"id"` Type string `json:"type"` Inbox string `json:"inbox"` Outbox string `json:"outbox"` @@ -46,7 +47,8 @@ type ActorJson struct { } type PostJson struct { - ID string `json:"id"` + id int + uri string `json:"id"` InReplyTo string `json:"inReplyTo"` normalized string @@ -87,15 +89,20 @@ func check_activity(uri string) (PostJson, error) { o, _ := GetRunner(activityjson.instance) // Check if there were any recent requests on this - if o.recenturis.Add(uri) != -1 { - return activityjson, errors.New("Recently requested within local cache") + o.recentactivities.mu.Lock() + if o.recentactivities.Add(uri) == true { + o.recentactivities.mu.Unlock() + return activityjson, errors.New("Recently requested within local cache") } + o.recentactivities.mu.Unlock() var jsondocument string + jsonmap := make(map[string]interface{}) - selectRet := pool.QueryRow(context.Background(), "SELECT document, normalized FROM activities WHERE document->'id' = $1", uri) - err := selectRet.Scan(&activityjson.ID, &jsondocument, &activityjson.normalized) + selectRet := pool.QueryRow(context.Background(), "SELECT id, document FROM activities WHERE document->>'id' = $1", uri) + err := selectRet.Scan(&activityjson.id, &jsonmap) if err == nil { + /////////// BETTER RETURN VALUES!!!!! return activityjson, nil } @@ -113,9 +120,7 @@ func check_activity(uri string) (PostJson, error) { return activityjson, errors.New("Read error on " + uri) } resp.Body.Close() - jsondocument = string(body) - err = json.Unmarshal(body, &activityjson) if err != nil { return activityjson, err @@ -123,7 +128,7 @@ func check_activity(uri string) (PostJson, error) { if activityjson.InReplyTo != "" && activityjson.InReplyTo != uri { if activityjson.InReplyTo != uri { - go check_actor(activityjson.InReplyTo) + go check_activity(activityjson.InReplyTo) } } @@ -145,7 +150,7 @@ func check_activity(uri string) (PostJson, error) { _, err = pool.Exec(context.Background(), "INSERT INTO activities (document, normalized, instance) VALUES($1, $2, $3)", jsondocument, activityjson.normalized, activityjson.instance) if err != nil { - logDebug.Print(err) + logWarn.Printf("Error inserting %s into `activities`: %s", uri, err) return activityjson, err } @@ -164,26 +169,34 @@ func check_activity(uri string) (PostJson, error) { func check_actor(uri string) (ActorJson, error) { var actorjson ActorJson + endslash := strings.Index(uri[8:], "/") + if endslash == -1 { + return actorjson, errors.New("Invalid user: " + uri) + } + actorjson.instance = uri[8 : endslash+8] for _, banned := range settings.Banned { if strings.Index(uri, "https://"+banned+"/") == 0 { return actorjson, errors.New("Banned instance") } } - var jsondocument string - selectRet := pool.QueryRow(context.Background(), "SELECT document, instance FROM actors WHERE document->'id' = $1", uri) - err := selectRet.Scan(&actorjson.ID, &jsondocument, &actorjson.instance) + // Check if there were any recent requests on this + o, _ := GetRunner(actorjson.instance) + o.recentactors.mu.Lock() + if o.recentactors.Add(uri) == true { + o.recentactors.mu.Unlock() + return actorjson, errors.New("Recently requested actor within local cache") + } + o.recentactors.mu.Unlock() + + jsonmap := make(map[string]interface{}) + selectRet := pool.QueryRow(context.Background(), "SELECT id, document, instance FROM actors WHERE document->>'id' = $1", uri) + err := selectRet.Scan(&actorjson.id, &jsonmap, &actorjson.instance) if err == nil { + ///////// BETTER RETURN VALUES //////// return actorjson, nil } - endslash := strings.Index(uri[8:], "/") - if endslash == -1 { - return actorjson, errors.New("Invalid user: " + uri) - } - actorjson.instance = uri[8 : endslash+8] -logDebug.Print("CHECK: " + uri) - o, _ := GetRunner(actorjson.instance) req, _ := http.NewRequest("GET", uri, nil) req.Header.Set("User-Agent", "Tusky") req.Header.Add("Accept", "application/ld+json") @@ -211,8 +224,7 @@ logDebug.Print("CHECK: " + uri) } resp.Body.Close() - jsondocument = string(body) - //logDebug.Print(string(jsondocument)) + jsondocument := string(body) err = json.Unmarshal(body, &actorjson) if err != nil { @@ -221,7 +233,7 @@ logDebug.Print("CHECK: " + uri) _, err = pool.Exec(context.Background(), "INSERT INTO actors (document, instance) VALUES($1, $2)", jsondocument, actorjson.instance) if err != nil { - logDebug.Print(err) + logWarn.Printf("Error inserting %s into `actors`: %s", uri, err) return actorjson, err }