diff --git a/poll/cli.go b/poll/ctl.go similarity index 74% rename from poll/cli.go rename to poll/ctl.go index dd0d83a..a5760c7 100644 --- a/poll/cli.go +++ b/poll/ctl.go @@ -8,8 +8,38 @@ import ( "log" "io" "os" + "github.com/microcosm-cc/bluemonday" ) +func startctl(reportPostChan chan ReportPost) { + p = bluemonday.NewPolicy() + + log.Print("Starting ctl listener on 127.0.0.1:5555") + l, err := net.Listen("tcp", "127.0.0.1:5555") + if err != nil { + log.Fatal("Unable to start listener:", err) + } + defer l.Close() + + commandClient := make(chan net.Conn) + + go func(l net.Listener) { + for { + c, err := l.Accept() + if err != nil { + log.Fatal("Error on accept", err) + } + commandClient <- c + } + }(l) + + for { + c := <-commandClient // New client connection + go handleClient(c, reportPostChan) + } + +} + func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) { sizebyte := make([]byte, 4) @@ -28,18 +58,15 @@ func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) { jsonbyte := make([]byte, jsonsize) n, err = io.ReadFull(commandClient, jsonbyte) if err != nil { - fmt.Println("Unable to unmarshal") - os.Exit(1) + log.Fatal("Unable to unmarshal") } if n != jsonsize { - fmt.Println("Failed to read json size of ", n) - os.Exit(1) + log.Fatal("Failed to read json size of ", n) } err = json.Unmarshal(jsonbyte, &commandmap) if err != nil { - fmt.Println("Unable to unmarshal") - os.Exit(1) + log.Fatal("Unable to unmarshal") } switch commandmap.Type { diff --git a/poll/db.go b/poll/db.go new file mode 100644 index 0000000..ec05271 --- /dev/null +++ b/poll/db.go @@ -0,0 +1,59 @@ +package main + +import ( + "github.com/jackc/pgx/pgxpool" + "context" + "log" +) + +func writePost(pool *pgxpool.Pool, reportpost ReportPost) { + conn, err := pool.Acquire(context.Background()) + if err != nil { + log.Fatal("Error connecting to database:", err) + } + defer conn.Release() + + // Insert new account if new + var accountid int + err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid) + if err != nil { + log.Print("First ", err) + log.Print("--------------------------") + log.Print("Reported error: ", err) + log.Print("Account Acct: ", reportpost.Account.Acct) + log.Print("Account Avatar: ", reportpost.Account.Avatar) + log.Print("Account Avatar len: ", len(reportpost.Account.Avatar)) + log.Print("Account Bot: ", reportpost.Account.Bot) + log.Print("Account Created_at: ", reportpost.Account.Created_at) + log.Print("Account Display: ", reportpost.Account.Display_name) + log.Print("Account URL: ", reportpost.Account.Url) + log.Print(reportpost) + log.Print("--------------------------") + log.Fatal("Unable to write record to database") + } + + // Insert new post if new + _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash) + if err != nil { // For now I want to know why this failed. + log.Print("Second ", err) + log.Print("--------------------------") + log.Print("Reported error: ", err) + log.Print("Url: ", reportpost.Url) + log.Print("Content: ", reportpost.Content) + log.Print("Created_at: ", reportpost.Created_at) + log.Print("normalized: ", reportpost.normalized) + log.Print("account_id", accountid) + log.Print("posthash: ", reportpost.posthash) + log.Print("--------------------------") + log.Fatal("Unable to write record to database") + } +} + +func get_db_pool() (*pgxpool.Pool) { + // Setup Database + pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue") + if err != nil { + log.Fatal("Unable to connect to database:", err) + } + return pool +} diff --git a/poll/instance.go b/poll/instance.go index 0cf9c06..47ed17e 100644 --- a/poll/instance.go +++ b/poll/instance.go @@ -10,6 +10,7 @@ import ( "html" "time" "fmt" + "log" ) var p *bluemonday.Policy @@ -166,23 +167,13 @@ func GetNodeInfo(endpoint string) (NodeInfo) { indexstr := string(indexbin) if strings.Contains(indexstr, "Pleroma") { nodeinfo.Software.Name = "pleroma" - fmt.Println("It is Pleroma: " + endpoint) } else if strings.Contains(indexstr, "Mastodon") { nodeinfo.Software.Name = "mastodon" - fmt.Println("It is Mastodon: " + endpoint) } else { return NodeInfo{} } } - /* - err = json.NewDecoder(resp.Body).Decode(&nodeinfo) - if err != nil { - fmt.Println("Error Message 2:", resp.StatusCode, err, endpoint, resp.Status, api_nodeinfo) - return NodeInfo{} - } - */ - return nodeinfo } @@ -200,19 +191,8 @@ func StartInstance(endpoint string, reportPostChan chan ReportPost) { } if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { + log.Print("Starting " + endpoint + " as Mastodon/Pleroma instance") go PollMastodonPleroma(endpoint, reportPostChan) } } - -/* -func SuspendInstance(suspendinstance InstanceReport) { - for i, runninginstance := range runninginstances { - if runninginstance.Endpoint == suspendinstance.endpoint { - (runninginstances)[i].Status = suspendinstance.status - (runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05") - return - } - } -} -*/ diff --git a/poll/main.go b/poll/main.go index 3a7d5fd..788be7e 100644 --- a/poll/main.go +++ b/poll/main.go @@ -1,127 +1,33 @@ package main import ( - "github.com/microcosm-cc/bluemonday" - "github.com/jackc/pgx/pgxpool" _ "net/http/pprof" "net/http" - "context" "sync" - "fmt" - "net" "log" - "os" ) -func AppendIfMissing(hay []string, needle string) []string { - for _, ele := range hay { - if ele == needle { - return hay - } - } - return append(hay, needle) -} - -func writePost(pool *pgxpool.Pool, reportpost ReportPost) { - conn, err := pool.Acquire(context.Background()) - if err != nil { - log.Fatal("Error connecting to database:", err) - os.Exit(1) - } - defer conn.Release() - - // Insert new account if new - var accountid int - err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid) - if err != nil { - fmt.Println("First ", err) - fmt.Println("--------------------------") - fmt.Println("Reported error: ", err) - fmt.Println("Account Acct: ", reportpost.Account.Acct) - fmt.Println("Account Avatar: ", reportpost.Account.Avatar) - fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar)) - fmt.Println("Account Bot: ", reportpost.Account.Bot) - fmt.Println("Account Created_at: ", reportpost.Account.Created_at) - fmt.Println("Account Display: ", reportpost.Account.Display_name) - fmt.Println("Account URL: ", reportpost.Account.Url) - fmt.Println(reportpost) - fmt.Println("--------------------------") - os.Exit(1) // For now I want this to die and learn why it failed - return - } - - // Insert new post if new - _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash) - if err != nil { // For now I want to know why this failed. - fmt.Println("Second ", err) - fmt.Println("--------------------------") - fmt.Println("Reported error: ", err) - fmt.Println("Url: ", reportpost.Url) - fmt.Println("Content: ", reportpost.Content) - fmt.Println("Created_at: ", reportpost.Created_at) - fmt.Println("normalized: ", reportpost.normalized) - fmt.Println("account_id", accountid) - fmt.Println("posthash: ", reportpost.posthash) - fmt.Println("--------------------------") - os.Exit(1) // For now I want this to die and learn why it failed - return - } -} - // Current instances var runninginstances map[string]RunningInstance var ri_mutex = &sync.Mutex{} -func engine() { - p = bluemonday.NewPolicy() - runninginstances = make(map[string]RunningInstance) - - // Initial Setup - reportPostChan := make(chan ReportPost) - - // Setup Database - pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue") - if err != nil { - fmt.Fprintln(os.Stderr, "Unable to connect to database:", err) - os.Exit(1) - } - - l, err := net.Listen("tcp", "127.0.0.1:5555") - if err != nil { - fmt.Println(err) - return - } - defer l.Close() - - commandClient := make(chan net.Conn) - - go func(l net.Listener) { - for { - c, err := l.Accept() - if err != nil { - fmt.Println("Error on accept", err) - os.Exit(0) - } - commandClient <- c - } - }(l) - - go func() { - for { - c := <-commandClient // New client connection - go handleClient(c, reportPostChan) - } - }() - - for { - v := <-reportPostChan // New Post - go writePost(pool, v) - } +func startpprof() { + log.Print("Starting http/pprof on :7777") + log.Fatal(http.ListenAndServe("127.0.0.1:7777", nil)) } func main() { - go engine() - go webmain() - log.Println("serving on port 8080") - log.Fatal(http.ListenAndServe(":7777", nil)) + // Initial Setup + reportPostChan := make(chan ReportPost) + runninginstances = make(map[string]RunningInstance) + + go startpprof() + + pool := get_db_pool() + go startctl(reportPostChan) + go webmain(reportPostChan) + for { // Write posts + v := <-reportPostChan + go writePost(pool, v) + } } diff --git a/poll/web.go b/poll/web.go index a3bd605..6374e0d 100644 --- a/poll/web.go +++ b/poll/web.go @@ -183,7 +183,7 @@ func errorHandler(w http.ResponseWriter, r *http.Request) { fmt.Println("404 --> ", r.URL.Path) } -func webmain() { +func webmain(reportPostChan chan ReportPost) { http.HandleFunc("/.well-known/webfinger", webfinger) http.HandleFunc("/.well-known/host-meta", hostmeta) http.HandleFunc("/inbox", inbox)