Fixes #28853 Needs both https://gitea.com/gitea/act_runner/pulls/473 and https://gitea.com/gitea/act_runner/pulls/471 on the runner side and patched `actions/upload-artifact@v4` / `actions/download-artifact@v4`, like `christopherhx/gitea-upload-artifact@v4` and `christopherhx/gitea-download-artifact@v4`, to not return errors due to GHES not beeing supported yet.
		
			
				
	
	
		
			254 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			254 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2023 The Gitea Authors. All rights reserved.
 | |
| // SPDX-License-Identifier: MIT
 | |
| 
 | |
| package actions
 | |
| 
 | |
| import (
 | |
| 	"crypto/md5"
 | |
| 	"crypto/sha256"
 | |
| 	"encoding/base64"
 | |
| 	"encoding/hex"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"hash"
 | |
| 	"io"
 | |
| 	"path/filepath"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"code.gitea.io/gitea/models/actions"
 | |
| 	"code.gitea.io/gitea/models/db"
 | |
| 	"code.gitea.io/gitea/modules/log"
 | |
| 	"code.gitea.io/gitea/modules/storage"
 | |
| )
 | |
| 
 | |
| func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
 | |
| 	artifact *actions.ActionArtifact,
 | |
| 	contentSize, runID, start, end, length int64, checkMd5 bool,
 | |
| ) (int64, error) {
 | |
| 	// build chunk store path
 | |
| 	storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
 | |
| 	var r io.Reader = ctx.Req.Body
 | |
| 	var hasher hash.Hash
 | |
| 	if checkMd5 {
 | |
| 		// use io.TeeReader to avoid reading all body to md5 sum.
 | |
| 		// it writes data to hasher after reading end
 | |
| 		// if hash is not matched, delete the read-end result
 | |
| 		hasher = md5.New()
 | |
| 		r = io.TeeReader(r, hasher)
 | |
| 	}
 | |
| 	// save chunk to storage
 | |
| 	writtenSize, err := st.Save(storagePath, r, -1)
 | |
| 	if err != nil {
 | |
| 		return -1, fmt.Errorf("save chunk to storage error: %v", err)
 | |
| 	}
 | |
| 	var checkErr error
 | |
| 	if checkMd5 {
 | |
| 		// check md5
 | |
| 		reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
 | |
| 		chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
 | |
| 		log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
 | |
| 		// if md5 not match, delete the chunk
 | |
| 		if reqMd5String != chunkMd5String {
 | |
| 			checkErr = fmt.Errorf("md5 not match")
 | |
| 		}
 | |
| 	}
 | |
| 	if writtenSize != contentSize {
 | |
| 		checkErr = errors.Join(checkErr, fmt.Errorf("contentSize not match body size"))
 | |
| 	}
 | |
| 	if checkErr != nil {
 | |
| 		if err := st.Delete(storagePath); err != nil {
 | |
| 			log.Error("Error deleting chunk: %s, %v", storagePath, err)
 | |
| 		}
 | |
| 		return -1, checkErr
 | |
| 	}
 | |
| 	log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
 | |
| 		storagePath, contentSize, artifact.ID, start, end)
 | |
| 	// return chunk total size
 | |
| 	return length, nil
 | |
| }
 | |
| 
 | |
| func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
 | |
| 	artifact *actions.ActionArtifact,
 | |
| 	contentSize, runID int64,
 | |
| ) (int64, error) {
 | |
| 	// parse content-range header, format: bytes 0-1023/146515
 | |
| 	contentRange := ctx.Req.Header.Get("Content-Range")
 | |
| 	start, end, length := int64(0), int64(0), int64(0)
 | |
| 	if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
 | |
| 		log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
 | |
| 		return -1, fmt.Errorf("parse content range error: %v", err)
 | |
| 	}
 | |
| 	return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true)
 | |
| }
 | |
| 
 | |
| func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
 | |
| 	artifact *actions.ActionArtifact,
 | |
| 	start, contentSize, runID int64,
 | |
| ) (int64, error) {
 | |
| 	end := start + contentSize - 1
 | |
| 	return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false)
 | |
| }
 | |
| 
 | |
| type chunkFileItem struct {
 | |
| 	RunID      int64
 | |
| 	ArtifactID int64
 | |
| 	Start      int64
 | |
| 	End        int64
 | |
| 	Path       string
 | |
| }
 | |
| 
 | |
| func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
 | |
| 	storageDir := fmt.Sprintf("tmp%d", runID)
 | |
| 	var chunks []*chunkFileItem
 | |
| 	if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
 | |
| 		baseName := filepath.Base(fpath)
 | |
| 		// when read chunks from storage, it only contains storage dir and basename,
 | |
| 		// no matter the subdirectory setting in storage config
 | |
| 		item := chunkFileItem{Path: storageDir + "/" + baseName}
 | |
| 		if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
 | |
| 			return fmt.Errorf("parse content range error: %v", err)
 | |
| 		}
 | |
| 		chunks = append(chunks, &item)
 | |
| 		return nil
 | |
| 	}); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	// chunks group by artifact id
 | |
| 	chunksMap := make(map[int64][]*chunkFileItem)
 | |
| 	for _, c := range chunks {
 | |
| 		chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
 | |
| 	}
 | |
| 	return chunksMap, nil
 | |
| }
 | |
| 
 | |
| func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
 | |
| 	// read all db artifacts by name
 | |
| 	artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
 | |
| 		RunID:        runID,
 | |
| 		ArtifactName: artifactName,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// read all uploading chunks from storage
 | |
| 	chunksMap, err := listChunksByRunID(st, runID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// range db artifacts to merge chunks
 | |
| 	for _, art := range artifacts {
 | |
| 		chunks, ok := chunksMap[art.ID]
 | |
| 		if !ok {
 | |
| 			log.Debug("artifact %d chunks not found", art.ID)
 | |
| 			continue
 | |
| 		}
 | |
| 		if err := mergeChunksForArtifact(ctx, chunks, st, art, ""); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact, checksum string) error {
 | |
| 	sort.Slice(chunks, func(i, j int) bool {
 | |
| 		return chunks[i].Start < chunks[j].Start
 | |
| 	})
 | |
| 	allChunks := make([]*chunkFileItem, 0)
 | |
| 	startAt := int64(-1)
 | |
| 	// check if all chunks are uploaded and in order and clean repeated chunks
 | |
| 	for _, c := range chunks {
 | |
| 		// startAt is -1 means this is the first chunk
 | |
| 		// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
 | |
| 		// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
 | |
| 		if c.Start == (startAt + 1) {
 | |
| 			allChunks = append(allChunks, c)
 | |
| 			startAt = c.End
 | |
| 		}
 | |
| 	}
 | |
| 	// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
 | |
| 	if startAt+1 != artifact.FileCompressedSize {
 | |
| 		log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
 | |
| 		return nil
 | |
| 	}
 | |
| 	// use multiReader
 | |
| 	readers := make([]io.Reader, 0, len(allChunks))
 | |
| 	closeReaders := func() {
 | |
| 		for _, r := range readers {
 | |
| 			_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
 | |
| 		}
 | |
| 		readers = nil
 | |
| 	}
 | |
| 	defer closeReaders()
 | |
| 	for _, c := range allChunks {
 | |
| 		var readCloser io.ReadCloser
 | |
| 		var err error
 | |
| 		if readCloser, err = st.Open(c.Path); err != nil {
 | |
| 			return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
 | |
| 		}
 | |
| 		readers = append(readers, readCloser)
 | |
| 	}
 | |
| 	mergedReader := io.MultiReader(readers...)
 | |
| 	shaPrefix := "sha256:"
 | |
| 	var hash hash.Hash
 | |
| 	if strings.HasPrefix(checksum, shaPrefix) {
 | |
| 		hash = sha256.New()
 | |
| 	}
 | |
| 	if hash != nil {
 | |
| 		mergedReader = io.TeeReader(mergedReader, hash)
 | |
| 	}
 | |
| 
 | |
| 	// if chunk is gzip, use gz as extension
 | |
| 	// download-artifact action will use content-encoding header to decide if it should decompress the file
 | |
| 	extension := "chunk"
 | |
| 	if artifact.ContentEncoding == "gzip" {
 | |
| 		extension = "chunk.gz"
 | |
| 	}
 | |
| 
 | |
| 	// save merged file
 | |
| 	storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
 | |
| 	written, err := st.Save(storagePath, mergedReader, -1)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("save merged file error: %v", err)
 | |
| 	}
 | |
| 	if written != artifact.FileCompressedSize {
 | |
| 		return fmt.Errorf("merged file size is not equal to chunk length")
 | |
| 	}
 | |
| 
 | |
| 	defer func() {
 | |
| 		closeReaders() // close before delete
 | |
| 		// drop chunks
 | |
| 		for _, c := range chunks {
 | |
| 			if err := st.Delete(c.Path); err != nil {
 | |
| 				log.Warn("Error deleting chunk: %s, %v", c.Path, err)
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	if hash != nil {
 | |
| 		rawChecksum := hash.Sum(nil)
 | |
| 		actualChecksum := hex.EncodeToString(rawChecksum)
 | |
| 		if !strings.HasSuffix(checksum, actualChecksum) {
 | |
| 			return fmt.Errorf("update artifact error checksum is invalid")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// save storage path to artifact
 | |
| 	log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
 | |
| 	// if artifact is already uploaded, delete the old file
 | |
| 	if artifact.StoragePath != "" {
 | |
| 		if err := st.Delete(artifact.StoragePath); err != nil {
 | |
| 			log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	artifact.StoragePath = storagePath
 | |
| 	artifact.Status = int64(actions.ArtifactStatusUploadConfirmed)
 | |
| 	if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
 | |
| 		return fmt.Errorf("update artifact error: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |