// fedilogue/poll/engine.go
//
// 296 lines
// 7.4 KiB
// Go

package main
import (
"github.com/microcosm-cc/bluemonday"
"github.com/jackc/pgx/pgxpool"
"encoding/json"
"crypto/sha1"
"io/ioutil"
"net/http"
"context"
"strings"
"time"
"fmt"
"net"
"log"
"io"
"os"
)
// PollMessage is sent on pollMessageChan by StartInstancePoll when one poll of
// an instance's public timeline finishes. main copies min_id into that
// instance's RunningInstance entry and hands the message to DeferPollRun,
// which picks the delay before the next poll from status and numposts.
type PollMessage struct {
from string // endpoint that was polled
status int // HTTP status code of the timeline request
min_id string // post id to resume from (sent as min_id) on the next poll
numposts int // number of posts received in this poll
}
// ReportPost is one post decoded with json.Unmarshal from an instance's
// /api/v1/timelines/public response. StartInstancePoll fills in the derived
// fields and sends it on reportPostChan; main stores it with writePost.
type ReportPost struct {
// Retrieved values: decoded from the timeline JSON.
Id string `json:"id"`
Url string `json:"url"`
Account AccountType // untagged: encoding/json matches the "account" key case-insensitively
Content string `json:"content"` // content as sent by the instance; sanitized copy is StrippedContent
Created_at string `json:"created_at"` // kept as a string and written to the database as-is
// Derived values: computed in StartInstancePoll after decoding.
StrippedContent string // Content passed through a bluemonday policy
Posthash []byte // SHA-1 over Url, Content, Account.Acct, Account.Display_name, Account.Url
}
// AccountType is the author account embedded in each ReportPost. writePost
// inserts it into the accounts table, skipping it when Acct already exists.
type AccountType struct {
Acct string `json:"acct"` // StartInstancePoll appends "@"+endpoint when the value has no "@"
Avatar string `json:"avatar"`
Bot bool `json:"bot"`
Created_at string `json:"created_at"` // kept as a string and written to the database as-is
Display_name string `json:"display_name"`
Url string `json:"url"`
}
// ReportInstance asks main to start polling an instance; main passes
// endpoint to NewInstance, which ignores endpoints it already polls.
type ReportInstance struct {
from string // instance whose peer list named endpoint; "" for the seed sent by main
endpoint string // instance address, used to build "https://" + endpoint + "/api/..."
}
// RunningInstance is main's record of an instance that is being polled.
type RunningInstance struct {
endpoint string // unique within main's runninginstances (NewInstance deduplicates)
min_id string // min_id from the instance's latest PollMessage; "" until the first poll reports
}
// handleClient reads fixed-size 20-byte commands from a control connection
// and prints each one. When a read fails (including io.EOF after the peer
// disconnects) it prints the error, closes the connection and returns.
// A nil connection is ignored: main forwards nil when the listener's Accept
// fails, and reading from a nil net.Conn would panic.
func handleClient(commandClient net.Conn) {
	if commandClient == nil {
		return
	}
	defer commandClient.Close()

	rawCommand := make([]byte, 20)
	for {
		if _, err := io.ReadFull(commandClient, rawCommand); err != nil {
			// Returning here matters: continuing would spin forever on the
			// closed connection, printing the same error every iteration.
			fmt.Println(err)
			return
		}
		fmt.Println(rawCommand)
	}
}
/*
* This code could be refactored to use encoding/binary
* (https://golang.org/pkg/encoding/binary/),
* but for now this should be sufficient.
*/
/*
func parseCommand(c net.Conn) {
rawCommand := make([]byte, 1)
}
*/
// StartInstancePoll performs one poll of endpoint's public timeline
// (GET https://<endpoint>/api/v1/timelines/public with the given min_id),
// sends every returned post on reportPostChan, and then sends exactly one
// PollMessage on pollMessageChan so main can schedule the next poll.
//
// Before a post is forwarded:
//   - an account Acct without "@" is qualified with "@"+endpoint;
//   - Posthash is set to a SHA-1 over the post URL, content, account acct,
//     display name and account URL;
//   - StrippedContent is the content passed through bluemonday.NewPolicy().
//
// The PollMessage carries status 0 if the request itself failed, otherwise
// the HTTP status code. Non-200 responses (for example 429) are reported
// without decoding their body, which is not a post array. min_id is advanced
// to the newest post id received. It is echoed back unchanged when nothing
// was received, so a failed poll does not reset the instance's position.
// An unreadable or undecodable 200 body is logged and reported as a poll
// with 0 posts instead of terminating the process from this goroutine.
func StartInstancePoll(endpoint string, min_id string, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) {
	p := bluemonday.NewPolicy()
	newposts := make([]ReportPost, 0)
	api_timeline := "https://" + endpoint + "/api/v1/timelines/public?since_id=40&min_id=" + min_id

	// Bound the request so one unresponsive instance cannot stall its poller forever.
	client := http.Client{Timeout: 30 * time.Second}
	resp, err := client.Get(api_timeline)
	if err != nil {
		// resp is nil here, so report status 0 rather than touching it.
		fmt.Println("Failure to retrieve HTTPS data...")
		log.Println(err)
		pollMessageChan <- PollMessage{endpoint, 0, min_id, 0}
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Error responses carry a JSON object, not a post array. Pass the
		// status on so DeferPollRun can react (e.g. back off on 429).
		pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, 0}
		return
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("Failure to read HTTPS data...")
		log.Println(err)
		pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, 0}
		return
	}
	if err := json.Unmarshal(body, &newposts); err != nil {
		fmt.Println("Failure to unmarshal 1")
		fmt.Println(string(body))
		log.Println(err)
		pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, 0}
		return
	}

	numposts := 0
	for _, newpost := range newposts {
		if !strings.Contains(newpost.Account.Acct, "@") {
			newpost.Account.Acct += "@" + endpoint
		}
		// Calculate the post hash
		posthash := sha1.New()
		fmt.Fprint(posthash, newpost.Url)
		fmt.Fprint(posthash, newpost.Content)
		fmt.Fprint(posthash, newpost.Account.Acct)
		fmt.Fprint(posthash, newpost.Account.Display_name)
		fmt.Fprint(posthash, newpost.Account.Url)
		newpost.Posthash = posthash.Sum(nil)
		newpost.StrippedContent = p.Sanitize(newpost.Content)
		reportPostChan <- newpost

		if statusIDGreater(newpost.Id, min_id) {
			min_id = newpost.Id
		}
		numposts++
	}
	pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, numposts}
}

// statusIDGreater reports whether post id a sorts after post id b.
// Plain string comparison misorders numeric ids of different lengths
// ("99" > "100"), so a longer id wins and equal lengths compare bytewise.
// NOTE(review): assumes ids are unpadded numbers in an ASCII-ordered digit
// alphabet (decimal, base62), as Mastodon-style ids appear to be — confirm
// for other server software.
func statusIDGreater(a, b string) bool {
	if len(a) != len(b) {
		return len(a) > len(b)
	}
	return a > b
}
// StartGetPeers fetches the peer list that endpoint publishes
// (GET https://<endpoint>/api/v1/instance/peers, decoded as a JSON array of
// strings). It sends each peer on reportInstanceChan with from set to
// endpoint.
//
// Peer discovery is auxiliary to polling, so every failure is logged and the
// function returns without reporting peers rather than exiting the process.
// Failures include a transport error, a non-200 response, and an unreadable
// or undecodable body. Instances that disable this endpoint answer with an
// error object, not an array.
func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) {
	api_peers := "https://" + endpoint + "/api/v1/instance/peers"

	// Bound the request so an unresponsive instance cannot block this goroutine forever.
	client := http.Client{Timeout: 30 * time.Second}
	resp, err := client.Get(api_peers)
	if err != nil {
		fmt.Println("Peer instance failure")
		log.Println(err)
		return
	}
	// Always release the response body (and its connection).
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		fmt.Println("Peer instance failure: status", resp.StatusCode, "from", endpoint)
		return
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("Peer instance failure")
		log.Println(err)
		return
	}

	var newpeers []string
	if err := json.Unmarshal(body, &newpeers); err != nil {
		fmt.Println("Unmarshal error")
		log.Println(err)
		return
	}

	for _, newpeer := range newpeers {
		reportInstanceChan <- ReportInstance{from: endpoint, endpoint: newpeer}
	}
}
// DeferPollRun waits a status-dependent delay after a poll of
// pollmessage.from has finished, then starts the next poll of that instance
// in a new goroutine, resuming from pollmessage.min_id.
//
// Delays are chosen as follows:
//   - 2s after a 200 response with at most 10 posts;
//   - 5s after a 200 response with more than 10 posts;
//   - 30s after a 429 response.
//
// Any other status prints it and exits the whole process.
//
// main starts this function with `go`, and main keeps appending to and
// updating *runninginstances, so reading that slice here was a data race.
// main stores pollmessage.min_id into the instance's entry just before
// starting this function, so the message already holds the same value.
// runninginstances and reportInstanceChan are kept for signature
// compatibility only.
func DeferPollRun(pollmessage PollMessage, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) {
	var delay time.Duration
	switch {
	case pollmessage.status == 200 && pollmessage.numposts <= 10:
		delay = 2 * time.Second
	case pollmessage.status == 200:
		delay = 5 * time.Second
	case pollmessage.status == 429:
		delay = 30 * time.Second
	default:
		fmt.Println("error, status code is: ", pollmessage.status)
		os.Exit(1)
	}
	time.Sleep(delay)
	go StartInstancePoll(pollmessage.from, pollmessage.min_id, reportPostChan, pollMessageChan)
}
// NewInstance begins polling endpoint unless runninginstances already holds
// an entry for it. A new endpoint is recorded with an empty min_id and its
// first poll is launched in its own goroutine. Peer discovery through
// StartGetPeers is currently switched off; reportInstanceChan is only
// needed for that path.
func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) {
	known := *runninginstances
	for i := range known {
		if known[i].endpoint == endpoint {
			return // already being polled
		}
	}

	*runninginstances = append(known, RunningInstance{endpoint: endpoint, min_id: ""})
	go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan)

	// Peer hunting is temporarily disabled:
	// go StartGetPeers(endpoint, reportInstanceChan)
}
// writePost stores one ReportPost using a single pooled connection. It first
// inserts the post into `posts`, where ON CONFLICT (posthash) DO NOTHING skips
// duplicates. It then inserts the author into `accounts`, where
// ON CONFLICT (acct) DO NOTHING skips known accounts. Any database error
// prints a message and exits the process.
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
	ctx := context.Background()

	conn, err := pool.Acquire(ctx)
	if err != nil {
		fmt.Println("Error acquiring connection:", err)
		os.Exit(1)
	}
	defer conn.Release()

	// execOrExit runs one statement on the acquired connection and aborts the
	// program if it fails.
	execOrExit := func(query string, args ...interface{}) {
		if _, execErr := conn.Exec(ctx, query, args...); execErr != nil {
			fmt.Println("Error on channel???")
			fmt.Println(execErr)
			os.Exit(1)
		}
	}

	acct := reportpost.Account
	execOrExit("INSERT INTO posts (url, content, created_at, strippedcontent, posthash) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (posthash) DO NOTHING",
		reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.StrippedContent, reportpost.Posthash)
	execOrExit("INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (acct) DO NOTHING",
		acct.Acct, acct.Avatar, acct.Bot, acct.Created_at, acct.Display_name, acct.Url)
}
// main wires the crawler together. It connects to Postgres, opens the
// command listener on 127.0.0.1:5555, and seeds the crawl with
// mastodon.social. It then runs one event loop that:
//   - hands each accepted command connection to handleClient;
//   - records each finished poll's min_id and schedules the next poll
//     with DeferPollRun;
//   - stores each received post with writePost;
//   - starts polling each reported instance with NewInstance.
//
// The loop never returns.
func main() {
	// Instances currently being polled; only this goroutine appends to or updates it.
	runninginstances := make([]RunningInstance, 0)

	// Initial Setup
	reportPostChan := make(chan ReportPost, 100)
	reportInstanceChan := make(chan ReportInstance, 100)
	pollMessageChan := make(chan PollMessage, 100)

	// Setup Database
	pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/tutorial")
	if err != nil {
		fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
		os.Exit(1)
	}
	// Released on the early return below.
	defer pool.Close()

	l, err := net.Listen("tcp", "127.0.0.1:5555")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer l.Close()

	// Seed the crawl; the channel is buffered, so this send completes before the loop starts.
	reportInstanceChan <- ReportInstance{from: "", endpoint: "mastodon.social"}

	// Accept command connections in the background and forward them to the loop.
	// A nil value means Accept failed and the acceptor goroutine has stopped.
	acceptedConns := make(chan net.Conn)
	go func(l net.Listener, out chan<- net.Conn) {
		for {
			c, err := l.Accept()
			if err != nil {
				fmt.Println(err)
				out <- nil
				return
			}
			out <- c
		}
	}(l, acceptedConns)

	for {
		select {
		case c := <-acceptedConns: // New client connection
			if c == nil {
				// Receiving from a nil channel blocks forever, which disables this
				// case instead of handing a nil net.Conn to handleClient.
				fmt.Println("Command listener stopped accepting connections")
				acceptedConns = nil
				continue
			}
			go handleClient(c)
		case p := <-pollMessageChan: // A poller ended
			for i := range runninginstances {
				if runninginstances[i].endpoint == p.from {
					runninginstances[i].min_id = p.min_id
				}
			}
			go DeferPollRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
		case v := <-reportPostChan: // New Post
			go writePost(pool, v)
		case w := <-reportInstanceChan: // Start a new instance
			NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
		}
	}
}