Compare commits


38 Commits

Author SHA1 Message Date
Trolli Schmittlauch e79ba52e00 update ghc to 8.6.4, nixpkgs base to 20.09
- relaxes some version constraints as dirty update quickfix
- removes hie integration as that project is abandoned, todo: switch to
  haskell-languageserver instead
2021-01-01 14:30:33 +01:00
Trolli Schmittlauch 4aa4667a1d kChoices cost calculations for rebalance decisions
- loop with load queries and cost calculations on whether to do a vs
- actual relocation still missing though
- untested
2020-10-10 04:35:24 +02:00
Trolli Schmittlauch 6aebd982f8 make RingMap an instance of Traversable
- some examples tested out by hand, but not thorough test case or even
  QuickCheck coverage
2020-10-07 19:24:15 +02:00
Trolli Schmittlauch 048a6ce391 modularise VS candidate load querying into own function 2020-10-07 16:17:45 +02:00
Trolli Schmittlauch 8bd4e04bcd bootstrapQueryId doesn't need a STM'd node state 2020-10-07 15:50:44 +02:00
Trolli Schmittlauch 0cb4b6815c start implementing k-choices rebalancing, considering 1 VS each run
only loop implemented, rebalancing not implemented
2020-10-07 00:43:06 +02:00
Trolli Schmittlauch b111515178 add config option for k-choices rebalance interval 2020-10-06 16:01:29 +02:00
Trolli Schmittlauch ecb127e6af k-choices cost calculation for departure cost 2020-10-05 22:48:56 +02:00
Trolli Schmittlauch 5ed8a28fde refactor vservers map to RingMap to be able to index it
- in preparation for periodical rebalancing
- makes it possible to look up the next vserver for iterating through
  it, after refreshing the map in-between
- added some necessary RingMap functions
2020-10-05 02:27:02 +02:00
Trolli Schmittlauch bb0fb0919a refactor request sender ID spoof check to suit k-choices
- mostly refactored the checks into its own function
- now additionally check the vserver number limit
- refactoring to pass that limit to the checking function invocations
- closes #74
2020-09-29 02:59:42 +02:00
Trolli Schmittlauch b2b4fe3dd8 change vserver ID representation type to Word8
- performance improvement: avoid unnecessary representation and
  conversion from/to Integer
- part of hot path: with k-choices, all possible IDs are regularly
  generated and checked
- preparation for #74
2020-09-29 02:06:31 +02:00
Trolli Schmittlauch c208aeceaa rename `isJoined` to reflect its scope on a single VS
This should be enough to close #76, as it was only used in the scope of
a single LocalNodeState anyways.
2020-09-29 00:45:15 +02:00
Trolli Schmittlauch 0ee8f0dc43 adjust joinOnNewEntreisThread to k-choices join 2020-09-29 00:34:11 +02:00
Trolli Schmittlauch 21ecf9b041 bootstrapQueryID: try all possible node IDs of a bootstrap node
- closes #77
- when k-choices (#2) joining, try addressing each possible node ID of
  the bootstrap node until success
- bugfix: include correct target ID of node that shall respond in
  QueryID requests
2020-09-28 00:56:15 +02:00
Trolli Schmittlauch 9a61c186e3 start restructuring joinOnNewEntries flow
- overview comment on possible flow
- cache query
- doesn't compile yet
2020-09-26 22:08:09 +02:00
Trolli Schmittlauch 578cc362b9 fix tests 2020-09-25 22:33:40 +02:00
Trolli Schmittlauch 1a0de55b8c integrate k-choices into `tryBootstrapJoin` flow
part of #2
2020-09-25 22:07:12 +02:00
Trolli Schmittlauch 7a87d86c32 k-choices error handling: detect empty joins, finer fail granularity 2020-09-25 02:04:34 +02:00
Trolli Schmittlauch 3b6d129bfc implement k-choices join functions
large commit, contains a number of things:
- k-choices #2 cost calculation
- k-choices parameters
- adjusting ASN.1 network messages to contain all values required for
  cost calculation #71
- adjusting stats to contain required values
- k-choices node and vserver join functions
- placeholder/ dummy invocation of k-choices join
2020-09-25 01:41:04 +02:00
Trolli Schmittlauch 62da66aade add runtime flag for enabling k-choices or not
any value except "off" means on

contributes to #2
2020-09-22 23:12:30 +02:00
Trolli Schmittlauch 13c5b385b1 make inclusion of HIE overlay conditional as well 2020-09-21 22:14:33 +02:00
Trolli Schmittlauch 1ed0281417 respond to QueryLoad requests
closes #71
closes #72
contributes to #2
2020-09-21 18:15:40 +02:00
Trolli Schmittlauch 499c90e63a stylish run 2020-09-21 02:23:06 +02:00
Trolli Schmittlauch 1a7afed062 finish restructuring fediMainThreads
contributes to #34
2020-09-21 02:22:46 +02:00
Trolli Schmittlauch 8e8ea41dc4 re-structure convergenceSampleThread to work on a RealNode and iterate over all joined vservers
contributes to #34
2020-09-21 02:18:28 +02:00
Trolli Schmittlauch 33ae904d17 re-structure cacheVerifyThread to work on a RealNode and iterate over all joined vservers
contributes to #34
2020-09-21 02:11:43 +02:00
Trolli Schmittlauch 68de73d919 re-structure fediChordMessageHandler to dispatch requests to the responsible vserver
contributes to #34
2020-09-20 21:19:55 +02:00
Trolli Schmittlauch 0ab6ee9c8f re-strucuture fediChordInit flow to also do the bootstrapping 2020-09-20 19:30:35 +02:00
Trolli Schmittlauch 12dfc56a73 fediChordInit returns a RealNode, manages vservers as map
- contributes to #34
2020-09-19 23:01:55 +02:00
Trolli Schmittlauch 9bf7365a2c include target ID in request to address individual vserver
- necessary for dispatching the request to a certain vserver
- also refactored request sending to use a common `mkRequest`
- contributes to #2
2020-09-19 20:41:58 +02:00
Trolli Schmittlauch 5e745cd035 only specify upper key bound when querying load
As a querying node does not always know the lower bound of the queried
segment – determined by the predecessor – let the currently responsible
node provide that bound instead.

affects #71
2020-09-19 15:37:41 +02:00
Trolli Schmittlauch 30bf0529ed send load query request, parse result and represent it
- sending side of #71
- introduces SegmentLoadStats to hold the response data
- contributes to #2
2020-09-19 02:05:29 +02:00
Trolli Schmittlauch 576ea2c3f6 calculate service load rates, interface for querying loads
- define data type for load representation
- this representation can be queried from any Service (closes #72)
- loads are periodically calculated from measured rates (contributes to #2)
2020-09-18 20:36:46 +02:00
Trolli Schmittlauch 7dd7e96cce conversion of RingMap to key-value list 2020-09-18 20:36:35 +02:00
Trolli Schmittlauch a1cfbbac48 bump nixpkgs revision 2020-09-18 20:36:35 +02:00
Trolli Schmittlauch af27cded19 adjust payload parser naming for consistency and clarity 2020-09-18 20:36:35 +02:00
Trolli Schmittlauch 41aaa8ff70 parse ASN.1 representation of load querying
includes tests
contributes to #71
2020-09-18 20:36:35 +02:00
Trolli Schmittlauch ddea599022 extend ASN.1 schema for requesting load information
contributes to #71
2020-09-18 01:26:42 +02:00
13 changed files with 1103 additions and 435 deletions

View File

@ -6,20 +6,22 @@ Domain ::= VisibleString
Partnum ::= INTEGER (0..150)
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad}
Request ::= SEQUENCE {
action Action,
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
receiverID NodeID,
sender NodeState,
part Partnum, -- part number of this message, starts at 1
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
actionPayload CHOICE {
queryIDRequestPayload QueryIDRequestPayload,
joinRequestPayload JoinRequestPayload,
leaveRequestPayload LeaveRequestPayload,
stabiliseRequestPayload StabiliseRequestPayload,
pingRequestPayload PingRequestPayload
leaveRequestPayload LeaveRequestPayload,
stabiliseRequestPayload StabiliseRequestPayload,
pingRequestPayload PingRequestPayload,
loadRequestPayload LoadRequestPayload
} OPTIONAL -- just for symmetry reasons with response, requests without a payload have no meaning
@ -34,11 +36,12 @@ Response ::= SEQUENCE {
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
action Action,
actionPayload CHOICE {
queryIDResponsePayload QueryIDResponsePayload,
joinResponsePayload JoinResponsePayload,
queryIDResponsePayload QueryIDResponsePayload,
joinResponsePayload JoinResponsePayload,
leaveResponsePayload LeaveResponsePayload,
stabiliseResponsePayload StabiliseResponsePayload,
pingResponsePayload PingResponsePayload
pingResponsePayload PingResponsePayload,
loadResponsePayload LoadResponsePayload
} OPTIONAL -- no payload when just ACKing a previous request
@ -101,5 +104,15 @@ PingRequestPayload ::= NULL -- do not include a node/ vserver ID, so that
-- learning all active vserver IDs handled by the server at once
PingResponsePayload ::= SEQUENCE OF NodeState
LoadRequestPayload ::= SEQUENCE {
upperSegmentBound NodeID
LoadResponsePayload ::= SEQUENCE {
loadSum REAL,
remainingLoadTarget REAL,
totalCapacity REAL,
lowerBound NodeID

View File

@ -46,7 +46,7 @@ category: Network
common deps
build-depends: base ^>=, containers ^>=, bytestring, utf8-string ^>=, network ^>=, time ^>=, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
build-depends: base >=4, containers ^>=, bytestring, utf8-string ^>=, network ^>=3.1, time, cmdargs ^>= 0.10, cryptonite, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
ghc-options: -Wall -Wpartial-fields -O2

View File

@ -18,38 +18,20 @@ main = do
-- ToDo: parse and pass config
-- probably use `tomland` for that
(fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining thisNode
either (\err -> do
-- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
print =<< readTVarIO thisNode
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread thisNode
wait =<< async (fediMainThreads serverSock thisNode)
(\joinedNS -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
wait =<< async (fediMainThreads serverSock thisNode)
pure ()
(fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- wait for all DHT threads to terminate, this keeps the main thread running
wait fediThreads
readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do
confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs
-- allow starting the initial node without bootstrapping info to avoid
-- waiting for timeout
speedup = read speedupString
statsEvalD = 120 * 10^6 `div` speedup
confBootstrapNodes' = case remainingArgs of
bootstrapHost : bootstrapPortString : _ ->
[(bootstrapHost, read bootstrapPortString)]
@ -67,6 +49,11 @@ readConfig = do
, confResponsePurgeAge = 60 / fromIntegral speedup
, confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3
, confEnableKChoices = loadBalancingEnabled /= "off"
, confKChoicesOverload = 0.9
, confKChoicesUnderload = 0.1
, confKChoicesMaxVS = 8
, confKChoicesRebalanceInterval = round (realToFrac statsEvalD * 1.1 :: Double)
sConf = ServiceConf
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
@ -74,7 +61,7 @@ readConfig = do
, confServiceHost = confDomainString
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
, confSpeedupFactor = speedup
, confStatsEvalDelay = 120 * 10^6 `div` speedup
, confStatsEvalDelay = statsEvalD
pure (fConf, sConf)

View File

@ -1,26 +1,18 @@
compiler ? "ghc865",
withHIE ? false
compiler ? "ghc884"
# pin all-hies for getting the language server
all-hies = fetchTarball {
url = "";
sha256 = "sha256:0br6wsqpfk1lzz90f7zw439w1ir2p54268qilw9l2pk6yz7ganfx";
pkgs = import (
builtins.fetchGit {
name = "nixpkgs-pinned";
url =;
ref = "refs/heads/release-20.03";
rev = "de3780b937d2984f9b5e20d191f23be4f857b3aa";
ref = "refs/heads/release-20.09";
rev = "e065200fc90175a8f6e50e76ef10a48786126e1c";
}) {
# Pass no config for purity
config = {};
overlays = [
(import all-hies {}).overlay
overlays = [];
hp = pkgs.haskell.packages."${compiler}";
src = pkgs.nix-gitignore.gitignoreSource [] ./.;
@ -38,7 +30,6 @@ in
++ (if withHIE then [ hie ] else []);

View File

@ -1 +1 @@
(import ./default.nix {withHIE = true;}).shell
(import ./default.nix {}).shell

View File

@ -184,6 +184,19 @@ encodePayload payload'@PingResponsePayload{} =
Start Sequence
: concatMap encodeNodeState (pingNodeStates payload')
<> [End Sequence]
encodePayload payload'@LoadRequestPayload{} =
[ Start Sequence
, IntVal . getNodeID $ loadSegmentUpperBound payload'
, End Sequence
encodePayload payload'@LoadResponsePayload{} =
[ Start Sequence
, Real $ loadSum payload'
, Real $ loadRemainingTarget payload'
, Real $ loadTotalCapacity payload'
, IntVal . getNodeID $ loadSegmentLowerBound payload'
, End Sequence
encodeNodeState :: NodeState a => a -> [ASN1]
encodeNodeState ns = [
@ -193,7 +206,7 @@ encodeNodeState ns = [
, OctetString (ipAddrAsBS $ getIpAddr ns)
, IntVal (toInteger . getDhtPort $ ns)
, IntVal (toInteger . getServicePort $ ns)
, IntVal (getVServerID ns)
, IntVal (toInteger $ getVServerID ns)
, End Sequence
@ -215,10 +228,11 @@ encodeQueryResult FORWARD{} = Enumerated 1
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
-> [ASN1]
(Request requestID sender part isFinalPart action requestPayload) =
(Request requestID receiverID sender part isFinalPart action requestPayload) =
Start Sequence
: (Enumerated . fromIntegral . fromEnum $ action)
: IntVal requestID
: (IntVal . getNodeID $ receiverID)
: encodeNodeState sender
<> [IntVal part
, Boolean isFinalPart]
@ -262,18 +276,20 @@ parseMessage = do
parseRequest :: Action -> ParseASN1 FediChordMessage
parseRequest action = do
requestID <- parseInteger
receiverID' <- fromInteger <$> parseInteger
sender <- parseNodeState
part <- parseInteger
isFinalPart <- parseBool
hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDRequest
Join -> parseJoinRequest
Leave -> parseLeaveRequest
Stabilise -> parseStabiliseRequest
Ping -> parsePingRequest
QueryID -> parseQueryIDRequestPayload
Join -> parseJoinRequestPayload
Leave -> parseLeaveRequestPayload
Stabilise -> parseStabiliseRequestPayload
Ping -> parsePingRequestPayload
QueryLoad -> parseLoadRequestPayload
pure $ Request requestID sender part isFinalPart action payload
pure $ Request requestID receiverID' sender part isFinalPart action payload
parseResponse :: Integer -> ParseASN1 FediChordMessage
parseResponse requestID = do
@ -283,11 +299,12 @@ parseResponse requestID = do
action <- parseEnum :: ParseASN1 Action
hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDResponse
Join -> parseJoinResponse
Leave -> parseLeaveResponse
Stabilise -> parseStabiliseResponse
Ping -> parsePingResponse
QueryID -> parseQueryIDResponsePayload
Join -> parseJoinResponsePayload
Leave -> parseLeaveResponsePayload
Stabilise -> parseStabiliseResponsePayload
Ping -> parsePingResponsePayload
QueryLoad -> parseLoadResponsePayload
pure $ Response requestID senderID part isFinalPart action payload
@ -305,6 +322,13 @@ parseInteger = do
IntVal parsed -> pure parsed
x -> throwParseError $ "Expected IntVal but got " <> show x
parseReal :: ParseASN1 Double
parseReal = do
i <- getNext
case i of
Real parsed -> pure parsed
x -> throwParseError $ "Expected Real but got " <> show x
parseEnum :: Enum a => ParseASN1 a
parseEnum = do
e <- getNext
@ -346,7 +370,7 @@ parseNodeState = onNextContainer Sequence $ do
, domain = domain'
, dhtPort = dhtPort'
, servicePort = servicePort'
, vServerID = vServer'
, vServerID = fromInteger vServer'
, ipAddr = ip'
@ -360,13 +384,13 @@ parseCacheEntry = onNextContainer Sequence $ do
parseNodeCache :: ParseASN1 [RemoteCacheEntry]
parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
parseJoinRequest :: ParseASN1 ActionPayload
parseJoinRequest = do
parseJoinRequestPayload :: ParseASN1 ActionPayload
parseJoinRequestPayload = do
pure JoinRequestPayload
parseJoinResponse :: ParseASN1 ActionPayload
parseJoinResponse = onNextContainer Sequence $ do
parseJoinResponsePayload :: ParseASN1 ActionPayload
parseJoinResponsePayload = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
cache <- parseNodeCache
@ -376,8 +400,8 @@ parseJoinResponse = onNextContainer Sequence $ do
, joinCache = cache
parseQueryIDRequest :: ParseASN1 ActionPayload
parseQueryIDRequest = onNextContainer Sequence $ do
parseQueryIDRequestPayload :: ParseASN1 ActionPayload
parseQueryIDRequestPayload = onNextContainer Sequence $ do
targetID <- fromInteger <$> parseInteger
lBestNodes <- parseInteger
pure $ QueryIDRequestPayload {
@ -385,8 +409,8 @@ parseQueryIDRequest = onNextContainer Sequence $ do
, queryLBestNodes = lBestNodes
parseQueryIDResponse :: ParseASN1 ActionPayload
parseQueryIDResponse = onNextContainer Sequence $ do
parseQueryIDResponsePayload :: ParseASN1 ActionPayload
parseQueryIDResponsePayload = onNextContainer Sequence $ do
Enumerated resultType <- getNext
result <- case resultType of
0 -> FOUND <$> parseNodeState
@ -396,13 +420,13 @@ parseQueryIDResponse = onNextContainer Sequence $ do
queryResult = result
parseStabiliseRequest :: ParseASN1 ActionPayload
parseStabiliseRequest = do
parseStabiliseRequestPayload :: ParseASN1 ActionPayload
parseStabiliseRequestPayload = do
pure StabiliseRequestPayload
parseStabiliseResponse :: ParseASN1 ActionPayload
parseStabiliseResponse = onNextContainer Sequence $ do
parseStabiliseResponsePayload :: ParseASN1 ActionPayload
parseStabiliseResponsePayload = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
pure $ StabiliseResponsePayload {
@ -410,8 +434,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do
, stabilisePredecessors = pred'
parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequest = onNextContainer Sequence $ do
parseLeaveRequestPayload :: ParseASN1 ActionPayload
parseLeaveRequestPayload = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
doMigration <- parseBool
@ -421,19 +445,40 @@ parseLeaveRequest = onNextContainer Sequence $ do
, leaveDoMigration = doMigration
parseLeaveResponse :: ParseASN1 ActionPayload
parseLeaveResponse = do
parseLeaveResponsePayload :: ParseASN1 ActionPayload
parseLeaveResponsePayload = do
pure LeaveResponsePayload
parsePingRequest :: ParseASN1 ActionPayload
parsePingRequest = do
parsePingRequestPayload :: ParseASN1 ActionPayload
parsePingRequestPayload = do
pure PingRequestPayload
parsePingResponse :: ParseASN1 ActionPayload
parsePingResponse = onNextContainer Sequence $ do
parsePingResponsePayload :: ParseASN1 ActionPayload
parsePingResponsePayload = onNextContainer Sequence $ do
handledNodes <- getMany parseNodeState
pure $ PingResponsePayload {
pingNodeStates = handledNodes
parseLoadRequestPayload :: ParseASN1 ActionPayload
parseLoadRequestPayload = onNextContainer Sequence $ do
loadUpperBound' <- fromInteger <$> parseInteger
pure LoadRequestPayload
{ loadSegmentUpperBound = loadUpperBound'
parseLoadResponsePayload :: ParseASN1 ActionPayload
parseLoadResponsePayload = onNextContainer Sequence $ do
loadSum' <- parseReal
loadRemainingTarget' <- parseReal
loadTotalCapacity' <- parseReal
loadSegmentLowerBound' <- fromInteger <$> parseInteger
pure LoadResponsePayload
{ loadSum = loadSum'
, loadRemainingTarget = loadRemainingTarget'
, loadTotalCapacity = loadTotalCapacity'
, loadSegmentLowerBound = loadSegmentLowerBound'

View File

@ -15,6 +15,7 @@ module Hash2Pub.DHTProtocol
, Action(..)
, ActionPayload(..)
, FediChordMessage(..)
, mkRequest
, maximumParts
, sendQueryIdMessages
, requestQueryID
@ -22,6 +23,7 @@ module Hash2Pub.DHTProtocol
, requestLeave
, requestPing
, requestStabilise
, requestQueryLoad
, lookupMessage
, sendRequestTo
, queryIdLookupLoop
@ -36,7 +38,7 @@ module Hash2Pub.DHTProtocol
, isPossibleSuccessor
, isPossiblePredecessor
, isInOwnResponsibilitySlice
, isJoined
, vsIsJoined
, closestCachePredecessors
@ -49,7 +51,8 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.Except (MonadError (..), runExceptT)
import Control.Monad.Except (MonadError (..), liftEither,
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS
import Data.Either (rights)
@ -63,6 +66,7 @@ import Data.Maybe (fromJust, fromMaybe, isJust,
isNothing, mapMaybe, maybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Data.Word (Word8)
import Network.Socket hiding (recv, recvFrom, send,
import Network.Socket.ByteString
@ -74,23 +78,27 @@ import Hash2Pub.ASN1Coding
import Hash2Pub.FediChordTypes (CacheEntry (..),
CacheEntry (..),
FediChordConf (..),
HasKeyID (..),
HasKeyID (..), LoadStats (..),
LocalNodeState (..),
LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..),
RealNode (..), RealNodeSTM,
RemoteNodeState (..),
RingEntry (..), RingMap (..),
SegmentLoadStats (..),
Service (..), addRMapEntry,
cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID,
getKeyID, localCompare,
getKeyID, hasValidNodeId,
loadSliceSum, localCompare,
rMapFromList, rMapLookupPred,
setPredecessors, setSuccessors)
import Hash2Pub.ProtocolTypes
import Hash2Pub.RingMap
import Debug.Trace (trace)
@ -103,7 +111,7 @@ queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined.
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
| vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
@ -227,8 +235,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
-- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT
isJoined :: LocalNodeState s -> Bool
isJoined ns = not . all null $ [successors ns, predecessors ns]
vsIsJoined :: LocalNodeState s -> Bool
vsIsJoined ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending
sendMessageSize :: Num i => i
@ -237,27 +245,37 @@ sendMessageSize = 1200
-- ====== message send and receive operations ======
-- encode the response to a request that just signals successful receipt
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
requestID = requestID req
, senderID = ownID
, senderID = receiverID req
, part = part req
, isFinalPart = False
, action = action req
, payload = Nothing
ackRequest _ _ = Map.empty
ackRequest _ = Map.empty
-- | extract the first payload from a received message set
extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload
extractFirstPayload msgSet = foldr' (\msg plAcc ->
if isNothing plAcc && isJust (payload msg)
then payload msg
else plAcc
) Nothing msgSet
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the handling node
=> Word8 -- ^ maximum number of vservers, because of decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@
-> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request
-> IO ()
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
ns <- readTVarIO nsSTM
-- add nodestate to cache
now <- getPOSIXTime
@ -265,19 +283,20 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
Nothing -> pure ()
Just aPart -> do
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
maybe (pure ()) (
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
=<< (case action aPart of
Ping -> Just <$> respondPing nsSTM msgSet
Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin
Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin
-- ToDo: figure out what happens if not joined
QueryID -> Just <$> respondQueryID nsSTM msgSet
-- only when joined
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing
Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing
QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
@ -287,19 +306,18 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- | Filter out requests with spoofed node IDs by recomputing the ID using
-- the sender IP.
-- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
dropSpoofedIDs :: HostAddress6 -- msg source address
dropSpoofedIDs :: Word8 -- ^ maximum number of vservers per node
-> HostAddress6 -- ^ msg source address
-> LocalNodeStateSTM s
-> Set.Set FediChordMessage -- message parts of the request
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- reponder function to be invoked for valid requests
-> Set.Set FediChordMessage -- ^ message parts of the request
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- ^ reponder function to be invoked for valid requests
-> IO (Maybe (Map.Map Integer BS.ByteString))
dropSpoofedIDs addr nsSTM' msgSet' responder =
dropSpoofedIDs limVs addr nsSTM' msgSet' responder =
aRequestPart = Set.elemAt 0 msgSet
senderNs = sender aRequestPart
givenSenderID = getNid senderNs
recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs)
if recomputedID == givenSenderID
if hasValidNodeId limVs senderNs addr
then Just <$> responder nsSTM' msgSet'
else pure Nothing
@ -317,19 +335,15 @@ respondQueryID nsSTM msgSet = do
aRequestPart = Set.elemAt 0 msgSet
senderID = getNid . sender $ aRequestPart
senderPayload = foldr' (\msg plAcc ->
if isNothing plAcc && isJust (payload msg)
then payload msg
else plAcc
) Nothing msgSet
-- return only empty message serialisation if no payload was included in parts
senderPayload = extractFirstPayload msgSet
-- return only empty message serialisation if no payload was included in parts
maybe (pure Map.empty) (\senderPayload' -> do
responseMsg <- atomically $ do
nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap
responsePayload = QueryIDResponsePayload {
queryResult = if isJoined nsSnap
queryResult = if vsIsJoined nsSnap
then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
-- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible
@ -422,6 +436,47 @@ respondPing nsSTM msgSet = do
pure $ serialiseMessage sendMessageSize pingResponse
respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryLoad nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM
-- this message cannot be split reasonably, so just
-- consider the first payload
aRequestPart = Set.elemAt 0 msgSet
senderPayload = extractFirstPayload msgSet
responsePl <- maybe (pure Nothing) (\pl ->
case pl of
LoadRequestPayload{} -> do
parentNode <- readTVarIO (parentRealNode nsSnap)
serv = nodeService parentNode
conf = nodeConfig parentNode
lStats <- getServiceLoadStats serv
directSucc = getNid . head . predecessors $ nsSnap
handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl)
pure $ Just LoadResponsePayload
{ loadSum = handledTagSum
, loadRemainingTarget = remainingLoadTarget conf lStats
, loadTotalCapacity = totalCapacity lStats
, loadSegmentLowerBound = directSucc
_ -> pure Nothing
pure $ maybe
(\pl -> serialiseMessage sendMessageSize $ Response
{ requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = QueryLoad
, payload = Just pl
) responsePl
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
@ -434,7 +489,7 @@ respondJoin nsSTM msgSet = do
senderNS = sender aRequestPart
-- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible
responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
thisNodeResponsible (FOUND _) = True
thisNodeResponsible (FORWARD _) = False
-- check whether the joining node falls into our responsibility
@ -481,6 +536,21 @@ respondJoin nsSTM msgSet = do
-- ....... request sending .......
-- | defautl constructor for request messages, fills standard values like
-- part number to avoid code repition
mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage)
mkRequest ns targetID action pl rid = Request
{ requestID = rid
, receiverID = targetID
, sender = toRemoteNodeState ns
-- part number and final flag can be changed by ASN1 encoder to make packet
-- fit the MTU restrictions
, part = 1
, isFinalPart = True
, action = action
, payload = pl
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState
@ -492,7 +562,7 @@ requestJoin toJoinOn ownStateSTM = do
let srcAddr = confIP nodeConf
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM
@ -523,7 +593,7 @@ requestJoin toJoinOn ownStateSTM = do
writeTVar ownStateSTM newState
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else do
@ -581,14 +651,14 @@ sendQueryIdMessages :: (Integral i)
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
-> [RemoteNodeState] -- ^ nodes to query
-> IO QueryResponse -- ^ accumulated response
sendQueryIdMessages targetID ns lParam targets = do
sendQueryIdMessages lookupID ns lParam targets = do
-- create connected sockets to all query targets and use them for request handling
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode))
)) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see
-- ToDo: exception handling, maybe log them
@ -605,7 +675,7 @@ sendQueryIdMessages targetID ns lParam targets = do
_ -> Set.empty
-- forward entries to global cache
queueAddEntries entrySet ns
queueAddEntries entrySet (cacheWriteQueue ns)
-- return accumulated QueryResult
pure $ case acc of
-- once a FOUND as been encountered, return this as a result
@ -621,13 +691,14 @@ sendQueryIdMessages targetID ns lParam targets = do
-- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i
=> NodeID -- ^ target ID
=> NodeID -- ^ lookup ID to be looked up
-> LocalNodeState s -- ^ sender node state
-> Maybe i -- ^ optionally provide a different l parameter
-> NodeID -- ^ target ID of message destination
-> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID)
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam }
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
@ -638,16 +709,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node
requestStabilise ns neighbour = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Stabilise
, payload = Just StabiliseRequestPayload
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload))
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
-- forward IO error messages
@ -660,7 +722,7 @@ requestStabilise ns neighbour = do
([],[]) respSet
-- update successfully responded neighbour in cache
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned"
else Right (responsePreds, responseSuccs)
@ -682,17 +744,12 @@ requestLeave ns doMigration target = do
, leavePredecessors = predecessors ns
, leaveDoMigration = doMigration
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Leave
, payload = Just leavePayload
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
responses <- bracket
(mkSendSocket srcAddr (getDomain target) (getDhtPort target))
(fmap Right
. sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload))
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
-- forward IO error messages
(pure . Left)
@ -708,16 +765,7 @@ requestPing ns target = do
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Ping
, payload = Just PingRequestPayload
) sock
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
pure $ Right (peerAddr, resp)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
@ -733,10 +781,9 @@ requestPing ns target = do
-- recompute ID for each received node and mark as verified in cache
now <- getPOSIXTime
forM_ responseVss (\vs ->
let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
in if recomputedID == getNid vs
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
else pure ()
if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
else pure ()
pure $ if null responseVss
then Left "no active vServer IDs returned, ignoring node"
@ -744,6 +791,37 @@ requestPing ns target = do
) responses
-- still need a particular vserver as LocalNodeState, because requests need a sender
requestQueryLoad :: (MonadError String m, MonadIO m)
=> LocalNodeState s -- ^ the local source vserver for the request
-> NodeID -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node
-> RemoteNodeState -- ^ target node to query
-> m SegmentLoadStats
requestQueryLoad ns upperIdBound target = do
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
srcAddr = confIP nodeConf
loadReqPl = LoadRequestPayload
{ loadSegmentUpperBound = upperIdBound
responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl))
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
responseMsgSet <- liftEither responses
-- throws an error if an exception happened
loadResPl <- maybe (throwError "no load response payload found") pure
(extractFirstPayload responseMsgSet)
pure SegmentLoadStats
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
, segmentUpperKeyBound = upperIdBound
, segmentLoad = loadSum loadResPl
, segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl
, segmentOwnerCapacity = loadTotalCapacity loadResPl
, segmentCurrentOwner = target
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in milliseconds
@ -800,24 +878,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueAddEntries entries ns = do
queueAddEntries entries cacheQ = do
now <- getPOSIXTime
forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
-- | enque a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c
=> c NodeID
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
-- | enque a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
@ -826,11 +904,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueUpdateVerifieds nIds ns = do
queueUpdateVerifieds nIds cacheQ = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result

File diff suppressed because it is too large Load Diff

View File

@ -7,8 +7,8 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module Hash2Pub.FediChordTypes (
NodeID -- abstract, but newtype constructors cannot be hidden
module Hash2Pub.FediChordTypes
( NodeID -- abstract, but newtype constructors cannot be hidden
, idBits
, getNodeID
, toNodeID
@ -18,6 +18,13 @@ module Hash2Pub.FediChordTypes (
, RemoteNodeState (..)
, RealNode (..)
, RealNodeSTM
, VSMap
, LoadStats (..)
, emptyLoadStats
, remainingLoadTarget
, loadSliceSum
, addVserver
, SegmentLoadStats (..)
, setSuccessors
, setPredecessors
, NodeCache
@ -51,6 +58,7 @@ module Hash2Pub.FediChordTypes (
, localCompare
, genNodeID
, genNodeIDBS
, hasValidNodeId
, genKeyID
, genKeyIDBS
, byteStringToUInteger
@ -60,12 +68,14 @@ module Hash2Pub.FediChordTypes (
, DHT(..)
, Service(..)
, ServiceConf(..)
) where
) where
import Control.Exception
import Data.Foldable (foldr')
import Data.Function (on)
import qualified Data.Hashable as Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HMap
import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
@ -148,17 +158,27 @@ a `localCompare` b
-- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping
data RealNode s = RealNode
{ vservers :: [LocalNodeStateSTM s]
-- ^ references to all active versers
, nodeConfig :: FediChordConf
{ vservers :: VSMap s
-- ^ map of all active VServer node IDs to their node state
, nodeConfig :: FediChordConf
-- ^ holds the initial configuration read at program start
, bootstrapNodes :: [(String, PortNumber)]
, bootstrapNodes :: [(String, PortNumber)]
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache
, lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes
, nodeService :: s (RealNodeSTM s)
, globalNodeCacheSTM :: TVar NodeCache
-- ^ EpiChord node cache with expiry times for nodes.
, globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
, nodeService :: s (RealNodeSTM s)
-- | insert a new vserver mapping into a node
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
addVserver (key, nstate) node = node
{ vservers = addRMapEntry key nstate (vservers node) }
type VSMap s = RingMap NodeID (LocalNodeStateSTM s)
type RealNodeSTM s = TVar (RealNode s)
-- | represents a node and all its important state
@ -172,7 +192,7 @@ data RemoteNodeState = RemoteNodeState
-- ^ port of the DHT itself
, servicePort :: PortNumber
-- ^ port of the service provided on top of the DHT
, vServerID :: Integer
, vServerID :: Word8
-- ^ ID of this vserver
deriving (Show, Eq)
@ -185,9 +205,9 @@ data LocalNodeState s = LocalNodeState
{ nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations
, nodeCacheSTM :: TVar NodeCache
-- ^ EpiChord node cache with expiry times for nodes
-- ^ reference to the 'globalNodeCacheSTM'
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
-- ^ reference to the 'globalCacheWriteQueue
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance
, predecessors :: [RemoteNodeState]
@ -217,14 +237,14 @@ class NodeState a where
getIpAddr :: a -> HostAddress6
getDhtPort :: a -> PortNumber
getServicePort :: a -> PortNumber
getVServerID :: a -> Integer
getVServerID :: a -> Word8
-- setters for common properties
setNid :: NodeID -> a -> a
setDomain :: String -> a -> a
setIpAddr :: HostAddress6 -> a -> a
setDhtPort :: PortNumber -> a -> a
setServicePort :: PortNumber -> a -> a
setVServerID :: Integer -> a -> a
setVServerID :: Word8 -> a -> a
toRemoteNodeState :: a -> RemoteNodeState
instance NodeState RemoteNodeState where
@ -373,6 +393,11 @@ genNodeID :: HostAddress6 -- ^a node's IPv6 address
-> NodeID -- ^the generated @NodeID@
genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs
hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool
hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns)
-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
genKeyIDBS :: String -- ^the key string
-> BS.ByteString -- ^the key ID represented as a @ByteString@
@ -427,9 +452,70 @@ data FediChordConf = FediChordConf
-- ^ how long to wait until response has arrived, in milliseconds
, confRequestRetries :: Int
-- ^ how often re-sending a timed-out request can be retried
, confEnableKChoices :: Bool
-- ^ whether to enable k-choices load balancing
, confKChoicesOverload :: Double
-- ^ fraction of capacity above which a node considers itself overloaded
, confKChoicesUnderload :: Double
-- ^ fraction of capacity below which a node considers itself underloaded
, confKChoicesMaxVS :: Word8
-- ^ upper limit of vserver index κ
, confKChoicesRebalanceInterval :: Int
-- ^ interval between vserver rebalance attempts
deriving (Show, Eq)
-- ====== k-choices load balancing types ======
data LoadStats = LoadStats
{ loadPerTag :: RingMap NodeID Double
-- ^ map of loads for each handled tag
, totalCapacity :: Double
-- ^ total designated capacity of the service
, compensatedLoadSum :: Double
-- ^ effective load reevant for load balancing after compensating for
deriving (Show, Eq)
-- | calculates the mismatch from the target load by taking into account the
-- underload and overload limits
remainingLoadTarget :: FediChordConf -> LoadStats -> Double
remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
-- | calculates the sum of tag load in a contiguous slice between to keys
loadSliceSum :: LoadStats
-> NodeID -- ^ lower segment bound
-> NodeID -- ^ upper segment bound
-> Double -- ^ sum of all tag loads within that segment
loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats
data SegmentLoadStats = SegmentLoadStats
{ segmentLowerKeyBound :: NodeID
-- ^ segment start key
, segmentUpperKeyBound :: NodeID
-- ^ segment end key
, segmentLoad :: Double
-- ^ sum of load of all keys in the segment
, segmentOwnerRemainingLoadTarget :: Double
-- ^ remaining load target of the current segment handler:
, segmentOwnerCapacity :: Double
-- ^ total capacity of the current segment handler node, used for normalisation
, segmentCurrentOwner :: RemoteNodeState
-- ^ the current owner of the segment that needs to be joined on
-- TODO: figure out a better way of initialising
emptyLoadStats :: LoadStats
emptyLoadStats = LoadStats
{ loadPerTag = emptyRMap
, totalCapacity = 0
, compensatedLoadSum = 0
-- ====== Service Types ============
class Service s d where
@ -445,6 +531,7 @@ class Service s d where
-> IO (Either String ()) -- ^ success or failure
-- | Wait for an incoming migration from a given node to succeed, may block forever
waitForMigrationFrom :: s d -> NodeID -> IO ()
getServiceLoadStats :: s d -> IO LoadStats
instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID

View File

@ -22,7 +22,7 @@ import qualified Data.DList as D
import Data.Either (lefts, rights)
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromJust, isJust)
import Data.Maybe (fromJust, fromMaybe, isJust)
import Data.String (fromString)
import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt
@ -64,8 +64,10 @@ data PostService d = PostService
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
, loadStats :: TVar RelayStats
-- ^ current load stats, replaced periodically
, relayStats :: TVar RelayStats
-- ^ current relay stats, replaced periodically
, loadStats :: TVar LoadStats
-- ^ current load values of the relay, replaced periodically and used by
, logFileHandle :: Handle
deriving (Typeable)
@ -96,7 +98,8 @@ instance DHT d => Service PostService d where
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
loadStats' <- newTVarIO emptyStats
relayStats' <- newTVarIO emptyStats
loadStats' <- newTVarIO emptyLoadStats
loggingFile <- openFile (confLogfilePath conf) WriteMode
hSetBuffering loggingFile LineBuffering
@ -112,6 +115,7 @@ instance DHT d => Service PostService d where
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
, statsQueue = statsQueue'
, relayStats = relayStats'
, loadStats = loadStats'
, logFileHandle = loggingFile
@ -153,6 +157,12 @@ instance DHT d => Service PostService d where
-- block until migration finished
takeMVar migrationSynchroniser
getServiceLoadStats = getLoadStats
getLoadStats :: PostService d -> IO LoadStats
getLoadStats serv = readTVarIO $ loadStats serv
-- | return a WAI application
postServiceApplication :: DHT d => PostService d -> Application
@ -835,7 +845,12 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
-- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats
atomically $ writeTVar (loadStats serv) rateStats
currentSubscribers <- readTVarIO $ subscribers serv
-- translate the rate statistics to load values
loads <- evaluateLoadStats rateStats currentSubscribers
atomically $
writeTVar (relayStats serv) rateStats
>> writeTVar (loadStats serv) loads
-- and now what? write a log to file
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
-- later: current (reported) load, target load
@ -859,6 +874,33 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
0 tagMap
-- | calculate load values from rate statistics
evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats
evaluateLoadStats currentStats currentSubscribers = do
-- load caused by each tag: incomingPostRate * ( 1 + subscribers)
-- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate
totalCapacity' = 2.5 * postPublishRate currentStats
(loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do
numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM
thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats)
thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers)
pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag')
(0, emptyRMap)
$ rMapToListWithKeys currentSubscribers
let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats
pure LoadStats
{ loadPerTag = loadPerTag'
, totalCapacity = totalCapacity'
-- load caused by post fetches cannot be influenced by re-balancing nodes,
-- but still reduces the totally available capacity
, compensatedLoadSum = loadSum + postFetchRate currentStats
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
-- rates by dividing through the collection time frame
evaluateStats :: POSIXTime -> RelayStats -> RelayStats

View File

@ -16,10 +16,12 @@ data Action = QueryID
| Leave
| Stabilise
| Ping
| QueryLoad
deriving (Show, Eq, Enum)
data FediChordMessage = Request
{ requestID :: Integer
, receiverID :: NodeID
, sender :: RemoteNodeState
, part :: Integer
, isFinalPart :: Bool
@ -57,6 +59,10 @@ data ActionPayload = QueryIDRequestPayload
| StabiliseRequestPayload
| PingRequestPayload
| LoadRequestPayload
{ loadSegmentUpperBound :: NodeID
-- ^ upper bound of segment interested in,
| QueryIDResponsePayload
{ queryResult :: QueryResponse
@ -73,6 +79,12 @@ data ActionPayload = QueryIDRequestPayload
| PingResponsePayload
{ pingNodeStates :: [RemoteNodeState]
| LoadResponsePayload
{ loadSum :: Double
, loadRemainingTarget :: Double
, loadTotalCapacity :: Double
, loadSegmentLowerBound :: NodeID
deriving (Show, Eq)
-- | global limit of parts per message used when (de)serialising messages.

View File

@ -47,6 +47,13 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where
traversingFL acc (ProxyEntry _ Nothing) = acc
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
instance (Bounded k, Ord k) => Traversable (RingMap k) where
traverse f = fmap RingMap . traverse traversingF . getRingMap
traversingF (KeyEntry entry) = KeyEntry <$> f entry
traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing
traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . Just <$> traversingF entry
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
@ -106,6 +113,23 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on
| isNothing (rMapLookup nid rmap') = 1
| otherwise = 0
-- | whether the RingMap is empty (except for empty proxy entries)
nullRMap :: (Num k, Bounded k, Ord k)
=> RingMap k a
-> Bool
-- basic idea: look for a predecessor starting from the upper Map bound,
-- Nothing indicates no entry being found
nullRMap = isNothing . rMapLookupPred maxBound
-- | O(logn( Is the key a member of the RingMap?
memberRMap :: (Bounded k, Ord k)
=> k
-> RingMap k a
-> Bool
memberRMap key = isJust . rMapLookup key
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
-- to simulate a modular ring
lookupWrapper :: (Bounded k, Ord k, Num k)
@ -198,12 +222,28 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier KeyEntry {} = Nothing
-- TODO: rename this to elems
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
-- TODO: rename this to toList
rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)]
rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
maybe acc (\val -> (k, val):acc) $ extractRingEntry v
. getRingMap
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
rMapFromList = setRMapEntries
-- | this just merges the underlying 'Map.Map' s and does not check whether the
-- ProxyEntry pointers are consistent, so better only create unions of
-- equal-pointered RingMaps
unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a
unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b)
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
-- *startAt* value and after that on the previously returned value.
-- Stops once i entries have been taken or an entry has been encountered twice

View File

@ -7,6 +7,7 @@ import Control.Concurrent.STM.TVar
import Control.Exception
import Data.ASN1.Parse (runParseASN1)
import qualified Data.ByteString as BS
import qualified Data.HashMap.Strict as HMap
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, isJust)
import qualified Data.Set as Set
@ -18,6 +19,7 @@ import Hash2Pub.ASN1Coding
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap
spec :: Spec
spec = do
@ -221,14 +223,16 @@ spec = do
, exampleNodeState {nid = fromInteger (-5)}
requestTemplate = Request {
requestID = 2342
, sender = exampleNodeState
, part = 1
, isFinalPart = True
, action = undefined
, payload = undefined
qLoadReqPayload = LoadRequestPayload
{ loadSegmentUpperBound = 1025
qLoadResPayload = LoadResponsePayload
{ loadSum = 3.141
, loadRemainingTarget = -1.337
, loadTotalCapacity = 2.21
, loadSegmentLowerBound = 12
responseTemplate = Response {
requestID = 2342
, senderID = nid exampleNodeState
@ -237,7 +241,7 @@ spec = do
, action = undefined
, payload = undefined
requestWith a pa = requestTemplate {action = a, payload = Just pa}
requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342
responseWith a pa = responseTemplate {action = a, payload = Just pa}
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
@ -248,17 +252,20 @@ spec = do
it "messages are encoded and decoded correctly from and to ASN1" $ do
encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
encodeDecodeAndCheck $ requestWith Join jReqPayload
encodeDecodeAndCheck $ requestWith Leave lReqPayload
encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload
encodeDecodeAndCheck $ requestWith Ping pingReqPayload
localNS <- exampleLocalNode
encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload
encodeDecodeAndCheck $ requestWith localNS Join jReqPayload
encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload
encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload
encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload
encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload
encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
encodeDecodeAndCheck $ responseWith Join jResPayload
encodeDecodeAndCheck $ responseWith Leave lResPayload
encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
encodeDecodeAndCheck $ responseWith Ping pingResPayload
encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload
it "messages are encoded and decoded to ASN.1 DER properly" $
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
it "messages too large for a single packet can (often) be split into multiple parts" $ do
@ -297,13 +304,13 @@ exampleNodeState = RemoteNodeState {
exampleLocalNode :: IO (LocalNodeState MockService)
exampleLocalNode = do
realNode <- newTVarIO $ RealNode {
vservers = []
realNodeSTM <- newTVarIO $ RealNode {
vservers = emptyRMap
, nodeConfig = exampleFediConf
, bootstrapNodes = confBootstrapNodes exampleFediConf
, nodeService = MockService
nodeStateInit realNode
nodeStateInit realNodeSTM 0
exampleFediConf :: FediChordConf