Compare commits

...

38 commits

Author SHA1 Message Date
Trolli Schmittlauch e79ba52e00 update ghc to 8.6.4, nixpkgs base to 20.09
- relaxes some version constraints as dirty update quickfix
- removes hie integration as that project is abandoned, todo: switch to
  haskell-languageserver instead
2021-01-01 14:30:33 +01:00
Trolli Schmittlauch 4aa4667a1d kChoices cost calculations for rebalance decisions
- loop with load queries and cost calculations on whether to do a vs
  relocation
- actual relocation still missing though
- untested
2020-10-10 04:35:24 +02:00
Trolli Schmittlauch 6aebd982f8 make RingMap an instance of Traversable
- some examples tested out by hand, but not thorough test case or even
  QuickCheck coverage
2020-10-07 19:24:15 +02:00
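Since RingMap's actual definition is not shown in this compare view, here is a minimal, hypothetical stand-in illustrating how a Map-wrapping newtype becomes an instance of Traversable — `traverse` runs an effect over every value while the keys stay fixed (the name `RMap` and the derived instances are assumptions, not the project's code):

```haskell
{-# LANGUAGE DeriveTraversable #-}
import qualified Data.Map.Strict as Map

-- hypothetical stand-in for Hash2Pub's RingMap: deriving Traversable
-- (which implies Functor and Foldable) gives `traverse` over the values
newtype RMap k v = RMap { getRMap :: Map.Map k v }
    deriving (Eq, Show, Functor, Foldable, Traversable)

main :: IO ()
main = do
    let rm = RMap (Map.fromList [(1 :: Int, 2 :: Int), (3, 4)])
    -- double every value inside Maybe; a Nothing anywhere aborts the whole traversal
    print (traverse (\v -> Just (v * 2)) rm)
```

Deriving the instance this way is what makes QuickCheck-style round-trip properties over the map's values straightforward to state later.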
Trolli Schmittlauch 048a6ce391 modularise VS candidate load querying into own function 2020-10-07 16:17:45 +02:00
Trolli Schmittlauch 8bd4e04bcd bootstrapQueryId doesn't need a STM'd node state 2020-10-07 15:50:44 +02:00
Trolli Schmittlauch 0cb4b6815c start implementing k-choices rebalancing, considering 1 VS each run
only loop implemented, rebalancing not implemented
2020-10-07 00:43:06 +02:00
Trolli Schmittlauch b111515178 add config option for k-choices rebalance interval 2020-10-06 16:01:29 +02:00
Trolli Schmittlauch ecb127e6af k-choices cost calculation for departure cost 2020-10-05 22:48:56 +02:00
Trolli Schmittlauch 5ed8a28fde refactor vservers map to RingMap to be able to index it
- in preparation for periodical rebalancing
- makes it possible to look up the next vserver for iterating through
  it, after refreshing the map in-between
- added some necessary RingMap functions
2020-10-05 02:27:02 +02:00
Trolli Schmittlauch bb0fb0919a refactor request sender ID spoof check to suit k-choices
- mostly refactored the checks into its own function
- now additionally check the vserver number limit
- refactoring to pass that limit to the checking function invocations
- closes #74
2020-09-29 02:59:42 +02:00
Trolli Schmittlauch b2b4fe3dd8 change vserver ID representation type to Word8
- performance improvement: avoid unnecessary representation and
  conversion from/to Integer
- part of hot path: with k-choices, all possible IDs are regularly
  generated and checked
- preparation for #74
2020-09-29 02:06:31 +02:00
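The commit notes that with k-choices all possible vserver IDs are regularly generated and checked. A quick sketch of why `Word8` helps: the full candidate space is a bounded enumeration of 256 values, with no `Integer` conversions on the hot path (the name `allVServerIDs` is illustrative, not from the codebase):

```haskell
import Data.Word (Word8)

-- with vserver IDs as Word8, the whole candidate space k-choices has to
-- enumerate is a bounded range of 256 values
allVServerIDs :: [Word8]
allVServerIDs = [minBound .. maxBound]

main :: IO ()
main = print (length allVServerIDs)  -- 256
```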
Trolli Schmittlauch c208aeceaa rename isJoined to reflect its scope on a single VS
This should be enough to close #76, as it was only used in the scope of
a single LocalNodeState anyways.
2020-09-29 00:45:15 +02:00
Trolli Schmittlauch 0ee8f0dc43 adjust joinOnNewEntriesThread to k-choices join 2020-09-29 00:34:11 +02:00
Trolli Schmittlauch 21ecf9b041 bootstrapQueryID: try all possible node IDs of a bootstrap node
- closes #77
- when k-choices (#2) joining, try addressing each possible node ID of
  the bootstrap node until success
- bugfix: include correct target ID of node that shall respond in
  QueryID requests
2020-09-28 00:56:15 +02:00
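The "try addressing each possible node ID of the bootstrap node until success" pattern can be sketched as a fold over candidates that short-circuits on the first success (this is a hypothetical illustration, not the actual `bootstrapQueryID` code; `tryUntilSuccess` is an invented name):

```haskell
-- fold over candidate IDs and return the first Right; if every
-- candidate fails, report the combined failure
tryUntilSuccess :: (a -> Either String b) -> [a] -> Either String b
tryUntilSuccess f = foldr step (Left "all candidate IDs failed")
  where
    step x acc = case f x of
        Right b -> Right b
        Left _  -> acc

main :: IO ()
main = print (tryUntilSuccess (\x -> if x == 3 then Right x else Left "no") [1 .. 5 :: Int])
```

Laziness means candidates after the first success are never evaluated, which matters when each attempt is a network request.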
Trolli Schmittlauch 9a61c186e3 start restructuring joinOnNewEntries flow
- overview comment on possible flow
- cache query
- doesn't compile yet
2020-09-26 22:08:09 +02:00
Trolli Schmittlauch 578cc362b9 fix tests 2020-09-25 22:33:40 +02:00
Trolli Schmittlauch 1a0de55b8c integrate k-choices into tryBootstrapJoin flow
part of #2
2020-09-25 22:07:12 +02:00
Trolli Schmittlauch 7a87d86c32 k-choices error handling: detect empty joins, finer fail granularity 2020-09-25 02:04:34 +02:00
Trolli Schmittlauch 3b6d129bfc implement k-choices join functions
large commit, contains a number of things:
- k-choices #2 cost calculation
- k-choices parameters
- adjusting ASN.1 network messages to contain all values required for
  cost calculation #71
- adjusting stats to contain required values
- k-choices node and vserver join functions
- placeholder/ dummy invocation of k-choices join
2020-09-25 01:41:04 +02:00
Trolli Schmittlauch 62da66aade add runtime flag for enabling k-choices or not
any value except "off" means on

contributes to #2
2020-09-22 23:12:30 +02:00
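The flag semantics ("any value except \"off\" means on") are implemented in `readConfig` as a plain string comparison, which can be restated in isolation (the standalone function name here is illustrative):

```haskell
-- every argument value except the literal "off" enables k-choices,
-- mirroring: confEnableKChoices = loadBalancingEnabled /= "off"
kChoicesEnabled :: String -> Bool
kChoicesEnabled loadBalancingEnabled = loadBalancingEnabled /= "off"

main :: IO ()
main = print (map kChoicesEnabled ["off", "on", "1", ""])  -- [False,True,True,True]
```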
Trolli Schmittlauch 13c5b385b1 make inclusion of HIE overlay conditional as well 2020-09-21 22:14:33 +02:00
Trolli Schmittlauch 1ed0281417 respond to QueryLoad requests
closes #71
closes #72
contributes to #2
2020-09-21 18:15:40 +02:00
Trolli Schmittlauch 499c90e63a stylish run 2020-09-21 02:23:06 +02:00
Trolli Schmittlauch 1a7afed062 finish restructuring fediMainThreads
contributes to #34
2020-09-21 02:22:46 +02:00
Trolli Schmittlauch 8e8ea41dc4 re-structure convergenceSampleThread to work on a RealNode and iterate over all joined vservers
contributes to #34
2020-09-21 02:18:28 +02:00
Trolli Schmittlauch 33ae904d17 re-structure cacheVerifyThread to work on a RealNode and iterate over all joined vservers
contributes to #34
2020-09-21 02:11:43 +02:00
Trolli Schmittlauch 68de73d919 re-structure fediChordMessageHandler to dispatch requests to the responsible vserver
contributes to #34
2020-09-20 21:19:55 +02:00
Trolli Schmittlauch 0ab6ee9c8f re-structure fediChordInit flow to also do the bootstrapping 2020-09-20 19:30:35 +02:00
Trolli Schmittlauch 12dfc56a73 fediChordInit returns a RealNode, manages vservers as map
- contributes to #34
2020-09-19 23:01:55 +02:00
Trolli Schmittlauch 9bf7365a2c include target ID in request to address individual vserver
- necessary for dispatching the request to a certain vserver
- also refactored request sending to use a common `mkRequest`
- contributes to #2
2020-09-19 20:41:58 +02:00
Trolli Schmittlauch 5e745cd035 only specify upper key bound when querying load
As a querying node does not always know the lower bound of the queried
segment – determined by the predecessor – let the currently responsible
node provide that bound instead.

affects #71
2020-09-19 15:37:41 +02:00
Trolli Schmittlauch 30bf0529ed send load query request, parse result and represent it
- sending side of #71
- introduces SegmentLoadStats to hold the response data
- contributes to #2
2020-09-19 02:05:29 +02:00
Trolli Schmittlauch 576ea2c3f6 calculate service load rates, interface for querying loads
- define data type for load representation
- this representation can be queried from any Service (closes #72)
- loads are periodically calculated from measured rates (contributes to #2)
2020-09-18 20:36:46 +02:00
Trolli Schmittlauch 7dd7e96cce conversion of RingMap to key-value list 2020-09-18 20:36:35 +02:00
Trolli Schmittlauch a1cfbbac48 bump nixpkgs revision 2020-09-18 20:36:35 +02:00
Trolli Schmittlauch af27cded19 adjust payload parser naming for consistency and clarity 2020-09-18 20:36:35 +02:00
Trolli Schmittlauch 41aaa8ff70 parse ASN.1 representation of load querying
includes tests
contributes to #71
2020-09-18 20:36:35 +02:00
Trolli Schmittlauch ddea599022 extend ASN.1 schema for requesting load information
contributes to #71
2020-09-18 01:26:42 +02:00
13 changed files with 1103 additions and 435 deletions

View file

@@ -6,11 +6,12 @@ Domain ::= VisibleString
 Partnum ::= INTEGER (0..150)
 
-Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
+Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad}
 
 Request ::= SEQUENCE {
     action Action,
     requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
+    receiverID NodeID,
    sender NodeState,
     part Partnum, -- part number of this message, starts at 1
     finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
@@ -19,7 +20,8 @@ Request ::= SEQUENCE {
         joinRequestPayload JoinRequestPayload,
         leaveRequestPayload LeaveRequestPayload,
         stabiliseRequestPayload StabiliseRequestPayload,
-        pingRequestPayload PingRequestPayload
+        pingRequestPayload PingRequestPayload,
+        loadRequestPayload LoadRequestPayload
     } OPTIONAL -- just for symmetry reasons with response, requests without a payload have no meaning
 }
@@ -38,7 +40,8 @@ Response ::= SEQUENCE {
         joinResponsePayload JoinResponsePayload,
         leaveResponsePayload LeaveResponsePayload,
         stabiliseResponsePayload StabiliseResponsePayload,
-        pingResponsePayload PingResponsePayload
+        pingResponsePayload PingResponsePayload,
+        loadResponsePayload LoadResponsePayload
     } OPTIONAL -- no payload when just ACKing a previous request
 }
@@ -101,5 +104,15 @@ PingRequestPayload ::= NULL -- do not include a node/ vserver ID, so that
                             -- learning all active vserver IDs handled by the server at once
 PingResponsePayload ::= SEQUENCE OF NodeState
 
+LoadRequestPayload ::= SEQUENCE {
+    upperSegmentBound NodeID
+}
+
+LoadResponsePayload ::= SEQUENCE {
+    loadSum REAL,
+    remainingLoadTarget REAL,
+    totalCapacity REAL,
+    lowerBound NodeID
+}
+
 END

View file

@@ -46,7 +46,7 @@ category: Network
 extra-source-files: CHANGELOG.md
 
 common deps
-    build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
+    build-depends: base >=4, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=3.1, time, cmdargs ^>= 0.10, cryptonite, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
     ghc-options: -Wall -Wpartial-fields -O2

View file

@@ -18,38 +18,20 @@ main = do
     -- ToDo: parse and pass config
     -- probably use `tomland` for that
     (fConf, sConf) <- readConfig
+    -- TODO: first initialise 'RealNode', then the vservers
     -- ToDo: load persisted caches, bootstrapping nodes …
-    (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-    -- currently no masking is necessary, as there is nothing to clean up
-    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-    -- try joining the DHT using one of the provided bootstrapping nodes
-    joinedState <- tryBootstrapJoining thisNode
-    either (\err -> do
-        -- handle unsuccessful join
-        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-        print =<< readTVarIO thisNode
-        -- launch thread attempting to join on new cache entries
-        _ <- forkIO $ joinOnNewEntriesThread thisNode
-        wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        (\joinedNS -> do
-            -- launch main eventloop with successfully joined state
-            putStrLn "successful join"
-            wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        joinedState
-    pure ()
+    (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
+    -- wait for all DHT threads to terminate, this keeps the main thread running
+    wait fediThreads
 
 readConfig :: IO (FediChordConf, ServiceConf)
 readConfig = do
-    confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
+    confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs
     -- allow starting the initial node without bootstrapping info to avoid
     -- waiting for timeout
     let
         speedup = read speedupString
+        statsEvalD = 120 * 10^6 `div` speedup
         confBootstrapNodes' = case remainingArgs of
             bootstrapHost : bootstrapPortString : _ ->
                 [(bootstrapHost, read bootstrapPortString)]
@@ -67,6 +49,11 @@ readConfig = do
             , confResponsePurgeAge = 60 / fromIntegral speedup
             , confRequestTimeout = 5 * 10^6 `div` speedup
             , confRequestRetries = 3
+            , confEnableKChoices = loadBalancingEnabled /= "off"
+            , confKChoicesOverload = 0.9
+            , confKChoicesUnderload = 0.1
+            , confKChoicesMaxVS = 8
+            , confKChoicesRebalanceInterval = round (realToFrac statsEvalD * 1.1 :: Double)
             }
         sConf = ServiceConf
             { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
@@ -74,7 +61,7 @@ readConfig = do
             , confServiceHost = confDomainString
             , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
             , confSpeedupFactor = speedup
-            , confStatsEvalDelay = 120 * 10^6 `div` speedup
+            , confStatsEvalDelay = statsEvalD
             }
     pure (fConf, sConf)
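The speedup-scaled timing in `readConfig` can be worked through concretely: the stats evaluation delay is 120 s in microseconds divided by the speedup factor, and the k-choices rebalance interval is 1.1 times that (the standalone function names below are illustrative):

```haskell
-- mirrors: statsEvalD = 120 * 10^6 `div` speedup
statsEvalDelay :: Integer -> Integer
statsEvalDelay speedup = 120 * 10 ^ (6 :: Integer) `div` speedup

-- mirrors: confKChoicesRebalanceInterval = round (realToFrac statsEvalD * 1.1 :: Double)
kChoicesRebalanceInterval :: Integer -> Integer
kChoicesRebalanceInterval speedup =
    round (realToFrac (statsEvalDelay speedup) * 1.1 :: Double)

main :: IO ()
main = print (statsEvalDelay 10, kChoicesRebalanceInterval 10)  -- (12000000,13200000)
```

At speedup 10 the stats are thus evaluated every 12 s of wall time, and rebalancing runs slightly less often so fresh stats are available for each rebalance decision.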

View file

@ -1,26 +1,18 @@
{ {
compiler ? "ghc865", compiler ? "ghc884"
withHIE ? false
}: }:
let let
# pin all-hies for getting the language server
all-hies = fetchTarball {
url = "https://github.com/infinisil/all-hies/tarball/b8fb659620b99b4a393922abaa03a1695e2ca64d";
sha256 = "sha256:0br6wsqpfk1lzz90f7zw439w1ir2p54268qilw9l2pk6yz7ganfx";
};
pkgs = import ( pkgs = import (
builtins.fetchGit { builtins.fetchGit {
name = "nixpkgs-pinned"; name = "nixpkgs-pinned";
url = https://github.com/NixOS/nixpkgs/; url = https://github.com/NixOS/nixpkgs/;
ref = "refs/heads/release-20.03"; ref = "refs/heads/release-20.09";
rev = "de3780b937d2984f9b5e20d191f23be4f857b3aa"; rev = "e065200fc90175a8f6e50e76ef10a48786126e1c";
}) { }) {
# Pass no config for purity # Pass no config for purity
config = {}; config = {};
overlays = [ overlays = [];
(import all-hies {}).overlay
];
}; };
hp = pkgs.haskell.packages."${compiler}"; hp = pkgs.haskell.packages."${compiler}";
src = pkgs.nix-gitignore.gitignoreSource [] ./.; src = pkgs.nix-gitignore.gitignoreSource [] ./.;
@ -38,7 +30,6 @@ in
hlint hlint
stylish-haskell stylish-haskell
pkgs.python3Packages.asn1ate pkgs.python3Packages.asn1ate
] ];
++ (if withHIE then [ hie ] else []);
}; };
} }

View file

@@ -1 +1 @@
-(import ./default.nix {withHIE = true;}).shell
+(import ./default.nix {}).shell

View file

@@ -184,6 +184,19 @@ encodePayload payload'@PingResponsePayload{} =
     Start Sequence
     : concatMap encodeNodeState (pingNodeStates payload')
     <> [End Sequence]
+encodePayload payload'@LoadRequestPayload{} =
+    [ Start Sequence
+    , IntVal . getNodeID $ loadSegmentUpperBound payload'
+    , End Sequence
+    ]
+encodePayload payload'@LoadResponsePayload{} =
+    [ Start Sequence
+    , Real $ loadSum payload'
+    , Real $ loadRemainingTarget payload'
+    , Real $ loadTotalCapacity payload'
+    , IntVal . getNodeID $ loadSegmentLowerBound payload'
+    , End Sequence
+    ]
 
 encodeNodeState :: NodeState a => a -> [ASN1]
 encodeNodeState ns = [
@@ -193,7 +206,7 @@ encodeNodeState ns = [
     , OctetString (ipAddrAsBS $ getIpAddr ns)
     , IntVal (toInteger . getDhtPort $ ns)
     , IntVal (toInteger . getServicePort $ ns)
-    , IntVal (getVServerID ns)
+    , IntVal (toInteger $ getVServerID ns)
     , End Sequence
     ]
@@ -215,10 +228,11 @@ encodeQueryResult FORWARD{} = Enumerated 1
 encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
               -> [ASN1]
 encodeMessage
-    (Request requestID sender part isFinalPart action requestPayload) =
+    (Request requestID receiverID sender part isFinalPart action requestPayload) =
     Start Sequence
     : (Enumerated . fromIntegral . fromEnum $ action)
     : IntVal requestID
+    : (IntVal . getNodeID $ receiverID)
     : encodeNodeState sender
     <> [IntVal part
     , Boolean isFinalPart]
@@ -262,18 +276,20 @@ parseMessage = do
 parseRequest :: Action -> ParseASN1 FediChordMessage
 parseRequest action = do
     requestID <- parseInteger
+    receiverID' <- fromInteger <$> parseInteger
     sender <- parseNodeState
     part <- parseInteger
     isFinalPart <- parseBool
     hasPayload <- hasNext
     payload <- if not hasPayload then pure Nothing else Just <$> case action of
-        QueryID -> parseQueryIDRequest
-        Join -> parseJoinRequest
-        Leave -> parseLeaveRequest
-        Stabilise -> parseStabiliseRequest
-        Ping -> parsePingRequest
+        QueryID -> parseQueryIDRequestPayload
+        Join -> parseJoinRequestPayload
+        Leave -> parseLeaveRequestPayload
+        Stabilise -> parseStabiliseRequestPayload
+        Ping -> parsePingRequestPayload
+        QueryLoad -> parseLoadRequestPayload
 
-    pure $ Request requestID sender part isFinalPart action payload
+    pure $ Request requestID receiverID' sender part isFinalPart action payload
 
 parseResponse :: Integer -> ParseASN1 FediChordMessage
 parseResponse requestID = do
@@ -283,11 +299,12 @@ parseResponse requestID = do
     action <- parseEnum :: ParseASN1 Action
     hasPayload <- hasNext
     payload <- if not hasPayload then pure Nothing else Just <$> case action of
-        QueryID -> parseQueryIDResponse
-        Join -> parseJoinResponse
-        Leave -> parseLeaveResponse
-        Stabilise -> parseStabiliseResponse
-        Ping -> parsePingResponse
+        QueryID -> parseQueryIDResponsePayload
+        Join -> parseJoinResponsePayload
+        Leave -> parseLeaveResponsePayload
+        Stabilise -> parseStabiliseResponsePayload
+        Ping -> parsePingResponsePayload
+        QueryLoad -> parseLoadResponsePayload
 
     pure $ Response requestID senderID part isFinalPart action payload
@@ -305,6 +322,13 @@ parseInteger = do
         IntVal parsed -> pure parsed
         x -> throwParseError $ "Expected IntVal but got " <> show x
 
+parseReal :: ParseASN1 Double
+parseReal = do
+    i <- getNext
+    case i of
+        Real parsed -> pure parsed
+        x -> throwParseError $ "Expected Real but got " <> show x
+
 parseEnum :: Enum a => ParseASN1 a
 parseEnum = do
     e <- getNext
@@ -346,7 +370,7 @@ parseNodeState = onNextContainer Sequence $ do
         , domain = domain'
         , dhtPort = dhtPort'
         , servicePort = servicePort'
-        , vServerID = vServer'
+        , vServerID = fromInteger vServer'
        , ipAddr = ip'
         }
@@ -360,13 +384,13 @@ parseCacheEntry = onNextContainer Sequence $ do
 parseNodeCache :: ParseASN1 [RemoteCacheEntry]
 parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
 
-parseJoinRequest :: ParseASN1 ActionPayload
-parseJoinRequest = do
+parseJoinRequestPayload :: ParseASN1 ActionPayload
+parseJoinRequestPayload = do
     parseNull
     pure JoinRequestPayload
 
-parseJoinResponse :: ParseASN1 ActionPayload
-parseJoinResponse = onNextContainer Sequence $ do
+parseJoinResponsePayload :: ParseASN1 ActionPayload
+parseJoinResponsePayload = onNextContainer Sequence $ do
     succ' <- onNextContainer Sequence (getMany parseNodeState)
     pred' <- onNextContainer Sequence (getMany parseNodeState)
     cache <- parseNodeCache
@@ -376,8 +400,8 @@ parseJoinResponse = onNextContainer Sequence $ do
         , joinCache = cache
         }
 
-parseQueryIDRequest :: ParseASN1 ActionPayload
-parseQueryIDRequest = onNextContainer Sequence $ do
+parseQueryIDRequestPayload :: ParseASN1 ActionPayload
+parseQueryIDRequestPayload = onNextContainer Sequence $ do
     targetID <- fromInteger <$> parseInteger
     lBestNodes <- parseInteger
     pure $ QueryIDRequestPayload {
@@ -385,8 +409,8 @@ parseQueryIDRequest = onNextContainer Sequence $ do
         , queryLBestNodes = lBestNodes
         }
 
-parseQueryIDResponse :: ParseASN1 ActionPayload
-parseQueryIDResponse = onNextContainer Sequence $ do
+parseQueryIDResponsePayload :: ParseASN1 ActionPayload
+parseQueryIDResponsePayload = onNextContainer Sequence $ do
     Enumerated resultType <- getNext
     result <- case resultType of
         0 -> FOUND <$> parseNodeState
@@ -396,13 +420,13 @@ parseQueryIDResponse = onNextContainer Sequence $ do
         queryResult = result
         }
 
-parseStabiliseRequest :: ParseASN1 ActionPayload
-parseStabiliseRequest = do
+parseStabiliseRequestPayload :: ParseASN1 ActionPayload
+parseStabiliseRequestPayload = do
     parseNull
     pure StabiliseRequestPayload
 
-parseStabiliseResponse :: ParseASN1 ActionPayload
-parseStabiliseResponse = onNextContainer Sequence $ do
+parseStabiliseResponsePayload :: ParseASN1 ActionPayload
+parseStabiliseResponsePayload = onNextContainer Sequence $ do
     succ' <- onNextContainer Sequence (getMany parseNodeState)
     pred' <- onNextContainer Sequence (getMany parseNodeState)
     pure $ StabiliseResponsePayload {
@@ -410,8 +434,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do
         , stabilisePredecessors = pred'
         }
 
-parseLeaveRequest :: ParseASN1 ActionPayload
-parseLeaveRequest = onNextContainer Sequence $ do
+parseLeaveRequestPayload :: ParseASN1 ActionPayload
+parseLeaveRequestPayload = onNextContainer Sequence $ do
     succ' <- onNextContainer Sequence (getMany parseNodeState)
     pred' <- onNextContainer Sequence (getMany parseNodeState)
     doMigration <- parseBool
@@ -421,19 +445,40 @@ parseLeaveRequest = onNextContainer Sequence $ do
         , leaveDoMigration = doMigration
        }
 
-parseLeaveResponse :: ParseASN1 ActionPayload
-parseLeaveResponse = do
+parseLeaveResponsePayload :: ParseASN1 ActionPayload
+parseLeaveResponsePayload = do
     parseNull
     pure LeaveResponsePayload
 
-parsePingRequest :: ParseASN1 ActionPayload
-parsePingRequest = do
+parsePingRequestPayload :: ParseASN1 ActionPayload
+parsePingRequestPayload = do
     parseNull
     pure PingRequestPayload
 
-parsePingResponse :: ParseASN1 ActionPayload
-parsePingResponse = onNextContainer Sequence $ do
+parsePingResponsePayload :: ParseASN1 ActionPayload
+parsePingResponsePayload = onNextContainer Sequence $ do
     handledNodes <- getMany parseNodeState
     pure $ PingResponsePayload {
         pingNodeStates = handledNodes
         }
+
+parseLoadRequestPayload :: ParseASN1 ActionPayload
+parseLoadRequestPayload = onNextContainer Sequence $ do
+    loadUpperBound' <- fromInteger <$> parseInteger
+    pure LoadRequestPayload
+        { loadSegmentUpperBound = loadUpperBound'
+        }
+
+parseLoadResponsePayload :: ParseASN1 ActionPayload
+parseLoadResponsePayload = onNextContainer Sequence $ do
+    loadSum' <- parseReal
+    loadRemainingTarget' <- parseReal
+    loadTotalCapacity' <- parseReal
+    loadSegmentLowerBound' <- fromInteger <$> parseInteger
+    pure LoadResponsePayload
+        { loadSum = loadSum'
+        , loadRemainingTarget = loadRemainingTarget'
+        , loadTotalCapacity = loadTotalCapacity'
+        , loadSegmentLowerBound = loadSegmentLowerBound'
        }
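The essential property behind the new encode/parse pair is that decoding an encoded load payload yields the original value. A self-contained sketch of that round trip, with simplified stand-in types (the real code uses the asn1-types token stream and the `ParseASN1` monad; all names below are invented for illustration):

```haskell
-- simplified stand-ins for the ASN.1 token stream and the load payload
data Token = Start | End | TInt Integer | TReal Double
    deriving (Eq, Show)

data LoadResponse = LoadResponse
    { lrSum       :: Double
    , lrRemaining :: Double
    , lrCapacity  :: Double
    , lrLowerB    :: Integer
    } deriving (Eq, Show)

-- flatten the record into a bracketed token sequence, like encodePayload
encodeLoad :: LoadResponse -> [Token]
encodeLoad p = [ Start, TReal (lrSum p), TReal (lrRemaining p)
               , TReal (lrCapacity p), TInt (lrLowerB p), End ]

-- rebuild the record from exactly that token shape, like parseLoadResponsePayload
parseLoad :: [Token] -> Maybe LoadResponse
parseLoad [Start, TReal s, TReal r, TReal c, TInt b, End] = Just (LoadResponse s r c b)
parseLoad _ = Nothing

main :: IO ()
main = do
    let p = LoadResponse 0.5 0.4 1.0 42
    print (parseLoad (encodeLoad p) == Just p)  -- True
```

A QuickCheck property of the form `\p -> parseLoad (encodeLoad p) == Just p` would generalise this spot check.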

View file

@ -15,6 +15,7 @@ module Hash2Pub.DHTProtocol
, Action(..) , Action(..)
, ActionPayload(..) , ActionPayload(..)
, FediChordMessage(..) , FediChordMessage(..)
, mkRequest
, maximumParts , maximumParts
, sendQueryIdMessages , sendQueryIdMessages
, requestQueryID , requestQueryID
@ -22,6 +23,7 @@ module Hash2Pub.DHTProtocol
, requestLeave , requestLeave
, requestPing , requestPing
, requestStabilise , requestStabilise
, requestQueryLoad
, lookupMessage , lookupMessage
, sendRequestTo , sendRequestTo
, queryIdLookupLoop , queryIdLookupLoop
@ -36,7 +38,7 @@ module Hash2Pub.DHTProtocol
, isPossibleSuccessor , isPossibleSuccessor
, isPossiblePredecessor , isPossiblePredecessor
, isInOwnResponsibilitySlice , isInOwnResponsibilitySlice
, isJoined , vsIsJoined
, closestCachePredecessors , closestCachePredecessors
) )
where where
@ -49,7 +51,8 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Exception import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when) import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.Except (MonadError (..), runExceptT) import Control.Monad.Except (MonadError (..), liftEither,
runExceptT)
import Control.Monad.IO.Class (MonadIO (..)) import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights) import Data.Either (rights)
@ -63,6 +66,7 @@ import Data.Maybe (fromJust, fromMaybe, isJust,
isNothing, mapMaybe, maybe) isNothing, mapMaybe, maybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Word (Word8)
import Network.Socket hiding (recv, recvFrom, send, import Network.Socket hiding (recv, recvFrom, send,
sendTo) sendTo)
import Network.Socket.ByteString import Network.Socket.ByteString
@ -74,23 +78,27 @@ import Hash2Pub.ASN1Coding
import Hash2Pub.FediChordTypes (CacheEntry (..), import Hash2Pub.FediChordTypes (CacheEntry (..),
CacheEntry (..), CacheEntry (..),
FediChordConf (..), FediChordConf (..),
HasKeyID (..), HasKeyID (..), LoadStats (..),
LocalNodeState (..), LocalNodeState (..),
LocalNodeStateSTM, NodeCache, LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..), NodeID, NodeState (..),
RealNode (..), RealNodeSTM, RealNode (..), RealNodeSTM,
RemoteNodeState (..), RemoteNodeState (..),
RingEntry (..), RingMap (..), RingEntry (..), RingMap (..),
SegmentLoadStats (..),
Service (..), addRMapEntry, Service (..), addRMapEntry,
addRMapEntryWith, addRMapEntryWith,
cacheGetNodeStateUnvalidated, cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred, cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID, cacheLookupSucc, genNodeID,
getKeyID, localCompare, getKeyID, hasValidNodeId,
loadSliceSum, localCompare,
rMapFromList, rMapLookupPred, rMapFromList, rMapLookupPred,
rMapLookupSucc, rMapLookupSucc,
remainingLoadTarget,
setPredecessors, setSuccessors) setPredecessors, setSuccessors)
import Hash2Pub.ProtocolTypes import Hash2Pub.ProtocolTypes
import Hash2Pub.RingMap
import Debug.Trace (trace) import Debug.Trace (trace)
@ -103,7 +111,7 @@ queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node -- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined. -- This only makes sense if the node is part of the DHT by having joined.
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState | vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries -- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
@ -227,8 +235,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
-- | uses the successor and predecessor list of a node as an indicator for whether a -- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT -- node has properly joined the DHT
isJoined :: LocalNodeState s -> Bool vsIsJoined :: LocalNodeState s -> Bool
isJoined ns = not . all null $ [successors ns, predecessors ns] vsIsJoined ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending -- | the size limit to be used when serialising messages for sending
sendMessageSize :: Num i => i sendMessageSize :: Num i => i
@ -237,27 +245,37 @@ sendMessageSize = 1200
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
-- encode the response to a request that just signals successful receipt -- encode the response to a request that just signals successful receipt
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
requestID = requestID req requestID = requestID req
, senderID = ownID , senderID = receiverID req
, part = part req , part = part req
, isFinalPart = False , isFinalPart = False
, action = action req , action = action req
, payload = Nothing , payload = Nothing
} }
ackRequest _ _ = Map.empty ackRequest _ = Map.empty
-- | extract the first payload from a received message set
extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload
extractFirstPayload msgSet = foldr' (\msg plAcc ->
if isNothing plAcc && isJust (payload msg)
then payload msg
else plAcc
) Nothing msgSet
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: Service s (RealNodeSTM s)
    => Word8 -- ^ maximum number of vservers, because of the decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@
    -> LocalNodeStateSTM s -- ^ the handling node
    -> TQueue (BS.ByteString, SockAddr) -- ^ send queue
    -> Set.Set FediChordMessage -- ^ all parts of the request to handle
    -> SockAddr -- ^ source address of the request
    -> IO ()
handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
    ns <- readTVarIO nsSTM
    -- add nodestate to cache
    now <- getPOSIXTime

@@ -265,19 +283,20 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
        Nothing -> pure ()
        Just aPart -> do
            let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
            -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
            maybe (pure ()) (
                mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
                )
                =<< (case action aPart of
                    Ping -> Just <$> respondPing nsSTM msgSet
                    Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin
                    -- ToDo: figure out what happens if not joined
                    QueryID -> Just <$> respondQueryID nsSTM msgSet
                    -- only when joined
                    Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing
                    Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing
                    QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing
                    )

-- for a single part request, the response starts with part number 1. For multipart requests, the response starts with part number n+1.

@@ -287,19 +306,18 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do

-- | Filter out requests with spoofed node IDs by recomputing the ID using
-- the sender IP.
-- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
dropSpoofedIDs :: Word8 -- ^ maximum number of vservers per node
    -> HostAddress6 -- ^ msg source address
    -> LocalNodeStateSTM s
    -> Set.Set FediChordMessage -- ^ message parts of the request
    -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- ^ responder function to be invoked for valid requests
    -> IO (Maybe (Map.Map Integer BS.ByteString))
dropSpoofedIDs limVs addr nsSTM' msgSet' responder =
    let
        aRequestPart = Set.elemAt 0 msgSet'
        senderNs = sender aRequestPart
    in
        if hasValidNodeId limVs senderNs addr
            then Just <$> responder nsSTM' msgSet'
            else pure Nothing
@@ -317,11 +335,7 @@ respondQueryID nsSTM msgSet = do
    let
        aRequestPart = Set.elemAt 0 msgSet
        senderID = getNid . sender $ aRequestPart
        senderPayload = extractFirstPayload msgSet
    -- return only empty message serialisation if no payload was included in parts
    maybe (pure Map.empty) (\senderPayload' -> do
        responseMsg <- atomically $ do

@@ -329,7 +343,7 @@ respondQueryID nsSTM msgSet = do
            cache <- readTVar $ nodeCacheSTM nsSnap
            let
                responsePayload = QueryIDResponsePayload {
                    queryResult = if vsIsJoined nsSnap
                        then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
                        -- if not joined yet, attract responsibility for
                        -- all keys to make bootstrapping possible

@@ -422,6 +436,47 @@ respondPing nsSTM msgSet = do
            }
    pure $ serialiseMessage sendMessageSize pingResponse
respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryLoad nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM
-- this message cannot be split reasonably, so just
-- consider the first payload
let
aRequestPart = Set.elemAt 0 msgSet
senderPayload = extractFirstPayload msgSet
responsePl <- maybe (pure Nothing) (\pl ->
case pl of
LoadRequestPayload{} -> do
parentNode <- readTVarIO (parentRealNode nsSnap)
let
serv = nodeService parentNode
conf = nodeConfig parentNode
lStats <- getServiceLoadStats serv
let
directSucc = getNid . head . predecessors $ nsSnap
handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl)
pure $ Just LoadResponsePayload
{ loadSum = handledTagSum
, loadRemainingTarget = remainingLoadTarget conf lStats
, loadTotalCapacity = totalCapacity lStats
, loadSegmentLowerBound = directSucc
}
_ -> pure Nothing
)
senderPayload
pure $ maybe
Map.empty
(\pl -> serialiseMessage sendMessageSize $ Response
{ requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = QueryLoad
, payload = Just pl
}
) responsePl
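The part-numbering expression above (`part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1`) encodes the rule that a single-part request is answered starting at part number 1, while an n-part request is answered starting at n+1. A minimal standalone sketch of that rule (@responseStartPart@ is an illustrative name, not project API):

```haskell
-- Starting part number of a response, given the number of parts in the
-- request: 1 for single-part requests, n+1 for an n-part request.
responseStartPart :: Int -> Int
responseStartPart requestParts
    | requestParts == 1 = 1
    | otherwise         = requestParts + 1

main :: IO ()
main = mapM_ (print . responseStartPart) [1, 3]
```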
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do

@@ -434,7 +489,7 @@ respondJoin nsSTM msgSet = do
        senderNS = sender aRequestPart
        -- if not joined yet, attract responsibility for
        -- all keys to make bootstrapping possible
        responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
        thisNodeResponsible (FOUND _) = True
        thisNodeResponsible (FORWARD _) = False
    -- check whether the joining node falls into our responsibility

@@ -481,6 +536,21 @@ respondJoin nsSTM msgSet = do

-- ....... request sending .......
-- | default constructor for request messages, fills standard values like
-- part number to avoid code repetition
mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage)
mkRequest ns targetID action pl rid = Request
{ requestID = rid
, receiverID = targetID
, sender = toRemoteNodeState ns
-- part number and final flag can be changed by ASN1 encoder to make packet
-- fit the MTU restrictions
, part = 1
, isFinalPart = True
, action = action
, payload = pl
}
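@mkRequest@ deliberately leaves the request ID as the last parameter, so it returns an @Integer -> FediChordMessage@ that @sendRequestTo@ can complete with a fresh ID per request. A toy sketch of this partial-application pattern (@ToyRequest@/@mkToyRequest@ are hypothetical names, not project API):

```haskell
-- A stripped-down request record standing in for FediChordMessage.
data ToyRequest = ToyRequest
    { reqID     :: Integer
    , reqPart   :: Integer
    , reqFinal  :: Bool
    , reqAction :: String
    } deriving (Eq, Show)

-- Fill all standard fields, defer the request ID to the caller,
-- mirroring how mkRequest hands an (Integer -> FediChordMessage)
-- to the sending routine.
mkToyRequest :: String -> (Integer -> ToyRequest)
mkToyRequest action rid = ToyRequest
    { reqID = rid
    , reqPart = 1      -- may be rewritten by a serialiser for multipart messages
    , reqFinal = True
    , reqAction = action
    }

main :: IO ()
main = print (mkToyRequest "Join" 42)
```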
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
    -> LocalNodeStateSTM s -- ^ joining NodeState

@@ -492,7 +562,7 @@ requestJoin toJoinOn ownStateSTM = do
    let srcAddr = confIP nodeConf
    bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
        -- extract own state for getting request information
        responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock
        (cacheInsertQ, joinedState) <- atomically $ do
            stateSnap <- readTVar ownStateSTM
            let

@@ -523,7 +593,7 @@ requestJoin toJoinOn ownStateSTM = do
            writeTVar ownStateSTM newState
            pure (cacheInsertQ, newState)
        -- execute the cache insertions
        mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
        if responses == Set.empty
            then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
            else do

@@ -581,14 +651,14 @@ sendQueryIdMessages :: (Integral i)
    -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
    -> [RemoteNodeState] -- ^ nodes to query
    -> IO QueryResponse -- ^ accumulated response
sendQueryIdMessages lookupID ns lParam targets = do
    -- create connected sockets to all query targets and use them for request handling
    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
    let srcAddr = confIP nodeConf
    queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
        sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode))
        )) targets
    -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
    -- ToDo: exception handling, maybe log them

@@ -605,7 +675,7 @@ sendQueryIdMessages targetID ns lParam targets = do
            _ -> Set.empty
        -- forward entries to global cache
        queueAddEntries entrySet (cacheWriteQueue ns)
        -- return accumulated QueryResult
        pure $ case acc of
            -- once a FOUND has been encountered, return this as a result

@@ -621,13 +691,14 @@ sendQueryIdMessages targetID ns lParam targets = do
-- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i
    => NodeID -- ^ ID to be looked up
    -> LocalNodeState s -- ^ sender node state
    -> Maybe i -- ^ optionally provide a different l parameter
    -> NodeID -- ^ target ID of message destination
    -> (Integer -> FediChordMessage)
lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID)
    where
        pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam }
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,

@@ -638,16 +709,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node
requestStabilise ns neighbour = do
    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
    let srcAddr = confIP nodeConf
    responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload))
        ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
    either
        -- forward IO error messages

@@ -660,7 +722,7 @@ requestStabilise ns neighbour = do
                )
                ([],[]) respSet
            -- update successfully responded neighbour in cache
            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
            pure $ if null responsePreds && null responseSuccs
                then Left "no neighbours returned"
                else Right (responsePreds, responseSuccs)
@@ -682,16 +744,11 @@ requestLeave ns doMigration target = do
            , leavePredecessors = predecessors ns
            , leaveDoMigration = doMigration
            }
    responses <- bracket
        (mkSendSocket srcAddr (getDomain target) (getDhtPort target))
        close
        (fmap Right
        . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload))
        ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
    either
        -- forward IO error messages
@@ -708,16 +765,7 @@ requestPing ns target = do
    let srcAddr = confIP nodeConf
    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
        (\sock -> do
            resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock
            (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
            pure $ Right (peerAddr, resp)
        ) `catch` (\e -> pure . Left $ displayException (e :: IOException))

@@ -733,8 +781,7 @@ requestPing ns target = do
            -- recompute ID for each received node and mark as verified in cache
            now <- getPOSIXTime
            forM_ responseVss (\vs ->
                if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr
                    then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
                    else pure ()
                )

@@ -744,6 +791,37 @@ requestPing ns target = do
        ) responses
-- still need a particular vserver as LocalNodeState, because requests need a sender
requestQueryLoad :: (MonadError String m, MonadIO m)
=> LocalNodeState s -- ^ the local source vserver for the request
-> NodeID -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node
-> RemoteNodeState -- ^ target node to query
-> m SegmentLoadStats
requestQueryLoad ns upperIdBound target = do
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
let
srcAddr = confIP nodeConf
loadReqPl = LoadRequestPayload
{ loadSegmentUpperBound = upperIdBound
}
responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl))
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
responseMsgSet <- liftEither responses
-- throws an error if an exception happened
loadResPl <- maybe (throwError "no load response payload found") pure
(extractFirstPayload responseMsgSet)
pure SegmentLoadStats
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
, segmentUpperKeyBound = upperIdBound
, segmentLoad = loadSum loadResPl
, segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl
, segmentOwnerCapacity = loadTotalCapacity loadResPl
, segmentCurrentOwner = target
}
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in milliseconds

@@ -800,24 +878,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do

-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry
    -> TQueue (NodeCache -> NodeCache)
    -> IO ()
queueAddEntries entries cacheQ = do
    now <- getPOSIXTime
    forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry

-- | enqueue a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c
    => c NodeID
    -> TQueue (NodeCache -> NodeCache)
    -> IO ()
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry

-- | enqueue a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID
    -> TQueue (NodeCache -> NodeCache)
    -> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete

@@ -826,11 +904,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
    => c NodeID
    -> TQueue (NodeCache -> NodeCache)
    -> IO ()
queueUpdateVerifieds nIds cacheQ = do
    now <- getPOSIXTime
    forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
        markCacheEntryAsVerified (Just now) nid'

-- | retry an IO action at most *i* times until it delivers a result
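The refactored @queue*@ helpers now take the @TQueue@ of pure @NodeCache -> NodeCache@ update functions directly, and a single cache-writer thread drains the queue and applies the updates in order. A minimal sketch of that "queue of pure updates" idea, with a plain list standing in for the @TQueue@ and illustrative names throughout:

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as Map

-- A toy cache keyed by node ID, standing in for NodeCache.
type Cache = Map.Map Int String

-- Apply queued pure update functions in arrival order,
-- as the cache-writer thread does when draining the TQueue.
applyUpdates :: Cache -> [Cache -> Cache] -> Cache
applyUpdates = foldl' (\cache f -> f cache)

main :: IO ()
main = do
    let updates = [ Map.insert 1 "node1"
                  , Map.insert 2 "node2"
                  , Map.delete 1
                  ] :: [Cache -> Cache]
    print $ applyUpdates Map.empty updates
```

Queueing closures instead of raw entries keeps all cache mutation in one thread while producers stay lock-free.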


@@ -63,15 +63,20 @@ import Control.Exception
import Control.Monad (forM_, forever)
import Control.Monad.Except
import Crypto.Hash
import Data.Bifunctor (first)
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
import Data.Either (rights)
import Data.Foldable (foldr')
import Data.Functor.Identity
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HMap
import Data.HashSet (HashSet)
import qualified Data.HashSet as HSet
import Data.IP (IPv6, fromHostAddress6,
                toHostAddress6)
import Data.List (sortBy, sortOn, (\\))
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, fromJust, fromMaybe,
                   isJust, isNothing, mapMaybe)

@@ -87,6 +92,7 @@ import System.Random (randomRIO)
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap
import Hash2Pub.Utils

import Debug.Trace (trace)
@@ -96,50 +102,87 @@ import Debug.Trace (trace)
fediChordInit :: (Service s (RealNodeSTM s))
    => FediChordConf
    -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
    -> IO (Async (), RealNodeSTM s)
fediChordInit initConf serviceRunner = do
    emptyLookupCache <- newTVarIO Map.empty
    cacheSTM <- newTVarIO initCache
    cacheQ <- atomically newTQueue
    let realNode = RealNode
            { vservers = emptyRMap
            , nodeConfig = initConf
            , bootstrapNodes = confBootstrapNodes initConf
            , lookupCacheSTM = emptyLookupCache
            , nodeService = undefined
            , globalNodeCacheSTM = cacheSTM
            , globalCacheWriteQueue = cacheQ
            }
    realNodeSTM <- newTVarIO realNode
    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
    -- launch service and set the reference in the RealNode
    serv <- serviceRunner realNodeSTM
    atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
    -- prepare for joining: start node cache writer thread
    -- currently no masking is necessary, as there is nothing to clean up
    nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
    fediThreadsAsync <-
        either (\err -> do
            -- handle unsuccessful join
            putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-- add an unjoined placeholder vserver to be able to listen for
-- incoming request
placeholderVS <- nodeStateInit realNodeSTM 0
placeholderVSSTM <- newTVarIO placeholderVS
atomically . modifyTVar' realNodeSTM $
addVserver (getNid placeholderVS, placeholderVSSTM)
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread realNodeSTM
async (fediMainThreads serverSock realNodeSTM)
)
(\_ -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
async (fediMainThreads serverSock realNodeSTM)
)
=<< tryBootstrapJoining realNodeSTM
pure (fediThreadsAsync, realNodeSTM)
-- | Create a new vserver and join it through a provided remote node.
-- TODO: Many fediChord* functions already cover parts of this, refactor these to use
-- this function.
fediChordJoinNewVs :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> RealNodeSTM s -- ^ parent real node
-> Word8 -- ^ vserver ID
-> RemoteNodeState -- ^ target node to join on
-> m (NodeID, LocalNodeStateSTM s) -- ^ on success: (vserver ID, TVar of vserver)
fediChordJoinNewVs nodeSTM vsId target = do
newVs <- liftIO $ nodeStateInit nodeSTM vsId
newVsSTM <- liftIO $ newTVarIO newVs
liftIO . putStrLn $ "Trying to join on " <> show (getNid target)
liftIO $ putStrLn "send a Join"
_ <- liftIO . requestJoin target $ newVsSTM
pure (getNid newVs, newVsSTM)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Word8 -> IO (LocalNodeState s)
nodeStateInit realNodeSTM vsID' = do
    realNode <- readTVarIO realNodeSTM
    let
        conf = nodeConfig realNode
        vsID = vsID'
        containedState = RemoteNodeState {
            domain = confDomain conf
          , ipAddr = confIP conf
          , nid = genNodeID (confIP conf) (confDomain conf) vsID
          , dhtPort = toEnum $ confDhtPort conf
          , servicePort = getListeningPortFromService $ nodeService realNode
          , vServerID = vsID
          }
        initialState = LocalNodeState {
            nodeState = containedState
          , nodeCacheSTM = globalNodeCacheSTM realNode
          , cacheWriteQueue = globalCacheWriteQueue realNode
          , successors = []
          , predecessors = []
          , kNeighbours = 3

@@ -150,41 +193,275 @@ nodeStateInit realNodeSTM = do
          }
    pure initialState
-- | Joins a 'RealNode' to the DHT by joining several vservers, trying to match
-- the own load target best.
-- Triggers 'kChoicesVsJoin'
kChoicesNodeJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> RealNodeSTM s
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrap joining
-> m ()
kChoicesNodeJoin nodeSTM bootstrapNode = do
node <- liftIO $ readTVarIO nodeSTM
-- use vserver 0 as origin of bootstrapping messages
vs0 <- liftIO $ nodeStateInit nodeSTM 0
vs0STM <- liftIO $ newTVarIO vs0
ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node
let
conf = nodeConfig node
-- T_a of k-choices
-- compute load target
joinLoadTarget = totalCapacity ownLoadStats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
initialJoins = confKChoicesMaxVS conf `div` 2
-- edge case: however small the target is, at least join 1 vs
        -- kChoicesVsJoin until target is met, unless there's already an active & joined VS causing enough load
alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node
unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do
joinedVss <- vsJoins vs0 (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM
if nullRMap joinedVss
then throwError "k-choices join unsuccessful, no vserver joined"
else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
{ vservers = unionRMap joinedVss (vservers node') }
where
vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeState s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s)
vsJoins _ _ vsmap _ 0 _ = pure vsmap
vsJoins queryVs capacity vsmap remainingTargetLoad remainingJoins nodeSTM'
| remainingTargetLoad <= 0 = pure vsmap
| otherwise = do
(acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVs bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad
-- on successful vserver join add the new VS to node and recurse
vsJoins queryVs capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM'
-- on error, just reduce the amount of tries and retry
`catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVs capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM')
-- error cause 1: not a single queried node has responded -> indicates permanent failure
-- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen
-- target even at the next attempt again
-- `catchError` (const .
kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeState s -- ^ vserver to be used for querying
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping
-> Double -- ^ own capacity
-> VSMap s -- ^ currently active VServers
-> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver
-> Double -- ^ remaining load target
-> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver
kChoicesVsJoin queryVs bootstrapNode capacity activeVss nodeSTM remainingTarget = do
conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM)
-- generate all possible vs IDs
segmentLoads <- kChoicesSegmentLoads conf queryVs bootstrapNode activeVss
-- cost calculation and sort by cost
-- edge case: no possible ID has responded
(mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure
. headMay
. sortOn (\(cost, _, _) -> cost)
. fmap (\(segment, vsId, toJoinOn) -> (kChoicesJoinCost remainingTarget capacity segment, vsId, toJoinOn))
$ segmentLoads
-- join at min cost
joinedNode <- fediChordJoinNewVs nodeSTM vsId toJoinOn
-- idea: a single join failure shall not make the whole process fail
--`catchError`
pure (mincost, joinedNode)
-- Possible optimisation:
-- Instead of sampling all join candidates again at each invocation, querying
-- all segment loads before the first join and trying to re-use this
-- load information can save round trips.
-- possible edge case: detect when joining a subsegment of one already owned
-- joining into own segments => When first joining into segment S1 and then
-- later joining into the subsegment S2, the
-- resulting load l(S1+S2) = l(S1) != l(S1) + l(S2)
-- => need to re-query the load of both S1 and S2
-- possible edge case 2: taking multiple segments from the same owner
-- changes the remainingLoadTarget at each vsJoin. This target change
-- needs to be accounted for starting from the 2nd vsJoin.
-- | query the load of all still unjoined VS positions
kChoicesSegmentLoads :: (Service s (RealNodeSTM s), MonadError String m, MonadIO m)
=> FediChordConf -- ^ config params needed for generating all possible VSs
-> LocalNodeState s -- ^ vserver to be used for querying
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping
-> VSMap s -- ^ currently active VServers
-> m [(SegmentLoadStats, Word8, RemoteNodeState)]
kChoicesSegmentLoads conf queryVs bootstrapNode activeVss = do
let
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
-- LocalNodeState creation
nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]]
-- query load of all possible segments
-- simplification: treat each load lookup failure as a general unavailability of that segment
-- TODO: retries for transient failures
-- TODO: parallel queries
fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
-- if bootstrap node is provided, do initial lookup via that
currentlyResponsible <- maybe
(requestQueryID queryVs vsNid)
(\bs -> bootstrapQueryId queryVs bs vsNid)
bootstrapNode
segment <- requestQueryLoad queryVs vsNid currentlyResponsible
pure $ Just (segment, vsId, currentlyResponsible)
-- store segment stats and vserver ID together, so it's clear
-- which vs needs to be joined to acquire this segment
) `catchError` const (pure Nothing)
)
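The failure-tolerant query pattern above (convert each failed lookup into `Nothing` via `catchError`, then `catMaybes` the survivors) can be sketched in isolation. `queryAll` and the even/odd availability predicate below are hypothetical stand-ins, not part of this codebase:

```haskell
import Data.Maybe (catMaybes)

-- Simplified sketch: run a fallible query against every candidate and
-- silently drop the candidates that failed, mirroring the
-- catchError / catMaybes combination used in kChoicesSegmentLoads.
queryAll :: Monad m => (a -> m (Either String b)) -> [a] -> m [b]
queryAll query =
    fmap catMaybes . mapM (fmap (either (const Nothing) Just) . query)

main :: IO ()
main = do
    -- pretend odd-numbered segments are unreachable
    rs <- queryAll (\n -> pure $ if even n then Right (n * 10)
                                 else Left "unavailable") [1 .. 4 :: Int]
    print rs -- [20,40]
```

As in the real code, a segment that fails its load query is simply treated as unavailable rather than aborting the whole candidate sweep.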
kChoicesJoinCost :: Double -- ^ own remaining load target
-> Double -- ^ own capacity
-> SegmentLoadStats -- ^ load stats of neighbour vs
-> Double
kChoicesJoinCost remainingOwnLoad ownCap segment =
abs (segmentOwnerRemainingLoadTarget segment + segmentLoad segment) / segmentOwnerCapacity segment
+ abs (remainingOwnLoad - segmentLoad segment) / ownCap
- abs (segmentOwnerRemainingLoadTarget segment) / segmentOwnerCapacity segment
kChoicesDepartureCost :: Double -- ^ own remaining load target
-> Double -- ^ own capacity
-> Double -- ^ load of own segment to hand over
-> SegmentLoadStats -- ^ load stats of neighbour VS
-> Double
kChoicesDepartureCost remainingOwnLoad ownCap thisSegmentLoad segment =
abs (segmentOwnerRemainingLoadTarget segment - thisSegmentLoad) / segmentOwnerCapacity segment
+ abs (remainingOwnLoad + thisSegmentLoad) / ownCap
- abs (segmentOwnerRemainingLoadTarget segment) / segmentOwnerCapacity segment
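As a sanity check of the join-cost arithmetic above, here is a standalone numeric example. The flattened `joinCost'` helper is hypothetical (the real function reads the same terms out of `SegmentLoadStats`); it only restates the three-term formula so it can be evaluated by hand:

```haskell
-- Hypothetical flattened restatement of kChoicesJoinCost:
-- owner's mismatch after handing over the segment, plus own mismatch
-- after taking it, minus the owner's mismatch before the move.
joinCost' :: Double -- ^ own remaining load target
          -> Double -- ^ own capacity
          -> Double -- ^ segment owner's remaining load target
          -> Double -- ^ segment owner's capacity
          -> Double -- ^ load of the candidate segment
          -> Double
joinCost' remainingOwn ownCap ownerRemaining ownerCap segLoad =
      abs (ownerRemaining + segLoad) / ownerCap
    + abs (remainingOwn - segLoad) / ownCap
    - abs ownerRemaining / ownerCap

main :: IO ()
main =
    -- owner overloaded by 2 units (remaining target -2), segment load 3,
    -- both capacities 10, own remaining target 4:
    -- |(-2)+3|/10 + |4-3|/10 - |-2|/10 = 0.1 + 0.1 - 0.2 = 0.0
    print (joinCost' 4 10 (-2) 10 3)
```

A cost of 0 is the break-even point; candidates sort below it only when the move improves the combined load match of both nodes.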
kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO ()
kChoicesRebalanceThread nodeSTM = do
interval <- confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM
runExceptT $ loop interval
pure ()
where
loop interval = rebalanceVS interval `catchError` \_ -> loop interval
rebalanceVS :: (MonadError String m, MonadIO m) => Int -> m ()
rebalanceVS interval = do
liftIO $ threadDelay interval
node <- liftIO $ readTVarIO nodeSTM
let
activeVssSTM = vservers node
conf = nodeConfig node
-- use an active vserver for load queries
queryVsSTM <- maybe (throwError "no active vserver") pure
$ headMay (rMapToList activeVssSTM)
queryVs <- liftIO . readTVarIO $ queryVsSTM
-- TODO: segment load and neighbour load queries can be done in parallel
-- query load of all existing VSes neighbours
-- TODO: what happens if neighbour is one of our own vservers?
neighbourLoadFetches <- liftIO . forM activeVssSTM $ async . (\vsSTM -> runExceptT $ do
vs <- liftIO . readTVarIO $ vsSTM
succNode <- maybe
(throwError "vs has no successor")
pure
. headMay . successors $ vs
neighbourStats <- requestQueryLoad queryVs (getNid succNode) succNode
pure (getNid succNode, neighbourStats)
)
-- TODO: deal with exceptions
-- TODO: better handling of nested Eithers
-- so far this is a RingMap NodeID (Either SomeException (Either String (NodeID, SegmentLoadStats)))
neighbourLoads <- liftIO . mapM waitCatch $ neighbourLoadFetches
-- get local load stats
ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node
-- calculate all departure costs
let
departureCosts =
sortOn (\(cost, _, _) -> cost)
. foldl (\acc (ownVsId, neighbourLoad) -> case neighbourLoad of
Right (Right (neighbourId, neighbourStats)) ->
let
ownRemainingTarget = remainingLoadTarget conf ownLoadStats
thisSegmentLoad = loadSliceSum ownLoadStats ownVsId neighbourId
in
( kChoicesDepartureCost ownRemainingTarget (totalCapacity ownLoadStats) thisSegmentLoad neighbourStats
, thisSegmentLoad
, ownVsId)
:acc
_ -> acc
)
[]
$ rMapToListWithKeys neighbourLoads
-- select VS with lowest departure cost
(lowestDepartureCost, departingSegmentLoad, lowestCostDeparter) <- maybe
(throwError "not enough data for calculating departure costs")
pure
$ headMay departureCosts
-- query load of all possible available VS IDs
segmentLoads <- kChoicesSegmentLoads conf queryVs Nothing activeVssSTM
-- calculate all relocation costs of that VS
(joinCost, toJoinOn) <-
maybe (throwError "got no segment loads") pure
. headMay
. sortOn fst
. fmap (\(segment, vsId, toJoinOn) ->
let joinCosts = kChoicesJoinCost
-- when relocating a node, the load of the departing node is freed
(remainingLoadTarget conf ownLoadStats + departingSegmentLoad)
(totalCapacity ownLoadStats)
segment
in
(joinCosts, segmentCurrentOwner segment)
)
$ segmentLoads
-- if deciding to re-balance, first leave and then join
-- combined costs need to be a gain (negative) and that gain needs
-- to be larger than Epsilon
when (lowestDepartureCost + joinCost <= negate kChoicesEpsilon) $ do
liftIO . putStrLn $ "here will be a relocation!"
-- loop
rebalanceVS interval
-- TODO: make parameterisable
-- | dampening factor constant for deciding whether the match gain is worth relocating
kChoicesEpsilon :: Double
kChoicesEpsilon = 0.05
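The epsilon gate used by `kChoicesRebalanceThread` can be illustrated standalone. The `shouldRelocate` helper below is hypothetical; it only shows the decision rule: a relocation happens when departure and join cost together are a net gain larger than the dampening constant, so marginal improvements do not cause churn:

```haskell
-- Hypothetical sketch of the rebalance decision gate: costs are summed
-- and must undercut -epsilon for a relocation to be triggered.
kChoicesEpsilon' :: Double
kChoicesEpsilon' = 0.05

shouldRelocate :: Double -- ^ departure cost (negative = gain)
               -> Double -- ^ join cost (negative = gain)
               -> Bool
shouldRelocate departureCost joinCost =
    departureCost + joinCost <= negate kChoicesEpsilon'

main :: IO ()
main = do
    print (shouldRelocate (-0.04) (-0.02)) -- net gain 0.06 > epsilon: relocate
    print (shouldRelocate (-0.04) 0.01)    -- net gain only 0.03: stay put
```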
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position.
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String ()) -- ^ success of the join, otherwise an error message
fediChordBootstrapJoin nsSTM bootstrapNode = do
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
ns <- readTVarIO nsSTM
runExceptT $ do
-- 1. get routed to the currently responsible node
currentlyResponsible <- bootstrapQueryId ns bootstrapNode $ getNid ns
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node
liftIO $ putStrLn "send a bootstrap Join"
_ <- liftEither =<< liftIO (requestJoin currentlyResponsible nsSTM)
pure ()
-- Periodically look up own IDs through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined nodes try joining instead.
convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
convergenceSampleThread nodeSTM = forever $ do
node <- readTVarIO nodeSTM
forM_ (vservers node) $ \nsSTM -> do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
if vsIsJoined nsSnap
then
runExceptT (do
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
let bss = bootstrapNodes parentNode
randIndex <- liftIO $ randomRIO (0, length bss - 1)
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
currentlyResponsible <- bootstrapQueryId nsSnap chosenNode (getNid nsSnap)
if getNid currentlyResponsible /= getNid nsSnap
-- if mismatch, stabilise on the result, else do nothing
then do
@@ -197,58 +474,96 @@ convergenceSampleThread nsSTM = forever $ do
else pure ()
) >> pure ()
-- unjoined node: try joining through all bootstrapping nodes
else tryBootstrapJoining nodeSTM >> pure ()
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
threadDelay delaySecs
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (Either String ())
tryBootstrapJoining nodeSTM = do
node <- readTVarIO nodeSTM
let
bss = bootstrapNodes node
conf = nodeConfig node
if confEnableKChoices conf
then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM . Just
else do
firstVS <- nodeStateInit nodeSTM 0
firstVSSTM <- newTVarIO firstVS
joinResult <- tryJoining bss (fediChordBootstrapJoin firstVSSTM)
either
(pure . Left)
(\_ -> fmap Right . atomically . modifyTVar' nodeSTM $
addVserver (getNid firstVS, firstVSSTM)
)
(joinResult :: Either String ())
where
tryJoining :: [(String, PortNumber)] -> ((String, PortNumber) -> IO (Either String ())) -> IO (Either String ())
tryJoining (bn:bns) joinFunc = do
j <- joinFunc bn
case j of
Left err -> putStrLn ("join error: " <> err) >> tryJoining bns joinFunc
Right _ -> pure $ Right ()
tryJoining [] _ = pure $ Left "Exhausted all bootstrap points for joining."
-- | Look up a key just based on the responses of a single bootstrapping node.
bootstrapQueryId :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeState s
-> (String, PortNumber)
-> NodeID
-> m RemoteNodeState
bootstrapQueryId ns (bootstrapHost, bootstrapPort) targetID = do
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
let srcAddr = confIP nodeConf
-- IP address needed for ID generation, so look it up
bootstrapAddr <- addrAddress <$> liftIO (resolve (Just bootstrapHost) (Just bootstrapPort))
bootstrapIP <- case bootstrapAddr of
SockAddrInet6 _ _ bootstrapIP _ -> pure bootstrapIP
_ -> throwError $ "Expected an IPv6 address, but got " <> show bootstrapAddr
let possibleJoinIDs =
[ genNodeID bootstrapIP bootstrapHost v | v <- [0..pred (
if confEnableKChoices nodeConf then confKChoicesMaxVS nodeConf else 1)]]
tryQuery ns srcAddr nodeConf possibleJoinIDs
where
-- | try bootstrapping a query through any possible ID of the
-- given bootstrap node
tryQuery :: (MonadError String m, MonadIO m)
=> LocalNodeState s
-> HostAddress6
-> FediChordConf
-> [NodeID]
-> m RemoteNodeState
tryQuery _ _ _ [] = throwError $ "No ID of " <> show bootstrapHost <> " has responded."
tryQuery ns srcAddr nodeConf (bnid:bnids) = (do
bootstrapResponse <- liftIO $ bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
-- Initialise an empty cache only with the responses from a bootstrapping node
fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing bnid)
)
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
case bootstrapResponse of
Left err -> throwError err
Right resp
| resp == Set.empty -> throwError $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
| otherwise -> do
now <- liftIO getPOSIXTime
-- create new cache with all returned node responses
let bootstrapCache =
-- traverse response parts
foldr' (\resp' cacheAcc -> case queryResult <$> payload resp' of
Nothing -> cacheAcc
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
)
initCache resp
queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
) `catchError` (\_ ->
-- only throw an error if all IDs have been tried
tryQuery ns srcAddr nodeConf bnids)
-- | join a node to the DHT using the global node cache
-- for resolving the new node's position.
@@ -265,6 +580,7 @@ fediChordVserverJoin nsSTM = do
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult
fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
fediChordVserverLeave ns = do
-- TODO: deal with failure of all successors, e.g. by invoking a stabilise
@@ -306,88 +622,124 @@ fediChordVserverLeave ns = do
-- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining.
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
joinOnNewEntriesThread nodeSTM = loop
where
-- situation 1: not joined yet -> vservers == empty
-- problem: empty vservers means not responding to incoming requests, so
-- another node cannot join on us and we cannot join on them (as they're still
-- unjoined as well)
-- solution: on failure still join a dummy node, also add it as vserver
-- problem: once another node has joined on the dummy vserver, we shouldn't
-- just delete it again as it now relies on it as a neighbour
-- => either trigger a kChoicesNodeJoin with the flag that **not** at least one
-- single node needs to be joined (read vservers initially), or do an accelerated
-- periodic rebalance
-- TODO: document this approach in the docs
loop = do
(lookupResult, conf, firstVSSTM) <- atomically $ do
nodeSnap <- readTVar nodeSTM
let conf = nodeConfig nodeSnap
case headMay (rMapToList $ vservers nodeSnap) of
Nothing -> retry
Just vsSTM -> do
-- take any active vserver as heuristic for whether this node has
-- successfully joined:
-- If the node hasn't joined yet, only a single placeholder node
-- is active…
firstVS <- readTVar vsSTM
cache <- readTVar $ globalNodeCacheSTM nodeSnap
case queryLocalCache firstVS cache 1 (getNid firstVS) of
-- …which, having no neighbours, returns an empty forward list
-- -> block until cache changes and then retry
(FORWARD s) | Set.null s -> retry
result -> pure (result, conf, vsSTM)
case lookupResult of
-- already joined
FOUND _ ->
pure ()
-- otherwise try joining
FORWARD _ -> do
-- do normal join, but without bootstrap nodes
joinResult <- if confEnableKChoices conf
then runExceptT $ kChoicesNodeJoin nodeSTM Nothing
else runExceptT $ fediChordVserverJoin firstVSSTM
>> pure ()
either
-- on join failure, sleep and retry
(const $ threadDelay (confJoinAttemptsInterval conf) >> loop)
(const $ pure ())
joinResult
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
nodeCacheWriter :: RealNodeSTM s -> IO ()
nodeCacheWriter nodeSTM = do
node <- readTVarIO nodeSTM
forever $ atomically $ do
cacheModifier <- readTQueue $ globalCacheWriteQueue node
modifyTVar' (globalNodeCacheSTM node) cacheModifier
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
nodeCacheVerifyThread nodeSTM = forever $ do
(node, firstVSSTM) <- atomically $ do
node <- readTVar nodeSTM
case headMay (rMapToList $ vservers node) of
-- wait until first VS is joined
Nothing -> retry
Just vs' -> pure (node, vs')
let
maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
cacheQ = globalCacheWriteQueue node
cache <- readTVarIO $ globalNodeCacheSTM node
-- always use the first active VS as a sender for operations like Ping
firstVS <- readTVarIO firstVSSTM
-- iterate entries:
-- for avoiding too many time syscalls, get current time before iterating.
now <- getPOSIXTime
forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
if (now - ts) > maxEntryAge
then
queueDeleteEntry (getNid cacheNode) cacheQ
-- case unverified: try verifying, otherwise delete
else if not validated
then do
-- marking as verified is done by 'requestPing' as well
pong <- requestPing firstVS cacheNode
either (\_->
queueDeleteEntry (getNid cacheNode) cacheQ
)
(\vss ->
if cacheNode `notElem` vss
then queueDeleteEntry (getNid cacheNode) cacheQ
-- after verifying a node, check whether it can be a closer neighbour
-- do this for each node
-- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
else forM_ (vservers node) (\nsSTM -> do
ns <- readTVarIO nsSTM
if cacheNode `isPossiblePredecessor` ns
then atomically $ do
ns' <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [cacheNode] ns'
else pure ()
if cacheNode `isPossibleSuccessor` ns
then atomically $ do
ns' <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [cacheNode] ns'
else pure ()
)
) pong
else pure ()
)
-- check the cache invariant per slice and, if necessary, do a single lookup to the
-- middle of each slice not verifying the invariant
latestNode <- readTVarIO nodeSTM
forM_ (vservers latestNode) (\nsSTM -> do
latestNs <- readTVarIO nsSTM
latestCache <- readTVarIO $ nodeCacheSTM latestNs
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
@@ -396,6 +748,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
)
)
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from picoseconds to microseconds
@@ -408,7 +761,7 @@ checkCacheSliceInvariants :: LocalNodeState s
-> [NodeID] -- ^ list of middle IDs of slices not
-- ^ fulfilling the invariant
checkCacheSliceInvariants ns
| vsIsJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
| otherwise = const []
where
jEntries = jEntriesPerSlice ns
@@ -459,8 +812,10 @@ checkCacheSliceInvariants ns
-- | Periodically send @StabiliseRequest@s to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
stabiliseThread nodeSTM = forever $ do
node <- readTVarIO nodeSTM
forM_ (vservers node) (\nsSTM -> do
oldNs <- readTVarIO nsSTM
@@ -541,8 +896,9 @@ stabiliseThread nsSTM = forever $ do
)
newPredecessor
)
threadDelay . confStabiliseInterval . nodeConfig $ node
where
-- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retry
@@ -603,20 +959,23 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
fediMainThreads sock nodeSTM = do
node <- readTVarIO nodeSTM
putStrLn "launching threads"
sendQ <- newTQueueIO
recvQ <- newTQueueIO
-- concurrently launch all handler threads, if one of them throws an exception
-- all get cancelled
concurrently_
(fediMessageHandler sendQ recvQ nodeSTM) $
-- decision whether to [1] launch 1 thread per VS or [2] let a single
-- thread process all VSes sequentially:
-- choose option 2 for the sake of limiting concurrency in simulation scenario
concurrently_ (stabiliseThread nodeSTM) $
concurrently_ (nodeCacheVerifyThread nodeSTM) $
concurrently_ (convergenceSampleThread nodeSTM) $
concurrently_ (lookupCacheCleanup nodeSTM) $
concurrently_
(sendThread sock sendQ)
(recvThread sock recvQ)
@@ -645,20 +1004,23 @@ requestMapPurge purgeAge mapVar = forever $ do
fediMessageHandler :: Service s (RealNodeSTM s)
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> RealNodeSTM s -- ^ node
-> IO ()
fediMessageHandler sendQ recvQ nodeSTM = do
nodeConf <- nodeConfig <$> readTVarIO nodeSTM
-- handling multipart messages:
-- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID); value: timestamp + set of message parts; handle all of them once the size of the set equals the number of parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
requestMap <- newMVar (Map.empty :: RequestMap)
-- run receive loop and requestMapPurge concurrently, so that an exception makes
-- both of them fail
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
node <- readTVarIO nodeSTM
-- Messages from invalid (spoofed) sender IDs could already be dropped here
-- or in @dispatchVS@. But as the checking on each possible ID causes an
-- overhead, it is only done for critical operations and the case
-- differentiation is done in @handleIncomingRequest@. Thus the vserver
-- number limit, required for this check, needs to be passed to that function.
let handlerFunc = handleIncomingRequest $ confKChoicesMaxVS nodeConf
-- wait for incoming messages -- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
let aMsg = deserialiseMessage rawMsg let aMsg = deserialiseMessage rawMsg
@@ -668,12 +1030,14 @@ fediMessageHandler sendQ recvQ nsSTM = do
                 )
                 (\validMsg ->
                     case validMsg of
-                        aRequest@Request{}
+                        aRequest@Request{} -> case dispatchVS node aRequest of
+                            -- if no match to an active vserver ID, just ignore
+                            Nothing -> pure ()
                             -- if not a multipart message, handle immediately. Response is at the same time an ACK
-                            | part aRequest == 1 && isFinalPart aRequest ->
-                                forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
+                            Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
+                                forkIO (handlerFunc nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
                             -- otherwise collect all message parts first before handling the whole request
-                            | otherwise -> do
+                            Just nsSTM | otherwise -> do
                                 now <- getPOSIXTime
                                 -- critical locking section of requestMap
                                 rMapState <- takeMVar requestMap
@@ -691,14 +1055,14 @@ fediMessageHandler sendQ recvQ nsSTM = do
                                 -- put map back into MVar, end of critical section
                                 putMVar requestMap newMapState
                                 -- ACK the received part
-                                forM_ (ackRequest (getNid nsSnap) aRequest) $
+                                forM_ (ackRequest aRequest) $
                                     \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
                                 -- if all parts received, then handle request.
                                 let
                                     (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState
                                     numParts = Set.size theseParts
                                 if maybe False (numParts ==) (fromIntegral <$> mayMaxParts)
-                                    then forkIO (handleIncomingRequest nsSTM sendQ theseParts sourceAddr) >> pure()
+                                    then forkIO (handlerFunc nsSTM sendQ theseParts sourceAddr) >> pure()
                                     else pure()
                         -- Responses should never arrive on the main server port, as they are always
                         -- responses to requests sent from dedicated sockets on another port
@@ -707,6 +1071,8 @@ fediMessageHandler sendQ recvQ nsSTM = do
                 aMsg
         pure ()
+    where
+        dispatchVS node req = rMapLookup (receiverID req) (vservers node)
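The new `where`-bound `dispatchVS` routes each incoming request to the local vserver whose ID equals the message's `receiverID`, and anything unmatched is silently ignored. A minimal standalone sketch of that dispatch step, using a plain `Data.Map` in place of `RingMap` (all names here are illustrative stand-ins, not the project's API):

```haskell
import qualified Data.Map.Strict as Map

-- toy stand-ins for NodeID and a request carrying a receiver ID
type NodeID = Integer
data Req = Req { reqReceiverID :: NodeID, reqBody :: String }

-- look up the vserver responsible for the request's receiver ID;
-- Nothing means the request is dropped
dispatch :: Map.Map NodeID String -> Req -> Maybe String
dispatch vservers req = Map.lookup (reqReceiverID req) vservers

main :: IO ()
main = do
    let vss = Map.fromList [(10, "vs-10"), (42, "vs-42")]
    print $ dispatch vss (Req 42 "hello")    -- Just "vs-42"
    print $ dispatch vss (Req 7 "unknown")   -- Nothing
```

The exact-match lookup is what makes the spoof check cheap to defer: only requests that name an actually active vserver ID reach `handleIncomingRequest` at all.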
 -- ==== interface to service layer ====
@@ -757,7 +1123,7 @@ updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
 updateLookupCache nodeSTM keyToLookup = do
     (node, lookupSource) <- atomically $ do
         node <- readTVar nodeSTM
-        let firstVs = headMay (vservers node)
+        let firstVs = headMay (rMapToList $ vservers node)
         lookupSource <- case firstVs of
             Nothing -> pure Nothing
             Just vs -> Just <$> readTVar vs


@@ -7,8 +7,8 @@
 {-# LANGUAGE OverloadedStrings #-}
 {-# LANGUAGE RankNTypes #-}
-module Hash2Pub.FediChordTypes (
-    NodeID -- abstract, but newtype constructors cannot be hidden
+module Hash2Pub.FediChordTypes
+    ( NodeID -- abstract, but newtype constructors cannot be hidden
     , idBits
     , getNodeID
     , toNodeID
@@ -18,6 +18,13 @@ module Hash2Pub.FediChordTypes (
     , RemoteNodeState (..)
     , RealNode (..)
     , RealNodeSTM
+    , VSMap
+    , LoadStats (..)
+    , emptyLoadStats
+    , remainingLoadTarget
+    , loadSliceSum
+    , addVserver
+    , SegmentLoadStats (..)
     , setSuccessors
     , setPredecessors
     , NodeCache
@@ -51,6 +58,7 @@ module Hash2Pub.FediChordTypes (
     , localCompare
     , genNodeID
     , genNodeIDBS
+    , hasValidNodeId
     , genKeyID
     , genKeyIDBS
     , byteStringToUInteger
@@ -66,6 +74,8 @@ import Control.Exception
 import Data.Foldable (foldr')
 import Data.Function (on)
 import qualified Data.Hashable as Hashable
+import Data.HashMap.Strict (HashMap)
+import qualified Data.HashMap.Strict as HMap
 import Data.List (delete, nub, sortBy)
 import qualified Data.Map.Strict as Map
 import Data.Maybe (fromJust, fromMaybe, isJust,
@@ -148,17 +158,27 @@ a `localCompare` b
 -- Also contains shared data and config values.
 -- TODO: more data structures for k-choices bookkeeping
 data RealNode s = RealNode
-    { vservers :: [LocalNodeStateSTM s]
-    -- ^ references to all active versers
+    { vservers :: VSMap s
+    -- ^ map of all active VServer node IDs to their node state
     , nodeConfig :: FediChordConf
     -- ^ holds the initial configuration read at program start
     , bootstrapNodes :: [(String, PortNumber)]
     -- ^ nodes to be used as bootstrapping points, new ones learned during operation
     , lookupCacheSTM :: TVar LookupCache
     -- ^ a global cache of looked up keys and their associated nodes
+    , globalNodeCacheSTM :: TVar NodeCache
+    -- ^ EpiChord node cache with expiry times for nodes.
+    , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
+    -- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
     , nodeService :: s (RealNodeSTM s)
     }

+-- | insert a new vserver mapping into a node
+addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
+addVserver (key, nstate) node = node
+    { vservers = addRMapEntry key nstate (vservers node) }
+
+type VSMap s = RingMap NodeID (LocalNodeStateSTM s)

 type RealNodeSTM s = TVar (RealNode s)
 -- | represents a node and all its important state
@@ -172,7 +192,7 @@ data RemoteNodeState = RemoteNodeState
     -- ^ port of the DHT itself
     , servicePort :: PortNumber
     -- ^ port of the service provided on top of the DHT
-    , vServerID :: Integer
+    , vServerID :: Word8
     -- ^ ID of this vserver
     }
     deriving (Show, Eq)
@@ -185,9 +205,9 @@ data LocalNodeState s = LocalNodeState
     { nodeState :: RemoteNodeState
     -- ^ represents common data present both in remote and local node representations
     , nodeCacheSTM :: TVar NodeCache
-    -- ^ EpiChord node cache with expiry times for nodes
+    -- ^ reference to the 'globalNodeCacheSTM'
     , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
+    -- ^ reference to the 'globalCacheWriteQueue'
     , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
     -- ^ successor nodes in ascending order by distance
     , predecessors :: [RemoteNodeState]
@@ -217,14 +237,14 @@ class NodeState a where
     getIpAddr :: a -> HostAddress6
     getDhtPort :: a -> PortNumber
     getServicePort :: a -> PortNumber
-    getVServerID :: a -> Integer
+    getVServerID :: a -> Word8
     -- setters for common properties
     setNid :: NodeID -> a -> a
     setDomain :: String -> a -> a
     setIpAddr :: HostAddress6 -> a -> a
     setDhtPort :: PortNumber -> a -> a
     setServicePort :: PortNumber -> a -> a
-    setVServerID :: Integer -> a -> a
+    setVServerID :: Word8 -> a -> a
     toRemoteNodeState :: a -> RemoteNodeState

 instance NodeState RemoteNodeState where
@@ -373,6 +393,11 @@ genNodeID :: HostAddress6 -- ^a node's IPv6 address
     -> NodeID -- ^the generated @NodeID@
 genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs

+hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool
+hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns)
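`hasValidNodeId` guards against ID spoofing: a claimed node ID is accepted only if its vserver index stays below the κ limit and the ID can be re-derived from the sender's address, domain, and vserver index. The same shape as a toy sketch, with a trivial arithmetic stand-in in place of the real `genNodeID` hash (all names here are hypothetical):

```haskell
import Data.Word (Word8)

-- toy stand-in for genNodeID: any deterministic derivation works for the sketch
deriveId :: String -> String -> Word8 -> Integer
deriveId addr domain vsId =
    fromIntegral (length addr) * 1009 + fromIntegral (length domain) * 31 + fromIntegral vsId

-- valid iff the vserver index is below the limit and the ID is reproducible
validId :: Word8 -> Integer -> String -> String -> Word8 -> Bool
validId maxVs claimedId addr domain vsId =
    vsId < maxVs && claimedId == deriveId addr domain vsId

main :: IO ()
main = do
    let good = deriveId "::1" "example.org" 0
    print $ validId 4 good "::1" "example.org" 0   -- True
    print $ validId 4 good "::1" "example.org" 9   -- False: index over the limit
```

Binding the ID to the sender's address means a peer cannot claim arbitrary positions on the ring, and the κ bound caps how many positions one host may legitimately occupy.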
 -- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
 genKeyIDBS :: String -- ^the key string
     -> BS.ByteString -- ^the key ID represented as a @ByteString@
@@ -427,9 +452,70 @@ data FediChordConf = FediChordConf
     -- ^ how long to wait until response has arrived, in milliseconds
     , confRequestRetries :: Int
     -- ^ how often re-sending a timed-out request can be retried
+    , confEnableKChoices :: Bool
+    -- ^ whether to enable k-choices load balancing
+    , confKChoicesOverload :: Double
+    -- ^ fraction of capacity above which a node considers itself overloaded
+    , confKChoicesUnderload :: Double
+    -- ^ fraction of capacity below which a node considers itself underloaded
+    , confKChoicesMaxVS :: Word8
+    -- ^ upper limit of vserver index κ
+    , confKChoicesRebalanceInterval :: Int
+    -- ^ interval between vserver rebalance attempts
     }
     deriving (Show, Eq)
+-- ====== k-choices load balancing types ======
+
+data LoadStats = LoadStats
+    { loadPerTag :: RingMap NodeID Double
+    -- ^ map of loads for each handled tag
+    , totalCapacity :: Double
+    -- ^ total designated capacity of the service
+    , compensatedLoadSum :: Double
+    -- ^ effective load relevant for load balancing after compensating for
+    }
+    deriving (Show, Eq)
+
+-- | calculates the mismatch from the target load by taking into account the
+-- underload and overload limits
+remainingLoadTarget :: FediChordConf -> LoadStats -> Double
+remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
+    where
+        targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
+
+-- | calculates the sum of tag load in a contiguous slice between two keys
+loadSliceSum :: LoadStats
+    -> NodeID -- ^ lower segment bound
+    -> NodeID -- ^ upper segment bound
+    -> Double -- ^ sum of all tag loads within that segment
+loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats
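The target load in `remainingLoadTarget` is the midpoint between the underload and overload fractions of the total capacity. A standalone sketch of just that arithmetic (the record and function names here are simplified stand-ins for the fields above):

```haskell
-- simplified stand-in for the LoadStats and config fields used above
data Load = Load { capacity :: Double, compensatedSum :: Double }

-- remaining headroom towards the midpoint of the two thresholds
remainingTarget :: Double -> Double -> Load -> Double
remainingTarget underload overload l =
    capacity l * (underload + overload) / 2 - compensatedSum l

main :: IO ()
main =
    -- capacity 100 with thresholds 0.5/0.9 gives a target of 70;
    -- a compensated load of 80 overshoots it by 10
    print $ remainingTarget 0.5 0.9 (Load 100 80)   -- -10.0
```

A negative result signals that the node already carries more than its target and should shed load rather than accept a relocating vserver.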
+data SegmentLoadStats = SegmentLoadStats
+    { segmentLowerKeyBound :: NodeID
+    -- ^ segment start key
+    , segmentUpperKeyBound :: NodeID
+    -- ^ segment end key
+    , segmentLoad :: Double
+    -- ^ sum of load of all keys in the segment
+    , segmentOwnerRemainingLoadTarget :: Double
+    -- ^ remaining load target of the current segment handler:
+    , segmentOwnerCapacity :: Double
+    -- ^ total capacity of the current segment handler node, used for normalisation
+    , segmentCurrentOwner :: RemoteNodeState
+    -- ^ the current owner of the segment that needs to be joined on
+    }
+
+-- TODO: figure out a better way of initialising
+emptyLoadStats :: LoadStats
+emptyLoadStats = LoadStats
+    { loadPerTag = emptyRMap
+    , totalCapacity = 0
+    , compensatedLoadSum = 0
+    }
 -- ====== Service Types ============
 class Service s d where
@@ -445,6 +531,7 @@ class Service s d where
         -> IO (Either String ()) -- ^ success or failure
     -- | Wait for an incoming migration from a given node to succeed, may block forever
     waitForMigrationFrom :: s d -> NodeID -> IO ()
+    getServiceLoadStats :: s d -> IO LoadStats

 instance Hashable.Hashable NodeID where
     hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID


@@ -22,7 +22,7 @@ import qualified Data.DList as D
 import Data.Either (lefts, rights)
 import qualified Data.HashMap.Strict as HMap
 import qualified Data.HashSet as HSet
-import Data.Maybe (fromJust, isJust)
+import Data.Maybe (fromJust, fromMaybe, isJust)
 import Data.String (fromString)
 import Data.Text.Lazy (Text)
 import qualified Data.Text.Lazy as Txt
@@ -64,8 +64,10 @@ data PostService d = PostService
     , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
     , httpMan :: HTTP.Manager
     , statsQueue :: TQueue StatsEvent
-    , loadStats :: TVar RelayStats
-    -- ^ current load stats, replaced periodically
+    , relayStats :: TVar RelayStats
+    -- ^ current relay stats, replaced periodically
+    , loadStats :: TVar LoadStats
+    -- ^ current load values of the relay, replaced periodically and used by
     , logFileHandle :: Handle
     }
     deriving (Typeable)
@@ -96,7 +98,8 @@ instance DHT d => Service PostService d where
         migrationsInProgress' <- newTVarIO HMap.empty
         httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
         statsQueue' <- newTQueueIO
-        loadStats' <- newTVarIO emptyStats
+        relayStats' <- newTVarIO emptyStats
+        loadStats' <- newTVarIO emptyLoadStats
         loggingFile <- openFile (confLogfilePath conf) WriteMode
         hSetBuffering loggingFile LineBuffering
         let
@@ -112,6 +115,7 @@ instance DHT d => Service PostService d where
             , migrationsInProgress = migrationsInProgress'
             , httpMan = httpMan'
             , statsQueue = statsQueue'
+            , relayStats = relayStats'
             , loadStats = loadStats'
             , logFileHandle = loggingFile
             }
@@ -153,6 +157,12 @@ instance DHT d => Service PostService d where
         -- block until migration finished
         takeMVar migrationSynchroniser

+    getServiceLoadStats = getLoadStats
+
+getLoadStats :: PostService d -> IO LoadStats
+getLoadStats serv = readTVarIO $ loadStats serv

 -- | return a WAI application
 postServiceApplication :: DHT d => PostService d -> Application
@@ -835,7 +845,12 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
             -- persistently store in a TVar so it can be retrieved later by the DHT
             let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
                 rateStats = evaluateStats timePassed summedStats
-            atomically $ writeTVar (loadStats serv) rateStats
+            currentSubscribers <- readTVarIO $ subscribers serv
+            -- translate the rate statistics to load values
+            loads <- evaluateLoadStats rateStats currentSubscribers
+            atomically $
+                writeTVar (relayStats serv) rateStats
+                >> writeTVar (loadStats serv) loads
             -- and now what? write a log to file
             -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
             -- later: current (reported) load, target load
@@ -859,6 +874,33 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
                 0 tagMap
+-- | calculate load values from rate statistics
+evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats
+evaluateLoadStats currentStats currentSubscribers = do
+    -- load caused by each tag: incomingPostRate * (1 + subscribers)
+    -- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate
+    let
+        totalCapacity' = 2.5 * postPublishRate currentStats
+    (loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do
+            numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM
+            let
+                thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats)
+                thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers)
+            pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag')
+        )
+        (0, emptyRMap)
+        $ rMapToListWithKeys currentSubscribers
+    let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats
+    pure LoadStats
+        { loadPerTag = loadPerTag'
+        , totalCapacity = totalCapacity'
+        -- load caused by post fetches cannot be influenced by re-balancing nodes,
+        -- but still reduces the totally available capacity
+        , compensatedLoadSum = loadSum + postFetchRate currentStats
+        }
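The fold in `evaluateLoadStats` implements the commented load model: each tag contributes its incoming post rate weighted by one plus its subscriber count, and the non-relocatable post-fetch rate is added on top of the sum. A hedged standalone sketch over plain lists instead of `RingMap`/`HashMap` (names are illustrative):

```haskell
import Data.List (foldl')

-- load of one tag: incoming post rate * (1 + number of subscribers)
tagLoad :: Double -> Int -> Double
tagLoad rate numSubscribers = rate * (1 + fromIntegral numSubscribers)

-- sum all tag loads, then add the post-fetch load that rebalancing cannot move
compensated :: [(Double, Int)] -> Double -> Double
compensated tags postFetch =
    foldl' (\acc (rate, subs) -> acc + tagLoad rate subs) 0 tags + postFetch

main :: IO ()
main =
    -- two tags: rate 2 with 4 subscribers (10), rate 1 with none (1), fetch 0.5
    print $ compensated [(2.0, 4), (1.0, 0)] 0.5   -- 11.5
```

Keeping the post-fetch share inside `compensatedLoadSum` but outside `loadPerTag` mirrors the comment above: it consumes capacity, yet moving vservers around cannot reduce it.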
 -- | Evaluate the accumulated statistic events: Currently mostly calculates the event
 -- rates by dividing through the collection time frame
 evaluateStats :: POSIXTime -> RelayStats -> RelayStats


@@ -16,10 +16,12 @@ data Action = QueryID
     | Leave
     | Stabilise
     | Ping
+    | QueryLoad
     deriving (Show, Eq, Enum)

 data FediChordMessage = Request
     { requestID :: Integer
+    , receiverID :: NodeID
     , sender :: RemoteNodeState
     , part :: Integer
     , isFinalPart :: Bool
@@ -57,6 +59,10 @@ data ActionPayload = QueryIDRequestPayload
     }
     | StabiliseRequestPayload
     | PingRequestPayload
+    | LoadRequestPayload
+        { loadSegmentUpperBound :: NodeID
+        -- ^ upper bound of segment interested in,
+        }
     | QueryIDResponsePayload
     { queryResult :: QueryResponse
     }
@@ -73,6 +79,12 @@ data ActionPayload = QueryIDRequestPayload
     | PingResponsePayload
     { pingNodeStates :: [RemoteNodeState]
     }
+    | LoadResponsePayload
+        { loadSum :: Double
+        , loadRemainingTarget :: Double
+        , loadTotalCapacity :: Double
+        , loadSegmentLowerBound :: NodeID
+        }
     deriving (Show, Eq)

 -- | global limit of parts per message used when (de)serialising messages.


@@ -47,6 +47,13 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where
         traversingFL acc (ProxyEntry _ Nothing) = acc
         traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry

+instance (Bounded k, Ord k) => Traversable (RingMap k) where
+    traverse f = fmap RingMap . traverse traversingF . getRingMap
+        where
+            traversingF (KeyEntry entry) = KeyEntry <$> f entry
+            traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing
+            traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . Just <$> traversingF entry
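Since `RingMap` wraps a `Map`, the `Traversable` instance above behaves like `Map`'s: the effect runs once per stored value while the structure is preserved, with `ProxyEntry` wrappers additionally passed through untouched. The intended semantics can be sanity-checked against a plain `Map` (matching the commit note that only hand-tested examples exist so far):

```haskell
import qualified Data.Map.Strict as Map

main :: IO ()
main = do
    let positiveDoubled x = if x > 0 then Just (x * 2) else Nothing
    -- all values accepted: structure kept, values transformed
    print $ traverse positiveDoubled (Map.fromList [(1 :: Int, 3 :: Int), (2, 4)])
    -- Just (fromList [(1,6),(2,8)])
    -- one value rejected: the whole traversal fails
    print $ traverse positiveDoubled (Map.fromList [(1 :: Int, 3), (2, -1)])
    -- Nothing
```

This all-or-nothing behaviour under `Maybe` is exactly what the Traversable laws prescribe and what QuickCheck properties for the `RingMap` instance could later assert.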
 -- | entry of a 'RingMap' that holds a value and can also
 -- wrap around the lookup direction at the edges of the name space.
@@ -106,6 +113,23 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on
         | isNothing (rMapLookup nid rmap') = 1
         | otherwise = 0

+-- | whether the RingMap is empty (except for empty proxy entries)
+nullRMap :: (Num k, Bounded k, Ord k)
+    => RingMap k a
+    -> Bool
+-- basic idea: look for a predecessor starting from the upper Map bound,
+-- Nothing indicates no entry being found
+nullRMap = isNothing . rMapLookupPred maxBound
+
+-- | O(log n): Is the key a member of the RingMap?
+memberRMap :: (Bounded k, Ord k)
+    => k
+    -> RingMap k a
+    -> Bool
+memberRMap key = isJust . rMapLookup key
 -- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
 -- to simulate a modular ring
 lookupWrapper :: (Bounded k, Ord k, Num k)
@@ -198,12 +222,28 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
         modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
         modifier KeyEntry {} = Nothing

+-- TODO: rename this to elems
 rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
 rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap

+-- TODO: rename this to toList
+rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)]
+rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
+        maybe acc (\val -> (k, val):acc) $ extractRingEntry v
+    )
+    []
+    . getRingMap
+
 rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
 rMapFromList = setRMapEntries

+-- | this just merges the underlying 'Map.Map' s and does not check whether the
+-- ProxyEntry pointers are consistent, so better only create unions of
+-- equal-pointered RingMaps
+unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a
+unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b)
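A detail worth noting for `unionRMap`: `Data.Map.union` is left-biased, so on duplicate keys the entry from the first argument wins, which together with the caveat above means only RingMaps with identical proxy pointers should be merged. The bias in isolation:

```haskell
import qualified Data.Map.Strict as Map

main :: IO ()
main = do
    let a = Map.fromList [(1 :: Int, "a"), (2, "a")]
        b = Map.fromList [(2 :: Int, "b"), (3, "b")]
    -- union is left-biased: key 2 keeps the value from the left map
    print $ Map.toList (Map.union a b)   -- [(1,"a"),(2,"a"),(3,"b")]
```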
 -- | takes up to i entries from a 'RingMap' by calling a getter function on a
 -- *startAt* value and after that on the previously returned value.
 -- Stops once i entries have been taken or an entry has been encountered twice


@@ -7,6 +7,7 @@ import Control.Concurrent.STM.TVar
 import Control.Exception
 import Data.ASN1.Parse (runParseASN1)
 import qualified Data.ByteString as BS
+import qualified Data.HashMap.Strict as HMap
 import qualified Data.Map.Strict as Map
 import Data.Maybe (fromJust, isJust)
 import qualified Data.Set as Set
@@ -18,6 +19,7 @@ import Hash2Pub.ASN1Coding
 import Hash2Pub.DHTProtocol
 import Hash2Pub.FediChord
 import Hash2Pub.FediChordTypes
+import Hash2Pub.RingMap

 spec :: Spec
 spec = do
@@ -221,14 +223,16 @@ spec = do
                 , exampleNodeState {nid = fromInteger (-5)}
                 ]
             }
-        requestTemplate = Request {
-            requestID = 2342
-            , sender = exampleNodeState
-            , part = 1
-            , isFinalPart = True
-            , action = undefined
-            , payload = undefined
-            }
+        qLoadReqPayload = LoadRequestPayload
+            { loadSegmentUpperBound = 1025
+            }
+        qLoadResPayload = LoadResponsePayload
+            { loadSum = 3.141
+            , loadRemainingTarget = -1.337
+            , loadTotalCapacity = 2.21
+            , loadSegmentLowerBound = 12
+            }
         responseTemplate = Response {
             requestID = 2342
             , senderID = nid exampleNodeState
@@ -237,7 +241,7 @@ spec = do
             , action = undefined
             , payload = undefined
             }
-        requestWith a pa = requestTemplate {action = a, payload = Just pa}
+        requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342
         responseWith a pa = responseTemplate {action = a, payload = Just pa}

         encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
@@ -248,17 +252,20 @@
             }

         it "messages are encoded and decoded correctly from and to ASN1" $ do
-            encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
-            encodeDecodeAndCheck $ requestWith Join jReqPayload
-            encodeDecodeAndCheck $ requestWith Leave lReqPayload
-            encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload
-            encodeDecodeAndCheck $ requestWith Ping pingReqPayload
+            localNS <- exampleLocalNode
+            encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload
+            encodeDecodeAndCheck $ requestWith localNS Join jReqPayload
+            encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload
+            encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload
+            encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload
+            encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload
             encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
             encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
             encodeDecodeAndCheck $ responseWith Join jResPayload
             encodeDecodeAndCheck $ responseWith Leave lResPayload
             encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
             encodeDecodeAndCheck $ responseWith Ping pingResPayload
+            encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload
         it "messages are encoded and decoded to ASN.1 DER properly" $
             deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
         it "messages too large for a single packet can (often) be split into multiple parts" $ do
@@ -297,13 +304,13 @@ exampleNodeState = RemoteNodeState {
 exampleLocalNode :: IO (LocalNodeState MockService)
 exampleLocalNode = do
-    realNode <- newTVarIO $ RealNode {
-        vservers = []
+    realNodeSTM <- newTVarIO $ RealNode {
+        vservers = emptyRMap
         , nodeConfig = exampleFediConf
         , bootstrapNodes = confBootstrapNodes exampleFediConf
         , nodeService = MockService
         }
-    nodeStateInit realNode
+    nodeStateInit realNodeSTM 0

 exampleFediConf :: FediChordConf