Compare commits
	
		
			2 commits
		
	
	
		
			61818c58a9
			...
			fea9660f80
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| fea9660f80 | |||
| 96e61b726f | 
					 4 changed files with 51 additions and 11 deletions
				
			
		| 
						 | 
				
			
			@ -4,5 +4,5 @@
 | 
			
		|||
 | 
			
		||||
- error: { lhs: return, rhs: pure }
 | 
			
		||||
 | 
			
		||||
- ignore: {name: "Avoid lambda using `infix`"}
 | 
			
		||||
- ignore: {name: ["Avoid lambda using `infix`", "Use lambda-case"]}
 | 
			
		||||
          
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,6 +4,7 @@ module Hash2Pub.DHTProtocol
 | 
			
		|||
    , addCacheEntry
 | 
			
		||||
    , addCacheEntryPure
 | 
			
		||||
    , deleteCacheEntry
 | 
			
		||||
    , deserialiseMessage
 | 
			
		||||
    , markCacheEntryAsVerified
 | 
			
		||||
    , RemoteCacheEntry(..)
 | 
			
		||||
    , toRemoteCacheEntry
 | 
			
		||||
| 
						 | 
				
			
			@ -19,6 +20,7 @@ module Hash2Pub.DHTProtocol
 | 
			
		|||
    , resolve
 | 
			
		||||
    , mkSendSocket
 | 
			
		||||
    , mkServerSocket
 | 
			
		||||
    , handleIncomingRequest
 | 
			
		||||
    )
 | 
			
		||||
        where
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -31,6 +33,7 @@ import           Control.Monad                  (foldM, forM, forM_)
 | 
			
		|||
import qualified Data.ByteString                as BS
 | 
			
		||||
import           Data.Either                    (rights)
 | 
			
		||||
import           Data.Foldable                  (foldl', foldr')
 | 
			
		||||
import           Data.Functor.Identity
 | 
			
		||||
import           Data.IORef
 | 
			
		||||
import           Data.IP                        (IPv6, fromHostAddress6,
 | 
			
		||||
                                                 toHostAddress6)
 | 
			
		||||
| 
						 | 
				
			
			@ -141,8 +144,33 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
 | 
			
		|||
        adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
 | 
			
		||||
        adjustFunc entry = entry
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-- | uses the successor and predecessor list of a node as an indicator for whether a
 | 
			
		||||
-- node has properly joined the DHT
 | 
			
		||||
isJoined_ :: LocalNodeState -> Bool
 | 
			
		||||
isJoined_ ns = not . all null $ [successors ns, predecessors ns]
 | 
			
		||||
 | 
			
		||||
-- ====== message send and receive operations ======
 | 
			
		||||
 | 
			
		||||
handleIncomingRequest :: LocalNodeState                     -- ^ the handling node
 | 
			
		||||
                      -> TQueue (BS.ByteString, SockAddr)   -- ^ send queue
 | 
			
		||||
                      -> FediChordMessage                   -- ^ request to handle
 | 
			
		||||
                      -> SockAddr                           -- ^ source address of the request
 | 
			
		||||
                      -> IO ()
 | 
			
		||||
handleIncomingRequest ns sendQ msg sourceAddr = do
 | 
			
		||||
    -- add nodestate to cache
 | 
			
		||||
    now <- getPOSIXTime
 | 
			
		||||
    queueAddEntries (Identity . RemoteCacheEntry (sender msg) $ now) ns
 | 
			
		||||
    -- distinguish on whether and how to respond
 | 
			
		||||
    -- create and enqueue ACK
 | 
			
		||||
    -- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on
 | 
			
		||||
    -- PLACEHOLDER
 | 
			
		||||
    pure ()
 | 
			
		||||
 | 
			
		||||
-- ....... response sending .......
 | 
			
		||||
 | 
			
		||||
-- ....... request sending .......
 | 
			
		||||
 | 
			
		||||
-- | send a join request and return the joined 'LocalNodeState' including neighbours
 | 
			
		||||
requestJoin :: NodeState a => a             -- ^ currently responsible node to be contacted
 | 
			
		||||
            -> LocalNodeState               -- ^ joining NodeState
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -53,10 +53,12 @@ import qualified Data.Map.Strict               as Map
 | 
			
		|||
import           Data.Maybe                    (fromMaybe, isJust, mapMaybe)
 | 
			
		||||
import qualified Data.Set                      as Set
 | 
			
		||||
import           Data.Time.Clock.POSIX
 | 
			
		||||
import           Network.Socket                hiding (recv, recvFrom, send, sendTo)
 | 
			
		||||
import           Network.Socket                hiding (recv, recvFrom, send,
 | 
			
		||||
                                                sendTo)
 | 
			
		||||
import           Network.Socket.ByteString
 | 
			
		||||
 | 
			
		||||
-- for hashing and ID conversion
 | 
			
		||||
import           Control.Concurrent
 | 
			
		||||
import           Control.Concurrent.Async
 | 
			
		||||
import           Control.Concurrent.STM
 | 
			
		||||
import           Control.Concurrent.STM.TQueue
 | 
			
		||||
| 
						 | 
				
			
			@ -213,12 +215,22 @@ fediMessageHandler :: TQueue (BS.ByteString, SockAddr)  -- ^ send queue
 | 
			
		|||
                   -> IO ()
 | 
			
		||||
fediMessageHandler sendQ recvQ ns = forever $ do
 | 
			
		||||
    -- wait for incoming messages
 | 
			
		||||
--    newMsg <- deserialiseMessage <$> recvFrom sock
 | 
			
		||||
--    either (\_ ->
 | 
			
		||||
--        -- ignore invalid messages
 | 
			
		||||
--        pure ()
 | 
			
		||||
--           )
 | 
			
		||||
--           (\aMsg ->
 | 
			
		||||
--        case aMsg of
 | 
			
		||||
--          aRequest@Request{} -> handleRequest 
 | 
			
		||||
    (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
 | 
			
		||||
    let aMsg = deserialiseMessage rawMsg
 | 
			
		||||
    -- handling multipart messages:
 | 
			
		||||
    -- So far I handle the effects of each message part immedeiately, before making sure that and whether all parts have been received, based on the idea that even incomplete information is beneficial and handled idempotent.
 | 
			
		||||
    -- If this turns out not to be the case, request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
 | 
			
		||||
    either (\_ ->
 | 
			
		||||
        -- drop invalid messages
 | 
			
		||||
        pure ()
 | 
			
		||||
           )
 | 
			
		||||
           (\validMsg ->
 | 
			
		||||
        case validMsg of
 | 
			
		||||
          aRequest@Request{} -> forkIO (handleIncomingRequest ns sendQ aRequest sourceAddr) >> pure ()
 | 
			
		||||
          -- Responses should never arrive on the main server port, as they are always
 | 
			
		||||
          -- responses to requests sent from dedicated sockets on another port
 | 
			
		||||
          _ -> pure ()
 | 
			
		||||
           )
 | 
			
		||||
        aMsg
 | 
			
		||||
 | 
			
		||||
    pure ()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,7 +4,7 @@ import           Control.Concurrent
 | 
			
		|||
import           Control.Concurrent.Async
 | 
			
		||||
import           Control.Exception
 | 
			
		||||
import           Data.Either
 | 
			
		||||
import           Data.IP            (IPv6, toHostAddress6)
 | 
			
		||||
import           Data.IP                  (IPv6, toHostAddress6)
 | 
			
		||||
import           System.Environment
 | 
			
		||||
 | 
			
		||||
import           Hash2Pub.FediChord
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue