Loading core/src/storage/hdfs/HDFSIOReader.cpp +14 −3 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ #include "storage/hdfs/HDFSIOReader.h" //#include "HDFSClient.h" namespace milvus{ namespace storage{ Loading @@ -27,12 +28,22 @@ HDFSIOReader::read(void* ptr, int64_t size){ void HDFSIOReader::seekg(int64_t pos){ int seek_pos = hdfsSeek(hdfs_fs, hdfs_file, pos); // if seek_pos == 0 success , -1 error // if seek_pos == 0 success , -1 error, but can't be used because of the return value. } int64_t HDFSIOReader::length(){ hdfsFileInfo *hdfs_file_info = hdfsGetPathInfo(hdfs_fs, name_.c_str()); return hdfs_file_info->mSize; } void HDFSIOReader::close(){ //close file int flag = hdfsCloseFile(hdfs_fs, hdfs_file); //disconnect int status = hdfsDisconnect(hdfs_fs); } }//storage Loading core/src/storage/hdfs/HDFSIOReader.h +3 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,8 @@ class HDFSIOReader : public IOReader{ int64_t length() override; void close() override; public: Loading @@ -42,7 +44,7 @@ public: hdfsFS hdfs_fs; hdfsFile hdfs_file; } }; } } Loading core/src/storage/hdfs/HDFSIOWriter.cpp +50 −0 Original line number Diff line number Diff line // Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. #include "HDFSIOWriter.h" namespace milvus{ namespace storage{ bool HDFSIOWriter::open(const std::string& name){ name_ = name; len_ = 0; hdfs_fs = hdfsConnect("default",0); if(hdfs_fs == nullptr) return false; //open file with append mode. hdfs_file = hdfsOpenFile(hdfs_fs, name_.c_str(), O_WRONLY | O_APPEND, 0, 0, 0); return true; } void HDFSIOWriter::write(void *ptr, int64_t size){ tSize nums_write_bytes = hdfsWrite(hdfs_fs, hdfs_file, reinterpret_cast<char*>(ptr), static_cast<tSize>(size)); len_ += size; } int64_t HDFSIOWriter::length(){ return len_; } void HDFSIOWriter::close(){ int flag = hdfsCloseFile(hdfs_fs, hdfs_file); //disconnect int status = hdfsDisconnect(hdfs_fs); } } } No newline at end of file core/src/storage/hdfs/HDFSIOWriter.h +55 −0 Original line number Diff line number Diff line // Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. #pragma once #include "hdfs.h" #include <memory> #include <string> #include "storage/IOWriter.h" namespace milvus{ namespace storage{ class HDFSIOWriter : public IOWriter{ public: HDFSIOWriter() = default; ~HDFSIOWriter() = default; HDFSIOWriter(const HDFSIOWriter&) = delete; HDFSIOWriter(HDFSIOWriter&&) = delete; HDFSIOWriter& operator=(const HDFSIOWriter&) = delete; HDFSIOWriter& operator=(HDFSIOWriter&&) = delete; bool open(const std::string& name) override; void write(void* ptr, int64_t size) override; int64_t length() override; void close() override; public: int64_t len_; std::string name_; hdfsFS hdfs_fs; hdfsFile hdfs_file; }; }//storage }//milvus No newline at end of file Loading
core/src/storage/hdfs/HDFSIOReader.cpp +14 −3 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ #include "storage/hdfs/HDFSIOReader.h" //#include "HDFSClient.h" namespace milvus{ namespace storage{ Loading @@ -27,12 +28,22 @@ HDFSIOReader::read(void* ptr, int64_t size){ void HDFSIOReader::seekg(int64_t pos){ int seek_pos = hdfsSeek(hdfs_fs, hdfs_file, pos); // if seek_pos == 0 success , -1 error // if seek_pos == 0 success , -1 error, but can't be used because of the return value. } int64_t HDFSIOReader::length(){ hdfsFileInfo *hdfs_file_info = hdfsGetPathInfo(hdfs_fs, name_.c_str()); return hdfs_file_info->mSize; } void HDFSIOReader::close(){ //close file int flag = hdfsCloseFile(hdfs_fs, hdfs_file); //disconnect int status = hdfsDisconnect(hdfs_fs); } }//storage Loading
core/src/storage/hdfs/HDFSIOReader.h +3 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,8 @@ class HDFSIOReader : public IOReader{ int64_t length() override; void close() override; public: Loading @@ -42,7 +44,7 @@ public: hdfsFS hdfs_fs; hdfsFile hdfs_file; } }; } } Loading
core/src/storage/hdfs/HDFSIOWriter.cpp +50 −0 Original line number Diff line number Diff line // Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. #include "HDFSIOWriter.h" namespace milvus{ namespace storage{ bool HDFSIOWriter::open(const std::string& name){ name_ = name; len_ = 0; hdfs_fs = hdfsConnect("default",0); if(hdfs_fs == nullptr) return false; //open file with append mode. hdfs_file = hdfsOpenFile(hdfs_fs, name_.c_str(), O_WRONLY | O_APPEND, 0, 0, 0); return true; } void HDFSIOWriter::write(void *ptr, int64_t size){ tSize nums_write_bytes = hdfsWrite(hdfs_fs, hdfs_file, reinterpret_cast<char*>(ptr), static_cast<tSize>(size)); len_ += size; } int64_t HDFSIOWriter::length(){ return len_; } void HDFSIOWriter::close(){ int flag = hdfsCloseFile(hdfs_fs, hdfs_file); //disconnect int status = hdfsDisconnect(hdfs_fs); } } } No newline at end of file
core/src/storage/hdfs/HDFSIOWriter.h +55 −0 Original line number Diff line number Diff line // Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. #pragma once #include "hdfs.h" #include <memory> #include <string> #include "storage/IOWriter.h" namespace milvus{ namespace storage{ class HDFSIOWriter : public IOWriter{ public: HDFSIOWriter() = default; ~HDFSIOWriter() = default; HDFSIOWriter(const HDFSIOWriter&) = delete; HDFSIOWriter(HDFSIOWriter&&) = delete; HDFSIOWriter& operator=(const HDFSIOWriter&) = delete; HDFSIOWriter& operator=(HDFSIOWriter&&) = delete; bool open(const std::string& name) override; void write(void* ptr, int64_t size) override; int64_t length() override; void close() override; public: int64_t len_; std::string name_; hdfsFS hdfs_fs; hdfsFile hdfs_file; }; }//storage }//milvus No newline at end of file