Skip to content

Commit

Permalink
fixed server
Browse files Browse the repository at this point in the history
  • Loading branch information
lmnzx committed Apr 23, 2024
1 parent 8a96082 commit 450beab
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 33 deletions.
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ func main() {

key := "privatedata_ttt"

data := bytes.NewReader([]byte("big data"))
data := bytes.NewReader([]byte("big data askdjflkasjdfl askdfjlaksjdfklajsdflkajdf"))
s1.Store(key, data)

if err := s1.store.Delete(key); err != nil {
log.Fatal(err)
}
fmt.Println("File deleted from s1")

time.Sleep(10 * time.Second)

r, err := s1.Get(key)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion p2p/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (t *TCPTransport) Addr() string {
}

// Dail implements the Transport interface
func (t *TCPTransport) Dail(addr string) error {
func (t *TCPTransport) Dial(addr string) error {
conn, err := net.Dial("tcp", addr)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Peer interface {
// Can be TCP, UDP, websockets, ...
type Transport interface {
Addr() string
Dail(string) error
Dial(string) error
ListenAndAccept() error
Consume() <-chan RPC
Close() error
Expand Down
42 changes: 24 additions & 18 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,13 @@ func (s *FileServer) Get(key string) (io.Reader, error) {
return nil, err
}

time.Sleep(1 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

for _, peer := range s.peers {
var fileSize int64

binary.Read(peer, binary.LittleEndian, &fileSize)

// n, err := s.store.Write(key, io.LimitReader(peer, fileSize))
// if err != nil {
// return nil, err
// }
n, err := s.store.WriteDecypt(s.EncKey, key, io.LimitReader(peer, fileSize))
if err != nil {
return nil, err
Expand All @@ -106,7 +102,8 @@ func (s *FileServer) Get(key string) (io.Reader, error) {
return r, err
}

func (s *FileServer) Remove() {}
// TODO
// func (s *FileServer) Remove() {}

// Store the file to the disk and broadcast to the peers
func (s *FileServer) Store(key string, r io.Reader) error {
Expand All @@ -123,15 +120,15 @@ func (s *FileServer) Store(key string, r io.Reader) error {
msg := Message{
Payload: MessageStoreFile{
Key: key,
Size: size,
Size: size + 16,
},
}

if err := s.broadcast(&msg); err != nil {
return err
}

time.Sleep(1 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

peers := []io.Writer{}
for _, peer := range s.peers {
Expand Down Expand Up @@ -183,7 +180,7 @@ func (s *FileServer) OnPeer(p p2p.Peer) error {

func (s *FileServer) loop() {
defer func() {
log.Println("file server stopped due to user quit action")
log.Println("file server stopped due to error or user quit action")
s.Transport.Close()
}()

Expand All @@ -194,7 +191,6 @@ func (s *FileServer) loop() {
if err := gob.NewDecoder(bytes.NewReader(rpc.Payload)).Decode(&msg); err != nil {
log.Println("decoding error: ", err)
}

if err := s.handleMessage(rpc.From.String(), &msg); err != nil {
log.Println("handle message error: ", err)
}
Expand All @@ -217,34 +213,40 @@ func (s *FileServer) handleMessage(from string, msg *Message) error {

func (s *FileServer) handleMessageGetFile(from string, msg MessageGetFile) error {
if !s.store.Has(msg.Key) {
return fmt.Errorf("file (%s) does not exist on disk\n", msg.Key)
return fmt.Errorf(
"[%s] need to serve file (%s) but it does not exist on disk",
s.Transport.Addr(),
msg.Key,
)
}

fmt.Printf("serving file (%s) over the network\n", msg.Key)
fmt.Printf("[%s] serving file (%s) over the network\n", s.Transport.Addr(), msg.Key)

fileSize, r, err := s.store.Read(msg.Key)
if err != nil {
return err
}

if rc, ok := r.(io.ReadCloser); ok {
fmt.Println("closing readCloser")
defer rc.Close()
}

peer, ok := s.peers[from]
if !ok {
return fmt.Errorf("peer (%s) could not be found in the peer list", from)
return fmt.Errorf("peer %s not in map", from)
}

// First send the "incomingStream" byte to the peer and then we can send
// the file size as an int64.
peer.Send([]byte{p2p.IncomingStream})

binary.Write(peer, binary.LittleEndian, fileSize)

n, err := io.Copy(peer, r)
if err != nil {
return err
}

fmt.Printf("read %d bytes over the network to %s\n", n, from)
fmt.Printf("[%s] written (%d) bytes over the network to %s\n", s.Transport.Addr(), n, from)

return nil
}
Expand All @@ -260,7 +262,7 @@ func (s *FileServer) handleMessageStoreFile(from string, msg MessageStoreFile) e
return err
}

log.Printf("[%s] written (%d) bytes to disk\n", s.Transport.Addr(), n)
fmt.Printf("[%s] written %d bytes to disk\n", s.Transport.Addr(), n)

peer.CloseStream()

Expand All @@ -272,17 +274,21 @@ func (s *FileServer) bootstrapNetwork() error {
if len(addr) == 0 {
continue
}

go func(addr string) {
fmt.Printf("[%s] attemping to connect with remote %s\n", s.Transport.Addr(), addr)
if err := s.Transport.Dail(addr); err != nil {
if err := s.Transport.Dial(addr); err != nil {
log.Println("dial error: ", err)
}
}(addr)
}

return nil
}

func (s *FileServer) Start() error {
fmt.Printf("[%s] starting fileserver...\n", s.Transport.Addr())

if err := s.Transport.ListenAndAccept(); err != nil {
return err
}
Expand Down
20 changes: 8 additions & 12 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,7 @@ func (s *Store) readStream(key string) (int64, io.ReadCloser, error) {
}

func (s *Store) WriteDecypt(encKey []byte, key string, r io.Reader) (int64, error) {
pathKey := s.PathTransformFunc(key)
pathNameWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.Pathname)
if err := os.MkdirAll(pathNameWithRoot, os.ModePerm); err != nil {
return 0, err
}

fullPathWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.FullPath())

f, err := os.Create(fullPathWithRoot)
f, err := s.openFileForWriting(key)
if err != nil {
return 0, err
}
Expand All @@ -156,16 +148,20 @@ func (s *Store) WriteDecypt(encKey []byte, key string, r io.Reader) (int64, erro
return int64(n), nil
}

func (s *Store) writeStream(key string, r io.Reader) (int64, error) {
func (s *Store) openFileForWriting(key string) (*os.File, error) {
pathKey := s.PathTransformFunc(key)
pathNameWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.Pathname)
if err := os.MkdirAll(pathNameWithRoot, os.ModePerm); err != nil {
return 0, err
return nil, err
}

fullPathWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.FullPath())

f, err := os.Create(fullPathWithRoot)
return os.Create(fullPathWithRoot)
}

func (s *Store) writeStream(key string, r io.Reader) (int64, error) {
f, err := s.openFileForWriting(key)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit 450beab

Please sign in to comment.