|
|
@ -25,6 +25,7 @@ import ( |
|
|
|
"cloud.google.com/go/internal/trace" |
|
|
|
gapic "cloud.google.com/go/storage/internal/apiv2" |
|
|
|
storagepb "cloud.google.com/go/storage/internal/apiv2/stubs" |
|
|
|
"github.com/googleapis/gax-go/v2" |
|
|
|
"google.golang.org/api/iterator" |
|
|
|
"google.golang.org/api/option" |
|
|
|
"google.golang.org/api/option/internaloption" |
|
|
@ -246,7 +247,8 @@ func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, con |
|
|
|
func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { |
|
|
|
s := callSettings(c.settings, opts...) |
|
|
|
req := &storagepb.GetBucketRequest{ |
|
|
|
Name: bucketResourceName(globalProjectAlias, bucket), |
|
|
|
Name: bucketResourceName(globalProjectAlias, bucket), |
|
|
|
ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}}, |
|
|
|
} |
|
|
|
if err := applyBucketCondsProto("grpcStorageClient.GetBucket", conds, req); err != nil { |
|
|
|
return nil, err |
|
|
@ -344,6 +346,9 @@ func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uat |
|
|
|
if uattrs.RPO != RPOUnknown { |
|
|
|
fieldMask.Paths = append(fieldMask.Paths, "rpo") |
|
|
|
} |
|
|
|
if uattrs.Autoclass != nil { |
|
|
|
fieldMask.Paths = append(fieldMask.Paths, "autoclass") |
|
|
|
} |
|
|
|
// TODO(cathyo): Handle labels. Pending b/230510191.
|
|
|
|
req.UpdateMask = fieldMask |
|
|
|
|
|
|
@ -380,14 +385,14 @@ func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Q |
|
|
|
it.query = *q |
|
|
|
} |
|
|
|
req := &storagepb.ListObjectsRequest{ |
|
|
|
Parent: bucketResourceName(globalProjectAlias, bucket), |
|
|
|
Prefix: it.query.Prefix, |
|
|
|
Delimiter: it.query.Delimiter, |
|
|
|
Versions: it.query.Versions, |
|
|
|
LexicographicStart: it.query.StartOffset, |
|
|
|
LexicographicEnd: it.query.EndOffset, |
|
|
|
// TODO(noahietz): Convert a projection to a FieldMask.
|
|
|
|
// ReadMask: q.Projection,
|
|
|
|
Parent: bucketResourceName(globalProjectAlias, bucket), |
|
|
|
Prefix: it.query.Prefix, |
|
|
|
Delimiter: it.query.Delimiter, |
|
|
|
Versions: it.query.Versions, |
|
|
|
LexicographicStart: it.query.StartOffset, |
|
|
|
LexicographicEnd: it.query.EndOffset, |
|
|
|
IncludeTrailingDelimiter: it.query.IncludeTrailingDelimiter, |
|
|
|
ReadMask: q.toFieldMask(), // a nil Query still results in a "*" FieldMask
|
|
|
|
} |
|
|
|
if s.userProject != "" { |
|
|
|
ctx = setUserProjectMetadata(ctx, s.userProject) |
|
|
@ -411,6 +416,12 @@ func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Q |
|
|
|
it.items = append(it.items, b) |
|
|
|
} |
|
|
|
|
|
|
|
// Response is always non-nil after a successful request.
|
|
|
|
res := gitr.Response.(*storagepb.ListObjectsResponse) |
|
|
|
for _, prefix := range res.GetPrefixes() { |
|
|
|
it.items = append(it.items, &ObjectAttrs{Prefix: prefix}) |
|
|
|
} |
|
|
|
|
|
|
|
return token, nil |
|
|
|
} |
|
|
|
it.pageInfo, it.nextFunc = iterator.NewPageInfo( |
|
|
@ -449,6 +460,8 @@ func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string |
|
|
|
req := &storagepb.GetObjectRequest{ |
|
|
|
Bucket: bucketResourceName(globalProjectAlias, bucket), |
|
|
|
Object: object, |
|
|
|
// ProjectionFull by default.
|
|
|
|
ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}}, |
|
|
|
} |
|
|
|
if err := applyCondsProto("grpcStorageClient.GetObject", gen, conds, req); err != nil { |
|
|
|
return nil, err |
|
|
@ -492,10 +505,7 @@ func (c *grpcStorageClient) UpdateObject(ctx context.Context, bucket, object str |
|
|
|
req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(encryptionKey) |
|
|
|
} |
|
|
|
|
|
|
|
var paths []string |
|
|
|
fieldMask := &fieldmaskpb.FieldMask{ |
|
|
|
Paths: paths, |
|
|
|
} |
|
|
|
fieldMask := &fieldmaskpb.FieldMask{Paths: nil} |
|
|
|
if uattrs.EventBasedHold != nil { |
|
|
|
fieldMask.Paths = append(fieldMask.Paths, "event_based_hold") |
|
|
|
} |
|
|
@ -522,7 +532,7 @@ func (c *grpcStorageClient) UpdateObject(ctx context.Context, bucket, object str |
|
|
|
} |
|
|
|
// Note: This API currently does not support entites using project ID.
|
|
|
|
// Use project numbers in ACL entities. Pending b/233617896.
|
|
|
|
if uattrs.ACL != nil { |
|
|
|
if uattrs.ACL != nil || len(uattrs.PredefinedACL) > 0 { |
|
|
|
fieldMask.Paths = append(fieldMask.Paths, "acl") |
|
|
|
} |
|
|
|
// TODO(cathyo): Handle metadata. Pending b/230510191.
|
|
|
@ -812,6 +822,9 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec |
|
|
|
call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes() |
|
|
|
call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes() |
|
|
|
} |
|
|
|
|
|
|
|
call.MaxBytesRewrittenPerCall = req.maxBytesRewrittenPerCall |
|
|
|
|
|
|
|
var res *storagepb.RewriteResponse |
|
|
|
var err error |
|
|
|
|
|
|
@ -943,6 +956,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange |
|
|
|
// Store the content from the first Recv in the
|
|
|
|
// client buffer for reading later.
|
|
|
|
leftovers: msg.GetChecksummedData().GetContent(), |
|
|
|
settings: s, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
@ -964,6 +978,8 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange |
|
|
|
} |
|
|
|
|
|
|
|
func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) { |
|
|
|
s := callSettings(c.settings, opts...) |
|
|
|
|
|
|
|
var offset int64 |
|
|
|
errorf := params.setError |
|
|
|
progress := params.progress |
|
|
@ -971,6 +987,10 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage |
|
|
|
|
|
|
|
pr, pw := io.Pipe() |
|
|
|
gw := newGRPCWriter(c, params, pr) |
|
|
|
gw.settings = s |
|
|
|
if s.userProject != "" { |
|
|
|
gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject) |
|
|
|
} |
|
|
|
|
|
|
|
// This function reads the data sent to the pipe and sends sets of messages
|
|
|
|
// on the gRPC client-stream as the buffer is filled.
|
|
|
@ -1315,6 +1335,7 @@ type gRPCReader struct { |
|
|
|
reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error) |
|
|
|
leftovers []byte |
|
|
|
cancel context.CancelFunc |
|
|
|
settings *settings |
|
|
|
} |
|
|
|
|
|
|
|
// Read reads bytes into the user's buffer from an open gRPC stream.
|
|
|
@ -1390,7 +1411,11 @@ func (r *gRPCReader) Close() error { |
|
|
|
// an attempt to reopen the stream.
|
|
|
|
func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) { |
|
|
|
msg, err := r.stream.Recv() |
|
|
|
if err != nil && ShouldRetry(err) { |
|
|
|
var shouldRetry = ShouldRetry |
|
|
|
if r.settings.retry != nil && r.settings.retry.shouldRetry != nil { |
|
|
|
shouldRetry = r.settings.retry.shouldRetry |
|
|
|
} |
|
|
|
if err != nil && shouldRetry(err) { |
|
|
|
// This will "close" the existing stream and immediately attempt to
|
|
|
|
// reopen the stream, but will backoff if further attempts are necessary.
|
|
|
|
// Reopening the stream Recvs the first message, so if retrying is
|
|
|
@ -1454,6 +1479,7 @@ type gRPCWriter struct { |
|
|
|
attrs *ObjectAttrs |
|
|
|
conds *Conditions |
|
|
|
encryptionKey []byte |
|
|
|
settings *settings |
|
|
|
|
|
|
|
sendCRC32C bool |
|
|
|
|
|
|
@ -1471,21 +1497,27 @@ func (w *gRPCWriter) startResumableUpload() error { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{ |
|
|
|
WriteObjectSpec: spec, |
|
|
|
}) |
|
|
|
|
|
|
|
w.upid = upres.GetUploadId() |
|
|
|
return err |
|
|
|
return run(w.ctx, func() error { |
|
|
|
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{ |
|
|
|
WriteObjectSpec: spec, |
|
|
|
}) |
|
|
|
w.upid = upres.GetUploadId() |
|
|
|
return err |
|
|
|
}, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx)) |
|
|
|
} |
|
|
|
|
|
|
|
// queryProgress is a helper that queries the status of the resumable upload
|
|
|
|
// associated with the given upload ID.
|
|
|
|
func (w *gRPCWriter) queryProgress() (int64, error) { |
|
|
|
q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid}) |
|
|
|
var persistedSize int64 |
|
|
|
err := run(w.ctx, func() error { |
|
|
|
q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid}) |
|
|
|
persistedSize = q.GetPersistedSize() |
|
|
|
return err |
|
|
|
}, w.settings.retry, true, setRetryHeaderGRPC(w.ctx)) |
|
|
|
|
|
|
|
// q.GetCommittedSize() will return 0 if q is nil.
|
|
|
|
return q.GetPersistedSize(), err |
|
|
|
return persistedSize, err |
|
|
|
} |
|
|
|
|
|
|
|
// uploadBuffer opens a Write stream and uploads the buffer at the given offset (if
|
|
|
@ -1500,6 +1532,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st |
|
|
|
var err error |
|
|
|
var finishWrite bool |
|
|
|
var sent, limit int = 0, maxPerMessageWriteSize |
|
|
|
var shouldRetry = ShouldRetry |
|
|
|
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil { |
|
|
|
shouldRetry = w.settings.retry.shouldRetry |
|
|
|
} |
|
|
|
offset := start |
|
|
|
toWrite := w.buf[:recvd] |
|
|
|
for { |
|
|
@ -1553,8 +1589,16 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st |
|
|
|
// on the *last* message of the stream (instead of the first).
|
|
|
|
if w.sendCRC32C { |
|
|
|
req.ObjectChecksums = &storagepb.ObjectChecksums{ |
|
|
|
Crc32C: proto.Uint32(w.attrs.CRC32C), |
|
|
|
Md5Hash: w.attrs.MD5, |
|
|
|
Crc32C: proto.Uint32(w.attrs.CRC32C), |
|
|
|
} |
|
|
|
} |
|
|
|
if len(w.attrs.MD5) != 0 { |
|
|
|
if cs := req.GetObjectChecksums(); cs == nil { |
|
|
|
req.ObjectChecksums = &storagepb.ObjectChecksums{ |
|
|
|
Md5Hash: w.attrs.MD5, |
|
|
|
} |
|
|
|
} else { |
|
|
|
cs.Md5Hash = w.attrs.MD5 |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1570,7 +1614,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st |
|
|
|
// resend the entire buffer via a new stream.
|
|
|
|
// If not retriable, falling through will return the error received
|
|
|
|
// from closing the stream.
|
|
|
|
if ShouldRetry(err) { |
|
|
|
if shouldRetry(err) { |
|
|
|
sent = 0 |
|
|
|
finishWrite = false |
|
|
|
// TODO: Add test case for failure modes of querying progress.
|
|
|
@ -1601,7 +1645,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st |
|
|
|
// resend the entire buffer via a new stream.
|
|
|
|
// If not retriable, falling through will return the error received
|
|
|
|
// from closing the stream.
|
|
|
|
if ShouldRetry(err) { |
|
|
|
if shouldRetry(err) { |
|
|
|
sent = 0 |
|
|
|
finishWrite = false |
|
|
|
offset, err = w.determineOffset(start) |
|
|
@ -1673,7 +1717,12 @@ func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) { |
|
|
|
|
|
|
|
// read copies the data in the reader to the given buffer and reports how much
|
|
|
|
// data was read into the buffer and if there is no more data to read (EOF).
|
|
|
|
// Furthermore, if the attrs.ContentType is unset, the first bytes of content
|
|
|
|
// will be sniffed for a matching content type.
|
|
|
|
func (w *gRPCWriter) read() (int, bool, error) { |
|
|
|
if w.attrs.ContentType == "" { |
|
|
|
w.reader, w.attrs.ContentType = gax.DetermineContentType(w.reader) |
|
|
|
} |
|
|
|
// Set n to -1 to start the Read loop.
|
|
|
|
var n, recvd int = -1, 0 |
|
|
|
var err error |
|
|
|