/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2023 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"mime/multipart"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/minio/minio-go/v7/pkg/encrypt"
)

// PutObjectFanOutEntry is per object entry fan-out metadata
type PutObjectFanOutEntry struct {
	Key                string            `json:"key"`
	UserMetadata       map[string]string `json:"metadata,omitempty"`
	UserTags           map[string]string `json:"tags,omitempty"`
	ContentType        string            `json:"contentType,omitempty"`
	ContentEncoding    string            `json:"contentEncoding,omitempty"`
	ContentDisposition string            `json:"contentDisposition,omitempty"`
	ContentLanguage    string            `json:"contentLanguage,omitempty"`
	CacheControl       string            `json:"cacheControl,omitempty"`
	Retention          RetentionMode     `json:"retention,omitempty"`
	RetainUntilDate    *time.Time        `json:"retainUntil,omitempty"`
}

// PutObjectFanOutRequest this is the request structure sent
// to the server to fan-out the stream to multiple objects.
type PutObjectFanOutRequest struct {
	Entries  []PutObjectFanOutEntry
	Checksum Checksum
	SSE      encrypt.ServerSide
}

// PutObjectFanOutResponse this is the response structure sent
// by the server upon success or failure for each object
// fan-out keys. Additionally, this response carries ETag,
// VersionID and LastModified for each object fan-out.
type PutObjectFanOutResponse struct {
	Key          string     `json:"key"`
	ETag         string     `json:"etag,omitempty"`
	VersionID    string     `json:"versionId,omitempty"`
	LastModified *time.Time `json:"lastModified,omitempty"`
	Error        string     `json:"error,omitempty"`
}

// PutObjectFanOut - is a variant of PutObject instead of writing a single object from a single
// stream multiple objects are written, defined via a list of PutObjectFanOutRequests. Each entry
// in PutObjectFanOutRequest carries an object keyname and its relevant metadata if any. `Key` is
// mandatory, rest of the other options in PutObjectFanOutRequest are optional.
func (c *Client) PutObjectFanOut(ctx context.Context, bucket string, fanOutData io.Reader, fanOutReq PutObjectFanOutRequest) ([]PutObjectFanOutResponse, error) {
	if len(fanOutReq.Entries) == 0 {
		return nil, errInvalidArgument("fan out requests cannot be empty")
	}

	policy := NewPostPolicy()
	policy.SetBucket(bucket)
	policy.SetKey(strconv.FormatInt(time.Now().UnixNano(), 16))

	// Expires in 15 minutes.
	policy.SetExpires(time.Now().UTC().Add(15 * time.Minute))

	// Set encryption headers if any.
	policy.SetEncryption(fanOutReq.SSE)

	// Set checksum headers if any.
	err := policy.SetChecksum(fanOutReq.Checksum)
	if err != nil {
		return nil, err
	}

	url, formData, err := c.PresignedPostPolicy(ctx, policy)
	if err != nil {
		return nil, err
	}

	r, w := io.Pipe()

	req, err := http.NewRequest(http.MethodPost, url.String(), r)
	if err != nil {
		w.Close()
		return nil, err
	}

	var b strings.Builder
	enc := json.NewEncoder(&b)
	for _, req := range fanOutReq.Entries {
		if req.Key == "" {
			w.Close()
			return nil, errors.New("PutObjectFanOutRequest.Key is mandatory and cannot be empty")
		}
		if err = enc.Encode(&req); err != nil {
			w.Close()
			return nil, err
		}
	}

	mwriter := multipart.NewWriter(w)
	req.Header.Add("Content-Type", mwriter.FormDataContentType())

	go func() {
		defer w.Close()
		defer mwriter.Close()

		for k, v := range formData {
			if err := mwriter.WriteField(k, v); err != nil {
				return
			}
		}

		if err := mwriter.WriteField("x-minio-fanout-list", b.String()); err != nil {
			return
		}

		mw, err := mwriter.CreateFormFile("file", "fanout-content")
		if err != nil {
			return
		}

		if _, err = io.Copy(mw, fanOutData); err != nil {
			return
		}
	}()

	resp, err := c.do(req)
	if err != nil {
		return nil, err
	}
	defer closeResponse(resp)

	if resp.StatusCode != http.StatusOK {
		return nil, httpRespToErrorResponse(resp, bucket, "fanout-content")
	}

	dec := json.NewDecoder(resp.Body)
	fanOutResp := make([]PutObjectFanOutResponse, 0, len(fanOutReq.Entries))
	for dec.More() {
		var m PutObjectFanOutResponse
		if err = dec.Decode(&m); err != nil {
			return nil, err
		}
		fanOutResp = append(fanOutResp, m)
	}

	return fanOutResp, nil
}