restructuring and cleanup
This commit is contained in:
parent
58d63dea93
commit
97bef269b8
@ -8,8 +8,38 @@ import (
|
||||
"log"
|
||||
"io"
|
||||
"os"
|
||||
"github.com/microcosm-cc/bluemonday"
|
||||
)
|
||||
|
||||
func startctl(reportPostChan chan ReportPost) {
|
||||
p = bluemonday.NewPolicy()
|
||||
|
||||
log.Print("Starting ctl listener on 127.0.0.1:5555")
|
||||
l, err := net.Listen("tcp", "127.0.0.1:5555")
|
||||
if err != nil {
|
||||
log.Fatal("Unable to start listener:", err)
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
commandClient := make(chan net.Conn)
|
||||
|
||||
go func(l net.Listener) {
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
log.Fatal("Error on accept", err)
|
||||
}
|
||||
commandClient <- c
|
||||
}
|
||||
}(l)
|
||||
|
||||
for {
|
||||
c := <-commandClient // New client connection
|
||||
go handleClient(c, reportPostChan)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) {
|
||||
|
||||
sizebyte := make([]byte, 4)
|
||||
@ -28,18 +58,15 @@ func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) {
|
||||
jsonbyte := make([]byte, jsonsize)
|
||||
n, err = io.ReadFull(commandClient, jsonbyte)
|
||||
if err != nil {
|
||||
fmt.Println("Unable to unmarshal")
|
||||
os.Exit(1)
|
||||
log.Fatal("Unable to unmarshal")
|
||||
}
|
||||
if n != jsonsize {
|
||||
fmt.Println("Failed to read json size of ", n)
|
||||
os.Exit(1)
|
||||
log.Fatal("Failed to read json size of ", n)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(jsonbyte, &commandmap)
|
||||
if err != nil {
|
||||
fmt.Println("Unable to unmarshal")
|
||||
os.Exit(1)
|
||||
log.Fatal("Unable to unmarshal")
|
||||
}
|
||||
|
||||
switch commandmap.Type {
|
59
poll/db.go
Normal file
59
poll/db.go
Normal file
@ -0,0 +1,59 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgx/pgxpool"
|
||||
"context"
|
||||
"log"
|
||||
)
|
||||
|
||||
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
|
||||
conn, err := pool.Acquire(context.Background())
|
||||
if err != nil {
|
||||
log.Fatal("Error connecting to database:", err)
|
||||
}
|
||||
defer conn.Release()
|
||||
|
||||
// Insert new account if new
|
||||
var accountid int
|
||||
err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid)
|
||||
if err != nil {
|
||||
log.Print("First ", err)
|
||||
log.Print("--------------------------")
|
||||
log.Print("Reported error: ", err)
|
||||
log.Print("Account Acct: ", reportpost.Account.Acct)
|
||||
log.Print("Account Avatar: ", reportpost.Account.Avatar)
|
||||
log.Print("Account Avatar len: ", len(reportpost.Account.Avatar))
|
||||
log.Print("Account Bot: ", reportpost.Account.Bot)
|
||||
log.Print("Account Created_at: ", reportpost.Account.Created_at)
|
||||
log.Print("Account Display: ", reportpost.Account.Display_name)
|
||||
log.Print("Account URL: ", reportpost.Account.Url)
|
||||
log.Print(reportpost)
|
||||
log.Print("--------------------------")
|
||||
log.Fatal("Unable to write record to database")
|
||||
}
|
||||
|
||||
// Insert new post if new
|
||||
_, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash)
|
||||
if err != nil { // For now I want to know why this failed.
|
||||
log.Print("Second ", err)
|
||||
log.Print("--------------------------")
|
||||
log.Print("Reported error: ", err)
|
||||
log.Print("Url: ", reportpost.Url)
|
||||
log.Print("Content: ", reportpost.Content)
|
||||
log.Print("Created_at: ", reportpost.Created_at)
|
||||
log.Print("normalized: ", reportpost.normalized)
|
||||
log.Print("account_id", accountid)
|
||||
log.Print("posthash: ", reportpost.posthash)
|
||||
log.Print("--------------------------")
|
||||
log.Fatal("Unable to write record to database")
|
||||
}
|
||||
}
|
||||
|
||||
func get_db_pool() (*pgxpool.Pool) {
|
||||
// Setup Database
|
||||
pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
|
||||
if err != nil {
|
||||
log.Fatal("Unable to connect to database:", err)
|
||||
}
|
||||
return pool
|
||||
}
|
@ -10,6 +10,7 @@ import (
|
||||
"html"
|
||||
"time"
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
var p *bluemonday.Policy
|
||||
@ -166,23 +167,13 @@ func GetNodeInfo(endpoint string) (NodeInfo) {
|
||||
indexstr := string(indexbin)
|
||||
if strings.Contains(indexstr, "Pleroma") {
|
||||
nodeinfo.Software.Name = "pleroma"
|
||||
fmt.Println("It is Pleroma: " + endpoint)
|
||||
} else if strings.Contains(indexstr, "Mastodon") {
|
||||
nodeinfo.Software.Name = "mastodon"
|
||||
fmt.Println("It is Mastodon: " + endpoint)
|
||||
} else {
|
||||
return NodeInfo{}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
err = json.NewDecoder(resp.Body).Decode(&nodeinfo)
|
||||
if err != nil {
|
||||
fmt.Println("Error Message 2:", resp.StatusCode, err, endpoint, resp.Status, api_nodeinfo)
|
||||
return NodeInfo{}
|
||||
}
|
||||
*/
|
||||
|
||||
return nodeinfo
|
||||
}
|
||||
|
||||
@ -200,19 +191,8 @@ func StartInstance(endpoint string, reportPostChan chan ReportPost) {
|
||||
}
|
||||
|
||||
if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
|
||||
log.Print("Starting " + endpoint + " as Mastodon/Pleroma instance")
|
||||
go PollMastodonPleroma(endpoint, reportPostChan)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
func SuspendInstance(suspendinstance InstanceReport) {
|
||||
for i, runninginstance := range runninginstances {
|
||||
if runninginstance.Endpoint == suspendinstance.endpoint {
|
||||
(runninginstances)[i].Status = suspendinstance.status
|
||||
(runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
126
poll/main.go
126
poll/main.go
@ -1,127 +1,33 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/microcosm-cc/bluemonday"
|
||||
"github.com/jackc/pgx/pgxpool"
|
||||
_ "net/http/pprof"
|
||||
"net/http"
|
||||
"context"
|
||||
"sync"
|
||||
"fmt"
|
||||
"net"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
func AppendIfMissing(hay []string, needle string) []string {
|
||||
for _, ele := range hay {
|
||||
if ele == needle {
|
||||
return hay
|
||||
}
|
||||
}
|
||||
return append(hay, needle)
|
||||
}
|
||||
|
||||
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
|
||||
conn, err := pool.Acquire(context.Background())
|
||||
if err != nil {
|
||||
log.Fatal("Error connecting to database:", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer conn.Release()
|
||||
|
||||
// Insert new account if new
|
||||
var accountid int
|
||||
err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid)
|
||||
if err != nil {
|
||||
fmt.Println("First ", err)
|
||||
fmt.Println("--------------------------")
|
||||
fmt.Println("Reported error: ", err)
|
||||
fmt.Println("Account Acct: ", reportpost.Account.Acct)
|
||||
fmt.Println("Account Avatar: ", reportpost.Account.Avatar)
|
||||
fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar))
|
||||
fmt.Println("Account Bot: ", reportpost.Account.Bot)
|
||||
fmt.Println("Account Created_at: ", reportpost.Account.Created_at)
|
||||
fmt.Println("Account Display: ", reportpost.Account.Display_name)
|
||||
fmt.Println("Account URL: ", reportpost.Account.Url)
|
||||
fmt.Println(reportpost)
|
||||
fmt.Println("--------------------------")
|
||||
os.Exit(1) // For now I want this to die and learn why it failed
|
||||
return
|
||||
}
|
||||
|
||||
// Insert new post if new
|
||||
_, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash)
|
||||
if err != nil { // For now I want to know why this failed.
|
||||
fmt.Println("Second ", err)
|
||||
fmt.Println("--------------------------")
|
||||
fmt.Println("Reported error: ", err)
|
||||
fmt.Println("Url: ", reportpost.Url)
|
||||
fmt.Println("Content: ", reportpost.Content)
|
||||
fmt.Println("Created_at: ", reportpost.Created_at)
|
||||
fmt.Println("normalized: ", reportpost.normalized)
|
||||
fmt.Println("account_id", accountid)
|
||||
fmt.Println("posthash: ", reportpost.posthash)
|
||||
fmt.Println("--------------------------")
|
||||
os.Exit(1) // For now I want this to die and learn why it failed
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Current instances
|
||||
var runninginstances map[string]RunningInstance
|
||||
var ri_mutex = &sync.Mutex{}
|
||||
|
||||
func engine() {
|
||||
p = bluemonday.NewPolicy()
|
||||
runninginstances = make(map[string]RunningInstance)
|
||||
|
||||
// Initial Setup
|
||||
reportPostChan := make(chan ReportPost)
|
||||
|
||||
// Setup Database
|
||||
pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
l, err := net.Listen("tcp", "127.0.0.1:5555")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
commandClient := make(chan net.Conn)
|
||||
|
||||
go func(l net.Listener) {
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
fmt.Println("Error on accept", err)
|
||||
os.Exit(0)
|
||||
}
|
||||
commandClient <- c
|
||||
}
|
||||
}(l)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
c := <-commandClient // New client connection
|
||||
go handleClient(c, reportPostChan)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
v := <-reportPostChan // New Post
|
||||
go writePost(pool, v)
|
||||
}
|
||||
func startpprof() {
|
||||
log.Print("Starting http/pprof on :7777")
|
||||
log.Fatal(http.ListenAndServe("127.0.0.1:7777", nil))
|
||||
}
|
||||
|
||||
func main() {
|
||||
go engine()
|
||||
go webmain()
|
||||
log.Println("serving on port 8080")
|
||||
log.Fatal(http.ListenAndServe(":7777", nil))
|
||||
// Initial Setup
|
||||
reportPostChan := make(chan ReportPost)
|
||||
runninginstances = make(map[string]RunningInstance)
|
||||
|
||||
go startpprof()
|
||||
|
||||
pool := get_db_pool()
|
||||
go startctl(reportPostChan)
|
||||
go webmain(reportPostChan)
|
||||
for { // Write posts
|
||||
v := <-reportPostChan
|
||||
go writePost(pool, v)
|
||||
}
|
||||
}
|
||||
|
@ -183,7 +183,7 @@ func errorHandler(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println("404 --> ", r.URL.Path)
|
||||
}
|
||||
|
||||
func webmain() {
|
||||
func webmain(reportPostChan chan ReportPost) {
|
||||
http.HandleFunc("/.well-known/webfinger", webfinger)
|
||||
http.HandleFunc("/.well-known/host-meta", hostmeta)
|
||||
http.HandleFunc("/inbox", inbox)
|
||||
|
Loading…
x
Reference in New Issue
Block a user