// Command main polls the public timelines of Mastodon/Pleroma instances,
// stores the posts it sees in PostgreSQL, and discovers new instances from
// the accounts that appear in those posts. A small length-prefixed JSON
// command protocol is served on 127.0.0.1:5555 and net/http/pprof is exposed
// on :8080.
package main

import (
	"context"
	"crypto/sha1"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"html"
	"io"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	_ "net/http/pprof"
	"os"
	"strings"
	"time"

	"github.com/jackc/pgx/pgxpool"
	"github.com/microcosm-cc/bluemonday"
)

// Instance status codes. Values below 600 mirror HTTP status codes returned
// by the remote instance; values from 600 up are crawler-internal states.
const (
	NEW_INSTANCE    = 0
	RUNNING         = 200
	TOOMANYREQUESTS = 429
	CLIENT_ISSUE    = 600
	ONION_PROTOCOL  = 601
	BAD_RESPONSE    = 602
	NO_CONNECTION   = 603
	BAD_NODEINFO    = 604
)

const (
	// maxCommandSize bounds the JSON payload a command client may send, so
	// a bogus length prefix cannot trigger a multi-gigabyte allocation.
	maxCommandSize = 1 << 20

	// lastRunLayout is the time layout used for RunningInstance.LastRun.
	lastRunLayout = "2006.01.02-15:04:05"
)

var (
	// pollClient fetches timelines. The timeout keeps a stalled remote from
	// parking a polling goroutine forever.
	pollClient = &http.Client{Timeout: 10 * time.Second}

	// nodeInfoClient fetches nodeinfo documents. GetNodeInfo is called
	// synchronously from the engine loop, hence the shorter timeout.
	nodeInfoClient = &http.Client{Timeout: 5 * time.Second}
)

// ReportPost is one post decoded from a timeline response, plus values the
// crawler derives from it before storing.
type ReportPost struct {
	// Retrieved values
	Id         string `json:"id"`
	Url        string `json:"url"`
	Account    AccountType
	Content    string `json:"content"`
	Created_at string `json:"created_at"`

	// Derived values
	normalized string
	posthash   []byte
}

// AccountType is the author of a post as returned by the timeline API.
type AccountType struct {
	Acct         string `json:"acct"`
	Avatar       string `json:"avatar"`
	Bot          bool   `json:"bot"`
	Created_at   string `json:"created_at"`
	Display_name string `json:"display_name"`
	Url          string `json:"url"`
}

// InstanceReport is sent to the engine to announce a new instance or to
// report the outcome of a poll: the resulting status, the min_id to use for
// the next poll, and how many posts were seen.
type InstanceReport struct {
	endpoint string
	status   int
	min_id   string
	numposts int
}

// RunningInstance is the engine's bookkeeping entry for one known instance.
// It is also serialized to command clients in ResponseBack.
type RunningInstance struct {
	Endpoint string `json:"endpoint"`
	Software string `json:"software"`
	Min_id   string
	Status   int    `json:"status"`
	LastRun  string `json:"lastrun"`
}

// NodeInfoSoftware is the "software" object of a nodeinfo document.
type NodeInfoSoftware struct {
	Name    string `json:"name"`
	Version string `json:"version"`
}

// NodeInfo holds the subset of a nodeinfo document the crawler reads.
type NodeInfo struct {
	Software NodeInfoSoftware `json:"software"`
}

// CommandMap is a request received from a command client.
type CommandMap struct {
	Type     string `json:"Type"`
	Endpoint string `json:"Endpoint"`
}

// ResponseBack is the reply sent to a command client.
type ResponseBack struct {
	Type             string            `json:"Type"`
	Message          string            `json:"Message"`
	RunningInstances []RunningInstance `json:"RunningInstances"`
}

// handleClient serves a single command connection and then closes it.
//
// Wire format in both directions: a 4-byte little-endian length followed by
// that many bytes of JSON. The request is decoded as a CommandMap; the reply
// is a ResponseBack carrying the instances in *runninginstances.
//
// A malformed or truncated request only ends this connection; it never
// terminates the process. A nil connection is ignored.
func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) {
	if commandClient == nil {
		return
	}
	defer commandClient.Close()

	sizebyte := make([]byte, 4)
	if _, err := io.ReadFull(commandClient, sizebyte); err != nil {
		log.Print("command client: reading request size: ", err)
		return
	}
	jsonsize := binary.LittleEndian.Uint32(sizebyte)
	if jsonsize > maxCommandSize {
		log.Printf("command client: request of %d bytes exceeds limit of %d", jsonsize, maxCommandSize)
		return
	}

	jsonbyte := make([]byte, jsonsize)
	if _, err := io.ReadFull(commandClient, jsonbyte); err != nil {
		log.Print("command client: reading request body: ", err)
		return
	}

	var commandmap CommandMap
	if err := json.Unmarshal(jsonbyte, &commandmap); err != nil {
		log.Print("command client: decoding request: ", err)
		return
	}

	var responseback ResponseBack
	switch commandmap.Type {
	case "status":
		responseback.Message = "Ok"
	case "add":
		log.Print("Manually added instance: " + commandmap.Endpoint)
		instanceReportChan <- InstanceReport{endpoint: commandmap.Endpoint, status: NEW_INSTANCE}
		responseback.Message = "Added " + commandmap.Endpoint
	case "suspend":
		fmt.Println("Suspend")
	case "resume":
		fmt.Println("Resume")
	default:
		fmt.Println("Something else")
	}

	responseback.Type = "status"
	if runninginstances != nil {
		responseback.RunningInstances = *runninginstances
	}

	responsebytes, err := json.Marshal(responseback)
	if err != nil {
		log.Print("command client: encoding response: ", err)
		return
	}

	// Length prefix and payload go out in one write.
	frame := make([]byte, 4+len(responsebytes))
	binary.LittleEndian.PutUint32(frame, uint32(len(responsebytes)))
	copy(frame[4:], responsebytes)
	if _, err := commandClient.Write(frame); err != nil {
		log.Print("command client: writing response: ", err)
	}
}

/*
 * This code can be refactored per
 * https://golang.org/pkg/encoding/binary/
 * But for now, this should be sufficient
 */

/*
func parseCommand(c net.Conn) {
	rawCommand := make([]byte, 1)
}
*/

// AppendIfMissing returns hay unchanged when it already contains needle,
// and hay with needle appended otherwise.
func AppendIfMissing(hay []string, needle string) []string {
	for _, ele := range hay {
		if ele == needle {
			return hay
		}
	}
	return append(hay, needle)
}

// StartInstancePoll fetches the public timeline of instancereport.endpoint
// starting at instancereport.min_id. Every post is sent on reportPostChan,
// every instance found in an account handle is announced on
// instanceReportChan as NEW_INSTANCE, and finally the outcome of the poll is
// sent on instanceReportChan.
//
// Outcomes: ONION_PROTOCOL for .onion endpoints (not fetched), CLIENT_ISSUE
// when the request fails, TOOMANYREQUESTS when the instance answers 429,
// BAD_RESPONSE when the body cannot be read or decoded, otherwise the HTTP
// status code of the response.
func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) {
	endpoint := instancereport.endpoint

	// Only placing this here to later have the option of using
	// an HTTP client via a SOCKS5 Tor proxy
	if strings.Contains(endpoint, ".onion") {
		instanceReportChan <- InstanceReport{endpoint: endpoint, status: ONION_PROTOCOL}
		return
	}

	api_timeline := "https://" + endpoint + "/api/v1/timelines/public?min_id=" + instancereport.min_id
	resp, err := pollClient.Get(api_timeline)
	if err != nil {
		instanceReportChan <- InstanceReport{endpoint: endpoint, status: CLIENT_ISSUE}
		return
	}
	defer resp.Body.Close()

	// A rate-limited response does not carry a post array, so it has to be
	// recognized before decoding. The current min_id is kept so the retry
	// resumes where this poll left off.
	if resp.StatusCode == TOOMANYREQUESTS {
		instanceReportChan <- InstanceReport{endpoint: endpoint, status: TOOMANYREQUESTS, min_id: instancereport.min_id}
		return
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		instanceReportChan <- InstanceReport{endpoint: endpoint, status: BAD_RESPONSE}
		return
	}
	var newposts []ReportPost
	if err := json.Unmarshal(body, &newposts); err != nil {
		instanceReportChan <- InstanceReport{endpoint: endpoint, status: BAD_RESPONSE}
		return
	}

	p := bluemonday.NewPolicy()
	var newinstances []string
	// Start from the previous min_id so a poll that returns no posts does
	// not reset it and cause the next poll to refetch old posts.
	min_id := instancereport.min_id
	numposts := 0

	for _, newpost := range newposts {
		// Local accounts come without a host part; qualify them with the
		// polled endpoint so every acct has the form user@host.
		at_sign := strings.Index(newpost.Account.Acct, "@")
		if at_sign == -1 {
			at_sign = len(newpost.Account.Acct)
			newpost.Account.Acct += "@" + endpoint
		}

		// Calculate the post hash.
		// NOTE(review): normalized is still empty here, so the hash covers
		// only url, acct and display name. Left as-is because changing the
		// input would change the posthash values used for deduplication —
		// confirm whether content was meant to be included.
		posthash := sha1.New()
		fmt.Fprint(posthash, newpost.Url)
		fmt.Fprint(posthash, newpost.normalized)
		fmt.Fprint(posthash, newpost.Account.Acct)
		fmt.Fprint(posthash, newpost.Account.Display_name)
		newpost.posthash = posthash.Sum(nil)

		newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))

		reportPostChan <- newpost

		// Track the highest id seen (string comparison).
		if newpost.Id > min_id {
			min_id = newpost.Id
		}
		numposts++

		newinstances = AppendIfMissing(newinstances, newpost.Account.Acct[at_sign+1:])
	}

	for _, newinstance := range newinstances {
		instanceReportChan <- InstanceReport{endpoint: newinstance, status: NEW_INSTANCE}
	}

	instanceReportChan <- InstanceReport{endpoint: endpoint, status: resp.StatusCode, min_id: min_id, numposts: numposts}
}

/*
func StartGetPeers(endpoint string, instanceReportChan chan InstanceReport) {
	var newpeers []string

	api_peers := "https://" + endpoint + "/api/v1/instance/peers"
	resp, err := http.Get(api_peers)
	if err != nil {
		instanceReportChan <- InstanceReport{endpoint, NO_CONNECTION, "", 0}
		return
		// os.Exit(1)
	}
	body, err := ioutil.ReadAll(resp.Body)
	err = json.Unmarshal([]byte(body), &newpeers)
	if err != nil {
		fmt.Println("Unmarshal 1");
		log.Fatal(err)
		instanceReportChan <- InstanceReport{endpoint, endpoint, UNMARSHAL_ERROR, "", 0}
		return
	}

	for _, newpeer := range newpeers {
		var q InstanceReport
		q.endpoint = newpeer
		q.status = NEW_INSTANCE
		instanceReportChan <- q
	}
}
*/

// DeferPollRun sleeps for a delay chosen from the previous poll's outcome and
// then starts the next poll of the same instance in a new goroutine. It
// blocks for the duration of the delay.
//
// Delays: 10s after a 200 with at most 10 posts, 15s after a 200 with more,
// 30s after a 429. Any other status is logged and no poll is scheduled.
// runninginstances is not used.
func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
	var delay time.Duration
	switch {
	case instancereport.status == RUNNING && instancereport.numposts <= 10:
		delay = 10 * time.Second
	case instancereport.status == RUNNING:
		delay = 15 * time.Second
	case instancereport.status == TOOMANYREQUESTS:
		delay = 30 * time.Second
	default:
		log.Printf("not rescheduling %q: unexpected status %d", instancereport.endpoint, instancereport.status)
		return
	}

	time.Sleep(delay)
	go StartInstancePoll(instancereport, reportPostChan, instanceReportChan)
}

// GetNodeInfo fetches https://<endpoint>/nodeinfo/2.0.json and stores the
// decoded document in *nodeinfo. On any failure (request, read, decode)
// *nodeinfo is left untouched; callers detect failure by an empty
// Software.Name.
//
// Change this to return a proper "err"
func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) {
	if nodeinfo == nil {
		return
	}

	api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json"
	resp, err := nodeInfoClient.Get(api_nodeinfo)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return
	}

	// Decode into a scratch value so a document that fails halfway through
	// cannot leave the caller's struct partially filled.
	var parsed NodeInfo
	if err := json.Unmarshal(body, &parsed); err != nil {
		return
	}
	*nodeinfo = parsed
}

// NewInstance registers endpoint in *runninginstances unless it is empty or
// already present, and starts polling it when its nodeinfo reports "pleroma"
// or "mastodon". When no software name could be determined, a BAD_NODEINFO
// report is sent on instanceReportChan.
func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
	if endpoint == "" {
		return
	}

	// No repeats
	for _, runninginstance := range *runninginstances {
		if runninginstance.Endpoint == endpoint {
			return
		}
	}

	// Check node type
	var nodeinfo NodeInfo
	GetNodeInfo(endpoint, &nodeinfo)
	software := nodeinfo.Software.Name

	if software == "" {
		// Sent from a goroutine so this function never blocks on the
		// report channel.
		go func() {
			instanceReportChan <- InstanceReport{endpoint: endpoint, status: BAD_NODEINFO}
		}()
	}

	*runninginstances = append(*runninginstances, RunningInstance{
		Endpoint: endpoint,
		Software: software,
		Status:   NEW_INSTANCE,
		LastRun:  "Queued",
	})

	if software == "pleroma" || software == "mastodon" {
		go StartInstancePoll(InstanceReport{endpoint: endpoint, status: NEW_INSTANCE}, reportPostChan, instanceReportChan)
		// fmt.Println("Temporarily disabled Peer Hunting")
		// go StartGetPeers(endpoint, instanceReportChan)
	}
}

// writePost upserts the post's account and then inserts the post, skipping
// posts whose posthash is already stored. Failures are printed with the
// offending values and the post is dropped.
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
	conn, err := pool.Acquire(context.Background())
	if err != nil {
		fmt.Println("Error acquiring connection:", err)
		return
	}
	defer conn.Release()

	// Insert new account if new. The no-op DO UPDATE makes RETURNING yield
	// the id of an already existing row as well.
	var accountid int
	err = conn.QueryRow(context.Background(),
		"INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id",
		reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot,
		reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url,
	).Scan(&accountid)
	if err != nil {
		fmt.Println("First ", err)
		fmt.Println("--------------------------")
		fmt.Println("Reported error: ", err)
		fmt.Println("Account Acct: ", reportpost.Account.Acct)
		fmt.Println("Account Avatar: ", reportpost.Account.Avatar)
		fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar))
		fmt.Println("Account Bot: ", reportpost.Account.Bot)
		fmt.Println("Account Created_at: ", reportpost.Account.Created_at)
		fmt.Println("Account Display: ", reportpost.Account.Display_name)
		fmt.Println("Account URL: ", reportpost.Account.Url)
		fmt.Println(reportpost)
		fmt.Println("--------------------------")
		return
	}

	// Insert new post if new
	_, err = conn.Exec(context.Background(),
		"INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING",
		reportpost.Url, reportpost.Content, reportpost.Created_at,
		reportpost.normalized, accountid, reportpost.posthash,
	)
	if err != nil {
		// For now I want to know why this failed.
		fmt.Println("Second ", err)
		fmt.Println("--------------------------")
		fmt.Println("Reported error: ", err)
		fmt.Println("Url: ", reportpost.Url)
		fmt.Println("Content: ", reportpost.Content)
		fmt.Println("Created_at: ", reportpost.Created_at)
		fmt.Println("normalized: ", reportpost.normalized)
		fmt.Println("account_id", accountid)
		fmt.Println("posthash: ", reportpost.posthash)
		fmt.Println("--------------------------")
		return
	}
}

// SuspendInstance records suspendinstance.status and the current time on the
// entry of *runninginstances whose endpoint matches. Unknown endpoints are
// ignored.
func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) {
	instances := *runninginstances
	for i := range instances {
		if instances[i].Endpoint != suspendinstance.endpoint {
			continue
		}
		instances[i].Status = suspendinstance.status
		instances[i].LastRun = time.Now().Format(lastRunLayout)
		return
	}
}

// engine owns the list of known instances and runs the main event loop: it
// accepts command clients on 127.0.0.1:5555, hands incoming posts to
// writePost, and reacts to instance reports by registering, rescheduling or
// suspending instances. It exits the process when the database cannot be
// reached and returns when the command listener cannot be opened.
func engine() {
	// Current instances. Only this goroutine reads or writes the slice.
	runninginstances := make([]RunningInstance, 0)

	// Initial Setup
	reportPostChan := make(chan ReportPost, 2000)
	instanceReportChan := make(chan InstanceReport, 100)

	// Setup Database
	pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
	if err != nil {
		fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
		os.Exit(1)
	}
	defer pool.Close()

	l, err := net.Listen("tcp", "127.0.0.1:5555")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer l.Close()

	commandClient := make(chan net.Conn)

	go func(l net.Listener) {
		for {
			c, err := l.Accept()
			if err != nil {
				fmt.Println("Error on accept")
				return
			}
			commandClient <- c
		}
	}(l)

	for {
		select {
		case c := <-commandClient: // New client connection
			if c == nil {
				continue
			}
			// Hand the handler a private copy: it runs concurrently with
			// this loop, which keeps mutating runninginstances.
			snapshot := make([]RunningInstance, len(runninginstances))
			copy(snapshot, runninginstances)
			go handleClient(c, &snapshot, instanceReportChan)

		case v := <-reportPostChan: // New Post
			go writePost(pool, v)

		case w := <-instanceReportChan: // Start or suspend instance
			switch w.status {
			case NEW_INSTANCE:
				NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan)
			case RUNNING, TOOMANYREQUESTS:
				for i := range runninginstances {
					if runninginstances[i].Endpoint == w.endpoint {
						runninginstances[i].Min_id = w.min_id
						runninginstances[i].Status = w.status
						runninginstances[i].LastRun = time.Now().Format(lastRunLayout)
					}
				}
				go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan)
			default:
				SuspendInstance(w, &runninginstances)
			}
		}
	}
}

// main starts the engine in the background and serves the default HTTP mux
// (which carries the net/http/pprof handlers) on :8080.
func main() {
	go engine()
	log.Println("serving on port 8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}