package main import ( "context" "encoding/json" "fmt" "log" "net/http" "strconv" "time" "github.com/go-echarts/go-echarts/v2/charts" "github.com/go-echarts/go-echarts/v2/opts" "github.com/go-echarts/go-echarts/v2/types" "github.com/jackc/pgx/v4" ) type InstanceStatsJson struct { Timestamp time.Time `json:"timestamp"` Instance string `json:"timestamp"` ActivitiesCount int `json:"activitiescount"` ActorsCount int `json:"actorscount"` } type ActorStatsJson struct { Timestamp time.Time `json:"timestamp"` Actor string `json:"actor"` ActivitiesCount int `json:"activitiescount"` } type GlobalStatsJson struct { Timestamp time.Time `json:"timestamp"` ActivitiesCount int `json:"activitiescount"` ActorsCount int `json:"actorscount"` } var metricsText string func enableCors(w *http.ResponseWriter) { (*w).Header().Set("Access-Control-Allow-Origin", "*") } func runMetrics() { hashtagtotal := runTrendingHashtags() wordstotal := make(map[string]interface{}) totalJson := make(map[string]interface{}) totalJson["hashtags"] = hashtagtotal totalJson["words"] = wordstotal totalJson["datetime"] = time.Now().UTC() data, err := json.Marshal(totalJson) if err != nil { log.Fatalf("error marshaling combined activity: %v\n", err) } metricsText = string(data) } func runTrendingHashtags() map[string]interface{} { sql := `WITH taglist AS (SELECT DISTINCT unnest(hashtags) AS tag, activities.document->>'attributedTo' AS attributed FROM activities JOIN actors ON activities.document->>'attributedTo'=actors.document->>'id' WHERE actors.bot=False AND activities.identifiedAt > NOW() - INTERVAL '30 MINUTES') SELECT tag, COUNT(*) FROM taglist GROUP BY tag ORDER BY count DESC LIMIT 100;` rows, err := pool.Query(context.Background(), sql) if err != nil { panic(err) } hashtagitems := make([]interface{}, 0) hashcount := 0 for rows.Next() { var hashtag string var count int err = rows.Scan(&hashtag, &count) if err != nil { panic(err) } hashtagitem := make(map[string]interface{}) hashtagitem["hashtag"] = hashtag hashtagitem["count"] = count hashtagitems = append(hashtagitems, hashtagitem) hashcount = hashcount + 1 } rows.Close() hashtagtotal := make(map[string]interface{}) hashtagtotal["count"] = hashcount hashtagtotal["items"] = hashtagitems return hashtagtotal } func runTrendingWords() map[string]interface{} { sql := `WITH popular_words AS ( select word FROM ts_stat( ' SELECT to_tsvector(''simple'', normalized) FROM activities LEFT JOIN actors ON activities.document->>''attributedTo''=actors.document->>''id'' WHERE activities.identifiedat > current_timestamp - interval ''60 minutes'' AND actors.bot=false ' ) WHERE length(word) > 3 AND NOT word in (SELECT word FROM stopwords) ORDER BY ndoc DESC LIMIT 100) SELECT concat_ws(' ', a1.word, a2.word) phrase, count(*) FROM popular_words AS a1 CROSS JOIN popular_words AS a2 CROSS JOIN activities WHERE normalized ilike format('%%%s %s%%', a1.word, a2.word) AND identifiedat > current_timestamp - interval '60 minutes' GROUP BY 1 HAVING count(*) > 1 ORDER BY 2 DESC LIMIT 20;` rows, err := pool.Query(context.Background(), sql) if err != nil { panic(err) } trendingitems := make([]interface{}, 0) trendingcount := 0 for rows.Next() { var trendingword string var count int err = rows.Scan(&trendingword, &count) if err != nil { panic(err) } trendingitem := make(map[string]interface{}) trendingitem["trending"] = trendingword trendingitem["count"] = count trendingitems = append(trendingitems, trendingitem) trendingcount = trendingcount + 1 } rows.Close() trendingwordtotal := make(map[string]interface{}) trendingwordtotal["count"] = trendingcount trendingwordtotal["items"] = trendingitems return trendingwordtotal } // GET handlers func getTrending(w http.ResponseWriter, r *http.Request) { enableCors(&w) fmt.Fprintf(w, "%s", metricsText) } func getSearch(w http.ResponseWriter, r *http.Request) { enableCors(&w) searchkeys, exists_search := r.URL.Query()["s"] offsetkeys, exists_offset := r.URL.Query()["o"] var err error var rows pgx.Rows var searchKey string if exists_search { searchKey = searchkeys[0] } var offsetKey int if exists_offset { offsetKey, _ = strconv.Atoi(offsetkeys[0]) } else { offsetKey = -1 } if exists_search && searchKey != "" { if offsetKey == -1 { rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' WHERE activities.normalized_tsvector @@ plainto_tsquery($1) ORDER BY activities.id DESC LIMIT 10", searchKey) } else { rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' WHERE activities.normalized_tsvector @@ plainto_tsquery($1) AND activities.id < $2 ORDER BY activities.id DESC LIMIT 10", searchKey, offsetKey) } } else { if offsetKey == -1 { rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' ORDER BY activities.id DESC LIMIT 10") } else { rows, err = pool.Query(context.Background(), "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' AND activities.id < $1 ORDER BY activities.id DESC LIMIT 10", offsetKey) } } if err != nil { panic(err) } defer rows.Close() var earliestid int earliestid = 0 var activitiesJson []map[string]json.RawMessage for rows.Next() { var id int var activityRaw string var actorRaw string var activityJson map[string]json.RawMessage err = rows.Scan(&id, &activityRaw, &actorRaw) if err != nil { panic(err) } err := json.Unmarshal([]byte(activityRaw), &activityJson) if err != nil { fmt.Println(err) } if earliestid == 0 { earliestid = id } else if earliestid > id { earliestid = id } activityJson["actor"] = json.RawMessage(actorRaw) activitiesJson = append(activitiesJson, activityJson) } requestData := make(map[string]int) requestData["earliestid"] = earliestid totalJson := make(map[string]interface{}) totalJson["requestdata"] = requestData totalJson["activities"] = activitiesJson data, err := json.Marshal(totalJson) if err != nil { log.Fatalf("error marshaling combined activity: %v\n", err) } fmt.Fprintf(w, "%s", data) } func getInstanceStats(w http.ResponseWriter, r *http.Request) { enableCors(&w) instanceKeys, exists := r.URL.Query()["i"] var instance string if exists { instance = instanceKeys[0] } instancestatsjson := &InstanceStatsJson{} instancestatsjson.Timestamp = time.Now() instancestatsjson.Instance = instance if exists && instance != "" { var activitiescount int selectActivities := pool.QueryRow(context.Background(), "SELECT count(*) FROM activities WHERE instance = $1", instance) err := selectActivities.Scan(&activitiescount) if err != nil { fmt.Println("Error ", err) return } instancestatsjson.ActivitiesCount = activitiescount var actorscount int selectActors := pool.QueryRow(context.Background(), "SELECT count(*) FROM actors WHERE instance = $1", instance) err = selectActors.Scan(&actorscount) if err != nil { fmt.Println("Error ", err) return } instancestatsjson.ActorsCount = actorscount } bytearray, _ := json.Marshal(instancestatsjson) stringarray := string(bytearray) fmt.Fprintf(w, "%s", stringarray) } func getActorStats(w http.ResponseWriter, r *http.Request) { enableCors(&w) actorKeys, exists := r.URL.Query()["a"] var actor string if exists { actor = actorKeys[0] } actorstatsjson := &ActorStatsJson{} actorstatsjson.Timestamp = time.Now() actorstatsjson.Actor = actor if exists && actor != "" { var actorscount int selectActivities := pool.QueryRow(context.Background(), "SELECT count(*) FROM activities WHERE document->>'attributedTo' = $1", actor) err := selectActivities.Scan(&actorscount) if err != nil { fmt.Println("Error ", err) return } actorstatsjson.ActivitiesCount = actorscount } bytearray, _ := json.Marshal(actorstatsjson) stringarray := string(bytearray) fmt.Fprintf(w, "%s", stringarray) } func getGlobalStats(w http.ResponseWriter, r *http.Request) { enableCors(&w) globalstatsjson := &GlobalStatsJson{} globalstatsjson.Timestamp = time.Now() var activitiescount int selectActivities := pool.QueryRow(context.Background(), "SELECT count(*) FROM activities") err := selectActivities.Scan(&activitiescount) if err != nil { fmt.Println("Error ", err) return } globalstatsjson.ActivitiesCount = activitiescount var actorscount int selectActors := pool.QueryRow(context.Background(), "SELECT count(*) FROM actors") err = selectActors.Scan(&actorscount) if err != nil { fmt.Println("Error ", err) return } globalstatsjson.ActorsCount = actorscount bytearray, _ := json.Marshal(globalstatsjson) stringarray := string(bytearray) fmt.Fprintf(w, "%s", stringarray) } func generateLineItems(counts []int) []opts.LineData { items := make([]opts.LineData, 0) for i := 0; i < len(counts); i++ { items = append(items, opts.LineData{Value: counts[i]}) } return items } func getGlobalGraph(w http.ResponseWriter, r *http.Request) { rows, err := pool.Query(context.Background(), `select date_trunc('hour', identifiedat) as "HOURTIME", COUNT(*) from activities WHERE date_trunc('hour', identifiedat) > NOW() - interval '24 hour' AND date_trunc('hour', identifiedat) < date_trunc('hour', NOW()) group by "HOURTIME"`) if err != nil { panic(err) } defer rows.Close() var timestamp time.Time var count int dates := []string{} counts := []int{} for rows.Next() { err = rows.Scan(×tamp, &count) if err != nil { panic(err) } dates = append(dates, timestamp.Format("2006-01-02 15:04:05")) counts = append(counts, count) } line := charts.NewLine() line.SetGlobalOptions( charts.WithInitializationOpts(opts.Initialization{Theme: types.ThemeWesteros}), charts.WithTitleOpts(opts.Title{ Title: "Posts Across the Fediverse", })) line.SetXAxis(dates). AddSeries("Post Count", generateLineItems(counts)). SetSeriesOptions(charts.WithLineChartOpts(opts.LineChart{Smooth: false})) line.Render(w) } func main() { pool = getDbPool() metricsText = "[]" go func() { for { runMetrics() time.Sleep(10 * time.Minute) } }() http.HandleFunc("/api/v1/search", getSearch) http.HandleFunc("/api/v1/trending", getTrending) http.HandleFunc("/api/v1/instance/stats", getInstanceStats) http.HandleFunc("/api/v1/actor/stats", getActorStats) http.HandleFunc("/api/v1/global/stats", getGlobalStats) http.HandleFunc("/api/v1/global/graph", getGlobalGraph) log.Print("Starting HTTP inbox on port http://0.0.0.0:6431") log.Fatal(http.ListenAndServe("0.0.0.0:6431", nil)) }