Skip to content
Snippets Groups Projects
Unverified Commit b34068a9 authored by Dean's avatar Dean Committed by GitHub
Browse files

Merge pull request #5 from whats-this/feature/elastic-metrics

Elasticsearch metrics
parents 714acaed ef25c3bf
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,10 @@
indent_style = tab
indent_size = 8
[*.json]
indent_style = space
indent_size = 2
[*.sql]
indent_style = space
indent_size = 2
......
......@@ -19,5 +19,8 @@ cdn-origin
# Package configuration file (not including sample configuration)
cdn-origin.toml
# MaxMind GeoLite2 Country database files
GeoLite2-Country.mmdb
# JetBrains .idea
.idea/
# Whats-This CDN Origin
Simple but quick Golang webserver that serves requests to get files and
Simple but quick Golang webserver that serves requests to get files and
redirects from a [PostgreSQL](https://www.postgresql.org) and
[SeaweedFS](https://github.com/chrislusf/seaweedfs) backend.
......@@ -40,28 +40,22 @@ Information about configuration variables and their purpose can be found in
### Metrics
If `cdnUtil.serveMetrics` is enabled,
`/.cdn-util/prometheus/metrics?authKey={{cdnUtil.authKey}}` will return
Prometheus-compatible information about requests (and Go runtime information).
Here is a sample scrape configuration for accessing data from `cdn-origin`:
```yml
scrape_configs:
- job_name: 'cdn_origin'
scrape_interval: 5s
metrics_path: '/.cdn-util/prometheus/metrics'
params:
authKey: ['secret']
static_configs:
- targets: ['localhost:8080']
If `metrics.enable` is `true`, request metadata will be indexed in the provided
Elasticsearch server in the following format:
```js
{
"country_code": keyword,
"hostname": keyword,
"object_type": keyword,
"status_code": short,
"@timestamp": date // generated from `@timestamp` pipeline
}
```
Sample Grafana dashboards can be found in [dashboards.json](dashboards.json).
Custom metrics:
- HTTPRequestsTotal: `cdn_origin_http_requests_total{host="request Host header"}`
The index and `@timestamp` pipeline are created automatically if `cdn-origin`
has permission. Alternatively, the mapping and pipeline can be created by other
means using the `.json` files in [metrics/](metrics).
### TODO
......
......@@ -4,46 +4,33 @@ import (
"database/sql"
"fmt"
"html/template"
"net"
"strings"
"time"
"github.com/whats-this/cdn-origin/prometheus"
"github.com/whats-this/cdn-origin/metrics"
"github.com/whats-this/cdn-origin/weed"
log "github.com/Sirupsen/logrus"
_ "github.com/lib/pq"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
)
// version is the current version of cdn-origin.
const version = "0.3.0"
const version = "0.4.0"
// Default SeaweedFS fetch parameters.
// Default SeaweedFS fetch query parameters.
const defaultSeaweedFSQueryParameters = ""
// HTTP header strings.
const (
accept = "accept"
host = "host"
accept = "accept"
host = "host"
location = "Location"
)
// Host domain match strings.
const (
	asteriskStr       = "*"
	asteriskPeriodStr = "*."
	periodStr         = "."
)

// treeNode is a node in the domain-whitelist tree. Each node carries a single
// dot-separated domain label; a leaf node marks the end of a whitelisted
// domain and remembers the original domain string in FullValue.
type treeNode struct {
	Leaf      bool
	Value     string
	FullValue string
	SubNodes  []*treeNode
}

// FindOrCreateSubNode returns the first non-leaf child whose label equals v.
// When no such child exists, a fresh node is appended and returned.
func (t *treeNode) FindOrCreateSubNode(v string) *treeNode {
	if t.SubNodes == nil {
		t.SubNodes = []*treeNode{}
	}
	for _, child := range t.SubNodes {
		if child.Value == v && !child.Leaf {
			return child
		}
	}
	created := &treeNode{Value: v}
	t.SubNodes = append(t.SubNodes, created)
	return created
}

// GetMatch walks the tree along the labels in s and reports the full
// whitelisted domain of the first leaf reached, or "" when no branch matches.
// A child whose value is asteriskStr acts as a wildcard for any single label.
// Children are tried in insertion order, so the configured whitelist order is
// preserved.
func (t *treeNode) GetMatch(s []string) string {
	if t.Leaf || len(s) == 0 || len(t.SubNodes) == 0 {
		return ""
	}
	for _, child := range t.SubNodes {
		if child.Value != asteriskStr && child.Value != s[0] {
			continue
		}
		if child.Leaf {
			return child.FullValue
		}
		if match := child.GetMatch(s[1:]); match != "" {
			return match
		}
	}
	return ""
}

// Domain whitelist configuration.
var (
	domainMetricsWhitelist    *treeNode
	useDomainMetricsWhitelist bool
)
func init() {
// Read in configuration
flags := pflag.NewFlagSet("cdn-origin", pflag.ExitOnError)
// cdnUtil.authKey (string=""): authentication token for accessing /.cdn-util (cdnUtil)
flags.String("cdnutil-auth-key", "", "Authentication token for accessing /.cdn-util (util)")
viper.BindPFlag("cdnUtil.authKey", flags.Lookup("cdnutil-auth-key"))
viper.BindEnv("cdnUtil.authKey", "CDNUTIL_AUTH_KEY")
// database.connectionURL* (string): PostgreSQL connection URL
flags.String("database-connection-url", "", "* PostgreSQL connection URL")
viper.BindPFlag("database.connectionURL", flags.Lookup("database-connection-url"))
viper.BindEnv("database.connectionURL", "DATABASE_CONNECTION_URL")
// cdnUtil.serveMetrics (bool=false): serve Prometheus metrics (cdnUtil)
flags.Bool("cdnutil-serve-metrics", false, "Serve Prometheus metrics (cdnUtil)")
viper.BindPFlag("cdnUtil.serveMetrics", flags.Lookup("cdnutil-serve-metrics"))
viper.BindEnv("cdnUtil.serveMetrics", "CDNUTIL_SERVE_METRICS")
// debug (bool=false): enable debug mode (logs requests and prints other information)
flags.Bool("debug", false, "Enable debug mode (logs requests and prints other information)")
viper.BindPFlag("debug", flags.Lookup("debug"))
viper.BindEnv("debug", "DEBUG")
// http.compressResponse (bool=false): enable transparent response compression
flags.Bool("compress-response", false, "Enable transparent response compression")
......@@ -126,22 +65,36 @@ func init() {
viper.BindPFlag("http.listenAddress", flags.Lookup("listen-address"))
viper.BindEnv("http.listenAddress", "HTTP_LISTEN_ADDRESS")
// log.debug (bool=false): enable debug mode (logs requests and prints other information)
flags.Bool("debug", false, "Enable debug mode (logs requests and prints other information)")
viper.BindPFlag("log.debug", flags.Lookup("debug"))
viper.BindEnv("log.debug", "DEBUG")
// metrics.domainWhitelist (string[]=[]): domains to whitelist for metric collection (only through config file)
// metrics.enableDomainWhitelist (bool=false): enable domain whitelist for metrics (metrics.domainWhitelist)
flags.Bool("metrics.enableWhitelist", false, "Enable domain whitelist for metric collection")
viper.BindPFlag("metrics.enableWhitelist", flags.Lookup("debug"))
viper.BindEnv("metrics.enableWhitelist", "METRICS_ENABLE_WHITELIST")
// database.connectionURL* (string): PostgreSQL connection URL
flags.String("database-connection-url", "", "* PostgreSQL connection URL")
viper.BindPFlag("database.connectionURL", flags.Lookup("database-connection-url"))
viper.BindEnv("database.connectionURL", "DATABASE_CONNECTION_URL")
// http.trustProxy (bool=false): trust X-Forwarded-For header from proxy
flags.Bool("trust-proxy", false, "Trust X-Forwarded-For header from proxy")
viper.BindPFlag("http.trustProxy", flags.Lookup("trust-proxy"))
viper.BindEnv("http.trustProxy", "HTTP_TRUST_PROXY")
// metrics.enable (bool=false): enable anonymized request recording (country code, hostname, object type, status
// code)
flags.Bool("enable-metrics", false,
"Enable anonymized request recording (status code, country code, object type, hostname)")
viper.BindPFlag("metrics.enable", flags.Lookup("enable-metrics"))
viper.BindEnv("metrics.enable", "ENABLE_METRICS")
// metrics.elasticURL (string): Elasticsearch URL to connect to for metrics, required if metrics.enable == true
flags.String("elastic-url", "", "Elastic URL to connect to for metrics")
viper.BindPFlag("metrics.elasticURL", flags.Lookup("elastic-url"))
viper.BindEnv("metrics.elasticURL", "ELASTIC_URL")
// metrics.enableHostnameWhitelist (bool=false): enable hostname whitelist for metrics
// (metrics.hostnameWhitelist)
flags.Bool("metrics-enable-whitelist", false, "Enable hostname whitelist for metrics")
viper.BindPFlag("metrics.enableHostnameWhitelist", flags.Lookup("metrics-enable-whitelist"))
viper.BindEnv("metrics.enableHostnameWhitelist", "METRICS_ENABLE_WHITELIST")
// metrics.hostnameWhitelist (string[]=[]): hostnames to whitelist for metrics (config file only)
// metrics.maxmindDBLocation (string): location of MaxMind GeoLite2 Country database file (.mmdb) on disk, if
// empty country codes will not be recorded by metrics collector
flags.String("maxmind-db-location", "", "Location of MaxMind GeoLite2 Country database on disk")
viper.BindPFlag("metrics.maxmindDBLocation", flags.Lookup("maxmind-db-location"))
viper.BindEnv("metrics.maxmindDBLocation", "MAXMIND_DB_LOCATION")
// seaweed.masterURL* (string): SeaweedFS master URL
flags.String("seaweed-master-url", "", "* SeaweedFS master URL")
......@@ -156,91 +109,53 @@ func init() {
if err := viper.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
log.WithField("err", err).Fatal("failed to read in configuration")
log.WithError(err).Fatal("failed to read in configuration")
}
}
// Enable debug mode
if viper.GetBool("log.debug") {
if viper.GetBool("debug") {
log.SetLevel(log.DebugLevel)
}
// Print configuration variables to debug
log.WithFields(log.Fields{
"cdnUtil.authKey": viper.GetString("cdnUtil.authKey"),
"cdnUtil.serveMetrics": viper.GetBool("cdnUtil.serveMetrics"),
"database.connectionURL": viper.GetString("database.connectionURL"),
"http.compressResponse": viper.GetBool("http.compressResponse"),
"http.listenAddress": viper.GetString("http.listenAddress"),
"log.debug": viper.GetBool("log.debug"),
"seaweed.masterURL": viper.GetString("seaweed.masterURL"),
"database.connectionURL": viper.GetString("database.connectionURL"),
"debug": viper.GetBool("debug"),
"http.compressResponse": viper.GetBool("http.compressResponse"),
"http.listenAddress": viper.GetString("http.listenAddress"),
"metrics.enable": viper.GetBool("metrics.enable"),
"metrics.elasticURL": viper.GetString("metrics.elasticURL"),
"len(metrics.hostnameWhitelist)": len(viper.GetStringSlice("metrics.hostnameWhitelist")),
"metrics.enableHostnameWhitelist": viper.GetString("metrics.enableHostnameWhitelist"),
"seaweed.masterURL": viper.GetString("seaweed.masterURL"),
}).Debug("retrieved configuration")
// Ensure required configuration variables are set
if len(viper.GetString("database.connectionURL")) == 0 {
if viper.GetString("database.connectionURL") == "" {
log.Fatal("database.connectionURL is required")
}
if len(viper.GetString("seaweed.masterURL")) == 0 {
if viper.GetString("seaweed.masterURL") == "" {
log.Fatal("seaweed.masterURL is required")
}
// metrics.enableWhitelist configuration
if viper.GetBool("metrics.enableWhitelist") {
whitelist := []string{}
switch w := viper.Get("metrics.domainWhitelist").(type) {
case []interface{}:
for _, s := range w {
whitelist = append(whitelist, strings.TrimSpace(fmt.Sprint(s)))
}
break
default:
log.Warn("metrics.domainWhitelist is not an array, ignoring (no metrics will be recorded)")
}
// NOTE: using arrays because the provided order is important (maps would be a check if part is in map,
// then check for *, which works but doesn't maintain order)
tree := &treeNode{SubNodes: []*treeNode{}}
for _, d := range whitelist {
split := strings.Split(d, ".")
currentNode := tree
for i, s := range split {
currentNode = currentNode.FindOrCreateSubNode(s)
if i+1 == len(split) {
currentNode.Leaf = true
currentNode.FullValue = d
continue
}
if currentNode.SubNodes == nil {
currentNode.SubNodes = []*treeNode{}
}
}
}
useDomainMetricsWhitelist = true
domainMetricsWhitelist = tree
if viper.GetBool("metrics.enable") && viper.GetString("metrics.elasticURL") == "" {
log.Fatal("metrics.elasticURL is required when metrics are enabled")
}
// Parse redirect templates
var err error
redirectHTMLTemplate, err = template.New("redirectHTML").Parse(redirectHTML)
if err != nil {
log.WithField("err", err).Fatal("failed to parse redirectHTML template")
return
log.WithError(err).Fatal("failed to parse redirectHTML template")
}
redirectPreviewHTMLTemplate, err = template.New("redirectPreviewHTML").Parse(redirectPreviewHTML)
if err != nil {
log.WithField("err", err).Fatal("failed to parse redirectPreviewHTML template")
return
log.WithError(err).Fatal("failed to parse redirectPreviewHTML template")
}
}
// db holds the current PostgreSQL database connection.
var db *sql.DB
// seaweed client to use for fetching files from the SeaweedFS cluster.
var collector *metrics.Collector
var seaweed *weed.Seaweed
func main() {
......@@ -250,15 +165,40 @@ func main() {
seaweed = weed.New(viper.GetString("seaweed.masterURL"), time.Second*5)
err = seaweed.Ping()
if err != nil {
log.WithField("err", err).Fatal("failed to ping SeaweedFS master")
return
log.WithError(err).Fatal("failed to ping SeaweedFS master")
}
// Connect to PostgreSQL database
// TODO: abstractify database connection
// TODO: create table if it doesn't exist
db, err = sql.Open("postgres", viper.GetString("database.connectionURL"))
if err != nil {
log.WithField("err", err).Fatal("failed to open database connection")
return
log.WithError(err).Fatal("failed to open database connection")
}
// Setup metrics collector
if viper.GetBool("metrics.enable") {
hostnameWhitelist := []string{}
if viper.GetBool("metrics.enableHostnameWhitelist") {
switch w := viper.Get("metrics.hostnameWhitelist").(type) {
case []interface{}:
for _, s := range w {
hostnameWhitelist = append(hostnameWhitelist, strings.TrimSpace(fmt.Sprint(s)))
}
break
default:
log.Fatal("metrics.hostnameWhitelist is not an array")
}
}
collector, err = metrics.New(
viper.GetString("metrics.elasticURL"),
viper.GetString("metrics.maxmindDBLocation"),
viper.GetBool("metrics.enableHostnameWhitelist"),
hostnameWhitelist,
)
if err != nil {
log.WithError(err).Fatal("failed to setup metrics collector")
}
}
// Launch server
......@@ -273,124 +213,123 @@ func main() {
ReadBufferSize: 1024 * 6, // 6 KB
ReadTimeout: time.Minute * 30,
WriteTimeout: time.Minute * 30,
GetOnly: true,
GetOnly: true, // TODO: OPTIONS/HEAD requests
LogAllErrors: log.GetLevel() == log.DebugLevel,
DisableHeaderNamesNormalizing: false,
Logger: log.New(),
}
if err := server.ListenAndServe(viper.GetString("http.listenAddress")); err != nil {
log.WithField("err", err).Fatal("error in ListenAndServe")
log.WithError(err).Fatal("error in server.ListenAndServe")
}
}
func requestHandler(ctx *fasthttp.RequestCtx) {
path := string(ctx.Path())
func recordMetrics(ctx *fasthttp.RequestCtx, objectType string) {
if !viper.GetBool("metrics.enable") {
return
}
// Log requests in debug mode, wrapped in an if statement to prevent unnecessary memory allocations
if log.GetLevel() == log.DebugLevel {
log.WithFields(log.Fields{
"connRequestNumber": ctx.ConnRequestNum(),
// "connTime": ctx.ConnTime(),
"method": string(ctx.Method()),
// "path": path,
"queryString": ctx.QueryArgs(),
"remoteIP": ctx.RemoteIP(),
"requestURI": string(ctx.RequestURI()),
// "time": ctx.Time(),
// "userAgent": string(ctx.UserAgent()),
}).Debug("request received")
var remoteIP net.IP
if viper.GetBool("http.trustProxy") {
ipString := string(ctx.Request.Header.Peek("X-Forwarded-For"))
remoteIP = net.ParseIP(strings.Split(ipString, ",")[0])
} else {
remoteIP = ctx.RemoteIP()
}
// CDN utilities endpoint (only accessible if key is supplied and correct)
if len(viper.GetString("cdnUtil.authKey")) != 0 && strings.HasPrefix(path, cdnUtil) && string(ctx.QueryArgs().Peek(keyParam)) == viper.GetString("cdnUtil.authKey") {
switch path[len(cdnUtil):] {
case "/prometheus/metrics":
if !viper.GetBool("cdnUtil.serveMetrics") {
break
remoteIP = net.ParseIP("51.15.83.140")
hostBytes := ctx.Request.Header.Peek(host)
if len(hostBytes) != 0 {
go func() {
// Check hostname
hostStr, isValid := collector.MatchHostname(string(hostBytes))
if !isValid {
return
}
contentType, err := prometheus.WriteMetrics(ctx, string(ctx.Request.Header.Peek(accept)))
// Get country code of visitor
countryCode, err := collector.GetCountryCode(remoteIP)
if err != nil {
log.WithField("err", err).Error("failed to generate util Prometheus response")
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprint(ctx, "500 Internal Server Error")
return
// Don't log the error here, it might contain an IP address
log.Warn("failed to get country code for IP, omitting from record")
}
ctx.SetContentType(contentType)
return
}
ctx.SetStatusCode(fasthttp.StatusNotFound)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprintf(ctx, "404 Not Found: %s", ctx.Path())
return
record := metrics.GetRecord()
record.CountryCode = countryCode
record.Hostname = hostStr
record.ObjectType = objectType
record.StatusCode = ctx.Response.StatusCode()
err = collector.Put(record)
if err != nil {
log.WithError(err).Warn("failed to collect record")
}
log.Debug("successfully collected metrics")
}()
}
}
// Update metrics if metrics are being served
if viper.GetBool("cdnUtil.serveMetrics") {
hostBytes := ctx.Request.Header.Peek(host)
if len(hostBytes) != 0 {
hostStr := string(hostBytes)
go func() {
if useDomainMetricsWhitelist {
hostSplit := strings.Split(hostStr, periodStr)
if match := domainMetricsWhitelist.GetMatch(hostSplit); match != "" {
if strings.HasPrefix(match, asteriskPeriodStr) {
hostSplit[0] = asteriskStr
}
prometheus.HTTPRequestsTotal.With(prom.Labels{host: strings.Join(hostSplit, periodStr)}).Inc()
}
} else {
prometheus.HTTPRequestsTotal.With(prom.Labels{host: hostStr}).Inc()
}
}()
}
func requestHandler(ctx *fasthttp.RequestCtx) {
metricsObjectType := ""
// Log requests in debug mode, wrapped in an if statement to prevent unnecessary memory allocations
if log.GetLevel() == log.DebugLevel {
log.WithFields(log.Fields{
"connRequestNumber": ctx.ConnRequestNum(),
"method": string(ctx.Method()),
"queryString": ctx.QueryArgs(),
"remoteIP": ctx.RemoteIP(),
"requestURI": string(ctx.RequestURI()),
}).Debug("request received")
}
// Fetch object from database
var backend_file_id sql.NullString
var content_type sql.NullString
var dest_url sql.NullString
var object_type int
var backendFileID sql.NullString
var contentType sql.NullString
var destURL sql.NullString
var objectType int
err := db.QueryRow(
`SELECT backend_file_id, content_type, dest_url, "type" FROM objects WHERE bucket_key=$1 LIMIT 1`,
fmt.Sprintf("public%s", path)).Scan(&backend_file_id, &content_type, &dest_url, &object_type)
fmt.Sprintf("public%s", ctx.Path()),
).Scan(&backendFileID, &contentType, &destURL, &objectType)
switch {
case err == sql.ErrNoRows:
ctx.SetStatusCode(fasthttp.StatusNotFound)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprintf(ctx, "404 Not Found: %s", ctx.Path())
recordMetrics(ctx, metricsObjectType)
return
case err != nil:
log.WithField("err", err).Error("failed to run SELECT query on database")
log.WithError(err).Error("failed to run SELECT query on database")
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprint(ctx, "500 Internal Server Error")
ctx.SetUserValue("object_type", "file")
recordMetrics(ctx, metricsObjectType)
return
}
switch object_type {
switch objectType {
case 0: // file
metricsObjectType = "file"
// Get object from SeaweedFS and write to response
if !backend_file_id.Valid {
log.WithField("bucket_key", path).Warn("found file object with NULL backend_file_id")
if !backendFileID.Valid {
log.WithField("bucket_key", string(ctx.Path())).Warn("found file object with NULL backend_file_id")
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprint(ctx, "500 Internal Server Error")
}
if content_type.Valid {
ctx.SetContentType(content_type.String)
} else {
ctx.SetContentType("application/octet-stream")
recordMetrics(ctx, metricsObjectType)
return
}
// TODO: ?thumbnail query parameter for images
statusCode, contentSize, err := seaweed.Get(ctx, backend_file_id.String, defaultSeaweedFSQueryParameters)
statusCode, contentSize, err := seaweed.Get(ctx, backendFileID.String, defaultSeaweedFSQueryParameters)
if err != nil {
log.WithField("err", err).Warn("failed to retrieve file from SeaweedFS volume server")
log.WithError(err).Warn("failed to retrieve file from SeaweedFS volume server")
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprint(ctx, "500 Internal Server Error")
recordMetrics(ctx, metricsObjectType)
return
}
if statusCode != fasthttp.StatusOK {
......@@ -401,41 +340,55 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprint(ctx, "500 Internal Server Error")
recordMetrics(ctx, metricsObjectType)
return
}
if contentType.Valid {
ctx.SetContentType(contentType.String)
} else {
ctx.SetContentType("application/octet-stream")
}
ctx.Response.Header.SetContentLength(contentSize)
recordMetrics(ctx, metricsObjectType)
case 1: // redirect
if !dest_url.Valid {
metricsObjectType = "redirect"
if !destURL.Valid {
log.Warn("encountered redirect object with NULL dest_url")
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprint(ctx, "500 Internal Server Error")
recordMetrics(ctx, metricsObjectType)
return
}
previewMode := ctx.QueryArgs().Has("preview")
var err error
if previewMode {
err = redirectPreviewHTMLTemplate.Execute(ctx, dest_url.String)
err = redirectPreviewHTMLTemplate.Execute(ctx, destURL.String)
} else {
err = redirectHTMLTemplate.Execute(ctx, dest_url.String)
err = redirectHTMLTemplate.Execute(ctx, destURL.String)
}
if err != nil {
log.WithFields(log.Fields{
"err": err,
"dest_url": dest_url.String,
"preview": ctx.QueryArgs().Has("preview"),
"error": err,
"dest_url": destURL.String,
"preview": ctx.QueryArgs().Has("preview"),
}).Warn("failed to generate HTML redirect page to send to client")
ctx.SetContentType("text/plain; charset=utf8")
fmt.Fprintf(ctx, "Failed to generate HTML redirect page, destination URL: %s", dest_url.String)
fmt.Fprintf(ctx, "Failed to generate HTML redirect page, destination URL: %s", destURL.String)
recordMetrics(ctx, metricsObjectType)
return
}
ctx.SetContentType("text/html; charset=ut8")
if !previewMode {
ctx.SetStatusCode(fasthttp.StatusFound)
ctx.Response.Header.Set(location, dest_url.String)
ctx.Response.Header.Set(location, destURL.String)
}
recordMetrics(ctx, metricsObjectType)
}
}
[cdnUtil]
# Authentication token for accessing /.cdn-util (supplied with query
# parameter "authKey")
authKey = "secret"
# Enable debug mode (logs requests and prints other information)
debug = false
# Serve Prometheus metrics (/.cdn-util/prometheus/metrics)
serveMetrics = true
[database]
# PostgreSQL connection URL, see
# https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters for
# more information
connectionURL = "postgres://postgres:password@localhost/whats_this?sslmode=disable"
[http]
# Enable transparent response compression (only when the client Accepts it)
compressResponse = true
compressResponse = false
# TCP address to listen to for HTTP requests
listenAddress = ":8001"
listenAddress = ":8080"
# Trust X-Forwarded-For header from proxy (used in metrics collection)
trustProxy = false
[metrics]
# Enable metrics collection domain whitelist. When this is enabled, metrics
# will only be collected for requests with a Host header matching a domain
# in the list below.
enableWhitelist = false
# Metrics collection domain whitelist. Wildcards are supported at the
# beginning of each domain.
domainWhitelist = [
# Enable anonymized request recording (country code, hostname, object type,
# status code)
enable = false
# Elasticsearch URL to connect to for metrics, required if
# `metrics.enable == true`. See
# https://godoc.org/gopkg.in/olivere/elastic.v5/config#Parse for more
# information.
elasticURL = "http://elasticsearch:9200/cdn-origin_requests?shards=1&replicas=0"
# Enable metrics collection hostname whitelist. When this is enabled,
# metrics will only be collected for requests with a Host header matching a
# domain in the list below.
enableHostnameWhitelist = false
# Metrics collection hostname whitelist. Wildcards are supported at the
# beginning of each domain. It is recommended to rank this list by domain
# popularity at intervals to reduce match time.
#
# Since 0.4.0 `www.` prefixes will be stripped from all incoming `Host`
# headers, and don't need to be included in `metrics.hostnameWhitelist`.
hostnameWhitelist = [
"example.com",
"*.example.net",
"test.example.org"
]
[log]
# Enable debug mode (logs requests and prints other information)
debug = false
[database]
# PostgreSQL connection URL, see
# https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters for
# more information
connectionURL = "postgres://postgres:password@localhost/whats_this?sslmode=disable"
# MaxMind GeoLite2 Country database location on disk. If omitted, no country
# code data will be collected. https://dev.maxmind.com/geoip/geoip2/geolite2
maxmindDBLocation = "/var/data/maxmind/GeoLite2-Country.mmdb"
[seaweed]
# SeaweedFS master URL (used for looking up volumes, volumes must be
......
{
"annotations": {
"list": []
},
"editable": true,
"gnetId": null,
"graphTooltip": 2,
"hideControls": false,
"id": 1,
"links": [],
"refresh": "5s",
"rows": [
{
"collapse": false,
"height": 125,
"panels": [
{
"cacheTimeout": null,
"colorBackground": false,
"colorValue": false,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
"rgba(50, 172, 45, 0.97)"
],
"datasource": null,
"format": "none",
"gauge": {
"maxValue": 100,
"minValue": 0,
"show": false,
"thresholdLabels": false,
"thresholdMarkers": true
},
"hideTimeOverride": false,
"id": 3,
"interval": null,
"links": [],
"mappingType": 1,
"mappingTypes": [
{
"name": "value to text",
"value": 1
},
{
"name": "range to text",
"value": 2
}
],
"maxDataPoints": 100,
"minSpan": null,
"nullPointMode": "connected",
"nullText": null,
"postfix": "",
"postfixFontSize": "50%",
"prefix": "",
"prefixFontSize": "50%",
"rangeMaps": [
{
"from": "null",
"text": "N/A",
"to": "null"
}
],
"repeat": null,
"span": 6,
"sparkline": {
"fillColor": "rgba(31, 118, 189, 0.18)",
"full": false,
"lineColor": "rgb(31, 120, 193)",
"show": false
},
"tableColumn": "",
"targets": [
{
"expr": "sum(delta(cdn_origin_http_requests_total[24h]))",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "",
"metric": "cdn_origin_http_requests_total",
"refId": "A",
"step": 4
}
],
"thresholds": "",
"timeFrom": null,
"title": "Requests in the Last 24 Hours",
"transparent": false,
"type": "singlestat",
"valueFontSize": "80%",
"valueMaps": [
{
"op": "=",
"text": "N/A",
"value": "null"
}
],
"valueName": "avg"
},
{
"cacheTimeout": null,
"colorBackground": false,
"colorValue": false,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
"rgba(50, 172, 45, 0.97)"
],
"datasource": null,
"format": "none",
"gauge": {
"maxValue": 100,
"minValue": 0,
"show": false,
"thresholdLabels": false,
"thresholdMarkers": true
},
"id": 4,
"interval": null,
"links": [],
"mappingType": 1,
"mappingTypes": [
{
"name": "value to text",
"value": 1
},
{
"name": "range to text",
"value": 2
}
],
"maxDataPoints": 100,
"nullPointMode": "connected",
"nullText": null,
"postfix": "",
"postfixFontSize": "50%",
"prefix": "",
"prefixFontSize": "50%",
"rangeMaps": [
{
"from": "null",
"text": "N/A",
"to": "null"
}
],
"span": 6,
"sparkline": {
"fillColor": "rgba(31, 118, 189, 0.18)",
"full": false,
"lineColor": "rgb(31, 120, 193)",
"show": false
},
"tableColumn": "",
"targets": [
{
"expr": "sum(cdn_origin_http_requests_total)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "",
"metric": "cdn_origin_http_requests_total",
"refId": "A",
"step": 4
}
],
"thresholds": "",
"title": "Total Requests",
"type": "singlestat",
"valueFontSize": "80%",
"valueMaps": [
{
"op": "=",
"text": "N/A",
"value": "null"
}
],
"valueName": "avg"
}
],
"repeat": null,
"repeatIteration": null,
"repeatRowId": null,
"showTitle": false,
"title": "Dashboard Row",
"titleSize": "h6"
},
{
"collapse": false,
"height": 250,
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fill": 1,
"id": 5,
"legend": {
"alignAsTable": false,
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"span": 12,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(idelta(cdn_origin_http_requests_total[5m]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "",
"refId": "A",
"step": 2
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Total Requests",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": "Requests/5s",
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
]
}
],
"repeat": null,
"repeatIteration": null,
"repeatRowId": null,
"showTitle": false,
"title": "Dashboard Row",
"titleSize": "h6"
},
{
"collapse": false,
"height": 284,
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "PrometheusLocalhost",
"fill": 1,
"id": 2,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"hideEmpty": false,
"max": true,
"min": true,
"rightSide": false,
"show": true,
"total": true,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"span": 12,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "idelta(cdn_origin_http_requests_total[5m])",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{host}}",
"metric": "cdn_origin_http_requests_total",
"refId": "A",
"step": 2
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "Requests Per Host",
"tooltip": {
"shared": true,
"sort": 1,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": "Requests/5s",
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": false
}
]
}
],
"repeat": null,
"repeatIteration": null,
"repeatRowId": null,
"showTitle": false,
"title": "Dashboard Row",
"titleSize": "h6"
}
],
"schemaVersion": 14,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-5m",
"to": "now"
},
"timepicker": {
"refresh_intervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
],
"time_options": [
"5m",
"15m",
"1h",
"6h",
"12h",
"24h",
"2d",
"7d",
"30d"
]
},
"timezone": "browser",
"title": "cdn-origin Requests",
"version": 1
}
\ No newline at end of file
package metrics
import (
"context"
"errors"
"fmt"
"net"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/oschwald/maxminddb-golang"
"gopkg.in/olivere/elastic.v5"
"gopkg.in/olivere/elastic.v5/config"
)
// mapping is the default mapping to use when creating the index if it doesn't exist. This JSON data is also maintained
// in `./mapping.elasticsearch.json`. Each "request" document stores one record per HTTP request: a country code (two
// characters), hostname, object type, status code and the `@timestamp` date set by the ingest pipeline.
const mapping = `
{
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "request": {
      "properties": {
        "country_code": {
          "type": "keyword",
          "ignore_above": 2,
          "index": true
        },
        "hostname": {
          "type": "keyword",
          "ignore_above": 30,
          "index": true
        },
        "object_type": {
          "type": "keyword",
          "ignore_above": 30,
          "index": true
        },
        "status_code": {
          "type": "short",
          "index": true
        },
        "@timestamp": {
          "type": "date",
          "index": true
        }
      }
    }
  }
}`
// timestampPipeline is the default `@timestamp` ingest pipeline. Sets the `@timestamp` field to the ingest timestamp
// (date type). This JSON data is also maintained in `./timestampPipeline.elasticsearch.json`; keep both copies in
// sync when editing.
const timestampPipeline = `
{
  "description": "Stores the ingest timestamp as a date field in the document.",
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "date": {
        "field": "@timestamp",
        "target_field": "@timestamp",
        "formats": ["EEE MMM d HH:mm:ss z yyyy"]
      }
    }
  ]
}`
// Collector collects request metadata and sends it to Elasticsearch.
type Collector struct {
	ctx     context.Context // context used for all Elasticsearch calls
	elastic *elastic.Client // connected Elasticsearch client
	index   string          // index that request records are written to
	// geoIPDatabase is the optional MaxMind GeoLite2 Country reader; nil when
	// GeoIP lookups are disabled (GetCountryCode then returns "").
	geoIPDatabase *maxminddb.Reader
	// Hostname anonymization whitelist (see MatchHostname); the tree is nil
	// unless enableHostnameWhitelist is true.
	enableHostnameWhitelist bool
	hostnameWhitelist       *treeNode
}
// New creates a new Elasticsearch connection and returns a Collector using that connection.
//
// elasticURL is an olivere/elastic configuration URL; an empty index name
// defaults to "cdn-origin_requests". maxmindLoc is the path to a MaxMind
// GeoLite2 Country database, or "" to disable GeoIP lookups. When
// enableHostnameWhitelist is true, hostnameWhitelist patterns are compiled
// into a matching tree for MatchHostname.
//
// On first run it creates the request index (with the default mapping) and the
// "timestamp" ingest pipeline if they don't already exist.
func New(elasticURL string, maxmindLoc string, enableHostnameWhitelist bool, hostnameWhitelist []string) (*Collector, error) {
	// Parse elasticURL
	cfg, err := config.Parse(elasticURL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse elasticURL: %s", err)
	}
	if cfg.Index == "" {
		log.Info(`empty index name in elasticURL, using "cdn-origin_requests"`)
		cfg.Index = "cdn-origin_requests"
	}

	// Create client and ping Elasticsearch server
	client, err := elastic.NewClientFromConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create elastic client: %s", err)
	}
	ctx := context.Background()
	info, code, err := client.Ping(cfg.URL).Do(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to ping Elasticsearch server: %s", err)
	}
	// TODO: if code != 200
	log.WithFields(log.Fields{
		"statusCode": code,
		"version":    info.Version.Number,
	}).Debug("elasticsearch ping success")

	// Check if the index exists or create it
	exists, err := client.IndexExists(cfg.Index).Do(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to determine if the index exists: %s", err)
	}
	if !exists {
		log.WithField("index", cfg.Index).Info("creating Elasticsearch index")
		createIndex, err := client.CreateIndex(cfg.Index).
			BodyString(mapping).
			Do(ctx)
		if err != nil {
			// BUG FIX: message previously read "Elasticsearchr".
			return nil, fmt.Errorf("failed to create missing index on Elasticsearch: %s", err)
		}
		if !createIndex.Acknowledged {
			// BUG FIX: message previously read "no acknowledged"; now matches
			// the pipeline error below.
			return nil, errors.New("failed to create missing index on Elasticsearch: not acknowledged")
		}
	}

	// Check if the timestamp pipeline exists or create it. A 404 from the get
	// service just means the pipeline is missing, not a real failure.
	pipelines, err := elastic.NewIngestGetPipelineService(client).
		Id("timestamp").
		Do(ctx)
	if err != nil && !strings.Contains(err.Error(), "404") {
		return nil, fmt.Errorf("failed to determine if the timestamp pipeline exists: %s", err)
	}
	if len(pipelines) == 0 {
		log.WithField("pipeline", "timestamp").Info("creating Elasticsearch ingest pipeline")
		putPipeline, err := elastic.NewIngestPutPipelineService(client).
			Id("timestamp").
			BodyString(timestampPipeline).
			Do(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to put missing pipeline on Elasticsearch: %s", err)
		}
		if !putPipeline.Acknowledged {
			return nil, errors.New("failed to put missing pipeline on Elasticsearch: not acknowledged")
		}
	}

	// Create Maxmind GeoLite2 Country database reader (optional)
	var geoIPDatabase *maxminddb.Reader
	if maxmindLoc != "" {
		geoIPDatabase, err = maxminddb.Open(maxmindLoc)
		if err != nil {
			return nil, fmt.Errorf("failed to open MaxMind GeoLite2 Country database: %s", err)
		}
	}

	// Construct hostname whitelist *treeNode
	var hostnameWhitelistTree *treeNode
	if enableHostnameWhitelist {
		hostnameWhitelistTree = parseWhitelistSlice(hostnameWhitelist)
	}

	// Create Collector
	return &Collector{
		ctx:                     ctx,
		elastic:                 client,
		index:                   cfg.Index,
		geoIPDatabase:           geoIPDatabase,
		enableHostnameWhitelist: enableHostnameWhitelist,
		hostnameWhitelist:       hostnameWhitelistTree,
	}, nil
}
// Put indexes a single request record in the Elasticsearch server, routing it
// through the "timestamp" ingest pipeline so `@timestamp` is populated.
func (c *Collector) Put(record *Record) error {
	req := c.elastic.Index().
		Index(c.index).
		Type("request").
		Pipeline("timestamp").
		BodyJson(record)
	if _, err := req.Do(c.ctx); err != nil {
		return fmt.Errorf("failed to index record: %s", err)
	}
	return nil
}
// MatchHostname returns an anonymized hostname and whether or not the hostname is in the whitelist.
// With whitelisting disabled, every hostname is returned unchanged and allowed.
// A leading "www." label is stripped before matching; when the matched pattern
// is a wildcard ("*.…"), the first label is replaced with "*" in the result.
func (c *Collector) MatchHostname(hostname string) (string, bool) {
	if !c.enableHostnameWhitelist {
		return hostname, true
	}
	labels := strings.Split(hostname, ".")
	if labels[0] == "www" {
		labels = labels[1:]
	}
	match := c.hostnameWhitelist.getMatch(labels)
	if match == "" {
		return "", false
	}
	if strings.HasPrefix(match, "*.") {
		labels[0] = "*"
	}
	return strings.Join(labels, "."), true
}
// GetCountryCode returns the country code for an IP address from the MaxMind GeoLite2 Country database.
// Returns "" with no error when no database is loaded.
func (c *Collector) GetCountryCode(ip net.IP) (string, error) {
	if c.geoIPDatabase == nil {
		return "", nil
	}
	record := getGeoIPCountryRecord()
	defer returnGeoIPCountryRecord(record)
	// BUG FIX: pass the pooled record pointer directly. The previous code
	// passed &record (a **geoIPCountryRecord), adding a needless level of
	// indirection for the decoder to unwind.
	if err := c.geoIPDatabase.Lookup(ip, record); err != nil {
		return "", err
	}
	// The return value is copied before the deferred pool return runs, so the
	// reset in returnGeoIPCountryRecord cannot clobber it.
	return record.Country.IsoCode, nil
}
package metrics
import "sync"
// geoIPPool recycles geoIPCountryRecord structs between GeoIP lookups to
// reduce garbage collector load. Records taken from the pool are always zeroed
// (see returnGeoIPCountryRecord).
var geoIPPool = &sync.Pool{
	New: func() interface{} {
		return &geoIPCountryRecord{}
	},
}
// getGeoIPCountryRecord takes a zeroed *geoIPCountryRecord from the pool.
func getGeoIPCountryRecord() *geoIPCountryRecord {
	record := geoIPPool.Get()
	return record.(*geoIPCountryRecord)
}
// returnGeoIPCountryRecord zeroes a record and hands it back to the pool. The
// caller must not touch the record after returning it.
func returnGeoIPCountryRecord(record *geoIPCountryRecord) {
	// Resetting a single string field is far cheaper than spawning a goroutine
	// to do it (the previous implementation); do the reset and Put inline.
	record.Country.IsoCode = ""
	geoIPPool.Put(record)
}
// geoIPCountryRecord is the subset of a MaxMind GeoLite2 Country record that
// lookups decode: just the country's ISO code.
type geoIPCountryRecord struct {
	Country struct {
		IsoCode string `maxminddb:"iso_code"`
	} `maxminddb:"country"`
}
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"request": {
"properties": {
"country_code": {
"type": "keyword",
"ignore_above": 2,
"index": true
},
"hostname": {
"type": "keyword",
"ignore_above": 30,
"index": true
},
"object_type": {
"type": "keyword",
"ignore_above": 30,
"index": true
},
"status_code": {
"type": "short",
"index": true
},
"@timestamp": {
"type": "date",
"index": true
}
}
}
}
}
package metrics
import "strings"
// treeNode is used for domain whitelisting. Hostnames are split on "." and
// matched label-by-label against the tree; a "*" label acts as a wildcard for
// exactly one label.
type treeNode struct {
	Leaf      bool        // true when this node terminates a whitelist pattern
	Value     string      // one hostname label (or "*")
	FullValue string      // the original whitelist pattern (set on leaf nodes only)
	SubNodes  []*treeNode // ordered children; order preserves whitelist priority
}
// findOrCreateSubNode returns the existing non-leaf child whose Value equals v,
// or appends a fresh child and returns it. Leaf children are never reused so a
// pattern and its prefix (e.g. "a.com" and "a.com.au") stay distinct.
func (t *treeNode) findOrCreateSubNode(v string) *treeNode {
	if t.SubNodes == nil {
		t.SubNodes = []*treeNode{}
	}
	for i := range t.SubNodes {
		if child := t.SubNodes[i]; !child.Leaf && child.Value == v {
			return child
		}
	}
	created := &treeNode{Value: v}
	t.SubNodes = append(t.SubNodes, created)
	return created
}
// getMatch walks the tree along the hostname labels in s and returns the
// FullValue of the first whitelist pattern that matches, or "" when none does.
// Children are tried in insertion order, so earlier patterns win.
func (t *treeNode) getMatch(s []string) string {
	if t.Leaf || len(t.SubNodes) == 0 || len(s) == 0 {
		return ""
	}
	label, rest := s[0], s[1:]
	for _, child := range t.SubNodes {
		if child.Value != "*" && child.Value != label {
			continue
		}
		if child.Leaf {
			return child.FullValue
		}
		if m := child.getMatch(rest); m != "" {
			return m
		}
	}
	return ""
}
// parseWhitelistSlice builds a treeNode matching tree from a list of hostname
// patterns (e.g. "example.com", "*.example.com").
//
// NOTE: using slices because the provided order is important (maps would be a
// check if part is in map, then check for *, which works but doesn't maintain
// order).
func parseWhitelistSlice(hostnameWhitelist []string) *treeNode {
	tree := &treeNode{SubNodes: []*treeNode{}}
	for _, pattern := range hostnameWhitelist {
		node := tree
		// Descend/create one node per label; findOrCreateSubNode lazily
		// initializes SubNodes, so no explicit re-init is needed here (the
		// previous version redundantly reset SubNodes on every new node).
		for _, label := range strings.Split(pattern, ".") {
			node = node.findOrCreateSubNode(label)
		}
		// The final label's node is the leaf carrying the full pattern.
		node.Leaf = true
		node.FullValue = pattern
	}
	return tree
}
package metrics
import "sync"
// recordPool recycles Record structs between requests to reduce garbage
// collector load. Records taken from the pool are always zeroed (see
// ReturnRecord).
var recordPool = &sync.Pool{
	New: func() interface{} {
		return &Record{}
	},
}
// GetRecord returns a `*Record` with all properties set to their zero values.
func GetRecord() *Record {
	record := recordPool.Get()
	return record.(*Record)
}
// ReturnRecord returns a record to the `Record` pool. Once a record has been
// returned, its properties must not be altered.
func ReturnRecord(record *Record) {
	// Zeroing four fields is cheaper than the goroutine the previous
	// implementation spawned to do it; reset synchronously so the record is
	// immediately reusable and no per-call goroutine churn occurs.
	record.CountryCode = ""
	record.Hostname = ""
	record.ObjectType = ""
	record.StatusCode = 0
	recordPool.Put(record)
}
// Record represents request metadata to be stored in Elasticsearch. When using `Record`s, it is recommended to use the
// pool methods `GetRecord()` and `ReturnRecord(*Record)` to reduce garbage collector load and improve performance.
type Record struct {
	// CountryCode is the country code from the GeoIP lookup; omitted when empty.
	CountryCode string `json:"country_code,omitempty"`
	// Hostname is the (possibly anonymized) request hostname; omitted when empty.
	Hostname string `json:"hostname,omitempty"`
	// ObjectType is the kind of object served; omitted when empty.
	ObjectType string `json:"object_type,omitempty"`
	// StatusCode is the HTTP response status code.
	StatusCode int `json:"status_code"`
}
{
"description": "Stores the ingest timestamp as a date field in the document.",
"processors": [
{
"set": {
"field": "@timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"date": {
"field": "@timestamp",
"target_field": "@timestamp",
"formats": ["EEE MMM d HH:mm:ss z yyyy"]
}
}
]
}
package prometheus
import "github.com/prometheus/client_golang/prometheus"
// namespace for use with metric names; prefixes every metric this package
// registers (e.g. cdn_origin_http_requests_total).
const namespace = "cdn_origin"
// Additional metrics to expose, prometheus.DefaultGatherer includes runtime metrics by default.
var (
	// HTTPRequestsTotal records the total number of HTTP requests, partitioned by hostname
	// (exposed as the "host" label).
	HTTPRequestsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "http_requests_total",
			Help:      "Total number of HTTP requests, partitioned by hostname.",
		},
		[]string{"host"},
	)
)
// init registers this package's metrics with the default Prometheus registry.
// MustRegister panics on duplicate registration, which is a programmer error.
func init() {
	// Register metrics
	prometheus.MustRegister(HTTPRequestsTotal)
}
package prometheus
import (
"io"
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
)
// WriteMetrics writes data to the supplied io.Writer in the format specified by the `Accept` header (where possible).
// The `Content-Type` of the response is returned.
//
// Returns an error when gathering or encoding fails; in that case partial
// output may already have been written to writer.
func WriteMetrics(writer io.Writer, acceptHeader string) (string, error) {
	metricFamilies, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		// BUG FIX: previously returned `"", nil`, silently swallowing the
		// gather error and reporting success with an empty content type.
		return "", err
	}
	contentType := expfmt.Negotiate(http.Header{
		"Accept": []string{acceptHeader},
	})
	enc := expfmt.NewEncoder(writer, contentType)
	for _, metricFamily := range metricFamilies {
		if err := enc.Encode(metricFamily); err != nil {
			return "", err
		}
	}
	// Close the writer if it supports closing so wrapped/compressed writers
	// flush their buffers; best-effort, the metrics were already written.
	if closer, ok := writer.(io.Closer); ok {
		closer.Close()
	}
	return string(contentType), nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment