Skip to content
Snippets Groups Projects
Commit 655fac38 authored by Dean's avatar Dean
Browse files

add Range and Accept-Encoding support

parent eb2915b9
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@ import (
"fmt"
"html/template"
"net"
"strconv"
"strings"
"time"
......@@ -237,6 +238,7 @@ func recordMetrics(ctx *fasthttp.RequestCtx, objectType string) {
}
hostBytes := ctx.Request.Header.Peek(host)
statusCode := ctx.Response.StatusCode()
if len(hostBytes) != 0 {
go func() {
// Check hostname
......@@ -256,7 +258,7 @@ func recordMetrics(ctx *fasthttp.RequestCtx, objectType string) {
record.CountryCode = countryCode
record.Hostname = hostStr
record.ObjectType = objectType
record.StatusCode = ctx.Response.StatusCode()
record.StatusCode = statusCode
err = collector.Put(record)
if err != nil {
log.WithError(err).Warn("failed to collect record")
......@@ -320,8 +322,17 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
return
}
// Construct headers and get object
headers := map[string][]byte{}
if acceptEncoding := ctx.Request.Header.Peek("Accept-Encoding"); len(acceptEncoding) > 0 {
headers["Accept-Encoding"] = acceptEncoding
}
if reqRange := ctx.Request.Header.Peek("Range"); len(reqRange) > 0 {
headers["Range"] = reqRange
}
fmt.Println(headers)
// TODO: ?thumbnail query parameter for images
statusCode, contentSize, err := seaweed.Get(ctx, backendFileID.String, defaultSeaweedFSQueryParameters)
statusCode, resHeaders, err := seaweed.Get(ctx, backendFileID.String, headers, defaultSeaweedFSQueryParameters)
if err != nil {
log.WithError(err).Warn("failed to retrieve file from SeaweedFS volume server")
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
......@@ -330,9 +341,9 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
recordMetrics(ctx, metricsObjectType)
return
}
if statusCode != fasthttp.StatusOK {
if statusCode != fasthttp.StatusOK && statusCode != fasthttp.StatusPartialContent {
log.WithFields(log.Fields{
"expected": fasthttp.StatusOK,
"expected": fmt.Sprintf("%v,%v", fasthttp.StatusOK, fasthttp.StatusPartialContent),
"got": statusCode,
}).Warn("unexpected status code while retrieving file from SeaweedFS volume server")
ctx.SetStatusCode(fasthttp.StatusInternalServerError)
......@@ -342,12 +353,25 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
return
}
// Set response headers
ctx.SetStatusCode(statusCode)
ctx.Response.Header.Set("Accept-Ranges", "bytes")
if contentType.Valid {
ctx.SetContentType(contentType.String)
} else {
ctx.SetContentType("application/octet-stream")
}
ctx.Response.Header.SetContentLength(contentSize)
if contentLength, ok := resHeaders["Content-Length"]; ok {
if val, err := strconv.Atoi(string(contentLength)); err != nil && val >= 0 {
ctx.Response.Header.SetContentLength(val)
}
}
if contentEncoding, ok := resHeaders["Content-Encoding"]; ok {
ctx.Response.Header.SetBytesV("Content-Encoding", contentEncoding)
}
if contentRange, ok := resHeaders["Content-Range"]; ok {
ctx.Response.Header.SetBytesV("Content-Range", contentRange)
}
recordMetrics(ctx, metricsObjectType)
case 1: // redirect
......
......@@ -15,15 +15,15 @@ import (
// lookupResponse represents a volume lookup response from the SeaweedFS master.
type lookupResponse struct {
VolumeId string `json:"volumeId"`
VolumeID string `json:"volumeId"`
Locations []struct {
Url string `json:"url"`
URL string `json:"url"`
PublicURL string `json:"publicUrl"`
} `json:"locations"`
}
// publicUrlsToSlice converts the list of locations in the lookupResponse to a slice of publicUrl strings.
func (res *lookupResponse) publicUrlsToSlice() []string {
// publicUrlsToSlice converts the list of locations in the lookupResponse to a slice of publicURL strings.
func (res *lookupResponse) publicURLsToSlice() []string {
var urls = []string{}
for _, loc := range res.Locations {
urls = append(urls, loc.PublicURL)
......@@ -55,10 +55,11 @@ func New(masterURI string, lookupTimeout time.Duration) *Seaweed {
}
}
func (s *Seaweed) Get(writer io.Writer, fid string, query string) (int, int, error) {
// Get a file from a SeaweedFS cluster.
func (s *Seaweed) Get(writer io.Writer, fid string, headers map[string][]byte, query string) (int, map[string][]byte, error) {
volumeURL := s.lookupVolume(strings.Split(fid, ",")[0])
if volumeURL == "" {
return fasthttp.StatusInternalServerError, 0, errors.New("failed to retrieve volume URL")
return fasthttp.StatusInternalServerError, nil, errors.New("failed to retrieve volume URL")
}
requestURL := volumeURL
if !strings.HasPrefix(requestURL, "http://") && !strings.HasPrefix(requestURL, "https://") {
......@@ -76,6 +77,11 @@ func (s *Seaweed) Get(writer io.Writer, fid string, query string) (int, int, err
req := fasthttp.AcquireRequest()
req.Reset()
req.SetRequestURI(requestURL)
if headers != nil {
for h, v := range headers {
req.Header.SetBytesV(h, v)
}
}
res := fasthttp.AcquireResponse()
defer func() {
fasthttp.ReleaseRequest(req)
......@@ -85,17 +91,24 @@ func (s *Seaweed) Get(writer io.Writer, fid string, query string) (int, int, err
// Perform request
err := fasthttp.Do(req, res)
if err != nil {
return 0, 0, err
return 0, nil, err
}
if res.StatusCode() == fasthttp.StatusOK {
if res.StatusCode() == fasthttp.StatusOK || res.StatusCode() == fasthttp.StatusPartialContent {
if err := res.BodyWriteTo(writer); err != nil {
log.WithField("err", err).Warn("failed to set body writer for response")
return fasthttp.StatusInternalServerError, 0, err
return fasthttp.StatusInternalServerError, nil, err
}
}
return fasthttp.StatusOK, res.Header.ContentLength(), err
// Get response headers
resHeaders := map[string][]byte{}
res.Header.VisitAll(func(key, value []byte) {
resHeaders[string(key)] = value
})
return res.StatusCode(), resHeaders, err
}
// Ping the master of a SeaweedFS cluster (sends a /cluster/status HTTP request to the master).
func (s *Seaweed) Ping() error {
lookupURL := fmt.Sprintf("%s/cluster/status", s.Master)
statusCode, _, err := fasthttp.GetTimeout(nil, lookupURL, s.LookupTimeout)
......@@ -103,7 +116,7 @@ func (s *Seaweed) Ping() error {
return err
}
if statusCode != fasthttp.StatusOK {
return errors.New(fmt.Sprintf("expected 200 OK response status code, got %v", statusCode))
return fmt.Errorf("expected 200 OK response status code, got %v", statusCode)
}
return nil
}
......@@ -155,6 +168,6 @@ func (s *Seaweed) lookupVolume(volumeID string) string {
log.Warn("SeaweedFS master returned no volume servers without 404ing")
return ""
}
s.VolumeCache.Add(volumeUint32, res.publicUrlsToSlice())
s.VolumeCache.Add(volumeUint32, res.publicURLsToSlice())
return s.VolumeCache.GetNext(volumeUint32)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment