diff --git a/.hlint.yaml b/.hlint.yaml index 4fa15a6..dfe9a89 100644 --- a/.hlint.yaml +++ b/.hlint.yaml @@ -4,5 +4,5 @@ - error: { lhs: return, rhs: pure } -- ignore: {name: "Avoid lambda using `infix`"} +- ignore: {name: ["Avoid lambda using `infix`", "Use lambda-case"]} diff --git a/FediChord.asn1 b/FediChord.asn1 index 7c53cb0..f278f8f 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -4,14 +4,16 @@ NodeID ::= INTEGER (0..115792089237316195423570985008687907853269984665640564039 Domain ::= VisibleString +Partnum ::= INTEGER (0..150) + Action ::= ENUMERATED {queryID, join, leave, stabilise, ping} Request ::= SEQUENCE { action Action, - requestID INTEGER, + requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer sender NodeState, - parts INTEGER (0..150), -- number of message parts - part INTEGER (0..150), -- part number of this message, starts at 1 + part Partnum, -- part number of this message, starts at 1 + finalPart BOOLEAN, -- flag indicating this `part` to be the last of this request actionPayload CHOICE { queryIDRequestPayload QueryIDRequestPayload, joinRequestPayload JoinRequestPayload, @@ -25,10 +27,11 @@ Request ::= SEQUENCE { -- request and response instead of explicit flag Response ::= SEQUENCE { - responseTo INTEGER, + -- requestID of the request being responded to + requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer senderID NodeID, - parts INTEGER (0..150), - part INTEGER (0..150), + part Partnum, + finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response action Action, actionPayload CHOICE { queryIDResponsePayload QueryIDResponsePayload, @@ -44,7 +47,7 @@ NodeState ::= SEQUENCE { domain Domain, ipAddr OCTET STRING (SIZE(16)), dhtPort INTEGER, - apPort INTEGER, + servicePort INTEGER, vServerID INTEGER (0..255) } @@ -59,8 +62,8 @@ NodeCache ::= SEQUENCE OF CacheEntry JoinRequestPayload ::= NULL JoinResponsePayload ::= SEQUENCE { - successors SEQUENCE OF 
NodeID, - predecessors SEQUENCE OF NodeID, + successors SEQUENCE OF NodeState, + predecessors SEQUENCE OF NodeState, cache NodeCache } @@ -79,14 +82,14 @@ QueryIDResponsePayload ::= SEQUENCE { StabiliseRequestPayload ::= NULL StabiliseResponsePayload ::= SEQUENCE { - successors SEQUENCE OF NodeID, - predecessors SEQUENCE OF NodeID + successors SEQUENCE OF NodeState, + predecessors SEQUENCE OF NodeState -- ToDo: transfer of handled key data, if newly responsible for it } LeaveRequestPayload ::= SEQUENCE { - successors SEQUENCE OF NodeID, - predecessors SEQUENCE OF NodeID + successors SEQUENCE OF NodeState, + predecessors SEQUENCE OF NodeState -- ToDo: transfer of own data to newly responsible node } diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 4906c08..d1ee4b1 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random ghc-options: -Wall @@ -55,7 +55,7 @@ library import: deps -- Modules exported by the library. - exposed-modules: Hash2Pub.FediChord, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding + exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes -- Modules included in this library but not exported. 
other-modules: Hash2Pub.Utils diff --git a/app/Main.hs b/app/Main.hs index 1956f64..cdfc2b3 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -1,7 +1,12 @@ module Main where import Control.Concurrent -import Data.IP (IPv6, toHostAddress6) +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Concurrent.STM.TVar +import Control.Exception +import Data.Either +import Data.IP (IPv6, toHostAddress6) import System.Environment import Hash2Pub.FediChord @@ -11,22 +16,39 @@ main = do -- ToDo: parse and pass config -- probably use `tomland` for that conf <- readConfig + -- TODO: first initialise 'RealNode', then the vservers -- ToDo: load persisted caches, bootstrapping nodes … (serverSock, thisNode) <- fediChordInit conf - print thisNode - print serverSock -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode - -- idea: list of bootstrapping nodes, try joining within a timeout - -- stop main thread from terminating during development - getChar + -- try joining the DHT using one of the provided bootstrapping nodes + joinedState <- tryBootstrapJoining thisNode + either (\err -> do + -- handle unsuccessful join + + putStrLn $ err <> " Error joining, start listening for incoming requests anyways" + print =<< readTVarIO thisNode + -- launch thread attempting to join on new cache entries + _ <- forkIO $ joinOnNewEntriesThread thisNode + wait =<< async (fediMainThreads serverSock thisNode) + ) + (\joinedNS -> do + -- launch main eventloop with successfully joined state + putStrLn "successful join" + wait =<< async (fediMainThreads serverSock thisNode) + ) + joinedState pure () + readConfig :: IO FediChordConf readConfig = do - confDomainString : ipString : portString : _ <- getArgs + confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs pure $ FediChordConf { confDomain = confDomainString , confIP = toHostAddress6 . 
read $ ipString , confDhtPort = read portString + , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] + --, confStabiliseInterval = 60 + , confBootstrapSamplingInterval = 180 } diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index b2e7b64..456dac6 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -16,8 +16,8 @@ import qualified Data.Set as Set import Data.Time.Clock.POSIX () import Safe -import Hash2Pub.DHTProtocol -import Hash2Pub.FediChord +import Hash2Pub.FediChordTypes +import Hash2Pub.ProtocolTypes import Hash2Pub.Utils import Debug.Trace @@ -77,6 +77,9 @@ chunkLength numParts totalSize = ceiling $ (realToFrac totalSize :: Double) / re -- The number of parts per message is limited to 150 for DOS protection reasons. -- The returned byte strings might exceed the desired maximum length, as only the payload (and not all of them) -- can be split into multiple parts. +-- +-- The return type is a Map from part number to encoded part, to be able to acknowledge +-- an encoded part without having to decode its number. serialiseMessage :: Int -- maximum message size in bytes -> FediChordMessage -- mesage to be serialised in preparation for sending -> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing @@ -100,11 +103,11 @@ serialiseMessage maxBytesLength msg = modifyMessage i (partNum, pl) pls = (partNum, msg { part = partNum , payload = Just pl - , parts = fromIntegral i + , isFinalPart = partNum == fromIntegral i }):pls -- part starts at 1 payloadParts :: Int -> Maybe [(Integer, ActionPayload)] - payloadParts i = zip [1..] . splitPayload i <$> actionPayload + payloadParts i = zip [(part msg)..] . splitPayload i <$> actionPayload actionPayload = payload msg encodedMsgs i = Map.map encodeMsg $ messageParts i maxMsgLength = maximum . fmap BS.length . 
Map.elems @@ -127,20 +130,20 @@ encodePayload LeaveResponsePayload = [Null] encodePayload payload'@LeaveRequestPayload{} = Start Sequence : Start Sequence - : fmap (IntVal . getNodeID) (leaveSuccessors payload') + : concatMap encodeNodeState (leaveSuccessors payload') <> [End Sequence , Start Sequence] - <> fmap (IntVal . getNodeID) (leavePredecessors payload') + <> concatMap encodeNodeState (leavePredecessors payload') <> [End Sequence , End Sequence] -- currently StabiliseResponsePayload and LeaveRequestPayload are equal encodePayload payload'@StabiliseResponsePayload{} = Start Sequence : Start Sequence - : fmap (IntVal . getNodeID) (stabiliseSuccessors payload') + : concatMap encodeNodeState (stabiliseSuccessors payload') <> [End Sequence , Start Sequence] - <> fmap (IntVal . getNodeID) (stabilisePredecessors payload') + <> concatMap encodeNodeState (stabilisePredecessors payload') <> [End Sequence , End Sequence] encodePayload payload'@StabiliseRequestPayload = [Null] @@ -167,10 +170,10 @@ encodePayload payload'@QueryIDRequestPayload{} = [ encodePayload payload'@JoinResponsePayload{} = Start Sequence : Start Sequence - : fmap (IntVal . getNodeID) (joinSuccessors payload') + : concatMap encodeNodeState (joinSuccessors payload') <> [End Sequence , Start Sequence] - <> fmap (IntVal . getNodeID) (joinPredecessors payload') + <> concatMap encodeNodeState (joinPredecessors payload') <> [End Sequence , Start Sequence] <> concatMap encodeCacheEntry (joinCache payload') @@ -183,15 +186,15 @@ encodePayload payload'@PingResponsePayload{} = : concatMap encodeNodeState (pingNodeStates payload') <> [End Sequence] -encodeNodeState :: NodeState -> [ASN1] +encodeNodeState :: NodeState a => a -> [ASN1] encodeNodeState ns = [ Start Sequence - , IntVal (getNodeID . nid $ ns) - , ASN1String . asn1CharacterString Visible $ domain ns - , OctetString (ipAddrAsBS $ ipAddr ns) - , IntVal (toInteger . 
dhtPort $ ns) - , IntVal (maybe 0 toInteger $ apPort ns) - , IntVal (vServerID ns) + , IntVal (getNodeID . getNid $ ns) + , ASN1String . asn1CharacterString Visible $ getDomain ns + , OctetString (ipAddrAsBS $ getIpAddr ns) + , IntVal (toInteger . getDhtPort $ ns) + , IntVal (toInteger . getServicePort $ ns) + , IntVal (getVServerID ns) , End Sequence ] @@ -213,23 +216,22 @@ encodeQueryResult FORWARD{} = Enumerated 1 encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded -> [ASN1] encodeMessage - (Request requestID sender parts part action requestPayload) = + (Request requestID sender part isFinalPart action requestPayload) = Start Sequence : (Enumerated . fromIntegral . fromEnum $ action) : IntVal requestID : encodeNodeState sender - <> [ - IntVal parts - , IntVal part ] + <> [IntVal part + , Boolean isFinalPart] <> maybe [] encodePayload requestPayload <> [End Sequence] encodeMessage - (Response responseTo senderID parts part action responsePayload) = [ + (Response requestID senderID part isFinalPart action responsePayload) = [ Start Sequence - , IntVal responseTo + , IntVal requestID , IntVal . getNodeID $ senderID - , IntVal parts , IntVal part + , Boolean isFinalPart , Enumerated . fromIntegral . 
fromEnum $ action] <> maybe [] encodePayload responsePayload <> [End Sequence] @@ -262,8 +264,8 @@ parseRequest :: Action -> ParseASN1 FediChordMessage parseRequest action = do requestID <- parseInteger sender <- parseNodeState - parts <- parseInteger part <- parseInteger + isFinalPart <- parseBool hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of QueryID -> parseQueryIDRequest @@ -272,13 +274,13 @@ parseRequest action = do Stabilise -> parseStabiliseRequest Ping -> parsePingRequest - pure $ Request requestID sender parts part action payload + pure $ Request requestID sender part isFinalPart action payload parseResponse :: Integer -> ParseASN1 FediChordMessage -parseResponse responseTo = do +parseResponse requestID = do senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID - parts <- parseInteger part <- parseInteger + isFinalPart <- parseBool action <- parseEnum :: ParseASN1 Action hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of @@ -288,7 +290,14 @@ parseResponse responseTo = do Stabilise -> parseStabiliseResponse Ping -> parsePingResponse - pure $ Response responseTo senderID parts part action payload + pure $ Response requestID senderID part isFinalPart action payload + +parseBool :: ParseASN1 Bool +parseBool = do + i <- getNext + case i of + Boolean parsed -> pure parsed + x -> throwParseError $ "Expected Boolean but got " <> show x parseInteger :: ParseASN1 Integer parseInteger = do @@ -325,22 +334,21 @@ parseNull = do Null -> pure () x -> throwParseError $ "Expected Null but got " <> show x -parseNodeState :: ParseASN1 NodeState +parseNodeState :: ParseASN1 RemoteNodeState parseNodeState = onNextContainer Sequence $ do nid' <- fromInteger <$> parseInteger domain' <- parseString ip' <- bsAsIpAddr <$> parseOctets dhtPort' <- fromInteger <$> parseInteger - apPort' <- fromInteger <$> parseInteger + servicePort' <- fromInteger <$> parseInteger vServer' <- 
parseInteger - pure NodeState { + pure RemoteNodeState { nid = nid' , domain = domain' , dhtPort = dhtPort' - , apPort = if apPort' == 0 then Nothing else Just apPort' + , servicePort = servicePort' , vServerID = vServer' , ipAddr = ip' - , internals = Nothing } @@ -360,8 +368,8 @@ parseJoinRequest = do parseJoinResponse :: ParseASN1 ActionPayload parseJoinResponse = onNextContainer Sequence $ do - succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) - pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + succ' <- onNextContainer Sequence (getMany parseNodeState) + pred' <- onNextContainer Sequence (getMany parseNodeState) cache <- parseNodeCache pure $ JoinResponsePayload { joinSuccessors = succ' @@ -396,8 +404,8 @@ parseStabiliseRequest = do parseStabiliseResponse :: ParseASN1 ActionPayload parseStabiliseResponse = onNextContainer Sequence $ do - succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) - pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + succ' <- onNextContainer Sequence (getMany parseNodeState) + pred' <- onNextContainer Sequence (getMany parseNodeState) pure $ StabiliseResponsePayload { stabiliseSuccessors = succ' , stabilisePredecessors = pred' @@ -405,8 +413,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do parseLeaveRequest :: ParseASN1 ActionPayload parseLeaveRequest = onNextContainer Sequence $ do - succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) - pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + succ' <- onNextContainer Sequence (getMany parseNodeState) + pred' <- onNextContainer Sequence (getMany parseNodeState) pure $ LeaveRequestPayload { leaveSuccessors = succ' , leavePredecessors = pred' diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index d83a4bc..a2dd676 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -3,153 
+3,146 @@ module Hash2Pub.DHTProtocol , queryLocalCache , addCacheEntry , addCacheEntryPure + , addNodeAsVerified + , addNodeAsVerifiedPure , deleteCacheEntry - , markCacheEntryAsVerified + , deserialiseMessage , RemoteCacheEntry(..) , toRemoteCacheEntry - , remoteNode_ + , remoteNode , Action(..) , ActionPayload(..) , FediChordMessage(..) , maximumParts + , sendQueryIdMessages + , requestQueryID + , requestJoin + , requestPing + , requestStabilise + , lookupMessage + , sendRequestTo + , queryIdLookupLoop + , queueAddEntries + , queueDeleteEntries + , queueDeleteEntry + , resolve + , mkSendSocket + , mkServerSocket + , handleIncomingRequest + , ackRequest + , isPossibleSuccessor + , isPossiblePredecessor + , isJoined + , closestCachePredecessors ) where -import qualified Data.Map as Map -import Data.Maybe (fromMaybe, maybe) -import qualified Data.Set as Set +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue +import Control.Concurrent.STM.TQueue +import Control.Concurrent.STM.TVar +import Control.Exception +import Control.Monad (foldM, forM, forM_) +import qualified Data.ByteString as BS +import Data.Either (rights) +import Data.Foldable (foldl', foldr') +import Data.Functor.Identity +import Data.IP (IPv6, fromHostAddress6, + toHostAddress6) +import Data.List (delete, nub, sortBy) +import qualified Data.Map as Map +import Data.Maybe (fromJust, fromMaybe, isJust, + isNothing, mapMaybe, maybe) +import qualified Data.Set as Set import Data.Time.Clock.POSIX -import Network.Socket hiding (recv, recvFrom, send, sendTo) +import Network.Socket hiding (recv, recvFrom, send, + sendTo) import Network.Socket.ByteString +import Safe +import System.Random +import System.Timeout -import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID, - NodeState (..), - cacheGetNodeStateUnvalidated, - cacheLookup, cacheLookupPred, - cacheLookupSucc, getPredecessors, - getSuccessors, localCompare, - putPredecessors, putSuccessors) +import 
Hash2Pub.ASN1Coding +import Hash2Pub.FediChordTypes (CacheEntry (..), + CacheEntry (..), HasKeyID (..), + LocalNodeState (..), + LocalNodeStateSTM, NodeCache, + NodeID, NodeState (..), + RemoteNodeState (..), + RingEntry (..), RingMap (..), + addRMapEntry, addRMapEntryWith, + cacheGetNodeStateUnvalidated, + cacheLookup, cacheLookupPred, + cacheLookupSucc, genNodeID, + getKeyID, localCompare, + rMapFromList, rMapLookupPred, + rMapLookupSucc, + setPredecessors, setSuccessors) +import Hash2Pub.ProtocolTypes -import Debug.Trace (trace) +import Debug.Trace (trace) -- === queries === -data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^ return closest nodes from local cache. - -- whole cache entry is returned for making - -- the entry time stamp available to the - -- protocol serialiser - | FOUND NodeState -- ^node is the responsible node for queried ID - deriving (Show, Eq) - -- TODO: evaluate more fine-grained argument passing to allow granular locking -- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache -queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse +queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache ownState nCache lBestNodes targetID -- as target ID falls between own ID and first predecessor, it is handled by this node - | (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState + -- This only makes sense if the node is part of the DHT by having joined. + -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. + | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . 
toRemoteNodeState $ ownState -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- the closest succeeding node (like with the p initiated parallel queries - | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors + | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache where - preds = fromMaybe [] $ getPredecessors ownState - ownID = nid ownState + ownID = getNid ownState + preds = predecessors ownState closestSuccessor :: Set.Set RemoteCacheEntry - closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache + closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache - closestPredecessors :: Set.Set RemoteCacheEntry - closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState - closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry - closestPredecessor 0 _ = Set.empty - closestPredecessor remainingLookups lastID - | remainingLookups < 0 = Set.empty - | otherwise = - let result = cacheLookupPred lastID nCache - in - case toRemoteCacheEntry =<< result of - Nothing -> Set.empty - Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns) --- === protocol serialisation data types +-- | look up the given number of direct predecessor cache entries of a given ID +closestCachePredecessors :: (Integral n) + => n -- ^ number of entries to look up + -> NodeID -- ^ target ID to get the predecessors of + -> NodeCache -- ^ cache to use for lookup + -> Set.Set RemoteCacheEntry +closestCachePredecessors 0 _ _ = Set.empty +closestCachePredecessors remainingLookups lastID nCache + | remainingLookups < 0 = Set.empty + | otherwise = + let result = cacheLookupPred lastID nCache + in + case toRemoteCacheEntry <$> result of + Nothing -> Set.empty + Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ 
closestCachePredecessors (remainingLookups-1) (nid ns) nCache -data Action = - QueryID - | Join - | Leave - | Stabilise - | Ping - deriving (Show, Eq, Enum) +-- | Determines whether a lookup key is within the responsibility slice of a node, +-- as it falls between its first predecessor and the node itself. +-- Looks up the successor of the lookup key on a 'RingMap' representation of the +-- predecessor list with the node itself added. If the result is the same as the node +-- itself then it falls into the responsibility interval. +isInOwnResponsibilitySlice :: HasKeyID a => a -> LocalNodeState -> Bool +isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs) + where + predecessorList = predecessors ownNs + -- add node itself to RingMap representation, to distinguish between + -- responsibility of own node and predecessor + predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList + closestPredecessor = headMay predecessorList -data FediChordMessage = - Request { - requestID :: Integer - , sender :: NodeState - , parts :: Integer - , part :: Integer - -- ^ part starts at 0 - , action :: Action - , payload :: Maybe ActionPayload - } - | Response { - responseTo :: Integer - , senderID :: NodeID - , parts :: Integer - , part :: Integer - , action :: Action - , payload :: Maybe ActionPayload - } deriving (Show, Eq) +isPossiblePredecessor :: HasKeyID a => a -> LocalNodeState -> Bool +isPossiblePredecessor = isInOwnResponsibilitySlice -data ActionPayload = - QueryIDRequestPayload { - queryTargetID :: NodeID - , queryLBestNodes :: Integer - } - | JoinRequestPayload - | LeaveRequestPayload { - leaveSuccessors :: [NodeID] - , leavePredecessors :: [NodeID] - } - | StabiliseRequestPayload - | PingRequestPayload - | QueryIDResponsePayload { - queryResult :: QueryResponse - } - | JoinResponsePayload { - joinSuccessors :: [NodeID] - , joinPredecessors :: [NodeID] - , 
joinCache :: [RemoteCacheEntry] - } - | LeaveResponsePayload - | StabiliseResponsePayload { - stabiliseSuccessors :: [NodeID] - , stabilisePredecessors :: [NodeID] - } - | PingResponsePayload { - pingNodeStates :: [NodeState] - } - deriving (Show, Eq) - --- | global limit of parts per message used when (de)serialising messages. --- Used to limit the impact of DOS attempts with partial messages. -maximumParts :: Num a => a -maximumParts = 150 - --- | dedicated data type for cache entries sent to or received from the network, --- as these have to be considered as unvalidated. Also helps with separation of trust. -data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime - deriving (Show, Eq) - -instance Ord RemoteCacheEntry where - (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 - -toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry -toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts -toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry -toRemoteCacheEntry _ = Nothing - --- helper function for use in tests -remoteNode_ :: RemoteCacheEntry -> NodeState -remoteNode_ (RemoteCacheEntry ns _) = ns +isPossibleSuccessor :: HasKeyID a => a -> LocalNodeState -> Bool +isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget) successorRMap) == pure (getNid ownNs) + where + successorList = successors ownNs + successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList + closestSuccessor = headMay successorList -- cache operations @@ -164,18 +157,18 @@ addCacheEntry entry cache = do -- | pure version of 'addCacheEntry' with current time explicitly specified as argument addCacheEntryPure :: POSIXTime -- ^ current time - -> RemoteCacheEntry -- ^ a remote cache entry received from network - -> NodeCache -- ^ node cache to insert to - -> NodeCache -- ^ new node cache with the element inserted + -> RemoteCacheEntry -- ^ a 
remote cache entry received from network + -> NodeCache -- ^ node cache to insert to + -> NodeCache -- ^ new node cache with the element inserted addCacheEntryPure now (RemoteCacheEntry ns ts) cache = let -- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity timestamp' = if ts <= now then ts else now - newCache = Map.insertWith insertCombineFunction (nid ns) (NodeEntry False ns timestamp') cache - insertCombineFunction newVal@(NodeEntry newValidationState newNode newTimestamp) oldVal = + newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache + insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal = case oldVal of ProxyEntry n _ -> ProxyEntry n (Just newVal) - NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp) + KeyEntry (CacheEntry oldValidationState _ oldTimestamp) -> KeyEntry (CacheEntry oldValidationState newNode (max oldTimestamp newTimestamp)) in newCache @@ -183,10 +176,30 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache = deleteCacheEntry :: NodeID -- ^ID of the node to be deleted -> NodeCache -- ^cache to delete from -> NodeCache -- ^cache without the specified element -deleteCacheEntry = Map.update modifier +deleteCacheEntry nid = RingMap . Map.update modifier nid . getRingMap where modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) - modifier NodeEntry {} = Nothing + modifier KeyEntry {} = Nothing + + +-- | Add a 'RemoteNodeState' to the node cache marked as verified. +-- If an entry already exists, it is replaced by the new verified one. 
+addNodeAsVerified :: RemoteNodeState + -> NodeCache + -> IO NodeCache +addNodeAsVerified node cache = do + now <- getPOSIXTime + pure $ addNodeAsVerifiedPure now node cache + + +-- | Pure variant of 'addNodeAsVerified' with current time explicitly specified as an argument +addNodeAsVerifiedPure :: POSIXTime + -> RemoteNodeState + -> NodeCache + -> NodeCache +addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now) + + -- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp. markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be @@ -194,12 +207,529 @@ markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to -> NodeID -- ^ which node to mark -> NodeCache -- ^ current node cache -> NodeCache -- ^ new NodeCache with the updated entry -markCacheEntryAsVerified timestamp = Map.adjust adjustFunc +markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . getRingMap where - adjustFunc (NodeEntry _ ns ts) = NodeEntry True ns $ fromMaybe ts timestamp + adjustFunc (KeyEntry (CacheEntry _ ns ts)) = KeyEntry (CacheEntry True ns $ fromMaybe ts timestamp) adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry adjustFunc entry = entry + +-- | uses the successor and predecessor list of a node as an indicator for whether a +-- node has properly joined the DHT +isJoined :: LocalNodeState -> Bool +isJoined ns = not . 
all null $ [successors ns, predecessors ns] + +-- | the size limit to be used when serialising messages for sending +sendMessageSize :: Num i => i +sendMessageSize = 1200 + +-- ====== message send and receive operations ====== + +-- encode the response to a request that just signals successful receipt +ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString +ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { + requestID = requestID req + , senderID = ownID + , part = part req + , isFinalPart = False + , action = action req + , payload = Nothing + } +ackRequest _ _ = Map.empty + + +-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue +-- the response to be sent. +handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node + -> TQueue (BS.ByteString, SockAddr) -- ^ send queue + -> Set.Set FediChordMessage -- ^ all parts of the request to handle + -> SockAddr -- ^ source address of the request + -> IO () +handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do + putStrLn $ "handling incoming request: " <> show msgSet + ns <- readTVarIO nsSTM + -- add nodestate to cache + now <- getPOSIXTime + case headMay . Set.elems $ msgSet of + Nothing -> pure () + Just aPart -> do + queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns + -- distinguish on whether and how to respond. 
If responding, pass message to response generating function and write responses to send queue + maybe (pure ()) ( + mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) + ) + =<< (case action aPart of + Ping -> Just <$> respondPing nsSTM msgSet + Join -> Just <$> respondJoin nsSTM msgSet + -- ToDo: figure out what happens if not joined + QueryID -> Just <$> respondQueryID nsSTM msgSet + -- only when joined + Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing + Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing + ) + -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. + + -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash + -- TODO: test case: mixed message types of parts + + +-- ....... response sending ....... + +-- TODO: could all these respond* functions be in STM instead of IO? + + +-- | execute a key ID lookup on local cache and respond with the result +respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondQueryID nsSTM msgSet = do + putStrLn "responding to a QueryID request" + -- this message cannot be split reasonably, so just + -- consider the first payload + let + aRequestPart = Set.elemAt 0 msgSet + senderID = getNid . 
sender $ aRequestPart + senderPayload = foldr' (\msg plAcc -> + if isNothing plAcc && isJust (payload msg) + then payload msg + else plAcc + ) Nothing msgSet + -- return only empty message serialisation if no payload was included in parts + maybe (pure Map.empty) (\senderPayload' -> do + responseMsg <- atomically $ do + nsSnap <- readTVar nsSTM + cache <- readTVar $ nodeCacheSTM nsSnap + let + responsePayload = QueryIDResponsePayload { + queryResult = if isJoined nsSnap + then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') + -- if not joined yet, attract responsibility for + -- all keys to make bootstrapping possible + else FOUND (toRemoteNodeState nsSnap) + } + queryResponseMsg = Response { + requestID = requestID aRequestPart + , senderID = getNid nsSnap + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = QueryID + , payload = Just responsePayload + } + pure queryResponseMsg + pure $ serialiseMessage sendMessageSize responseMsg + ) senderPayload + +-- | Respond to a Leave request by removing the leaving node from local data structures +-- and confirming with response. +-- TODO: copy over key data from leaver and confirm +respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondLeave nsSTM msgSet = do + -- combine payload of all parts + let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) -> + (maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg) + ,maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg)) + ) + ([],[]) msgSet + aRequestPart = Set.elemAt 0 msgSet + senderID = getNid . 
sender $ aRequestPart + responseMsg <- atomically $ do + nsSnap <- readTVar nsSTM + -- remove leaving node from successors, predecessors and NodeCache + writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID + writeTVar nsSTM $ + -- add predecessors and successors of leaving node to own lists + setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap) + . setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap + -- TODO: handle handover of key data + let leaveResponse = Response { + requestID = requestID aRequestPart + , senderID = getNid nsSnap + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = Leave + , payload = Just LeaveResponsePayload + } + pure leaveResponse + pure $ serialiseMessage sendMessageSize responseMsg + +-- | respond to stabilise requests by returning successor and predecessor list +respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondStabilise nsSTM msgSet = do + nsSnap <- readTVarIO nsSTM + let + aRequestPart = Set.elemAt 0 msgSet + responsePayload = StabiliseResponsePayload { + stabiliseSuccessors = successors nsSnap + , stabilisePredecessors = predecessors nsSnap + } + stabiliseResponse = Response { + requestID = requestID aRequestPart + , senderID = getNid nsSnap + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = Stabilise + , payload = Just responsePayload + } + -- TODO: return service endpoint for copying over key data + pure $ serialiseMessage sendMessageSize stabiliseResponse + + +-- | respond to Ping request by returning all active vserver NodeStates +respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondPing nsSTM msgSet = do + -- TODO: respond with all active VS when implementing k-choices + nsSnap <- readTVarIO nsSTM 
+ let + aRequestPart = Set.elemAt 0 msgSet + responsePayload = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] } + pingResponse = Response { + requestID = requestID aRequestPart + , senderID = getNid nsSnap + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = Ping + , payload = Just responsePayload + } + pure $ serialiseMessage sendMessageSize pingResponse + +-- this modifies node state, so locking and IO seems to be necessary. +-- Still try to keep as much code as possible pure +respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondJoin nsSTM msgSet = do + -- atomically read and modify the node state according to the parsed request + responseMsg <- atomically $ do + nsSnap <- readTVar nsSTM + cache <- readTVar $ nodeCacheSTM nsSnap + let + aRequestPart = Set.elemAt 0 msgSet + senderNS = sender aRequestPart + responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS) + thisNodeResponsible (FOUND _) = True + thisNodeResponsible (FORWARD _) = False + -- check whether the joining node falls into our responsibility + if thisNodeResponsible responsibilityLookup + then do + -- if yes, adjust own predecessors/ successors and return those in a response + let + newPreds = senderNS:predecessors nsSnap + joinedNS = setPredecessors newPreds nsSnap + responsePayload = JoinResponsePayload { + joinSuccessors = successors joinedNS + , joinPredecessors = predecessors joinedNS + , joinCache = toRemoteCache cache + } + joinResponse = Response { + requestID = requestID aRequestPart + , senderID = getNid joinedNS + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = Join + , payload = Just responsePayload + } + writeTVar nsSTM joinedNS + pure joinResponse + -- otherwise respond with empty payload + else pure Response { + requestID = requestID aRequestPart + , senderID = 
getNid nsSnap + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = Join + , payload = Nothing + } + + pure $ serialiseMessage sendMessageSize responseMsg + -- TODO: notify service layer to copy over data now handled by the new joined node + +-- ....... request sending ....... + +-- | send a join request and return the joined 'LocalNodeState' including neighbours +requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted + -> LocalNodeStateSTM -- ^ joining NodeState + -> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information +requestJoin toJoinOn ownStateSTM = + bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do + -- extract own state for getting request information + ownState <- readTVarIO ownStateSTM + responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock + (cacheInsertQ, joinedState) <- atomically $ do + stateSnap <- readTVar ownStateSTM + let + (cacheInsertQ, predAccSet, succAccSet) = foldl' + (\(insertQ, predAccSet', succAccSet') msg -> + let + insertQ' = maybe insertQ (\msgPl -> + -- collect list of insertion statements into global cache + queueAddEntries (joinCache msgPl) : insertQ + ) $ payload msg + -- collect received predecessors and successors + predAccSet'' = maybe predAccSet' ( + foldr' Set.insert predAccSet' . joinPredecessors + ) $ payload msg + succAccSet'' = maybe succAccSet' ( + foldr' Set.insert succAccSet' . joinSuccessors + ) $ payload msg + in + (insertQ', predAccSet'', succAccSet'') + ) + -- reset predecessors and successors + ([], Set.empty, Set.empty) + responses + -- sort, slice and set the accumulated successors and predecessors + newState = setSuccessors (Set.elems succAccSet) . 
setPredecessors (Set.elems predAccSet) $ stateSnap + writeTVar ownStateSTM newState + pure (cacheInsertQ, newState) + -- execute the cache insertions + mapM_ (\f -> f joinedState) cacheInsertQ + pure $ if responses == Set.empty + then Left $ "join error: got no response from " <> show (getNid toJoinOn) + else if null (predecessors joinedState) && null (successors joinedState) + then Left "join error: no predecessors or successors" + -- successful join + else Right ownStateSTM + ) + `catch` (\e -> pure . Left $ displayException (e :: IOException)) + + +-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. +requestQueryID :: LocalNodeState -- ^ NodeState of the querying node + -> NodeID -- ^ target key ID to look up + -> IO RemoteNodeState -- ^ the node responsible for handling that key +-- 1. do a local lookup for the l closest nodes +-- 2. create l sockets +-- 3. send a message async concurrently to all l nodes +-- 4. collect the results, insert them into cache +-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) +-- TODO: deal with lookup failures +requestQueryID ns targetID = do + firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns + -- TODO: make maxAttempts configurable + queryIdLookupLoop firstCacheSnapshot ns 50 targetID + +-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining +queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState +-- return node itself as default fallback value against infinite recursion. 
+-- TODO: consider using an Either instead of a default value
+queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
+queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
+    let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
+    -- FOUND can only be returned if targetID is owned by local node
+    case localResult of
+      FOUND thisNode -> pure thisNode
+      FORWARD nodeSet -> do
+          responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
+          now <- getPOSIXTime
+          -- check for a FOUND and return it
+          case responseEntries of
+            FOUND foundNode -> pure foundNode
+            -- if not FOUND, insert entries into local cache copy and recurse
+            FORWARD entrySet ->
+                let newLCache = foldr' (
+                        addCacheEntryPure now
+                                       ) cacheSnapshot entrySet
+                in
+                queryIdLookupLoop newLCache ns (maxAttempts - 1) targetID
+
+
+-- | Send a 'QueryID' request for @targetID@ concurrently to all given nodes
+-- and fold their responses into a single 'QueryResponse'.
+--
+-- Every node entry contained in a response is queued for insertion into the
+-- global node cache. The result is the first @FOUND@ reported by any response;
+-- if no response reports @FOUND@, it is a @FORWARD@ holding the union of all
+-- returned entries (empty if nothing usable was received).
+-- Query threads that terminate with an exception are silently ignored.
+sendQueryIdMessages :: (Integral i)
+                    => NodeID                        -- ^ target key ID to look up
+                    -> LocalNodeState                     -- ^ node state of the node doing the query
+                    -> Maybe i                       -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
+                    -> [RemoteNodeState]                   -- ^ nodes to query
+                    -> IO QueryResponse -- ^ accumulated response
+sendQueryIdMessages targetID ns lParam targets = do
+
+    -- create connected sockets to all query targets and use them for request handling
+    -- ToDo: make attempts and timeout configurable
+    queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (
+        -- pass the caller's @l@ parameter on instead of always falling back to the node default
+        sendRequestTo 5000 3 (lookupMessage targetID ns lParam)
+        )) targets
+    -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
+    -- ToDo: exception handling, maybe log them
+    responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
+    -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
+    now <- getPOSIXTime
+    -- collect cache entries from all responses
+    foldM (\acc resp -> do
+        let
+            responseResult = queryResult <$> payload resp
+            entrySet = case responseResult of
+                Just (FOUND result1)     -> Set.singleton (RemoteCacheEntry result1 now)
+                Just (FORWARD resultset) -> resultset
+                _                        -> Set.empty
+
+        -- forward entries to global cache
+        queueAddEntries entrySet ns
+        -- return accumulated QueryResult
+        pure $ case (acc, responseResult) of
+            -- once a FOUND has been encountered, return this as a result
+            (isFound@FOUND{}, _)            -> isFound
+            -- the accumulator starts as FORWARD, so a FOUND answer has to
+            -- replace it explicitly, otherwise it could never be returned
+            (FORWARD _, Just found@FOUND{}) -> found
+            (FORWARD accSet, _)             -> FORWARD $ entrySet `Set.union` accSet
+
+          ) (FORWARD Set.empty) responses
+
+-- | Create a QueryID message to be supplied to 'sendRequestTo'
+lookupMessage :: Integral i
+              => NodeID         -- ^ target ID
+              -> LocalNodeState      -- ^ sender node state
+              -> Maybe i        -- ^ optionally provide a different l parameter
+              -> (Integer -> FediChordMessage)
+lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
+  where
+    -- falls back to the sender's own 'lNumBestNodes' if no explicit l parameter is given
+    pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam }
+
+
+-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
+-- return parsed neighbour lists
+requestStabilise :: LocalNodeState      -- ^ sending node
+                 -> RemoteNodeState     -- ^ neighbour node to send to
+                 -> IO (Either String ([RemoteNodeState], [RemoteNodeState]))    -- ^ (predecessors, successors) of responding node
+requestStabilise ns neighbour = do
+    responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
+        Request {
+            requestID = rid
+          , sender = toRemoteNodeState ns
+          , part = 1
+          , isFinalPart = False
+          , action = Stabilise
+          , payload = Just StabiliseRequestPayload
+          }
+                         )
+                                                                                           ) `catch` (\e -> pure .
Left $ displayException (e :: IOException)) + either + -- forward IO error messages + (pure . Left) + (\respSet -> do + -- fold all reply parts together + let (responsePreds, responseSuccs) = foldr' (\msg (predAcc, succAcc) -> + (maybe predAcc (++ predAcc) (stabilisePredecessors <$> payload msg) + ,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg)) + ) + ([],[]) respSet + -- update successfully responded neighbour in cache + now <- getPOSIXTime + maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) + pure $ if null responsePreds && null responseSuccs + then Left "no neighbours returned" + else Right (responsePreds, responseSuccs) + ) responses + + +requestPing :: LocalNodeState -- ^ sending node + -> RemoteNodeState -- ^ node to be PINGed + -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node +requestPing ns target = do + responses <- bracket (mkSendSocket (getDomain target) (getDhtPort target)) close + (\sock -> do + resp <- sendRequestTo 5000 3 (\rid -> + Request { + requestID = rid + , sender = toRemoteNodeState ns + , part = 1 + , isFinalPart = False + , action = Ping + , payload = Just PingRequestPayload + } + ) sock + (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock + pure $ Right (peerAddr, resp) + ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) + either + -- forward IO error messages + (pure . 
Left)
+        (\(peerAddr, respSet) -> do
+            -- fold all reply parts together
+            let responseVss = foldr' (\msg acc ->
+                    maybe acc (foldr' (:) acc) (pingNodeStates <$> payload msg)
+                                     )
+                                     [] respSet
+            -- recompute ID for each received node and mark as verified in cache
+            now <- getPOSIXTime
+            forM_ responseVss (\vs ->
+                let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
+                 in if recomputedID == getNid vs
+                       then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
+                       else pure ()
+                              )
+            pure $ if null responseVss
+                      then Left "no active vServer IDs returned, ignoring node"
+                      else Right responseVss
+        ) responses
+
+
+
+-- | Generic function for sending a request over a connected socket and collecting the response.
+-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
+--
+-- The timeout applies to each single attempt. An attempt ends early once all
+-- request parts have been answered and all response parts up to the one
+-- flagged as final have been received. All 'Response' messages received on
+-- the socket, including those of timed-out attempts, are returned.
+sendRequestTo :: Int                    -- ^ timeout in milliseconds
+              -> Int                    -- ^ number of retries
+              -> (Integer -> FediChordMessage)       -- ^ the message to be sent, still needing a requestID
+              -> Socket                 -- ^ connected socket to use for sending
+              -> IO (Set.Set FediChordMessage)       -- ^ responses
+sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
+    -- give the message a random request ID
+    randomID <- randomRIO (0, 2^32-1)
+    let
+        msgComplete = msgIncomplete randomID
+        requests = serialiseMessage sendMessageSize msgComplete
+    putStrLn $ "sending request message " <> show msgComplete
+    -- create a queue for passing received response messages back, even after a timeout
+    responseQ <- newTBQueueIO $ 2*maximumParts  -- keep room for duplicate packets
+    -- start sendAndAck with timeout
+    -- 'timeout' expects microseconds, so the millisecond value has to be scaled
+    attempts numAttempts . timeout (timeoutMillis * 1000) $ sendAndAck responseQ sock requests
+    -- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary.
+    recvdParts <- atomically $ flushTBQueue responseQ
+    pure $ Set.fromList recvdParts
+  where
+    sendAndAck :: TBQueue FediChordMessage          -- ^ the queue for putting in the received responses
+               -> Socket                            -- ^ the socket used for sending and receiving for this particular remote node
+               -> Map.Map Integer BS.ByteString     -- ^ the remaining unacked request parts
+               -> IO ()
+    sendAndAck responseQueue sock remainingSends = do
+        sendMany sock $ Map.elems remainingSends
+        -- if all requests have been acked/ responded to, return prematurely
+        recvLoop responseQueue remainingSends Set.empty Nothing
+    recvLoop :: TBQueue FediChordMessage          -- ^ the queue for putting in the received responses
+             -> Map.Map Integer BS.ByteString     -- ^ the remaining unacked request parts
+             -> Set.Set Integer                   -- ^ already received response part numbers
+             -> Maybe Integer                     -- ^ total number of response parts if already known
+             -> IO ()
+    recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
+        -- 65535 is maximum length of UDP packets, as long as
+        -- no IPv6 jumbograms are used
+        response <- deserialiseMessage <$> recv sock 65535
+        case response of
+          Right msg@Response{} -> do
+              atomically $ writeTBQueue responseQueue msg
+              let
+                  newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
+                  newRemaining = Map.delete (part msg) remainingSends'
+                  newReceivedParts = Set.insert (part msg) receivedPartNums
+              -- check and pass on the *updated* set of received part numbers,
+              -- otherwise the set stays empty and the loop can never finish early
+              if  Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
+                 then pure ()
+                 else recvLoop responseQueue newRemaining newReceivedParts newTotalParts
+          -- drop errors and invalid messages (including requests arriving on this socket)
+          _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
+
+
+-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
+queueAddEntries :: Foldable c => c RemoteCacheEntry
+                -> LocalNodeState
+                -> IO ()
+queueAddEntries entries ns = do
+    now <- getPOSIXTime
+    forM_ entries $ \entry -> atomically $ writeTQueue
(cacheWriteQueue ns) $ addCacheEntryPure now entry + + +-- | enque a list of node IDs to be deleted from the global NodeCache +queueDeleteEntries :: Foldable c + => c NodeID + -> LocalNodeState + -> IO () +queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry + + +-- | enque a single node ID to be deleted from the global NodeCache +queueDeleteEntry :: NodeID + -> LocalNodeState + -> IO () +queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete + -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry @@ -210,3 +740,39 @@ attempts i action = do case actionResult of Nothing -> attempts (i-1) action Just res -> pure $ Just res + +-- ====== network socket operations ====== + +-- | resolve a specified host and return the 'AddrInfo' for it. +-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all +-- addresses; +-- if no port is specified an arbitrary free port is selected. +resolve :: Maybe String -- ^ hostname or IP address to be resolved + -> Maybe PortNumber -- ^ port number of either local bind or remote + -> IO AddrInfo +resolve host port = let + hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram + , addrFlags = [AI_PASSIVE] } + in + head <$> getAddrInfo (Just hints) host (show <$> port) + +-- | create an unconnected UDP Datagram 'Socket' bound to the specified address +mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket +mkServerSocket ip port = do + sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) + sock <- socket AF_INET6 Datagram defaultProtocol + setSocketOption sock IPv6Only 1 + bind sock sockAddr + pure sock + +-- | create a UDP datagram socket, connected to a destination. +-- The socket gets an arbitrary free local port assigned. 
+mkSendSocket :: String -- ^ destination hostname or IP + -> PortNumber -- ^ destination port + -> IO Socket -- ^ a socket with an arbitrary source port +mkSendSocket dest destPort = do + destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) + sendSock <- socket AF_INET6 Datagram defaultProtocol + setSocketOption sendSock IPv6Only 1 + connect sendSock destAddr + pure sendSock diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 78dc711..2b9a2ef 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -1,6 +1,7 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE OverloadedStrings #-} {- | Module : FediChord Description : An opinionated implementation of the EpiChord DHT by Leong et al. @@ -16,14 +17,10 @@ module Hash2Pub.FediChord ( , getNodeID , toNodeID , NodeState (..) - , InternalNodeState (..) - , getNodeCacheRef - , putNodeCache - , getSuccessors - , putSuccessors - , getPredecessors - , putPredecessors - , getLNumBestNodes + , LocalNodeState (..) + , RemoteNodeState (..) + , setSuccessors + , setPredecessors , NodeCache , CacheEntry(..) , cacheGetNodeStateUnvalidated @@ -41,427 +38,94 @@ module Hash2Pub.FediChord ( , bsAsIpAddr , FediChordConf(..) , fediChordInit + , fediChordJoin + , fediChordBootstrapJoin + , tryBootstrapJoining + , fediMainThreads + , RealNode (..) 
, nodeStateInit , mkServerSocket + , mkSendSocket , resolve , cacheWriter + , joinOnNewEntriesThread ) where -import Control.Exception -import qualified Data.Map.Strict as Map -import Data.Maybe (fromMaybe, isJust, mapMaybe) -import Data.Time.Clock.POSIX -import Network.Socket - --- for hashing and ID conversion +import Control.Applicative ((<|>)) +import Control.Concurrent +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue -import Control.Monad (forever) +import Control.Concurrent.STM.TVar +import Control.Exception +import Control.Monad (forM_, forever) +import Control.Monad.Except import Crypto.Hash import qualified Data.ByteArray as BA import qualified Data.ByteString as BS import qualified Data.ByteString.UTF8 as BSU -import Data.IORef +import Data.Either (rights) +import Data.Foldable (foldr') +import Data.Functor.Identity import Data.IP (IPv6, fromHostAddress6, toHostAddress6) +import Data.List ((\\)) +import qualified Data.Map.Strict as Map +import Data.Maybe (catMaybes, fromJust, fromMaybe, + isJust, isNothing, mapMaybe) +import qualified Data.Set as Set +import Data.Time.Clock.POSIX import Data.Typeable (Typeable (..), typeOf) import Data.Word import qualified Network.ByteOrder as NetworkBytes +import Network.Socket hiding (recv, recvFrom, send, + sendTo) +import Network.Socket.ByteString +import Safe +import System.Random (randomRIO) +import Hash2Pub.DHTProtocol +import Hash2Pub.FediChordTypes import Hash2Pub.Utils import Debug.Trace (trace) --- define protocol constants --- | static definition of ID length in bits -idBits :: Integer -idBits = 256 - --- |NodeIDs are Integers wrapped in a newtype, to be able to redefine --- their instance behaviour --- --- for being able to check value bounds, the constructor should not be used directly --- and new values are created via @toNodeID@ (newtype constructors cannot be hidden) -newtype NodeID = NodeID { getNodeID :: Integer } deriving (Eq, Show, Enum) - --- |smart 
data constructor for NodeID that throws a runtime exception for out-of-bounds values. --- When needing a runtime-safe constructor with drawbacks, try @fromInteger@ -toNodeID :: Integer -> NodeID -toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i - --- |NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits' -instance Bounded NodeID where - minBound = NodeID 0 - maxBound = NodeID (2^idBits - 1) - --- |calculations with NodeIDs are modular arithmetic operations -instance Num NodeID where - a + b = NodeID $ (getNodeID a + getNodeID b) `mod` (getNodeID maxBound + 1) - a * b = NodeID $ (getNodeID a * getNodeID b) `mod` (getNodeID maxBound + 1) - a - b = NodeID $ (getNodeID a - getNodeID b) `mod` (getNodeID maxBound + 1) - -- |safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around - -- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@. - fromInteger i = NodeID $ i `mod` (getNodeID maxBound + 1) - signum = NodeID . signum . getNodeID - abs = NodeID . abs . 
getNodeID -- ToDo: make sure that at creation time only IDs within the range are used - --- | use normal strict monotonic ordering of integers, realising the ring structure --- is done in the @NodeCache@ implementation -instance Ord NodeID where - a `compare` b = getNodeID a `compare` getNodeID b - --- | local comparison of 2 node IDs, only relevant for determining a successor or predecessor on caches with just 2 nodes -localCompare :: NodeID -> NodeID -> Ordering -a `localCompare` b - | getNodeID a == getNodeID b = EQ - | wayForwards > wayBackwards = GT - | otherwise = LT - where - wayForwards = getNodeID (b - a) - wayBackwards = getNodeID (a - b) - - --- | represents a node and all its important state -data NodeState = NodeState { - nid :: NodeID - , domain :: String - -- ^ full public domain name the node is reachable under - , ipAddr :: HostAddress6 - -- the node's public IPv6 address - , dhtPort :: PortNumber - -- ^ port of the DHT itself - , apPort :: Maybe PortNumber - -- ^ port of the ActivityPub relay and storage service - -- might have to be queried first - , vServerID :: Integer - -- ^ ID of this vserver - - -- ==== internal state ==== - , internals :: Maybe InternalNodeState - -- ^ data not present in the representation of remote nodes - -- is put into its own type. - -- This is usually @Nothing@ for all remote nodes. - } deriving (Show, Eq) - --- | encapsulates all data and parameters that are not present for remote nodes -data InternalNodeState = InternalNodeState { - nodeCache :: IORef NodeCache - -- ^ EpiChord node cache with expiry times for nodes - -- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@. - -- encapsulated into an IORef for allowing concurrent reads without locking - , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) - -- ^ cache updates are not written directly to the 'nodeCache' but queued and - -- only processed by a single writer thread to prevent lost updates. 
- -- All nodeCache modifying functions have to be partially applied enough before - -- being put into the queue. - -- - , successors :: [NodeID] -- could be a set instead as these are ordered as well - -- ^ successor nodes in ascending order by distance - , predecessors :: [NodeID] - -- ^ predecessor nodes in ascending order by distance - ----- protocol parameters ----- - -- TODO: evaluate moving these somewhere else - , kNeighbours :: Int - -- ^ desired length of predecessor and successor list - -- needs to be parameterisable for simulation purposes - , lNumBestNodes :: Int - -- ^ number of best next hops to provide - -- needs to be parameterisable for simulation purposes - , pNumParallelQueries :: Int - -- ^ number of parallel sent queries - -- needs to be parameterisable for simulation purposes - , jEntriesPerSlice :: Int - -- ^ number of desired entries per cache slice - -- needs to be parameterisable for simulation purposes - } deriving (Show, Eq) - --- | defining Show instances to be able to print NodeState for debug purposes -instance Typeable a => Show (IORef a) where - show x = show (typeOf x) - -instance Typeable a => Show (TQueue a) where - show x = show (typeOf x) - --- | extract a value from the internals of a 'NodeState' -getInternals_ :: (InternalNodeState -> a) -> NodeState -> Maybe a -getInternals_ func ns = func <$> internals ns - --- could be done better with lenses --- | convenience function that updates an internal value of a NodeState -putInternals_ :: (InternalNodeState -> InternalNodeState) -> NodeState -> NodeState -putInternals_ func ns = let - newInternals = func <$> internals ns - in - ns {internals = newInternals } - --- | convenience function for extracting the 'NodeCache' from a 'NodeState' -getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache) -getNodeCacheRef = getInternals_ nodeCache - --- | convenience function for updating the 'NodeCache' on 'NodeState' s that have --- internals. 
--- NodeStates without a cache (without internals) are returned unchanged -putNodeCache :: IORef NodeCache -> NodeState -> NodeState -putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc}) - -getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache)) -getCacheWriteQueue = getInternals_ cacheWriteQueue - --- | convenience function for extracting the @successors@ from a 'NodeState' -getSuccessors :: NodeState -> Maybe [NodeID] -getSuccessors = getInternals_ successors - --- | convenience function that updates the successors of a NodeState -putSuccessors :: [NodeID] -> NodeState -> NodeState -putSuccessors succ' = putInternals_ (\i -> i {successors = succ'}) - --- | convenience function for extracting the @predecessors@ from a 'NodeState' -getPredecessors :: NodeState -> Maybe [NodeID] -getPredecessors = getInternals_ predecessors - --- | convenience function that updates the predecessors of a NodeState -putPredecessors :: [NodeID] -> NodeState -> NodeState -putPredecessors pred' = putInternals_ (\i -> i {predecessors = pred'}) - --- | convenience function for extracting the @lNumBestNodes@ from a 'NodeState' -getLNumBestNodes :: NodeState -> Maybe Int -getLNumBestNodes = getInternals_ lNumBestNodes - -type NodeCache = Map.Map NodeID CacheEntry - --- |an entry of the 'nodeCache' can hold 2 different kinds of data -data CacheEntry = - -- | an entry representing its validation status, the node state and its timestamp - NodeEntry Bool NodeState POSIXTime - -- | a proxy field for closing the ring structure, indicating the lookup shall be - -- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@ - | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry) - deriving (Show, Eq) - --- | as a compromise, only NodeEntry components are ordered by their NodeID --- while ProxyEntry components should never be tried to be ordered. 
-instance Ord CacheEntry where - - a `compare` b = compare (extractID a) (extractID b) - where - extractID (NodeEntry _ eState _) = nid eState - extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache" - -data ProxyDirection = Backwards | Forwards deriving (Show, Eq) - -instance Enum ProxyDirection where - toEnum (-1) = Backwards - toEnum 1 = Forwards - toEnum _ = error "no such ProxyDirection" - fromEnum Backwards = - 1 - fromEnum Forwards = 1 - ---- useful function for getting entries for a full cache transfer -cacheEntries :: NodeCache -> [CacheEntry] -cacheEntries ncache = mapMaybe extractNodeEntries $ Map.elems ncache - where - extractNodeEntries (ProxyEntry _ possibleEntry) = possibleEntry - --- | An empty @NodeCache@ needs to be initialised with 2 proxy entries, --- linking the modular name space together by connecting @minBound@ and @maxBound@ -initCache :: NodeCache -initCache = Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))] - where - proxyEntry (from,to) = (from, ProxyEntry to Nothing) - --- | Maybe returns the cache entry stored at given key -cacheLookup :: NodeID -- ^lookup key - -> NodeCache -- ^lookup cache - -> Maybe CacheEntry -cacheLookup key cache = case Map.lookup key cache of - Just (ProxyEntry _ result) -> result - res -> res - --- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ --- to simulate a modular ring -lookupWrapper :: (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> ProxyDirection -> NodeID -> NodeCache -> Maybe CacheEntry -lookupWrapper f fRepeat direction key cache = - case f key cache of - -- the proxy entry found holds a - Just (_, ProxyEntry _ (Just entry@NodeEntry{})) -> Just entry - -- proxy entry holds another proxy entry, this should not happen - Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing - -- proxy entry without own entry 
is a pointer on where to continue - -- if lookup direction is the same as pointer direction: follow pointer - Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) -> - let newKey = if pointerDirection == direction - then pointerID - else foundKey + (fromInteger . toInteger . fromEnum $ direction) - in if cacheNotEmpty cache - then lookupWrapper fRepeat fRepeat direction newKey cache - else Nothing - -- normal entries are returned - Just (_, entry@NodeEntry{}) -> Just entry - Nothing -> Nothing - where - cacheNotEmpty :: NodeCache -> Bool - cacheNotEmpty cache' = (Map.size cache' > 2) -- there are more than the 2 ProxyEntries - || isJust ( cacheLookup minBound cache') -- or one of the ProxyEntries holds a node - || isJust (cacheLookup maxBound cache') - --- | find the successor node to a given key on a modular EpiChord ring cache. --- Note: The EpiChord definition of "successor" includes the node at the key itself, --- if existing. -cacheLookupSucc :: NodeID -- ^lookup key - -> NodeCache -- ^ring cache - -> Maybe CacheEntry -cacheLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards - --- | find the predecessor node to a given key on a modular EpiChord ring cache. 
-cacheLookupPred :: NodeID -- ^lookup key - -> NodeCache -- ^ring cache - -> Maybe CacheEntry -cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards - --- clean up cache entries: once now - entry > maxAge --- transfer difference now - entry to other node - --- | return the @NodeState@ data from a cache entry without checking its validation status -cacheGetNodeStateUnvalidated :: CacheEntry -> NodeState -cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState -cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry -cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug" - --- | converts a 'HostAddress6' IP address to a big-endian strict ByteString -ipAddrAsBS :: HostAddress6 -> BS.ByteString -ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d] - --- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6' -bsAsIpAddr :: BS.ByteString -> HostAddress6 -bsAsIpAddr bytes = (a,b,c,d) - where - a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes - - --- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString -genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address - -> String -- ^a node's 1st and 2nd level domain name - -> Word8 -- ^the used vserver ID - -> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer -genNodeIDBS ip nodeDomain vserver = - hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower - where - vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255 - ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS - nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS - hashID bstr = BS.pack . 
BA.unpack $ (hash bstr :: Digest (SHAKE128 128)) - (hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet - - --- | generates a 256 bit long @NodeID@ using SHAKE128 -genNodeID :: HostAddress6 -- ^a node's IPv6 address - -> String -- ^a node's 1st and 2nd level domain name - -> Word8 -- ^the used vserver ID - -> NodeID -- ^the generated @NodeID@ -genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs - --- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT -genKeyIDBS :: String -- ^the key string - -> BS.ByteString -- ^the key ID represented as a @ByteString@ -genKeyIDBS key = BS.pack . BA.unpack $ (hash (BSU.fromString key) :: Digest SHA3_256) - --- | generates a 256 bit long key identifier for looking up its data on the DHT -genKeyID :: String -- ^the key string - -> NodeID -- ^the key ID -genKeyID = NodeID . byteStringToUInteger . genKeyIDBS - - --- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order --- by iterating it byte-wise from the back and shifting the byte values according to their offset -byteStringToUInteger :: BS.ByteString -> Integer -byteStringToUInteger bs = sum $ parsedBytes 0 bs - where - parsedBytes :: Integer -> BS.ByteString -> [ Integer ] - parsedBytes offset uintBs = case BS.unsnoc uintBs of - Nothing -> [] - Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs' - - parseWithOffset :: Integer -> Word8 -> Integer - parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0 - parseWithOffset offset word = toInteger word * 2^(8 * offset) - - - --- TODO: complete rewrite --- |checks wether the cache entries fulfill the logarithmic EpiChord invariant --- of having j entries per slice, and creates a list of necessary lookup actions. --- Should be invoked periodically. 
---checkCacheSlices :: NodeState -> IO [()] ---checkCacheSlices state = case getNodeCache state of --- -- don't do anything on nodes without a cache --- Nothing -> pure [()] --- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache' --- -- TODO: do the same for predecessors --- where --- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state --- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state) --- startBound = NodeID 2^(255::Integer) + nid state --- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()] --- checkSlice _ _ _ Nothing _ = [] --- checkSlice j ownID upperBound (Just lastSuccNode) cache --- | upperBound < lastSuccNode = [] --- | otherwise = --- -- continuously half the DHT namespace, take the upper part as a slice, --- -- check for existing entries in that slice and create a lookup action --- -- and recursively do this on the lower half. --- -- recursion edge case: all successors/ predecessors need to be in the --- -- first slice. 
--- let --- diff = getNodeID $ upperBound - ownID --- lowerBound = ownID + NodeID (diff `div` 2) --- in --- -- TODO: replace empty IO actions with actual lookups to middle of slice --- -- TODO: validate ID before adding to cache --- case Map.lookupLT upperBound cache of --- Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache --- Just (matchID, _) -> --- if --- matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache --- else --- checkSlice j ownID lowerBound (Just lastSuccNode) cache - - --- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages --- persist them on disk so they can be used for all following bootstraps - --- | configuration values used for initialising the FediChord DHT -data FediChordConf = FediChordConf { - confDomain :: String - , confIP :: HostAddress6 - , confDhtPort :: Int - } deriving (Show, Eq) - -- | initialise data structures, compute own IDs and bind to listening socket -- ToDo: load persisted state, thus this function already operates in IO -fediChordInit :: FediChordConf -> IO (Socket, NodeState) -fediChordInit conf = do - initialState <- nodeStateInit conf - serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState) - pure (serverSock, initialState) +fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM) +fediChordInit initConf = do + let realNode = RealNode { + vservers = [] + , nodeConfig = initConf + , bootstrapNodes = confBootstrapNodes initConf + } + realNodeSTM <- newTVarIO realNode + initialState <- nodeStateInit realNodeSTM + initialStateSTM <- newTVarIO initialState + serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState) + pure (serverSock, initialStateSTM) -- | initialises the 'NodeState' for this local node. -- Separated from 'fediChordInit' to be usable in tests. 
-nodeStateInit :: FediChordConf -> IO NodeState -nodeStateInit conf = do - cacheRef <- newIORef initCache +nodeStateInit :: RealNodeSTM -> IO LocalNodeState +nodeStateInit realNodeSTM = do + realNode <- readTVarIO realNodeSTM + cacheSTM <- newTVarIO initCache q <- atomically newTQueue let - initialState = NodeState { + conf = nodeConfig realNode + vsID = 0 + containedState = RemoteNodeState { domain = confDomain conf , ipAddr = confIP conf - , nid = genNodeID (confIP conf) (confDomain conf) 0 + , nid = genNodeID (confIP conf) (confDomain conf) $ fromInteger vsID , dhtPort = toEnum $ confDhtPort conf - , apPort = Nothing - , vServerID = 0 - , internals = Just internalsInit - } - internalsInit = InternalNodeState { - nodeCache = cacheRef + , servicePort = 0 + , vServerID = vsID + } + initialState = LocalNodeState { + nodeState = containedState + , nodeCacheSTM = cacheSTM , cacheWriteQueue = q , successors = [] , predecessors = [] @@ -469,67 +133,504 @@ nodeStateInit conf = do , lNumBestNodes = 3 , pNumParallelQueries = 2 , jEntriesPerSlice = 2 + , parentRealNode = realNodeSTM } pure initialState ---fediChordJoin :: NodeState -- ^ the local 'NodeState' --- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node --- -> Socket -- ^ socket used for sending and receiving the join message --- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful --- -- join, otherwise an error message ---fediChordJoin ns (joinHost, joinPort) sock = do --- -- 1. get routed to destination until FOUND --- -- 2. then send a join to the currently responsible node --- -- ToDo: implement cache management, as already all received replies should be stored in cache --- +-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed +-- for resolving the new node's position. 
+fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a + -- successful join, otherwise an error message +fediChordBootstrapJoin nsSTM bootstrapNode = do + -- can be invoked multiple times with all known bootstrapping nodes until successfully joined + ns <- readTVarIO nsSTM + runExceptT $ do + -- 1. get routed to the currently responsible node + lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns + currentlyResponsible <- liftEither lookupResp + liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) + -- 2. then send a join to the currently responsible node + joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM + liftEither joinResult + +-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. +-- Unjoined try joining instead. +convergenceSampleThread :: LocalNodeStateSTM -> IO () +convergenceSampleThread nsSTM = forever $ do + nsSnap <- readTVarIO nsSTM + parentNode <- readTVarIO $ parentRealNode nsSnap + if isJoined nsSnap + then + runExceptT (do + -- joined node: choose random node, do queryIDLoop, compare result with own responsibility + let bss = bootstrapNodes parentNode + randIndex <- liftIO $ randomRIO (0, length bss - 1) + chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex + lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) + currentlyResponsible <- liftEither lookupResult + if getNid currentlyResponsible /= getNid nsSnap + -- if mismatch, stabilise on the result, else do nothing + then do + stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible + (preds, succs) <- liftEither stabResult + -- TODO: verify neighbours before adding, see #55 + liftIO . 
atomically $ do + ns <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors preds ns + else pure () + ) >> pure () + -- unjoined node: try joining through all bootstrapping nodes + else tryBootstrapJoining nsSTM >> pure () + let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode + threadDelay $ delaySecs * 10^6 + + +-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. +tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM) +tryBootstrapJoining nsSTM = do + bss <- atomically $ do + nsSnap <- readTVar nsSTM + realNodeSnap <- readTVar $ parentRealNode nsSnap + pure $ bootstrapNodes realNodeSnap + tryJoining bss + where + tryJoining (bn:bns) = do + j <- fediChordBootstrapJoin nsSTM bn + case j of + Left err -> putStrLn ("join error: " <> err) >> tryJoining bns + Right joined -> pure $ Right joined + tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." + + +-- | Look up a key just based on the responses of a single bootstrapping node. +bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) +bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do + ns <- readTVarIO nsSTM + bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close ( + -- Initialise an empty cache only with the responses from a bootstrapping node + fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) + ) + `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) + + case bootstrapResponse of + Left err -> pure $ Left err + Right resp + | resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." 
+ | otherwise -> do + now <- getPOSIXTime + -- create new cache with all returned node responses + let bootstrapCache = + -- traverse response parts + foldr' (\resp cacheAcc -> case queryResult <$> payload resp of + Nothing -> cacheAcc + Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc + Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset + ) + initCache resp + currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 targetID + pure $ Right currentlyResponsible + + +-- | join a node to the DHT, using the global node cache for resolving the +-- new node's position. +fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' + -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a + -- successful join, otherwise an error message +fediChordJoin nsSTM = do + ns <- readTVarIO nsSTM + -- 1. get routed to the currently responsible node + currentlyResponsible <- requestQueryID ns $ getNid ns + putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) + -- 2. then send a join to the currently responsible node + joinResult <- requestJoin currentlyResponsible nsSTM + case joinResult of + Left err -> pure . Left $ "Error joining on " <> err + Right joinedNS -> pure . Right $ joinedNS + + +-- | Wait for new cache entries to appear and then try joining on them. +-- Exits after successful joining. 
+joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () +joinOnNewEntriesThread nsSTM = loop + where + loop = do + nsSnap <- readTVarIO nsSTM + (lookupResult, cache) <- atomically $ do + cache <- readTVar $ nodeCacheSTM nsSnap + case queryLocalCache nsSnap cache 1 (getNid nsSnap) of + -- empty cache, block until cache changes and then retry + (FORWARD s) | Set.null s -> retry + result -> pure (result, cache) + case lookupResult of + -- already joined + FOUND _ -> do + print =<< readTVarIO nsSTM + pure () + -- otherwise try joining + FORWARD _ -> do + joinResult <- fediChordJoin nsSTM + either + -- on join failure, sleep and retry + -- TODO: make delay configurable + (const $ threadDelay (30 * 10^6) >> loop) + (const $ pure ()) + joinResult + emptyset = Set.empty -- because pattern matches don't accept qualified names + -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. -cacheWriter :: NodeState -> IO () -cacheWriter ns = do - let writeQueue' = getCacheWriteQueue ns - case writeQueue' of - Nothing -> pure () - Just writeQueue -> forever $ do - f <- atomically $ readTQueue writeQueue +cacheWriter :: LocalNodeStateSTM -> IO () +cacheWriter nsSTM = + forever $ atomically $ do + ns <- readTVar nsSTM + cacheModifier <- readTQueue $ cacheWriteQueue ns + modifyTVar' (nodeCacheSTM ns) cacheModifier + + +-- TODO: make max entry age configurable +maxEntryAge :: POSIXTime +maxEntryAge = 600 + + +-- | Periodically iterate through cache, clean up expired entries and verify unverified ones +cacheVerifyThread :: LocalNodeStateSTM -> IO () +cacheVerifyThread nsSTM = forever $ do + putStrLn "cache verify run: begin" + -- get cache + (ns, cache) <- atomically $ do + ns <- readTVar nsSTM + cache <- readTVar $ nodeCacheSTM ns + pure (ns, cache) + -- iterate entries: + -- for avoiding too many time syscalls, get current time before iterating. 
+ now <- getPOSIXTime + forM_ (cacheEntries cache) (\(CacheEntry validated node ts) -> + -- case too old: delete (future work: decide whether pinging and resetting timestamp is better) + if (now - ts) > maxEntryAge + then + queueDeleteEntry (getNid node) ns + -- case unverified: try verifying, otherwise delete + else if not validated + then do + -- marking as verified is done by 'requestPing' as well + pong <- requestPing ns node + either (\_-> + queueDeleteEntry (getNid node) ns + ) + (\vss -> + if node `notElem` vss + then queueDeleteEntry (getNid node) ns + -- after verifying a node, check whether it can be a closer neighbour + else do + if node `isPossiblePredecessor` ns + then atomically $ do + ns' <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors [node] ns' + else pure () + if node `isPossibleSuccessor` ns + then atomically $ do + ns' <- readTVar nsSTM + writeTVar nsSTM $ addSuccessors [node] ns' + else pure () + ) pong + else pure () + ) + + -- check the cache invariant per slice and, if necessary, do a single lookup to the + -- middle of each slice not verifying the invariant + latestNs <- readTVarIO nsSTM + latestCache <- readTVarIO $ nodeCacheSTM latestNs + let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of + FOUND node -> [node] + FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet + forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> + forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle + ) + + putStrLn "cache verify run: end" + threadDelay $ 10^6 * round maxEntryAge `div` 20 + + +-- | Checks the invariant of at least @jEntries@ per cache slice. 
+-- If this invariant does not hold, the middle of the slice is returned for +-- making lookups to that ID +checkCacheSliceInvariants :: LocalNodeState + -> NodeCache + -> [NodeID] -- ^ list of middle IDs of slices not + -- ^ fulfilling the invariant +checkCacheSliceInvariants ns + | isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc + | otherwise = const [] + where + jEntries = jEntriesPerSlice ns + lastPred = getNid <$> lastMay (predecessors ns) + lastSucc = getNid <$> lastMay (successors ns) + -- start slice boundary: 1/2 key space + startBound = getNid ns + 2^(idBits - 1) + + checkSuccessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID] + checkSuccessorSlice _ _ _ Nothing _ = [] + checkSuccessorSlice j ownID upperBound (Just lastSuccID) cache + | (upperBound `localCompare` lastSuccID) == LT = [] + | otherwise = + let + diff = getNodeID $ upperBound - ownID + lowerBound = ownID + fromInteger (diff `div` 2) + middleID = lowerBound + fromInteger (diff `div` 4) + lookupResult = Set.map (getNid . 
remoteNode) $ closestCachePredecessors jEntries upperBound cache + in + -- check whether j entries are in the slice + if length lookupResult == jEntries + && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult + && all (\r -> (r `localCompare` upperBound) == LT) lookupResult + then checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache + -- if not enough entries, add the middle of the slice to list + else middleID : checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache + + checkPredecessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID] + checkPredecessorSlice _ _ _ Nothing _ = [] + checkPredecessorSlice j ownID lowerBound (Just lastPredID) cache + | (lowerBound `localCompare` lastPredID) == GT = [] + | otherwise = + let + diff = getNodeID $ ownID - lowerBound + upperBound = ownID - fromInteger (diff `div` 2) + middleID = lowerBound + fromInteger (diff `div` 4) + lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache + in + -- check whether j entries are in the slice + if length lookupResult == jEntries + && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult + && all (\r -> (r `localCompare` upperBound) == LT) lookupResult + then checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache + -- if not enough entries, add the middle of the slice to list + else middleID : checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache + + +-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until +-- one responds, and get their neighbours for maintaining the own neighbour lists. +-- If necessary, request new neighbours. 
+stabiliseThread :: LocalNodeStateSTM -> IO () +stabiliseThread nsSTM = forever $ do + ns <- readTVarIO nsSTM + + putStrLn "stabilise run: begin" + print ns + + -- iterate through the same snapshot, collect potential new neighbours + -- and nodes to be deleted, and modify these changes only at the end of + -- each stabilise run. + -- This decision makes iterating through a potentially changing list easier. + + -- don't contact all neighbours unless the previous one failed/ Left ed + + predStabilise <- stabiliseClosestResponder ns predecessors 1 [] + succStabilise <- stabiliseClosestResponder ns successors 1 [] + + let + (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise + (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise + allDeletes = predDeletes <> succDeletes + allNeighbours = predNeighbours <> succNeighbours + + -- now actually modify the node state's neighbours + updatedNs <- atomically $ do + newerNsSnap <- readTVar nsSTM let - refModifier :: NodeCache -> (NodeCache, ()) - refModifier nc = (f nc, ()) - maybe (pure ()) ( - \ref -> atomicModifyIORef' ref refModifier - ) $ getNodeCacheRef ns + -- sorting and taking only k neighbours is taken care of by the + -- setSuccessors/ setPredecessors functions + newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours + newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours + newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap + writeTVar nsSTM newNs + pure newNs + -- delete unresponding nodes from cache as well + mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes --- ====== network socket operations ====== + -- try looking up additional neighbours if list too short + forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do + ns' <- readTVarIO nsSTM + nextEntry <- requestQueryID ns' $ pred . 
getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') + atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors [nextEntry] latestNs + ) --- | resolve a specified host and return the 'AddrInfo' for it. --- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all --- addresses; --- if no port is specified an arbitrary free port is selected. -resolve :: Maybe String -- ^ hostname or IP address to be resolved - -> Maybe PortNumber -- ^ port number of either local bind or remote - -> IO AddrInfo -resolve host port = let - hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram - , addrFlags = [AI_PASSIVE] } - in - head <$> getAddrInfo (Just hints) host (show <$> port) + forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do + ns' <- readTVarIO nsSTM + nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') + atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addSuccessors [nextEntry] latestNs + ) --- | create an unconnected UDP Datagram 'Socket' bound to the specified address -mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket -mkServerSocket ip port = do - sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) - sock <- socket AF_INET6 Datagram defaultProtocol - setSocketOption sock IPv6Only 1 - bind sock sockAddr - pure sock + putStrLn "stabilise run: end" + -- TODO: make delay configurable + threadDelay (60 * 10^6) + where + -- | send a stabilise request to the n-th neighbour + -- (specified by the provided getter function) and on failure retry + -- with the n+1-th neighbour. + -- On success, return 2 lists: The failed nodes and the potential neighbours + -- returned by the queried node. 
+ stabiliseClosestResponder :: LocalNodeState -- ^ own node + -> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors + -> Int -- ^ index of neighbour to query + -> [RemoteNodeState] -- ^ delete accumulator + -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours) + stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc + | isNothing (currentNeighbour ns neighbourGetter neighbourNum) = pure $ Left "exhausted all neigbours" + | otherwise = do + let node = fromJust $ currentNeighbour ns neighbourGetter neighbourNum + stabResponse <- requestStabilise ns node + case stabResponse of + -- returning @Left@ signifies the need to try again with next from list + Left err -> stabiliseClosestResponder ns neighbourGetter (neighbourNum+1) (node:deleteAcc) + Right (succs, preds) -> do + -- ping each returned node before actually inserting them + -- send pings in parallel, check whether ID is part of the returned IDs + pingThreads <- mapM (async . checkReachability ns) $ preds <> succs + -- ToDo: exception handling, maybe log them + -- filter out own node + checkedNeighbours <- filter (/= toRemoteNodeState ns) . catMaybes . rights <$> mapM waitCatch pingThreads + pure $ Right (deleteAcc, checkedNeighbours) --- | create a UDP datagram socket, connected to a destination. --- The socket gets an arbitrary free local port assigned. 
-mkSendSocket :: String -- ^ destination hostname or IP - -> PortNumber -- ^ destination port - -> IO Socket -- ^ a socket with an arbitrary source port -mkSendSocket dest destPort = do - destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) - sendSock <- socket AF_INET6 Datagram defaultProtocol - setSocketOption sendSock IPv6Only 1 - pure sendSock + + currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns + + checkReachability :: LocalNodeState -- ^ this node + -> RemoteNodeState -- ^ node to Ping for reachability + -> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one + checkReachability ns toCheck = do + resp <- requestPing ns toCheck + pure $ either (const Nothing) (\vss -> + if toCheck `elem` vss then Just toCheck else Nothing + ) resp + + +-- | Receives UDP packets and passes them to other threads via the given TQueue. +-- Shall be used as the single receiving thread on the server socket, as multiple +-- threads blocking on the same socket degrades performance. +recvThread :: Socket -- ^ server socket to receive packets from + -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue + -> IO () +recvThread sock recvQ = forever $ do + packet <- recvFrom sock 65535 + atomically $ writeTQueue recvQ packet + +-- | Only thread to send data it gets from a TQueue through the server socket. 
+sendThread :: Socket -- ^ server socket used for sending + -> TQueue (BS.ByteString, SockAddr) -- ^ send queue + -> IO () +sendThread sock sendQ = forever $ do + (packet, addr) <- atomically $ readTQueue sendQ + sendAllTo sock packet addr + +-- | Sets up and manages the main server threads of FediChord +fediMainThreads :: Socket -> LocalNodeStateSTM -> IO () +fediMainThreads sock nsSTM = do + (\x -> putStrLn $ "launching threads, ns: " <> show x) =<< readTVarIO nsSTM + sendQ <- newTQueueIO + recvQ <- newTQueueIO + -- concurrently launch all handler threads, if one of them throws an exception + -- all get cancelled + concurrently_ + (fediMessageHandler sendQ recvQ nsSTM) $ + concurrently_ (stabiliseThread nsSTM) $ + concurrently_ (cacheVerifyThread nsSTM) $ + concurrently_ (convergenceSampleThread nsSTM) $ + concurrently_ + (sendThread sock sendQ) + (recvThread sock recvQ) + + +-- defining this here as, for now, the RequestMap is only used by fediMessageHandler. +-- Once that changes, move to FediChordTypes +type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry + +data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) + POSIXTime + +-- TODO: make purge age configurable +-- | periodically clean up old request parts +responsePurgeAge :: POSIXTime +responsePurgeAge = 60 -- seconds + +requestMapPurge :: MVar RequestMap -> IO () +requestMapPurge mapVar = forever $ do + rMapState <- takeMVar mapVar + now <- getPOSIXTime + putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> + now - ts < responsePurgeAge + ) rMapState + threadDelay $ round responsePurgeAge * 2 * 10^6 + + +-- | Wait for messages, deserialise them, manage parts and acknowledgement status, +-- and pass them to their specific handling function. 
+fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue + -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue + -> LocalNodeStateSTM -- ^ acting NodeState + -> IO () +fediMessageHandler sendQ recvQ nsSTM = do + -- Read node state just once, assuming that all relevant data for this function does + -- not change. + -- Other functions are passed the nsSTM reference and thus can get the latest state. + nsSnap <- readTVarIO nsSTM + -- handling multipart messages: + -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. + requestMap <- newMVar (Map.empty :: RequestMap) + -- run receive loop and requestMapPurge concurrently, so that an exception makes + -- both of them fail + concurrently_ (requestMapPurge requestMap) $ forever $ do + -- wait for incoming messages + (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ + let aMsg = deserialiseMessage rawMsg + either (\_ -> + -- drop invalid messages + pure () + ) + (\validMsg -> + case validMsg of + aRequest@Request{} + -- if not a multipart message, handle immediately. 
Response is at the same time an ACK + | part aRequest == 1 && isFinalPart aRequest -> + forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure () + -- otherwise collect all message parts first before handling the whole request + | otherwise -> do + now <- getPOSIXTime + -- critical locking section of requestMap + rMapState <- takeMVar requestMap + -- insert new message and get set + let + theseMaxParts = if isFinalPart aRequest then Just (part aRequest) else Nothing + thisKey = (sourceAddr, requestID aRequest) + newMapState = Map.insertWith (\ + (RequestMapEntry thisMsgSet p' ts) (RequestMapEntry oldMsgSet p'' _) -> + RequestMapEntry (thisMsgSet `Set.union` oldMsgSet) (p' <|> p'') ts + ) + thisKey + (RequestMapEntry (Set.singleton aRequest) theseMaxParts now) + rMapState + -- put map back into MVar, end of critical section + putMVar requestMap newMapState + -- ACK the received part + forM_ (ackRequest (getNid nsSnap) aRequest) $ + \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr) + -- if all parts received, then handle request. 
+ let + (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState + numParts = Set.size theseParts + if maybe False (numParts ==) (fromIntegral <$> mayMaxParts) + then forkIO (handleIncomingRequest nsSTM sendQ theseParts sourceAddr) >> pure() + else pure() + -- Responses should never arrive on the main server port, as they are always + -- responses to requests sent from dedicated sockets on another port + _ -> pure () + ) + aMsg + + pure () diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs new file mode 100644 index 0000000..296ebfa --- /dev/null +++ b/src/Hash2Pub/FediChordTypes.hs @@ -0,0 +1,604 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} + +module Hash2Pub.FediChordTypes ( + NodeID -- abstract, but newtype constructors cannot be hidden + , idBits + , getNodeID + , toNodeID + , NodeState (..) + , LocalNodeState (..) + , LocalNodeStateSTM + , RemoteNodeState (..) + , RealNode (..) + , RealNodeSTM + , setSuccessors + , setPredecessors + , NodeCache + , CacheEntry(..) + , RingEntry(..) + , RingMap(..) + , HasKeyID + , getKeyID + , rMapSize + , rMapLookup + , rMapLookupPred + , rMapLookupSucc + , addRMapEntry + , addRMapEntryWith + , addPredecessors + , addSuccessors + , takeRMapPredecessors + , takeRMapSuccessors + , deleteRMapEntry + , setRMapEntries + , rMapFromList + , rMapToList + , cacheGetNodeStateUnvalidated + , initCache + , cacheEntries + , cacheLookup + , cacheLookupSucc + , cacheLookupPred + , localCompare + , genNodeID + , genNodeIDBS + , genKeyID + , genKeyIDBS + , byteStringToUInteger + , ipAddrAsBS + , bsAsIpAddr + , FediChordConf(..) 
-- | Smart constructor for 'NodeID' that rejects out-of-bounds values.
--
-- Returns the wrapped value if @i@ lies within the closed interval
-- @[getNodeID minBound, getNodeID maxBound]@. Otherwise an 'AssertionFailed'
-- exception is thrown once the result is forced.
--
-- The bounds check is written out explicitly instead of using 'assert',
-- because assertions are removed by GHC when compiling with optimisations
-- (or @-fignore-asserts@), which would silently let invalid IDs through.
--
-- When a total constructor that wraps out-of-bounds values around is
-- preferred, use 'fromInteger' instead.
toNodeID :: Integer -> NodeID
toNodeID i
    | lowerBound <= i && i <= upperBound = NodeID i
    | otherwise = throw . AssertionFailed $
        "toNodeID: value " ++ show i ++ " is outside of the NodeID value range"
  where
    lowerBound = getNodeID minBound
    upperBound = getNodeID maxBound
-- | represents a node and all its important state
--
-- Note: the derived 'Eq' instance compares all fields, whereas the separately
-- defined 'Ord' instance of this type only compares 'nid'.
data RemoteNodeState = RemoteNodeState
    { nid         :: NodeID
    -- ^ ID of the node
    , domain      :: String
    -- ^ full public domain name the node is reachable under
    , ipAddr      :: HostAddress6
    -- ^ the node's public IPv6 address
    , dhtPort     :: PortNumber
    -- ^ port of the DHT itself
    , servicePort :: PortNumber
    -- ^ port of the service provided on top of the DHT
    , vServerID   :: Integer
    -- ^ ID of this vserver
    }
    deriving (Show, Eq)
-- | Lifts a modification of a 'RemoteNodeState' to the 'LocalNodeState'
-- embedding it: the given function is applied to the 'nodeState' field,
-- all other fields are left untouched.
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
propagateNodeStateSet_ modify localState =
    localState {nodeState = modify (nodeState localState)}
-- | Replaces the predecessors of a 'LocalNodeState' with entries selected
-- from the provided list: entries carrying the node's own ID are dropped,
-- then up to 'kNeighbours' entries are taken via 'takeRMapPredecessors',
-- starting from the node's own ID.
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setPredecessors preds ns = ns {predecessors = closestPreds}
  where
    ownId = getNid ns
    -- a node is never listed as its own predecessor
    candidates = rMapFromList [p | p <- preds, getNid p /= ownId]
    closestPreds = takeRMapPredecessors ownId (kNeighbours ns) candidates
-- | Renders a 'RingMap' as the label @RingMap @ followed by the 'show'
-- output of the underlying 'Map.Map'.
--
-- 'showString' is used for the label: 'shows' applied to a 'String' would
-- render it as a quoted string literal, i.e. emit @"RingMap "@ including the
-- quotation marks.
instance (HasKeyID a) => Show (RingMap a) where
    show rmap = showString "RingMap " (show $ getRingMap rmap)
-- | Extracts the stored value from a 'RingEntry', if there is one.
-- A 'KeyEntry' yields its value, a 'ProxyEntry' yields the value of a
-- directly contained 'KeyEntry'; everything else yields 'Nothing'.
extractRingEntry :: HasKeyID a => RingEntry a -> Maybe a
extractRingEntry ringEntry = case ringEntry of
    KeyEntry value                       -> Just value
    ProxyEntry _ (Just (KeyEntry value)) -> Just value
    _                                    -> Nothing
-- | Number of values held by a properly initialised 'RingMap'.
-- The size of the underlying map is reduced by one for each of the two
-- boundary keys ('minBound', 'maxBound') at which no value can be looked up,
-- as those keys are expected to be occupied by bare proxy entries.
rMapSize :: (HasKeyID a, Integral i)
         => RingMap a
         -> i
rMapSize rmap = fromIntegral (Map.size (getRingMap rmap) - vacantBoundaries)
  where
    vacantBoundaries :: Int
    vacantBoundaries = length . filter isVacant $ [minBound, maxBound]
    -- True if no value is retrievable at the given key
    isVacant :: NodeID -> Bool
    isVacant key = isNothing (rMapLookup key rmap)
-- | Finds the predecessor of a given key on a modular EpiChord ring.
-- The initial lookup uses 'Map.lookupLT' and thus excludes an entry stored
-- at the key itself; lookups continued after following a proxy entry use
-- 'Map.lookupLE'.
rMapLookupPred :: HasKeyID a
               => NodeID      -- ^lookup key
               -> RingMap a   -- ^ring cache
               -> Maybe a
rMapLookupPred key rmap = lookupWrapper Map.lookupLT Map.lookupLE Backwards key rmap
-- | Takes up to @num@ entries from a 'RingMap' by calling a getter function on
-- the @startAt@ key and afterwards on the key of the previously returned entry.
--
-- The traversal stops as soon as
--
-- * @num@ entries have been taken (a non-positive @num@ yields @[]@),
-- * the getter returns the first taken entry again, meaning the ring has
--   been traversed completely, or
-- * the getter does not return any entry.
--
-- Entries are returned in the order in which they were encountered.
-- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'.
--
-- NOTE(review): a getter that returns the entry stored at the queried key
-- itself (as a lookup starting with 'Map.lookupGE' does) hands back the first
-- entry again on the second step, which ends the traversal after a single
-- entry. Verify that this is intended for 'takeRMapSuccessors'.
takeRMapEntries_ :: (HasKeyID a, Integral i)
                 => (NodeID -> RingMap a -> Maybe a)
                 -> NodeID
                 -> i
                 -> RingMap a
                 -> [a]
takeRMapEntries_ getterFunc startAt num rmap
    | num <= 0 = []
    | otherwise = case getterFunc startAt rmap of
        Nothing         -> []
        Just firstEntry ->
            let firstKey = getKeyID firstEntry
            in firstEntry : takeFurther firstKey firstKey (num - 1)
  where
    -- produces the entries following @previousKey@ until the budget is used
    -- up, the ring wraps around to @firstKey@ or the getter finds nothing
    takeFurther firstKey previousKey remaining
        | remaining <= 0 = []
        | otherwise = case getterFunc previousKey rmap of
            Just nextEntry
                | getKeyID nextEntry /= firstKey ->
                    nextEntry : takeFurther firstKey (getKeyID nextEntry) (remaining - 1)
            _ -> []
-- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString
--
-- The result is the concatenation of three parts: the first part of the
-- hashed IP prefix, the hash of the domain name and the second part of the
-- hashed IP prefix. The vserver ID byte is appended to both hash inputs.
genNodeIDBS :: HostAddress6     -- ^a node's IPv6 address
            -> String           -- ^a node's 1st and 2nd level domain name
            -> Word8            -- ^the used vserver ID
            -> BS.ByteString    -- ^the NodeID as a 256bit ByteString big-endian unsigned integer
genNodeIDBS ip nodeDomain vserver =
    hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower
  where
    vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255
    -- only the first 8 bytes of the address representation are hashed
    ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS
    nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS
    -- hashes a ByteString and returns the digest as a strict ByteString
    hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128))
    -- NOTE(review): 'BS.splitAt' counts bytes. Assuming @SHAKE128 128@ yields
    -- a 128 bit (16 byte) digest, splitting at 64 leaves the lower part empty
    -- and puts the whole digest into the upper part; a split at 8 bytes
    -- (64 bit) may have been intended. Changing this alters every generated
    -- ID, so confirm before touching it.
    (hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet
-- | Parses the bit pattern of a 'BS.ByteString' as an unsigned 'Integer' in
-- big-endian byte order: the first byte is the most significant one.
-- An empty input yields 0.
--
-- Implemented as a strict left fold: for every byte the value accumulated so
-- far is shifted up by one byte before the new byte is added.
byteStringToUInteger :: BS.ByteString -> Integer
byteStringToUInteger = BS.foldl' appendByte 0
  where
    appendByte :: Integer -> Word8 -> Integer
    appendByte acc byte = acc * 256 + toInteger byte
-- | Orders messages of the same kind by their 'requestID' first and their
-- 'part' number second. All requests are ordered before all responses, so
-- that comparing messages of different kinds is antisymmetric, as needed by
-- ordered containers such as "Data.Set".
--
-- Note: unlike the derived 'Eq' instance, this ordering only looks at
-- 'requestID' and 'part', not at the remaining fields.
instance Ord FediChordMessage where
    compare a b = case (a, b) of
        (Request{}, Response{}) -> LT
        (Response{}, Request{}) -> GT
        -- same constructor on both sides
        _ -> compare (requestID a, part a) (requestID b, part b)
-- | Converts all entries of a 'NodeCache', as returned by 'cacheEntries', to
-- 'RemoteCacheEntry' values; useful for cache transfers.
toRemoteCache :: NodeCache -> [RemoteCacheEntry]
toRemoteCache = map toRemoteCacheEntry . cacheEntries
NodeState { + let ns = RemoteNodeState { nid = undefined , domain = exampleNodeDomain , ipAddr = exampleIp , dhtPort = 2342 - , apPort = Nothing + , servicePort = 513 , vServerID = undefined - , internals = Nothing } nsReady = ns { nid = genNodeID (ipAddr ns) (domain ns) 3 @@ -81,8 +81,8 @@ spec = do newCache = addCacheEntryPure 10 (RemoteCacheEntry exampleNodeState 10) (addCacheEntryPure 10 (RemoteCacheEntry anotherNode 10) emptyCache) exampleID = nid exampleNodeState it "entries can be added to a node cache and looked up again" $ do - -- the cache includes 2 additional proxy elements right from the start - Map.size newCache - Map.size emptyCache `shouldBe` 2 + rMapSize emptyCache `shouldBe` 0 + rMapSize newCache `shouldBe` 2 -- normal entry lookup nid . cacheGetNodeStateUnvalidated <$> cacheLookup anotherID newCache `shouldBe` Just anotherID nid . cacheGetNodeStateUnvalidated <$> cacheLookup (anotherID+1) newCache `shouldBe` Nothing @@ -121,50 +121,72 @@ spec = do let emptyCache = initCache nid1 = toNodeID 2^(23::Integer)+1 - node1 = do - eln <- exampleLocalNode -- is at 2^23.00000017198264 = 8388609 - pure $ putPredecessors [nid4] $ eln {nid = nid1} + node1 = setPredecessors [node4] . 
setNid nid1 <$> exampleLocalNode nid2 = toNodeID 2^(230::Integer)+12 node2 = exampleNodeState { nid = nid2} nid3 = toNodeID 2^(25::Integer)+10 node3 = exampleNodeState { nid = nid3} nid4 = toNodeID 2^(9::Integer)+100 node4 = exampleNodeState { nid = nid4} - cacheWith2Entries :: IO NodeCache - cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) - cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries) - it "works on an empty cache" $ do + nid5 = toNodeID 2^(25::Integer)+100 + node5 = exampleNodeState { nid = nid5} + cacheWith2Entries :: NodeCache + cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) + cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries) + it "unjoined nodes should never return themselfs" $ do + exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty - queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty + (FORWARD fwSet) <- queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> (getNid <$> exampleLocalNode) + remoteNode (head $ Set.elems fwSet) `shouldBe` node4 + it "joined nodes do not fall back to the default" $ + queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty it "works on a cache with less entries than needed" $ do - (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5) - Set.map (nid . 
remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ] + (FORWARD nodeset) <- queryLocalCache <$> node1 <*> pure cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5) + Set.map (nid . remoteNode) nodeset `shouldBe` Set.fromList [ nid5, nid2 ] it "works on a cache with sufficient entries" $ do - (FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) - (FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5) - Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3] - Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4] + (FORWARD nodeset1) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) + (FORWARD nodeset2) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5) + Set.map (nid . remoteNode) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid5] + Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4] it "recognises the node's own responsibility" $ do - FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1 - nid <$> node1 `shouldReturn` nid selfQueryRes - FOUND responsibilityResult <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer)) - nid <$> node1 `shouldReturn` nid responsibilityResult - it "does not fail on nodes without neighbours (initial state)" $ do - (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11) - Set.map (nid . 
remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3] + FOUND selfQueryRes <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure nid1 + getNid <$> node1 `shouldReturn` getNid selfQueryRes + FOUND responsibilityResult <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer)) + getNid <$> node1 `shouldReturn` getNid responsibilityResult + describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do + let + emptyCache = initCache + -- implicitly relies on kNeighbours to be <= 3 + thisNid = toNodeID 1000 + thisNode = setNid thisNid <$> exampleLocalNode + nid2 = toNodeID 1003 + node2 = exampleNodeState { nid = nid2} + nid3 = toNodeID 1010 + node3 = exampleNodeState { nid = nid3} + nid4 = toNodeID 1020 + node4 = exampleNodeState { nid = nid4} + nid5 = toNodeID 1025 + node5 = exampleNodeState { nid = nid5} + allRemoteNodes = [node2, node3, node4, node5] + it "lookups also work for slices larger than 1/2 key space" $ do + node <- setSuccessors allRemoteNodes . setPredecessors allRemoteNodes <$> thisNode + -- do lookup on empty cache but with successors for a key > 1/2 key space + -- succeeding the node + queryLocalCache node emptyCache 1 (nid5 + 10) `shouldBe` FOUND (toRemoteNodeState node) + describe "Messages can be encoded to and decoded from ASN.1" $ do -- define test messages let - someNodeIDs = fmap fromInteger [3..12] + someNodes = fmap (flip setNid exampleNodeState . 
fromInteger) [3..12] qidReqPayload = QueryIDRequestPayload { queryTargetID = nid exampleNodeState , queryLBestNodes = 3 } jReqPayload = JoinRequestPayload lReqPayload = LeaveRequestPayload { - leaveSuccessors = someNodeIDs - , leavePredecessors = someNodeIDs + leaveSuccessors = someNodes + , leavePredecessors = someNodes } stabReqPayload = StabiliseRequestPayload pingReqPayload = PingRequestPayload @@ -178,8 +200,8 @@ spec = do ] } jResPayload = JoinResponsePayload { - joinSuccessors = someNodeIDs - , joinPredecessors = someNodeIDs + joinSuccessors = someNodes + , joinPredecessors = someNodes , joinCache = [ RemoteCacheEntry exampleNodeState (toEnum 23420001) , RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0) @@ -187,7 +209,7 @@ spec = do } lResPayload = LeaveResponsePayload stabResPayload = StabiliseResponsePayload { - stabiliseSuccessors = someNodeIDs + stabiliseSuccessors = someNodes , stabilisePredecessors = [] } pingResPayload = PingResponsePayload { @@ -199,16 +221,16 @@ spec = do requestTemplate = Request { requestID = 2342 , sender = exampleNodeState - , parts = 1 , part = 1 + , isFinalPart = True , action = undefined , payload = undefined } responseTemplate = Response { - responseTo = 2342 + requestID = 2342 , senderID = nid exampleNodeState - , parts = 1 , part = 1 + , isFinalPart = True , action = undefined , payload = undefined } @@ -216,6 +238,12 @@ spec = do responseWith a pa = responseTemplate {action = a, payload = Just pa} encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg + largeMessage = responseWith Join $ JoinResponsePayload { + joinSuccessors = flip setNid exampleNodeState . fromInteger <$> [-20..150] + , joinPredecessors = flip setNid exampleNodeState . 
fromInteger <$> [5..11] + , joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]] + } + it "messages are encoded and decoded correctly from and to ASN1" $ do encodeDecodeAndCheck $ requestWith QueryID qidReqPayload encodeDecodeAndCheck $ requestWith Join jReqPayload @@ -231,35 +259,54 @@ spec = do it "messages are encoded and decoded to ASN.1 DER properly" $ deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) it "messages too large for a single packet can (often) be split into multiple parts" $ do - let largeMessage = responseWith Join $ JoinResponsePayload { - joinSuccessors = fromInteger <$> [-20..150] - , joinPredecessors = fromInteger <$> [5..11] - , joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]] - } -- TODO: once splitting works more efficient, test for exact number or payload, see #18 length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True - length (serialiseMessage 6000 largeMessage) `shouldBe` 1 + length (serialiseMessage 60000 largeMessage) `shouldBe` 1 + it "message part numbering starts at the submitted part number" $ do + isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True + let startAt5 = serialiseMessage 600 (largeMessage {part = 5}) + Map.lookup 1 startAt5 `shouldBe` Nothing + part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5 + describe "join cache lookup" $ + it "A bootstrap cache initialised with just one node returns that one." 
$ do + let + bootstrapNid = toNodeID 34804191837661041451755206127000721433747285589603756490902196113256157045194 + bootstrapNode = setNid bootstrapNid exampleNodeState + bootstrapCache = addCacheEntryPure 10 (RemoteCacheEntry bootstrapNode 19) initCache + ownId = toNodeID 34804191837661041451755206127000721433707928516052624394829818586723613390165 + ownNode <- setNid ownId <$> exampleLocalNode + let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId + remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode + -- some example data -exampleNodeState :: NodeState -exampleNodeState = NodeState { +exampleNodeState :: RemoteNodeState +exampleNodeState = RemoteNodeState { nid = toNodeID 12 , domain = exampleNodeDomain , ipAddr = exampleIp , dhtPort = 2342 - , apPort = Nothing + , servicePort = 513 , vServerID = 0 - , internals = Nothing } -exampleLocalNode :: IO NodeState -exampleLocalNode = nodeStateInit $ FediChordConf { +exampleLocalNode :: IO LocalNodeState +exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode { + vservers = [] + , nodeConfig = exampleFediConf + , bootstrapNodes = confBootstrapNodes exampleFediConf + }) + + +exampleFediConf :: FediChordConf +exampleFediConf = FediChordConf { confDomain = "example.social" , confIP = exampleIp , confDhtPort = 2342 } + exampleNodeDomain :: String exampleNodeDomain = "example.social" exampleVs :: (Integral i) => i