Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package weed
import (
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/valyala/fasthttp"
)
// lookupResponse is the JSON document decoded from a volume lookup request
// against the SeaweedFS master.
type lookupResponse struct {
	// VolumeId is decoded from the "volumeId" field.
	VolumeId string `json:"volumeId"`
	// Locations holds one entry per element of the "locations" array.
	Locations []struct {
		Url       string `json:"url"`
		PublicURL string `json:"publicUrl"`
	} `json:"locations"`
}

// publicUrlsToSlice returns the PublicURL of every entry in res.Locations, in
// the same order. The result is an empty, non-nil slice when there are no
// locations.
func (res *lookupResponse) publicUrlsToSlice() []string {
	publicURLs := make([]string, 0, len(res.Locations))
	for i := range res.Locations {
		publicURLs = append(publicURLs, res.Locations[i].PublicURL)
	}
	return publicURLs
}
// Seaweed allows for retrieving files from a SeaweedFS cluster.
type Seaweed struct {
// Master is the connection URI for the master. It is used as the prefix of
// the master request URLs (/cluster/status and /dir/lookup).
Master string
// VolumeCache is consulted by lookupVolume before the master is queried, and
// is filled with the public URLs the master returns for a volume.
VolumeCache *VolumeCache
// LookupTimeout is the timeout passed to fasthttp.GetTimeout for requests
// made to the master.
LookupTimeout time.Duration
}
// New builds a Seaweed client for the master at masterURI. The client starts
// with an empty volume cache and uses lookupTimeout for requests to the
// master.
func New(masterURI string, lookupTimeout time.Duration) *Seaweed {
	cache := &VolumeCache{
		volumeCache: make(map[uint32][]string),
		next:        make(map[uint32]int),
	}

	client := new(Seaweed)
	client.Master = masterURI
	client.VolumeCache = cache
	client.LookupTimeout = lookupTimeout
	return client
}
// Get resolves the volume server for fid through lookupVolume, builds the
// request URL from the volume URL, fid and the optional query string, and
// issues the request with fasthttp, writing a 200 OK response body to writer.
//
// The visible return statements yield an HTTP status code, a content length
// and an error: fasthttp.StatusInternalServerError with length 0 when the
// volume lookup yields no URL or the body cannot be written, and
// fasthttp.StatusOK with the response's Content-Length header on success.
//
// NOTE(review): handling of the fasthttp.Do error and of non-200 responses is
// not visible in this copy of the function — confirm against the full source.
func (s *Seaweed) Get(writer io.Writer, fid string, query string) (int, int, error) {
// Only the part of fid before the first comma is used as the volume ID.
volumeURL := s.lookupVolume(strings.Split(fid, ",")[0])
if volumeURL == "" {
return fasthttp.StatusInternalServerError, 0, errors.New("failed to retrieve volume URL")
}
requestURL := volumeURL
// Default to plain HTTP when the volume URL carries no scheme.
if !strings.HasPrefix(requestURL, "http://") && !strings.HasPrefix(requestURL, "https://") {
requestURL = "http://" + requestURL
}
// Ensure exactly one separator between the volume URL and the file ID.
if !strings.HasSuffix(requestURL, "/") {
requestURL += "/"
}
requestURL += fid
// The query string is appended verbatim; no escaping is applied here.
if len(query) != 0 {
requestURL += "?" + query
}
// Set request and response
req := fasthttp.AcquireRequest()
req.Reset()
req.SetRequestURI(requestURL)
res := fasthttp.AcquireResponse()
// Hand both objects back to fasthttp when the function returns.
defer func() {
fasthttp.ReleaseRequest(req)
fasthttp.ReleaseResponse(res)
}()
// Perform request
err := fasthttp.Do(req, res)
}
if res.StatusCode() == fasthttp.StatusOK {
if err := res.BodyWriteTo(writer); err != nil {
log.WithField("err", err).Warn("failed to set body writer for response")
return fasthttp.StatusInternalServerError, 0, err
return fasthttp.StatusOK, res.Header.ContentLength(), err
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
}
// Ping checks that the SeaweedFS master answers on its /cluster/status
// endpoint, using s.LookupTimeout as the request timeout.
//
// It returns the error reported by fasthttp when the request fails, an error
// naming the received status code when the master answers with anything other
// than 200 OK, and nil otherwise.
func (s *Seaweed) Ping() error {
	statusURL := fmt.Sprintf("%s/cluster/status", s.Master)

	// The response body is irrelevant here; only the status code is checked.
	statusCode, _, err := fasthttp.GetTimeout(nil, statusURL, s.LookupTimeout)
	if err != nil {
		return err
	}
	if statusCode != fasthttp.StatusOK {
		return fmt.Errorf("expected 200 OK response status code, got %v", statusCode)
	}
	return nil
}
// lookupVolume returns a URL for the volume identified by volumeID, or an
// empty string when none can be determined.
//
// volumeID must parse as a base-10 unsigned 32-bit integer. The volume cache
// is asked first; an empty answer from it is treated as a miss, in which case
// the master's /dir/lookup endpoint is queried, the returned public URLs are
// added to the cache, and the cache is asked again. Every failure is logged
// and reported to the caller as an empty string.
func (s *Seaweed) lookupVolume(volumeID string) string {
	parsed, parseErr := strconv.ParseUint(volumeID, 10, 32)
	if parseErr != nil {
		log.WithFields(log.Fields{
			"err":      parseErr,
			"volumeID": volumeID,
		}).Warn("could not parse volume ID")
		return ""
	}
	id := uint32(parsed)

	// Cache hit: no round trip to the master needed.
	if cached := s.VolumeCache.GetNext(id); cached != "" {
		return cached
	}

	lookupURL := fmt.Sprintf("%s/dir/lookup?volumeId=%s", s.Master, volumeID)
	log.WithFields(log.Fields{
		"lookupURL": lookupURL,
		"volumeID":  volumeID,
	}).Debug("looking up volume from SeaweedFS master")

	status, payload, reqErr := fasthttp.GetTimeout(nil, lookupURL, s.LookupTimeout)
	switch {
	case reqErr != nil:
		log.WithFields(log.Fields{
			"err": reqErr,
			"url": lookupURL,
		}).Error("failed to lookup SeaweedFS volume from master")
		return ""
	case status != fasthttp.StatusOK:
		log.WithFields(log.Fields{
			"expected": fasthttp.StatusOK,
			"got":      status,
		}).Warn("unexpected status code while looking up SeaweedFS volume from master")
		return ""
	}

	var decoded lookupResponse
	if decodeErr := json.Unmarshal(payload, &decoded); decodeErr != nil {
		log.WithFields(log.Fields{
			"body": string(payload),
			"err":  decodeErr,
		}).Error("failed to parse lookup volume response from SeaweedFS master")
		return ""
	}

	if len(decoded.Locations) == 0 {
		log.Warn("SeaweedFS master returned no volume servers without 404ing")
		return ""
	}

	// Store the fresh locations, then let the cache pick which one to hand out.
	s.VolumeCache.Add(id, decoded.publicUrlsToSlice())
	return s.VolumeCache.GetNext(id)
}