oh yeah, go can do that multiple file thing

This commit is contained in:
farhan 2020-11-10 21:53:46 -05:00
parent 3ae62454db
commit 8a12f277b1
5 changed files with 465 additions and 459 deletions

80
poll/cli.go Normal file
View File

@ -0,0 +1,80 @@
package main
import (
"encoding/binary"
"encoding/json"
"fmt"
"net"
"log"
"io"
"os"
)
func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) {
sizebyte := make([]byte, 4)
var commandmap CommandMap
var responseback ResponseBack
n, err := io.ReadFull(commandClient, sizebyte)
if n != 4 {
fmt.Println("Did not read 4 bytes, failure.")
os.Exit(1)
}
jsonsize := int(binary.LittleEndian.Uint32(sizebyte))
jsonbyte := make([]byte, jsonsize)
n, err = io.ReadFull(commandClient, jsonbyte)
if n != jsonsize {
fmt.Println("Failued to read json size of ", n)
os.Exit(1)
}
err = json.Unmarshal(jsonbyte, &commandmap)
if err != nil {
fmt.Println("Unable to unmarshal")
os.Exit(1)
}
switch commandmap.Type {
case "status":
responseback.Message = "Ok"
case "add":
log.Print("Manually added instance: " + commandmap.Endpoint)
var q InstanceReport
q.endpoint = commandmap.Endpoint
q.status = NEW_INSTANCE
instanceReportChan <- q
responseback.Message = "Added " + commandmap.Endpoint
case "suspend":
fmt.Println("Suspend")
case "resume":
fmt.Println("Resume")
default:
fmt.Println("Something else")
}
responseback.Type = "status"
responseback.RunningInstances = *runninginstances
responsebytes, err := json.Marshal(responseback)
if err != nil {
fmt.Println("Error: ", err)
os.Exit(1)
}
n = len(responsebytes)
binary.LittleEndian.PutUint32(sizebyte, uint32(n))
commandClient.Write(sizebyte)
responsebyte, err := json.Marshal(responseback)
if err != nil {
fmt.Println("Error response back: ", err)
os.Exit(1)
}
commandClient.Write(responsebyte)
commandClient.Close()
}

View File

@ -1,459 +0,0 @@
package main
import (
"github.com/microcosm-cc/bluemonday"
"github.com/jackc/pgx/pgxpool"
_ "net/http/pprof"
"encoding/binary"
"encoding/json"
"crypto/sha1"
"io/ioutil"
"net/http"
"context"
"strings"
"html"
"time"
"fmt"
"net"
"log"
"io"
"os"
)
const (
NEW_INSTANCE = 0
RUNNING = 200
TOOMANYREQUESTS = 429
CLIENT_ISSUE = 600
ONION_PROTOCOL = 601
BAD_RESPONSE = 602
NO_CONNECTION = 603
BAD_NODEINFO = 604
)
// Parsing Unmarshal JSON type
type ReportPost struct {
// Retrieved values
Id string `json:"id"`
Url string `json:"url"`
Account AccountType
Content string `json:"content"`
Created_at string `json:"created_at"`
// Derived values
normalized string
posthash []byte
}
type AccountType struct {
Acct string `json:"acct"`
Avatar string `json:"avatar"`
Bot bool `json:"bot"`
Created_at string `json:"created_at"`
Display_name string `json:"display_name"`
Url string `json:"url"`
}
// Used to report a new instance to main
type InstanceReport struct {
endpoint string
status int
min_id string
numposts int
}
// Instance's new min_id value
type RunningInstance struct {
Endpoint string `json:"endpoint"`
Software string `json:"software"`
Min_id string
Status int `json:"status"`
LastRun string `json:"lastrun"`
}
type NodeInfoSoftware struct {
Name string `json:"name"`
Version string `json:"version"`
}
type NodeInfo struct {
Software NodeInfoSoftware `json:"software"`
}
type CommandMap struct {
Type string `json:"Type"`
Endpoint string `json:"Endpoint"`
}
type ResponseBack struct {
Type string `json:"Type"`
Message string `json:"Message"`
RunningInstances []RunningInstance `json:"RunningInstances"`
}
func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) {
sizebyte := make([]byte, 4)
var commandmap CommandMap
var responseback ResponseBack
n, err := io.ReadFull(commandClient, sizebyte)
if n != 4 {
fmt.Println("Did not read 4 bytes, failure.")
os.Exit(1)
}
jsonsize := int(binary.LittleEndian.Uint32(sizebyte))
jsonbyte := make([]byte, jsonsize)
n, err = io.ReadFull(commandClient, jsonbyte)
if n != jsonsize {
fmt.Println("Failued to read json size of ", n)
os.Exit(1)
}
err = json.Unmarshal(jsonbyte, &commandmap)
if err != nil {
fmt.Println("Unable to unmarshal")
os.Exit(1)
}
switch commandmap.Type {
case "status":
responseback.Message = "Ok"
case "add":
log.Print("Manually added instance: " + commandmap.Endpoint)
var q InstanceReport
q.endpoint = commandmap.Endpoint
q.status = NEW_INSTANCE
instanceReportChan <- q
responseback.Message = "Added " + commandmap.Endpoint
case "suspend":
fmt.Println("Suspend")
case "resume":
fmt.Println("Resume")
default:
fmt.Println("Something else")
}
responseback.Type = "status"
responseback.RunningInstances = *runninginstances
responsebytes, err := json.Marshal(responseback)
if err != nil {
fmt.Println("Error: ", err)
os.Exit(1)
}
n = len(responsebytes)
binary.LittleEndian.PutUint32(sizebyte, uint32(n))
commandClient.Write(sizebyte)
responsebyte, err := json.Marshal(responseback)
if err != nil {
fmt.Println("Error response back: ", err)
os.Exit(1)
}
commandClient.Write(responsebyte)
commandClient.Close()
}
/*
* This code can be refactored per
* https://golang.org/pkg/encoding/binary/
* But for now, this should be sufficient
*/
/*
func parseCommand(c net.Conn) {
rawCommand := make([]byte, 1)
}
*/
func AppendIfMissing(hay []string, needle string) []string {
for _, ele := range hay {
if ele == needle {
return hay
}
}
return append(hay, needle)
}
func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) {
p := bluemonday.NewPolicy()
newposts := make([]ReportPost, 0)
// Only placing this here to later have the option of using
// an HTTP client via a SOCKS5 Tor proxy
if strings.Contains(instancereport.endpoint, ".onion") == true {
instanceReportChan <- InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0}
return
}
api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?min_id=" + instancereport.min_id
resp, err := http.Get(api_timeline)
if err != nil {
instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0}
return
}
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &newposts)
if err != nil {
// instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, "", 0}
instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0}
return
}
newinstances := make([]string, 0)
min_id := ""
numposts := 0
for _, newpost := range newposts {
if newpost.Account.Acct == "" {
continue
}
posthash := sha1.New()
at_sign := strings.Index(newpost.Account.Acct, "@")
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + instancereport.endpoint
}
// Calculate the post hash
fmt.Fprint(posthash, newpost.Url)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
reportPostChan <- newpost
// Check min_id
if newpost.Id > min_id {
min_id = newpost.Id
}
numposts = numposts + 1
newinstance := newpost.Account.Acct[at_sign+1:]
newinstances = AppendIfMissing(newinstances, newinstance)
}
for _, newinstance := range newinstances {
var q InstanceReport
q.endpoint = newinstance
q.status = NEW_INSTANCE
q.min_id = ""
q.numposts = 0
instanceReportChan <- q
}
instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts}
}
func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
delay := 10
if instancereport.status == RUNNING && instancereport.numposts <= 10 {
delay = 10
} else if instancereport.status == RUNNING && instancereport.numposts > 10 {
delay = 15
} else if instancereport.status == 429 {
delay = 30
} else {
fmt.Println("error, status code is ------------->: ", instancereport.status)
os.Exit(1)
}
time.Sleep(time.Second * time.Duration(delay))
go StartInstancePoll(instancereport, reportPostChan, instanceReportChan)
}
// Change this to return a proper "err"
func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) {
api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json"
http_client := http.Client{Timeout: 5 * time.Second}
resp, err := http_client.Get(api_nodeinfo)
if err != nil {
return
}
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &nodeinfo)
if err != nil {
// fmt.Println("Unmarshal 2");
return
}
}
func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
var nodeinfo NodeInfo
if endpoint == "" {
return
}
// No repeats
for _, runninginstance := range *runninginstances {
if runninginstance.Endpoint == endpoint {
return
}
}
// Check node type
GetNodeInfo(endpoint, &nodeinfo)
if nodeinfo.Software.Name == "" {
go func() {
var q InstanceReport
q.endpoint = endpoint
q.status = BAD_NODEINFO
instanceReportChan <- q
return
}()
}
newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE, "Queued"}
*runninginstances = append(*runninginstances, newinstance)
if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
var newinstancereport InstanceReport
newinstancereport.endpoint = endpoint
newinstancereport.status = 0
newinstancereport.min_id = ""
newinstancereport.numposts = 0
go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan)
}
}
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
conn, err := pool.Acquire(context.Background())
if err != nil {
log.Fatal("Error connecting to database:", err)
os.Exit(1)
}
defer conn.Release()
// Insert new account if new
var accountid int
err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid)
if err != nil {
fmt.Println("First ", err)
fmt.Println("--------------------------")
fmt.Println("Reported error: ", err)
fmt.Println("Account Acct: ", reportpost.Account.Acct)
fmt.Println("Account Avatar: ", reportpost.Account.Avatar)
fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar))
fmt.Println("Account Bot: ", reportpost.Account.Bot)
fmt.Println("Account Created_at: ", reportpost.Account.Created_at)
fmt.Println("Account Display: ", reportpost.Account.Display_name)
fmt.Println("Account URL: ", reportpost.Account.Url)
fmt.Println(reportpost)
fmt.Println("--------------------------")
os.Exit(1) // For now I want this to die and learn why it failed
return
}
// Insert new post if new
_, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash)
if err != nil { // For now I want to know why this failed.
fmt.Println("Second ", err)
fmt.Println("--------------------------")
fmt.Println("Reported error: ", err)
fmt.Println("Url: ", reportpost.Url)
fmt.Println("Content: ", reportpost.Content)
fmt.Println("Created_at: ", reportpost.Created_at)
fmt.Println("normalized: ", reportpost.normalized)
fmt.Println("account_id", accountid)
fmt.Println("posthash: ", reportpost.posthash)
fmt.Println("--------------------------")
os.Exit(1) // For now I want this to die and learn why it failed
return
}
}
func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) {
for i, runninginstance := range *runninginstances {
if runninginstance.Endpoint == suspendinstance.endpoint {
(*runninginstances)[i].Status = suspendinstance.status
(*runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
return
}
}
}
func engine() {
// Current instances
runninginstances := make([]RunningInstance, 0)
// Initial Setup
reportPostChan := make(chan ReportPost, 2000)
instanceReportChan := make(chan InstanceReport, 100)
// Setup Database
pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
if err != nil {
fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
os.Exit(1)
}
l, err := net.Listen("tcp", "127.0.0.1:5555")
if err != nil {
fmt.Println(err)
return
}
defer l.Close()
commandClient := make(chan net.Conn)
go func(l net.Listener) {
for {
c, err := l.Accept()
if err != nil {
fmt.Println("Error on accept")
commandClient <- nil
return
}
commandClient <- c
}
}(l)
for {
select {
case c := <-commandClient: // New client connection
go handleClient(c, &runninginstances, instanceReportChan)
case v := <-reportPostChan: // New Post
go writePost(pool, v)
case w := <-instanceReportChan: // Start or suspend instance
if w.status == NEW_INSTANCE {
NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan)
} else if w.status == RUNNING || w.status == TOOMANYREQUESTS {
for i, runninginstance := range runninginstances {
if runninginstance.Endpoint == w.endpoint {
runninginstances[i].Min_id = w.min_id
runninginstances[i].Status = w.status
runninginstances[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
}
}
go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan)
} else {
SuspendInstance(w, &runninginstances)
}
}
}
}
func main() {
go engine()
log.Println("serving on port 8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}

74
poll/headers.go Normal file
View File

@ -0,0 +1,74 @@
package main
const (
NEW_INSTANCE = 0
RUNNING = 200
TOOMANYREQUESTS = 429
CLIENT_ISSUE = 600
ONION_PROTOCOL = 601
BAD_RESPONSE = 602
NO_CONNECTION = 603
BAD_NODEINFO = 604
)
// Parsing Unmarshal JSON type
type ReportPost struct {
// Retrieved values
Id string `json:"id"`
Url string `json:"url"`
Account AccountType
Content string `json:"content"`
Created_at string `json:"created_at"`
// Derived values
normalized string
posthash []byte
}
type AccountType struct {
Acct string `json:"acct"`
Avatar string `json:"avatar"`
Bot bool `json:"bot"`
Created_at string `json:"created_at"`
Display_name string `json:"display_name"`
Url string `json:"url"`
}
// Used to report a new instance to main
type InstanceReport struct {
endpoint string
status int
min_id string
numposts int
}
// Instance's new min_id value
type RunningInstance struct {
Endpoint string `json:"endpoint"`
Software string `json:"software"`
Min_id string
Status int `json:"status"`
LastRun string `json:"lastrun"`
}
type NodeInfoSoftware struct {
Name string `json:"name"`
Version string `json:"version"`
}
type NodeInfo struct {
Software NodeInfoSoftware `json:"software"`
}
type CommandMap struct {
Type string `json:"Type"`
Endpoint string `json:"Endpoint"`
}
type ResponseBack struct {
Type string `json:"Type"`
Message string `json:"Message"`
RunningInstances []RunningInstance `json:"RunningInstances"`
}

175
poll/instance.go Normal file
View File

@ -0,0 +1,175 @@
package main
import (
"github.com/microcosm-cc/bluemonday"
"encoding/json"
"crypto/sha1"
"io/ioutil"
"net/http"
"strings"
"html"
"time"
"fmt"
"os"
)
func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
delay := 10
if instancereport.status == RUNNING && instancereport.numposts <= 10 {
delay = 10
} else if instancereport.status == RUNNING && instancereport.numposts > 10 {
delay = 15
} else if instancereport.status == 429 {
delay = 30
} else {
fmt.Println("error, status code is ------------->: ", instancereport.status)
os.Exit(1)
}
time.Sleep(time.Second * time.Duration(delay))
go StartInstancePoll(instancereport, reportPostChan, instanceReportChan)
}
func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) {
p := bluemonday.NewPolicy()
newposts := make([]ReportPost, 0)
// Only placing this here to later have the option of using
// an HTTP client via a SOCKS5 Tor proxy
if strings.Contains(instancereport.endpoint, ".onion") == true {
instanceReportChan <- InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0}
return
}
api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?limit=40&min_id=" + instancereport.min_id
resp, err := http.Get(api_timeline)
if err != nil {
instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0}
return
}
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &newposts)
if err != nil {
// instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, "", 0}
instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0}
return
}
newinstances := make([]string, 0)
min_id := ""
numposts := 0
for _, newpost := range newposts {
if newpost.Account.Acct == "" {
continue
}
posthash := sha1.New()
at_sign := strings.Index(newpost.Account.Acct, "@")
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + instancereport.endpoint
}
// Calculate the post hash
fmt.Fprint(posthash, newpost.Url)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
reportPostChan <- newpost
// Check min_id
if newpost.Id > min_id {
min_id = newpost.Id
}
numposts = numposts + 1
newinstance := newpost.Account.Acct[at_sign+1:]
newinstances = AppendIfMissing(newinstances, newinstance)
}
for _, newinstance := range newinstances {
var q InstanceReport
q.endpoint = newinstance
q.status = NEW_INSTANCE
q.min_id = ""
q.numposts = 0
instanceReportChan <- q
}
instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts}
}
// Change this to return a proper "err"
func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) {
api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json"
http_client := http.Client{Timeout: 5 * time.Second}
resp, err := http_client.Get(api_nodeinfo)
if err != nil {
return
}
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &nodeinfo)
if err != nil {
// fmt.Println("Unmarshal 2");
return
}
}
func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
var nodeinfo NodeInfo
if endpoint == "" {
return
}
// No repeats
for _, runninginstance := range *runninginstances {
if runninginstance.Endpoint == endpoint {
return
}
}
// Check node type
GetNodeInfo(endpoint, &nodeinfo)
if nodeinfo.Software.Name == "" {
go func() {
var q InstanceReport
q.endpoint = endpoint
q.status = BAD_NODEINFO
instanceReportChan <- q
return
}()
}
newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE, "Queued"}
*runninginstances = append(*runninginstances, newinstance)
if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
var newinstancereport InstanceReport
newinstancereport.endpoint = endpoint
newinstancereport.status = 0
newinstancereport.min_id = ""
newinstancereport.numposts = 0
go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan)
}
}
func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) {
for i, runninginstance := range *runninginstances {
if runninginstance.Endpoint == suspendinstance.endpoint {
(*runninginstances)[i].Status = suspendinstance.status
(*runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
return
}
}
}

136
poll/main.go Normal file
View File

@ -0,0 +1,136 @@
package main
import (
"github.com/jackc/pgx/pgxpool"
_ "net/http/pprof"
"net/http"
"context"
"time"
"fmt"
"net"
"log"
"os"
)
func AppendIfMissing(hay []string, needle string) []string {
for _, ele := range hay {
if ele == needle {
return hay
}
}
return append(hay, needle)
}
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
conn, err := pool.Acquire(context.Background())
if err != nil {
log.Fatal("Error connecting to database:", err)
os.Exit(1)
}
defer conn.Release()
// Insert new account if new
var accountid int
err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid)
if err != nil {
fmt.Println("First ", err)
fmt.Println("--------------------------")
fmt.Println("Reported error: ", err)
fmt.Println("Account Acct: ", reportpost.Account.Acct)
fmt.Println("Account Avatar: ", reportpost.Account.Avatar)
fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar))
fmt.Println("Account Bot: ", reportpost.Account.Bot)
fmt.Println("Account Created_at: ", reportpost.Account.Created_at)
fmt.Println("Account Display: ", reportpost.Account.Display_name)
fmt.Println("Account URL: ", reportpost.Account.Url)
fmt.Println(reportpost)
fmt.Println("--------------------------")
os.Exit(1) // For now I want this to die and learn why it failed
return
}
// Insert new post if new
_, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash)
if err != nil { // For now I want to know why this failed.
fmt.Println("Second ", err)
fmt.Println("--------------------------")
fmt.Println("Reported error: ", err)
fmt.Println("Url: ", reportpost.Url)
fmt.Println("Content: ", reportpost.Content)
fmt.Println("Created_at: ", reportpost.Created_at)
fmt.Println("normalized: ", reportpost.normalized)
fmt.Println("account_id", accountid)
fmt.Println("posthash: ", reportpost.posthash)
fmt.Println("--------------------------")
os.Exit(1) // For now I want this to die and learn why it failed
return
}
}
func engine() {
// Current instances
runninginstances := make([]RunningInstance, 0)
// Initial Setup
reportPostChan := make(chan ReportPost, 2000)
instanceReportChan := make(chan InstanceReport, 20)
// Setup Database
pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
if err != nil {
fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
os.Exit(1)
}
l, err := net.Listen("tcp", "127.0.0.1:5555")
if err != nil {
fmt.Println(err)
return
}
defer l.Close()
commandClient := make(chan net.Conn)
go func(l net.Listener) {
for {
c, err := l.Accept()
if err != nil {
fmt.Println("Error on accept")
commandClient <- nil
return
}
commandClient <- c
}
}(l)
for {
select {
case c := <-commandClient: // New client connection
go handleClient(c, &runninginstances, instanceReportChan)
case v := <-reportPostChan: // New Post
go writePost(pool, v)
case w := <-instanceReportChan: // Start or suspend instance
if w.status == NEW_INSTANCE {
NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan)
} else if w.status == RUNNING || w.status == TOOMANYREQUESTS {
for i, runninginstance := range runninginstances {
if runninginstance.Endpoint == w.endpoint {
runninginstances[i].Min_id = w.min_id
runninginstances[i].Status = w.status
runninginstances[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
}
}
go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan)
} else {
SuspendInstance(w, &runninginstances)
}
}
}
}
func main() {
go engine()
log.Println("serving on port 8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}