Loading main.go +59 −37 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ package main import ( "encoding/binary" "flag" "fmt" "github.com/spf13/viper" Loading @@ -17,6 +18,7 @@ import ( "net" "rsync-os/fldb" "rsync-os/rsync" "rsync-os/storage" "sort" "time" ) Loading Loading @@ -45,29 +47,32 @@ func Socket(uri string, dest string) { defer conn.Close() c := &rsync.SocketConn{ Conn: conn, receiver := &rsync.SocketConn{ RawConn: conn, DemuxIn: make(chan byte, 16*1024*1024), } c.HandShake(module, path) if receiver.HandShake(module, path) != nil { log.Println("HandShake Failed") return } // fmt.Println(readInteger(conn)) log.Println("HandShake OK") // Start De-Multiplexing go rsync.DeMuxChan(conn, c.DemuxIn) go rsync.DeMuxChan(conn, receiver.DemuxIn) filelist := make(rsync.FileList, 0, 1024 * 1024) // recv_file_list for { if rsync.GetFileList(c.DemuxIn, &filelist) == io.EOF { if rsync.GetFileList(receiver.DemuxIn, &filelist) == io.EOF { break } } log.Println("File List Received, total size is", len(filelist)) ioerr := rsync.GetInteger(c.DemuxIn) ioerr := rsync.GetInteger(receiver.DemuxIn) log.Println("IOERR", ioerr) // Sort the filelist lexicographically Loading @@ -76,56 +81,73 @@ func Socket(uri string, dest string) { ppath := rsync.TrimPrepath(path) //fldb.Snapshot(filelist[:], module, ppath) if viper.GetStringMapString(dest) == nil { log.Fatalln("Lack of ", dest) log.Println("Lack of ", dest) return } dbconf := viper.GetStringMapString(dest + ".boltdb") cache := fldb.Open(dbconf["path"], []byte(module), []byte(ppath)) if cache == nil { // TODO log.Fatalln("Failed to init cache") log.Println("Failed to init cache") return } // Diff downloadList, deleteList := cache.Diff(filelist[:]) fmt.Println(len(downloadList)) for _, d := range downloadList { fmt.Println(string(filelist[d].Path)) } fmt.Println(len(deleteList)) for _, d := range deleteList { fmt.Println(string(d)) } // Update file list && start downloading log.Println("File List Saved") //c.FinalPhase() // FIXME: Close fldb & network cache.Close() // Init the object storage minioConf := viper.GetStringMapString(dest) log.Println(minioConf) if len(minioConf) == 0 { // test log.Println("Failed to read config about ", dest) return } osClient := storage.NewMinio(module, minioConf["endpoint"], minioConf["keyaccess"], minioConf["keysecret"], false) if osClient == nil { log.Println("object storage failed to init") return } // Init the object storage // For test //minioConf := viper.GetStringMapString("minio") //minioClient, err := minio.New(minioConf["endpoint"], minioConf["keyaccess"], minioConf["keysecret"], false) //if err != nil { // panic("minio Client failed to init") //} if len(downloadList) == 0 && len(deleteList) == 0 { // Send -1 to finish, then start to download if binary.Write(receiver.RawConn, binary.LittleEndian, rsync.INDEX_END) != nil { log.Println("Can't send INDEX_END") return } log.Println("There is nothing to do") } else { // Start downloading if receiver.RequestFiles(filelist[:], downloadList[:], osClient, ppath) != nil { log.Println("Failed to request file") return } // Delete old file if osClient.DeleteAll([]byte(ppath), deleteList[:]) != nil { log.Println("Failed to delete old files") } // Update cache if cache.Update(filelist[:], downloadList[:], deleteList[:]) != nil { log.Println("Failed to Update") } log.Println("Updated cache") } // Generate target file list //rsync.GetFiles(data, conn, &filelist) if receiver.FinalPhase() != nil { log.Println("Failed to say goodbye") } // rsync.RequestFiles(conn, data, &filelist, minioClient, module, path) //go rsync.Downloader(data, &filelist) //fmt.Println(filelist) // TODO: Need to close fldb & network defer cache.Close() return } func main() { //FIXME: Can't handle wrong module/path rsync://mirrors.tuna.tsinghua.edu.cn/linuxmint-packages/pool/romeo/libf/libfm/ loadConfigIfExists() flag.Parse() args := flag.Args() Loading Loading
main.go +59 −37 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ package main import ( "encoding/binary" "flag" "fmt" "github.com/spf13/viper" Loading @@ -17,6 +18,7 @@ import ( "net" "rsync-os/fldb" "rsync-os/rsync" "rsync-os/storage" "sort" "time" ) Loading Loading @@ -45,29 +47,32 @@ func Socket(uri string, dest string) { defer conn.Close() c := &rsync.SocketConn{ Conn: conn, receiver := &rsync.SocketConn{ RawConn: conn, DemuxIn: make(chan byte, 16*1024*1024), } c.HandShake(module, path) if receiver.HandShake(module, path) != nil { log.Println("HandShake Failed") return } // fmt.Println(readInteger(conn)) log.Println("HandShake OK") // Start De-Multiplexing go rsync.DeMuxChan(conn, c.DemuxIn) go rsync.DeMuxChan(conn, receiver.DemuxIn) filelist := make(rsync.FileList, 0, 1024 * 1024) // recv_file_list for { if rsync.GetFileList(c.DemuxIn, &filelist) == io.EOF { if rsync.GetFileList(receiver.DemuxIn, &filelist) == io.EOF { break } } log.Println("File List Received, total size is", len(filelist)) ioerr := rsync.GetInteger(c.DemuxIn) ioerr := rsync.GetInteger(receiver.DemuxIn) log.Println("IOERR", ioerr) // Sort the filelist lexicographically Loading @@ -76,56 +81,73 @@ func Socket(uri string, dest string) { ppath := rsync.TrimPrepath(path) //fldb.Snapshot(filelist[:], module, ppath) if viper.GetStringMapString(dest) == nil { log.Fatalln("Lack of ", dest) log.Println("Lack of ", dest) return } dbconf := viper.GetStringMapString(dest + ".boltdb") cache := fldb.Open(dbconf["path"], []byte(module), []byte(ppath)) if cache == nil { // TODO log.Fatalln("Failed to init cache") log.Println("Failed to init cache") return } // Diff downloadList, deleteList := cache.Diff(filelist[:]) fmt.Println(len(downloadList)) for _, d := range downloadList { fmt.Println(string(filelist[d].Path)) } fmt.Println(len(deleteList)) for _, d := range deleteList { fmt.Println(string(d)) } // Update file list && start downloading log.Println("File List Saved") //c.FinalPhase() // FIXME: Close fldb & network cache.Close() // Init the object storage minioConf := viper.GetStringMapString(dest) log.Println(minioConf) if len(minioConf) == 0 { // test log.Println("Failed to read config about ", dest) return } osClient := storage.NewMinio(module, minioConf["endpoint"], minioConf["keyaccess"], minioConf["keysecret"], false) if osClient == nil { log.Println("object storage failed to init") return } // Init the object storage // For test //minioConf := viper.GetStringMapString("minio") //minioClient, err := minio.New(minioConf["endpoint"], minioConf["keyaccess"], minioConf["keysecret"], false) //if err != nil { // panic("minio Client failed to init") //} if len(downloadList) == 0 && len(deleteList) == 0 { // Send -1 to finish, then start to download if binary.Write(receiver.RawConn, binary.LittleEndian, rsync.INDEX_END) != nil { log.Println("Can't send INDEX_END") return } log.Println("There is nothing to do") } else { // Start downloading if receiver.RequestFiles(filelist[:], downloadList[:], osClient, ppath) != nil { log.Println("Failed to request file") return } // Delete old file if osClient.DeleteAll([]byte(ppath), deleteList[:]) != nil { log.Println("Failed to delete old files") } // Update cache if cache.Update(filelist[:], downloadList[:], deleteList[:]) != nil { log.Println("Failed to Update") } log.Println("Updated cache") } // Generate target file list //rsync.GetFiles(data, conn, &filelist) if receiver.FinalPhase() != nil { log.Println("Failed to say goodbye") } // rsync.RequestFiles(conn, data, &filelist, minioClient, module, path) //go rsync.Downloader(data, &filelist) //fmt.Println(filelist) // TODO: Need to close fldb & network defer cache.Close() return } func main() { //FIXME: Can't handle wrong module/path rsync://mirrors.tuna.tsinghua.edu.cn/linuxmint-packages/pool/romeo/libf/libfm/ loadConfigIfExists() flag.Parse() args := flag.Args() Loading