You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
11 KiB

// Package lib/vfs позволяет хранить файлы на разных источниках без необходимости учитывать особенности
// каждой реализации файлового хранилища
// поддерживаются local, s3, azure (остальные активировать по-необходимости)
package lib
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"github.com/graymeta/stow"
"github.com/graymeta/stow/azure"
"github.com/graymeta/stow/local"
"git.edtech.vm.prod-6.cloud.el/fabric/lib/pkg/s3"
// support Azure storage
_ "github.com/graymeta/stow/azure"
// support Google storage
_ "github.com/graymeta/stow/google"
// support local storage
_ "github.com/graymeta/stow/local"
// support swift storage
_ "github.com/graymeta/stow/swift"
// support s3 storage
_ "github.com/graymeta/stow/s3"
// support oracle storage
_ "github.com/graymeta/stow/oracle"
)
type vfs struct {
bucket string
kind, endpoint, accessKeyID, secretKey, region string
location stow.Location
container stow.Container
comma string
cacert string
}
type Vfs interface {
List(ctx context.Context, prefix string, pageSize int) (files []Item, err error)
Read(ctx context.Context, file string) (data []byte, mimeType string, err error)
ReadFromBucket(ctx context.Context, file, bucket string) (data []byte, mimeType string, err error)
ReadCloser(ctx context.Context, file string) (reader io.ReadCloser, err error)
ReadCloserFromBucket(ctx context.Context, file, bucket string) (reader io.ReadCloser, err error)
Write(ctx context.Context, file string, data []byte) (err error)
Delete(ctx context.Context, file string) (err error)
Connect() (err error)
Close() (err error)
Proxy(trimPrefix, newPrefix string) (http.Handler, error)
}
type Item interface {
stow.Item
}
// Connect инициируем подключение к хранилищу, в зависимости от типа соединения
func (v *vfs) Connect() (err error) {
var config = stow.ConfigMap{}
var flagBucketExist bool
if v.region == "" {
v.region = "ru-central-1"
}
switch v.kind {
case "s3":
config = stow.ConfigMap{
s3.ConfigEndpoint: v.endpoint,
s3.ConfigAccessKeyID: v.accessKeyID,
s3.ConfigSecretKey: v.secretKey,
s3.ConfigRegion: v.region,
s3.ConfigCaCert: v.cacert,
}
case "azure":
config = stow.ConfigMap{
azure.ConfigAccount: v.accessKeyID,
azure.ConfigKey: v.secretKey,
}
case "local":
config = stow.ConfigMap{
local.ConfigKeyPath: v.endpoint,
local.MetadataDir: v.bucket,
}
}
// подсключаемся к хранилищу
v.location, err = stow.Dial(v.kind, config)
if err != nil {
return fmt.Errorf("error create container from config. err: %s", err)
}
// ищем переданных бакет, если нет, то создаем его
err = stow.WalkContainers(v.location, stow.NoPrefix, 10000, func(c stow.Container, err error) error {
if err != nil {
return err
}
if c.Name() == v.bucket {
flagBucketExist = true
return nil
}
return nil
})
if err != nil {
return fmt.Errorf("error list to containers from config. err: %s", err)
}
// создаем если нет
if !flagBucketExist {
v.container, err = v.location.CreateContainer(v.bucket)
if err != nil {
return fmt.Errorf("error create container from config. err: %s", err)
}
}
// инициируем переданный контейнер
v.container, err = v.location.Container(v.bucket)
if err != nil {
return fmt.Errorf("error create container from config. err: %s", err)
}
return err
}
// Close закрываем соединение
func (v *vfs) Close() (err error) {
err = v.location.Close()
return err
}
// Read чтение по указанному пути из бакета проекта
func (v *vfs) Read(ctx context.Context, file string) (data []byte, mimeType string, err error) {
type result struct {
Data []byte
MimeType string
Err error
}
chResult := make(chan result)
exec := func(ctx context.Context, file string) (r result) {
r.Data, r.MimeType, r.Err = v.ReadFromBucket(ctx, file, v.bucket)
return r
}
go func() {
select {
case chResult <- exec(ctx, file):
return
case <-ctx.Done():
return
}
}()
select {
case d := <-chResult:
return d.Data, d.MimeType, d.Err
case <-ctx.Done():
return data, mimeType, fmt.Errorf("exec Read dead for context")
}
}
// ReadFromBucket чтение по указанному пути из указанного бакета
func (v *vfs) ReadFromBucket(ctx context.Context, file, bucket string) (data []byte, mimeType string, err error) {
type result struct {
Reader io.ReadCloser
Err error
}
err = v.Connect()
if err != nil {
return []byte{}, "", fmt.Errorf("error connect to filestorage. err: %s cfg: VfsKind: %s, VfsEndpoint: %s, VfsBucket: %s", err, v.kind, v.endpoint, v.bucket)
}
defer v.Close()
chResult := make(chan result)
exec := func(ctx context.Context, file string) (r result) {
r.Reader, r.Err = v.ReadCloserFromBucket(ctx, file, bucket)
if r.Err != nil {
r.Err = fmt.Errorf("error ReadCloserFromBucket. err: %s", r.Err)
return r
}
// проверка на закрытие по таймауту в момент завершения функции
// прибиваем внутри ридер, если до завершения словили контекс и тело уже не нужно
select {
case <-ctx.Done():
if r.Reader != nil {
r.Err = r.Reader.Close()
}
r.Reader = nil
r.Err = fmt.Errorf("exit (ReadCloserFromBucket) with context deadline. err closed: %s", r.Err)
return r
default:
}
return r
}
go func() {
select {
case chResult <- exec(ctx, file):
return
case <-ctx.Done():
return
}
}()
select {
case d := <-chResult:
if d.Err != nil {
return nil, "", err
}
defer func() {
if d.Reader != nil {
err = d.Reader.Close()
if err != nil {
d.Err = err
}
}
}()
data, err = io.ReadAll(d.Reader)
if err != nil {
err = fmt.Errorf("error ReadAll. err: %s. file: %s, bucket: %s, v.container: %+v", err, file, bucket, v.container)
return nil, "", err
}
mimeType = detectMIME(data, file) // - определяем MimeType отдаваемого файла
return data, mimeType, err
case <-ctx.Done():
return data, mimeType, fmt.Errorf("exec ReadFromBucket dead for context")
}
}
// Write создаем объект в хранилище
func (v *vfs) Write(ctx context.Context, file string, data []byte) (err error) {
type result struct {
I Item
Err error
}
err = v.Connect()
if err != nil {
return fmt.Errorf("error connect to filestorage. err: %s cfg: VfsKind: %s, VfsEndpoint: %s, VfsBucket: %s", err, v.kind, v.endpoint, v.bucket)
}
defer v.Close()
sdata := string(data)
r := strings.NewReader(sdata)
size := int64(len(sdata))
// если передан разделитель, то заменяем / на него (возможно понадобится для совместимости плоских хранилищ)
if v.comma != "" {
file = strings.Replace(file, sep, v.comma, -1)
}
if strings.Contains(file, "../") {
return fmt.Errorf("path file not valid")
}
chResult := make(chan result)
exec := func(ctx context.Context, name string, rr io.Reader, size int64, metadata map[string]interface{}) (r result) {
_, err = v.container.Put(file, rr, size, nil)
r.Err = err
return r
}
go func() {
chResult <- exec(ctx, file, r, size, nil)
}()
select {
case d := <-chResult:
return d.Err
case <-ctx.Done():
return fmt.Errorf("exec Write dead for context")
}
}
// Delete удаляем объект в хранилище
func (v *vfs) Delete(ctx context.Context, file string) (err error) {
err = v.Connect()
if err != nil {
return fmt.Errorf("error connect to filestorage. err: %s cfg: VfsKind: %s, VfsEndpoint: %s, VfsBucket: %s", err, v.kind, v.endpoint, v.bucket)
}
defer v.Close()
item, err := v.getItem(file, v.bucket)
if err != nil {
return fmt.Errorf("error get Item for path: %s, err: %s", file, err)
}
err = v.container.RemoveItem(item.ID())
if err != nil {
return err
}
return err
}
// List список файлов выбранного
func (v *vfs) List(ctx context.Context, prefix string, pageSize int) (files []Item, err error) {
err = v.Connect()
if err != nil {
return files, fmt.Errorf("error connect to filestorage. err: %s cfg: VfsKind: %s, VfsEndpoint: %s, VfsBucket: %s", err, v.kind, v.endpoint, v.bucket)
}
defer v.Close()
err = stow.Walk(v.container, prefix, pageSize, func(item stow.Item, err error) error {
if err != nil {
fmt.Printf("error Walk from list vfs. connect:%+v, prefix: %s, err: %s\n", v, prefix, err)
return err
}
files = append(files, item)
return nil
})
return files, err
}
func (v *vfs) ReadCloser(ctx context.Context, file string) (reader io.ReadCloser, err error) {
return v.ReadCloserFromBucket(ctx, file, v.bucket)
}
func (v *vfs) ReadCloserFromBucket(ctx context.Context, file, bucket string) (reader io.ReadCloser, err error) {
item, err := v.getItem(file, bucket)
if err != nil {
return nil, err
}
reader, err = item.Open()
if err != nil {
return nil, err
}
return reader, err
}
func (v *vfs) getItem(file, bucket string) (item Item, err error) {
var urlPath url.URL
// если передан разделитель, то заменяем / на него (возможно понадобится для совместимости плоских хранилищ)
if v.comma != "" {
file = strings.Replace(file, v.comma, sep, -1)
}
// если локально, то добавляем к endpoint бакет
if v.kind == "local" {
file = v.endpoint + sep + bucket + sep + file
// подчищаем //
file = strings.Replace(file, sep+sep, sep, -1)
} else {
// подчищаем от части путей, которая использовалась раньше в локальном хранилище
// легаси, удалить когда все сайты переедут на использование только vfs
//localPrefix := sep + "upload" + sep + v.bucket
localPrefix := "upload" + sep + bucket
file = strings.Replace(file, localPrefix, "", -1)
file = strings.Replace(file, sep+sep, sep, -1)
}
//fmt.Printf("file: %s, bucket: %s, container: %-v\n", file, bucket, v.container)
urlPath.Host = bucket
urlPath.Path = file
if v.location == nil {
return nil, fmt.Errorf("error. location is empty. bucket: %s, file: %s, endpoint: %s", urlPath.Host, urlPath.Path, v.endpoint)
}
item, err = v.location.ItemByURL(&urlPath)
if err != nil {
return nil, fmt.Errorf("error. location.ItemByURL is failled. urlPath: %v, err: %s", urlPath, err)
}
if item == nil {
return nil, fmt.Errorf("error. Item is null. urlPath: %v", urlPath)
}
return item, err
}
func NewVfs(kind, endpoint, accessKeyID, secretKey, region, bucket, comma, cacert string) Vfs {
return &vfs{
kind: kind,
endpoint: endpoint,
accessKeyID: accessKeyID,
secretKey: secretKey,
region: region,
bucket: bucket,
comma: comma,
cacert: cacert,
}
}