diff --git a/.hlint.yaml b/.hlint.yaml index dfe9a89..4fa15a6 100644 --- a/.hlint.yaml +++ b/.hlint.yaml @@ -4,5 +4,5 @@ - error: { lhs: return, rhs: pure } -- ignore: {name: ["Avoid lambda using `infix`", "Use lambda-case"]} +- ignore: {name: "Avoid lambda using `infix`"} diff --git a/FediChord.asn1 b/FediChord.asn1 index f278f8f..7c53cb0 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -4,16 +4,14 @@ NodeID ::= INTEGER (0..115792089237316195423570985008687907853269984665640564039 Domain ::= VisibleString -Partnum ::= INTEGER (0..150) - Action ::= ENUMERATED {queryID, join, leave, stabilise, ping} Request ::= SEQUENCE { action Action, - requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer + requestID INTEGER, sender NodeState, - part Partnum, -- part number of this message, starts at 1 - finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest + parts INTEGER (0..150), -- number of message parts + part INTEGER (0..150), -- part number of this message, starts at 1 actionPayload CHOICE { queryIDRequestPayload QueryIDRequestPayload, joinRequestPayload JoinRequestPayload, @@ -27,11 +25,10 @@ Request ::= SEQUENCE { -- request and response instead of explicit flag Response ::= SEQUENCE { - -- requestID of the request responding to - requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer + responseTo INTEGER, senderID NodeID, - part Partnum, - finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response + parts INTEGER (0..150), + part INTEGER (0..150), action Action, actionPayload CHOICE { queryIDResponsePayload QueryIDResponsePayload, @@ -47,7 +44,7 @@ NodeState ::= SEQUENCE { domain Domain, ipAddr OCTET STRING (SIZE(16)), dhtPort INTEGER, - servicePort INTEGER, + apPort INTEGER, vServerID INTEGER (0..255) } @@ -62,8 +59,8 @@ NodeCache ::= SEQUENCE OF CacheEntry JoinRequestPayload ::= NULL JoinResponsePayload ::= SEQUENCE { - successors SEQUENCE OF NodeState, - predecessors SEQUENCE OF NodeState, + successors SEQUENCE OF NodeID, + predecessors SEQUENCE OF NodeID, cache NodeCache } @@ -82,14 +79,14 @@ QueryIDResponsePayload ::= SEQUENCE { StabiliseRequestPayload ::= NULL StabiliseResponsePayload ::= SEQUENCE { - successors SEQUENCE OF NodeState, - predecessors SEQUENCE OF NodeState + successors SEQUENCE OF NodeID, + predecessors SEQUENCE OF NodeID -- ToDo: transfer of handled key data, if newly responsible for it } LeaveRequestPayload ::= SEQUENCE { - successors SEQUENCE OF NodeState, - predecessors SEQUENCE OF NodeState + successors SEQUENCE OF NodeID, + predecessors SEQUENCE OF NodeID -- ToDo: transfer of own data to newly responsible node } diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index d1ee4b1..4906c08 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute ghc-options: -Wall @@ -55,7 +55,7 @@ library import: deps -- Modules exported by 
the library. - exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes + exposed-modules: Hash2Pub.FediChord, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding -- Modules included in this library but not exported. other-modules: Hash2Pub.Utils diff --git a/app/Main.hs b/app/Main.hs index cdfc2b3..1956f64 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -1,12 +1,7 @@ module Main where import Control.Concurrent -import Control.Concurrent.Async -import Control.Concurrent.STM -import Control.Concurrent.STM.TVar -import Control.Exception -import Data.Either -import Data.IP (IPv6, toHostAddress6) +import Data.IP (IPv6, toHostAddress6) import System.Environment import Hash2Pub.FediChord @@ -16,39 +11,22 @@ main = do -- ToDo: parse and pass config -- probably use `tomland` for that conf <- readConfig - -- TODO: first initialise 'RealNode', then the vservers -- ToDo: load persisted caches, bootstrapping nodes … (serverSock, thisNode) <- fediChordInit conf + print thisNode + print serverSock -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode - -- try joining the DHT using one of the provided bootstrapping nodes - joinedState <- tryBootstrapJoining thisNode - either (\err -> do - -- handle unsuccessful join - - putStrLn $ err <> " Error joining, start listening for incoming requests anyways" - print =<< readTVarIO thisNode - -- launch thread attempting to join on new cache entries - _ <- forkIO $ joinOnNewEntriesThread thisNode - wait =<< async (fediMainThreads serverSock thisNode) - ) - (\joinedNS -> do - -- launch main eventloop with successfully joined state - putStrLn "successful join" - wait =<< async (fediMainThreads serverSock thisNode) - ) - joinedState + -- idea: list of bootstrapping nodes, try joining within a timeout + -- stop main thread from terminating during development + getChar pure () - readConfig :: IO FediChordConf readConfig = do - confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs + confDomainString : ipString : portString : _ <- getArgs pure $ FediChordConf { confDomain = confDomainString , confIP = toHostAddress6 . read $ ipString , confDhtPort = read portString - , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] - --, confStabiliseInterval = 60 - , confBootstrapSamplingInterval = 180 } diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index 456dac6..b2e7b64 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -16,8 +16,8 @@ import qualified Data.Set as Set import Data.Time.Clock.POSIX () import Safe -import Hash2Pub.FediChordTypes -import Hash2Pub.ProtocolTypes +import Hash2Pub.DHTProtocol +import Hash2Pub.FediChord import Hash2Pub.Utils import Debug.Trace @@ -77,9 +77,6 @@ chunkLength numParts totalSize = ceiling $ (realToFrac totalSize :: Double) / re -- The number of parts per message is limited to 150 for DOS protection reasons. -- The returned byte strings might exceed the desired maximum length, as only the payload (and not all of them) -- can be split into multiple parts. --- --- The return type is a Map from part number to encoded part, to be able to acknowledge --- an encoded part without having to decode its number. 
serialiseMessage :: Int -- maximum message size in bytes -> FediChordMessage -- mesage to be serialised in preparation for sending -> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing @@ -103,11 +100,11 @@ serialiseMessage maxBytesLength msg = modifyMessage i (partNum, pl) pls = (partNum, msg { part = partNum , payload = Just pl - , isFinalPart = partNum == fromIntegral i + , parts = fromIntegral i }):pls -- part starts at 1 payloadParts :: Int -> Maybe [(Integer, ActionPayload)] - payloadParts i = zip [(part msg)..] . splitPayload i <$> actionPayload + payloadParts i = zip [1..] . splitPayload i <$> actionPayload actionPayload = payload msg encodedMsgs i = Map.map encodeMsg $ messageParts i maxMsgLength = maximum . fmap BS.length . Map.elems @@ -130,20 +127,20 @@ encodePayload LeaveResponsePayload = [Null] encodePayload payload'@LeaveRequestPayload{} = Start Sequence : Start Sequence - : concatMap encodeNodeState (leaveSuccessors payload') + : fmap (IntVal . getNodeID) (leaveSuccessors payload') <> [End Sequence , Start Sequence] - <> concatMap encodeNodeState (leavePredecessors payload') + <> fmap (IntVal . getNodeID) (leavePredecessors payload') <> [End Sequence , End Sequence] -- currently StabiliseResponsePayload and LeaveRequestPayload are equal encodePayload payload'@StabiliseResponsePayload{} = Start Sequence : Start Sequence - : concatMap encodeNodeState (stabiliseSuccessors payload') + : fmap (IntVal . getNodeID) (stabiliseSuccessors payload') <> [End Sequence , Start Sequence] - <> concatMap encodeNodeState (stabilisePredecessors payload') + <> fmap (IntVal . getNodeID) (stabilisePredecessors payload') <> [End Sequence , End Sequence] encodePayload payload'@StabiliseRequestPayload = [Null] @@ -170,10 +167,10 @@ encodePayload payload'@QueryIDRequestPayload{} = [ encodePayload payload'@JoinResponsePayload{} = Start Sequence : Start Sequence - : concatMap encodeNodeState (joinSuccessors payload') + : fmap (IntVal . getNodeID) (joinSuccessors payload') <> [End Sequence , Start Sequence] - <> concatMap encodeNodeState (joinPredecessors payload') + <> fmap (IntVal . getNodeID) (joinPredecessors payload') <> [End Sequence , Start Sequence] <> concatMap encodeCacheEntry (joinCache payload') @@ -186,15 +183,15 @@ encodePayload payload'@PingResponsePayload{} = : concatMap encodeNodeState (pingNodeStates payload') <> [End Sequence] -encodeNodeState :: NodeState a => a -> [ASN1] +encodeNodeState :: NodeState -> [ASN1] encodeNodeState ns = [ Start Sequence - , IntVal (getNodeID . getNid $ ns) - , ASN1String . asn1CharacterString Visible $ getDomain ns - , OctetString (ipAddrAsBS $ getIpAddr ns) - , IntVal (toInteger . getDhtPort $ ns) - , IntVal (toInteger . getServicePort $ ns) - , IntVal (getVServerID ns) + , IntVal (getNodeID . nid $ ns) + , ASN1String . asn1CharacterString Visible $ domain ns + , OctetString (ipAddrAsBS $ ipAddr ns) + , IntVal (toInteger . dhtPort $ ns) + , IntVal (maybe 0 toInteger $ apPort ns) + , IntVal (vServerID ns) , End Sequence ] @@ -216,22 +213,23 @@ encodeQueryResult FORWARD{} = Enumerated 1 encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded -> [ASN1] encodeMessage - (Request requestID sender part isFinalPart action requestPayload) = + (Request requestID sender parts part action requestPayload) = Start Sequence : (Enumerated . fromIntegral . 
fromEnum $ action) : IntVal requestID : encodeNodeState sender - <> [IntVal part - , Boolean isFinalPart] + <> [ + IntVal parts + , IntVal part ] <> maybe [] encodePayload requestPayload <> [End Sequence] encodeMessage - (Response requestID senderID part isFinalPart action responsePayload) = [ + (Response responseTo senderID parts part action responsePayload) = [ Start Sequence - , IntVal requestID + , IntVal responseTo , IntVal . getNodeID $ senderID + , IntVal parts , IntVal part - , Boolean isFinalPart , Enumerated . fromIntegral . fromEnum $ action] <> maybe [] encodePayload responsePayload <> [End Sequence] @@ -264,8 +262,8 @@ parseRequest :: Action -> ParseASN1 FediChordMessage parseRequest action = do requestID <- parseInteger sender <- parseNodeState + parts <- parseInteger part <- parseInteger - isFinalPart <- parseBool hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of QueryID -> parseQueryIDRequest @@ -274,13 +272,13 @@ parseRequest action = do Stabilise -> parseStabiliseRequest Ping -> parsePingRequest - pure $ Request requestID sender part isFinalPart action payload + pure $ Request requestID sender parts part action payload parseResponse :: Integer -> ParseASN1 FediChordMessage -parseResponse requestID = do +parseResponse responseTo = do senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID + parts <- parseInteger part <- parseInteger - isFinalPart <- parseBool action <- parseEnum :: ParseASN1 Action hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of @@ -290,14 +288,7 @@ parseResponse requestID = do Stabilise -> parseStabiliseResponse Ping -> parsePingResponse - pure $ Response requestID senderID part isFinalPart action payload - -parseBool :: ParseASN1 Bool -parseBool = do - i <- getNext - case i of - Boolean parsed -> pure parsed - x -> throwParseError $ "Expected Boolean but got " <> show x + pure $ Response responseTo senderID parts part action payload parseInteger :: ParseASN1 Integer parseInteger = do @@ -334,21 +325,22 @@ parseNull = do Null -> pure () x -> throwParseError $ "Expected Null but got " <> show x -parseNodeState :: ParseASN1 RemoteNodeState +parseNodeState :: ParseASN1 NodeState parseNodeState = onNextContainer Sequence $ do nid' <- fromInteger <$> parseInteger domain' <- parseString ip' <- bsAsIpAddr <$> parseOctets dhtPort' <- fromInteger <$> parseInteger - servicePort' <- fromInteger <$> parseInteger + apPort' <- fromInteger <$> parseInteger vServer' <- parseInteger - pure RemoteNodeState { + pure NodeState { nid = nid' , domain = domain' , dhtPort = dhtPort' - , servicePort = servicePort' + , apPort = if apPort' == 0 then Nothing else Just apPort' , vServerID = vServer' , ipAddr = ip' + , internals = Nothing } @@ -368,8 +360,8 @@ parseJoinRequest = do parseJoinResponse :: ParseASN1 ActionPayload parseJoinResponse = onNextContainer Sequence $ do - succ' <- onNextContainer Sequence (getMany parseNodeState) - pred' <- onNextContainer Sequence (getMany parseNodeState) + succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) cache <- parseNodeCache pure $ JoinResponsePayload { joinSuccessors = succ' @@ -404,8 +396,8 @@ parseStabiliseRequest = do parseStabiliseResponse :: ParseASN1 ActionPayload parseStabiliseResponse = onNextContainer Sequence $ do - succ' <- onNextContainer Sequence (getMany parseNodeState) - pred' <- onNextContainer Sequence (getMany 
parseNodeState) + succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) pure $ StabiliseResponsePayload { stabiliseSuccessors = succ' , stabilisePredecessors = pred' @@ -413,8 +405,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do parseLeaveRequest :: ParseASN1 ActionPayload parseLeaveRequest = onNextContainer Sequence $ do - succ' <- onNextContainer Sequence (getMany parseNodeState) - pred' <- onNextContainer Sequence (getMany parseNodeState) + succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) pure $ LeaveRequestPayload { leaveSuccessors = succ' , leavePredecessors = pred' diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index a2dd676..d83a4bc 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -3,146 +3,153 @@ module Hash2Pub.DHTProtocol , queryLocalCache , addCacheEntry , addCacheEntryPure - , addNodeAsVerified - , addNodeAsVerifiedPure , deleteCacheEntry - , deserialiseMessage + , markCacheEntryAsVerified , RemoteCacheEntry(..) , toRemoteCacheEntry - , remoteNode + , remoteNode_ , Action(..) , ActionPayload(..) , FediChordMessage(..) , maximumParts - , sendQueryIdMessages - , requestQueryID - , requestJoin - , requestPing - , requestStabilise - , lookupMessage - , sendRequestTo - , queryIdLookupLoop - , queueAddEntries - , queueDeleteEntries - , queueDeleteEntry - , resolve - , mkSendSocket - , mkServerSocket - , handleIncomingRequest - , ackRequest - , isPossibleSuccessor - , isPossiblePredecessor - , isJoined - , closestCachePredecessors ) where -import Control.Concurrent.Async -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue -import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TVar -import Control.Exception -import Control.Monad (foldM, forM, forM_) -import qualified Data.ByteString as BS -import Data.Either (rights) -import Data.Foldable (foldl', foldr') -import Data.Functor.Identity -import Data.IP (IPv6, fromHostAddress6, - toHostAddress6) -import Data.List (delete, nub, sortBy) -import qualified Data.Map as Map -import Data.Maybe (fromJust, fromMaybe, isJust, - isNothing, mapMaybe, maybe) -import qualified Data.Set as Set +import qualified Data.Map as Map +import Data.Maybe (fromMaybe, maybe) +import qualified Data.Set as Set import Data.Time.Clock.POSIX -import Network.Socket hiding (recv, recvFrom, send, - sendTo) +import Network.Socket hiding (recv, recvFrom, send, sendTo) import Network.Socket.ByteString -import Safe -import System.Random -import System.Timeout -import Hash2Pub.ASN1Coding -import Hash2Pub.FediChordTypes (CacheEntry (..), - CacheEntry (..), HasKeyID (..), - LocalNodeState (..), - LocalNodeStateSTM, NodeCache, - NodeID, NodeState (..), - RemoteNodeState (..), - RingEntry (..), RingMap (..), - addRMapEntry, addRMapEntryWith, - cacheGetNodeStateUnvalidated, - cacheLookup, cacheLookupPred, - cacheLookupSucc, genNodeID, - getKeyID, localCompare, - rMapFromList, rMapLookupPred, - rMapLookupSucc, - setPredecessors, setSuccessors) -import Hash2Pub.ProtocolTypes +import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID, + NodeState (..), + cacheGetNodeStateUnvalidated, + cacheLookup, cacheLookupPred, + cacheLookupSucc, getPredecessors, + getSuccessors, localCompare, + putPredecessors, putSuccessors) -import Debug.Trace (trace) +import Debug.Trace (trace) -- === 
queries === +data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^ return closest nodes from local cache. + -- whole cache entry is returned for making + -- the entry time stamp available to the + -- protocol serialiser + | FOUND NodeState -- ^node is the responsible node for queried ID + deriving (Show, Eq) + -- TODO: evaluate more fine-grained argument passing to allow granular locking -- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache -queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse +queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache ownState nCache lBestNodes targetID -- as target ID falls between own ID and first predecessor, it is handled by this node - -- This only makes sense if the node is part of the DHT by having joined. - -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. - | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState + | (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- the closest succeeding node (like with the p initiated parallel queries - | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache + | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors where - ownID = getNid ownState - preds = predecessors ownState + preds = fromMaybe [] $ getPredecessors ownState + ownID = nid ownState closestSuccessor :: Set.Set RemoteCacheEntry - closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache + closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache + closestPredecessors :: Set.Set RemoteCacheEntry + closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState + closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry + closestPredecessor 0 _ = Set.empty + closestPredecessor remainingLookups lastID + | remainingLookups < 0 = Set.empty + | otherwise = + let result = cacheLookupPred lastID nCache + in + case toRemoteCacheEntry =<< result of + Nothing -> Set.empty + Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns) --- | look up the 3 direct predecessor cache entries of a given ID -closestCachePredecessors :: (Integral n) - => n -- ^ number of entries to look up - -> NodeID -- ^ target ID to get the predecessors of - -> NodeCache -- ^ cache to use for lookup - -> Set.Set RemoteCacheEntry -closestCachePredecessors 0 _ _ = Set.empty -closestCachePredecessors remainingLookups lastID nCache - | remainingLookups < 0 = Set.empty - | otherwise = - let result = cacheLookupPred lastID nCache - in - case toRemoteCacheEntry <$> result of - Nothing -> Set.empty - Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache +-- === protocol serialisation data types --- | Determines whether a lookup key is within the responsibility slice of a node, --- as it falls between its first predecessor and the node itself. --- Looks up the successor of the lookup key on a 'RingMap' representation of the --- predecessor list with the node itself added. 
If the result is the same as the node --- itself then it falls into the responsibility interval. -isInOwnResponsibilitySlice :: HasKeyID a => a -> LocalNodeState -> Bool -isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs) - where - predecessorList = predecessors ownNs - -- add node itself to RingMap representation, to distinguish between - -- responsibility of own node and predecessor - predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList - closestPredecessor = headMay predecessorList +data Action = + QueryID + | Join + | Leave + | Stabilise + | Ping + deriving (Show, Eq, Enum) -isPossiblePredecessor :: HasKeyID a => a -> LocalNodeState -> Bool -isPossiblePredecessor = isInOwnResponsibilitySlice +data FediChordMessage = + Request { + requestID :: Integer + , sender :: NodeState + , parts :: Integer + , part :: Integer + -- ^ part starts at 0 + , action :: Action + , payload :: Maybe ActionPayload + } + | Response { + responseTo :: Integer + , senderID :: NodeID + , parts :: Integer + , part :: Integer + , action :: Action + , payload :: Maybe ActionPayload + } deriving (Show, Eq) -isPossibleSuccessor :: HasKeyID a => a -> LocalNodeState -> Bool -isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget) successorRMap) == pure (getNid ownNs) - where - successorList = successors ownNs - successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList - closestSuccessor = headMay successorList +data ActionPayload = + QueryIDRequestPayload { + queryTargetID :: NodeID + , queryLBestNodes :: Integer + } + | JoinRequestPayload + | LeaveRequestPayload { + leaveSuccessors :: [NodeID] + , leavePredecessors :: [NodeID] + } + | StabiliseRequestPayload + | PingRequestPayload + | QueryIDResponsePayload { + queryResult :: QueryResponse + } + | JoinResponsePayload { + joinSuccessors :: [NodeID] + , joinPredecessors :: [NodeID] + , joinCache :: [RemoteCacheEntry] + } + | LeaveResponsePayload + | StabiliseResponsePayload { + stabiliseSuccessors :: [NodeID] + , stabilisePredecessors :: [NodeID] + } + | PingResponsePayload { + pingNodeStates :: [NodeState] + } + deriving (Show, Eq) + +-- | global limit of parts per message used when (de)serialising messages. +-- Used to limit the impact of DOS attempts with partial messages. +maximumParts :: Num a => a +maximumParts = 150 + +-- | dedicated data type for cache entries sent to or received from the network, +-- as these have to be considered as unvalidated. Also helps with separation of trust. 
+data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime + deriving (Show, Eq) + +instance Ord RemoteCacheEntry where + (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 + +toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry +toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts +toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry +toRemoteCacheEntry _ = Nothing + +-- helper function for use in tests +remoteNode_ :: RemoteCacheEntry -> NodeState +remoteNode_ (RemoteCacheEntry ns _) = ns -- cache operations @@ -157,18 +164,18 @@ addCacheEntry entry cache = do -- | pure version of 'addCacheEntry' with current time explicitly specified as argument addCacheEntryPure :: POSIXTime -- ^ current time - -> RemoteCacheEntry -- ^ a remote cache entry received from network - -> NodeCache -- ^ node cache to insert to - -> NodeCache -- ^ new node cache with the element inserted + -> RemoteCacheEntry -- ^ a remote cache entry received from network + -> NodeCache -- ^ node cache to insert to + -> NodeCache -- ^ new node cache with the element inserted addCacheEntryPure now (RemoteCacheEntry ns ts) cache = let -- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity timestamp' = if ts <= now then ts else now - newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache - insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal = + newCache = Map.insertWith insertCombineFunction (nid ns) (NodeEntry False ns timestamp') cache + insertCombineFunction newVal@(NodeEntry newValidationState newNode newTimestamp) oldVal = case oldVal of ProxyEntry n _ -> ProxyEntry n (Just newVal) - KeyEntry (CacheEntry oldValidationState _ oldTimestamp) -> KeyEntry (CacheEntry oldValidationState newNode (max oldTimestamp newTimestamp)) + NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp) in newCache @@ -176,30 +183,10 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache = deleteCacheEntry :: NodeID -- ^ID of the node to be deleted -> NodeCache -- ^cache to delete from -> NodeCache -- ^cache without the specified element -deleteCacheEntry nid = RingMap . Map.update modifier nid . getRingMap +deleteCacheEntry = Map.update modifier where modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) - modifier KeyEntry {} = Nothing - - --- | Add a 'RemoteNodeState' to the node cache marked as verified. --- If an entry already exists, it is replaced by the new verified one. -addNodeAsVerified :: RemoteNodeState - -> NodeCache - -> IO NodeCache -addNodeAsVerified node cache = do - now <- getPOSIXTime - pure $ addNodeAsVerifiedPure now node cache - - --- | Pure variant of 'addNodeAsVerified' with current time explicitly specified as an argument -addNodeAsVerifiedPure :: POSIXTime - -> RemoteNodeState - -> NodeCache - -> NodeCache -addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now) - - + modifier NodeEntry {} = Nothing -- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp. 
markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be @@ -207,529 +194,12 @@ markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to -> NodeID -- ^ which node to mark -> NodeCache -- ^ current node cache -> NodeCache -- ^ new NodeCache with the updated entry -markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . getRingMap +markCacheEntryAsVerified timestamp = Map.adjust adjustFunc where - adjustFunc (KeyEntry (CacheEntry _ ns ts)) = KeyEntry (CacheEntry True ns $ fromMaybe ts timestamp) + adjustFunc (NodeEntry _ ns ts) = NodeEntry True ns $ fromMaybe ts timestamp adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry adjustFunc entry = entry - --- | uses the successor and predecessor list of a node as an indicator for whether a --- node has properly joined the DHT -isJoined :: LocalNodeState -> Bool -isJoined ns = not . all null $ [successors ns, predecessors ns] - --- | the size limit to be used when serialising messages for sending -sendMessageSize :: Num i => i -sendMessageSize = 1200 - --- ====== message send and receive operations ====== - --- encode the response to a request that just signals successful receipt -ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString -ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { - requestID = requestID req - , senderID = ownID - , part = part req - , isFinalPart = False - , action = action req - , payload = Nothing - } -ackRequest _ _ = Map.empty - - --- | Dispatch incoming requests to the dedicated handling and response function, and enqueue --- the response to be sent. -handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node - -> TQueue (BS.ByteString, SockAddr) -- ^ send queue - -> Set.Set FediChordMessage -- ^ all parts of the request to handle - -> SockAddr -- ^ source address of the request - -> IO () -handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do - putStrLn $ "handling incoming request: " <> show msgSet - ns <- readTVarIO nsSTM - -- add nodestate to cache - now <- getPOSIXTime - case headMay . Set.elems $ msgSet of - Nothing -> pure () - Just aPart -> do - queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns - -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue - maybe (pure ()) ( - mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) - ) - =<< (case action aPart of - Ping -> Just <$> respondPing nsSTM msgSet - Join -> Just <$> respondJoin nsSTM msgSet - -- ToDo: figure out what happens if not joined - QueryID -> Just <$> respondQueryID nsSTM msgSet - -- only when joined - Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing - Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing - ) - -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. - - -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash - -- TODO: test case: mixed message types of parts - - --- ....... response sending ....... - --- TODO: could all these respond* functions be in STM instead of IO? 
- - --- | execute a key ID lookup on local cache and respond with the result -respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -respondQueryID nsSTM msgSet = do - putStrLn "responding to a QueryID request" - -- this message cannot be split reasonably, so just - -- consider the first payload - let - aRequestPart = Set.elemAt 0 msgSet - senderID = getNid . sender $ aRequestPart - senderPayload = foldr' (\msg plAcc -> - if isNothing plAcc && isJust (payload msg) - then payload msg - else plAcc - ) Nothing msgSet - -- return only empty message serialisation if no payload was included in parts - maybe (pure Map.empty) (\senderPayload' -> do - responseMsg <- atomically $ do - nsSnap <- readTVar nsSTM - cache <- readTVar $ nodeCacheSTM nsSnap - let - responsePayload = QueryIDResponsePayload { - queryResult = if isJoined nsSnap - then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') - -- if not joined yet, attract responsibility for - -- all keys to make bootstrapping possible - else FOUND (toRemoteNodeState nsSnap) - } - queryResponseMsg = Response { - requestID = requestID aRequestPart - , senderID = getNid nsSnap - , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 - , isFinalPart = False - , action = QueryID - , payload = Just responsePayload - } - pure queryResponseMsg - pure $ serialiseMessage sendMessageSize responseMsg - ) senderPayload - --- | Respond to a Leave request by removing the leaving node from local data structures --- and confirming with response. --- TODO: copy over key data from leaver and confirm -respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -respondLeave nsSTM msgSet = do - -- combine payload of all parts - let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) -> - (maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg) - ,maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg)) - ) - ([],[]) msgSet - aRequestPart = Set.elemAt 0 msgSet - senderID = getNid . sender $ aRequestPart - responseMsg <- atomically $ do - nsSnap <- readTVar nsSTM - -- remove leaving node from successors, predecessors and NodeCache - writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID - writeTVar nsSTM $ - -- add predecessors and successors of leaving node to own lists - setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap) - . setSuccessors (filter ((/=) senderID . 
getNid) $ requestSuccs <> successors nsSnap) $ nsSnap - -- TODO: handle handover of key data - let leaveResponse = Response { - requestID = requestID aRequestPart - , senderID = getNid nsSnap - , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 - , isFinalPart = False - , action = Leave - , payload = Just LeaveResponsePayload - } - pure leaveResponse - pure $ serialiseMessage sendMessageSize responseMsg - --- | respond to stabilise requests by returning successor and predecessor list -respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -respondStabilise nsSTM msgSet = do - nsSnap <- readTVarIO nsSTM - let - aRequestPart = Set.elemAt 0 msgSet - responsePayload = StabiliseResponsePayload { - stabiliseSuccessors = successors nsSnap - , stabilisePredecessors = predecessors nsSnap - } - stabiliseResponse = Response { - requestID = requestID aRequestPart - , senderID = getNid nsSnap - , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 - , isFinalPart = False - , action = Stabilise - , payload = Just responsePayload - } - -- TODO: return service endpoint for copying over key data - pure $ serialiseMessage sendMessageSize stabiliseResponse - - --- | respond to Ping request by returning all active vserver NodeStates -respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -respondPing nsSTM msgSet = do - -- TODO: respond with all active VS when implementing k-choices - nsSnap <- readTVarIO nsSTM - let - aRequestPart = Set.elemAt 0 msgSet - responsePayload = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] } - pingResponse = Response { - requestID = requestID aRequestPart - , senderID = getNid nsSnap - , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 - , isFinalPart = False - , action = Ping - , payload = Just responsePayload - } - pure $ serialiseMessage sendMessageSize pingResponse - --- this modifies node state, so locking and IO seems to be necessary. 
--- Still try to keep as much code as possible pure -respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -respondJoin nsSTM msgSet = do - -- atomically read and modify the node state according to the parsed request - responseMsg <- atomically $ do - nsSnap <- readTVar nsSTM - cache <- readTVar $ nodeCacheSTM nsSnap - let - aRequestPart = Set.elemAt 0 msgSet - senderNS = sender aRequestPart - responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS) - thisNodeResponsible (FOUND _) = True - thisNodeResponsible (FORWARD _) = False - -- check whether the joining node falls into our responsibility - if thisNodeResponsible responsibilityLookup - then do - -- if yes, adjust own predecessors/ successors and return those in a response - let - newPreds = senderNS:predecessors nsSnap - joinedNS = setPredecessors newPreds nsSnap - responsePayload = JoinResponsePayload { - joinSuccessors = successors joinedNS - , joinPredecessors = predecessors joinedNS - , joinCache = toRemoteCache cache - } - joinResponse = Response { - requestID = requestID aRequestPart - , senderID = getNid joinedNS - , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 - , isFinalPart = False - , action = Join - , payload = Just responsePayload - } - writeTVar nsSTM joinedNS - pure joinResponse - -- otherwise respond with empty payload - else pure Response { - requestID = requestID aRequestPart - , senderID = getNid nsSnap - , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 - , isFinalPart = False - , action = Join - , payload = Nothing - } - - pure $ serialiseMessage sendMessageSize responseMsg - -- TODO: notify service layer to copy over data now handled by the new joined node - --- ....... request sending ....... - --- | send a join request and return the joined 'LocalNodeState' including neighbours -requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted - -> LocalNodeStateSTM -- ^ joining NodeState - -> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information -requestJoin toJoinOn ownStateSTM = - bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do - -- extract own state for getting request information - ownState <- readTVarIO ownStateSTM - responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock - (cacheInsertQ, joinedState) <- atomically $ do - stateSnap <- readTVar ownStateSTM - let - (cacheInsertQ, predAccSet, succAccSet) = foldl' - (\(insertQ, predAccSet', succAccSet') msg -> - let - insertQ' = maybe insertQ (\msgPl -> - -- collect list of insertion statements into global cache - queueAddEntries (joinCache msgPl) : insertQ - ) $ payload msg - -- collect received predecessors and successors - predAccSet'' = maybe predAccSet' ( - foldr' Set.insert predAccSet' . joinPredecessors - ) $ payload msg - succAccSet'' = maybe succAccSet' ( - foldr' Set.insert succAccSet' . joinSuccessors - ) $ payload msg - in - (insertQ', predAccSet'', succAccSet'') - ) - -- reset predecessors and successors - ([], Set.empty, Set.empty) - responses - -- sort, slice and set the accumulated successors and predecessors - newState = setSuccessors (Set.elems succAccSet) . 
setPredecessors (Set.elems predAccSet) $ stateSnap - writeTVar ownStateSTM newState - pure (cacheInsertQ, newState) - -- execute the cache insertions - mapM_ (\f -> f joinedState) cacheInsertQ - pure $ if responses == Set.empty - then Left $ "join error: got no response from " <> show (getNid toJoinOn) - else if null (predecessors joinedState) && null (successors joinedState) - then Left "join error: no predecessors or successors" - -- successful join - else Right ownStateSTM - ) - `catch` (\e -> pure . Left $ displayException (e :: IOException)) - - --- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -requestQueryID :: LocalNodeState -- ^ NodeState of the querying node - -> NodeID -- ^ target key ID to look up - -> IO RemoteNodeState -- ^ the node responsible for handling that key --- 1. do a local lookup for the l closest nodes --- 2. create l sockets --- 3. send a message async concurrently to all l nodes --- 4. collect the results, insert them into cache --- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) --- TODO: deal with lookup failures -requestQueryID ns targetID = do - firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns - -- TODO: make maxAttempts configurable - queryIdLookupLoop firstCacheSnapshot ns 50 targetID - --- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState --- return node itself as default fallback value against infinite recursion. --- TODO: consider using an Either instead of a default value -queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns -queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do - let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID - -- FOUND can only be returned if targetID is owned by local node - case localResult of - FOUND thisNode -> pure thisNode - FORWARD nodeSet -> do - responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) - now <- getPOSIXTime - -- check for a FOUND and return it - case responseEntries of - FOUND foundNode -> pure foundNode - -- if not FOUND, insert entries into local cache copy and recurse - FORWARD entrySet -> - let newLCache = foldr' ( - addCacheEntryPure now - ) cacheSnapshot entrySet - in - queryIdLookupLoop newLCache ns (maxAttempts - 1) targetID - - -sendQueryIdMessages :: (Integral i) - => NodeID -- ^ target key ID to look up - -> LocalNodeState -- ^ node state of the node doing the query - -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned - -> [RemoteNodeState] -- ^ nodes to query - -> IO QueryResponse -- ^ accumulated response -sendQueryIdMessages targetID ns lParam targets = do - - -- create connected sockets to all query targets and use them for request handling - -- ToDo: make attempts and timeout configurable - queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close ( - sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) - )) targets - -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 - -- ToDo: exception handling, maybe log them - responses <- (mconcat . fmap Set.elems) . 
rights <$> mapM waitCatch queryThreads - -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing - now <- getPOSIXTime - -- collect cache entries from all responses - foldM (\acc resp -> do - let entrySet = case queryResult <$> payload resp of - Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) - Just (FORWARD resultset) -> resultset - _ -> Set.empty - - -- forward entries to global cache - queueAddEntries entrySet ns - -- return accumulated QueryResult - pure $ case acc of - -- once a FOUND as been encountered, return this as a result - isFound@FOUND{} -> isFound - FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet - - ) (FORWARD Set.empty) responses - --- | Create a QueryID message to be supplied to 'sendRequestTo' -lookupMessage :: Integral i - => NodeID -- ^ target ID - -> LocalNodeState -- ^ sender node state - -> Maybe i -- ^ optionally provide a different l parameter - -> (Integer -> FediChordMessage) -lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) - where - pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam } - - --- | Send a stabilise request to provided 'RemoteNode' and, if successful, --- return parsed neighbour lists -requestStabilise :: LocalNodeState -- ^ sending node - -> RemoteNodeState -- ^ neighbour node to send to - -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node -requestStabilise ns neighbour = do - responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid -> - Request { - requestID = rid - , sender = toRemoteNodeState ns - , part = 1 - , isFinalPart = False - , action = Stabilise - , payload = Just StabiliseRequestPayload - } - ) - ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) - either - -- forward IO error messages - (pure . Left) - (\respSet -> do - -- fold all reply parts together - let (responsePreds, responseSuccs) = foldr' (\msg (predAcc, succAcc) -> - (maybe predAcc (++ predAcc) (stabilisePredecessors <$> payload msg) - ,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg)) - ) - ([],[]) respSet - -- update successfully responded neighbour in cache - now <- getPOSIXTime - maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) - pure $ if null responsePreds && null responseSuccs - then Left "no neighbours returned" - else Right (responsePreds, responseSuccs) - ) responses - - -requestPing :: LocalNodeState -- ^ sending node - -> RemoteNodeState -- ^ node to be PINGed - -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node -requestPing ns target = do - responses <- bracket (mkSendSocket (getDomain target) (getDhtPort target)) close - (\sock -> do - resp <- sendRequestTo 5000 3 (\rid -> - Request { - requestID = rid - , sender = toRemoteNodeState ns - , part = 1 - , isFinalPart = False - , action = Ping - , payload = Just PingRequestPayload - } - ) sock - (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock - pure $ Right (peerAddr, resp) - ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) - either - -- forward IO error messages - (pure . 
Left) - (\(peerAddr, respSet) -> do - -- fold all reply parts together - let responseVss = foldr' (\msg acc -> - maybe acc (foldr' (:) acc) (pingNodeStates <$> payload msg) - ) - [] respSet - -- recompute ID for each received node and mark as verified in cache - now <- getPOSIXTime - forM_ responseVss (\vs -> - let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs) - in if recomputedID == getNid vs - then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs - else pure () - ) - pure $ if null responseVss - then Left "no active vServer IDs returned, ignoring node" - else Right responseVss - ) responses - - - --- | Generic function for sending a request over a connected socket and collecting the response. --- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -sendRequestTo :: Int -- ^ timeout in seconds - -> Int -- ^ number of retries - -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do - -- give the message a random request ID - randomID <- randomRIO (0, 2^32-1) - let - msgComplete = msgIncomplete randomID - requests = serialiseMessage sendMessageSize msgComplete - putStrLn $ "sending request message " <> show msgComplete - -- create a queue for passing received response messages back, even after a timeout - responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets - -- start sendAndAck with timeout - attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests - -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. 
- recvdParts <- atomically $ flushTBQueue responseQ - pure $ Set.fromList recvdParts - where - sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses - -> Socket -- ^ the socket used for sending and receiving for this particular remote node - -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts - -> IO () - sendAndAck responseQueue sock remainingSends = do - sendMany sock $ Map.elems remainingSends - -- if all requests have been acked/ responded to, return prematurely - recvLoop responseQueue remainingSends Set.empty Nothing - recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses - -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts - -> Set.Set Integer -- ^ already received response part numbers - -> Maybe Integer -- ^ total number of response parts if already known - -> IO () - recvLoop responseQueue remainingSends' receivedPartNums totalParts = do - -- 65535 is maximum length of UDP packets, as long as - -- no IPv6 jumbograms are used - response <- deserialiseMessage <$> recv sock 65535 - case response of - Right msg@Response{} -> do - atomically $ writeTBQueue responseQueue msg - let - newTotalParts = if isFinalPart msg then Just (part msg) else totalParts - newRemaining = Map.delete (part msg) remainingSends' - newReceivedParts = Set.insert (part msg) receivedPartNums - if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts - then pure () - else recvLoop responseQueue newRemaining receivedPartNums newTotalParts - -- drop errors and invalid messages - Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts - - --- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -queueAddEntries :: Foldable c => c RemoteCacheEntry - -> LocalNodeState - -> IO () -queueAddEntries entries ns = do - now <- getPOSIXTime - forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry - - --- | enque a list of node IDs to be deleted from the global NodeCache -queueDeleteEntries :: Foldable c - => c NodeID - -> LocalNodeState - -> IO () -queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry - - --- | enque a single node ID to be deleted from the global NodeCache -queueDeleteEntry :: NodeID - -> LocalNodeState - -> IO () -queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete - -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry @@ -740,39 +210,3 @@ attempts i action = do case actionResult of Nothing -> attempts (i-1) action Just res -> pure $ Just res - --- ====== network socket operations ====== - --- | resolve a specified host and return the 'AddrInfo' for it. --- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all --- addresses; --- if no port is specified an arbitrary free port is selected. 
-resolve :: Maybe String -- ^ hostname or IP address to be resolved - -> Maybe PortNumber -- ^ port number of either local bind or remote - -> IO AddrInfo -resolve host port = let - hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram - , addrFlags = [AI_PASSIVE] } - in - head <$> getAddrInfo (Just hints) host (show <$> port) - --- | create an unconnected UDP Datagram 'Socket' bound to the specified address -mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket -mkServerSocket ip port = do - sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) - sock <- socket AF_INET6 Datagram defaultProtocol - setSocketOption sock IPv6Only 1 - bind sock sockAddr - pure sock - --- | create a UDP datagram socket, connected to a destination. --- The socket gets an arbitrary free local port assigned. -mkSendSocket :: String -- ^ destination hostname or IP - -> PortNumber -- ^ destination port - -> IO Socket -- ^ a socket with an arbitrary source port -mkSendSocket dest destPort = do - destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) - sendSock <- socket AF_INET6 Datagram defaultProtocol - setSocketOption sendSock IPv6Only 1 - connect sendSock destAddr - pure sendSock diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 2b9a2ef..78dc711 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -1,7 +1,6 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} {- | Module : FediChord Description : An opinionated implementation of the EpiChord DHT by Leong et al. @@ -17,10 +16,14 @@ module Hash2Pub.FediChord ( , getNodeID , toNodeID , NodeState (..) - , LocalNodeState (..) - , RemoteNodeState (..) - , setSuccessors - , setPredecessors + , InternalNodeState (..) + , getNodeCacheRef + , putNodeCache + , getSuccessors + , putSuccessors + , getPredecessors + , putPredecessors + , getLNumBestNodes , NodeCache , CacheEntry(..) , cacheGetNodeStateUnvalidated @@ -38,94 +41,427 @@ module Hash2Pub.FediChord ( , bsAsIpAddr , FediChordConf(..) , fediChordInit - , fediChordJoin - , fediChordBootstrapJoin - , tryBootstrapJoining - , fediMainThreads - , RealNode (..) 
, nodeStateInit , mkServerSocket - , mkSendSocket , resolve , cacheWriter - , joinOnNewEntriesThread ) where -import Control.Applicative ((<|>)) -import Control.Concurrent -import Control.Concurrent.Async +import Control.Exception +import qualified Data.Map.Strict as Map +import Data.Maybe (fromMaybe, isJust, mapMaybe) +import Data.Time.Clock.POSIX +import Network.Socket + +-- for hashing and ID conversion import Control.Concurrent.STM import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TVar -import Control.Exception -import Control.Monad (forM_, forever) -import Control.Monad.Except +import Control.Monad (forever) import Crypto.Hash import qualified Data.ByteArray as BA import qualified Data.ByteString as BS import qualified Data.ByteString.UTF8 as BSU -import Data.Either (rights) -import Data.Foldable (foldr') -import Data.Functor.Identity +import Data.IORef import Data.IP (IPv6, fromHostAddress6, toHostAddress6) -import Data.List ((\\)) -import qualified Data.Map.Strict as Map -import Data.Maybe (catMaybes, fromJust, fromMaybe, - isJust, isNothing, mapMaybe) -import qualified Data.Set as Set -import Data.Time.Clock.POSIX import Data.Typeable (Typeable (..), typeOf) import Data.Word import qualified Network.ByteOrder as NetworkBytes -import Network.Socket hiding (recv, recvFrom, send, - sendTo) -import Network.Socket.ByteString -import Safe -import System.Random (randomRIO) -import Hash2Pub.DHTProtocol -import Hash2Pub.FediChordTypes import Hash2Pub.Utils import Debug.Trace (trace) +-- define protocol constants +-- | static definition of ID length in bits +idBits :: Integer +idBits = 256 + +-- |NodeIDs are Integers wrapped in a newtype, to be able to redefine +-- their instance behaviour +-- +-- for being able to check value bounds, the constructor should not be used directly +-- and new values are created via @toNodeID@ (newtype constructors cannot be hidden) +newtype NodeID = NodeID { getNodeID :: Integer } deriving (Eq, Show, Enum) + +-- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values. +-- When needing a runtime-safe constructor with drawbacks, try @fromInteger@ +toNodeID :: Integer -> NodeID +toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i + +-- |NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits' +instance Bounded NodeID where + minBound = NodeID 0 + maxBound = NodeID (2^idBits - 1) + +-- |calculations with NodeIDs are modular arithmetic operations +instance Num NodeID where + a + b = NodeID $ (getNodeID a + getNodeID b) `mod` (getNodeID maxBound + 1) + a * b = NodeID $ (getNodeID a * getNodeID b) `mod` (getNodeID maxBound + 1) + a - b = NodeID $ (getNodeID a - getNodeID b) `mod` (getNodeID maxBound + 1) + -- |safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around + -- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@. + fromInteger i = NodeID $ i `mod` (getNodeID maxBound + 1) + signum = NodeID . signum . getNodeID + abs = NodeID . abs . 
getNodeID -- ToDo: make sure that at creation time only IDs within the range are used + +-- | use normal strict monotonic ordering of integers, realising the ring structure + -- is done in the @NodeCache@ implementation +instance Ord NodeID where + a `compare` b = getNodeID a `compare` getNodeID b + +-- | local comparison of 2 node IDs, only relevant for determining a successor or predecessor on caches with just 2 nodes +localCompare :: NodeID -> NodeID -> Ordering +a `localCompare` b + | getNodeID a == getNodeID b = EQ + | wayForwards > wayBackwards = GT + | otherwise = LT + where + wayForwards = getNodeID (b - a) + wayBackwards = getNodeID (a - b) + + +-- | represents a node and all its important state +data NodeState = NodeState { + nid :: NodeID + , domain :: String + -- ^ full public domain name the node is reachable under + , ipAddr :: HostAddress6 + -- the node's public IPv6 address + , dhtPort :: PortNumber + -- ^ port of the DHT itself + , apPort :: Maybe PortNumber + -- ^ port of the ActivityPub relay and storage service + -- might have to be queried first + , vServerID :: Integer + -- ^ ID of this vserver + + -- ==== internal state ==== + , internals :: Maybe InternalNodeState + -- ^ data not present in the representation of remote nodes + -- is put into its own type. + -- This is usually @Nothing@ for all remote nodes. + } deriving (Show, Eq) + +-- | encapsulates all data and parameters that are not present for remote nodes +data InternalNodeState = InternalNodeState { + nodeCache :: IORef NodeCache + -- ^ EpiChord node cache with expiry times for nodes + -- as the map is ordered, lookups for the closest preceding node can be done using @lookupLT@. + -- encapsulated into an IORef for allowing concurrent reads without locking + , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) + -- ^ cache updates are not written directly to the 'nodeCache' but queued and + -- only processed by a single writer thread to prevent lost updates. + -- All nodeCache modifying functions have to be partially applied enough before + -- being put into the queue.
+ -- + , successors :: [NodeID] -- could be a set instead as these are ordered as well + -- ^ successor nodes in ascending order by distance + , predecessors :: [NodeID] + -- ^ predecessor nodes in ascending order by distance + ----- protocol parameters ----- + -- TODO: evaluate moving these somewhere else + , kNeighbours :: Int + -- ^ desired length of predecessor and successor list + -- needs to be parameterisable for simulation purposes + , lNumBestNodes :: Int + -- ^ number of best next hops to provide + -- needs to be parameterisable for simulation purposes + , pNumParallelQueries :: Int + -- ^ number of parallel sent queries + -- needs to be parameterisable for simulation purposes + , jEntriesPerSlice :: Int + -- ^ number of desired entries per cache slice + -- needs to be parameterisable for simulation purposes + } deriving (Show, Eq) + +-- | defining Show instances to be able to print NodeState for debug purposes +instance Typeable a => Show (IORef a) where + show x = show (typeOf x) + +instance Typeable a => Show (TQueue a) where + show x = show (typeOf x) + +-- | extract a value from the internals of a 'NodeState' +getInternals_ :: (InternalNodeState -> a) -> NodeState -> Maybe a +getInternals_ func ns = func <$> internals ns + +-- could be done better with lenses +-- | convenience function that updates an internal value of a NodeState +putInternals_ :: (InternalNodeState -> InternalNodeState) -> NodeState -> NodeState +putInternals_ func ns = let + newInternals = func <$> internals ns + in + ns {internals = newInternals } + +-- | convenience function for extracting the 'NodeCache' from a 'NodeState' +getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache) +getNodeCacheRef = getInternals_ nodeCache + +-- | convenience function for updating the 'NodeCache' on 'NodeState' s that have +-- internals. 
+-- NodeStates without a cache (without internals) are returned unchanged +putNodeCache :: IORef NodeCache -> NodeState -> NodeState +putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc}) + +getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache)) +getCacheWriteQueue = getInternals_ cacheWriteQueue + +-- | convenience function for extracting the @successors@ from a 'NodeState' +getSuccessors :: NodeState -> Maybe [NodeID] +getSuccessors = getInternals_ successors + +-- | convenience function that updates the successors of a NodeState +putSuccessors :: [NodeID] -> NodeState -> NodeState +putSuccessors succ' = putInternals_ (\i -> i {successors = succ'}) + +-- | convenience function for extracting the @predecessors@ from a 'NodeState' +getPredecessors :: NodeState -> Maybe [NodeID] +getPredecessors = getInternals_ predecessors + +-- | convenience function that updates the predecessors of a NodeState +putPredecessors :: [NodeID] -> NodeState -> NodeState +putPredecessors pred' = putInternals_ (\i -> i {predecessors = pred'}) + +-- | convenience function for extracting the @lNumBestNodes@ from a 'NodeState' +getLNumBestNodes :: NodeState -> Maybe Int +getLNumBestNodes = getInternals_ lNumBestNodes + +type NodeCache = Map.Map NodeID CacheEntry + +-- |an entry of the 'nodeCache' can hold 2 different kinds of data +data CacheEntry = + -- | an entry representing its validation status, the node state and its timestamp + NodeEntry Bool NodeState POSIXTime + -- | a proxy field for closing the ring structure, indicating the lookup shall be + -- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@ + | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry) + deriving (Show, Eq) + +-- | as a compromise, only NodeEntry components are ordered by their NodeID +-- while ProxyEntry components should never be tried to be ordered. 
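-- For intuition, a minimal sketch (assuming a single cached node @n@, inserted
-- unvalidated at time @ts@) of what the underlying map looks like; the two
-- 'ProxyEntry' values created by 'initCache' stay at the key-space edges and
-- close the ring:
--
-- > Map.fromList [ (minBound, ProxyEntry (maxBound, Backwards) Nothing)
-- >              , (nid n,    NodeEntry False n ts)
-- >              , (maxBound, ProxyEntry (minBound, Forwards) Nothing) ]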
+instance Ord CacheEntry where + + a `compare` b = compare (extractID a) (extractID b) + where + extractID (NodeEntry _ eState _) = nid eState + extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache" + +data ProxyDirection = Backwards | Forwards deriving (Show, Eq) + +instance Enum ProxyDirection where + toEnum (-1) = Backwards + toEnum 1 = Forwards + toEnum _ = error "no such ProxyDirection" + fromEnum Backwards = - 1 + fromEnum Forwards = 1 + +--- useful function for getting entries for a full cache transfer +cacheEntries :: NodeCache -> [CacheEntry] +cacheEntries ncache = mapMaybe extractNodeEntries $ Map.elems ncache + where + extractNodeEntries (ProxyEntry _ possibleEntry) = possibleEntry + +-- | An empty @NodeCache@ needs to be initialised with 2 proxy entries, +-- linking the modular name space together by connecting @minBound@ and @maxBound@ +initCache :: NodeCache +initCache = Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))] + where + proxyEntry (from,to) = (from, ProxyEntry to Nothing) + +-- | Maybe returns the cache entry stored at given key +cacheLookup :: NodeID -- ^lookup key + -> NodeCache -- ^lookup cache + -> Maybe CacheEntry +cacheLookup key cache = case Map.lookup key cache of + Just (ProxyEntry _ result) -> result + res -> res + +-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ +-- to simulate a modular ring +lookupWrapper :: (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> ProxyDirection -> NodeID -> NodeCache -> Maybe CacheEntry +lookupWrapper f fRepeat direction key cache = + case f key cache of + -- the proxy entry found holds a + Just (_, ProxyEntry _ (Just entry@NodeEntry{})) -> Just entry + -- proxy entry holds another proxy entry, this should not happen + Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing + -- proxy entry without own entry is a pointer on where to continue + -- if lookup direction is the same as pointer direction: follow pointer + Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) -> + let newKey = if pointerDirection == direction + then pointerID + else foundKey + (fromInteger . toInteger . fromEnum $ direction) + in if cacheNotEmpty cache + then lookupWrapper fRepeat fRepeat direction newKey cache + else Nothing + -- normal entries are returned + Just (_, entry@NodeEntry{}) -> Just entry + Nothing -> Nothing + where + cacheNotEmpty :: NodeCache -> Bool + cacheNotEmpty cache' = (Map.size cache' > 2) -- there are more than the 2 ProxyEntries + || isJust ( cacheLookup minBound cache') -- or one of the ProxyEntries holds a node + || isJust (cacheLookup maxBound cache') + +-- | find the successor node to a given key on a modular EpiChord ring cache. +-- Note: The EpiChord definition of "successor" includes the node at the key itself, +-- if existing. +cacheLookupSucc :: NodeID -- ^lookup key + -> NodeCache -- ^ring cache + -> Maybe CacheEntry +cacheLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards + +-- | find the predecessor node to a given key on a modular EpiChord ring cache. 
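-- Like 'cacheLookupSucc', this lookup follows 'ProxyEntry' pointers and thus wraps
-- around the ring. For illustration, assuming a cache holding only the single node
-- @n@ from the sketch above:
--
-- > cacheLookupSucc (nid n + 1) cache == Just (NodeEntry False n ts)
-- > cacheLookupPred (nid n - 1) cache == Just (NodeEntry False n ts)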
+cacheLookupPred :: NodeID -- ^lookup key + -> NodeCache -- ^ring cache + -> Maybe CacheEntry +cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards + +-- clean up cache entries: once now - entry > maxAge +-- transfer difference now - entry to other node + +-- | return the @NodeState@ data from a cache entry without checking its validation status +cacheGetNodeStateUnvalidated :: CacheEntry -> NodeState +cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState +cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry +cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug" + +-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString +ipAddrAsBS :: HostAddress6 -> BS.ByteString +ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d] + +-- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6' +bsAsIpAddr :: BS.ByteString -> HostAddress6 +bsAsIpAddr bytes = (a,b,c,d) + where + a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes + + +-- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString +genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address + -> String -- ^a node's 1st and 2nd level domain name + -> Word8 -- ^the used vserver ID + -> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer +genNodeIDBS ip nodeDomain vserver = + hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower + where + vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255 + ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS + nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS + hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128)) + (hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet + + +-- | generates a 256 bit long @NodeID@ using SHAKE128 +genNodeID :: HostAddress6 -- ^a node's IPv6 address + -> String -- ^a node's 1st and 2nd level domain name + -> Word8 -- ^the used vserver ID + -> NodeID -- ^the generated @NodeID@ +genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs + +-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT +genKeyIDBS :: String -- ^the key string + -> BS.ByteString -- ^the key ID represented as a @ByteString@ +genKeyIDBS key = BS.pack . BA.unpack $ (hash (BSU.fromString key) :: Digest SHA3_256) + +-- | generates a 256 bit long key identifier for looking up its data on the DHT +genKeyID :: String -- ^the key string + -> NodeID -- ^the key ID +genKeyID = NodeID . byteStringToUInteger . 
genKeyIDBS + + +-- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order +-- by iterating it byte-wise from the back and shifting the byte values according to their offset +byteStringToUInteger :: BS.ByteString -> Integer +byteStringToUInteger bs = sum $ parsedBytes 0 bs + where + parsedBytes :: Integer -> BS.ByteString -> [ Integer ] + parsedBytes offset uintBs = case BS.unsnoc uintBs of + Nothing -> [] + Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs' + + parseWithOffset :: Integer -> Word8 -> Integer + parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0 + parseWithOffset offset word = toInteger word * 2^(8 * offset) + + + +-- TODO: complete rewrite +-- |checks wether the cache entries fulfill the logarithmic EpiChord invariant +-- of having j entries per slice, and creates a list of necessary lookup actions. +-- Should be invoked periodically. +--checkCacheSlices :: NodeState -> IO [()] +--checkCacheSlices state = case getNodeCache state of +-- -- don't do anything on nodes without a cache +-- Nothing -> pure [()] +-- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache' +-- -- TODO: do the same for predecessors +-- where +-- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state +-- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state) +-- startBound = NodeID 2^(255::Integer) + nid state +-- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()] +-- checkSlice _ _ _ Nothing _ = [] +-- checkSlice j ownID upperBound (Just lastSuccNode) cache +-- | upperBound < lastSuccNode = [] +-- | otherwise = +-- -- continuously half the DHT namespace, take the upper part as a slice, +-- -- check for existing entries in that slice and create a lookup action +-- -- and recursively do this on the lower half. +-- -- recursion edge case: all successors/ predecessors need to be in the +-- -- first slice. 
+-- let +-- diff = getNodeID $ upperBound - ownID +-- lowerBound = ownID + NodeID (diff `div` 2) +-- in +-- -- TODO: replace empty IO actions with actual lookups to middle of slice +-- -- TODO: validate ID before adding to cache +-- case Map.lookupLT upperBound cache of +-- Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache +-- Just (matchID, _) -> +-- if +-- matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache +-- else +-- checkSlice j ownID lowerBound (Just lastSuccNode) cache + + +-- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages +-- persist them on disk so they can be used for all following bootstraps + +-- | configuration values used for initialising the FediChord DHT +data FediChordConf = FediChordConf { + confDomain :: String + , confIP :: HostAddress6 + , confDhtPort :: Int + } deriving (Show, Eq) + -- | initialise data structures, compute own IDs and bind to listening socket -- ToDo: load persisted state, thus this function already operates in IO -fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM) -fediChordInit initConf = do - let realNode = RealNode { - vservers = [] - , nodeConfig = initConf - , bootstrapNodes = confBootstrapNodes initConf - } - realNodeSTM <- newTVarIO realNode - initialState <- nodeStateInit realNodeSTM - initialStateSTM <- newTVarIO initialState - serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState) - pure (serverSock, initialStateSTM) +fediChordInit :: FediChordConf -> IO (Socket, NodeState) +fediChordInit conf = do + initialState <- nodeStateInit conf + serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState) + pure (serverSock, initialState) -- | initialises the 'NodeState' for this local node. -- Separated from 'fediChordInit' to be usable in tests. -nodeStateInit :: RealNodeSTM -> IO LocalNodeState -nodeStateInit realNodeSTM = do - realNode <- readTVarIO realNodeSTM - cacheSTM <- newTVarIO initCache +nodeStateInit :: FediChordConf -> IO NodeState +nodeStateInit conf = do + cacheRef <- newIORef initCache q <- atomically newTQueue let - conf = nodeConfig realNode - vsID = 0 - containedState = RemoteNodeState { + initialState = NodeState { domain = confDomain conf , ipAddr = confIP conf - , nid = genNodeID (confIP conf) (confDomain conf) $ fromInteger vsID + , nid = genNodeID (confIP conf) (confDomain conf) 0 , dhtPort = toEnum $ confDhtPort conf - , servicePort = 0 - , vServerID = vsID - } - initialState = LocalNodeState { - nodeState = containedState - , nodeCacheSTM = cacheSTM + , apPort = Nothing + , vServerID = 0 + , internals = Just internalsInit + } + internalsInit = InternalNodeState { + nodeCache = cacheRef , cacheWriteQueue = q , successors = [] , predecessors = [] @@ -133,504 +469,67 @@ nodeStateInit realNodeSTM = do , lNumBestNodes = 3 , pNumParallelQueries = 2 , jEntriesPerSlice = 2 - , parentRealNode = realNodeSTM } pure initialState --- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed --- for resolving the new node's position. 
-fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a - -- successful join, otherwise an error message -fediChordBootstrapJoin nsSTM bootstrapNode = do - -- can be invoked multiple times with all known bootstrapping nodes until successfully joined - ns <- readTVarIO nsSTM - runExceptT $ do - -- 1. get routed to the currently responsible node - lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns - currentlyResponsible <- liftEither lookupResp - liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) - -- 2. then send a join to the currently responsible node - joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM - liftEither joinResult - --- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. --- Unjoined try joining instead. -convergenceSampleThread :: LocalNodeStateSTM -> IO () -convergenceSampleThread nsSTM = forever $ do - nsSnap <- readTVarIO nsSTM - parentNode <- readTVarIO $ parentRealNode nsSnap - if isJoined nsSnap - then - runExceptT (do - -- joined node: choose random node, do queryIDLoop, compare result with own responsibility - let bss = bootstrapNodes parentNode - randIndex <- liftIO $ randomRIO (0, length bss - 1) - chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex - lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) - currentlyResponsible <- liftEither lookupResult - if getNid currentlyResponsible /= getNid nsSnap - -- if mismatch, stabilise on the result, else do nothing - then do - stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible - (preds, succs) <- liftEither stabResult - -- TODO: verify neighbours before adding, see #55 - liftIO . atomically $ do - ns <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors preds ns - else pure () - ) >> pure () - -- unjoined node: try joining through all bootstrapping nodes - else tryBootstrapJoining nsSTM >> pure () - let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode - threadDelay $ delaySecs * 10^6 - - --- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM) -tryBootstrapJoining nsSTM = do - bss <- atomically $ do - nsSnap <- readTVar nsSTM - realNodeSnap <- readTVar $ parentRealNode nsSnap - pure $ bootstrapNodes realNodeSnap - tryJoining bss - where - tryJoining (bn:bns) = do - j <- fediChordBootstrapJoin nsSTM bn - case j of - Left err -> putStrLn ("join error: " <> err) >> tryJoining bns - Right joined -> pure $ Right joined - tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." - - --- | Look up a key just based on the responses of a single bootstrapping node. -bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) -bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do - ns <- readTVarIO nsSTM - bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close ( - -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) - ) - `catch` (\e -> pure . 
Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) - - case bootstrapResponse of - Left err -> pure $ Left err - Right resp - | resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." - | otherwise -> do - now <- getPOSIXTime - -- create new cache with all returned node responses - let bootstrapCache = - -- traverse response parts - foldr' (\resp cacheAcc -> case queryResult <$> payload resp of - Nothing -> cacheAcc - Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc - Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset - ) - initCache resp - currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - pure $ Right currentlyResponsible - - --- | join a node to the DHT using the global node cache --- node's position. -fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' - -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a - -- successful join, otherwise an error message -fediChordJoin nsSTM = do - ns <- readTVarIO nsSTM - -- 1. get routed to the currently responsible node - currentlyResponsible <- requestQueryID ns $ getNid ns - putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) - -- 2. then send a join to the currently responsible node - joinResult <- requestJoin currentlyResponsible nsSTM - case joinResult of - Left err -> pure . Left $ "Error joining on " <> err - Right joinedNS -> pure . Right $ joinedNS - - --- | Wait for new cache entries to appear and then try joining on them. --- Exits after successful joining. -joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () -joinOnNewEntriesThread nsSTM = loop - where - loop = do - nsSnap <- readTVarIO nsSTM - (lookupResult, cache) <- atomically $ do - cache <- readTVar $ nodeCacheSTM nsSnap - case queryLocalCache nsSnap cache 1 (getNid nsSnap) of - -- empty cache, block until cache changes and then retry - (FORWARD s) | Set.null s -> retry - result -> pure (result, cache) - case lookupResult of - -- already joined - FOUND _ -> do - print =<< readTVarIO nsSTM - pure () - -- otherwise try joining - FORWARD _ -> do - joinResult <- fediChordJoin nsSTM - either - -- on join failure, sleep and retry - -- TODO: make delay configurable - (const $ threadDelay (30 * 10^6) >> loop) - (const $ pure ()) - joinResult - emptyset = Set.empty -- because pattern matches don't accept qualified names - +--fediChordJoin :: NodeState -- ^ the local 'NodeState' +-- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node +-- -> Socket -- ^ socket used for sending and receiving the join message +-- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful +-- -- join, otherwise an error message +--fediChordJoin ns (joinHost, joinPort) sock = do +-- -- 1. get routed to destination until FOUND +-- -- 2. then send a join to the currently responsible node +-- -- ToDo: implement cache management, as already all received replies should be stored in cache +-- -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. 
-cacheWriter :: LocalNodeStateSTM -> IO () -cacheWriter nsSTM = - forever $ atomically $ do - ns <- readTVar nsSTM - cacheModifier <- readTQueue $ cacheWriteQueue ns - modifyTVar' (nodeCacheSTM ns) cacheModifier - - --- TODO: make max entry age configurable -maxEntryAge :: POSIXTime -maxEntryAge = 600 - - --- | Periodically iterate through cache, clean up expired entries and verify unverified ones -cacheVerifyThread :: LocalNodeStateSTM -> IO () -cacheVerifyThread nsSTM = forever $ do - putStrLn "cache verify run: begin" - -- get cache - (ns, cache) <- atomically $ do - ns <- readTVar nsSTM - cache <- readTVar $ nodeCacheSTM ns - pure (ns, cache) - -- iterate entries: - -- for avoiding too many time syscalls, get current time before iterating. - now <- getPOSIXTime - forM_ (cacheEntries cache) (\(CacheEntry validated node ts) -> - -- case too old: delete (future work: decide whether pinging and resetting timestamp is better) - if (now - ts) > maxEntryAge - then - queueDeleteEntry (getNid node) ns - -- case unverified: try verifying, otherwise delete - else if not validated - then do - -- marking as verified is done by 'requestPing' as well - pong <- requestPing ns node - either (\_-> - queueDeleteEntry (getNid node) ns - ) - (\vss -> - if node `notElem` vss - then queueDeleteEntry (getNid node) ns - -- after verifying a node, check whether it can be a closer neighbour - else do - if node `isPossiblePredecessor` ns - then atomically $ do - ns' <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [node] ns' - else pure () - if node `isPossibleSuccessor` ns - then atomically $ do - ns' <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [node] ns' - else pure () - ) pong - else pure () - ) - - -- check the cache invariant per slice and, if necessary, do a single lookup to the - -- middle of each slice not verifying the invariant - latestNs <- readTVarIO nsSTM - latestCache <- readTVarIO $ nodeCacheSTM latestNs - let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of - FOUND node -> [node] - FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet - forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> - forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle - ) - - putStrLn "cache verify run: end" - threadDelay $ 10^6 * round maxEntryAge `div` 20 - - --- | Checks the invariant of at least @jEntries@ per cache slice. 
--- If this invariant does not hold, the middle of the slice is returned for --- making lookups to that ID -checkCacheSliceInvariants :: LocalNodeState - -> NodeCache - -> [NodeID] -- ^ list of middle IDs of slices not - -- ^ fulfilling the invariant -checkCacheSliceInvariants ns - | isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc - | otherwise = const [] - where - jEntries = jEntriesPerSlice ns - lastPred = getNid <$> lastMay (predecessors ns) - lastSucc = getNid <$> lastMay (successors ns) - -- start slice boundary: 1/2 key space - startBound = getNid ns + 2^(idBits - 1) - - checkSuccessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID] - checkSuccessorSlice _ _ _ Nothing _ = [] - checkSuccessorSlice j ownID upperBound (Just lastSuccID) cache - | (upperBound `localCompare` lastSuccID) == LT = [] - | otherwise = - let - diff = getNodeID $ upperBound - ownID - lowerBound = ownID + fromInteger (diff `div` 2) - middleID = lowerBound + fromInteger (diff `div` 4) - lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache - in - -- check whether j entries are in the slice - if length lookupResult == jEntries - && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult - && all (\r -> (r `localCompare` upperBound) == LT) lookupResult - then checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache - -- if not enough entries, add the middle of the slice to list - else middleID : checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache - - checkPredecessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID] - checkPredecessorSlice _ _ _ Nothing _ = [] - checkPredecessorSlice j ownID lowerBound (Just lastPredID) cache - | (lowerBound `localCompare` lastPredID) == GT = [] - | otherwise = - let - diff = getNodeID $ ownID - lowerBound - upperBound = ownID - fromInteger (diff `div` 2) - middleID = lowerBound + fromInteger (diff `div` 4) - lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache - in - -- check whether j entries are in the slice - if length lookupResult == jEntries - && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult - && all (\r -> (r `localCompare` upperBound) == LT) lookupResult - then checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache - -- if not enough entries, add the middle of the slice to list - else middleID : checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache - - --- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until --- one responds, and get their neighbours for maintaining the own neighbour lists. --- If necessary, request new neighbours. -stabiliseThread :: LocalNodeStateSTM -> IO () -stabiliseThread nsSTM = forever $ do - ns <- readTVarIO nsSTM - - putStrLn "stabilise run: begin" - print ns - - -- iterate through the same snapshot, collect potential new neighbours - -- and nodes to be deleted, and modify these changes only at the end of - -- each stabilise run. - -- This decision makes iterating through a potentially changing list easier. 
- - -- don't contact all neighbours unless the previous one failed/ Left ed - - predStabilise <- stabiliseClosestResponder ns predecessors 1 [] - succStabilise <- stabiliseClosestResponder ns predecessors 1 [] - - let - (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise - (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise - allDeletes = predDeletes <> succDeletes - allNeighbours = predNeighbours <> succNeighbours - - -- now actually modify the node state's neighbours - updatedNs <- atomically $ do - newerNsSnap <- readTVar nsSTM +cacheWriter :: NodeState -> IO () +cacheWriter ns = do + let writeQueue' = getCacheWriteQueue ns + case writeQueue' of + Nothing -> pure () + Just writeQueue -> forever $ do + f <- atomically $ readTQueue writeQueue let - -- sorting and taking only k neighbours is taken care of by the - -- setSuccessors/ setPredecessors functions - newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours - newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours - newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap - writeTVar nsSTM newNs - pure newNs - -- delete unresponding nodes from cache as well - mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes + refModifier :: NodeCache -> (NodeCache, ()) + refModifier nc = (f nc, ()) + maybe (pure ()) ( + \ref -> atomicModifyIORef' ref refModifier + ) $ getNodeCacheRef ns - -- try looking up additional neighbours if list too short - forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do - ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [nextEntry] latestNs - ) +-- ====== network socket operations ====== - forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do - ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [nextEntry] latestNs - ) +-- | resolve a specified host and return the 'AddrInfo' for it. +-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all +-- addresses; +-- if no port is specified an arbitrary free port is selected. +resolve :: Maybe String -- ^ hostname or IP address to be resolved + -> Maybe PortNumber -- ^ port number of either local bind or remote + -> IO AddrInfo +resolve host port = let + hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram + , addrFlags = [AI_PASSIVE] } + in + head <$> getAddrInfo (Just hints) host (show <$> port) - putStrLn "stabilise run: end" - -- TODO: make delay configurable - threadDelay (60 * 10^6) - where - -- | send a stabilise request to the n-th neighbour - -- (specified by the provided getter function) and on failure retr - -- with the n+1-th neighbour. - -- On success, return 2 lists: The failed nodes and the potential neighbours - -- returned by the queried node. 
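-- A usage sketch for 'resolve' (hypothetical host name and port):
--
-- > -- AddrInfo suitable for binding a server socket to all local addresses
-- > bindAddr <- resolve Nothing (Just 2342)
-- > -- AddrInfo of a remote peer, for an outgoing socket
-- > peerAddr <- resolve (Just "dht.example.org") (Just 2342)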
- stabiliseClosestResponder :: LocalNodeState -- ^ own node - -> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors - -> Int -- ^ index of neighbour to query - -> [RemoteNodeState] -- ^ delete accumulator - -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours) - stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc - | isNothing (currentNeighbour ns neighbourGetter neighbourNum) = pure $ Left "exhausted all neigbours" - | otherwise = do - let node = fromJust $ currentNeighbour ns neighbourGetter neighbourNum - stabResponse <- requestStabilise ns node - case stabResponse of - -- returning @Left@ signifies the need to try again with next from list - Left err -> stabiliseClosestResponder ns neighbourGetter (neighbourNum+1) (node:deleteAcc) - Right (succs, preds) -> do - -- ping each returned node before actually inserting them - -- send pings in parallel, check whether ID is part of the returned IDs - pingThreads <- mapM (async . checkReachability ns) $ preds <> succs - -- ToDo: exception handling, maybe log them - -- filter out own node - checkedNeighbours <- filter (/= toRemoteNodeState ns) . catMaybes . rights <$> mapM waitCatch pingThreads - pure $ Right (deleteAcc, checkedNeighbours) +-- | create an unconnected UDP Datagram 'Socket' bound to the specified address +mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket +mkServerSocket ip port = do + sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) + sock <- socket AF_INET6 Datagram defaultProtocol + setSocketOption sock IPv6Only 1 + bind sock sockAddr + pure sock - - currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns - - checkReachability :: LocalNodeState -- ^ this node - -> RemoteNodeState -- ^ node to Ping for reachability - -> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one - checkReachability ns toCheck = do - resp <- requestPing ns toCheck - pure $ either (const Nothing) (\vss -> - if toCheck `elem` vss then Just toCheck else Nothing - ) resp - - --- | Receives UDP packets and passes them to other threads via the given TQueue. --- Shall be used as the single receiving thread on the server socket, as multiple --- threads blocking on the same socket degrades performance. -recvThread :: Socket -- ^ server socket to receive packets from - -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue - -> IO () -recvThread sock recvQ = forever $ do - packet <- recvFrom sock 65535 - atomically $ writeTQueue recvQ packet - --- | Only thread to send data it gets from a TQueue through the server socket. 
-sendThread :: Socket -- ^ server socket used for sending - -> TQueue (BS.ByteString, SockAddr) -- ^ send queue - -> IO () -sendThread sock sendQ = forever $ do - (packet, addr) <- atomically $ readTQueue sendQ - sendAllTo sock packet addr - --- | Sets up and manages the main server threads of FediChord -fediMainThreads :: Socket -> LocalNodeStateSTM -> IO () -fediMainThreads sock nsSTM = do - (\x -> putStrLn $ "launching threads, ns: " <> show x) =<< readTVarIO nsSTM - sendQ <- newTQueueIO - recvQ <- newTQueueIO - -- concurrently launch all handler threads, if one of them throws an exception - -- all get cancelled - concurrently_ - (fediMessageHandler sendQ recvQ nsSTM) $ - concurrently_ (stabiliseThread nsSTM) $ - concurrently_ (cacheVerifyThread nsSTM) $ - concurrently_ (convergenceSampleThread nsSTM) $ - concurrently_ - (sendThread sock sendQ) - (recvThread sock recvQ) - - --- defining this here as, for now, the RequestMap is only used by fediMessageHandler. --- Once that changes, move to FediChordTypes -type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry - -data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) - POSIXTime - --- TODO: make purge age configurable --- | periodically clean up old request parts -responsePurgeAge :: POSIXTime -responsePurgeAge = 60 -- seconds - -requestMapPurge :: MVar RequestMap -> IO () -requestMapPurge mapVar = forever $ do - rMapState <- takeMVar mapVar - now <- getPOSIXTime - putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> - now - ts < responsePurgeAge - ) rMapState - threadDelay $ round responsePurgeAge * 2 * 10^6 - - --- | Wait for messages, deserialise them, manage parts and acknowledgement status, --- and pass them to their specific handling function. -fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue - -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue - -> LocalNodeStateSTM -- ^ acting NodeState - -> IO () -fediMessageHandler sendQ recvQ nsSTM = do - -- Read node state just once, assuming that all relevant data for this function does - -- not change. - -- Other functions are passed the nsSTM reference and thus can get the latest state. - nsSnap <- readTVarIO nsSTM - -- handling multipart messages: - -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. - requestMap <- newMVar (Map.empty :: RequestMap) - -- run receive loop and requestMapPurge concurrently, so that an exception makes - -- both of them fail - concurrently_ (requestMapPurge requestMap) $ forever $ do - -- wait for incoming messages - (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ - let aMsg = deserialiseMessage rawMsg - either (\_ -> - -- drop invalid messages - pure () - ) - (\validMsg -> - case validMsg of - aRequest@Request{} - -- if not a multipart message, handle immediately. 
Response is at the same time an ACK - | part aRequest == 1 && isFinalPart aRequest -> - forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure () - -- otherwise collect all message parts first before handling the whole request - | otherwise -> do - now <- getPOSIXTime - -- critical locking section of requestMap - rMapState <- takeMVar requestMap - -- insert new message and get set - let - theseMaxParts = if isFinalPart aRequest then Just (part aRequest) else Nothing - thisKey = (sourceAddr, requestID aRequest) - newMapState = Map.insertWith (\ - (RequestMapEntry thisMsgSet p' ts) (RequestMapEntry oldMsgSet p'' _) -> - RequestMapEntry (thisMsgSet `Set.union` oldMsgSet) (p' <|> p'') ts - ) - thisKey - (RequestMapEntry (Set.singleton aRequest) theseMaxParts now) - rMapState - -- put map back into MVar, end of critical section - putMVar requestMap newMapState - -- ACK the received part - forM_ (ackRequest (getNid nsSnap) aRequest) $ - \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr) - -- if all parts received, then handle request. - let - (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState - numParts = Set.size theseParts - if maybe False (numParts ==) (fromIntegral <$> mayMaxParts) - then forkIO (handleIncomingRequest nsSTM sendQ theseParts sourceAddr) >> pure() - else pure() - -- Responses should never arrive on the main server port, as they are always - -- responses to requests sent from dedicated sockets on another port - _ -> pure () - ) - aMsg - - pure () +-- | create a UDP datagram socket, connected to a destination. +-- The socket gets an arbitrary free local port assigned. +mkSendSocket :: String -- ^ destination hostname or IP + -> PortNumber -- ^ destination port + -> IO Socket -- ^ a socket with an arbitrary source port +mkSendSocket dest destPort = do + destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) + sendSock <- socket AF_INET6 Datagram defaultProtocol + setSocketOption sendSock IPv6Only 1 + pure sendSock diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs deleted file mode 100644 index 296ebfa..0000000 --- a/src/Hash2Pub/FediChordTypes.hs +++ /dev/null @@ -1,604 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE RankNTypes #-} - -module Hash2Pub.FediChordTypes ( - NodeID -- abstract, but newtype constructors cannot be hidden - , idBits - , getNodeID - , toNodeID - , NodeState (..) - , LocalNodeState (..) - , LocalNodeStateSTM - , RemoteNodeState (..) - , RealNode (..) - , RealNodeSTM - , setSuccessors - , setPredecessors - , NodeCache - , CacheEntry(..) - , RingEntry(..) - , RingMap(..) - , HasKeyID - , getKeyID - , rMapSize - , rMapLookup - , rMapLookupPred - , rMapLookupSucc - , addRMapEntry - , addRMapEntryWith - , addPredecessors - , addSuccessors - , takeRMapPredecessors - , takeRMapSuccessors - , deleteRMapEntry - , setRMapEntries - , rMapFromList - , rMapToList - , cacheGetNodeStateUnvalidated - , initCache - , cacheEntries - , cacheLookup - , cacheLookupSucc - , cacheLookupPred - , localCompare - , genNodeID - , genNodeIDBS - , genKeyID - , genKeyIDBS - , byteStringToUInteger - , ipAddrAsBS - , bsAsIpAddr - , FediChordConf(..) 
- ) where - -import Control.Exception -import Data.Foldable (foldr') -import Data.Function (on) -import Data.List (delete, nub, sortBy) -import qualified Data.Map.Strict as Map -import Data.Maybe (fromJust, fromMaybe, isJust, - isNothing, mapMaybe) -import qualified Data.Set as Set -import Data.Time.Clock.POSIX -import Network.Socket - --- for hashing and ID conversion -import Control.Concurrent.STM -import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TVar -import Control.Monad (forever) -import Crypto.Hash -import qualified Data.ByteArray as BA -import qualified Data.ByteString as BS -import qualified Data.ByteString.UTF8 as BSU -import Data.IP (IPv6, fromHostAddress6, - toHostAddress6) -import Data.Typeable (Typeable (..), typeOf) -import Data.Word -import qualified Network.ByteOrder as NetworkBytes - -import Hash2Pub.Utils - -import Debug.Trace (trace) - - - --- define protocol constants --- | static definition of ID length in bits -idBits :: Integer -idBits = 256 - --- |NodeIDs are Integers wrapped in a newtype, to be able to redefine --- their instance behaviour --- --- for being able to check value bounds, the constructor should not be used directly --- and new values are created via @toNodeID@ (newtype constructors cannot be hidden) -newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum - --- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values. --- When needing a runtime-safe constructor with drawbacks, try @fromInteger@ -toNodeID :: Integer -> NodeID -toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i - --- |NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits' -instance Bounded NodeID where - minBound = NodeID 0 - maxBound = NodeID (2^idBits - 1) - --- |calculations with NodeIDs are modular arithmetic operations -instance Num NodeID where - a + b = NodeID $ (getNodeID a + getNodeID b) `mod` (getNodeID maxBound + 1) - a * b = NodeID $ (getNodeID a * getNodeID b) `mod` (getNodeID maxBound + 1) - a - b = NodeID $ (getNodeID a - getNodeID b) `mod` (getNodeID maxBound + 1) - -- |safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around - -- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@. - fromInteger i = NodeID $ i `mod` (getNodeID maxBound + 1) - signum = NodeID . signum . getNodeID - abs = NodeID . abs . getNodeID -- ToDo: make sure that at creation time only IDs within the range are used - --- | use normal strict monotonic ordering of integers, realising the ring structure --- is done in the @NodeCache@ implementation -instance Ord NodeID where - a `compare` b = getNodeID a `compare` getNodeID b - --- | local comparison of 2 node IDs, only relevant for determining a successor or predecessor on caches with just 2 nodes -localCompare :: NodeID -> NodeID -> Ordering -a `localCompare` b - | getNodeID a == getNodeID b = EQ - | wayForwards > wayBackwards = GT - | otherwise = LT - where - wayForwards = getNodeID (b - a) - wayBackwards = getNodeID (a - b) - --- | Data for managing the virtual server nodes of this real node. --- Also contains shared data and config values. 
--- TODO: more data structures for k-choices bookkeeping -data RealNode = RealNode - { vservers :: [LocalNodeStateSTM] - -- ^ references to all active versers - , nodeConfig :: FediChordConf - -- ^ holds the initial configuration read at program start - , bootstrapNodes :: [(String, PortNumber)] - -- ^ nodes to be used as bootstrapping points, new ones learned during operation - } - -type RealNodeSTM = TVar RealNode - --- | represents a node and all its important state -data RemoteNodeState = RemoteNodeState - { nid :: NodeID - , domain :: String - -- ^ full public domain name the node is reachable under - , ipAddr :: HostAddress6 - -- the node's public IPv6 address - , dhtPort :: PortNumber - -- ^ port of the DHT itself - , servicePort :: PortNumber - -- ^ port of the service provided on top of the DHT - , vServerID :: Integer - -- ^ ID of this vserver - } - deriving (Show, Eq) - -instance Ord RemoteNodeState where - a `compare` b = nid a `compare` nid b - --- | represents a node and encapsulates all data and parameters that are not present for remote nodes -data LocalNodeState = LocalNodeState - { nodeState :: RemoteNodeState - -- ^ represents common data present both in remote and local node representations - , nodeCacheSTM :: TVar NodeCache - -- ^ EpiChord node cache with expiry times for nodes - , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) - -- ^ cache updates are not written directly to the 'nodeCache' but queued and - , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well - -- ^ successor nodes in ascending order by distance - , predecessors :: [RemoteNodeState] - -- ^ predecessor nodes in ascending order by distance - , kNeighbours :: Int - -- ^ desired length of predecessor and successor list - , lNumBestNodes :: Int - -- ^ number of best next hops to provide - , pNumParallelQueries :: Int - -- ^ number of parallel sent queries - , jEntriesPerSlice :: Int - -- ^ number of desired entries per cache slice - , parentRealNode :: RealNodeSTM - -- ^ the parent node managing this vserver instance - } - deriving (Show, Eq) - --- | for concurrent access, LocalNodeState is wrapped in a TVar -type LocalNodeStateSTM = TVar LocalNodeState - --- | class for various NodeState representations, providing --- getters and setters for common values -class NodeState a where - -- getters for common properties - getNid :: a -> NodeID - getDomain :: a -> String - getIpAddr :: a -> HostAddress6 - getDhtPort :: a -> PortNumber - getServicePort :: a -> PortNumber - getVServerID :: a -> Integer - -- setters for common properties - setNid :: NodeID -> a -> a - setDomain :: String -> a -> a - setIpAddr :: HostAddress6 -> a -> a - setDhtPort :: PortNumber -> a -> a - setServicePort :: PortNumber -> a -> a - setVServerID :: Integer -> a -> a - toRemoteNodeState :: a -> RemoteNodeState - -instance NodeState RemoteNodeState where - getNid = nid - getDomain = domain - getIpAddr = ipAddr - getDhtPort = dhtPort - getServicePort = servicePort - getVServerID = vServerID - setNid nid' ns = ns {nid = nid'} - setDomain domain' ns = ns {domain = domain'} - setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'} - setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'} - setServicePort servicePort' ns = ns {servicePort = servicePort'} - setVServerID vServerID' ns = ns {vServerID = vServerID'} - toRemoteNodeState = id - --- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState' -propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> 
LocalNodeState -propagateNodeStateSet_ func ns = let - newNs = func $ nodeState ns - in - ns {nodeState = newNs} - - -instance NodeState LocalNodeState where - getNid = getNid . nodeState - getDomain = getDomain . nodeState - getIpAddr = getIpAddr . nodeState - getDhtPort = getDhtPort . nodeState - getServicePort = getServicePort . nodeState - getVServerID = getVServerID . nodeState - setNid nid' = propagateNodeStateSet_ $ setNid nid' - setDomain domain' = propagateNodeStateSet_ $ setDomain domain' - setIpAddr ipAddr' = propagateNodeStateSet_ $ setIpAddr ipAddr' - setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort' - setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort' - setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID' - toRemoteNodeState = nodeState - --- | defining Show instances to be able to print NodeState for debug purposes -instance Typeable a => Show (TVar a) where - show x = show (typeOf x) - -instance Typeable a => Show (TQueue a) where - show x = show (typeOf x) - - --- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list -setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ preds} - --- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list -setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ succs} - --- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined -addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) preds) . rMapFromList $ predecessors ns} - --- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined -addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) succs) . rMapFromList $ successors ns} - --- | Class for all types that can be identified via an EpiChord key. --- Used for restricting the types a 'RingMap' can store -class (Eq a, Show a) => HasKeyID a where - getKeyID :: a -> NodeID - -instance HasKeyID RemoteNodeState where - getKeyID = getNid - -instance HasKeyID CacheEntry where - getKeyID (CacheEntry _ ns _) = getNid ns - -instance HasKeyID NodeID where - getKeyID = id - -type NodeCache = RingMap CacheEntry - --- | generic data structure for holding elements with a key and modular lookup -newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) } - -instance (HasKeyID a) => Eq (RingMap a) where - a == b = getRingMap a == getRingMap b - -instance (HasKeyID a) => Show (RingMap a) where - show rmap = shows "RingMap " (show $ getRingMap rmap) - --- | entry of a 'RingMap' that holds a value and can also --- wrap around the lookup direction at the edges of the name space. 
-data RingEntry a = KeyEntry a - | ProxyEntry (NodeID, ProxyDirection) (Maybe (RingEntry a)) - deriving (Show, Eq) - --- | 'RingEntry' type for usage as a node cache -data CacheEntry = CacheEntry Bool RemoteNodeState POSIXTime - deriving (Show, Eq) - - --- | as a compromise, only KeyEntry components are ordered by their NodeID --- while ProxyEntry components should never be tried to be ordered. -instance (HasKeyID a, Eq a) => Ord (RingEntry a) where - a `compare` b = compare (extractID a) (extractID b) - where - extractID (KeyEntry e) = getKeyID e - extractID ProxyEntry{} = error "proxy entries should never appear outside of the RingMap" - -data ProxyDirection = Backwards - | Forwards - deriving (Show, Eq) - -instance Enum ProxyDirection where - toEnum (-1) = Backwards - toEnum 1 = Forwards - toEnum _ = error "no such ProxyDirection" - fromEnum Backwards = - 1 - fromEnum Forwards = 1 - --- | helper function for getting the a from a RingEntry a -extractRingEntry :: HasKeyID a => RingEntry a -> Maybe a -extractRingEntry (KeyEntry entry) = Just entry -extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry -extractRingEntry _ = Nothing - ---- useful function for getting entries for a full cache transfer -cacheEntries :: NodeCache -> [CacheEntry] -cacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap - --- | An empty 'RingMap' needs to be initialised with 2 proxy entries, --- linking the modular name space together by connecting @minBound@ and @maxBound@ -emptyRMap :: HasKeyID a => RingMap a -emptyRMap = RingMap . Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))] - where - proxyEntry (from,to) = (from, ProxyEntry to Nothing) - -initCache :: NodeCache -initCache = emptyRMap - --- | Maybe returns the entry stored at given key -rMapLookup :: HasKeyID a - => NodeID -- ^lookup key - -> RingMap a -- ^lookup cache - -> Maybe a -rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap) - -cacheLookup :: NodeID -- ^lookup key - -> NodeCache -- ^lookup cache - -> Maybe CacheEntry -cacheLookup = rMapLookup - --- | returns number of present 'KeyEntry' in a properly initialised 'RingMap' -rMapSize :: (HasKeyID a, Integral i) - => RingMap a - -> i -rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry minBound - oneIfEntry maxBound - where - innerMap = getRingMap rmap - oneIfEntry :: Integral i => NodeID -> i - oneIfEntry nid - | isNothing (rMapLookup nid rmap) = 1 - | otherwise = 0 - --- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ --- to simulate a modular ring -lookupWrapper :: HasKeyID a - => (NodeID -> Map.Map NodeID (RingEntry a) -> Maybe (NodeID, RingEntry a)) - -> (NodeID -> Map.Map NodeID (RingEntry a) -> Maybe (NodeID, RingEntry a)) - -> ProxyDirection - -> NodeID - -> RingMap a - -> Maybe a -lookupWrapper f fRepeat direction key rmap = - case f key $ getRingMap rmap of - -- the proxy entry found holds a - Just (_, ProxyEntry _ (Just (KeyEntry entry))) -> Just entry - -- proxy entry holds another proxy entry, this should not happen - Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing - -- proxy entry without own entry is a pointer on where to continue - -- if lookup direction is the same as pointer direction: follow pointer - Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) -> - let newKey = if pointerDirection == direction - then pointerID - else foundKey + (fromInteger . toInteger . 
fromEnum $ direction) - in if rMapNotEmpty rmap - then lookupWrapper fRepeat fRepeat direction newKey rmap - else Nothing - -- normal entries are returned - Just (_, KeyEntry entry) -> Just entry - Nothing -> Nothing - where - rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool - rMapNotEmpty rmap' = (Map.size (getRingMap rmap') > 2) -- there are more than the 2 ProxyEntries - || isJust (rMapLookup minBound rmap') -- or one of the ProxyEntries holds a node - || isJust (rMapLookup maxBound rmap') - --- | find the successor node to a given key on a modular EpiChord ring. --- Note: The EpiChord definition of "successor" includes the node at the key itself, --- if existing. -rMapLookupSucc :: HasKeyID a - => NodeID -- ^lookup key - -> RingMap a -- ^ring cache - -> Maybe a -rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards - -cacheLookupSucc :: NodeID -- ^lookup key - -> NodeCache -- ^ring cache - -> Maybe CacheEntry -cacheLookupSucc = rMapLookupSucc - --- | find the predecessor node to a given key on a modular EpiChord ring. -rMapLookupPred :: HasKeyID a - => NodeID -- ^lookup key - -> RingMap a -- ^ring cache - -> Maybe a -rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards - -cacheLookupPred :: NodeID -- ^lookup key - -> NodeCache -- ^ring cache - -> Maybe CacheEntry -cacheLookupPred = rMapLookupPred - -addRMapEntryWith :: HasKeyID a - => (RingEntry a -> RingEntry a -> RingEntry a) - -> a - -> RingMap a - -> RingMap a -addRMapEntryWith combineFunc entry = RingMap - . Map.insertWith combineFunc (getKeyID entry) (KeyEntry entry) - . getRingMap - -addRMapEntry :: HasKeyID a - => a - -> RingMap a - -> RingMap a -addRMapEntry = addRMapEntryWith insertCombineFunction - where - insertCombineFunction newVal oldVal = - case oldVal of - ProxyEntry n _ -> ProxyEntry n (Just newVal) - KeyEntry _ -> newVal - - -addRMapEntries :: (Foldable t, HasKeyID a) - => t a - -> RingMap a - -> RingMap a -addRMapEntries entries rmap = foldr' addRMapEntry rmap entries - -setRMapEntries :: (Foldable t, HasKeyID a) - => t a - -> RingMap a -setRMapEntries entries = addRMapEntries entries emptyRMap - -deleteRMapEntry :: (HasKeyID a) - => NodeID - -> RingMap a - -> RingMap a -deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap - where - modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) - modifier KeyEntry {} = Nothing - -rMapToList :: (HasKeyID a) => RingMap a -> [a] -rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap - -rMapFromList :: (HasKeyID a) => [a] -> RingMap a -rMapFromList = setRMapEntries - --- | takes up to i entries from a 'RingMap' by calling a getter function on a --- *startAt* value and after that on the previously returned value. --- Stops once i entries have been taken or an entry has been encountered twice --- (meaning the ring has been traversed completely). --- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'. 
-takeRMapEntries_ :: (HasKeyID a, Integral i) - => (NodeID -> RingMap a -> Maybe a) - -> NodeID - -> i - -> RingMap a - -> [a] --- TODO: might be more efficient with dlists -takeRMapEntries_ getterFunc startAt num rmap = reverse $ - case getterFunc startAt rmap of - Nothing -> [] - Just anEntry -> takeEntriesUntil (getKeyID anEntry) (getKeyID anEntry) (num-1) [anEntry] - where - takeEntriesUntil havingReached previousEntry remaining takeAcc - | remaining <= 0 = takeAcc - | getKeyID (fromJust $ getterFunc previousEntry rmap) == havingReached = takeAcc - | otherwise = let (Just gotEntry) = getterFunc previousEntry rmap - in takeEntriesUntil havingReached (getKeyID gotEntry) (remaining-1) (gotEntry:takeAcc) - -takeRMapPredecessors :: (HasKeyID a, Integral i) - => NodeID - -> i - -> RingMap a - -> [a] -takeRMapPredecessors = takeRMapEntries_ rMapLookupPred - -takeRMapSuccessors :: (HasKeyID a, Integral i) - => NodeID - -> i - -> RingMap a - -> [a] -takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc - --- clean up cache entries: once now - entry > maxAge --- transfer difference now - entry to other node - --- | return the @NodeState@ data from a cache entry without checking its validation status -cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState -cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState - --- | converts a 'HostAddress6' IP address to a big-endian strict ByteString -ipAddrAsBS :: HostAddress6 -> BS.ByteString -ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d] - --- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6' -bsAsIpAddr :: BS.ByteString -> HostAddress6 -bsAsIpAddr bytes = (a,b,c,d) - where - a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes - - --- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString -genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address - -> String -- ^a node's 1st and 2nd level domain name - -> Word8 -- ^the used vserver ID - -> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer -genNodeIDBS ip nodeDomain vserver = - hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower - where - vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255 - ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS - nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS - hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128)) - (hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet - - --- | generates a 256 bit long @NodeID@ using SHAKE128 -genNodeID :: HostAddress6 -- ^a node's IPv6 address - -> String -- ^a node's 1st and 2nd level domain name - -> Word8 -- ^the used vserver ID - -> NodeID -- ^the generated @NodeID@ -genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs - --- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT -genKeyIDBS :: String -- ^the key string - -> BS.ByteString -- ^the key ID represented as a @ByteString@ -genKeyIDBS key = BS.pack . BA.unpack $ (hash (BSU.fromString key) :: Digest SHA3_256) - --- | generates a 256 bit long key identifier for looking up its data on the DHT -genKeyID :: String -- ^the key string - -> NodeID -- ^the key ID -genKeyID = NodeID . byteStringToUInteger . 
genKeyIDBS - - --- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order --- by iterating it byte-wise from the back and shifting the byte values according to their offset -byteStringToUInteger :: BS.ByteString -> Integer -byteStringToUInteger bs = sum $ parsedBytes 0 bs - where - parsedBytes :: Integer -> BS.ByteString -> [ Integer ] - parsedBytes offset uintBs = case BS.unsnoc uintBs of - Nothing -> [] - Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs' - - parseWithOffset :: Integer -> Word8 -> Integer - parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0 - parseWithOffset offset word = toInteger word * 2^(8 * offset) - --- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages --- persist them on disk so they can be used for all following bootstraps - --- | configuration values used for initialising the FediChord DHT -data FediChordConf = FediChordConf - { confDomain :: String - -- ^ the domain/ hostname the node is reachable under - , confIP :: HostAddress6 - -- ^ IP address of outgoing packets - , confDhtPort :: Int - -- ^ listening port for the FediChord DHT - , confBootstrapNodes :: [(String, PortNumber)] - -- ^ list of potential bootstrapping nodes - , confBootstrapSamplingInterval :: Int - -- ^ pause between sampling the own ID through bootstrap nodes, in seconds - } - deriving (Show, Eq) - - diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs deleted file mode 100644 index 15cb863..0000000 --- a/src/Hash2Pub/ProtocolTypes.hs +++ /dev/null @@ -1,101 +0,0 @@ -module Hash2Pub.ProtocolTypes where - -import qualified Data.Map as Map -import Data.Maybe (mapMaybe) -import qualified Data.Set as Set -import Data.Time.Clock.POSIX (POSIXTime) - -import Hash2Pub.FediChordTypes - -data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) - | FOUND RemoteNodeState - deriving (Show, Eq) - --- === protocol serialisation data types - -data Action = QueryID - | Join - | Leave - | Stabilise - | Ping - deriving (Show, Eq, Enum) - -data FediChordMessage = Request - { requestID :: Integer - , sender :: RemoteNodeState - , part :: Integer - , isFinalPart :: Bool - -- ^ part starts at 1 - , action :: Action - , payload :: Maybe ActionPayload - } - | Response - { requestID :: Integer - , senderID :: NodeID - , part :: Integer - , isFinalPart :: Bool - , action :: Action - , payload :: Maybe ActionPayload - } - deriving (Show, Eq) - -instance Ord FediChordMessage where - compare a@Request{} b@Request{} | requestID a == requestID b = part a `compare` part b - | otherwise = requestID a `compare` requestID b - compare a@Response{} b@Response{} | requestID a == requestID b = part a `compare` part b - | otherwise = requestID a `compare` requestID b - -- comparing different constructor types always yields "not equal" - compare _ _ = LT - -data ActionPayload = QueryIDRequestPayload - { queryTargetID :: NodeID - , queryLBestNodes :: Integer - } - | JoinRequestPayload - | LeaveRequestPayload - { leaveSuccessors :: [RemoteNodeState] - , leavePredecessors :: [RemoteNodeState] - } - | StabiliseRequestPayload - | PingRequestPayload - | QueryIDResponsePayload - { queryResult :: QueryResponse - } - | JoinResponsePayload - { joinSuccessors :: [RemoteNodeState] - , joinPredecessors :: [RemoteNodeState] - , joinCache :: [RemoteCacheEntry] - } - | LeaveResponsePayload - | StabiliseResponsePayload - { stabiliseSuccessors :: [RemoteNodeState] - , 
stabilisePredecessors :: [RemoteNodeState] - } - | PingResponsePayload - { pingNodeStates :: [RemoteNodeState] - } - deriving (Show, Eq) - --- | global limit of parts per message used when (de)serialising messages. --- Used to limit the impact of DOS attempts with partial messages. -maximumParts :: Num a => a -maximumParts = 150 - --- | dedicated data type for cache entries sent to or received from the network, --- as these have to be considered as unvalidated. Also helps with separation of trust. -data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime - deriving (Show, Eq) - -instance Ord RemoteCacheEntry where - (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 - -toRemoteCacheEntry :: CacheEntry -> RemoteCacheEntry -toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts - --- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers -toRemoteCache :: NodeCache -> [RemoteCacheEntry] -toRemoteCache cache = toRemoteCacheEntry <$> cacheEntries cache - --- | extract the 'NodeState' from a 'RemoteCacheEntry' -remoteNode :: RemoteCacheEntry -> RemoteNodeState -remoteNode (RemoteCacheEntry ns _) = ns diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index 1cace7a..50f0d66 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -1,13 +1,13 @@ {-# LANGUAGE OverloadedStrings #-} module FediChordSpec where -import Control.Concurrent.STM.TVar import Control.Exception -import Data.ASN1.Parse (runParseASN1) -import qualified Data.ByteString as BS -import qualified Data.Map.Strict as Map -import Data.Maybe (fromJust, isJust) -import qualified Data.Set as Set +import Data.ASN1.Parse (runParseASN1) +import qualified Data.ByteString as BS +import Data.IORef +import qualified Data.Map.Strict as Map +import Data.Maybe (fromJust) +import qualified Data.Set as Set import Data.Time.Clock.POSIX import Network.Socket import Test.Hspec @@ -15,7 +15,6 @@ import Test.Hspec import Hash2Pub.ASN1Coding import Hash2Pub.DHTProtocol import Hash2Pub.FediChord -import Hash2Pub.FediChordTypes spec :: Spec spec = do @@ -56,13 +55,14 @@ spec = do it "can be initialised" $ print exampleNodeState it "can be initialised partly and then modified later" $ do - let ns = RemoteNodeState { + let ns = NodeState { nid = undefined , domain = exampleNodeDomain , ipAddr = exampleIp , dhtPort = 2342 - , servicePort = 513 + , apPort = Nothing , vServerID = undefined + , internals = Nothing } nsReady = ns { nid = genNodeID (ipAddr ns) (domain ns) 3 @@ -81,8 +81,8 @@ spec = do newCache = addCacheEntryPure 10 (RemoteCacheEntry exampleNodeState 10) (addCacheEntryPure 10 (RemoteCacheEntry anotherNode 10) emptyCache) exampleID = nid exampleNodeState it "entries can be added to a node cache and looked up again" $ do - rMapSize emptyCache `shouldBe` 0 - rMapSize newCache `shouldBe` 2 + -- the cache includes 2 additional proxy elements right from the start + Map.size newCache - Map.size emptyCache `shouldBe` 2 -- normal entry lookup nid . cacheGetNodeStateUnvalidated <$> cacheLookup anotherID newCache `shouldBe` Just anotherID nid . cacheGetNodeStateUnvalidated <$> cacheLookup (anotherID+1) newCache `shouldBe` Nothing @@ -121,72 +121,50 @@ spec = do let emptyCache = initCache nid1 = toNodeID 2^(23::Integer)+1 - node1 = setPredecessors [node4] . 
setNid nid1 <$> exampleLocalNode + node1 = do + eln <- exampleLocalNode -- is at 2^23.00000017198264 = 8388609 + pure $ putPredecessors [nid4] $ eln {nid = nid1} nid2 = toNodeID 2^(230::Integer)+12 node2 = exampleNodeState { nid = nid2} nid3 = toNodeID 2^(25::Integer)+10 node3 = exampleNodeState { nid = nid3} nid4 = toNodeID 2^(9::Integer)+100 node4 = exampleNodeState { nid = nid4} - nid5 = toNodeID 2^(25::Integer)+100 - node5 = exampleNodeState { nid = nid5} - cacheWith2Entries :: NodeCache - cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) - cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries) - it "unjoined nodes should never return themselfs" $ do - exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode + cacheWith2Entries :: IO NodeCache + cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) + cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries) + it "works on an empty cache" $ do queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty - (FORWARD fwSet) <- queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> (getNid <$> exampleLocalNode) - remoteNode (head $ Set.elems fwSet) `shouldBe` node4 - it "joined nodes do not fall back to the default" $ - queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty + queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty it "works on a cache with less entries than needed" $ do - (FORWARD nodeset) <- queryLocalCache <$> node1 <*> pure cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5) - Set.map (nid . remoteNode) nodeset `shouldBe` Set.fromList [ nid5, nid2 ] + (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5) + Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ] it "works on a cache with sufficient entries" $ do - (FORWARD nodeset1) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) - (FORWARD nodeset2) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5) - Set.map (nid . remoteNode) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid5] - Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4] + (FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) + (FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5) + Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3] + Set.map (nid . 
remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4] it "recognises the node's own responsibility" $ do - FOUND selfQueryRes <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure nid1 - getNid <$> node1 `shouldReturn` getNid selfQueryRes - FOUND responsibilityResult <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer)) - getNid <$> node1 `shouldReturn` getNid responsibilityResult - describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do - let - emptyCache = initCache - -- implicitly relies on kNeighbours to be <= 3 - thisNid = toNodeID 1000 - thisNode = setNid thisNid <$> exampleLocalNode - nid2 = toNodeID 1003 - node2 = exampleNodeState { nid = nid2} - nid3 = toNodeID 1010 - node3 = exampleNodeState { nid = nid3} - nid4 = toNodeID 1020 - node4 = exampleNodeState { nid = nid4} - nid5 = toNodeID 1025 - node5 = exampleNodeState { nid = nid5} - allRemoteNodes = [node2, node3, node4, node5] - it "lookups also work for slices larger than 1/2 key space" $ do - node <- setSuccessors allRemoteNodes . setPredecessors allRemoteNodes <$> thisNode - -- do lookup on empty cache but with successors for a key > 1/2 key space - -- succeeding the node - queryLocalCache node emptyCache 1 (nid5 + 10) `shouldBe` FOUND (toRemoteNodeState node) - + FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1 + nid <$> node1 `shouldReturn` nid selfQueryRes + FOUND responsibilityResult <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer)) + nid <$> node1 `shouldReturn` nid responsibilityResult + it "does not fail on nodes without neighbours (initial state)" $ do + (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11) + Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3] describe "Messages can be encoded to and decoded from ASN.1" $ do -- define test messages let - someNodes = fmap (flip setNid exampleNodeState . 
fromInteger) [3..12] + someNodeIDs = fmap fromInteger [3..12] qidReqPayload = QueryIDRequestPayload { queryTargetID = nid exampleNodeState , queryLBestNodes = 3 } jReqPayload = JoinRequestPayload lReqPayload = LeaveRequestPayload { - leaveSuccessors = someNodes - , leavePredecessors = someNodes + leaveSuccessors = someNodeIDs + , leavePredecessors = someNodeIDs } stabReqPayload = StabiliseRequestPayload pingReqPayload = PingRequestPayload @@ -200,8 +178,8 @@ spec = do ] } jResPayload = JoinResponsePayload { - joinSuccessors = someNodes - , joinPredecessors = someNodes + joinSuccessors = someNodeIDs + , joinPredecessors = someNodeIDs , joinCache = [ RemoteCacheEntry exampleNodeState (toEnum 23420001) , RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0) @@ -209,7 +187,7 @@ spec = do } lResPayload = LeaveResponsePayload stabResPayload = StabiliseResponsePayload { - stabiliseSuccessors = someNodes + stabiliseSuccessors = someNodeIDs , stabilisePredecessors = [] } pingResPayload = PingResponsePayload { @@ -221,16 +199,16 @@ spec = do requestTemplate = Request { requestID = 2342 , sender = exampleNodeState + , parts = 1 , part = 1 - , isFinalPart = True , action = undefined , payload = undefined } responseTemplate = Response { - requestID = 2342 + responseTo = 2342 , senderID = nid exampleNodeState + , parts = 1 , part = 1 - , isFinalPart = True , action = undefined , payload = undefined } @@ -238,12 +216,6 @@ spec = do responseWith a pa = responseTemplate {action = a, payload = Just pa} encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg - largeMessage = responseWith Join $ JoinResponsePayload { - joinSuccessors = flip setNid exampleNodeState . fromInteger <$> [-20..150] - , joinPredecessors = flip setNid exampleNodeState . fromInteger <$> [5..11] - , joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]] - } - it "messages are encoded and decoded correctly from and to ASN1" $ do encodeDecodeAndCheck $ requestWith QueryID qidReqPayload encodeDecodeAndCheck $ requestWith Join jReqPayload @@ -259,54 +231,35 @@ spec = do it "messages are encoded and decoded to ASN.1 DER properly" $ deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) it "messages too large for a single packet can (often) be split into multiple parts" $ do + let largeMessage = responseWith Join $ JoinResponsePayload { + joinSuccessors = fromInteger <$> [-20..150] + , joinPredecessors = fromInteger <$> [5..11] + , joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]] + } -- TODO: once splitting works more efficient, test for exact number or payload, see #18 length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True - length (serialiseMessage 60000 largeMessage) `shouldBe` 1 - it "message part numbering starts at the submitted part number" $ do - isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True - let startAt5 = serialiseMessage 600 (largeMessage {part = 5}) - Map.lookup 1 startAt5 `shouldBe` Nothing - part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5 - describe "join cache lookup" $ - it "A bootstrap cache initialised with just one node returns that one." 
$ do - let - bootstrapNid = toNodeID 34804191837661041451755206127000721433747285589603756490902196113256157045194 - bootstrapNode = setNid bootstrapNid exampleNodeState - bootstrapCache = addCacheEntryPure 10 (RemoteCacheEntry bootstrapNode 19) initCache - ownId = toNodeID 34804191837661041451755206127000721433707928516052624394829818586723613390165 - ownNode <- setNid ownId <$> exampleLocalNode - let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId - remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode - + length (serialiseMessage 6000 largeMessage) `shouldBe` 1 -- some example data -exampleNodeState :: RemoteNodeState -exampleNodeState = RemoteNodeState { +exampleNodeState :: NodeState +exampleNodeState = NodeState { nid = toNodeID 12 , domain = exampleNodeDomain , ipAddr = exampleIp , dhtPort = 2342 - , servicePort = 513 + , apPort = Nothing , vServerID = 0 + , internals = Nothing } -exampleLocalNode :: IO LocalNodeState -exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode { - vservers = [] - , nodeConfig = exampleFediConf - , bootstrapNodes = confBootstrapNodes exampleFediConf - }) - - -exampleFediConf :: FediChordConf -exampleFediConf = FediChordConf { +exampleLocalNode :: IO NodeState +exampleLocalNode = nodeStateInit $ FediChordConf { confDomain = "example.social" , confIP = exampleIp , confDhtPort = 2342 } - exampleNodeDomain :: String exampleNodeDomain = "example.social" exampleVs :: (Integral i) => i