diff --git a/ctl.go b/ctl.go index 080da81..07f12df 100644 --- a/ctl.go +++ b/ctl.go @@ -10,7 +10,7 @@ import ( "os" ) -func startctl(reportPostChan chan ReportPost) { +func startctl() { log.Print("Starting ctl listener on 127.0.0.1:5555") l, err := net.Listen("tcp", "127.0.0.1:5555") if err != nil { @@ -32,12 +32,12 @@ func startctl(reportPostChan chan ReportPost) { for { c := <-commandClient // New client connection - go handleClient(c, reportPostChan) + go handleClient(c) } } -func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) { +func handleClient(commandClient net.Conn) { sizebyte := make([]byte, 4) var commandmap CommandMap @@ -79,7 +79,7 @@ func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) { } else { responseback.Message = "Added: " + commandmap.Endpoint runninginstances[commandmap.Endpoint] = RunningInstance{} - go StartInstance(commandmap.Endpoint, reportPostChan) + go StartInstance(commandmap.Endpoint) } ri_mutex.Unlock() case "suspend": diff --git a/db.go b/db.go index f8e4c78..30f84f1 100644 --- a/db.go +++ b/db.go @@ -7,56 +7,6 @@ import ( "github.com/jackc/pgx/pgxpool" ) -func postHandler(reportPostChan chan ReportPost) { - for { // Write posts - v := <-reportPostChan - go writePost(v) - } -} - -func writePost(reportpost ReportPost) { - conn, err := pool.Acquire(context.Background()) - if err != nil { - log.Fatal("Error connecting to database:", err) - } - defer conn.Release() - - // Insert new account if new - var accountid int - err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, uri) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid) - if err != nil { - log.Print("First ", err) - log.Print("--------------------------") - log.Print("Reported error: ", err) - log.Print("Account Acct: ", reportpost.Account.Acct) - log.Print("Account Avatar: ", reportpost.Account.Avatar) - log.Print("Account Avatar len: ", len(reportpost.Account.Avatar)) - log.Print("Account Bot: ", reportpost.Account.Bot) - log.Print("Account Created_at: ", reportpost.Account.Created_at) - log.Print("Account Display: ", reportpost.Account.Display_name) - log.Print("Account URL: ", reportpost.Account.Url) - log.Print(reportpost) - log.Print("--------------------------") - log.Fatal("Unable to write record to database") - } - - // Insert new post if new - _, err = conn.Exec(context.Background(), "INSERT INTO posts (uri, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Uri, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash) - if err != nil { // For now I want to know why this failed. - log.Print("Second ", err) - log.Print("--------------------------") - log.Print("Reported error: ", err) - log.Print("Uri: ", reportpost.Uri) - log.Print("Content: ", reportpost.Content) - log.Print("Created_at: ", reportpost.Created_at) - log.Print("normalized: ", reportpost.normalized) - log.Print("account_id", accountid) - log.Print("posthash: ", reportpost.posthash) - log.Print("--------------------------") - log.Fatal("Unable to write record to database") - } -} - func getDbPool() *pgxpool.Pool { // Setup Database dbURI := fmt.Sprintf("postgres://%s:%s@%s:%d/fedilogue", settings.Database.Username, settings.Database.Password, settings.Database.Host, settings.Database.Port) diff --git a/fedilogue.go b/fedilogue.go index bb29446..03a6997 100644 --- a/fedilogue.go +++ b/fedilogue.go @@ -24,7 +24,6 @@ func startpprof() { func main() { // Initial Setup - reportPostChan := make(chan ReportPost) runninginstances = make(map[string]RunningInstance) requestconnchan = make(chan ConnRequest) @@ -46,13 +45,13 @@ func main() { _, exists := runninginstances[endpoint] if exists == false { runninginstances[endpoint] = RunningInstance{} - go StartInstance(endpoint, reportPostChan) + go StartInstance(endpoint) } ri_mutex.Unlock() } - go startctl(reportPostChan) - go webmain(reportPostChan) + go startctl() + go webmain() runtime.Goexit() } diff --git a/instance.go b/instance.go index 3999fc1..9be7382 100644 --- a/instance.go +++ b/instance.go @@ -70,7 +70,7 @@ func GetNodeInfo(endpoint string) (http.Client, NodeInfo) { return http_client, nodeinfo } -func StartInstance(endpoint string, reportPostChan chan ReportPost) { +func StartInstance(endpoint string) { http_client, nodeinfo := GetNodeInfo(endpoint) ri_mutex.Lock() m := runninginstances[endpoint] @@ -89,14 +89,13 @@ func StartInstance(endpoint string, reportPostChan chan ReportPost) { m.CaptureType = "Poll" runninginstances[endpoint] = m ri_mutex.Unlock() - PollMastodonPleroma(endpoint, reportPostChan, http_client) + PollMastodonPleroma(endpoint, http_client) } else if nodeinfo.Software.Name == "mastodon" { log.Print("Starting " + endpoint + " as " + nodeinfo.Software.Name) m.CaptureType = "Stream" runninginstances[endpoint] = m ri_mutex.Unlock() -// PollMastodonPleroma(endpoint, reportPostChan, http_client, pool) - StreamMastodon(endpoint, reportPostChan) + StreamMastodon(endpoint) } } diff --git a/poll.go b/poll.go index f4344ba..76dc151 100644 --- a/poll.go +++ b/poll.go @@ -50,7 +50,7 @@ type PostInfo struct { Content string `"json:content"` } -func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) { +func PollMastodonPleroma(endpoint string, http_client http.Client) { newposts := make([]ReportPost, 0) min_id := "" @@ -194,7 +194,7 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c if exists == false || o.Status == KEEPALIVE { m := RunningInstance{} runninginstances[newinstance] = m - go StartInstance(newinstance, reportPostChan) + go StartInstance(newinstance) } ri_mutex.Unlock() diff --git a/stream.go b/stream.go index 5d7a9b6..d57d48d 100644 --- a/stream.go +++ b/stream.go @@ -9,7 +9,7 @@ import ( "time" ) -func StreamMastodon(endpoint string, reportPostChan chan ReportPost) { +func StreamMastodon(endpoint string) { http_client := http.Client{} var client_id string @@ -96,7 +96,7 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost) { if exists == false || o.Status == KEEPALIVE { m := RunningInstance{} runninginstances[newinstance] = m - go StartInstance(newinstance, reportPostChan) + go StartInstance(newinstance) } ri_mutex.Unlock() diff --git a/web.go b/web.go index f77db98..66bf5b0 100644 --- a/web.go +++ b/web.go @@ -95,7 +95,7 @@ func webfinger(w http.ResponseWriter, r *http.Request) { } } -func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc { +func inboxHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { fmt.Println("PATH --> ", r.URL.Path) @@ -216,10 +216,10 @@ func errorHandler(w http.ResponseWriter, r *http.Request) { fmt.Println("404 --> ", r.URL.Path) } -func webmain(reportPostChan chan ReportPost) { +func webmain() { http.HandleFunc("/.well-known/webfinger", webfinger) http.HandleFunc("/.well-known/host-meta", hostmeta) - http.HandleFunc("/inbox", inboxHandler(reportPostChan)) + http.HandleFunc("/inbox", inboxHandler()) http.HandleFunc("/users/fedilogue", usersFedilogue) http.HandleFunc("/users/fedilogue/followers", usersFedilogueFollowers) http.HandleFunc("/users/fedilogue/following", usersFedilogueFollowing)