Commit a7280645 authored by 黄大凯's avatar 黄大凯
Browse files

Added unbounded buffer

parent 937cef97
Loading
Loading
Loading
Loading
+35 −13
Original line number Diff line number Diff line
package rsync

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"github.com/kaiakz/ubuffer"
	"io"
	"log"
	"net"
@@ -234,7 +236,9 @@ func (conn *SocketConn) RequestFiles(filelist FileList, downloadList []int, osCl
	}

	// Send -1 to finish, then start to download
	binary.Write(conn.RawConn, binary.LittleEndian, INDEX_END)
	if binary.Write(conn.RawConn, binary.LittleEndian, INDEX_END) != nil {
		panic("Can't send INDEX_END")
	}
	log.Println("Request completed")

	startTime := time.Now()
@@ -259,8 +263,12 @@ func Downloader(data chan byte, filelist FileList, osClient IO, prepath string)
		clen := GetInteger(data)      /* checksum length */
		remainder := GetInteger(data) /* block remainder */

		log.Println(path, count, blen, clen, remainder, filelist[index].Size)
		buf := new(bytes.Buffer)
		log.Println("Downloading:", string(path), count, blen, clen, remainder, filelist[index].Size)

		// If the file is too big to store in memory, creates a temporary file in the directory 'tmp'
		buffer := ubuffer.NewBuffer(filelist[index].Size)
		downloadeSize := 0
		bufwriter := bufio.NewWriter(buffer)
		for {
			token := GetInteger(data)
			log.Println("TOKEN", token)
@@ -270,22 +278,35 @@ func Downloader(data chan byte, filelist FileList, osClient IO, prepath string)
				panic("Does not support block checksum")
				// Reference
			} else {
				ctx := make([]byte, token)
				ctx := make([]byte, token)		// FIXME: memory leak?
				GetBytes(data, ctx)
				log.Println("Buff size:", buf.Len())
				buf.Write(ctx)
				downloadeSize += int(token)
				log.Println("Downloaded:", downloadeSize, "byte")
				bufwriter.Write(ctx)
			}
		}

		if bufwriter.Flush() != nil {
			panic("Failed to flush buffer")
		}
		// Put file to object storage
		objectName := string(append(ppath[:], path[:]...))	// prefix + path

		n, err := osClient.Write(objectName, buf, int64(buf.Len()), FileMetadata{
		var (
			n int64
			err error
		)
		n, err = buffer.Seek(0, io.SeekStart)

		n, err = osClient.Write(objectName, buffer, int64(downloadeSize), FileMetadata{
			Mtime: filelist[index].Mtime,
			Mode:  filelist[index].Mode,
		})
		if err != nil {
			log.Fatalln(err)
			panic(err)
		}

		if buffer.Finalize() != nil {
			panic("Buffer can't be finalized")
		}

		log.Printf("Successfully uploaded %s of size %d\n", path, n)
@@ -297,7 +318,7 @@ func Downloader(data chan byte, filelist FileList, osClient IO, prepath string)
		fmt.Println("Remote MD4:", rmd4)

		//lmd4 := md4.New()
		//lmd4.Write(buf.Bytes())
		//lmd4.Write(buffer.Bytes())
		//if bytes.Compare(rmd4, lmd4.Sum(nil)) == 0 {
		//
		//}
@@ -316,9 +337,10 @@ func exchangeBlock() {
func (conn *SocketConn) FinalPhase() {
	go func() {
		ioerror := GetInteger(conn.DemuxIn)
		fmt.Println(ioerror)
		log.Println(ioerror)
	}()

	binary.Write(conn.RawConn, binary.LittleEndian, INDEX_END)
	binary.Write(conn.RawConn, binary.LittleEndian, INDEX_END)
	if binary.Write(conn.RawConn, binary.LittleEndian, INDEX_END) != nil && binary.Write(conn.RawConn, binary.LittleEndian, INDEX_END) != nil {
		panic("Failed to say goodbye")
	}
}