Compare commits

..

5 commits

Author SHA1 Message Date
Trolli Schmittlauch b46f66e2c0 update Readme with latest branch name and pointer to SocialHub 2021-08-16 20:12:53 +02:00
Trolli Schmittlauch ea14ff9b09 update ghc to 8.6.4, nixpkgs base to 20.09
- relaxes some version constraints as dirty update quickfix
- removes hie integration as that project is abandoned, todo: switch to
  haskell-languageserver instead
2021-01-01 14:33:03 +01:00
Trolli Schmittlauch 9d8df6d3d8 make the multithread-runtime use all cores by default 2020-10-02 02:36:02 +02:00
Trolli Schmittlauch d7355aa04d increase HTTP timeout for initial post publication to 60 seconds
After a while, experiments made some publication events time-out.
Increasing the timeout just in case, although it i likely to be a mere symptom but the core fault.
2020-09-22 19:47:39 +02:00
Trolli Schmittlauch d8b2186016 make inclusion of HIE overlay conditional as well 2020-09-21 22:16:02 +02:00
13 changed files with 424 additions and 1099 deletions

View file

@ -6,22 +6,20 @@ Domain ::= VisibleString
Partnum ::= INTEGER (0..150) Partnum ::= INTEGER (0..150)
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad} Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
Request ::= SEQUENCE { Request ::= SEQUENCE {
action Action, action Action,
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
receiverID NodeID,
sender NodeState, sender NodeState,
part Partnum, -- part number of this message, starts at 1 part Partnum, -- part number of this message, starts at 1
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
actionPayload CHOICE { actionPayload CHOICE {
queryIDRequestPayload QueryIDRequestPayload, queryIDRequestPayload QueryIDRequestPayload,
joinRequestPayload JoinRequestPayload, joinRequestPayload JoinRequestPayload,
leaveRequestPayload LeaveRequestPayload, leaveRequestPayload LeaveRequestPayload,
stabiliseRequestPayload StabiliseRequestPayload, stabiliseRequestPayload StabiliseRequestPayload,
pingRequestPayload PingRequestPayload, pingRequestPayload PingRequestPayload
loadRequestPayload LoadRequestPayload
} OPTIONAL -- just for symmetry reasons with response, requests without a payload have no meaning } OPTIONAL -- just for symmetry reasons with response, requests without a payload have no meaning
} }
@ -36,12 +34,11 @@ Response ::= SEQUENCE {
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
action Action, action Action,
actionPayload CHOICE { actionPayload CHOICE {
queryIDResponsePayload QueryIDResponsePayload, queryIDResponsePayload QueryIDResponsePayload,
joinResponsePayload JoinResponsePayload, joinResponsePayload JoinResponsePayload,
leaveResponsePayload LeaveResponsePayload, leaveResponsePayload LeaveResponsePayload,
stabiliseResponsePayload StabiliseResponsePayload, stabiliseResponsePayload StabiliseResponsePayload,
pingResponsePayload PingResponsePayload, pingResponsePayload PingResponsePayload
loadResponsePayload LoadResponsePayload
} OPTIONAL -- no payload when just ACKing a previous request } OPTIONAL -- no payload when just ACKing a previous request
} }
@ -104,15 +101,5 @@ PingRequestPayload ::= NULL -- do not include a node/ vserver ID, so that
-- learning all active vserver IDs handled by the server at once -- learning all active vserver IDs handled by the server at once
PingResponsePayload ::= SEQUENCE OF NodeState PingResponsePayload ::= SEQUENCE OF NodeState
LoadRequestPayload ::= SEQUENCE {
upperSegmentBound NodeID
}
LoadResponsePayload ::= SEQUENCE {
loadSum REAL,
remainingLoadTarget REAL,
totalCapacity REAL,
lowerBound NodeID
}
END END

View file

@ -91,7 +91,7 @@ executable Hash2Pub
-- Base language which the package is written in. -- Base language which the package is written in.
default-language: Haskell2010 default-language: Haskell2010
ghc-options: -threaded ghc-options: -threaded -rtsopts -with-rtsopts=-N
executable Experiment executable Experiment
-- experiment runner -- experiment runner

View file

@ -1,7 +1,7 @@
# Hash2Pub # Hash2Pub
***This is heavily WIP and does not provide any useful functionality yet***. ***This is heavily WIP and does not provide any useful functionality yet***.
I aim for always having the master branch at a state where it builds and tests pass. I aim for always having the `mainline` branch in a state where it builds and tests pass.
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table. A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell. It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
@ -10,6 +10,8 @@ This is the practical implementation of the concept presented in the paper [Dece
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`. The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
For further questions and discussins, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**.
## Building ## Building
The project and its developent environment are built with [Nix](https://nixos.org/nix/). The project and its developent environment are built with [Nix](https://nixos.org/nix/).

View file

@ -40,7 +40,7 @@ executeSchedule :: Int -- ^ speedup factor
-> IO () -> IO ()
executeSchedule speedup events = do executeSchedule speedup events = do
-- initialise HTTP manager -- initialise HTTP manager
httpMan <- HTTP.newManager HTTP.defaultManagerSettings httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 }
forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do
_ <- forkIO $ _ <- forkIO $
clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag) clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag)

View file

@ -18,20 +18,38 @@ main = do
-- ToDo: parse and pass config -- ToDo: parse and pass config
-- probably use `tomland` for that -- probably use `tomland` for that
(fConf, sConf) <- readConfig (fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes … -- ToDo: load persisted caches, bootstrapping nodes …
(fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- wait for all DHT threads to terminate, this keeps the main thread running -- currently no masking is necessary, as there is nothing to clean up
wait fediThreads nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining thisNode
either (\err -> do
-- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
print =<< readTVarIO thisNode
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread thisNode
wait =<< async (fediMainThreads serverSock thisNode)
)
(\joinedNS -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
wait =<< async (fediMainThreads serverSock thisNode)
)
joinedState
pure ()
readConfig :: IO (FediChordConf, ServiceConf) readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do readConfig = do
confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
-- allow starting the initial node without bootstrapping info to avoid -- allow starting the initial node without bootstrapping info to avoid
-- waiting for timeout -- waiting for timeout
let let
speedup = read speedupString speedup = read speedupString
statsEvalD = 120 * 10^6 `div` speedup
confBootstrapNodes' = case remainingArgs of confBootstrapNodes' = case remainingArgs of
bootstrapHost : bootstrapPortString : _ -> bootstrapHost : bootstrapPortString : _ ->
[(bootstrapHost, read bootstrapPortString)] [(bootstrapHost, read bootstrapPortString)]
@ -49,11 +67,6 @@ readConfig = do
, confResponsePurgeAge = 60 / fromIntegral speedup , confResponsePurgeAge = 60 / fromIntegral speedup
, confRequestTimeout = 5 * 10^6 `div` speedup , confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3 , confRequestRetries = 3
, confEnableKChoices = loadBalancingEnabled /= "off"
, confKChoicesOverload = 0.9
, confKChoicesUnderload = 0.1
, confKChoicesMaxVS = 8
, confKChoicesRebalanceInterval = round (realToFrac statsEvalD * 1.1 :: Double)
} }
sConf = ServiceConf sConf = ServiceConf
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
@ -61,7 +74,7 @@ readConfig = do
, confServiceHost = confDomainString , confServiceHost = confDomainString
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
, confSpeedupFactor = speedup , confSpeedupFactor = speedup
, confStatsEvalDelay = statsEvalD , confStatsEvalDelay = 120 * 10^6 `div` speedup
} }
pure (fConf, sConf) pure (fConf, sConf)

View file

@ -184,19 +184,6 @@ encodePayload payload'@PingResponsePayload{} =
Start Sequence Start Sequence
: concatMap encodeNodeState (pingNodeStates payload') : concatMap encodeNodeState (pingNodeStates payload')
<> [End Sequence] <> [End Sequence]
encodePayload payload'@LoadRequestPayload{} =
[ Start Sequence
, IntVal . getNodeID $ loadSegmentUpperBound payload'
, End Sequence
]
encodePayload payload'@LoadResponsePayload{} =
[ Start Sequence
, Real $ loadSum payload'
, Real $ loadRemainingTarget payload'
, Real $ loadTotalCapacity payload'
, IntVal . getNodeID $ loadSegmentLowerBound payload'
, End Sequence
]
encodeNodeState :: NodeState a => a -> [ASN1] encodeNodeState :: NodeState a => a -> [ASN1]
encodeNodeState ns = [ encodeNodeState ns = [
@ -206,7 +193,7 @@ encodeNodeState ns = [
, OctetString (ipAddrAsBS $ getIpAddr ns) , OctetString (ipAddrAsBS $ getIpAddr ns)
, IntVal (toInteger . getDhtPort $ ns) , IntVal (toInteger . getDhtPort $ ns)
, IntVal (toInteger . getServicePort $ ns) , IntVal (toInteger . getServicePort $ ns)
, IntVal (toInteger $ getVServerID ns) , IntVal (getVServerID ns)
, End Sequence , End Sequence
] ]
@ -228,11 +215,10 @@ encodeQueryResult FORWARD{} = Enumerated 1
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
-> [ASN1] -> [ASN1]
encodeMessage encodeMessage
(Request requestID receiverID sender part isFinalPart action requestPayload) = (Request requestID sender part isFinalPart action requestPayload) =
Start Sequence Start Sequence
: (Enumerated . fromIntegral . fromEnum $ action) : (Enumerated . fromIntegral . fromEnum $ action)
: IntVal requestID : IntVal requestID
: (IntVal . getNodeID $ receiverID)
: encodeNodeState sender : encodeNodeState sender
<> [IntVal part <> [IntVal part
, Boolean isFinalPart] , Boolean isFinalPart]
@ -276,20 +262,18 @@ parseMessage = do
parseRequest :: Action -> ParseASN1 FediChordMessage parseRequest :: Action -> ParseASN1 FediChordMessage
parseRequest action = do parseRequest action = do
requestID <- parseInteger requestID <- parseInteger
receiverID' <- fromInteger <$> parseInteger
sender <- parseNodeState sender <- parseNodeState
part <- parseInteger part <- parseInteger
isFinalPart <- parseBool isFinalPart <- parseBool
hasPayload <- hasNext hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDRequestPayload QueryID -> parseQueryIDRequest
Join -> parseJoinRequestPayload Join -> parseJoinRequest
Leave -> parseLeaveRequestPayload Leave -> parseLeaveRequest
Stabilise -> parseStabiliseRequestPayload Stabilise -> parseStabiliseRequest
Ping -> parsePingRequestPayload Ping -> parsePingRequest
QueryLoad -> parseLoadRequestPayload
pure $ Request requestID receiverID' sender part isFinalPart action payload pure $ Request requestID sender part isFinalPart action payload
parseResponse :: Integer -> ParseASN1 FediChordMessage parseResponse :: Integer -> ParseASN1 FediChordMessage
parseResponse requestID = do parseResponse requestID = do
@ -299,12 +283,11 @@ parseResponse requestID = do
action <- parseEnum :: ParseASN1 Action action <- parseEnum :: ParseASN1 Action
hasPayload <- hasNext hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDResponsePayload QueryID -> parseQueryIDResponse
Join -> parseJoinResponsePayload Join -> parseJoinResponse
Leave -> parseLeaveResponsePayload Leave -> parseLeaveResponse
Stabilise -> parseStabiliseResponsePayload Stabilise -> parseStabiliseResponse
Ping -> parsePingResponsePayload Ping -> parsePingResponse
QueryLoad -> parseLoadResponsePayload
pure $ Response requestID senderID part isFinalPart action payload pure $ Response requestID senderID part isFinalPart action payload
@ -322,13 +305,6 @@ parseInteger = do
IntVal parsed -> pure parsed IntVal parsed -> pure parsed
x -> throwParseError $ "Expected IntVal but got " <> show x x -> throwParseError $ "Expected IntVal but got " <> show x
parseReal :: ParseASN1 Double
parseReal = do
i <- getNext
case i of
Real parsed -> pure parsed
x -> throwParseError $ "Expected Real but got " <> show x
parseEnum :: Enum a => ParseASN1 a parseEnum :: Enum a => ParseASN1 a
parseEnum = do parseEnum = do
e <- getNext e <- getNext
@ -370,7 +346,7 @@ parseNodeState = onNextContainer Sequence $ do
, domain = domain' , domain = domain'
, dhtPort = dhtPort' , dhtPort = dhtPort'
, servicePort = servicePort' , servicePort = servicePort'
, vServerID = fromInteger vServer' , vServerID = vServer'
, ipAddr = ip' , ipAddr = ip'
} }
@ -384,13 +360,13 @@ parseCacheEntry = onNextContainer Sequence $ do
parseNodeCache :: ParseASN1 [RemoteCacheEntry] parseNodeCache :: ParseASN1 [RemoteCacheEntry]
parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
parseJoinRequestPayload :: ParseASN1 ActionPayload parseJoinRequest :: ParseASN1 ActionPayload
parseJoinRequestPayload = do parseJoinRequest = do
parseNull parseNull
pure JoinRequestPayload pure JoinRequestPayload
parseJoinResponsePayload :: ParseASN1 ActionPayload parseJoinResponse :: ParseASN1 ActionPayload
parseJoinResponsePayload = onNextContainer Sequence $ do parseJoinResponse = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState) succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState)
cache <- parseNodeCache cache <- parseNodeCache
@ -400,8 +376,8 @@ parseJoinResponsePayload = onNextContainer Sequence $ do
, joinCache = cache , joinCache = cache
} }
parseQueryIDRequestPayload :: ParseASN1 ActionPayload parseQueryIDRequest :: ParseASN1 ActionPayload
parseQueryIDRequestPayload = onNextContainer Sequence $ do parseQueryIDRequest = onNextContainer Sequence $ do
targetID <- fromInteger <$> parseInteger targetID <- fromInteger <$> parseInteger
lBestNodes <- parseInteger lBestNodes <- parseInteger
pure $ QueryIDRequestPayload { pure $ QueryIDRequestPayload {
@ -409,8 +385,8 @@ parseQueryIDRequestPayload = onNextContainer Sequence $ do
, queryLBestNodes = lBestNodes , queryLBestNodes = lBestNodes
} }
parseQueryIDResponsePayload :: ParseASN1 ActionPayload parseQueryIDResponse :: ParseASN1 ActionPayload
parseQueryIDResponsePayload = onNextContainer Sequence $ do parseQueryIDResponse = onNextContainer Sequence $ do
Enumerated resultType <- getNext Enumerated resultType <- getNext
result <- case resultType of result <- case resultType of
0 -> FOUND <$> parseNodeState 0 -> FOUND <$> parseNodeState
@ -420,13 +396,13 @@ parseQueryIDResponsePayload = onNextContainer Sequence $ do
queryResult = result queryResult = result
} }
parseStabiliseRequestPayload :: ParseASN1 ActionPayload parseStabiliseRequest :: ParseASN1 ActionPayload
parseStabiliseRequestPayload = do parseStabiliseRequest = do
parseNull parseNull
pure StabiliseRequestPayload pure StabiliseRequestPayload
parseStabiliseResponsePayload :: ParseASN1 ActionPayload parseStabiliseResponse :: ParseASN1 ActionPayload
parseStabiliseResponsePayload = onNextContainer Sequence $ do parseStabiliseResponse = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState) succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState)
pure $ StabiliseResponsePayload { pure $ StabiliseResponsePayload {
@ -434,8 +410,8 @@ parseStabiliseResponsePayload = onNextContainer Sequence $ do
, stabilisePredecessors = pred' , stabilisePredecessors = pred'
} }
parseLeaveRequestPayload :: ParseASN1 ActionPayload parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequestPayload = onNextContainer Sequence $ do parseLeaveRequest = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState) succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState)
doMigration <- parseBool doMigration <- parseBool
@ -445,40 +421,19 @@ parseLeaveRequestPayload = onNextContainer Sequence $ do
, leaveDoMigration = doMigration , leaveDoMigration = doMigration
} }
parseLeaveResponsePayload :: ParseASN1 ActionPayload parseLeaveResponse :: ParseASN1 ActionPayload
parseLeaveResponsePayload = do parseLeaveResponse = do
parseNull parseNull
pure LeaveResponsePayload pure LeaveResponsePayload
parsePingRequestPayload :: ParseASN1 ActionPayload parsePingRequest :: ParseASN1 ActionPayload
parsePingRequestPayload = do parsePingRequest = do
parseNull parseNull
pure PingRequestPayload pure PingRequestPayload
parsePingResponsePayload :: ParseASN1 ActionPayload parsePingResponse :: ParseASN1 ActionPayload
parsePingResponsePayload = onNextContainer Sequence $ do parsePingResponse = onNextContainer Sequence $ do
handledNodes <- getMany parseNodeState handledNodes <- getMany parseNodeState
pure $ PingResponsePayload { pure $ PingResponsePayload {
pingNodeStates = handledNodes pingNodeStates = handledNodes
} }
parseLoadRequestPayload :: ParseASN1 ActionPayload
parseLoadRequestPayload = onNextContainer Sequence $ do
loadUpperBound' <- fromInteger <$> parseInteger
pure LoadRequestPayload
{ loadSegmentUpperBound = loadUpperBound'
}
parseLoadResponsePayload :: ParseASN1 ActionPayload
parseLoadResponsePayload = onNextContainer Sequence $ do
loadSum' <- parseReal
loadRemainingTarget' <- parseReal
loadTotalCapacity' <- parseReal
loadSegmentLowerBound' <- fromInteger <$> parseInteger
pure LoadResponsePayload
{ loadSum = loadSum'
, loadRemainingTarget = loadRemainingTarget'
, loadTotalCapacity = loadTotalCapacity'
, loadSegmentLowerBound = loadSegmentLowerBound'
}

View file

@ -15,7 +15,6 @@ module Hash2Pub.DHTProtocol
, Action(..) , Action(..)
, ActionPayload(..) , ActionPayload(..)
, FediChordMessage(..) , FediChordMessage(..)
, mkRequest
, maximumParts , maximumParts
, sendQueryIdMessages , sendQueryIdMessages
, requestQueryID , requestQueryID
@ -23,7 +22,6 @@ module Hash2Pub.DHTProtocol
, requestLeave , requestLeave
, requestPing , requestPing
, requestStabilise , requestStabilise
, requestQueryLoad
, lookupMessage , lookupMessage
, sendRequestTo , sendRequestTo
, queryIdLookupLoop , queryIdLookupLoop
@ -38,7 +36,7 @@ module Hash2Pub.DHTProtocol
, isPossibleSuccessor , isPossibleSuccessor
, isPossiblePredecessor , isPossiblePredecessor
, isInOwnResponsibilitySlice , isInOwnResponsibilitySlice
, vsIsJoined , isJoined
, closestCachePredecessors , closestCachePredecessors
) )
where where
@ -51,8 +49,7 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Exception import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when) import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.Except (MonadError (..), liftEither, import Control.Monad.Except (MonadError (..), runExceptT)
runExceptT)
import Control.Monad.IO.Class (MonadIO (..)) import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights) import Data.Either (rights)
@ -66,7 +63,6 @@ import Data.Maybe (fromJust, fromMaybe, isJust,
isNothing, mapMaybe, maybe) isNothing, mapMaybe, maybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Word (Word8)
import Network.Socket hiding (recv, recvFrom, send, import Network.Socket hiding (recv, recvFrom, send,
sendTo) sendTo)
import Network.Socket.ByteString import Network.Socket.ByteString
@ -78,27 +74,23 @@ import Hash2Pub.ASN1Coding
import Hash2Pub.FediChordTypes (CacheEntry (..), import Hash2Pub.FediChordTypes (CacheEntry (..),
CacheEntry (..), CacheEntry (..),
FediChordConf (..), FediChordConf (..),
HasKeyID (..), LoadStats (..), HasKeyID (..),
LocalNodeState (..), LocalNodeState (..),
LocalNodeStateSTM, NodeCache, LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..), NodeID, NodeState (..),
RealNode (..), RealNodeSTM, RealNode (..), RealNodeSTM,
RemoteNodeState (..), RemoteNodeState (..),
RingEntry (..), RingMap (..), RingEntry (..), RingMap (..),
SegmentLoadStats (..),
Service (..), addRMapEntry, Service (..), addRMapEntry,
addRMapEntryWith, addRMapEntryWith,
cacheGetNodeStateUnvalidated, cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred, cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID, cacheLookupSucc, genNodeID,
getKeyID, hasValidNodeId, getKeyID, localCompare,
loadSliceSum, localCompare,
rMapFromList, rMapLookupPred, rMapFromList, rMapLookupPred,
rMapLookupSucc, rMapLookupSucc,
remainingLoadTarget,
setPredecessors, setSuccessors) setPredecessors, setSuccessors)
import Hash2Pub.ProtocolTypes import Hash2Pub.ProtocolTypes
import Hash2Pub.RingMap
import Debug.Trace (trace) import Debug.Trace (trace)
@ -111,7 +103,7 @@ queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node -- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined. -- This only makes sense if the node is part of the DHT by having joined.
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
| vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries -- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
@ -235,8 +227,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
-- | uses the successor and predecessor list of a node as an indicator for whether a -- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT -- node has properly joined the DHT
vsIsJoined :: LocalNodeState s -> Bool isJoined :: LocalNodeState s -> Bool
vsIsJoined ns = not . all null $ [successors ns, predecessors ns] isJoined ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending -- | the size limit to be used when serialising messages for sending
sendMessageSize :: Num i => i sendMessageSize :: Num i => i
@ -245,37 +237,27 @@ sendMessageSize = 1200
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
-- encode the response to a request that just signals successful receipt -- encode the response to a request that just signals successful receipt
ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response { ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
requestID = requestID req requestID = requestID req
, senderID = receiverID req , senderID = ownID
, part = part req , part = part req
, isFinalPart = False , isFinalPart = False
, action = action req , action = action req
, payload = Nothing , payload = Nothing
} }
ackRequest _ = Map.empty ackRequest _ _ = Map.empty
-- | extract the first payload from a received message set
extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload
extractFirstPayload msgSet = foldr' (\msg plAcc ->
if isNothing plAcc && isJust (payload msg)
then payload msg
else plAcc
) Nothing msgSet
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent. -- the response to be sent.
handleIncomingRequest :: Service s (RealNodeSTM s) handleIncomingRequest :: Service s (RealNodeSTM s)
=> Word8 -- ^ maximum number of vservers, because of decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@ => LocalNodeStateSTM s -- ^ the handling node
-> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle -> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request -> SockAddr -- ^ source address of the request
-> IO () -> IO ()
handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
-- add nodestate to cache -- add nodestate to cache
now <- getPOSIXTime now <- getPOSIXTime
@ -283,20 +265,19 @@ handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
Nothing -> pure () Nothing -> pure ()
Just aPart -> do Just aPart -> do
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns) queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
maybe (pure ()) ( maybe (pure ()) (
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
) )
=<< (case action aPart of =<< (case action aPart of
Ping -> Just <$> respondPing nsSTM msgSet Ping -> Just <$> respondPing nsSTM msgSet
Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin
-- ToDo: figure out what happens if not joined -- ToDo: figure out what happens if not joined
QueryID -> Just <$> respondQueryID nsSTM msgSet QueryID -> Just <$> respondQueryID nsSTM msgSet
-- only when joined -- only when joined
Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing
) )
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
@ -306,18 +287,19 @@ handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
-- | Filter out requests with spoofed node IDs by recomputing the ID using -- | Filter out requests with spoofed node IDs by recomputing the ID using
-- the sender IP. -- the sender IP.
-- For valid (non-spoofed) sender IDs, the passed responder function is invoked. -- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
dropSpoofedIDs :: Word8 -- ^ maximum number of vservers per node dropSpoofedIDs :: HostAddress6 -- msg source address
-> HostAddress6 -- ^ msg source address
-> LocalNodeStateSTM s -> LocalNodeStateSTM s
-> Set.Set FediChordMessage -- ^ message parts of the request -> Set.Set FediChordMessage -- message parts of the request
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- ^ reponder function to be invoked for valid requests -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- reponder function to be invoked for valid requests
-> IO (Maybe (Map.Map Integer BS.ByteString)) -> IO (Maybe (Map.Map Integer BS.ByteString))
dropSpoofedIDs limVs addr nsSTM' msgSet' responder = dropSpoofedIDs addr nsSTM' msgSet' responder =
let let
aRequestPart = Set.elemAt 0 msgSet aRequestPart = Set.elemAt 0 msgSet
senderNs = sender aRequestPart senderNs = sender aRequestPart
givenSenderID = getNid senderNs
recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs)
in in
if hasValidNodeId limVs senderNs addr if recomputedID == givenSenderID
then Just <$> responder nsSTM' msgSet' then Just <$> responder nsSTM' msgSet'
else pure Nothing else pure Nothing
@ -335,15 +317,19 @@ respondQueryID nsSTM msgSet = do
let let
aRequestPart = Set.elemAt 0 msgSet aRequestPart = Set.elemAt 0 msgSet
senderID = getNid . sender $ aRequestPart senderID = getNid . sender $ aRequestPart
senderPayload = extractFirstPayload msgSet senderPayload = foldr' (\msg plAcc ->
-- return only empty message serialisation if no payload was included in parts if isNothing plAcc && isJust (payload msg)
then payload msg
else plAcc
) Nothing msgSet
-- return only empty message serialisation if no payload was included in parts
maybe (pure Map.empty) (\senderPayload' -> do maybe (pure Map.empty) (\senderPayload' -> do
responseMsg <- atomically $ do responseMsg <- atomically $ do
nsSnap <- readTVar nsSTM nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap cache <- readTVar $ nodeCacheSTM nsSnap
let let
responsePayload = QueryIDResponsePayload { responsePayload = QueryIDResponsePayload {
queryResult = if vsIsJoined nsSnap queryResult = if isJoined nsSnap
then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
-- if not joined yet, attract responsibility for -- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible -- all keys to make bootstrapping possible
@ -436,47 +422,6 @@ respondPing nsSTM msgSet = do
} }
pure $ serialiseMessage sendMessageSize pingResponse pure $ serialiseMessage sendMessageSize pingResponse
respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryLoad nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM
-- this message cannot be split reasonably, so just
-- consider the first payload
let
aRequestPart = Set.elemAt 0 msgSet
senderPayload = extractFirstPayload msgSet
responsePl <- maybe (pure Nothing) (\pl ->
case pl of
LoadRequestPayload{} -> do
parentNode <- readTVarIO (parentRealNode nsSnap)
let
serv = nodeService parentNode
conf = nodeConfig parentNode
lStats <- getServiceLoadStats serv
let
directSucc = getNid . head . predecessors $ nsSnap
handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl)
pure $ Just LoadResponsePayload
{ loadSum = handledTagSum
, loadRemainingTarget = remainingLoadTarget conf lStats
, loadTotalCapacity = totalCapacity lStats
, loadSegmentLowerBound = directSucc
}
_ -> pure Nothing
)
senderPayload
pure $ maybe
Map.empty
(\pl -> serialiseMessage sendMessageSize $ Response
{ requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = QueryLoad
, payload = Just pl
}
) responsePl
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do respondJoin nsSTM msgSet = do
@ -489,7 +434,7 @@ respondJoin nsSTM msgSet = do
senderNS = sender aRequestPart senderNS = sender aRequestPart
-- if not joined yet, attract responsibility for -- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible -- all keys to make bootstrapping possible
responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
thisNodeResponsible (FOUND _) = True thisNodeResponsible (FOUND _) = True
thisNodeResponsible (FORWARD _) = False thisNodeResponsible (FORWARD _) = False
-- check whether the joining node falls into our responsibility -- check whether the joining node falls into our responsibility
@ -536,21 +481,6 @@ respondJoin nsSTM msgSet = do
-- ....... request sending ....... -- ....... request sending .......
-- | defautl constructor for request messages, fills standard values like
-- part number to avoid code repition
mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage)
mkRequest ns targetID action pl rid = Request
{ requestID = rid
, receiverID = targetID
, sender = toRemoteNodeState ns
-- part number and final flag can be changed by ASN1 encoder to make packet
-- fit the MTU restrictions
, part = 1
, isFinalPart = True
, action = action
, payload = pl
}
-- | send a join request and return the joined 'LocalNodeState' including neighbours -- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState -> LocalNodeStateSTM s -- ^ joining NodeState
@ -562,7 +492,7 @@ requestJoin toJoinOn ownStateSTM = do
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information -- extract own state for getting request information
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do (cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM stateSnap <- readTVar ownStateSTM
let let
@ -593,7 +523,7 @@ requestJoin toJoinOn ownStateSTM = do
writeTVar ownStateSTM newState writeTVar ownStateSTM newState
pure (cacheInsertQ, newState) pure (cacheInsertQ, newState)
-- execute the cache insertions -- execute the cache insertions
mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ mapM_ (\f -> f joinedState) cacheInsertQ
if responses == Set.empty if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else do else do
@ -651,14 +581,14 @@ sendQueryIdMessages :: (Integral i)
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
-> [RemoteNodeState] -- ^ nodes to query -> [RemoteNodeState] -- ^ nodes to query
-> IO QueryResponse -- ^ accumulated response -> IO QueryResponse -- ^ accumulated response
sendQueryIdMessages lookupID ns lParam targets = do sendQueryIdMessages targetID ns lParam targets = do
-- create connected sockets to all query targets and use them for request handling -- create connected sockets to all query targets and use them for request handling
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode)) sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)) targets )) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them -- ToDo: exception handling, maybe log them
@ -675,7 +605,7 @@ sendQueryIdMessages lookupID ns lParam targets = do
_ -> Set.empty _ -> Set.empty
-- forward entries to global cache -- forward entries to global cache
queueAddEntries entrySet (cacheWriteQueue ns) queueAddEntries entrySet ns
-- return accumulated QueryResult -- return accumulated QueryResult
pure $ case acc of pure $ case acc of
-- once a FOUND as been encountered, return this as a result -- once a FOUND as been encountered, return this as a result
@ -691,14 +621,13 @@ sendQueryIdMessages lookupID ns lParam targets = do
-- | Create a QueryID message to be supplied to 'sendRequestTo' -- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i lookupMessage :: Integral i
=> NodeID -- ^ lookup ID to be looked up => NodeID -- ^ target ID
-> LocalNodeState s -- ^ sender node state -> LocalNodeState s -- ^ sender node state
-> Maybe i -- ^ optionally provide a different l parameter -> Maybe i -- ^ optionally provide a different l parameter
-> NodeID -- ^ target ID of message destination
-> (Integer -> FediChordMessage) -> (Integer -> FediChordMessage)
lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID) lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
where where
pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam } pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
-- | Send a stabilise request to provided 'RemoteNode' and, if successful, -- | Send a stabilise request to provided 'RemoteNode' and, if successful,
@ -709,7 +638,16 @@ requestStabilise :: LocalNodeState s -- ^ sending node
requestStabilise ns neighbour = do requestStabilise ns neighbour = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload)) responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Stabilise
, payload = Just StabiliseRequestPayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException)) ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either either
-- forward IO error messages -- forward IO error messages
@ -722,7 +660,7 @@ requestStabilise ns neighbour = do
) )
([],[]) respSet ([],[]) respSet
-- update successfully responded neighbour in cache -- update successfully responded neighbour in cache
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet) maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned" then Left "no neighbours returned"
else Right (responsePreds, responseSuccs) else Right (responsePreds, responseSuccs)
@ -744,12 +682,17 @@ requestLeave ns doMigration target = do
, leavePredecessors = predecessors ns , leavePredecessors = predecessors ns
, leaveDoMigration = doMigration , leaveDoMigration = doMigration
} }
responses <- bracket responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
(mkSendSocket srcAddr (getDomain target) (getDhtPort target)) Request {
close requestID = rid
(fmap Right , sender = toRemoteNodeState ns
. sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload)) , part = 1
) `catch` (\e -> pure . Left $ displayException (e :: IOException)) , isFinalPart = False
, action = Leave
, payload = Just leavePayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either either
-- forward IO error messages -- forward IO error messages
(pure . Left) (pure . Left)
@ -765,7 +708,16 @@ requestPing ns target = do
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do (\sock -> do
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Ping
, payload = Just PingRequestPayload
}
) sock
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
pure $ Right (peerAddr, resp) pure $ Right (peerAddr, resp)
) `catch` (\e -> pure . Left $ displayException (e :: IOException)) ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
@ -781,9 +733,10 @@ requestPing ns target = do
-- recompute ID for each received node and mark as verified in cache -- recompute ID for each received node and mark as verified in cache
now <- getPOSIXTime now <- getPOSIXTime
forM_ responseVss (\vs -> forM_ responseVss (\vs ->
if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs in if recomputedID == getNid vs
else pure () then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
else pure ()
) )
pure $ if null responseVss pure $ if null responseVss
then Left "no active vServer IDs returned, ignoring node" then Left "no active vServer IDs returned, ignoring node"
@ -791,37 +744,6 @@ requestPing ns target = do
) responses ) responses
-- still need a particular vserver as LocalNodeState, because requests need a sender
requestQueryLoad :: (MonadError String m, MonadIO m)
=> LocalNodeState s -- ^ the local source vserver for the request
-> NodeID -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node
-> RemoteNodeState -- ^ target node to query
-> m SegmentLoadStats
requestQueryLoad ns upperIdBound target = do
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
let
srcAddr = confIP nodeConf
loadReqPl = LoadRequestPayload
{ loadSegmentUpperBound = upperIdBound
}
responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl))
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
responseMsgSet <- liftEither responses
-- throws an error if an exception happened
loadResPl <- maybe (throwError "no load response payload found") pure
(extractFirstPayload responseMsgSet)
pure SegmentLoadStats
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
, segmentUpperKeyBound = upperIdBound
, segmentLoad = loadSum loadResPl
, segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl
, segmentOwnerCapacity = loadTotalCapacity loadResPl
, segmentCurrentOwner = target
}
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in milliseconds sendRequestTo :: Int -- ^ timeout in milliseconds
@ -878,24 +800,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry queueAddEntries :: Foldable c => c RemoteCacheEntry
-> TQueue (NodeCache -> NodeCache) -> LocalNodeState s
-> IO () -> IO ()
queueAddEntries entries cacheQ = do queueAddEntries entries ns = do
now <- getPOSIXTime now <- getPOSIXTime
forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
-- | enque a list of node IDs to be deleted from the global NodeCache -- | enque a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c queueDeleteEntries :: Foldable c
=> c NodeID => c NodeID
-> TQueue (NodeCache -> NodeCache) -> LocalNodeState s
-> IO () -> IO ()
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
-- | enque a single node ID to be deleted from the global NodeCache -- | enque a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID queueDeleteEntry :: NodeID
-> TQueue (NodeCache -> NodeCache) -> LocalNodeState s
-> IO () -> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
@ -904,11 +826,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- global 'NodeCache'. -- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c queueUpdateVerifieds :: Foldable c
=> c NodeID => c NodeID
-> TQueue (NodeCache -> NodeCache) -> LocalNodeState s
-> IO () -> IO ()
queueUpdateVerifieds nIds cacheQ = do queueUpdateVerifieds nIds ns = do
now <- getPOSIXTime now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $ forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
markCacheEntryAsVerified (Just now) nid' markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result -- | retry an IO action at most *i* times until it delivers a result

File diff suppressed because it is too large Load diff

View file

@ -7,8 +7,8 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes #-}
module Hash2Pub.FediChordTypes module Hash2Pub.FediChordTypes (
( NodeID -- abstract, but newtype constructors cannot be hidden NodeID -- abstract, but newtype constructors cannot be hidden
, idBits , idBits
, getNodeID , getNodeID
, toNodeID , toNodeID
@ -18,13 +18,6 @@ module Hash2Pub.FediChordTypes
, RemoteNodeState (..) , RemoteNodeState (..)
, RealNode (..) , RealNode (..)
, RealNodeSTM , RealNodeSTM
, VSMap
, LoadStats (..)
, emptyLoadStats
, remainingLoadTarget
, loadSliceSum
, addVserver
, SegmentLoadStats (..)
, setSuccessors , setSuccessors
, setPredecessors , setPredecessors
, NodeCache , NodeCache
@ -58,7 +51,6 @@ module Hash2Pub.FediChordTypes
, localCompare , localCompare
, genNodeID , genNodeID
, genNodeIDBS , genNodeIDBS
, hasValidNodeId
, genKeyID , genKeyID
, genKeyIDBS , genKeyIDBS
, byteStringToUInteger , byteStringToUInteger
@ -68,14 +60,12 @@ module Hash2Pub.FediChordTypes
, DHT(..) , DHT(..)
, Service(..) , Service(..)
, ServiceConf(..) , ServiceConf(..)
) where ) where
import Control.Exception import Control.Exception
import Data.Foldable (foldr') import Data.Foldable (foldr')
import Data.Function (on) import Data.Function (on)
import qualified Data.Hashable as Hashable import qualified Data.Hashable as Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HMap
import Data.List (delete, nub, sortBy) import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust, import Data.Maybe (fromJust, fromMaybe, isJust,
@ -158,27 +148,17 @@ a `localCompare` b
-- Also contains shared data and config values. -- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping -- TODO: more data structures for k-choices bookkeeping
data RealNode s = RealNode data RealNode s = RealNode
{ vservers :: VSMap s { vservers :: [LocalNodeStateSTM s]
-- ^ map of all active VServer node IDs to their node state -- ^ references to all active versers
, nodeConfig :: FediChordConf , nodeConfig :: FediChordConf
-- ^ holds the initial configuration read at program start -- ^ holds the initial configuration read at program start
, bootstrapNodes :: [(String, PortNumber)] , bootstrapNodes :: [(String, PortNumber)]
-- ^ nodes to be used as bootstrapping points, new ones learned during operation -- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache , lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes -- ^ a global cache of looked up keys and their associated nodes
, globalNodeCacheSTM :: TVar NodeCache , nodeService :: s (RealNodeSTM s)
-- ^ EpiChord node cache with expiry times for nodes.
, globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
, nodeService :: s (RealNodeSTM s)
} }
-- | insert a new vserver mapping into a node
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
addVserver (key, nstate) node = node
{ vservers = addRMapEntry key nstate (vservers node) }
type VSMap s = RingMap NodeID (LocalNodeStateSTM s)
type RealNodeSTM s = TVar (RealNode s) type RealNodeSTM s = TVar (RealNode s)
-- | represents a node and all its important state -- | represents a node and all its important state
@ -192,7 +172,7 @@ data RemoteNodeState = RemoteNodeState
-- ^ port of the DHT itself -- ^ port of the DHT itself
, servicePort :: PortNumber , servicePort :: PortNumber
-- ^ port of the service provided on top of the DHT -- ^ port of the service provided on top of the DHT
, vServerID :: Word8 , vServerID :: Integer
-- ^ ID of this vserver -- ^ ID of this vserver
} }
deriving (Show, Eq) deriving (Show, Eq)
@ -205,9 +185,9 @@ data LocalNodeState s = LocalNodeState
{ nodeState :: RemoteNodeState { nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations -- ^ represents common data present both in remote and local node representations
, nodeCacheSTM :: TVar NodeCache , nodeCacheSTM :: TVar NodeCache
-- ^ reference to the 'globalNodeCacheSTM' -- ^ EpiChord node cache with expiry times for nodes
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache) , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ reference to the 'globalCacheWriteQueue -- ^ cache updates are not written directly to the 'nodeCache' but queued and
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance -- ^ successor nodes in ascending order by distance
, predecessors :: [RemoteNodeState] , predecessors :: [RemoteNodeState]
@ -237,14 +217,14 @@ class NodeState a where
getIpAddr :: a -> HostAddress6 getIpAddr :: a -> HostAddress6
getDhtPort :: a -> PortNumber getDhtPort :: a -> PortNumber
getServicePort :: a -> PortNumber getServicePort :: a -> PortNumber
getVServerID :: a -> Word8 getVServerID :: a -> Integer
-- setters for common properties -- setters for common properties
setNid :: NodeID -> a -> a setNid :: NodeID -> a -> a
setDomain :: String -> a -> a setDomain :: String -> a -> a
setIpAddr :: HostAddress6 -> a -> a setIpAddr :: HostAddress6 -> a -> a
setDhtPort :: PortNumber -> a -> a setDhtPort :: PortNumber -> a -> a
setServicePort :: PortNumber -> a -> a setServicePort :: PortNumber -> a -> a
setVServerID :: Word8 -> a -> a setVServerID :: Integer -> a -> a
toRemoteNodeState :: a -> RemoteNodeState toRemoteNodeState :: a -> RemoteNodeState
instance NodeState RemoteNodeState where instance NodeState RemoteNodeState where
@ -393,11 +373,6 @@ genNodeID :: HostAddress6 -- ^a node's IPv6 address
-> NodeID -- ^the generated @NodeID@ -> NodeID -- ^the generated @NodeID@
genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs
hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool
hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns)
-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT -- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
genKeyIDBS :: String -- ^the key string genKeyIDBS :: String -- ^the key string
-> BS.ByteString -- ^the key ID represented as a @ByteString@ -> BS.ByteString -- ^the key ID represented as a @ByteString@
@ -452,70 +427,9 @@ data FediChordConf = FediChordConf
-- ^ how long to wait until response has arrived, in milliseconds -- ^ how long to wait until response has arrived, in milliseconds
, confRequestRetries :: Int , confRequestRetries :: Int
-- ^ how often re-sending a timed-out request can be retried -- ^ how often re-sending a timed-out request can be retried
, confEnableKChoices :: Bool
-- ^ whether to enable k-choices load balancing
, confKChoicesOverload :: Double
-- ^ fraction of capacity above which a node considers itself overloaded
, confKChoicesUnderload :: Double
-- ^ fraction of capacity below which a node considers itself underloaded
, confKChoicesMaxVS :: Word8
-- ^ upper limit of vserver index κ
, confKChoicesRebalanceInterval :: Int
-- ^ interval between vserver rebalance attempts
} }
deriving (Show, Eq) deriving (Show, Eq)
-- ====== k-choices load balancing types ======
data LoadStats = LoadStats
{ loadPerTag :: RingMap NodeID Double
-- ^ map of loads for each handled tag
, totalCapacity :: Double
-- ^ total designated capacity of the service
, compensatedLoadSum :: Double
-- ^ effective load reevant for load balancing after compensating for
}
deriving (Show, Eq)
-- | calculates the mismatch from the target load by taking into account the
-- underload and overload limits
remainingLoadTarget :: FediChordConf -> LoadStats -> Double
remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
where
targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
-- | calculates the sum of tag load in a contiguous slice between to keys
loadSliceSum :: LoadStats
-> NodeID -- ^ lower segment bound
-> NodeID -- ^ upper segment bound
-> Double -- ^ sum of all tag loads within that segment
loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats
data SegmentLoadStats = SegmentLoadStats
{ segmentLowerKeyBound :: NodeID
-- ^ segment start key
, segmentUpperKeyBound :: NodeID
-- ^ segment end key
, segmentLoad :: Double
-- ^ sum of load of all keys in the segment
, segmentOwnerRemainingLoadTarget :: Double
-- ^ remaining load target of the current segment handler:
, segmentOwnerCapacity :: Double
-- ^ total capacity of the current segment handler node, used for normalisation
, segmentCurrentOwner :: RemoteNodeState
-- ^ the current owner of the segment that needs to be joined on
}
-- TODO: figure out a better way of initialising
emptyLoadStats :: LoadStats
emptyLoadStats = LoadStats
{ loadPerTag = emptyRMap
, totalCapacity = 0
, compensatedLoadSum = 0
}
-- ====== Service Types ============ -- ====== Service Types ============
class Service s d where class Service s d where
@ -531,7 +445,6 @@ class Service s d where
-> IO (Either String ()) -- ^ success or failure -> IO (Either String ()) -- ^ success or failure
-- | Wait for an incoming migration from a given node to succeed, may block forever -- | Wait for an incoming migration from a given node to succeed, may block forever
waitForMigrationFrom :: s d -> NodeID -> IO () waitForMigrationFrom :: s d -> NodeID -> IO ()
getServiceLoadStats :: s d -> IO LoadStats
instance Hashable.Hashable NodeID where instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID

View file

@ -22,7 +22,7 @@ import qualified Data.DList as D
import Data.Either (lefts, rights) import Data.Either (lefts, rights)
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet import qualified Data.HashSet as HSet
import Data.Maybe (fromJust, fromMaybe, isJust) import Data.Maybe (fromJust, isJust)
import Data.String (fromString) import Data.String (fromString)
import Data.Text.Lazy (Text) import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt import qualified Data.Text.Lazy as Txt
@ -64,10 +64,8 @@ data PostService d = PostService
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager , httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent , statsQueue :: TQueue StatsEvent
, relayStats :: TVar RelayStats , loadStats :: TVar RelayStats
-- ^ current relay stats, replaced periodically -- ^ current load stats, replaced periodically
, loadStats :: TVar LoadStats
-- ^ current load values of the relay, replaced periodically and used by
, logFileHandle :: Handle , logFileHandle :: Handle
} }
deriving (Typeable) deriving (Typeable)
@ -98,8 +96,7 @@ instance DHT d => Service PostService d where
migrationsInProgress' <- newTVarIO HMap.empty migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO statsQueue' <- newTQueueIO
relayStats' <- newTVarIO emptyStats loadStats' <- newTVarIO emptyStats
loadStats' <- newTVarIO emptyLoadStats
loggingFile <- openFile (confLogfilePath conf) WriteMode loggingFile <- openFile (confLogfilePath conf) WriteMode
hSetBuffering loggingFile LineBuffering hSetBuffering loggingFile LineBuffering
let let
@ -115,7 +112,6 @@ instance DHT d => Service PostService d where
, migrationsInProgress = migrationsInProgress' , migrationsInProgress = migrationsInProgress'
, httpMan = httpMan' , httpMan = httpMan'
, statsQueue = statsQueue' , statsQueue = statsQueue'
, relayStats = relayStats'
, loadStats = loadStats' , loadStats = loadStats'
, logFileHandle = loggingFile , logFileHandle = loggingFile
} }
@ -157,12 +153,6 @@ instance DHT d => Service PostService d where
-- block until migration finished -- block until migration finished
takeMVar migrationSynchroniser takeMVar migrationSynchroniser
getServiceLoadStats = getLoadStats
getLoadStats :: PostService d -> IO LoadStats
getLoadStats serv = readTVarIO $ loadStats serv
-- | return a WAI application -- | return a WAI application
postServiceApplication :: DHT d => PostService d -> Application postServiceApplication :: DHT d => PostService d -> Application
@ -845,12 +835,7 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
-- persistently store in a TVar so it can be retrieved later by the DHT -- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv) let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats rateStats = evaluateStats timePassed summedStats
currentSubscribers <- readTVarIO $ subscribers serv atomically $ writeTVar (loadStats serv) rateStats
-- translate the rate statistics to load values
loads <- evaluateLoadStats rateStats currentSubscribers
atomically $
writeTVar (relayStats serv) rateStats
>> writeTVar (loadStats serv) loads
-- and now what? write a log to file -- and now what? write a log to file
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
-- later: current (reported) load, target load -- later: current (reported) load, target load
@ -874,33 +859,6 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
0 tagMap 0 tagMap
-- | calculate load values from rate statistics
evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats
evaluateLoadStats currentStats currentSubscribers = do
-- load caused by each tag: incomingPostRate * ( 1 + subscribers)
-- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate
let
totalCapacity' = 2.5 * postPublishRate currentStats
(loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do
numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM
let
thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats)
thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers)
pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag')
)
(0, emptyRMap)
$ rMapToListWithKeys currentSubscribers
let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats
pure LoadStats
{ loadPerTag = loadPerTag'
, totalCapacity = totalCapacity'
-- load caused by post fetches cannot be influenced by re-balancing nodes,
-- but still reduces the totally available capacity
, compensatedLoadSum = loadSum + postFetchRate currentStats
}
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event -- | Evaluate the accumulated statistic events: Currently mostly calculates the event
-- rates by dividing through the collection time frame -- rates by dividing through the collection time frame
evaluateStats :: POSIXTime -> RelayStats -> RelayStats evaluateStats :: POSIXTime -> RelayStats -> RelayStats

View file

@ -16,12 +16,10 @@ data Action = QueryID
| Leave | Leave
| Stabilise | Stabilise
| Ping | Ping
| QueryLoad
deriving (Show, Eq, Enum) deriving (Show, Eq, Enum)
data FediChordMessage = Request data FediChordMessage = Request
{ requestID :: Integer { requestID :: Integer
, receiverID :: NodeID
, sender :: RemoteNodeState , sender :: RemoteNodeState
, part :: Integer , part :: Integer
, isFinalPart :: Bool , isFinalPart :: Bool
@ -59,10 +57,6 @@ data ActionPayload = QueryIDRequestPayload
} }
| StabiliseRequestPayload | StabiliseRequestPayload
| PingRequestPayload | PingRequestPayload
| LoadRequestPayload
{ loadSegmentUpperBound :: NodeID
-- ^ upper bound of segment interested in,
}
| QueryIDResponsePayload | QueryIDResponsePayload
{ queryResult :: QueryResponse { queryResult :: QueryResponse
} }
@ -79,12 +73,6 @@ data ActionPayload = QueryIDRequestPayload
| PingResponsePayload | PingResponsePayload
{ pingNodeStates :: [RemoteNodeState] { pingNodeStates :: [RemoteNodeState]
} }
| LoadResponsePayload
{ loadSum :: Double
, loadRemainingTarget :: Double
, loadTotalCapacity :: Double
, loadSegmentLowerBound :: NodeID
}
deriving (Show, Eq) deriving (Show, Eq)
-- | global limit of parts per message used when (de)serialising messages. -- | global limit of parts per message used when (de)serialising messages.

View file

@ -47,13 +47,6 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where
traversingFL acc (ProxyEntry _ Nothing) = acc traversingFL acc (ProxyEntry _ Nothing) = acc
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
instance (Bounded k, Ord k) => Traversable (RingMap k) where
traverse f = fmap RingMap . traverse traversingF . getRingMap
where
traversingF (KeyEntry entry) = KeyEntry <$> f entry
traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing
traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . Just <$> traversingF entry
-- | entry of a 'RingMap' that holds a value and can also -- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space. -- wrap around the lookup direction at the edges of the name space.
@ -113,23 +106,6 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on
| isNothing (rMapLookup nid rmap') = 1 | isNothing (rMapLookup nid rmap') = 1
| otherwise = 0 | otherwise = 0
-- | whether the RingMap is empty (except for empty proxy entries)
nullRMap :: (Num k, Bounded k, Ord k)
=> RingMap k a
-> Bool
-- basic idea: look for a predecessor starting from the upper Map bound,
-- Nothing indicates no entry being found
nullRMap = isNothing . rMapLookupPred maxBound
-- | O(logn( Is the key a member of the RingMap?
memberRMap :: (Bounded k, Ord k)
=> k
-> RingMap k a
-> Bool
memberRMap key = isJust . rMapLookup key
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ -- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
-- to simulate a modular ring -- to simulate a modular ring
lookupWrapper :: (Bounded k, Ord k, Num k) lookupWrapper :: (Bounded k, Ord k, Num k)
@ -222,28 +198,12 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier KeyEntry {} = Nothing modifier KeyEntry {} = Nothing
-- TODO: rename this to elems
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a] rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
-- TODO: rename this to toList
rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)]
rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
maybe acc (\val -> (k, val):acc) $ extractRingEntry v
)
[]
. getRingMap
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
rMapFromList = setRMapEntries rMapFromList = setRMapEntries
-- | this just merges the underlying 'Map.Map' s and does not check whether the
-- ProxyEntry pointers are consistent, so better only create unions of
-- equal-pointered RingMaps
unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a
unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b)
-- | takes up to i entries from a 'RingMap' by calling a getter function on a -- | takes up to i entries from a 'RingMap' by calling a getter function on a
-- *startAt* value and after that on the previously returned value. -- *startAt* value and after that on the previously returned value.
-- Stops once i entries have been taken or an entry has been encountered twice -- Stops once i entries have been taken or an entry has been encountered twice

View file

@ -7,7 +7,6 @@ import Control.Concurrent.STM.TVar
import Control.Exception import Control.Exception
import Data.ASN1.Parse (runParseASN1) import Data.ASN1.Parse (runParseASN1)
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import qualified Data.HashMap.Strict as HMap
import qualified Data.Map.Strict as Map import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, isJust) import Data.Maybe (fromJust, isJust)
import qualified Data.Set as Set import qualified Data.Set as Set
@ -19,7 +18,6 @@ import Hash2Pub.ASN1Coding
import Hash2Pub.DHTProtocol import Hash2Pub.DHTProtocol
import Hash2Pub.FediChord import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap
spec :: Spec spec :: Spec
spec = do spec = do
@ -223,16 +221,14 @@ spec = do
, exampleNodeState {nid = fromInteger (-5)} , exampleNodeState {nid = fromInteger (-5)}
] ]
} }
qLoadReqPayload = LoadRequestPayload requestTemplate = Request {
{ loadSegmentUpperBound = 1025 requestID = 2342
} , sender = exampleNodeState
qLoadResPayload = LoadResponsePayload , part = 1
{ loadSum = 3.141 , isFinalPart = True
, loadRemainingTarget = -1.337 , action = undefined
, loadTotalCapacity = 2.21 , payload = undefined
, loadSegmentLowerBound = 12 }
}
responseTemplate = Response { responseTemplate = Response {
requestID = 2342 requestID = 2342
, senderID = nid exampleNodeState , senderID = nid exampleNodeState
@ -241,7 +237,7 @@ spec = do
, action = undefined , action = undefined
, payload = undefined , payload = undefined
} }
requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342 requestWith a pa = requestTemplate {action = a, payload = Just pa}
responseWith a pa = responseTemplate {action = a, payload = Just pa} responseWith a pa = responseTemplate {action = a, payload = Just pa}
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
@ -252,20 +248,17 @@ spec = do
} }
it "messages are encoded and decoded correctly from and to ASN1" $ do it "messages are encoded and decoded correctly from and to ASN1" $ do
localNS <- exampleLocalNode encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload encodeDecodeAndCheck $ requestWith Join jReqPayload
encodeDecodeAndCheck $ requestWith localNS Join jReqPayload encodeDecodeAndCheck $ requestWith Leave lReqPayload
encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload
encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload encodeDecodeAndCheck $ requestWith Ping pingReqPayload
encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload
encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload
encodeDecodeAndCheck $ responseWith QueryID qidResPayload1 encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
encodeDecodeAndCheck $ responseWith QueryID qidResPayload2 encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
encodeDecodeAndCheck $ responseWith Join jResPayload encodeDecodeAndCheck $ responseWith Join jResPayload
encodeDecodeAndCheck $ responseWith Leave lResPayload encodeDecodeAndCheck $ responseWith Leave lResPayload
encodeDecodeAndCheck $ responseWith Stabilise stabResPayload encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
encodeDecodeAndCheck $ responseWith Ping pingResPayload encodeDecodeAndCheck $ responseWith Ping pingResPayload
encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload
it "messages are encoded and decoded to ASN.1 DER properly" $ it "messages are encoded and decoded to ASN.1 DER properly" $
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
it "messages too large for a single packet can (often) be split into multiple parts" $ do it "messages too large for a single packet can (often) be split into multiple parts" $ do
@ -304,13 +297,13 @@ exampleNodeState = RemoteNodeState {
exampleLocalNode :: IO (LocalNodeState MockService) exampleLocalNode :: IO (LocalNodeState MockService)
exampleLocalNode = do exampleLocalNode = do
realNodeSTM <- newTVarIO $ RealNode { realNode <- newTVarIO $ RealNode {
vservers = emptyRMap vservers = []
, nodeConfig = exampleFediConf , nodeConfig = exampleFediConf
, bootstrapNodes = confBootstrapNodes exampleFediConf , bootstrapNodes = confBootstrapNodes exampleFediConf
, nodeService = MockService , nodeService = MockService
} }
nodeStateInit realNodeSTM 0 nodeStateInit realNode
exampleFediConf :: FediChordConf exampleFediConf :: FediChordConf