All Products
Search
Document Center

Multipart upload

Last Updated:Jun 18, 2025

Object Storage Service (OSS) provides the multipart upload feature that allows you to split a large object into multiple parts to upload. After these parts are uploaded, you can call the CompleteMultipartUpload operation to combine the parts into a complete object.

Notes

  • The sample code in this topic uses the region ID cn-hangzhou of the China (Hangzhou) region. By default, the public endpoint is used to access resources in a bucket. If you want to access resources in the bucket by using other Alibaba Cloud services in the same region in which the bucket is located, use the internal endpoint. For more information about OSS regions and endpoints, see Regions and endpoints.

  • In this topic, access credentials are obtained from environment variables. For more information about how to configure the access credentials, see Configure access credentials.

  • To use multipart upload, you must have the oss:PutObject permission. For more information, see Attach a custom policy to a RAM user.

Process

To upload a local file by using multipart upload, perform the following steps:

  1. Initiate a multipart upload task.

    Call the Client.InitiateMultipartUpload method to obtain a unique upload ID in OSS.

  2. Upload parts.

    Call the Client.UploadPart method to upload the parts.

    Note
    • For parts that are uploaded by running a multipart upload task with a specific upload ID, the part numbers identify their relative positions in an object. If you upload a part and reuse its part number to upload another part, the new part overwrites the original part.

    • OSS includes the MD5 hash of each uploaded part in the ETag header in the response.

  • If you include the Content-MD5 header in an UploadPart request, OSS calculates the MD5 hash of the received part data and compares it with the MD5 hash that is calculated by OSS SDK for Go. If the two hashes are different, OSS returns the InvalidDigest error code.

  3. Complete the multipart upload task.

    After all parts are uploaded, call the Client.CompleteMultipartUpload method to combine these parts into a complete object.

Examples

The following sample code provides an example on how to split a local large file into multiple parts, upload the parts to a bucket, and then combine these parts into a complete object:

package main

import (
	"bufio"
	"bytes"
	"context"
	"flag"
	"io"
	"log"
	"os"
	"sync"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Package-level configuration shared by init (flag registration) and main.
var (
	region     string // The region in which the bucket is located.
	bucketName string // The name of the bucket.
	objectName string // The name of the object.
)

// init registers the command line flags. Each flag defaults to the empty
// string; main validates that every one of them is set.
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the source object.")
}

func main() {
	// Parse command line parameters.
	flag.Parse()

	// Check whether the bucket name is specified.
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}

	// Check whether the region in which the bucket is located is specified.
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// Check whether the object name is specified.
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// Load the default configurations and specify the credential provider and region.
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// Create an OSSClient instance.
	client := oss.NewClient(cfg)

	// Step 1: initiate the multipart upload task to obtain a unique upload ID.
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	}
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// Display the result of initiating the multipart upload task.
	log.Printf("initiate multipart upload result:%#v\n", *initResult.UploadId)
	uploadId := *initResult.UploadId

	// Read data from the local file to the memory. Replace yourLocalFile with
	// the actual path of the local file that contains the file name.
	file, err := os.Open("yourLocalFile")
	if err != nil {
		log.Fatalf("failed to open local file %v", err)
	}
	defer file.Close()

	bufReader := bufio.NewReader(file)
	content, err := io.ReadAll(bufReader)
	if err != nil {
		log.Fatalf("failed to read local file %v", err)
	}
	log.Printf("file size: %d\n", len(content))

	// Split the content into count parts. Each goroutine writes its result into
	// a fixed slot of parts (indexed by part number), so the final slice is in
	// ascending part-number order, as CompleteMultipartUpload requires. This
	// also removes the need for a mutex: every goroutine writes a distinct
	// slice element.
	count := 3
	parts := make([]oss.UploadPart, count)
	chunkSize := len(content) / count
	if chunkSize == 0 {
		chunkSize = 1
	}

	var wg sync.WaitGroup

	// Step 2: upload the parts concurrently.
	for i := 0; i < count; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if i == count-1 {
			end = len(content)
		}
		// Clamp the range so a file smaller than count bytes cannot cause an
		// out-of-range slice expression (the original code panicked here).
		if start > len(content) {
			start = len(content)
		}
		if end > len(content) {
			end = len(content)
		}

		wg.Add(1)
		go func(partNumber, start, end int) {
			defer wg.Done()

			// Create a request to upload a part.
			partRequest := &oss.UploadPartRequest{
				Bucket:     oss.Ptr(bucketName),                 // The name of the bucket.
				Key:        oss.Ptr(objectName),                 // The name of the object.
				PartNumber: int32(partNumber),                   // The part number.
				UploadId:   oss.Ptr(uploadId),                   // The upload ID.
				Body:       bytes.NewReader(content[start:end]), // The content of the part.
			}

			// Send the request to upload the part. log.Fatalf terminates the
			// whole process on failure, which is acceptable for a sample.
			partResult, err := client.UploadPart(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part %d: %v", partNumber, err)
			}

			// Record the ETag in this part's fixed slot.
			parts[partNumber-1] = oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}
		}(i+1, start, end)
	}

	// Wait until all goroutines are complete.
	wg.Wait()

	// Step 3: complete the multipart upload task; parts are already sorted by
	// part number.
	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName),
		Key:      oss.Ptr(objectName),
		UploadId: oss.Ptr(uploadId),
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
	}
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}

	// Display the result of the multipart upload task.
	log.Printf("complete multipart upload result:%#v\n", result)
}

Common scenarios

Upload a random string of a specific length by using multipart upload

The following sample code provides an example on how to split a random string of 400 KB in size into 3 parts, upload the parts to a bucket, and then combine these parts into a complete object:

package main

import (
	"bufio"
	"context"
	"flag"
	"io"
	"log"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Define global variables.
// Note: the original declaration `region: string;` was invalid Go syntax and
// would not compile; Go declarations use `name type` with no colon.
var (
	region     string // The region in which the bucket is located.
	bucketName string // The name of the bucket.
	objectName string // The name of the object.

	// letters is the character set used to generate a random string.
	letters = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
)

// Specify the init function used to initialize command line parameters.
// Each flag defaults to the empty string; main validates that it is set.
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the object.")
}

func main() {
	// Parse command line parameters.
	flag.Parse()

	// Check whether the bucket name is specified.
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, bucket name required")
	}

	// Check whether the region in which the bucket is located is specified.
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// Check whether the object name is specified.
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, object name required")
	}

	// Load the default configurations and specify the credential provider and region.
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// Create an OSSClient instance.
	client := oss.NewClient(cfg)

	// Step 1: initiate the multipart upload task to obtain a unique upload ID.
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName), // The name of the bucket.
		Key:    oss.Ptr(objectName), // The name of the object.
	}
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// Display the result of initiating the multipart upload task.
	log.Printf("initiate multipart upload result:%#v\n", initResult)
	uploadId := *initResult.UploadId

	// Generate a random string of 400 KB in size and load it into memory. The
	// previously ignored io.ReadAll error is now checked.
	count := 3
	body := randBody(400000)
	reader := strings.NewReader(body)
	bufReader := bufio.NewReader(reader)
	content, err := io.ReadAll(bufReader)
	if err != nil {
		log.Fatalf("failed to read random body %v", err)
	}
	partSize := len(content) / count

	// Each goroutine writes its result into a fixed slot of parts (indexed by
	// part number), so the final slice is in ascending part-number order, as
	// CompleteMultipartUpload requires. Distinct slice elements also make the
	// mutex unnecessary.
	parts := make([]oss.UploadPart, count)
	var wg sync.WaitGroup

	// Step 2: upload the parts concurrently.
	for i := 0; i < count; i++ {
		start := i * partSize
		end := start + partSize
		// The last part takes the remainder so no trailing bytes are lost when
		// len(content) is not divisible by count (the original dropped them).
		if i == count-1 {
			end = len(content)
		}

		wg.Add(1)
		go func(partNumber, start, end int) {
			defer wg.Done()

			// Create a request to upload a part.
			partRequest := &oss.UploadPartRequest{
				Bucket:     oss.Ptr(bucketName),                           // The name of the bucket.
				Key:        oss.Ptr(objectName),                           // The name of the object.
				PartNumber: int32(partNumber),                             // The part number.
				UploadId:   oss.Ptr(uploadId),                             // The upload ID.
				Body:       strings.NewReader(string(content[start:end])), // The content of the part.
			}

			// Send the request to upload the part.
			partResult, err := client.UploadPart(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part %d: %v", partNumber, err)
			}

			// Record the ETag in this part's fixed slot.
			parts[partNumber-1] = oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}
		}(i+1, start, end)
	}

	// Wait until all goroutines are complete.
	wg.Wait()

	// A message indicating that the parts are uploaded is returned.
	log.Println("upload part success!")

	// Step 3: complete the multipart upload task; parts are already sorted by
	// part number.
	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName),
		Key:      oss.Ptr(objectName),
		UploadId: oss.Ptr(uploadId),
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
	}

	// Complete the multipart upload task and process the result.
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}
	log.Printf("complete multipart upload result:%#v\n", result)
}

// randBody generates a random string of length n drawn from an alphanumeric
// character set. The character set is kept local so the helper is
// self-contained instead of depending on the package-level letters variable.
func randBody(n int) string {
	const charset = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	// Seed a local source so consecutive runs produce different strings.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	b := make([]byte, n)
	for i := range b {
		b[i] = charset[rng.Intn(len(charset))]
	}
	return string(b)
}

Cancel a multipart upload task

In one of the following scenarios, you can use the Client.AbortMultipartUpload method to cancel a multipart upload task:

  1. Errors in an object

    • If you detect errors in the object during the upload process, such as object damage or malicious code, you can cancel the multipart upload task to prevent potential risks.

  2. Unstable network connection

    • If the network connection is unstable or interrupted, parts may be lost or damaged during the upload process. You can cancel the multipart upload task and initiate another multipart upload task to ensure data integrity and consistency.

  3. Resource limits

    • If your storage capacity is insufficient and the object that you want to upload is too large, you can cancel the multipart upload task, release storage resources, and allocate the resources to more important tasks.

  4. Accidental operations

    • If you accidentally initiate an unnecessary multipart upload task or upload an incorrect version of an object, you can cancel the multipart upload task.

package main

import (
	"context"
	"flag"
	"log"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Package-level configuration shared by init (flag registration) and main.
var (
	region     string // The region in which the bucket is located.
	bucketName string // The name of the source bucket.
	objectName string // The name of the source object.
)

// init registers the command line flags. Each flag defaults to the empty
// string; main validates that every one of them is set.
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the source object.")
}

func main() {
	// Parse command line parameters.
	flag.Parse()

	// Validate the required parameters; print the usage and exit when one is
	// missing.
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// Credentials come from environment variables; the region selects the
	// endpoint.
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)
	client := oss.NewClient(cfg)

	// Start a multipart upload task so there is an upload ID to cancel.
	initResult, err := client.InitiateMultipartUpload(context.TODO(), &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	})
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}
	log.Printf("initiate multipart upload result:%#v\n", *initResult.UploadId)
	uploadId := *initResult.UploadId

	// Cancel the task. OSS discards the parts uploaded under this upload ID.
	abortRequest := &oss.AbortMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName), // The name of the bucket.
		Key:      oss.Ptr(objectName), // The name of the object.
		UploadId: oss.Ptr(uploadId),   // The upload ID to cancel.
	}
	result, err := client.AbortMultipartUpload(context.TODO(), abortRequest)
	if err != nil {
		log.Fatalf("failed to abort multipart upload %v", err)
	}
	log.Printf("abort multipart upload result:%#v\n", result)
}

List the parts that are uploaded in a specific multipart upload task

In one of the following scenarios, you can use the Client.NewListPartsPaginator paginator to list the uploaded parts in a multipart upload task:

Monitor the object upload progress

  1. Large object upload

    • When you upload a very large object, list the uploaded parts to ensure that the upload task runs as expected and issues are detected at the earliest opportunity.

  2. Resumable upload

    • When the network connection is unstable or interrupted during the object upload, you can view the uploaded parts to determine whether you need to re-upload the failed parts by using resumable upload.

  3. Troubleshooting

    • If an error occurs during the object upload, you can quickly identify the error by checking the uploaded parts, such as the upload failure of a specific part, and then resolve the error accordingly.

  4. Resource management

    • In scenarios that require strict control of resource usage, you can better manage the storage capacity and bandwidth resources to ensure that the resources are effectively used by monitoring the object upload progress.

package main

import (
	"context"
	"flag"
	"log"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Package-level configuration shared by init (flag registration) and main.
var (
	region     string // The region in which the bucket is located.
	bucketName string // The name of the source bucket.
	objectName string // The name of the source object.
)

// init registers the command line flags. Each flag defaults to the empty
// string; main validates that every one of them is set.
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the source object.")
}

func main() {
	// Parse command line parameters.
	flag.Parse()

	// Validate the required parameters; print the usage and exit when one is
	// missing.
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// Credentials come from environment variables; the region selects the
	// endpoint.
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)
	client := oss.NewClient(cfg)

	// Start a multipart upload task; its upload ID identifies the task whose
	// parts are listed below.
	initResult, err := client.InitiateMultipartUpload(context.TODO(), &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	})
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}
	log.Printf("initiate multipart upload result:%#v\n", *initResult.UploadId)
	uploadId := *initResult.UploadId

	// Describe which upload task to list the parts of.
	listRequest := &oss.ListPartsRequest{
		Bucket:   oss.Ptr(bucketName), // The name of the bucket.
		Key:      oss.Ptr(objectName), // The name of the object.
		UploadId: oss.Ptr(uploadId),   // The upload ID returned above.
	}

	// Page through the uploaded parts.
	paginator := client.NewListPartsPaginator(listRequest)
	pageNum := 0
	log.Println("List Parts:")
	for paginator.HasNext() {
		pageNum++
		page, err := paginator.NextPage(context.TODO())
		if err != nil {
			log.Fatalf("failed to get page %v, %v", pageNum, err)
		}
		// Print one line per part on this page.
		for _, part := range page.Parts {
			log.Printf("Part Number: %v, ETag: %v, Last Modified: %v, Size: %v, HashCRC64: %v\n",
				part.PartNumber,
				oss.ToString(part.ETag),
				oss.ToTime(part.LastModified),
				part.Size,
				oss.ToString(part.HashCRC64))
		}
	}
}

List the multipart upload tasks of a bucket

In one of the following scenarios, you can use the Client.NewListMultipartUploadsPaginator paginator to list all ongoing multipart upload tasks of a bucket:

Monitoring scenarios

  1. Batch object upload management

    • If you want to upload a large number of objects, you can use the ListMultipartUploads method to monitor all multipart upload tasks in real time to ensure that all objects are uploaded as expected.

  2. Fault detection and recovery

    • If network issues or other failures occur during the upload process, specific parts may fail to be uploaded. By monitoring ongoing multipart upload tasks, you can detect these issues at the earliest opportunity and take measures to resume the upload.

  3. Resource optimization and management

    • During large-scale object upload, you can monitor ongoing multipart upload tasks to optimize resource allocation, such as adjusting the bandwidth usage or optimizing upload policies based on the upload progress.

  4. Data migration

    • When you perform large-scale data migration, you can monitor all ongoing multipart upload tasks to ensure the smooth migration of data and detect and resolve potential issues at the earliest opportunity.

Parameter configuration

Parameter

Description

Delimiter

The character that is used to group objects by name. The objects whose names contain the same string from the prefix to the next occurrence of the delimiter are grouped as a single result element in the CommonPrefixes parameter.

MaxUploads

The maximum number of multipart upload tasks that you want to return for the current list. Maximum value: 1000. Default value: 1000.

KeyMarker

Specifies that all multipart upload tasks with objects whose names are alphabetically after the value of the KeyMarker parameter are included in the list. You can use this parameter with the UploadIDMarker parameter to specify the starting position from which you want to list the returned results.

Prefix

The prefix that the returned object names must contain. If you use a prefix for a query, the returned object name contains the prefix.

UploadIDMarker

The starting position from which you want to list the returned results. This parameter is used together with the KeyMarker parameter.

  • If the KeyMarker parameter is not configured, this parameter is ignored.

  • If the KeyMarker parameter is configured, the response includes the following items:

    • Multipart upload tasks in which the object names are alphabetically after the value of the KeyMarker parameter.

    • Multipart upload tasks in which the object names are the same as the value of the KeyMarker parameter but whose upload IDs are greater than the value of the UploadIDMarker parameter.

  • Set the Prefix parameter to file and specify that up to 100 multipart upload tasks are returned

    package main
    
    import (
    	"context"
    	"flag"
    	"log"
    
    	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
    	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
    )
    
    // Package-level configuration shared by init (flag registration) and main.
    var (
    	region     string // The region in which the bucket is located.
    	bucketName string // The name of the source bucket.
    	objectName string // The name of the source object.
    )

    // init registers the command line flags. Each flag defaults to the empty
    // string; main validates that every one of them is set.
    func init() {
    	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
    	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
    	flag.StringVar(&objectName, "object", "", "The name of the source object.")
    }
    
    func main() {
    	// Parse command line parameters.
    	flag.Parse()

    	// Validate the required parameters; print the usage and exit when one
    	// is missing.
    	if len(bucketName) == 0 {
    		flag.PrintDefaults()
    		log.Fatalf("invalid parameters, source bucket name required")
    	}
    	if len(region) == 0 {
    		flag.PrintDefaults()
    		log.Fatalf("invalid parameters, region required")
    	}
    	// NOTE(review): objectName is validated but not used by this sample;
    	// kept so all samples accept the same flags.
    	if len(objectName) == 0 {
    		flag.PrintDefaults()
    		log.Fatalf("invalid parameters, source object name required")
    	}

    	// Credentials come from environment variables; the region selects the
    	// endpoint.
    	cfg := oss.LoadDefaultConfig().
    		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
    		WithRegion(region)
    	client := oss.NewClient(cfg)

    	// List ongoing multipart upload tasks whose object names start with
    	// "file", at most 100 tasks per page.
    	listRequest := &oss.ListMultipartUploadsRequest{
    		Bucket:     oss.Ptr(bucketName), // The name of the bucket.
    		MaxUploads: 100,                 // Specify that a maximum of 100 multipart upload tasks are returned.
    		Prefix:     oss.Ptr("file"),     // Set the Prefix parameter to file.
    	}

    	// Page through the results.
    	paginator := client.NewListMultipartUploadsPaginator(listRequest)
    	pageNum := 0
    	log.Println("List Multipart Uploads:")
    	for paginator.HasNext() {
    		pageNum++
    		page, err := paginator.NextPage(context.TODO())
    		if err != nil {
    			log.Fatalf("failed to get page %v, %v", pageNum, err)
    		}
    		// Print one line per multipart upload task on this page.
    		for _, u := range page.Uploads {
    			log.Printf("Upload key: %v, upload id: %v, initiated: %v\n", oss.ToString(u.Key), oss.ToString(u.UploadId), oss.ToTime(u.Initiated))
    		}
    	}
    }
    

Configure upload callbacks for multipart upload tasks

This code implements splitting a randomly generated 400 KB string into 3 parts, uploading them concurrently to Alibaba Cloud OSS, merging the parts into a complete object after upload completion, and triggering a callback notification upon successful merging.

package main

import (
	"bufio"
	"context"
	"encoding/base64"
	"encoding/json"
	"flag"
	"io"
	"log"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Package-level configuration shared by init (flag registration) and main.
var (
	region     string // The region in which the bucket is located.
	bucketName string // The name of the bucket.
	objectName string // The name of the object.

	// letters is the character set used by randBody to build the random payload.
	letters = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
)

// init registers the command line flags. Each flag defaults to the empty
// string; main validates that every one of them is set.
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the object.")
}

func main() {
	// Parse command line parameters and validate the required ones.
	flag.Parse()
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, bucket name required")
	}

	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, object name required")
	}

	// Load the default configurations and specify the credential provider and region.
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	client := oss.NewClient(cfg)

	// Step 1: initiate the multipart upload task to obtain the upload ID.
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	}
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		// Bug fix: this error was previously ignored, which would dereference
		// a nil initResult below when the initiation fails.
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// Specify the callback parameters.
	callbackMap := map[string]string{
		"callbackUrl":      "https://example.com:23450",                                                                  // Specify the URL of the callback server. Example: https://example.com:23450.
		"callbackBody":     "bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}", // Specify the callback request body.
		"callbackBodyType": "application/x-www-form-urlencoded",                                                          // Specify the type of the callback request body.
	}

	// Convert the configurations of the callback parameters to a JSON string and encode the string in Base64 to pass the callback configurations.
	callbackStr, err := json.Marshal(callbackMap)
	if err != nil {
		log.Fatalf("failed to marshal callback map: %v", err)
	}
	callbackBase64 := base64.StdEncoding.EncodeToString(callbackStr)

	// Custom variables referenced by the ${x:...} placeholders in the callback body.
	callbackVarMap := map[string]string{}
	callbackVarMap["x:my_var1"] = "this is var 1"
	callbackVarMap["x:my_var2"] = "this is var 2"
	callbackVarStr, err := json.Marshal(callbackVarMap)
	if err != nil {
		log.Fatalf("failed to marshal callback var: %v", err)
	}
	callbackVarBase64 := base64.StdEncoding.EncodeToString(callbackVarStr)

	// Generate a random 400 KB payload and load it into memory. The previously
	// ignored io.ReadAll error is now checked.
	count := 3
	body := randBody(400000)
	reader := strings.NewReader(body)
	bufReader := bufio.NewReader(reader)
	content, err := io.ReadAll(bufReader)
	if err != nil {
		log.Fatalf("failed to read random body: %v", err)
	}
	partSize := len(content) / count

	// Step 2: upload the parts concurrently. Each goroutine writes its result
	// into a fixed slot of parts (indexed by part number), so the final slice
	// is in ascending part-number order, as CompleteMultipartUpload requires,
	// and no mutex is needed.
	parts := make([]oss.UploadPart, count)
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
		start := i * partSize
		end := start + partSize
		// The last part takes the remainder so no trailing bytes are dropped
		// when len(content) is not divisible by count.
		if i == count-1 {
			end = len(content)
		}
		wg.Add(1)
		go func(partNumber, start, end int) {
			defer wg.Done()
			partRequest := &oss.UploadPartRequest{
				Bucket:     oss.Ptr(bucketName),
				Key:        oss.Ptr(objectName),
				PartNumber: int32(partNumber),
				UploadId:   initResult.UploadId,
				Body:       strings.NewReader(string(content[start:end])),
			}
			partResult, err := client.UploadPart(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part %d: %v", partNumber, err)
			}
			parts[partNumber-1] = oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}
		}(i+1, start, end)
	}
	wg.Wait()

	// Step 3: complete the upload; the callback is triggered after the parts
	// are combined into the final object.
	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName),
		Key:      oss.Ptr(objectName),
		UploadId: initResult.UploadId,
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
		Callback:    oss.Ptr(callbackBase64), // The callback parameters.
		CallbackVar: oss.Ptr(callbackVarBase64),
	}
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}
	log.Printf("complete multipart upload result:%#v\n", result)
}

// randBody generates a random string of length n drawn from an alphanumeric
// character set. The character set is kept local so the helper is
// self-contained instead of depending on the package-level letters variable.
func randBody(n int) string {
	const charset = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	// Seed a local source so consecutive runs produce different strings.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	b := make([]byte, n)
	for i := range b {
		b[i] = charset[rng.Intn(len(charset))]
	}
	return string(b)
}

Multipart upload with progress monitoring

package main

import (
	"bufio"
	"bytes"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"sync"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Package-level configuration shared by init (flag registration) and main.
var (
	region     string // The region of the source bucket.
	bucketName string // The name of the source bucket.
	objectName string // The name of the source object.
)

// init registers the command line flags. Each flag defaults to the empty
// string; main validates that every one of them is set.
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the source object.")
}

// main uploads a local file to OSS by using multipart upload with a
// progress callback on every part, then combines the parts into a
// complete object.
func main() {
	// Parse the command-line parameters.
	flag.Parse()

	// Check whether the source bucket name is specified.
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}

	// Check whether the region is specified.
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// Check whether the source object name is specified.
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// Load the default configurations and specify the credential provider and region.
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// Create an OSS client.
	client := oss.NewClient(cfg)

	// Initiate the multipart upload request.
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	}

	// Execute the initiation of the multipart upload request.
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// Print the upload ID that identifies this multipart upload task.
	log.Printf("initiate multipart upload ID:%#v\n", *initResult.UploadId)
	uploadId := *initResult.UploadId

	// The wait group tracks the upload goroutines; the mutex guards parts.
	var wg sync.WaitGroup
	var parts []oss.UploadPart
	count := 5 // The maximum number of parts (one goroutine per part).
	var mu sync.Mutex

	// Read data from the local file to the memory and replace yourLocalFile with the actual path of the local file that contains the file name.
	file, err := os.Open("/Users/yourLocalPath/yourFileName")
	if err != nil {
		log.Fatalf("failed to open local file %v", err)
	}
	defer file.Close()

	bufReader := bufio.NewReader(file)
	content, err := io.ReadAll(bufReader)
	if err != nil {
		log.Fatalf("failed to read local file %v", err)
	}
	log.Printf("file size: %d\n", len(content))

	// Calculate the size of each part.
	chunkSize := len(content) / count
	if chunkSize == 0 {
		chunkSize = 1
	}

	// Start multiple goroutines for the multipart upload task.
	for i := 0; i < count; i++ {
		start := i * chunkSize
		if start >= len(content) {
			// The file has fewer bytes than count parts; nothing is left
			// to upload, and slicing past the end would panic.
			break
		}
		end := start + chunkSize
		// The last part absorbs the remainder. Also clamp end so a small
		// file never produces an out-of-range slice.
		if i == count-1 || end > len(content) {
			end = len(content)
		}

		wg.Add(1)
		go func(partNumber int, start, end int) {
			defer wg.Done()

			// Create the multipart upload request.
			partRequest := &oss.UploadPartRequest{
				Bucket:     oss.Ptr(bucketName),                 // The name of the destination bucket.
				Key:        oss.Ptr(objectName),                 // The name of the destination object.
				PartNumber: int32(partNumber),                   // The number of the part.
				UploadId:   oss.Ptr(uploadId),                   // Upload ID.
				Body:       bytes.NewReader(content[start:end]), // The content of the part.
				ProgressFn: func(increment, transferred, total int64) {
					fmt.Printf("increment:%v, transferred:%v, total:%v\n", increment, transferred, total)
				}, // Specify a progress callback function that is used to query the upload progress.
			}

			// Send the multipart upload request.
			// NOTE: log.Fatalf exits the whole process, so a single failed
			// part aborts the sample without waiting for its siblings.
			partResult, err := client.UploadPart(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part %d: %v", partNumber, err)
			}

			log.Printf("successfully uploaded part %d (start: %d, end: %d)", partNumber, start, end)

			// Capture the part number and ETag needed to complete the upload.
			part := oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}

			// Use a mutex to protect shared data.
			mu.Lock()
			parts = append(parts, part)
			mu.Unlock()
		}(i+1, start, end)
	}

	// Wait until all goroutines are complete.
	wg.Wait()

	// The goroutines append parts in completion order, but the parts must
	// be listed in ascending part-number order when completing the upload;
	// otherwise OSS rejects the request.
	sort.Slice(parts, func(i, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})

	// Complete the multipart upload request.
	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName),
		Key:      oss.Ptr(objectName),
		UploadId: oss.Ptr(uploadId),
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
	}
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}

	// Log the result returned from the completed multipart upload operation.
	log.Printf("complete multipart upload result:%#v\n", result)
}

References

  • For the complete sample code that is used to perform multipart upload, visit GitHub.

  • A multipart upload involves three API operations: InitiateMultipartUpload, UploadPart, and CompleteMultipartUpload. For more information, visit the topic for each operation.

  • For more information about the API operation that you can call to cancel a multipart upload task, visit AbortMultipartUpload.

  • For more information about the API operation that you can call to list the uploaded parts, visit NewListPartsPaginator.

  • For more information about the API operation that you can call to list ongoing multipart upload tasks, visit NewListMultipartUploadsPaginator. Ongoing multipart upload tasks are tasks that are initiated but are not completed or canceled.