checking user and made 'pool' global

This commit is contained in:
farhan 2020-12-18 06:06:32 +00:00
parent 5bc9a68d53
commit ad9fe9fad2
7 changed files with 67 additions and 30 deletions

9
ctl.go
View File

@ -8,10 +8,9 @@ import (
"log"
"net"
"os"
"github.com/jackc/pgx/pgxpool"
)
func startctl(reportPostChan chan ReportPost, pool *pgxpool.Pool) {
func startctl(reportPostChan chan ReportPost) {
log.Print("Starting ctl listener on 127.0.0.1:5555")
l, err := net.Listen("tcp", "127.0.0.1:5555")
if err != nil {
@ -33,12 +32,12 @@ func startctl(reportPostChan chan ReportPost, pool *pgxpool.Pool) {
for {
c := <-commandClient // New client connection
go handleClient(c, reportPostChan, pool)
go handleClient(c, reportPostChan)
}
}
func handleClient(commandClient net.Conn, reportPostChan chan ReportPost, pool *pgxpool.Pool) {
func handleClient(commandClient net.Conn, reportPostChan chan ReportPost) {
sizebyte := make([]byte, 4)
var commandmap CommandMap
@ -80,7 +79,7 @@ func handleClient(commandClient net.Conn, reportPostChan chan ReportPost, pool *
} else {
responseback.Message = "Added: " + commandmap.Endpoint
runninginstances[commandmap.Endpoint] = RunningInstance{}
go StartInstance(commandmap.Endpoint, reportPostChan, pool)
go StartInstance(commandmap.Endpoint, reportPostChan)
}
ri_mutex.Unlock()
case "suspend":

29
db.go
View File

@ -5,16 +5,39 @@ import (
"fmt"
"log"
"github.com/jackc/pgx/pgxpool"
"github.com/davecgh/go-spew/spew"
"time"
)
func postHandler(reportPostChan chan ReportPost, pool *pgxpool.Pool) {
func postHandler(reportPostChan chan ReportPost) {
for { // Write posts
v := <-reportPostChan
go writePost(pool, v)
go writePost(v)
}
}
func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
func check_user(uri string) (AccountType, error) {
conn, _:= pool.Acquire(context.Background())
defer conn.Release()
var accountData AccountType
//var q string
//var Acct string
//err := conn.QueryRow(context.Background(), "SELECT acct, avatar, bot, created_at, display_name FROM accounts WHERE uri = $1", uri).Scan(&accountData.Acct, &(accountData.Avatar), &(q), &accountData.Created_at, &accountData.Display_name)
var timez time.Time
row := conn.QueryRow(context.Background(), "SELECT acct, avatar, bot, created_at, display_name, FROM accounts WHERE uri = $1", uri)
err := row.Scan(&accountData.Acct, &accountData.Avatar, &accountData.Bot, &timez, &accountData.Display_name)
if err != nil {
return accountData, err
}
accountData.Url = uri
accountData.Created_at = timez.Format(time.RFC3339)
spew.Dump(accountData)
return accountData, err
}
func writePost(reportpost ReportPost) {
conn, err := pool.Acquire(context.Background())
if err != nil {
log.Fatal("Error connecting to database:", err)

View File

@ -8,11 +8,13 @@ import (
"regexp"
"runtime"
"sync"
"github.com/jackc/pgx/pgxpool"
)
// Current instances
var runninginstances map[string]RunningInstance
var ri_mutex = &sync.Mutex{}
var pool *pgxpool.Pool
func startpprof() {
log.Print("Starting http/pprof on :7777")
@ -27,10 +29,10 @@ func main() {
getSettings()
go startpprof()
pool := getDbPool()
pool = getDbPool()
for i := 0; i < settings.Database.Workers; i++ {
go postHandler(reportPostChan, pool)
go postHandler(reportPostChan)
}
p = bluemonday.NewPolicy()
@ -42,13 +44,13 @@ func main() {
_, exists := runninginstances[endpoint]
if exists == false {
runninginstances[endpoint] = RunningInstance{}
go StartInstance(endpoint, reportPostChan, pool)
go StartInstance(endpoint, reportPostChan)
}
ri_mutex.Unlock()
}
go startctl(reportPostChan, pool)
go webmain(reportPostChan, pool)
go startctl(reportPostChan)
go webmain(reportPostChan)
runtime.Goexit()
}

View File

@ -9,7 +9,6 @@ import (
"regexp"
"strings"
"time"
"github.com/jackc/pgx/pgxpool"
)
var p *bluemonday.Policy
@ -71,7 +70,7 @@ func GetNodeInfo(endpoint string) (http.Client, NodeInfo) {
return http_client, nodeinfo
}
func StartInstance(endpoint string, reportPostChan chan ReportPost, pool *pgxpool.Pool) {
func StartInstance(endpoint string, reportPostChan chan ReportPost) {
http_client, nodeinfo := GetNodeInfo(endpoint)
ri_mutex.Lock()
m := runninginstances[endpoint]
@ -90,14 +89,14 @@ func StartInstance(endpoint string, reportPostChan chan ReportPost, pool *pgxpoo
m.CaptureType = "Poll"
runninginstances[endpoint] = m
ri_mutex.Unlock()
PollMastodonPleroma(endpoint, reportPostChan, http_client, pool)
PollMastodonPleroma(endpoint, reportPostChan, http_client)
} else if nodeinfo.Software.Name == "mastodon" {
log.Print("Starting " + endpoint + " as " + nodeinfo.Software.Name)
m.CaptureType = "Stream"
runninginstances[endpoint] = m
ri_mutex.Unlock()
// PollMastodonPleroma(endpoint, reportPostChan, http_client, pool)
StreamMastodon(endpoint, reportPostChan, pool)
StreamMastodon(endpoint, reportPostChan)
}
}

22
poll.go
View File

@ -10,7 +10,6 @@ import (
"net/http"
"strings"
"time"
"github.com/jackc/pgx/pgxpool"
)
type ImageData struct {
@ -54,6 +53,23 @@ type PostInfo struct {
func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) {
var userinfo UserInfo
accounttype, err := check_user(uri)
if err == nil {
userinfo.Id = uri
userinfo.Type = uri
if accounttype.Bot {
userinfo.Type = "Bot"
} else {
userinfo.Type = "Person"
}
userinfo.PreferredUsername = accounttype.Display_name
// userInfo.Url = Icon =
// userInfo.iconData = ImageData
userinfo.Icon.Type = "Image"
userinfo.Icon.Url = accounttype.Avatar
log.Print("This exit path!!!")
return userinfo, nil
}
req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil {
@ -100,7 +116,7 @@ func fetch_post(http_client http.Client, uri string) (PostInfo, error) {
return postinfo, nil
}
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client, pool *pgxpool.Pool) {
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) {
newposts := make([]ReportPost, 0)
min_id := ""
@ -313,7 +329,7 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
if exists == false || o.Status == KEEPALIVE {
m := RunningInstance{}
runninginstances[newinstance] = m
go StartInstance(newinstance, reportPostChan, pool)
go StartInstance(newinstance, reportPostChan)
}
ri_mutex.Unlock()

View File

@ -10,10 +10,9 @@ import (
"net/http"
"strings"
"time"
"github.com/jackc/pgx/pgxpool"
)
func StreamMastodon(endpoint string, reportPostChan chan ReportPost, pool *pgxpool.Pool) {
func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
http_client := http.Client{}
var client_id string
@ -174,7 +173,7 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost, pool *pgxpo
if exists == false || o.Status == KEEPALIVE {
m := RunningInstance{}
runninginstances[newinstance] = m
go StartInstance(newinstance, reportPostChan, pool)
go StartInstance(newinstance, reportPostChan)
}
ri_mutex.Unlock()

13
web.go
View File

@ -11,7 +11,6 @@ import (
"os"
"strings"
"time"
"github.com/jackc/pgx/pgxpool"
)
// CreateObject - Used by post web receiver
@ -23,11 +22,10 @@ type CreateObject struct {
Type string `json:"type"`
}
// CreateObject - Used by post web receiver
type FindType struct {
// RelayBase - The base object used by web receiver
type RelayBase struct {
Actor string `json:"actor"`
Cc []string `json:"cc"`
//Object interface{} `json:"Object"`
Object json.RawMessage `json:"Object"`
ID string `json:"id"`
Published string `json:"published"`
@ -90,7 +88,6 @@ func webfinger(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, webfingerstr)
fmt.Println("Writes properly")
} else {
fmt.Println(query)
w.WriteHeader(http.StatusNotFound)
@ -109,7 +106,7 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc {
return
}
var findtype FindType
var findtype RelayBase
err = json.Unmarshal(body, &findtype)
fmt.Println(string(body))
@ -143,7 +140,9 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc {
ri_mutex.Unlock()
// This only needs to be done if the user does not exist in the database
log.Print("Actor object: ", createobject.Actor)
realuser, err := fetch_user_info(o.client, createobject.Actor)
_, _ = check_user(createobject.Actor)
if err != nil {
return
}
@ -261,7 +260,7 @@ func errorHandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("404 --> ", r.URL.Path)
}
func webmain(reportPostChan chan ReportPost, pool *pgxpool.Pool) {
func webmain(reportPostChan chan ReportPost) {
http.HandleFunc("/.well-known/webfinger", webfinger)
http.HandleFunc("/.well-known/host-meta", hostmeta)
http.HandleFunc("/inbox", inboxHandler(reportPostChan))