fedilogue/poll/engine.go

447 lines
12 KiB
Go

package main
import (
"github.com/microcosm-cc/bluemonday"
"github.com/jackc/pgx/pgxpool"
"encoding/binary"
"encoding/json"
"crypto/sha1"
"io/ioutil"
"net/http"
"context"
"strings"
"html"
"time"
"fmt"
"net"
"log"
"io"
"os"
)
// NEW_INSTANCE is the ReportInstance/RunningInstance status that asks the main
// loop to start tracking an endpoint.
const NEW_INSTANCE = 0

// Internal failure statuses. They sit above the 4xx/5xx HTTP status codes that
// StartInstancePoll also reports through the same status field.
const (
	CLIENT_ISSUE    = 600 + iota // timeline HTTP request could not be made
	ONION_PROTOCOL               // .onion endpoint; not polled
	UNMARSHAL_ERROR              // response body could not be decoded
	NO_CONNECTION                // peer-list HTTP request could not be made
)
// PollMessage is sent by StartInstancePoll after a timeline page was fetched
// and decoded; the main loop stores min_id and schedules the next poll.
type PollMessage struct {
	from     string // endpoint that was polled
	status   int    // HTTP status code of the timeline response
	min_id   string // largest post id seen (string comparison), or the previous min_id
	numposts int    // number of posts in the decoded page
}
// ReportPost is one status decoded from a public-timeline JSON response,
// together with values StartInstancePoll derives before it is stored.
type ReportPost struct {
	// Decoded from the response.
	Id         string      `json:"id"`
	Url        string      `json:"url"`
	Account    AccountType `json:"account"`
	Content    string      `json:"content"`
	Created_at string      `json:"created_at"`

	// Derived values; unexported, so encoding/json never reads or writes them.
	normalized string // lower-cased, sanitized, HTML-unescaped Content
	posthash   []byte // SHA-1 digest used as the posts table dedupe key
}

// AccountType is the "account" object embedded in each timeline status.
type AccountType struct {
	Acct         string `json:"acct"` // "user" for local accounts, "user@domain" otherwise
	Avatar       string `json:"avatar"`
	Bot          bool   `json:"bot"`
	Created_at   string `json:"created_at"`
	Display_name string `json:"display_name"`
	Url          string `json:"url"`
}
// ReportInstance tells the main loop about an endpoint. A status of
// NEW_INSTANCE asks it to start tracking the endpoint; any other status (an
// HTTP status or one of the 6xx codes) is recorded via SuspendInstance.
type ReportInstance struct {
	from     string // endpoint that produced the report ("" for control-port commands)
	endpoint string // instance domain being reported
	status   int    // NEW_INSTANCE or a failure status
}
// RunningInstance is the main loop's record of one tracked endpoint, including
// the newest post id (min_id) from which its next timeline poll resumes.
//
// All fields are unexported, so encoding/json ignores them; a json struct tag
// on them would have no effect (go vet's structtag check reports such tags).
// NOTE(review): values of this type therefore marshal as {}; export the fields
// if they are ever meant to be sent as JSON (e.g. inside ResponseBack).
type RunningInstance struct {
	endpoint string // instance domain
	software string // NodeInfo software name, if recorded
	min_id   string // newest post id seen; next poll starts after it
	status   int    // NEW_INSTANCE or the last reported failure status
}
// NodeInfoSoftware is the "software" object of a NodeInfo 2.0 document.
type NodeInfoSoftware struct {
	Name    string `json:"name"` // compared against "pleroma"/"mastodon" by NewInstance
	Version string `json:"version"`
}

// NodeInfo holds the only part of a NodeInfo 2.0 document this program reads.
type NodeInfo struct {
	Software NodeInfoSoftware `json:"software"`
}
// CommandMap is the JSON payload of one control-port command, as decoded by
// handleClient.
type CommandMap struct {
	Type     string `json:"Type"`     // "status" or "add"
	Endpoint string `json:"Endpoint"` // instance domain for "add"
}
// ResponseBack looks intended as a JSON reply to control-port clients; it is
// not referenced in the code visible here.
// NOTE(review): RunningInstance has only unexported fields, so the
// RunningInstances member marshals as {} — confirm before relying on it.
type ResponseBack struct {
	Type             string          `json:"Type"`
	Message          string          `json:"Message"`
	RunningInstances RunningInstance `json:"RunningInstances"`
}
// handleClient serves a single control-port connection; main starts it in its
// own goroutine for every accepted connection.
//
// Wire format: a 4-byte little-endian length, then that many bytes of JSON
// decoded into CommandMap. Commands:
//   - "status": prints every entry of *runninginstances to stdout.
//   - "add": sends a NEW_INSTANCE ReportInstance for CommandMap.Endpoint on
//     reportInstanceChan so the main loop starts tracking it.
//
// A truncated, oversized or undecodable request only drops this connection;
// it does not terminate the process. The connection is always closed on return.
func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance) {
	// The length prefix is client-controlled: cap it so a bogus value cannot
	// force a huge allocation (or overflow int on 32-bit platforms).
	const maxCommandSize = 1 << 20

	defer commandClient.Close()

	sizebyte := make([]byte, 4)
	if _, err := io.ReadFull(commandClient, sizebyte); err != nil {
		fmt.Println("Did not read 4 bytes, failure.")
		return
	}
	jsonsize := binary.LittleEndian.Uint32(sizebyte)
	if jsonsize > maxCommandSize {
		fmt.Println("Command too large:", jsonsize)
		return
	}

	jsonbyte := make([]byte, jsonsize)
	if n, err := io.ReadFull(commandClient, jsonbyte); err != nil {
		fmt.Println("Failued to read json size of ", n)
		return
	}

	var commandmap CommandMap
	if err := json.Unmarshal(jsonbyte, &commandmap); err != nil {
		fmt.Println("Unable to unmarshal")
		return
	}

	switch commandmap.Type {
	case "status":
		fmt.Println("Status")
		// NOTE(review): this goroutine reads *runninginstances while the main
		// loop may append to or update it concurrently (a data race); a proper
		// fix routes the request through the main loop.
		for _, runninginstance := range *runninginstances {
			fmt.Println(runninginstance)
		}
	case "add":
		fmt.Println("Add instance: " + commandmap.Endpoint)
		reportInstanceChan <- ReportInstance{"", commandmap.Endpoint, NEW_INSTANCE}
	default:
		fmt.Println("Something else")
	}
}
/*
* This code can be refactored per
* https://golang.org/pkg/encoding/binary/
* But for now, this should be sufficient
*/
/*
func parseCommand(c net.Conn) {
rawCommand := make([]byte, 1)
}
*/
// AppendIfMissing returns hay with needle appended, unless hay already
// contains needle, in which case hay is returned unchanged.
func AppendIfMissing(hay []string, needle string) []string {
	found := false
	for i := 0; i < len(hay) && !found; i++ {
		found = hay[i] == needle
	}
	if found {
		return hay
	}
	return append(hay, needle)
}
// StartInstancePoll fetches one page of endpoint's public timeline newer than
// min_id, then:
//   - sends every decoded post on reportPostChan (with the account qualified as
//     user@endpoint when it has no domain, plus normalized text and a posthash),
//   - reports every account domain seen as a NEW_INSTANCE on reportInstanceChan,
//   - sends a PollMessage with the HTTP status, the largest id seen and the
//     post count on pollMessageChan so the main loop can schedule the next poll.
//
// On failure it sends a ReportInstance carrying ONION_PROTOCOL, CLIENT_ISSUE,
// the HTTP 4xx/5xx status or UNMARSHAL_ERROR instead, and no PollMessage.
func StartInstancePoll(endpoint string, min_id string, reportPostChan chan ReportPost, pollMessageChan chan PollMessage, reportInstanceChan chan ReportInstance) {
	p := bluemonday.NewPolicy()
	newposts := make([]ReportPost, 0)

	// Only placing this here to later have the option of using an HTTP client via a SOCKS5 Tor proxy
	if strings.Contains(endpoint, ".onion") {
		reportInstanceChan <- ReportInstance{endpoint, endpoint, ONION_PROTOCOL}
		return
	}

	// Bound the request so an unresponsive instance cannot stall this poller forever.
	client := http.Client{Timeout: 30 * time.Second}
	api_timeline := "https://" + endpoint + "/api/v1/timelines/public?min_id=" + min_id
	resp, err := client.Get(api_timeline)
	if err != nil {
		reportInstanceChan <- ReportInstance{endpoint, endpoint, CLIENT_ISSUE}
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err == nil {
		err = json.Unmarshal(body, &newposts)
	}
	if err != nil {
		fmt.Println("Unmarshal 3")
		// Surface HTTP error statuses as-is; anything else is a decoding failure.
		if resp.StatusCode >= 400 && resp.StatusCode < 600 {
			reportInstanceChan <- ReportInstance{endpoint, endpoint, resp.StatusCode}
		} else {
			reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR}
		}
		return
	}

	newinstances := make([]string, 0)
	numposts := 0
	for _, newpost := range newposts {
		at_sign := strings.Index(newpost.Account.Acct, "@")
		if at_sign == -1 {
			// Local account: qualify it with the polled endpoint's domain.
			at_sign = len(newpost.Account.Acct)
			newpost.Account.Acct += "@" + endpoint
		}

		// normalized is part of the hash input, so it must be computed first.
		// NOTE(review): rows stored by builds that hashed before normalizing have
		// hashes computed with an empty normalized value.
		newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))

		// Calculate the post hash
		posthash := sha1.New()
		fmt.Fprint(posthash, newpost.Url)
		fmt.Fprint(posthash, newpost.normalized)
		fmt.Fprint(posthash, newpost.Account.Acct)
		fmt.Fprint(posthash, newpost.Account.Display_name)
		newpost.posthash = posthash.Sum(nil)

		reportPostChan <- newpost

		// NOTE(review): ids are compared as strings (lexicographically); this
		// matches numeric order only for ids of equal length — confirm for all
		// server implementations.
		if newpost.Id > min_id {
			min_id = newpost.Id
		}
		numposts++
		newinstances = AppendIfMissing(newinstances, newpost.Account.Acct[at_sign+1:])
	}

	for _, newinstance := range newinstances {
		reportInstanceChan <- ReportInstance{endpoint, newinstance, NEW_INSTANCE}
	}
	pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, numposts}
}
// StartGetPeers fetches endpoint's /api/v1/instance/peers list and reports
// each peer as a NEW_INSTANCE on reportInstanceChan.
//
// If the request fails it reports NO_CONNECTION for endpoint. If the body
// cannot be read or decoded it logs the error and reports UNMARSHAL_ERROR for
// endpoint. Either way the process keeps running.
func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) {
	var newpeers []string

	client := http.Client{Timeout: 30 * time.Second}
	api_peers := "https://" + endpoint + "/api/v1/instance/peers"
	resp, err := client.Get(api_peers)
	if err != nil {
		reportInstanceChan <- ReportInstance{endpoint, endpoint, NO_CONNECTION}
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err == nil {
		err = json.Unmarshal(body, &newpeers)
	}
	if err != nil {
		fmt.Println("Unmarshal 1")
		// Log rather than log.Fatal: one bad peer list must not kill the daemon,
		// and the failure still has to be reported below.
		log.Println(err)
		reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR}
		return
	}

	for _, newpeer := range newpeers {
		reportInstanceChan <- ReportInstance{endpoint, newpeer, NEW_INSTANCE}
	}
}
// DeferPollRun sleeps, then starts the next StartInstancePoll for
// pollmessage.from, resuming after pollmessage.min_id.
//
// Delay: 15s after a 200 with more than 10 posts, 10s after a 200 with at most
// 10 posts, 30s after a 429, otherwise 10s (and the status is printed).
//
// runninginstances is not read. The main loop copies pollmessage.min_id into
// the matching entry just before starting this goroutine, so the message
// already carries that value. Reading the shared slice from here would race
// with the main goroutine appending to it. The parameter is kept so callers
// do not change.
func DeferPollRun(pollmessage PollMessage, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) {
	min_id := pollmessage.min_id

	var delay time.Duration
	switch {
	case pollmessage.status == 200 && pollmessage.numposts > 10:
		delay = 15 * time.Second
	case pollmessage.status == 200:
		delay = 10 * time.Second
	case pollmessage.status == 429:
		delay = 30 * time.Second
	default:
		fmt.Println("error, status code is: ", pollmessage.status)
		delay = 10 * time.Second
	}

	time.Sleep(delay)
	go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan, reportInstanceChan)
}
// GetNodeInfo fetches https://<endpoint>/nodeinfo/2.0.json and decodes it into
// *nodeinfo.
//
// Failures are not reported to the caller. On a request or read error
// *nodeinfo is left untouched. On a decode error "Unmarshal 2" is printed and
// *nodeinfo may be unchanged or only partially filled.
//
// TODO: change this to return a proper error.
func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) {
	// NewInstance calls this from the main event loop, so bound the wait.
	client := http.Client{Timeout: 10 * time.Second}
	api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json"
	resp, err := client.Get(api_nodeinfo)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return
	}
	// nodeinfo is already a pointer; pass it directly rather than its address.
	if err := json.Unmarshal(body, nodeinfo); err != nil {
		fmt.Println("Unmarshal 2")
		return
	}
}
// NewInstance starts tracking endpoint. It ignores an empty endpoint and one
// already present in *runninginstances. Otherwise it appends a RunningInstance
// with status NEW_INSTANCE and, when the endpoint's NodeInfo names "pleroma" or
// "mastodon", starts StartInstancePoll for it in a new goroutine.
//
// The main loop calls this synchronously, and the NodeInfo lookup is a network
// request. It is therefore done only after the cheap empty/duplicate checks,
// so already-known endpoints never block the loop.
func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) {
	if endpoint == "" {
		return
	}
	for _, runninginstance := range *runninginstances {
		if runninginstance.endpoint == endpoint {
			return
		}
	}

	// Check node type
	var nodeinfo NodeInfo
	GetNodeInfo(endpoint, &nodeinfo)

	// Record the reported software name (empty if NodeInfo was unavailable).
	newinstance := RunningInstance{endpoint, nodeinfo.Software.Name, "", NEW_INSTANCE}
	*runninginstances = append(*runninginstances, newinstance)

	if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
		go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan, reportInstanceChan)
		// fmt.Println("Temporarily disabled Peer Hunting")
		// go StartGetPeers(endpoint, reportInstanceChan)
	}
}
// writePost stores one post. It upserts the author into accounts to obtain the
// account id, then inserts the post into posts unless a row with the same
// posthash already exists.
//
// Any database error prints diagnostics to stdout and terminates the whole
// process. This is deliberate, per the inline notes, while failure causes are
// being investigated.
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
	ctx := context.Background()

	conn, err := pool.Acquire(ctx)
	if err != nil {
		fmt.Println("Error acquiring connection:", err)
		os.Exit(1)
	}
	defer conn.Release()

	const upsertAccount = "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id"
	const insertPost = "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING"

	account := reportpost.Account

	// Insert new account if new. The no-op DO UPDATE (rather than DO NOTHING)
	// lets RETURNING yield the id of an already-existing account as well.
	var accountid int
	row := conn.QueryRow(ctx, upsertAccount, account.Acct, account.Avatar, account.Bot, account.Created_at, account.Display_name, account.Url)
	if err = row.Scan(&accountid); err != nil {
		fmt.Println("First ", err)
		fmt.Println("--------------------------")
		fmt.Println("Reported error: ", err)
		fmt.Println("Account Acct: ", account.Acct)
		fmt.Println("Account Avatar: ", account.Avatar)
		fmt.Println("Account Avatar len: ", len(account.Avatar))
		fmt.Println("Account Bot: ", account.Bot)
		fmt.Println("Account Created_at: ", account.Created_at)
		fmt.Println("Account Display: ", account.Display_name)
		fmt.Println("Account URL: ", account.Url)
		fmt.Println(reportpost)
		fmt.Println("--------------------------")
		os.Exit(1) // For now I want this to die and learn why it failed
	}

	// Insert new post if new; a duplicate posthash is silently skipped.
	if _, err = conn.Exec(ctx, insertPost, reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash); err != nil {
		fmt.Println("Second ", err)
		fmt.Println("--------------------------")
		fmt.Println("Reported error: ", err)
		fmt.Println("Url: ", reportpost.Url)
		fmt.Println("Content: ", reportpost.Content)
		fmt.Println("Created_at: ", reportpost.Created_at)
		fmt.Println("normalized: ", reportpost.normalized)
		fmt.Println("account_id", accountid)
		fmt.Println("posthash: ", reportpost.posthash)
		fmt.Println("--------------------------")
		os.Exit(1) // For now I want this to die and learn why it failed
	}
}
// SuspendInstance records suspendinstance.status on the first entry of
// *runninginstances whose endpoint matches suspendinstance.endpoint. Unknown
// endpoints are ignored.
func SuspendInstance(suspendinstance ReportInstance, runninginstances *[]RunningInstance) {
	instances := *runninginstances
	for i := range instances {
		// Index into the slice: a range value variable is a copy, so assigning
		// to it would leave the stored entry unchanged.
		if instances[i].endpoint == suspendinstance.endpoint {
			instances[i].status = suspendinstance.status
			return
		}
	}
}
// main connects to PostgreSQL, listens for control commands on
// 127.0.0.1:5555, and then runs the event loop that owns runninginstances:
//   - accepted control connections are handed to handleClient,
//   - finished polls record their new min_id and are rescheduled via DeferPollRun,
//   - reported posts are written with writePost,
//   - reported instances are started (NEW_INSTANCE) or marked via SuspendInstance.
func main() {
	// Current instances
	runninginstances := make([]RunningInstance, 0)

	// Initial Setup
	reportPostChan := make(chan ReportPost, 100)
	reportInstanceChan := make(chan ReportInstance, 100)
	pollMessageChan := make(chan PollMessage, 100)

	// Setup Database
	pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
	if err != nil {
		fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
		os.Exit(1)
	}
	// The loop below never exits, so these defers only run on the early return.
	defer pool.Close()

	l, err := net.Listen("tcp", "127.0.0.1:5555")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer l.Close()

	// Accept control connections in the background. On an Accept error, a nil
	// conn is sent once to signal that the listener has stopped.
	commandClient := make(chan net.Conn)
	go func(l net.Listener, out chan<- net.Conn) {
		for {
			c, err := l.Accept()
			if err != nil {
				out <- nil
				return
			}
			out <- c
		}
	}(l, commandClient)

	for {
		select {
		case c := <-commandClient: // New client connection
			if c == nil {
				// Handing nil to handleClient would panic. Instead disable this
				// case (receiving from a nil channel blocks forever) and keep polling.
				fmt.Println("Command listener stopped accepting connections")
				commandClient = nil
				continue
			}
			go handleClient(c, &runninginstances, reportInstanceChan)
		case p := <-pollMessageChan: // A poller ended
			for i := range runninginstances {
				if runninginstances[i].endpoint == p.from {
					runninginstances[i].min_id = p.min_id
				}
			}
			go DeferPollRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
		case v := <-reportPostChan: // New Post
			go writePost(pool, v)
		case w := <-reportInstanceChan: // Start or suspend instance
			if w.status == NEW_INSTANCE {
				NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
			} else {
				SuspendInstance(w, &runninginstances)
			}
		}
	}
}