changed to pool from connection, few error fixes
This commit is contained in:
parent
eafb3b9318
commit
63bc88324b
@ -15,7 +15,6 @@ import (
|
|||||||
var runninginstances map[string]RunningInstance
|
var runninginstances map[string]RunningInstance
|
||||||
var ri_mutex = &sync.Mutex{}
|
var ri_mutex = &sync.Mutex{}
|
||||||
var pool *pgxpool.Pool
|
var pool *pgxpool.Pool
|
||||||
var requestconnchan chan ConnRequest
|
|
||||||
|
|
||||||
func startpprof() {
|
func startpprof() {
|
||||||
log.Print("Starting http/pprof on :7777")
|
log.Print("Starting http/pprof on :7777")
|
||||||
@ -25,17 +24,12 @@ func startpprof() {
|
|||||||
func main() {
|
func main() {
|
||||||
// Initial Setup
|
// Initial Setup
|
||||||
runninginstances = make(map[string]RunningInstance)
|
runninginstances = make(map[string]RunningInstance)
|
||||||
requestconnchan = make(chan ConnRequest)
|
|
||||||
|
|
||||||
getSettings()
|
getSettings()
|
||||||
go startpprof()
|
go startpprof()
|
||||||
|
|
||||||
pool = getDbPool()
|
pool = getDbPool()
|
||||||
|
|
||||||
for i := 0; i < settings.Database.Workers; i++ {
|
|
||||||
go requestConn()
|
|
||||||
}
|
|
||||||
|
|
||||||
p = bluemonday.NewPolicy()
|
p = bluemonday.NewPolicy()
|
||||||
spaceReg = regexp.MustCompile(`\s+`)
|
spaceReg = regexp.MustCompile(`\s+`)
|
||||||
|
|
||||||
|
67
retrieve.go
67
retrieve.go
@ -68,15 +68,6 @@ type ConnRequest struct {
|
|||||||
b chan bool
|
b chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func requestConn() {
|
|
||||||
conn, _:= pool.Acquire(context.Background())
|
|
||||||
defer conn.Release()
|
|
||||||
for connRequest := range requestconnchan {
|
|
||||||
connRequest.conn <-conn
|
|
||||||
_ = <-connRequest.b
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetHTTPSession(endpoint string) (RunningInstance) {
|
func GetHTTPSession(endpoint string) (RunningInstance) {
|
||||||
ri_mutex.Lock()
|
ri_mutex.Lock()
|
||||||
o, exist := runninginstances[endpoint]
|
o, exist := runninginstances[endpoint]
|
||||||
@ -95,19 +86,10 @@ func GetHTTPSession(endpoint string) (RunningInstance) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func check_post(uri string) (PostJson, error) {
|
func check_post(uri string) (PostJson, error) {
|
||||||
connrequest := ConnRequest{}
|
|
||||||
connrequest.conn = make(chan *pgxpool.Conn)
|
|
||||||
connrequest.b = make(chan bool)
|
|
||||||
requestconnchan <- connrequest
|
|
||||||
|
|
||||||
myconn := <-connrequest.conn
|
|
||||||
|
|
||||||
var postjson PostJson
|
var postjson PostJson
|
||||||
|
|
||||||
selectRet := myconn.QueryRow(context.Background(), "SELECT id, inReplyTo, published, summary, content, normalized, attributedto, posthash, received_at FROM posts WHERE id = $1", uri)
|
selectRet := pool.QueryRow(context.Background(), "SELECT id, inReplyTo, published, summary, content, normalized, attributedto, posthash, received_at FROM posts WHERE id = $1", uri)
|
||||||
err := selectRet.Scan(&postjson.ID, &postjson.InReplyTo, &postjson.Published, &postjson.Summary, &postjson.Content, &postjson.normalized, &postjson.AttributedTo, &postjson.posthash, &postjson.receivedAt)
|
err := selectRet.Scan(&postjson.ID, &postjson.InReplyTo, &postjson.Published, &postjson.Summary, &postjson.Content, &postjson.normalized, &postjson.AttributedTo, &postjson.posthash, &postjson.receivedAt)
|
||||||
close(connrequest.b)
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return postjson, nil
|
return postjson, nil
|
||||||
}
|
}
|
||||||
@ -115,7 +97,6 @@ func check_post(uri string) (PostJson, error) {
|
|||||||
if endslash == -1 {
|
if endslash == -1 {
|
||||||
return postjson, nil
|
return postjson, nil
|
||||||
}
|
}
|
||||||
log.Print("Was: " + uri, " ", endslash)
|
|
||||||
postjson.instance = uri[8:endslash+8]
|
postjson.instance = uri[8:endslash+8]
|
||||||
|
|
||||||
|
|
||||||
@ -138,8 +119,9 @@ func check_post(uri string) (PostJson, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if postjson.InReplyTo != "" && postjson.InReplyTo != uri {
|
if postjson.InReplyTo != "" && postjson.InReplyTo != uri {
|
||||||
log.Print("GOING INTO NEW POST: " + postjson.InReplyTo + " " + uri)
|
if postjson.InReplyTo != uri {
|
||||||
go check_post(postjson.InReplyTo)
|
go check_post(postjson.InReplyTo)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If AttributedTo is blank, this is likely an authentication failure
|
// If AttributedTo is blank, this is likely an authentication failure
|
||||||
@ -148,20 +130,16 @@ func check_post(uri string) (PostJson, error) {
|
|||||||
if postjson.AttributedTo == "" {
|
if postjson.AttributedTo == "" {
|
||||||
return postjson, nil
|
return postjson, nil
|
||||||
}
|
}
|
||||||
check_user(postjson.AttributedTo) // This must be done BEFORE the `INSERT INTO posts` below
|
|
||||||
|
_, err = check_user(postjson.AttributedTo) // This must be done BEFORE the `INSERT INTO posts` below
|
||||||
|
if err != nil {
|
||||||
|
return postjson, err
|
||||||
|
}
|
||||||
|
|
||||||
postjson.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(postjson.Content)))
|
postjson.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(postjson.Content)))
|
||||||
spaceReg.ReplaceAllString(postjson.normalized, " ")
|
spaceReg.ReplaceAllString(postjson.normalized, " ")
|
||||||
|
|
||||||
connrequest = ConnRequest{}
|
_, err = pool.Exec(context.Background(), "INSERT INTO posts (id, inreplyto, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance)
|
||||||
connrequest.conn = make(chan *pgxpool.Conn)
|
|
||||||
connrequest.b = make(chan bool)
|
|
||||||
requestconnchan <- connrequest
|
|
||||||
|
|
||||||
myconn = <-connrequest.conn
|
|
||||||
|
|
||||||
_, err = myconn.Exec(context.Background(), "INSERT INTO posts (id, inreplyto, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance)
|
|
||||||
close(connrequest.b)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("INSERT posts error of " + uri + ": ", err)
|
log.Print("INSERT posts error of " + uri + ": ", err)
|
||||||
return postjson, err
|
return postjson, err
|
||||||
@ -177,25 +155,17 @@ func check_post(uri string) (PostJson, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func check_user(uri string) (UserJson, error) {
|
func check_user(uri string) (UserJson, error) {
|
||||||
connrequest := ConnRequest{}
|
|
||||||
connrequest.conn = make(chan *pgxpool.Conn)
|
|
||||||
connrequest.b = make(chan bool)
|
|
||||||
requestconnchan <- connrequest
|
|
||||||
|
|
||||||
myconn := <-connrequest.conn
|
|
||||||
|
|
||||||
var userjson UserJson
|
var userjson UserJson
|
||||||
|
|
||||||
selectRet := myconn.QueryRow(context.Background(), "SELECT id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance FROM accounts WHERE id = $1", uri)
|
selectRet := pool.QueryRow(context.Background(), "SELECT id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance FROM accounts WHERE id = $1", uri)
|
||||||
err := selectRet.Scan(&userjson.ID, &userjson.Type, &userjson.Inbox, &userjson.Outbox, &userjson.Followers, &userjson.Following, &userjson.Url, &userjson.PreferredUsername, &userjson.Name, &userjson.Summary, &userjson.Icon.Url, &userjson.Image.Url, &userjson.PublicKey.PublicKeyPem, &userjson.instance)
|
err := selectRet.Scan(&userjson.ID, &userjson.Type, &userjson.Inbox, &userjson.Outbox, &userjson.Followers, &userjson.Following, &userjson.Url, &userjson.PreferredUsername, &userjson.Name, &userjson.Summary, &userjson.Icon.Url, &userjson.Image.Url, &userjson.PublicKey.PublicKeyPem, &userjson.instance)
|
||||||
close(connrequest.b)
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return userjson, nil
|
return userjson, nil
|
||||||
}
|
}
|
||||||
log.Print("The URI: " + uri)
|
|
||||||
endslash := strings.Index(uri[8:], "/")
|
endslash := strings.Index(uri[8:], "/")
|
||||||
log.Print("on " + uri + " ", endslash)
|
if endslash == -1 {
|
||||||
|
return userjson, nil
|
||||||
|
}
|
||||||
userjson.instance = uri[8:endslash+8]
|
userjson.instance = uri[8:endslash+8]
|
||||||
|
|
||||||
o := GetHTTPSession(userjson.instance)
|
o := GetHTTPSession(userjson.instance)
|
||||||
@ -214,14 +184,7 @@ func check_user(uri string) (UserJson, error) {
|
|||||||
return userjson, err
|
return userjson, err
|
||||||
}
|
}
|
||||||
|
|
||||||
connrequest = ConnRequest{}
|
_, err = pool.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance)
|
||||||
connrequest.conn = make(chan *pgxpool.Conn)
|
|
||||||
connrequest.b = make(chan bool)
|
|
||||||
requestconnchan <- connrequest
|
|
||||||
|
|
||||||
myconn = <-connrequest.conn
|
|
||||||
_, err = myconn.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance)
|
|
||||||
close(connrequest.b)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("INSERT accounts error: ", err)
|
log.Print("INSERT accounts error: ", err)
|
||||||
return userjson, err
|
return userjson, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user