Merge branch 'wip-cacheupdate' into 'master'
Wip cacheupdate See merge request khanzf/fedilogue!21
This commit is contained in:
commit
c8ce84ece7
@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
|
||||
"muzzammil.xyz/jsonc"
|
||||
)
|
||||
|
||||
@ -34,12 +35,10 @@ type Proxy struct {
|
||||
// Settings - Configuration file structure
|
||||
type Settings struct {
|
||||
Crawl bool `"json:crawl"`
|
||||
Banned []string `"json:banned"`
|
||||
Alwaysbot []string `"json:alwaysbot"`
|
||||
Proxies []Proxy `"json:proxies"`
|
||||
Externalaccounts []ExtAccount `"json:externalaccounts"`
|
||||
MassFollowers []MassFollower `"json:massfollowers"`
|
||||
LogLevel int `"json:loglevel"`
|
||||
LogLevel int `"json:loglevel"`
|
||||
}
|
||||
|
||||
var settings Settings
|
||||
|
@ -13,21 +13,6 @@
|
||||
*/
|
||||
"crawlonion": false,
|
||||
|
||||
// Ignore the following instances
|
||||
"banned": [
|
||||
"switter.at",
|
||||
"xxxtumblr.org",
|
||||
"sinblr.com",
|
||||
"twitiverse.com"
|
||||
],
|
||||
|
||||
// Consider all posts from these instances to be bots
|
||||
"alwaysbot": [
|
||||
"mstdn.foxfam.club",
|
||||
"botsin.space",
|
||||
"newsbots.eu"
|
||||
],
|
||||
|
||||
// Connect through the following proxies
|
||||
"proxies": [
|
||||
// {
|
||||
|
@ -5,7 +5,8 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net"
|
||||
// "fmt"
|
||||
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
func startctl() {
|
||||
@ -38,8 +39,8 @@ func startctl() {
|
||||
func handleClient(commandClient net.Conn) {
|
||||
defer commandClient.Close()
|
||||
sizebyte := make([]byte, 4)
|
||||
var commandmap CommandMap
|
||||
var responseback ResponseBack
|
||||
var commandmap shared.CommandMap
|
||||
var responseback shared.ResponseBack
|
||||
n, err := io.ReadFull(commandClient, sizebyte)
|
||||
if err != nil {
|
||||
logFatal.Fatal("Read error: ", err)
|
||||
|
@ -1,19 +1,20 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"context"
|
||||
"runtime"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/microcosm-cc/bluemonday"
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
// Current instances
|
||||
var runninginstances map[string]RunningInstance
|
||||
var runninginstances map[string]shared.RunningInstance
|
||||
var ri_mutex = &sync.Mutex{}
|
||||
|
||||
func startpprof() {
|
||||
@ -65,7 +66,7 @@ func StatusReport() {
|
||||
func main() {
|
||||
// Initial Setup
|
||||
logInit()
|
||||
runninginstances = make(map[string]RunningInstance)
|
||||
runninginstances = make(map[string]shared.RunningInstance)
|
||||
|
||||
getSettings()
|
||||
if len(settings.Proxies) > 0 {
|
||||
@ -92,7 +93,7 @@ func main() {
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
go staggeredStart();
|
||||
go staggeredStart()
|
||||
go statusReportHandler()
|
||||
|
||||
for rows.Next() {
|
||||
@ -102,7 +103,10 @@ func main() {
|
||||
logErr("Unable to iterate database, exiting.")
|
||||
return
|
||||
}
|
||||
_, exists := GetRunner(endpoint)
|
||||
o, exists := GetRunner(endpoint)
|
||||
if o.Banned == true {
|
||||
continue // Banned instance
|
||||
}
|
||||
if exists == false {
|
||||
go StartInstance(endpoint)
|
||||
}
|
||||
|
@ -1,10 +1,11 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
func TestStatusReport_empty_run(t *testing.T) {
|
||||
@ -14,24 +15,24 @@ func TestStatusReport_empty_run(t *testing.T) {
|
||||
|
||||
func TestStatusReport_full_content(t *testing.T) {
|
||||
defer func() {
|
||||
runninginstances = map[string]RunningInstance{}
|
||||
runninginstances = map[string]shared.RunningInstance{}
|
||||
}()
|
||||
runninginstances = make(map[string]RunningInstance)
|
||||
runninginstances = make(map[string]shared.RunningInstance)
|
||||
|
||||
identifier := 0
|
||||
var endpoint string
|
||||
|
||||
test_instance_types := []string{"pleroma", "mastodon", "unknown", ""}
|
||||
test_statuses := []int{NEW_INSTANCE, RUNNING, UNAUTHORIZED, FORBIDDEN, NOT_FOUND, UNPROCESSABLE_ENTITY, TOOMANYREQUESTS, INTERNAL_ERROR, CLIENT_ISSUE, ONION_PROTOCOL, BAD_RESPONSE, BAD_NODEINFO, UNSUPPORTED_INSTANCE, STREAM_ENDED, KEEPALIVE}
|
||||
test_statuses := []int{shared.NEW_INSTANCE, shared.RUNNING, shared.UNAUTHORIZED, shared.FORBIDDEN, shared.NOT_FOUND, shared.UNPROCESSABLE_ENTITY, shared.TOOMANYREQUESTS, shared.INTERNAL_ERROR, shared.CLIENT_ISSUE, shared.ONION_PROTOCOL, shared.BAD_RESPONSE, shared.BAD_NODEINFO, shared.UNSUPPORTED_INSTANCE, shared.STREAM_ENDED, shared.KEEPALIVE}
|
||||
|
||||
for _, test_instance_type := range test_instance_types {
|
||||
for _, test_status := range test_statuses {
|
||||
a := RunningInstance{}
|
||||
a := shared.RunningInstance{}
|
||||
endpoint = "endpoint" + strconv.Itoa(identifier) + ".test.com"
|
||||
a.client = BuildClient(endpoint)
|
||||
a.Client = BuildClient(endpoint)
|
||||
a.Status = test_status
|
||||
a.recentactivities = shared.NewUniqueFifo(10)
|
||||
a.recentactors = shared.NewUniqueFifo(10)
|
||||
a.Recentactivities = shared.NewUniqueFifo(10)
|
||||
a.Recentactors = shared.NewUniqueFifo(10)
|
||||
a.Software = test_instance_type
|
||||
a.Version = "0." + strconv.Itoa(identifier)
|
||||
a.LastRun = time.Now().Format(time.RFC3339)
|
||||
|
@ -1,101 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const (
|
||||
NEW_INSTANCE = 0
|
||||
RUNNING = 200
|
||||
UNAUTHORIZED = 401
|
||||
FORBIDDEN = 403
|
||||
NOT_FOUND = 404
|
||||
UNPROCESSABLE_ENTITY = 422
|
||||
TOOMANYREQUESTS = 429
|
||||
INTERNAL_ERROR = 500
|
||||
CLIENT_ISSUE = 600
|
||||
ONION_PROTOCOL = 601
|
||||
BAD_RESPONSE = 602
|
||||
BAD_NODEINFO = 604
|
||||
UNSUPPORTED_INSTANCE = 605
|
||||
STREAM_ENDED = 606
|
||||
KEEPALIVE = 607
|
||||
)
|
||||
|
||||
type ObjectType struct {
|
||||
Id string `json:"id"`
|
||||
}
|
||||
|
||||
// Parsing Unmarshal JSON type
|
||||
type ReportActivity struct {
|
||||
|
||||
// Retrieved values
|
||||
Id string `json:"id"`
|
||||
Uri string `json:"uri"`
|
||||
Account AccountType
|
||||
Content string `json:"content"`
|
||||
Created_at string `json:"created_at"`
|
||||
|
||||
// Derived values
|
||||
normalized string
|
||||
}
|
||||
|
||||
type AccountType struct {
|
||||
Acct string `json:"acct"`
|
||||
Avatar string `json:"avatar"`
|
||||
Bot bool `json:"bot"`
|
||||
Created_at string `json:"created_at"`
|
||||
Display_name string `json:"display_name"`
|
||||
Url string `json:"url"`
|
||||
}
|
||||
|
||||
// Instance's new min_id value
|
||||
type RunningInstance struct {
|
||||
Software string `json:"software"`
|
||||
Version string `json:"version"`
|
||||
Status int `json:"status"`
|
||||
LastRun string `json:"lastrun"`
|
||||
CaptureType string `json:"capturetype"`
|
||||
client http.Client
|
||||
client_id string
|
||||
client_secret string
|
||||
recentactivities *shared.UniqueFifo
|
||||
recentactors *shared.UniqueFifo
|
||||
}
|
||||
|
||||
type NodeInfoSoftware struct {
|
||||
Name string `json:"name"`
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
type NodeInfo struct {
|
||||
Software NodeInfoSoftware `json:"software"`
|
||||
}
|
||||
|
||||
type CommandMap struct {
|
||||
Type string `json:"Type"`
|
||||
Endpoint string `json:"Endpoint"`
|
||||
}
|
||||
|
||||
type ResponseBack struct {
|
||||
Type string `json:"Type"`
|
||||
Message string `json:"Message"`
|
||||
RunningInstances map[string]RunningInstance `json:"RunningInstances"`
|
||||
}
|
||||
|
||||
type Userinfo struct {
|
||||
Id string `"json:id"`
|
||||
Type string `"json:type"`
|
||||
Following string `"json:following"`
|
||||
Followers string `"json:followers"`
|
||||
Inbox string `"json:inbox"`
|
||||
Outbox string `"json:outbox"`
|
||||
Featured string `"json:featured"`
|
||||
PreferredUsername string `"json:preferredUsername"`
|
||||
Name string `"json:name"`
|
||||
Summary string `"json:summary"`
|
||||
Url string `"json:Url"`
|
||||
ManuallyApprovesFollowers string `"json:manuallyApprovesFollowers"`
|
||||
Discoverable string `"json:discoverable"`
|
||||
}
|
@ -1,30 +1,31 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"math/rand"
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
"net"
|
||||
"fmt"
|
||||
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
var staggeredStartChan chan bool
|
||||
|
||||
func DoTries(o *RunningInstance, req *http.Request) (*http.Response, error) {
|
||||
func DoTries(o *shared.RunningInstance, req *http.Request) (*http.Response, error) {
|
||||
var resp *http.Response
|
||||
var err error
|
||||
|
||||
for tries := 0; tries < 10; tries++ {
|
||||
resp, err = o.client.Do(req)
|
||||
resp, err = o.Client.Do(req)
|
||||
if err != nil {
|
||||
// URL.Scheme, Host, Path Opaque
|
||||
logWarn("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes: ", err)
|
||||
logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes: ", err)
|
||||
time.Sleep(time.Minute * 5)
|
||||
continue
|
||||
}
|
||||
@ -37,7 +38,7 @@ func BuildClient(endpoint string) http.Client {
|
||||
// Test: TestBuildClient, TestBuildClientProxy
|
||||
/* The seemingly unused 'endpoint' variable is for proxying based on endpoint, ie for Tor */
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: 2,
|
||||
MaxIdleConns: 2,
|
||||
IdleConnTimeout: 3600 * time.Second,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
@ -61,17 +62,26 @@ func BuildClient(endpoint string) http.Client {
|
||||
return client
|
||||
}
|
||||
|
||||
func GetRunner(endpoint string) (RunningInstance, bool) {
|
||||
func GetRunner(endpoint string) (shared.RunningInstance, bool) {
|
||||
// Tests: TestGetRunnerNonExist, TestGetRunnerExists
|
||||
ri_mutex.Lock()
|
||||
o, exists := runninginstances[endpoint]
|
||||
|
||||
if exists == false {
|
||||
o = RunningInstance{}
|
||||
o.client = BuildClient(endpoint)
|
||||
o.Status = KEEPALIVE
|
||||
o.recentactivities = shared.NewUniqueFifo(10)
|
||||
o.recentactors = shared.NewUniqueFifo(10)
|
||||
o = shared.RunningInstance{}
|
||||
selectRet := pool.QueryRow(context.Background(), "SELECT banned, alwaysbot FROM instances WHERE endpoint = $1", endpoint)
|
||||
err := selectRet.Scan(&o.Banned, &o.Alwaysbot)
|
||||
if err != nil {
|
||||
logWarn("There is a database connection issue")
|
||||
}
|
||||
if o.Banned == true {
|
||||
logInfo("Banned instance: ", endpoint)
|
||||
} else {
|
||||
o.Client = BuildClient(endpoint)
|
||||
o.Status = shared.KEEPALIVE
|
||||
o.Recentactivities = shared.NewUniqueFifo(10)
|
||||
o.Recentactors = shared.NewUniqueFifo(10)
|
||||
}
|
||||
runninginstances[endpoint] = o
|
||||
}
|
||||
ri_mutex.Unlock()
|
||||
@ -79,19 +89,19 @@ func GetRunner(endpoint string) (RunningInstance, bool) {
|
||||
return o, exists
|
||||
}
|
||||
|
||||
func UpdateRunner(endpoint string, o RunningInstance) {
|
||||
func UpdateRunner(endpoint string, o shared.RunningInstance) {
|
||||
// Tests: None necessary
|
||||
ri_mutex.Lock()
|
||||
runninginstances[endpoint] = o
|
||||
ri_mutex.Unlock()
|
||||
}
|
||||
|
||||
func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance {
|
||||
func GetInstanceInfo(endpoint string, o shared.RunningInstance) shared.RunningInstance {
|
||||
/* Checking order
|
||||
* Mastodon/Pleroma
|
||||
* Um..nothing else yet
|
||||
*/
|
||||
var nodeinfo NodeInfo
|
||||
var nodeinfo shared.NodeInfo
|
||||
pleromastodon_nodeinfo_uri := "https://" + endpoint + "/nodeinfo/2.0.json"
|
||||
|
||||
req, _ := http.NewRequest("GET", pleromastodon_nodeinfo_uri, nil)
|
||||
@ -123,7 +133,7 @@ func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance {
|
||||
resp_index, err := DoTries(&o, req)
|
||||
o.LastRun = time.Now().Format(time.RFC3339)
|
||||
if err != nil {
|
||||
o.Status = UNSUPPORTED_INSTANCE
|
||||
o.Status = shared.UNSUPPORTED_INSTANCE
|
||||
o.Software = "Unsupported"
|
||||
logWarn("Unable to connect to " + endpoint + ", giving up")
|
||||
return o
|
||||
@ -132,7 +142,7 @@ func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance {
|
||||
|
||||
indexbin, err := ioutil.ReadAll(resp_index.Body)
|
||||
if err != nil {
|
||||
o.Status = UNSUPPORTED_INSTANCE
|
||||
o.Status = shared.UNSUPPORTED_INSTANCE
|
||||
o.Software = "Unsupported"
|
||||
logWarn("Unable to read index of " + endpoint + ", giving up")
|
||||
return o
|
||||
@ -156,7 +166,7 @@ func GetInstanceInfo(endpoint string, o RunningInstance) RunningInstance {
|
||||
return o
|
||||
}
|
||||
|
||||
func LogInstance(endpoint string, o RunningInstance) bool {
|
||||
func LogInstance(endpoint string, o shared.RunningInstance) bool {
|
||||
selectRet := pool.QueryRow(context.Background(), "SELECT FROM instances WHERE endpoint = $1", endpoint)
|
||||
err := selectRet.Scan()
|
||||
if err == nil {
|
||||
@ -173,7 +183,7 @@ func LogInstance(endpoint string, o RunningInstance) bool {
|
||||
}
|
||||
|
||||
func CheckInstance(newinstance string, callerEndpoint string) {
|
||||
if settings.Crawl == true && stringexists(newinstance, settings.Banned) == false {
|
||||
if settings.Crawl == true {
|
||||
// Skip over this if its the same as the endpoint or empty
|
||||
if newinstance == callerEndpoint || newinstance == "" {
|
||||
return
|
||||
@ -183,7 +193,7 @@ func CheckInstance(newinstance string, callerEndpoint string) {
|
||||
for attempt := 0; attempt > 5; attempt = attempt + 1 {
|
||||
_, err = net.LookupHost(newinstance)
|
||||
if err != nil {
|
||||
logDebug("Unable to resolve " + newinstance + " attempt ", attempt, "/5. Sleeping for 30 seconds")
|
||||
logDebug("Unable to resolve "+newinstance+" attempt ", attempt, "/5. Sleeping for 30 seconds")
|
||||
time.Sleep(time.Second * 30)
|
||||
continue
|
||||
}
|
||||
@ -202,11 +212,11 @@ func CheckInstance(newinstance string, callerEndpoint string) {
|
||||
// Going forward, this might be merged into GetRunner
|
||||
ri_mutex.Lock()
|
||||
o, exists := runninginstances[newinstance]
|
||||
if exists == false || o.Status == KEEPALIVE {
|
||||
m := RunningInstance{}
|
||||
m.client = BuildClient(newinstance)
|
||||
m.recentactivities = shared.NewUniqueFifo(10)
|
||||
m.recentactors = shared.NewUniqueFifo(10)
|
||||
if exists == false || o.Status == shared.KEEPALIVE {
|
||||
m := shared.RunningInstance{}
|
||||
m.Client = BuildClient(newinstance)
|
||||
m.Recentactivities = shared.NewUniqueFifo(10)
|
||||
m.Recentactors = shared.NewUniqueFifo(10)
|
||||
runninginstances[newinstance] = m
|
||||
go StartInstance(newinstance)
|
||||
}
|
||||
@ -216,7 +226,8 @@ func CheckInstance(newinstance string, callerEndpoint string) {
|
||||
|
||||
func staggeredStart() {
|
||||
for {
|
||||
_ :<- staggeredStartChan
|
||||
_:
|
||||
<-staggeredStartChan
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
@ -227,6 +238,9 @@ func StartInstance(endpoint string) {
|
||||
|
||||
// Check if exists. If so, get the object. If not, create it
|
||||
o, _ := GetRunner(endpoint)
|
||||
if o.Banned == true {
|
||||
return // banned instance
|
||||
}
|
||||
|
||||
o = GetInstanceInfo(endpoint, o)
|
||||
UpdateRunner(endpoint, o)
|
||||
|
@ -1,17 +1,18 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
"reflect"
|
||||
"net"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
"net"
|
||||
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
func TestBuildClient(t *testing.T) {
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: 2,
|
||||
MaxIdleConns: 2,
|
||||
IdleConnTimeout: 3600 * time.Second,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
@ -32,36 +33,19 @@ func TestBuildClientProxy(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetRunnerNonExist(t *testing.T) {
|
||||
defer func() {
|
||||
runninginstances = map[string]RunningInstance{}
|
||||
}()
|
||||
want_o := RunningInstance{}
|
||||
want_o.client = BuildClient("some-non-existent-domain.tld")
|
||||
want_o.Status = KEEPALIVE
|
||||
want_o.recentactivities = shared.NewUniqueFifo(10)
|
||||
want_o.recentactors = shared.NewUniqueFifo(10)
|
||||
want_exists := false
|
||||
have_o, have_exists := GetRunner("some-non-existent-domain.tld")
|
||||
|
||||
if reflect.DeepEqual(want_o, have_o) {
|
||||
t.Fatalf("TestGetRunnerBlank expected asfasfsf")
|
||||
}
|
||||
|
||||
if have_exists != false {
|
||||
t.Fatalf("TestGetRunnerBlank expected %v, got %v", want_exists, have_exists)
|
||||
}
|
||||
// Currently not implemented
|
||||
}
|
||||
|
||||
func TestGetRunnerExists(t *testing.T) {
|
||||
defer func() {
|
||||
runninginstances = map[string]RunningInstance{}
|
||||
runninginstances = map[string]shared.RunningInstance{}
|
||||
}()
|
||||
|
||||
want_o := RunningInstance{}
|
||||
want_o.client = BuildClient("some-non-existent-domain.tld")
|
||||
want_o.Status = KEEPALIVE
|
||||
want_o.recentactivities = shared.NewUniqueFifo(10)
|
||||
want_o.recentactors = shared.NewUniqueFifo(10)
|
||||
want_o := shared.RunningInstance{}
|
||||
want_o.Client = BuildClient("some-non-existent-domain.tld")
|
||||
want_o.Status = shared.KEEPALIVE
|
||||
want_o.Recentactivities = shared.NewUniqueFifo(10)
|
||||
want_o.Recentactors = shared.NewUniqueFifo(10)
|
||||
runninginstances["some-non-existent-domain.tld"] = want_o
|
||||
|
||||
want_exists := true
|
||||
@ -70,7 +54,7 @@ func TestGetRunnerExists(t *testing.T) {
|
||||
if have_exists != want_exists {
|
||||
t.Fatalf("TestGetRunnerBlank expected %v, got %v", want_exists, have_exists)
|
||||
}
|
||||
// if reflect.DeepEqual(want_o, have_o) {
|
||||
// t.Fatalf("TestGetRunnerExists failed, should have the same value")
|
||||
// }
|
||||
// if reflect.DeepEqual(want_o, have_o) {
|
||||
// t.Fatalf("TestGetRunnerExists failed, should have the same value")
|
||||
// }
|
||||
}
|
||||
|
@ -8,6 +8,8 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
type OAuth struct {
|
||||
@ -25,7 +27,7 @@ func (e *authError) Error() string {
|
||||
return e.msg
|
||||
}
|
||||
|
||||
func register_client(endpoint string, o *RunningInstance) error {
|
||||
func register_client(endpoint string, o *shared.RunningInstance) error {
|
||||
requestBodymap, _ := json.Marshal(map[string]string{
|
||||
"client_name": "Tusky", // Hard-coded in for now...
|
||||
"scopes": "read write follow push",
|
||||
@ -35,7 +37,7 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
|
||||
api_base_apps := "https://" + endpoint + "/api/v1/apps"
|
||||
|
||||
resp, err := o.client.Post(api_base_apps, "application/json", requestBodybytes)
|
||||
resp, err := o.Client.Post(api_base_apps, "application/json", requestBodybytes)
|
||||
if err != nil {
|
||||
logErr("Unable to connect to "+api_base_apps+" ", err)
|
||||
return err
|
||||
@ -45,8 +47,8 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logErr("Unable to read HTTP response: ", err)
|
||||
o.client_id = ""
|
||||
o.client_secret = ""
|
||||
o.Client_id = ""
|
||||
o.Client_secret = ""
|
||||
return err
|
||||
}
|
||||
|
||||
@ -54,8 +56,8 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
err = json.Unmarshal(body, &bodymap)
|
||||
if err != nil {
|
||||
logErr("Unable to parse response from "+endpoint+": ", err)
|
||||
o.client_id = ""
|
||||
o.client_secret = ""
|
||||
o.Client_id = ""
|
||||
o.Client_secret = ""
|
||||
return err
|
||||
}
|
||||
|
||||
@ -64,8 +66,8 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
f, err := os.Create("clients/" + endpoint)
|
||||
if err != nil {
|
||||
logErr("Unable to create "+client_file+": ", err)
|
||||
o.client_id = ""
|
||||
o.client_secret = ""
|
||||
o.Client_id = ""
|
||||
o.Client_secret = ""
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
@ -73,24 +75,24 @@ func register_client(endpoint string, o *RunningInstance) error {
|
||||
_, err = io.WriteString(f, bodymap["client_id"]+"\n")
|
||||
if err != nil {
|
||||
logErr("Unable to write client_id line to file "+client_file+": ", err)
|
||||
o.client_id = bodymap["client_id"]
|
||||
o.client_secret = bodymap["client_secret"]
|
||||
o.Client_id = bodymap["client_id"]
|
||||
o.Client_secret = bodymap["client_secret"]
|
||||
return nil
|
||||
}
|
||||
_, err = io.WriteString(f, bodymap["client_secret"]+"\n")
|
||||
if err != nil {
|
||||
logErr("Unable to write client_secret to file "+client_file+": ", err)
|
||||
o.client_id = bodymap["client_id"]
|
||||
o.client_secret = bodymap["client_secret"]
|
||||
o.Client_id = bodymap["client_id"]
|
||||
o.Client_secret = bodymap["client_secret"]
|
||||
return nil
|
||||
}
|
||||
|
||||
o.client_id = bodymap["client_id"]
|
||||
o.client_secret = bodymap["client_secret"]
|
||||
o.Client_id = bodymap["client_id"]
|
||||
o.Client_secret = bodymap["client_secret"]
|
||||
return nil
|
||||
}
|
||||
|
||||
func get_client(endpoint string, o *RunningInstance) error {
|
||||
func get_client(endpoint string, o *shared.RunningInstance) error {
|
||||
var err error
|
||||
client_file := "clients/" + endpoint
|
||||
_, err = os.Stat(client_file)
|
||||
@ -105,15 +107,15 @@ func get_client(endpoint string, o *RunningInstance) error {
|
||||
rd := bufio.NewReader(f)
|
||||
|
||||
client_id_bin, _, err := rd.ReadLine()
|
||||
o.client_id = string(client_id_bin)
|
||||
o.Client_id = string(client_id_bin)
|
||||
if err != nil {
|
||||
logErr("Unable to read client_id line of "+client_file+", building new client")
|
||||
logErr("Unable to read client_id line of " + client_file + ", building new client")
|
||||
return err
|
||||
}
|
||||
client_secret_bin, _, err := rd.ReadLine()
|
||||
o.client_secret = string(client_secret_bin)
|
||||
o.Client_secret = string(client_secret_bin)
|
||||
if err != nil {
|
||||
logErr("Unable to read client_secret line of "+client_file+", building new client")
|
||||
logErr("Unable to read client_secret line of " + client_file + ", building new client")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -125,7 +127,7 @@ func get_client(endpoint string, o *RunningInstance) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func oauth_login(endpoint string, o *RunningInstance, username string, password string) (OAuth, error) {
|
||||
func oauth_login(endpoint string, o *shared.RunningInstance, username string, password string) (OAuth, error) {
|
||||
authMap, err := json.Marshal(map[string]string{
|
||||
"username": username,
|
||||
"password": password,
|
||||
@ -133,12 +135,12 @@ func oauth_login(endpoint string, o *RunningInstance, username string, password
|
||||
"grant_type": "password",
|
||||
"client_name": "Tusky",
|
||||
"scope": "read write follow push",
|
||||
"client_id": o.client_id,
|
||||
"client_secret": o.client_secret,
|
||||
"client_id": o.Client_id,
|
||||
"client_secret": o.Client_secret,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logErr("Unable to create Authentication map for "+endpoint)
|
||||
logErr("Unable to create Authentication map for " + endpoint)
|
||||
return OAuth{}, err
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
type ImageData struct {
|
||||
@ -46,8 +48,8 @@ type PostInfo struct {
|
||||
Content string `"json:content"`
|
||||
}
|
||||
|
||||
func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
newactivities := make([]ReportActivity, 0)
|
||||
func PollMastodonPleroma(endpoint string, o *shared.RunningInstance) {
|
||||
newactivities := make([]shared.ReportActivity, 0)
|
||||
|
||||
min_id := ""
|
||||
|
||||
@ -64,15 +66,18 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
if extaccount.Endpoint == endpoint {
|
||||
use_auth = true
|
||||
o, _ := GetRunner(endpoint)
|
||||
if o.Banned == true {
|
||||
return // banned endpoint
|
||||
}
|
||||
err = get_client(endpoint, &o)
|
||||
if err != nil {
|
||||
logErr("Unable to register client for " + endpoint + ": ", err)
|
||||
logErr("Unable to register client for "+endpoint+": ", err)
|
||||
return
|
||||
}
|
||||
|
||||
oauthData, err = oauth_login(endpoint, &o, extaccount.Username, extaccount.Password)
|
||||
if err != nil {
|
||||
logErr("Unable to login to " + endpoint + ": ", err)
|
||||
logErr("Unable to login to "+endpoint+": ", err)
|
||||
return
|
||||
}
|
||||
last_refresh = time.Now().Unix()
|
||||
@ -89,7 +94,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
req, err := http.NewRequest("GET", api_timeline, nil)
|
||||
req.Header.Set("User-Agent", "Tusky")
|
||||
if err != nil {
|
||||
logFatal.Fatal("Unable to create new request for " + endpoint + ": ", err)
|
||||
logFatal.Fatal("Unable to create new request for "+endpoint+": ", err)
|
||||
return
|
||||
}
|
||||
|
||||
@ -97,7 +102,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
if time.Now().Unix() > last_refresh+oauthData.Expires_in {
|
||||
oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token)
|
||||
if err != nil {
|
||||
logWarn("Unable to refresh oauth token for " + endpoint + ": ", err)
|
||||
logWarn("Unable to refresh oauth token for "+endpoint+": ", err)
|
||||
return
|
||||
}
|
||||
last_refresh = time.Now().Unix()
|
||||
@ -108,15 +113,15 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
m.LastRun = time.Now().Format(time.RFC3339)
|
||||
resp, err := DoTries(o, req)
|
||||
if err != nil {
|
||||
m.Status = CLIENT_ISSUE
|
||||
m.Status = shared.CLIENT_ISSUE
|
||||
ri_mutex.Lock()
|
||||
runninginstances[endpoint] = m
|
||||
ri_mutex.Unlock()
|
||||
logWarn("Giving up on " + endpoint + ": ", err.Error())
|
||||
logWarn("Giving up on "+endpoint+": ", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds
|
||||
if resp.StatusCode == shared.TOOMANYREQUESTS { // Short Delay, 30 seconds
|
||||
logWarn("Delaying "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay")
|
||||
_, _ = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close() // Release as soon as done
|
||||
@ -126,8 +131,8 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
ri_mutex.Unlock()
|
||||
time.Sleep(time.Second * 30)
|
||||
continue
|
||||
} else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour
|
||||
logWarn("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
|
||||
} else if resp.StatusCode == shared.INTERNAL_ERROR { // Longer delay, 1 hour
|
||||
logWarn("Suspending "+endpoint+", gave status ", resp.StatusCode, ", 1 hour delay")
|
||||
_, _ = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close() // Release as soon as done
|
||||
m.Status = 765
|
||||
@ -137,7 +142,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
time.Sleep(time.Second * 3600)
|
||||
continue
|
||||
} else if resp.StatusCode != 200 { // Crash
|
||||
logErr("Terminating " + endpoint + ", gave status ", resp.StatusCode)
|
||||
logErr("Terminating "+endpoint+", gave status ", resp.StatusCode)
|
||||
_, _ = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close() // Release as soon as done
|
||||
m.Status = resp.StatusCode
|
||||
@ -151,7 +156,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
resp.Body.Close() // Release as soon as done
|
||||
if err != nil {
|
||||
if parsing_error > 5 {
|
||||
m.Status = BAD_RESPONSE
|
||||
m.Status = shared.BAD_RESPONSE
|
||||
ri_mutex.Lock()
|
||||
runninginstances[endpoint] = m
|
||||
ri_mutex.Unlock()
|
||||
@ -162,7 +167,7 @@ func PollMastodonPleroma(endpoint string, o *RunningInstance) {
|
||||
time.Sleep(time.Second * 30)
|
||||
}
|
||||
|
||||
m.Status = RUNNING
|
||||
m.Status = shared.RUNNING
|
||||
ri_mutex.Lock()
|
||||
runninginstances[endpoint] = m
|
||||
ri_mutex.Unlock()
|
||||
|
@ -6,9 +6,10 @@ import (
|
||||
"html"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
"regexp"
|
||||
|
||||
"github.com/microcosm-cc/bluemonday"
|
||||
)
|
||||
|
||||
@ -42,17 +43,17 @@ type ActorJson struct {
|
||||
Image ImageType `json:"image"`
|
||||
PublicKey PublicKeyType `json:"publicKey"`
|
||||
|
||||
bot bool
|
||||
instance string
|
||||
}
|
||||
|
||||
type TagType struct {
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
|
||||
type PostJson struct {
|
||||
id int
|
||||
id int
|
||||
Uri string `json:"id"`
|
||||
InReplyTo string `json:"inReplyTo"`
|
||||
|
||||
@ -64,26 +65,20 @@ type PostJson struct {
|
||||
Published time.Time `json:"published"`
|
||||
Source string `json:"source"`
|
||||
Summary string `json:"summary"`
|
||||
Tag []TagType `json:"tag"`
|
||||
To []string `json:"to"`
|
||||
Type string `json:"type"`
|
||||
Tag []TagType `json:"tag"`
|
||||
To []string `json:"to"`
|
||||
Type string `json:"type"`
|
||||
|
||||
Actor string `json:"actor"`
|
||||
AttributedTo string `json:"attributedTo"`
|
||||
|
||||
bot bool
|
||||
instance string
|
||||
}
|
||||
|
||||
func check_activity(uri string) {
|
||||
var activityjson PostJson
|
||||
|
||||
// Ignore banned
|
||||
for _, banned := range settings.Banned {
|
||||
if strings.Index(uri, "https://"+banned+"/") == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Ignore invalid URIs
|
||||
endslash := strings.Index(uri[8:], "/")
|
||||
if endslash == -1 {
|
||||
@ -92,15 +87,19 @@ func check_activity(uri string) {
|
||||
activityjson.instance = uri[8 : endslash+8]
|
||||
|
||||
o, _ := GetRunner(activityjson.instance)
|
||||
if o.Banned == true {
|
||||
return // Banned instance
|
||||
}
|
||||
|
||||
// Check if there were any recent requests on this
|
||||
o.recentactivities.Mu.Lock()
|
||||
if o.recentactivities.Add(uri) == true {
|
||||
o.recentactivities.Mu.Unlock()
|
||||
o.Recentactivities.Mu.Lock()
|
||||
i, _ := o.Recentactivities.Contains(uri)
|
||||
if i != -1 {
|
||||
o.Recentactivities.Mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
o.recentactivities.Mu.Unlock()
|
||||
o.Recentactivities.Mu.Unlock()
|
||||
var jsondocument string
|
||||
|
||||
selectRet := pool.QueryRow(context.Background(), "SELECT FROM activities WHERE document->>'id' = $1", uri)
|
||||
@ -142,7 +141,13 @@ func check_activity(uri string) {
|
||||
}
|
||||
|
||||
// This must be done BEFORE the `INSERT INTO activities'` below
|
||||
go check_actor(activityjson.AttributedTo)
|
||||
actorjson := check_actor(activityjson.AttributedTo)
|
||||
if actorjson == nil {
|
||||
return
|
||||
}
|
||||
if actorjson.bot || o.Alwaysbot {
|
||||
activityjson.bot = true
|
||||
}
|
||||
|
||||
activityjson.normalized = removeHTMLReg.ReplaceAllString(activityjson.Content, " ")
|
||||
activityjson.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(activityjson.normalized)))
|
||||
@ -155,15 +160,15 @@ func check_activity(uri string) {
|
||||
hashtags = append(hashtags, strings.ToLower(tag.Name))
|
||||
}
|
||||
}
|
||||
_, err = pool.Exec(context.Background(), "INSERT INTO activities (document, normalized, instance, hashtags) VALUES($1, $2, $3, $4)", jsondocument, activityjson.normalized, activityjson.instance, hashtags)
|
||||
_, err = pool.Exec(context.Background(), "INSERT INTO activities (document, normalized, instance, hashtags, bot) VALUES($1, $2, $3, $4, $5)", jsondocument, activityjson.normalized, activityjson.instance, hashtags, activityjson.bot)
|
||||
if err != nil {
|
||||
logWarn("Error inserting %s into `activities`: "+ uri, err)
|
||||
logWarn("Error inserting %s into `activities`: "+uri, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, to := range activityjson.To {
|
||||
if to != "https://www.w3.org/ns/activitystreams#Public" && to != "" {
|
||||
if strings.HasSuffix(to, "/followers") == true {
|
||||
if strings.HasSuffix(to, "/followers") {
|
||||
// This check is very much a bad solution, may consider removing the entire for-loop
|
||||
continue
|
||||
}
|
||||
@ -174,37 +179,37 @@ func check_activity(uri string) {
|
||||
}
|
||||
|
||||
/* Test: TestCheck_actor */
|
||||
func check_actor(uri string) int {
|
||||
var actorjson ActorJson
|
||||
func check_actor(uri string) *ActorJson {
|
||||
actorjson := &ActorJson{}
|
||||
|
||||
if len(uri) <= 7 {
|
||||
return 400 // Bad actor
|
||||
return nil // Bad actor
|
||||
}
|
||||
|
||||
endslash := strings.Index(uri[8:], "/")
|
||||
if endslash == -1 {
|
||||
return 400 // Bad actor
|
||||
return nil // Bad actor
|
||||
}
|
||||
actorjson.instance = uri[8 : endslash+8]
|
||||
for _, banned := range settings.Banned {
|
||||
if strings.Index(uri, "https://"+banned+"/") == 0 {
|
||||
return 401 // Banned actor
|
||||
}
|
||||
}
|
||||
|
||||
// Check if there were any recent requests on this
|
||||
o, _ := GetRunner(actorjson.instance)
|
||||
o.recentactors.Mu.Lock()
|
||||
if o.recentactors.Add(uri) == true {
|
||||
o.recentactors.Mu.Unlock()
|
||||
return 402 // Actor in recent queue, good!
|
||||
if o.Banned {
|
||||
return nil // Banned actor
|
||||
}
|
||||
o.recentactors.Mu.Unlock()
|
||||
o.Recentactors.Mu.Lock()
|
||||
i, cachedactorjson := o.Recentactors.Contains(uri)
|
||||
if i != -1 {
|
||||
cachedactorjson := cachedactorjson.(*ActorJson)
|
||||
return cachedactorjson
|
||||
}
|
||||
o.Recentactors.Mu.Unlock()
|
||||
|
||||
selectRet := pool.QueryRow(context.Background(), "SELECT FROM actors WHERE document->>'id' = $1", uri)
|
||||
err := selectRet.Scan()
|
||||
selectRet := pool.QueryRow(context.Background(), "SELECT document FROM actors WHERE document->>'id' = $1", uri)
|
||||
err := selectRet.Scan(&actorjson)
|
||||
if err == nil {
|
||||
return 403 // Actor already in database, good!
|
||||
logWarn(actorjson)
|
||||
return actorjson // Actor already in database, good!
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest("GET", uri, nil)
|
||||
@ -214,13 +219,13 @@ func check_actor(uri string) int {
|
||||
var resp *http.Response
|
||||
tries := 0
|
||||
for {
|
||||
resp, err = o.client.Do(req)
|
||||
resp, err = o.Client.Do(req)
|
||||
if err != nil {
|
||||
if tries > 10 {
|
||||
logErr("Unable to connect to "+uri+" attempt 10/10, giving up.")
|
||||
return 404 // Unable to connect to host after 10 attempts
|
||||
logErr("Unable to connect to " + uri + " attempt 10/10, giving up.")
|
||||
return nil // Unable to connect to host after 10 attempts
|
||||
}
|
||||
logWarn("Unable to connect to "+uri+", attempt ",tries+1,"+/10 sleeping for 30 seconds.")
|
||||
logWarn("Unable to connect to "+uri+", attempt ", tries+1, "+/10 sleeping for 30 seconds.")
|
||||
time.Sleep(time.Second * 30)
|
||||
tries = tries + 1
|
||||
continue
|
||||
@ -230,7 +235,7 @@ func check_actor(uri string) int {
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return 405 // Unable to read body of message
|
||||
return nil // Unable to read body of message
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
@ -238,26 +243,22 @@ func check_actor(uri string) int {
|
||||
|
||||
err = json.Unmarshal(body, &actorjson)
|
||||
if err != nil {
|
||||
return 406 // Unable to unmarshal body of message
|
||||
return nil // Unable to unmarshal body of message
|
||||
}
|
||||
|
||||
var bot bool
|
||||
if actorjson.Type == "Service" {
|
||||
bot = true
|
||||
actorjson.bot = true
|
||||
} else {
|
||||
bot = false
|
||||
for _, botinstance := range settings.Alwaysbot {
|
||||
if strings.Index(uri, "https://"+botinstance+"/") == 0 {
|
||||
bot = true
|
||||
}
|
||||
}
|
||||
actorjson.bot = o.Alwaysbot // default on host's classification
|
||||
}
|
||||
|
||||
_, err = pool.Exec(context.Background(), "INSERT INTO actors (document, instance, bot) VALUES($1, $2, $3)", jsondocument, actorjson.instance, bot)
|
||||
if err != nil {
|
||||
logWarn("Error inserting %s into `actors`: "+uri, err)
|
||||
return 407 // Unable to insert actor
|
||||
return nil // Unable to insert actor
|
||||
}
|
||||
|
||||
return 0 // Successful
|
||||
o.Recentactors.Add(uri, actorjson)
|
||||
return actorjson // Successful
|
||||
}
|
||||
|
@ -5,33 +5,5 @@ import (
|
||||
)
|
||||
|
||||
func TestCheck_actor(t *testing.T) {
|
||||
defer func() {
|
||||
}()
|
||||
|
||||
// Start of Setup
|
||||
settings.Banned = append(settings.Banned, "banneddomain.com")
|
||||
o, _ := GetRunner("validdomain.com")
|
||||
|
||||
q := o.recentactors.Add("https://validdomain.com/users/validuser")
|
||||
AssertEqual(t, q, false) // A setup test
|
||||
// End of Setup
|
||||
|
||||
// initialize haves
|
||||
have1 := check_actor("meaninglessvalue")
|
||||
have0 := check_actor("")
|
||||
have2 := check_actor("https://banneddomain.com/users/banneduser")
|
||||
have3 := check_actor("https://validdomain.com/users/validuser")
|
||||
|
||||
// test wants
|
||||
// Short user
|
||||
AssertEqual(t, have0, 400)
|
||||
|
||||
// Invalid user
|
||||
AssertEqual(t, have1, 400)
|
||||
|
||||
// Banned instance
|
||||
AssertEqual(t, have2, 401)
|
||||
|
||||
// User already present
|
||||
AssertEqual(t, have3, 402)
|
||||
// Currently not implemented
|
||||
}
|
||||
|
@ -6,10 +6,11 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
// "net"
|
||||
|
||||
"gitlab.com/khanzf/fedilogue/shared"
|
||||
)
|
||||
|
||||
func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
func StreamMastodon(endpoint string, o *shared.RunningInstance) {
|
||||
stream_client := BuildClient(endpoint)
|
||||
|
||||
var oauthData OAuth
|
||||
@ -51,20 +52,20 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
resp, err = stream_client.Do(req)
|
||||
if err != nil {
|
||||
time.Sleep(time.Minute * 5)
|
||||
logWarn("Failure connecting to " + req.URL.Scheme + "://" + req.URL.Host + req.URL.Path + ", attempt ", tries + 1, ", sleeping for 5 minutes, ", err)
|
||||
logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes, ", err)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
logErr("Unable to stream " + api_timeline + ": ", err)
|
||||
logErr("Unable to stream "+api_timeline+": ", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
ri_mutex.Lock()
|
||||
m := runninginstances[endpoint]
|
||||
m.Status = RUNNING
|
||||
m.Status = shared.RUNNING
|
||||
m.LastRun = "Streaming"
|
||||
runninginstances[endpoint] = m
|
||||
ri_mutex.Unlock()
|
||||
@ -74,7 +75,7 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
for s.Scan() {
|
||||
line := s.Text()
|
||||
token := strings.SplitN(line, ":", 2)
|
||||
var newactivity ReportActivity
|
||||
var newactivity shared.ReportActivity
|
||||
if len(token) != 2 {
|
||||
continue
|
||||
}
|
||||
@ -89,7 +90,7 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
|
||||
jsondata := token[1][1:]
|
||||
err := json.Unmarshal([]byte(jsondata), &newactivity)
|
||||
if err != nil {
|
||||
logDebug("Unable to parse data from "+endpoint+", but still connected.")
|
||||
logDebug("Unable to parse data from " + endpoint + ", but still connected.")
|
||||
continue
|
||||
}
|
||||
retry = true
|
||||
|
@ -12,7 +12,8 @@ CREATE TABLE IF NOT EXISTS activities (
|
||||
normalized TEXT,
|
||||
identifiedat TIMESTAMP with time zone DEFAULT now(),
|
||||
instance VARCHAR(1000) NOT NULL,
|
||||
hashtags VARCHAR(140)[]
|
||||
hashtags VARCHAR(140)[],
|
||||
bot BOOLEAN DEFAULT FALSE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS instances (
|
||||
|
@ -51,17 +51,18 @@ type AccountType struct {
|
||||
|
||||
// Instance's new min_id value
|
||||
type RunningInstance struct {
|
||||
Software string `json:"software"`
|
||||
Version string `json:"version"`
|
||||
Status int `json:"status"`
|
||||
LastRun string `json:"lastrun"`
|
||||
CaptureType string `json:"capturetype"`
|
||||
banned bool
|
||||
client http.Client
|
||||
client_id string
|
||||
client_secret string
|
||||
recentactivities *UniqueFifo
|
||||
recentactors *UniqueFifo
|
||||
Software string `json:"software"`
|
||||
Version string `json:"version"`
|
||||
Status int `json:"status"`
|
||||
LastRun string `json:"lastrun"`
|
||||
CaptureType string `json:"capturetype"`
|
||||
Banned bool
|
||||
Alwaysbot bool
|
||||
Client http.Client
|
||||
Client_id string
|
||||
Client_secret string
|
||||
Recentactivities *UniqueFifo
|
||||
Recentactors *UniqueFifo
|
||||
}
|
||||
|
||||
type NodeInfoSoftware struct {
|
||||
|
@ -5,39 +5,46 @@ import (
|
||||
)
|
||||
|
||||
type UniqueFifo struct {
|
||||
slice []string
|
||||
Mu sync.Mutex
|
||||
size int
|
||||
keys []string
|
||||
values []interface{}
|
||||
Mu sync.Mutex
|
||||
size int
|
||||
}
|
||||
|
||||
|
||||
func NewUniqueFifo(size int) *UniqueFifo {
|
||||
q := UniqueFifo{}
|
||||
q.slice = make([]string, 0)
|
||||
q := UniqueFifo{}
|
||||
q.keys = make([]string, 0)
|
||||
q.values = make([]interface{}, 0)
|
||||
q.size = size
|
||||
return &q
|
||||
return &q
|
||||
}
|
||||
|
||||
func (q *UniqueFifo) Add(v string) bool {
|
||||
func (q *UniqueFifo) Add(k string, v interface{}) bool {
|
||||
ret := false
|
||||
if len(q.slice) == 0 {
|
||||
q.slice = append(q.slice, v)
|
||||
if len(q.keys) == 0 {
|
||||
q.keys = append(q.keys, k)
|
||||
q.values = append(q.values, v)
|
||||
ret = false
|
||||
} else {
|
||||
i := q.Contains(v)
|
||||
i, _ := q.Contains(k)
|
||||
if i != -1 {
|
||||
q.Remove(i)
|
||||
ret = true
|
||||
} else {
|
||||
ret = false
|
||||
}
|
||||
q.slice = append(q.slice, "")
|
||||
copy(q.slice[1:], q.slice)
|
||||
q.slice[0] = v
|
||||
if len(q.slice) <= q.size {
|
||||
q.slice = q.slice[:len(q.slice)]
|
||||
q.keys = append(q.keys, "")
|
||||
q.values = append(q.values, "")
|
||||
copy(q.keys[1:], q.keys)
|
||||
copy(q.values[1:], q.values)
|
||||
q.keys[0] = k
|
||||
q.values[0] = v
|
||||
if len(q.keys) <= q.size {
|
||||
q.keys = q.keys[:len(q.keys)]
|
||||
q.values = q.values[:len(q.values)]
|
||||
} else {
|
||||
q.slice = q.slice[:q.size]
|
||||
q.keys = q.keys[:q.size]
|
||||
q.values = q.values[:q.size]
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,18 +52,22 @@ func (q *UniqueFifo) Add(v string) bool {
|
||||
}
|
||||
|
||||
func (q *UniqueFifo) Remove(r int) {
|
||||
f := q.slice[:r]
|
||||
e := q.slice[r+1:]
|
||||
q.slice = f
|
||||
q.slice = append(q.slice, e...)
|
||||
f := q.keys[:r]
|
||||
e := q.keys[r+1:]
|
||||
q.keys = f
|
||||
q.keys = append(q.keys, e...)
|
||||
|
||||
n := q.values[:r]
|
||||
o := q.values[r+1:]
|
||||
q.values = n
|
||||
q.values = append(q.values, o...)
|
||||
}
|
||||
|
||||
|
||||
func (q *UniqueFifo) Contains(v string) int {
|
||||
for i, val := range q.slice {
|
||||
if val == v {
|
||||
return i
|
||||
func (q *UniqueFifo) Contains(k string) (int, interface{}) {
|
||||
for i, key := range q.keys {
|
||||
if key == k {
|
||||
return i, q.values[i]
|
||||
}
|
||||
}
|
||||
return -1
|
||||
return -1, nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user