blood sweat and tears later, memory leak is gone

restructured how the engine works
some communication is through a shared object, perhaps I will revisit
this problem in the future. Until then, this is stable and works...
This commit is contained in:
farhan 2020-11-17 18:28:59 -05:00
parent 94334b710b
commit ab8e0182bb
5 changed files with 174 additions and 119 deletions

View File

@ -15,18 +15,18 @@ type CommandMap struct {
Endpoint string `json:"Endpoint"`
}
// Instance's new min_id value
type RunningInstance struct {
Endpoint string `json:"endpoint"`
Software string `json:"software"`
Min_id string
Status int `json:"status"`
Status int `json:"status"`
LastRun string `json:"lastrun"`
}
type ResponseBack struct {
Type string `json:"Type"`
Message string `json:"Message"`
RunningInstances []RunningInstance `json:"RunningInstances"`
RunningInstances map[string]RunningInstance `json:"RunningInstances"`
}
func main() {
@ -120,7 +120,7 @@ func main() {
case "add":
fmt.Println("Add instance")
case "status":
for _, runninginstance := range responseback.RunningInstances {
for endpoint, runninginstance := range responseback.RunningInstances {
if runninginstance.Status == 0 {
fmt.Fprintf(os.Stdout, "New\t")
} else if runninginstance.Status == 200 {
@ -134,7 +134,7 @@ func main() {
if runninginstance.LastRun == "Queued" {
fmt.Fprintf(os.Stdout, "\t\t")
}
fmt.Fprintf(os.Stdout, "%s\n", runninginstance.Endpoint)
fmt.Fprintf(os.Stdout, "%s\n", endpoint)
}
}
}

View File

@ -10,7 +10,7 @@ import (
"os"
)
func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) {
func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) {
sizebyte := make([]byte, 4)
var commandmap CommandMap
@ -47,13 +47,16 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, i
responseback.Message = "Ok"
case "add":
log.Print("Manually added instance: " + commandmap.Endpoint)
var q InstanceReport
q.endpoint = commandmap.Endpoint
q.status = NEW_INSTANCE
instanceReportChan <- q
responseback.Message = "Added " + commandmap.Endpoint
ri_mutex.Lock()
_, exists := runninginstances[commandmap.Endpoint]
if exists == true {
log.Println("Already exists: " + commandmap.Endpoint)
} else {
runninginstances[commandmap.Endpoint] = RunningInstance{}
}
ri_mutex.Unlock()
go StartInstance(commandmap.Endpoint, reportPostChan)
responseback.Message = "Already exists: " + commandmap.Endpoint
case "suspend":
fmt.Println("Suspend")
case "resume":
@ -64,7 +67,7 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, i
responseback.Type = "status"
responseback.RunningInstances = *runninginstances
responseback.RunningInstances = runninginstances
responsebytes, err := json.Marshal(responseback)
if err != nil {

View File

@ -34,18 +34,8 @@ type AccountType struct {
Url string `json:"url"`
}
// Used to report a new instance to main
type InstanceReport struct {
endpoint string
status int
min_id string
numposts int
}
// Instance's new min_id value
type RunningInstance struct {
Endpoint string `json:"endpoint"`
Software string `json:"software"`
Min_id string
Status int `json:"status"`
@ -69,5 +59,5 @@ type CommandMap struct {
type ResponseBack struct {
Type string `json:"Type"`
Message string `json:"Message"`
RunningInstances []RunningInstance `json:"RunningInstances"`
RunningInstances map[string]RunningInstance `json:"RunningInstances"`
}

View File

@ -10,10 +10,10 @@ import (
"html"
"time"
"fmt"
"os"
)
func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
/*
func DeferPollRun(instancereport InstanceReport, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
delay := 10
if instancereport.status == RUNNING && instancereport.numposts <= 10 {
@ -30,106 +30,157 @@ func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInst
StartInstancePoll(instancereport, reportPostChan, instanceReportChan)
}
*/
func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) {
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
fmt.Println("Arrived at PollMastodonPleroma for " + endpoint)
// Make this a global variable
p := bluemonday.NewPolicy()
newposts := make([]ReportPost, 0)
// Only placing this here to later have the option of using
// an HTTP client via a SOCKS5 Tor proxy
if strings.Contains(instancereport.endpoint, ".onion") {
instanceReportChan <- InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0}
return
}
api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?limit=40&min_id=" + instancereport.min_id
resp, err := http.Get(api_timeline)
if err != nil {
instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0}
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0}
return
}
err = json.Unmarshal(body, &newposts)
if err != nil {
instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0}
return
}
newinstances := make([]string, 0)
min_id := ""
numposts := 0
for _, newpost := range newposts {
if newpost.Account.Acct == "" {
continue
http_client := http.Client{Timeout: 5 * time.Second}
for {
api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&min_id=" + min_id
fmt.Println(api_timeline)
numposts := 0
// newinstances := make([]string, 0)
resp, err := http_client.Get(api_timeline)
if err != nil {
ri_mutex.Lock()
var m = runninginstances[endpoint]
m.Status = CLIENT_ISSUE
runninginstances[endpoint] = m
ri_mutex.Unlock()
return
}
sendpost := newpost
posthash := sha1.New()
at_sign := strings.Index(sendpost.Account.Acct, "@")
if at_sign == -1 {
at_sign = len(sendpost.Account.Acct)
sendpost.Account.Acct += "@" + instancereport.endpoint
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
ri_mutex.Lock()
var m = runninginstances[endpoint]
m.Status = BAD_RESPONSE
runninginstances[endpoint] = m
ri_mutex.Unlock()
return
}
err = json.Unmarshal(body, &newposts)
if err != nil {
ri_mutex.Lock()
var m = runninginstances[endpoint]
m.Status = BAD_RESPONSE
runninginstances[endpoint] = m
ri_mutex.Unlock()
return
}
// Calculate the post hash
fmt.Fprint(posthash, sendpost.Url)
fmt.Fprint(posthash, sendpost.normalized)
fmt.Fprint(posthash, sendpost.Account.Acct)
fmt.Fprint(posthash, sendpost.Account.Display_name)
sendpost.posthash = posthash.Sum(nil)
resp.Body.Close()
sendpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(sendpost.Content)))
reportPostChan <- sendpost
for _, newpost := range newposts {
if newpost.Account.Acct == "" {
continue
}
posthash := sha1.New()
// Check min_id
if sendpost.Id > min_id {
min_id = sendpost.Id
at_sign := strings.Index(newpost.Account.Acct, "@")
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + endpoint
}
// Calculate the post hash
fmt.Fprint(posthash, newpost.Url)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
reportPostChan <- newpost
// Check min_id
if newpost.Id > min_id {
min_id = newpost.Id
}
numposts = numposts + 1
newinstance := newpost.Account.Acct[at_sign+1:]
ri_mutex.Lock()
_, exists := runninginstances[newinstance]
if exists == false {
m := RunningInstance{}
runninginstances[newinstance] = m
go StartInstance(newinstance, reportPostChan)
}
ri_mutex.Unlock()
}
numposts = numposts + 1
newinstance := sendpost.Account.Acct[at_sign+1:]
newinstances = AppendIfMissing(newinstances, newinstance)
time.Sleep(time.Second * 10)
}
for _, newinstance := range newinstances {
var q InstanceReport
q.endpoint = newinstance
q.status = NEW_INSTANCE
q.min_id = ""
q.numposts = 0
instanceReportChan <- q
}
instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts}
}
// Change this to return a proper "err"
func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) {
func GetNodeInfo(endpoint string) (NodeInfo) {
var nodeinfo NodeInfo
api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json"
http_client := http.Client{Timeout: 5 * time.Second}
resp, err := http_client.Get(api_nodeinfo)
if err != nil {
return
fmt.Println("Make a legit error here")
return NodeInfo{}
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &nodeinfo)
if err != nil {
// fmt.Println("Unmarshal 2");
return
fmt.Println("Make a legit error here")
fmt.Println("Unmarshal 2");
return NodeInfo{}
}
return nodeinfo
}
func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
func StartInstance(endpoint string, reportPostChan chan ReportPost) {
// This might not be necessary...
// ri_mutex.Lock()
// _, exists := runninginstances[endpoint]
// fmt.Println("The exists is", exists)
// ri_mutex.Unlock()
// if exists == true {
// return
// }
nodeinfo := GetNodeInfo(endpoint)
if nodeinfo.Software.Name == "" {
ri_mutex.Lock()
var m = runninginstances[endpoint]
m.Software = ""
runninginstances[endpoint] = m
ri_mutex.Unlock()
return
}
if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
go PollMastodonPleroma(endpoint, reportPostChan)
}
}
/*
func NewInstance(endpoint string, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
var nodeinfo NodeInfo
if endpoint == "" {
@ -137,7 +188,8 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceR
}
// No repeats
for _, runninginstance := range *runninginstances {
for _, runninginstance := range runninginstances {
if runninginstance.Endpoint == endpoint {
return
}
@ -155,7 +207,7 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceR
}
newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE, "Queued"}
*runninginstances = append(*runninginstances, newinstance)
runninginstances = append(runninginstances, newinstance)
if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
var newinstancereport InstanceReport
@ -166,13 +218,16 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceR
go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan)
}
}
*/
func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) {
for i, runninginstance := range *runninginstances {
/*
func SuspendInstance(suspendinstance InstanceReport) {
for i, runninginstance := range runninginstances {
if runninginstance.Endpoint == suspendinstance.endpoint {
(*runninginstances)[i].Status = suspendinstance.status
(*runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
(runninginstances)[i].Status = suspendinstance.status
(runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
return
}
}
}
*/

View File

@ -5,7 +5,7 @@ import (
_ "net/http/pprof"
"net/http"
"context"
"time"
"sync"
"fmt"
"net"
"log"
@ -21,7 +21,12 @@ func AppendIfMissing(hay []string, needle string) []string {
return append(hay, needle)
}
func imdone() {
// fmt.Println("I am done")
}
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
defer imdone()
conn, err := pool.Acquire(context.Background())
if err != nil {
log.Fatal("Error connecting to database:", err)
@ -67,14 +72,15 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
}
}
func engine() {
// Current instances
var runninginstances map[string]RunningInstance
var ri_mutex = &sync.Mutex{}
// Current instances
runninginstances := make([]RunningInstance, 0)
func engine() {
runninginstances = make(map[string]RunningInstance)
// Initial Setup
reportPostChan := make(chan ReportPost)
instanceReportChan := make(chan InstanceReport, 200)
// Setup Database
pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
@ -88,9 +94,7 @@ func engine() {
fmt.Println(err)
return
}
defer l.Close()
x := 0
// defer l.Close()
commandClient := make(chan net.Conn)
@ -98,8 +102,9 @@ func engine() {
for {
c, err := l.Accept()
if err != nil {
fmt.Println("Error on accept")
commandClient <- nil
fmt.Println("Error on accept", err)
os.Exit(0)
// commandClient <- nil
return
}
commandClient <- c
@ -108,25 +113,26 @@ func engine() {
go func() {
for{
v := <-reportPostChan // New Post
fmt.Println(v)
v := <-reportPostChan // New Post
// fmt.Println(v)
go writePost(pool, v)
}
}()
go func() {
for {
c := <-commandClient // New client connection
go handleClient(c, &runninginstances, instanceReportChan)
c := <-commandClient // New client connection
go handleClient(c, reportPostChan)
}
}()
/*
for {
select {
case w := <-instanceReportChan: // Start or suspend instance
if w.status == NEW_INSTANCE {
x = x + 1
NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan)
NewInstance(w.endpoint, instanceReportChan, reportPostChan)
} else if w.status == RUNNING || w.status == TOOMANYREQUESTS {
for i, runninginstance := range runninginstances {
if runninginstance.Endpoint == w.endpoint {
@ -135,12 +141,13 @@ func engine() {
runninginstances[i].LastRun = time.Now().Format("2006.01.02-15:04:05")
}
}
go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan)
go DeferPollRun(w, instanceReportChan, reportPostChan)
} else {
SuspendInstance(w, &runninginstances)
SuspendInstance(w)
}
}
}
*/
}
func main() {