From ad9fe9fad27219b010f609e509bfad10c241f38d Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Fri, 18 Dec 2020 06:06:32 +0000 Subject: [PATCH] checking user and made 'pool' global --- ctl.go | 9 ++++----- db.go | 29 ++++++++++++++++++++++++++--- fedilogue.go | 12 +++++++----- instance.go | 7 +++---- poll.go | 22 +++++++++++++++++++--- stream.go | 5 ++--- web.go | 13 ++++++------- 7 files changed, 67 insertions(+), 30 deletions(-) diff --git a/ctl.go b/ctl.go index 48cc8a1..080da81 100644 --- a/ctl.go +++ b/ctl.go @@ -8,10 +8,9 @@ import ( "log" "net" "os" - "github.com/jackc/pgx/pgxpool" ) -func startctl(reportPostChan chan ReportPost, pool *pgxpool.Pool) { +func startctl(reportPostChan chan ReportPost) { log.Print("Starting ctl listener on 127.0.0.1:5555") l, err := net.Listen("tcp", "127.0.0.1:5555") if err != nil { @@ -33,12 +32,12 @@ func startctl(reportPostChan chan ReportPost, pool *pgxpool.Pool) { for { c := <-commandClient // New client connection - go handleClient(c, reportPostChan, pool) + go handleClient(c, reportPostChan) } } -func handleClient(commandClient net.Conn, reportPostChan chan ReportPost, pool *pgxpool.Pool) { +func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) { sizebyte := make([]byte, 4) var commandmap CommandMap @@ -80,7 +79,7 @@ func handleClient(commandClient net.Conn, reportPostChan chan ReportPost, pool * } else { responseback.Message = "Added: " + commandmap.Endpoint runninginstances[commandmap.Endpoint] = RunningInstance{} - go StartInstance(commandmap.Endpoint, reportPostChan, pool) + go StartInstance(commandmap.Endpoint, reportPostChan) } ri_mutex.Unlock() case "suspend": diff --git a/db.go b/db.go index 8dd68b8..df27205 100644 --- a/db.go +++ b/db.go @@ -5,16 +5,39 @@ import ( "fmt" "log" "github.com/jackc/pgx/pgxpool" + "github.com/davecgh/go-spew/spew" + "time" ) -func postHandler(reportPostChan chan ReportPost, pool *pgxpool.Pool) { +func postHandler(reportPostChan chan ReportPost) { for { // Write posts v := <-reportPostChan - go writePost(pool, v) + go writePost(v) } } -func writePost(pool *pgxpool.Pool, reportpost ReportPost) { +func check_user(uri string) (AccountType, error) { + conn, _:= pool.Acquire(context.Background()) + defer conn.Release() + + var accountData AccountType + + //var q string + //var Acct string + //err := conn.QueryRow(context.Background(), "SELECT acct, avatar, bot, created_at, display_name FROM accounts WHERE uri = $1", uri).Scan(&accountData.Acct, &(accountData.Avatar), &(q), &accountData.Created_at, &accountData.Display_name) + var timez time.Time + row := conn.QueryRow(context.Background(), "SELECT acct, avatar, bot, created_at, display_name, FROM accounts WHERE uri = $1", uri) + err := row.Scan(&accountData.Acct, &accountData.Avatar, &accountData.Bot, &timez, &accountData.Display_name) + if err != nil { + return accountData, err + } + accountData.Url = uri + accountData.Created_at = timez.Format(time.RFC3339) + spew.Dump(accountData) + return accountData, err +} + +func writePost(reportpost ReportPost) { conn, err := pool.Acquire(context.Background()) if err != nil { log.Fatal("Error connecting to database:", err) diff --git a/fedilogue.go b/fedilogue.go index f86146d..80724b9 100644 --- a/fedilogue.go +++ b/fedilogue.go @@ -8,11 +8,13 @@ import ( "regexp" "runtime" "sync" + "github.com/jackc/pgx/pgxpool" ) // Current instances var runninginstances map[string]RunningInstance var ri_mutex = &sync.Mutex{} +var pool *pgxpool.Pool func startpprof() { log.Print("Starting http/pprof on :7777") @@ -27,10 +29,10 @@ func main() { getSettings() go startpprof() - pool := getDbPool() + pool = getDbPool() for i := 0; i < settings.Database.Workers; i++ { - go postHandler(reportPostChan, pool) + go postHandler(reportPostChan) } p = bluemonday.NewPolicy() @@ -42,13 +44,13 @@ func main() { _, exists := runninginstances[endpoint] if exists == false { runninginstances[endpoint] = RunningInstance{} - go StartInstance(endpoint, reportPostChan, pool) + go StartInstance(endpoint, reportPostChan) } ri_mutex.Unlock() } - go startctl(reportPostChan, pool) - go webmain(reportPostChan, pool) + go startctl(reportPostChan) + go webmain(reportPostChan) runtime.Goexit() } diff --git a/instance.go b/instance.go index cc6f609..3999fc1 100644 --- a/instance.go +++ b/instance.go @@ -9,7 +9,6 @@ import ( "regexp" "strings" "time" - "github.com/jackc/pgx/pgxpool" ) var p *bluemonday.Policy @@ -71,7 +70,7 @@ func GetNodeInfo(endpoint string) (http.Client, NodeInfo) { return http_client, nodeinfo } -func StartInstance(endpoint string, reportPostChan chan ReportPost, pool *pgxpool.Pool) { +func StartInstance(endpoint string, reportPostChan chan ReportPost) { http_client, nodeinfo := GetNodeInfo(endpoint) ri_mutex.Lock() m := runninginstances[endpoint] @@ -90,14 +89,14 @@ func StartInstance(endpoint string, reportPostChan chan ReportPost, pool *pgxpoo m.CaptureType = "Poll" runninginstances[endpoint] = m ri_mutex.Unlock() - PollMastodonPleroma(endpoint, reportPostChan, http_client, pool) + PollMastodonPleroma(endpoint, reportPostChan, http_client) } else if nodeinfo.Software.Name == "mastodon" { log.Print("Starting " + endpoint + " as " + nodeinfo.Software.Name) m.CaptureType = "Stream" runninginstances[endpoint] = m ri_mutex.Unlock() // PollMastodonPleroma(endpoint, reportPostChan, http_client, pool) - StreamMastodon(endpoint, reportPostChan, pool) + StreamMastodon(endpoint, reportPostChan) } } diff --git a/poll.go b/poll.go index 68b74a5..e0958d3 100644 --- a/poll.go +++ b/poll.go @@ -10,7 +10,6 @@ import ( "net/http" "strings" "time" - "github.com/jackc/pgx/pgxpool" ) type ImageData struct { @@ -54,6 +53,23 @@ type PostInfo struct { func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) { var userinfo UserInfo + accounttype, err := check_user(uri) + if err == nil { + userinfo.Id = uri + userinfo.Type = uri + if accounttype.Bot { + userinfo.Type = "Bot" + } else { + userinfo.Type = "Person" + } + userinfo.PreferredUsername = accounttype.Display_name +// userInfo.Url = Icon = +// userInfo.iconData = ImageData + userinfo.Icon.Type = "Image" + userinfo.Icon.Url = accounttype.Avatar + log.Print("This exit path!!!") + return userinfo, nil + } req, err := http.NewRequest(http.MethodGet, uri, nil) if err != nil { @@ -100,7 +116,7 @@ func fetch_post(http_client http.Client, uri string) (PostInfo, error) { return postinfo, nil } -func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client, pool *pgxpool.Pool) { +func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) { newposts := make([]ReportPost, 0) min_id := "" @@ -313,7 +329,7 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c if exists == false || o.Status == KEEPALIVE { m := RunningInstance{} runninginstances[newinstance] = m - go StartInstance(newinstance, reportPostChan, pool) + go StartInstance(newinstance, reportPostChan) } ri_mutex.Unlock() diff --git a/stream.go b/stream.go index e44f140..b8d43ba 100644 --- a/stream.go +++ b/stream.go @@ -10,10 +10,9 @@ import ( "net/http" "strings" "time" - "github.com/jackc/pgx/pgxpool" ) -func StreamMastodon(endpoint string, reportPostChan chan ReportPost, pool *pgxpool.Pool) { +func StreamMastodon(endpoint string, reportPostChan chan ReportPost) { http_client := http.Client{} var client_id string @@ -174,7 +173,7 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost, pool *pgxpo if exists == false || o.Status == KEEPALIVE { m := RunningInstance{} runninginstances[newinstance] = m - go StartInstance(newinstance, reportPostChan, pool) + go StartInstance(newinstance, reportPostChan) } ri_mutex.Unlock() diff --git a/web.go b/web.go index 8ca2154..cc56389 100644 --- a/web.go +++ b/web.go @@ -11,7 +11,6 @@ import ( "os" "strings" "time" - "github.com/jackc/pgx/pgxpool" ) // CreateObject - Used by post web receiver @@ -23,11 +22,10 @@ type CreateObject struct { Type string `json:"type"` } -// CreateObject - Used by post web receiver -type FindType struct { +// RelayBase - The base object used by web receiver +type RelayBase struct { Actor string `json:"actor"` Cc []string `json:"cc"` - //Object interface{} `json:"Object"` Object json.RawMessage `json:"Object"` ID string `json:"id"` Published string `json:"published"` @@ -90,7 +88,6 @@ func webfinger(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Content-Type", "application/json; charset=utf-8") fmt.Fprintf(w, webfingerstr) - fmt.Println("Writes properly") } else { fmt.Println(query) w.WriteHeader(http.StatusNotFound) @@ -109,7 +106,7 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc { return } - var findtype FindType + var findtype RelayBase err = json.Unmarshal(body, &findtype) fmt.Println(string(body)) @@ -143,7 +140,9 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc { ri_mutex.Unlock() // This only needs to be done if the user does not exist in the database + log.Print("Actor object: ", createobject.Actor) realuser, err := fetch_user_info(o.client, createobject.Actor) + _, _ = check_user(createobject.Actor) if err != nil { return } @@ -261,7 +260,7 @@ func errorHandler(w http.ResponseWriter, r *http.Request) { fmt.Println("404 --> ", r.URL.Path) } -func webmain(reportPostChan chan ReportPost, pool *pgxpool.Pool) { +func webmain(reportPostChan chan ReportPost) { http.HandleFunc("/.well-known/webfinger", webfinger) http.HandleFunc("/.well-known/host-meta", hostmeta) http.HandleFunc("/inbox", inboxHandler(reportPostChan))