{-# LANGUAGE OverloadedStrings #-} module Hash2Pub.DHTProtocol ( QueryResponse (..) , queryLocalCache , addCacheEntry , addCacheEntryPure , deleteCacheEntry , markCacheEntryAsVerified , RemoteCacheEntry(..) , toRemoteCacheEntry , remoteNode_ , Action(..) , ActionPayload(..) , FediChordMessage(..) , maximumParts ) where import Data.Maybe (maybe, fromMaybe) import qualified Data.Set as Set import qualified Data.Map as Map import Data.Time.Clock.POSIX import Network.Socket hiding (send, sendTo, recv, recvFrom) import Network.Socket.ByteString import System.Timeout import Control.Monad.State.Strict import Hash2Pub.FediChord ( NodeID , NodeState (..) , getSuccessors , putSuccessors , getPredecessors , putPredecessors , cacheGetNodeStateUnvalidated , NodeCache , CacheEntry(..) , cacheLookup , cacheLookupSucc , cacheLookupPred , localCompare ) import Debug.Trace (trace) -- === queries === data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache. -- whole cache entry is returned for making -- the entry time stamp available to the -- protocol serialiser | FOUND NodeState -- ^node is the responsible node for queried ID deriving (Show, Eq) -- TODO: evaluate more fine-grained argument passing to allow granular locking -- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache ownState nCache lBestNodes targetID -- as target ID falls between own ID and first predecessor, it is handled by this node | (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- the closest succeeding node (like with the p initiated parallel queries | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors where preds = fromMaybe [] $ getPredecessors ownState ownID = nid ownState closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache closestPredecessors :: Set.Set RemoteCacheEntry closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry closestPredecessor 0 _ = Set.empty closestPredecessor remainingLookups lastID | remainingLookups < 0 = Set.empty | otherwise = let result = cacheLookupPred lastID nCache in case toRemoteCacheEntry =<< result of Nothing -> Set.empty Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns) -- === protocol serialisation data types data Action = QueryID | Join | Leave | Stabilise | Ping deriving (Show, Eq, Enum) data FediChordMessage = Request { requestID :: Integer , sender :: NodeState , parts :: Integer , part :: Integer -- ^ part starts at 0 , action :: Action , payload :: Maybe ActionPayload } | Response { responseTo :: Integer , senderID :: NodeID , parts :: Integer , part :: Integer , action :: Action , payload :: Maybe ActionPayload } deriving (Show, Eq) data ActionPayload = QueryIDRequestPayload { queryTargetID :: NodeID , queryLBestNodes :: Integer } | JoinRequestPayload | LeaveRequestPayload { leaveSuccessors :: [NodeID] , leavePredecessors :: [NodeID] } | StabiliseRequestPayload | PingRequestPayload | QueryIDResponsePayload { queryResult :: QueryResponse } | JoinResponsePayload { joinSuccessors :: [NodeID] , joinPredecessors :: [NodeID] , joinCache :: [RemoteCacheEntry] } | LeaveResponsePayload | StabiliseResponsePayload { stabiliseSuccessors :: [NodeID] , stabilisePredecessors :: [NodeID] } | PingResponsePayload { pingNodeStates :: [NodeState] } deriving (Show, Eq) -- | global limit of parts per message used when (de)serialising messages. -- Used to limit the impact of DOS attempts with partial messages. maximumParts :: Num a => a maximumParts = 150 -- | dedicated data type for cache entries sent to or received from the network, -- as these have to be considered as unvalidated. Also helps with separation of trust. data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime deriving (Show, Eq) instance Ord RemoteCacheEntry where (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry toRemoteCacheEntry _ = Nothing -- helper function for use in tests remoteNode_ :: RemoteCacheEntry -> NodeState remoteNode_ (RemoteCacheEntry ns _) = ns -- cache operations -- | update or insert a 'RemoteCacheEntry' into the cache, -- converting it to a local 'CacheEntry' addCacheEntry :: RemoteCacheEntry -- ^ a remote cache entry received from network -> NodeCache -- ^ node cache to insert to -> IO NodeCache -- ^ new node cache with the element inserted addCacheEntry entry cache = do now <- getPOSIXTime return $ addCacheEntryPure now entry cache -- | pure version of 'addCacheEntry' with current time explicitly specified as argument addCacheEntryPure :: POSIXTime -- ^ current time -> RemoteCacheEntry -- ^ a remote cache entry received from network -> NodeCache -- ^ node cache to insert to -> NodeCache -- ^ new node cache with the element inserted addCacheEntryPure now (RemoteCacheEntry ns ts) cache = let -- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity timestamp' = if ts <= now then ts else now newCache = Map.insertWith insertCombineFunction (nid ns) (NodeEntry False ns timestamp') cache insertCombineFunction newVal@(NodeEntry newValidationState newNode newTimestamp) oldVal = case oldVal of ProxyEntry n _ -> ProxyEntry n (Just newVal) NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp) in newCache -- | delete the node with given ID from cache deleteCacheEntry :: NodeID -- ^ID of the node to be deleted -> NodeCache -- ^cache to delete from -> NodeCache -- ^cache without the specified element deleteCacheEntry = Map.update modifier where modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) modifier NodeEntry {} = Nothing -- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp. markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be -- given to the entry, or Nothing -> NodeID -- ^ which node to mark -> NodeCache -- ^ current node cache -> NodeCache -- ^ new NodeCache with the updated entry markCacheEntryAsVerified timestamp = Map.adjust adjustFunc where adjustFunc (NodeEntry _ ns ts) = NodeEntry True ns $ fromMaybe ts timestamp adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry adjustFunc entry = entry -- ====== message send and receive operations ====== requestQueryID :: NodeState -> NodeID -> IO NodeState -- 1. do a local lookup for the l closest nodes -- 2. create l sockets -- 3. send a message async concurrently to all l nodes -- 4. collect the results, insert them into cache -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) requestQueryID ns targetID = do cacheSnapshot <- readIORef $ getNodeCacheRef ns let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID -- FOUND can only be returned if targetID is owned by local node case localResult of FOUND thisNode -> return thisNode FORWARD nodeSet -> sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 responses = mapM sendRequestTo :: Int -- ^ timeout in seconds -> Int -- ^ number of retries -> FediChordMessage -- ^ the message to be sent -> Socket -- ^ connected socket to use for sending -> IO (Set.Set FediChordMessage) -- ^ responses sendRequestTo timeout attempts msg sock = do let requests = serialiseMessage 1200 msg -- ToDo: make attempts and timeout configurable attempts 3 . timeout 5000 $ do where -- state reingeben: state = noch nicht geackte messages, result = responses sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage) sendAndAck sock = do remainingSends <- get sendMany sock $ Map.elems remainingSends -- timeout pro receive socket, danach catMaybes -- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen replicateM -- idea: send all parts at once -- Set/ Map with unacked parts -- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map -- how to manage individual retries? nested "attempts" -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry -> IO (Maybe a) -- ^ result after at most *i* retries attempts 0 _ = return Nothing attempts i action = do actionResult <- action case actionResult of Nothing -> attempts (i-1) action Just res -> return $ Just res