Fully migrated to jsonb, adjusted uniquefifo to keep cache fresh

* Added fifo mechanism to actors table
* Increased fifo size to 10
* Still getting some database insert duplicate errors, but only for
  very active instances when they are newly identified
This commit is contained in:
farhan 2021-02-02 23:34:43 +00:00
parent f262de1dc3
commit a122f72af7
6 changed files with 45 additions and 74 deletions

View File

@ -1,5 +1,5 @@
FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go log.go fifo.go FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go log.go uniquefifo.go
FEDICTL_GOFILES = fedictl.go headers.go log.go fifo.go FEDICTL_GOFILES = fedictl.go headers.go log.go uniquefifo.go
build: build:
go build -o fedilogue $(FEDILOGUE_GOFILES) go build -o fedilogue $(FEDILOGUE_GOFILES)

3
ctl.go
View File

@ -76,7 +76,8 @@ func handleClient(commandClient net.Conn) {
} else { } else {
responseback.Message = "Adding: " + commandmap.Endpoint responseback.Message = "Adding: " + commandmap.Endpoint
o := RunningInstance{} o := RunningInstance{}
o.recenturis = newfifo(5) o.recentactivities = newUniqueFifo(10)
o.recentactors = newUniqueFifo(10)
o.client = BuildClient(commandmap.Endpoint) o.client = BuildClient(commandmap.Endpoint)
runninginstances[commandmap.Endpoint] = o runninginstances[commandmap.Endpoint] = o
go StartInstance(commandmap.Endpoint) go StartInstance(commandmap.Endpoint)

45
fifo.go
View File

@ -1,45 +0,0 @@
package main
import (
"sync"
)
type Fifo struct {
slice []string
mu sync.Mutex
size int
}
func newfifo(size int) *Fifo {
q := Fifo{}
q.slice = make([]string, 0)
q.size = size
return &q
}
func (q *Fifo) Add(v string) int {
q.mu.Lock()
i := q.Contains(v)
if i == -1 {
if len(q.slice) >= q.size {
q.DeleteFirst()
}
q.slice = append(q.slice, v)
}
q.mu.Unlock()
return i
}
func (q *Fifo) Contains(v string) int {
for i, val := range q.slice {
if val == v {
return i
}
}
return -1
}
func (q *Fifo) DeleteFirst() string {
a := q.slice[0]
q.slice = q.slice[1:]
return a
}

View File

@ -59,7 +59,8 @@ type RunningInstance struct {
client http.Client client http.Client
client_id string client_id string
client_secret string client_secret string
recenturis *Fifo recentactivities *UniqueFifo
recentactors *UniqueFifo
} }
type NodeInfoSoftware struct { type NodeInfoSoftware struct {

View File

@ -63,7 +63,8 @@ func GetRunner(endpoint string) (RunningInstance, bool) {
o = RunningInstance{} o = RunningInstance{}
o.client = BuildClient(endpoint) o.client = BuildClient(endpoint)
o.Status = KEEPALIVE o.Status = KEEPALIVE
o.recenturis = newfifo(5) o.recentactivities = newUniqueFifo(10)
o.recentactors = newUniqueFifo(10)
runninginstances[endpoint] = o runninginstances[endpoint] = o
} }
ri_mutex.Unlock() ri_mutex.Unlock()
@ -174,7 +175,8 @@ func CheckInstance(newinstance string, callerEndpoint string) {
if exists == false || o.Status == KEEPALIVE { if exists == false || o.Status == KEEPALIVE {
m := RunningInstance{} m := RunningInstance{}
m.client = BuildClient(newinstance) m.client = BuildClient(newinstance)
m.recenturis = newfifo(5) m.recentactivities = newUniqueFifo(10)
m.recentactors = newUniqueFifo(10)
runninginstances[newinstance] = m runninginstances[newinstance] = m
go StartInstance(newinstance) go StartInstance(newinstance)
} }

View File

@ -28,7 +28,8 @@ type PublicKeyType struct {
} }
type ActorJson struct { type ActorJson struct {
ID string `json:"id"` id int
uri string `json:"id"`
Type string `json:"type"` Type string `json:"type"`
Inbox string `json:"inbox"` Inbox string `json:"inbox"`
Outbox string `json:"outbox"` Outbox string `json:"outbox"`
@ -46,7 +47,8 @@ type ActorJson struct {
} }
type PostJson struct { type PostJson struct {
ID string `json:"id"` id int
uri string `json:"id"`
InReplyTo string `json:"inReplyTo"` InReplyTo string `json:"inReplyTo"`
normalized string normalized string
@ -87,15 +89,20 @@ func check_activity(uri string) (PostJson, error) {
o, _ := GetRunner(activityjson.instance) o, _ := GetRunner(activityjson.instance)
// Check if there were any recent requests on this // Check if there were any recent requests on this
if o.recenturis.Add(uri) != -1 { o.recentactivities.mu.Lock()
if o.recentactivities.Add(uri) == true {
o.recentactivities.mu.Unlock()
return activityjson, errors.New("Recently requested within local cache") return activityjson, errors.New("Recently requested within local cache")
} }
o.recentactivities.mu.Unlock()
var jsondocument string var jsondocument string
jsonmap := make(map[string]interface{})
selectRet := pool.QueryRow(context.Background(), "SELECT document, normalized FROM activities WHERE document->'id' = $1", uri) selectRet := pool.QueryRow(context.Background(), "SELECT id, document FROM activities WHERE document->>'id' = $1", uri)
err := selectRet.Scan(&activityjson.ID, &jsondocument, &activityjson.normalized) err := selectRet.Scan(&activityjson.id, &jsonmap)
if err == nil { if err == nil {
/////////// BETTER RETURN VALUES!!!!!
return activityjson, nil return activityjson, nil
} }
@ -113,9 +120,7 @@ func check_activity(uri string) (PostJson, error) {
return activityjson, errors.New("Read error on " + uri) return activityjson, errors.New("Read error on " + uri)
} }
resp.Body.Close() resp.Body.Close()
jsondocument = string(body) jsondocument = string(body)
err = json.Unmarshal(body, &activityjson) err = json.Unmarshal(body, &activityjson)
if err != nil { if err != nil {
return activityjson, err return activityjson, err
@ -123,7 +128,7 @@ func check_activity(uri string) (PostJson, error) {
if activityjson.InReplyTo != "" && activityjson.InReplyTo != uri { if activityjson.InReplyTo != "" && activityjson.InReplyTo != uri {
if activityjson.InReplyTo != uri { if activityjson.InReplyTo != uri {
go check_actor(activityjson.InReplyTo) go check_activity(activityjson.InReplyTo)
} }
} }
@ -145,7 +150,7 @@ func check_activity(uri string) (PostJson, error) {
_, err = pool.Exec(context.Background(), "INSERT INTO activities (document, normalized, instance) VALUES($1, $2, $3)", jsondocument, activityjson.normalized, activityjson.instance) _, err = pool.Exec(context.Background(), "INSERT INTO activities (document, normalized, instance) VALUES($1, $2, $3)", jsondocument, activityjson.normalized, activityjson.instance)
if err != nil { if err != nil {
logDebug.Print(err) logWarn.Printf("Error inserting %s into `activities`: %s", uri, err)
return activityjson, err return activityjson, err
} }
@ -164,26 +169,34 @@ func check_activity(uri string) (PostJson, error) {
func check_actor(uri string) (ActorJson, error) { func check_actor(uri string) (ActorJson, error) {
var actorjson ActorJson var actorjson ActorJson
endslash := strings.Index(uri[8:], "/")
if endslash == -1 {
return actorjson, errors.New("Invalid user: " + uri)
}
actorjson.instance = uri[8 : endslash+8]
for _, banned := range settings.Banned { for _, banned := range settings.Banned {
if strings.Index(uri, "https://"+banned+"/") == 0 { if strings.Index(uri, "https://"+banned+"/") == 0 {
return actorjson, errors.New("Banned instance") return actorjson, errors.New("Banned instance")
} }
} }
var jsondocument string // Check if there were any recent requests on this
selectRet := pool.QueryRow(context.Background(), "SELECT document, instance FROM actors WHERE document->'id' = $1", uri) o, _ := GetRunner(actorjson.instance)
err := selectRet.Scan(&actorjson.ID, &jsondocument, &actorjson.instance) o.recentactors.mu.Lock()
if o.recentactors.Add(uri) == true {
o.recentactors.mu.Unlock()
return actorjson, errors.New("Recently requested actor within local cache")
}
o.recentactors.mu.Unlock()
jsonmap := make(map[string]interface{})
selectRet := pool.QueryRow(context.Background(), "SELECT id, document, instance FROM actors WHERE document->>'id' = $1", uri)
err := selectRet.Scan(&actorjson.id, &jsonmap, &actorjson.instance)
if err == nil { if err == nil {
///////// BETTER RETURN VALUES ////////
return actorjson, nil return actorjson, nil
} }
endslash := strings.Index(uri[8:], "/")
if endslash == -1 {
return actorjson, errors.New("Invalid user: " + uri)
}
actorjson.instance = uri[8 : endslash+8]
logDebug.Print("CHECK: " + uri)
o, _ := GetRunner(actorjson.instance)
req, _ := http.NewRequest("GET", uri, nil) req, _ := http.NewRequest("GET", uri, nil)
req.Header.Set("User-Agent", "Tusky") req.Header.Set("User-Agent", "Tusky")
req.Header.Add("Accept", "application/ld+json") req.Header.Add("Accept", "application/ld+json")
@ -211,8 +224,7 @@ logDebug.Print("CHECK: " + uri)
} }
resp.Body.Close() resp.Body.Close()
jsondocument = string(body) jsondocument := string(body)
//logDebug.Print(string(jsondocument))
err = json.Unmarshal(body, &actorjson) err = json.Unmarshal(body, &actorjson)
if err != nil { if err != nil {
@ -221,7 +233,7 @@ logDebug.Print("CHECK: " + uri)
_, err = pool.Exec(context.Background(), "INSERT INTO actors (document, instance) VALUES($1, $2)", jsondocument, actorjson.instance) _, err = pool.Exec(context.Background(), "INSERT INTO actors (document, instance) VALUES($1, $2)", jsondocument, actorjson.instance)
if err != nil { if err != nil {
logDebug.Print(err) logWarn.Printf("Error inserting %s into `actors`: %s", uri, err)
return actorjson, err return actorjson, err
} }