412 lines
11 KiB
Go
412 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/go-echarts/go-echarts/v2/charts"
|
|
"github.com/go-echarts/go-echarts/v2/opts"
|
|
"github.com/go-echarts/go-echarts/v2/types"
|
|
"github.com/jackc/pgx/v4"
|
|
)
|
|
|
|
type InstanceStatsJson struct {
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Instance string `json:"timestamp"`
|
|
ActivitiesCount int `json:"activitiescount"`
|
|
ActorsCount int `json:"actorscount"`
|
|
}
|
|
|
|
type ActorStatsJson struct {
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Actor string `json:"actor"`
|
|
ActivitiesCount int `json:"activitiescount"`
|
|
}
|
|
|
|
type GlobalStatsJson struct {
|
|
Timestamp time.Time `json:"timestamp"`
|
|
ActivitiesCount int `json:"activitiescount"`
|
|
ActorsCount int `json:"actorscount"`
|
|
}
|
|
|
|
var metricsText string
|
|
|
|
func enableCors(w *http.ResponseWriter) {
|
|
(*w).Header().Set("Access-Control-Allow-Origin", "*")
|
|
}
|
|
|
|
func runMetrics() {
|
|
hashtagtotal := runTrendingHashtags()
|
|
wordstotal := make(map[string]interface{})
|
|
|
|
totalJson := make(map[string]interface{})
|
|
totalJson["hashtags"] = hashtagtotal
|
|
totalJson["words"] = wordstotal
|
|
totalJson["datetime"] = time.Now().UTC()
|
|
|
|
data, err := json.Marshal(totalJson)
|
|
if err != nil {
|
|
log.Fatalf("error marshaling combined activity 1: %v\n", err)
|
|
}
|
|
|
|
metricsText = string(data)
|
|
}
|
|
|
|
func runTrendingHashtags() map[string]interface{} {
|
|
|
|
sql := `WITH taglist AS (SELECT DISTINCT unnest(hashtags) AS tag, activities.document->>'attributedTo' AS attributed
|
|
FROM activities JOIN actors ON activities.document->>'attributedTo'=actors.document->>'id'
|
|
WHERE actors.bot=False AND activities.identifiedAt > NOW() - INTERVAL '30 MINUTES')
|
|
SELECT tag, COUNT(*) FROM taglist GROUP BY tag ORDER BY count DESC LIMIT 100;`
|
|
|
|
rows, err := pool.Query(context.Background(), sql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
hashtagitems := make([]interface{}, 0)
|
|
hashcount := 0
|
|
for rows.Next() {
|
|
var hashtag string
|
|
var count int
|
|
|
|
err = rows.Scan(&hashtag, &count)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
hashtagitem := make(map[string]interface{})
|
|
hashtagitem["hashtag"] = hashtag
|
|
hashtagitem["count"] = count
|
|
hashtagitems = append(hashtagitems, hashtagitem)
|
|
hashcount = hashcount + 1
|
|
}
|
|
rows.Close()
|
|
|
|
hashtagtotal := make(map[string]interface{})
|
|
hashtagtotal["count"] = hashcount
|
|
hashtagtotal["items"] = hashtagitems
|
|
|
|
return hashtagtotal
|
|
}
|
|
|
|
func runTrendingWords() map[string]interface{} {
|
|
|
|
sql := `WITH popular_words AS (
|
|
select word FROM ts_stat(
|
|
'
|
|
SELECT to_tsvector(''simple'', normalized) FROM activities
|
|
LEFT JOIN actors ON activities.document->>''attributedTo''=actors.document->>''id''
|
|
WHERE activities.identifiedat > current_timestamp - interval ''60 minutes''
|
|
AND actors.bot=false
|
|
'
|
|
)
|
|
WHERE length(word) > 3
|
|
AND NOT word in (SELECT word FROM stopwords)
|
|
ORDER BY ndoc DESC LIMIT 100)
|
|
|
|
SELECT concat_ws(' ', a1.word, a2.word) phrase, count(*)
|
|
FROM popular_words AS a1
|
|
CROSS JOIN popular_words AS a2
|
|
CROSS JOIN activities
|
|
WHERE normalized ilike format('%%%s %s%%', a1.word, a2.word)
|
|
AND identifiedat > current_timestamp - interval '60 minutes'
|
|
GROUP BY 1
|
|
HAVING count(*) > 1
|
|
ORDER BY 2 DESC LIMIT 20;`
|
|
rows, err := pool.Query(context.Background(), sql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
trendingitems := make([]interface{}, 0)
|
|
trendingcount := 0
|
|
for rows.Next() {
|
|
var trendingword string
|
|
var count int
|
|
|
|
err = rows.Scan(&trendingword, &count)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
trendingitem := make(map[string]interface{})
|
|
trendingitem["trending"] = trendingword
|
|
trendingitem["count"] = count
|
|
trendingitems = append(trendingitems, trendingitem)
|
|
trendingcount = trendingcount + 1
|
|
}
|
|
rows.Close()
|
|
|
|
trendingwordtotal := make(map[string]interface{})
|
|
trendingwordtotal["count"] = trendingcount
|
|
trendingwordtotal["items"] = trendingitems
|
|
|
|
return trendingwordtotal
|
|
}
|
|
|
|
// GET handlers
|
|
func getTrending(w http.ResponseWriter, r *http.Request) {
|
|
enableCors(&w)
|
|
fmt.Fprintf(w, "%s", metricsText)
|
|
}
|
|
|
|
func getSearch(w http.ResponseWriter, r *http.Request) {
|
|
enableCors(&w)
|
|
searchkeys, exists_search := r.URL.Query()["s"]
|
|
offsetkeys, exists_offset := r.URL.Query()["o"]
|
|
|
|
var err error
|
|
var rows pgx.Rows
|
|
var searchKey string
|
|
if exists_search {
|
|
searchKey = searchkeys[0]
|
|
}
|
|
var offsetKey int
|
|
if exists_offset {
|
|
offsetKey, _ = strconv.Atoi(offsetkeys[0])
|
|
} else {
|
|
offsetKey = -1
|
|
}
|
|
|
|
if exists_search && searchKey != "" {
|
|
if offsetKey == -1 {
|
|
rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document, activities.instance FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' WHERE activities.normalized_tsvector @@ plainto_tsquery($1) ORDER BY activities.id DESC LIMIT 10", searchKey)
|
|
} else {
|
|
rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document, activities.instance, FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' WHERE activities.normalized_tsvector @@ plainto_tsquery($1) AND activities.id < $2 ORDER BY activities.id DESC LIMIT 10", searchKey, offsetKey)
|
|
}
|
|
} else {
|
|
if offsetKey == -1 {
|
|
rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document, activities.instance FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' ORDER BY activities.id DESC LIMIT 10")
|
|
} else {
|
|
rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document, activities.instance FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' AND activities.id < $1 ORDER BY activities.id DESC LIMIT 10", offsetKey)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var earliestid int
|
|
earliestid = 0
|
|
var activitiesJson []map[string]json.RawMessage
|
|
for rows.Next() {
|
|
var id int
|
|
var activityRaw string
|
|
var actorRaw string
|
|
var instance string
|
|
var activityJson map[string]json.RawMessage
|
|
|
|
err = rows.Scan(&id, &activityRaw, &actorRaw, &instance)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
err := json.Unmarshal([]byte(activityRaw), &activityJson)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
if earliestid == 0 {
|
|
earliestid = id
|
|
} else if earliestid > id {
|
|
earliestid = id
|
|
}
|
|
|
|
// Add the instance string
|
|
rawInstance, err := json.Marshal(instance)
|
|
if err != nil {
|
|
fmt.Println("Error marshaling instance string:", err)
|
|
return
|
|
}
|
|
activityJson["instance"] = json.RawMessage(rawInstance)
|
|
|
|
activityJson["actor"] = json.RawMessage(actorRaw)
|
|
activitiesJson = append(activitiesJson, activityJson)
|
|
}
|
|
|
|
requestData := make(map[string]int)
|
|
requestData["earliestid"] = earliestid
|
|
|
|
totalJson := make(map[string]interface{})
|
|
totalJson["requestdata"] = requestData
|
|
totalJson["activities"] = activitiesJson
|
|
|
|
data, err := json.Marshal(totalJson)
|
|
if err != nil {
|
|
log.Fatalf("error marshaling combined activity 2: %v\n", err)
|
|
}
|
|
fmt.Fprintf(w, "%s", data)
|
|
}
|
|
|
|
func getInstanceStats(w http.ResponseWriter, r *http.Request) {
|
|
enableCors(&w)
|
|
instanceKeys, exists := r.URL.Query()["i"]
|
|
|
|
var instance string
|
|
if exists {
|
|
instance = instanceKeys[0]
|
|
}
|
|
|
|
instancestatsjson := &InstanceStatsJson{}
|
|
instancestatsjson.Timestamp = time.Now()
|
|
instancestatsjson.Instance = instance
|
|
|
|
if exists && instance != "" {
|
|
var activitiescount int
|
|
|
|
selectActivities := pool.QueryRow(context.Background(), "SELECT count(*) FROM activities WHERE instance = $1", instance)
|
|
err := selectActivities.Scan(&activitiescount)
|
|
if err != nil {
|
|
fmt.Println("Error ", err)
|
|
return
|
|
}
|
|
instancestatsjson.ActivitiesCount = activitiescount
|
|
|
|
var actorscount int
|
|
selectActors := pool.QueryRow(context.Background(), "SELECT count(*) FROM actors WHERE instance = $1", instance)
|
|
err = selectActors.Scan(&actorscount)
|
|
if err != nil {
|
|
fmt.Println("Error ", err)
|
|
return
|
|
}
|
|
instancestatsjson.ActorsCount = actorscount
|
|
}
|
|
|
|
bytearray, _ := json.Marshal(instancestatsjson)
|
|
stringarray := string(bytearray)
|
|
fmt.Fprintf(w, "%s", stringarray)
|
|
}
|
|
|
|
func getActorStats(w http.ResponseWriter, r *http.Request) {
|
|
enableCors(&w)
|
|
actorKeys, exists := r.URL.Query()["a"]
|
|
|
|
var actor string
|
|
if exists {
|
|
actor = actorKeys[0]
|
|
}
|
|
|
|
actorstatsjson := &ActorStatsJson{}
|
|
actorstatsjson.Timestamp = time.Now()
|
|
actorstatsjson.Actor = actor
|
|
|
|
if exists && actor != "" {
|
|
var actorscount int
|
|
|
|
selectActivities := pool.QueryRow(context.Background(), "SELECT count(*) FROM activities WHERE document->>'attributedTo' = $1", actor)
|
|
err := selectActivities.Scan(&actorscount)
|
|
if err != nil {
|
|
fmt.Println("Error ", err)
|
|
return
|
|
}
|
|
actorstatsjson.ActivitiesCount = actorscount
|
|
}
|
|
|
|
bytearray, _ := json.Marshal(actorstatsjson)
|
|
stringarray := string(bytearray)
|
|
fmt.Fprintf(w, "%s", stringarray)
|
|
}
|
|
|
|
func getGlobalStats(w http.ResponseWriter, r *http.Request) {
|
|
enableCors(&w)
|
|
|
|
globalstatsjson := &GlobalStatsJson{}
|
|
globalstatsjson.Timestamp = time.Now()
|
|
|
|
var activitiescount int
|
|
selectActivities := pool.QueryRow(context.Background(), "SELECT count(*) FROM activities")
|
|
err := selectActivities.Scan(&activitiescount)
|
|
if err != nil {
|
|
fmt.Println("Error ", err)
|
|
return
|
|
}
|
|
globalstatsjson.ActivitiesCount = activitiescount
|
|
|
|
var actorscount int
|
|
selectActors := pool.QueryRow(context.Background(), "SELECT count(*) FROM actors")
|
|
err = selectActors.Scan(&actorscount)
|
|
if err != nil {
|
|
fmt.Println("Error ", err)
|
|
return
|
|
}
|
|
globalstatsjson.ActorsCount = actorscount
|
|
|
|
bytearray, _ := json.Marshal(globalstatsjson)
|
|
stringarray := string(bytearray)
|
|
fmt.Fprintf(w, "%s", stringarray)
|
|
}
|
|
|
|
func generateLineItems(counts []int) []opts.LineData {
|
|
items := make([]opts.LineData, 0)
|
|
for i := 0; i < len(counts); i++ {
|
|
items = append(items, opts.LineData{Value: counts[i]})
|
|
}
|
|
return items
|
|
}
|
|
|
|
func getGlobalGraph(w http.ResponseWriter, r *http.Request) {
|
|
|
|
rows, err := pool.Query(context.Background(), `select date_trunc('hour', identifiedat) as "HOURTIME", COUNT(*) from activities WHERE date_trunc('hour', identifiedat) > NOW() - interval '24 hour' AND date_trunc('hour', identifiedat) < date_trunc('hour', NOW()) group by "HOURTIME"`)
|
|
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
defer rows.Close()
|
|
|
|
var timestamp time.Time
|
|
var count int
|
|
dates := []string{}
|
|
counts := []int{}
|
|
|
|
for rows.Next() {
|
|
err = rows.Scan(×tamp, &count)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
dates = append(dates, timestamp.Format("2006-01-02 15:04:05"))
|
|
counts = append(counts, count)
|
|
}
|
|
|
|
line := charts.NewLine()
|
|
line.SetGlobalOptions(
|
|
charts.WithInitializationOpts(opts.Initialization{Theme: types.ThemeWesteros}),
|
|
charts.WithTitleOpts(opts.Title{
|
|
Title: "Posts Across the Fediverse",
|
|
}))
|
|
|
|
line.SetXAxis(dates).
|
|
AddSeries("Post Count", generateLineItems(counts)).
|
|
SetSeriesOptions(charts.WithLineChartOpts(opts.LineChart{Smooth: false}))
|
|
|
|
line.Render(w)
|
|
}
|
|
|
|
func main() {
|
|
pool = getDbPool()
|
|
|
|
metricsText = "[]"
|
|
|
|
go func() {
|
|
for {
|
|
runMetrics()
|
|
time.Sleep(10 * time.Minute)
|
|
}
|
|
}()
|
|
|
|
http.HandleFunc("/api/v1/search", getSearch)
|
|
http.HandleFunc("/api/v1/trending", getTrending)
|
|
http.HandleFunc("/api/v1/instance/stats", getInstanceStats)
|
|
http.HandleFunc("/api/v1/actor/stats", getActorStats)
|
|
http.HandleFunc("/api/v1/global/stats", getGlobalStats)
|
|
http.HandleFunc("/api/v1/global/graph", getGlobalGraph)
|
|
log.Print("Starting HTTP inbox on port http://0.0.0.0:6431")
|
|
log.Fatal(http.ListenAndServe("0.0.0.0:6431", nil))
|
|
}
|