Compare commits
	
		
			38 commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| e79ba52e00 | |||
| 4aa4667a1d | |||
| 6aebd982f8 | |||
| 048a6ce391 | |||
| 8bd4e04bcd | |||
| 0cb4b6815c | |||
| b111515178 | |||
| ecb127e6af | |||
| 5ed8a28fde | |||
| bb0fb0919a | |||
| b2b4fe3dd8 | |||
| c208aeceaa | |||
| 0ee8f0dc43 | |||
| 21ecf9b041 | |||
| 9a61c186e3 | |||
| 578cc362b9 | |||
| 1a0de55b8c | |||
| 7a87d86c32 | |||
| 3b6d129bfc | |||
| 62da66aade | |||
| 13c5b385b1 | |||
| 1ed0281417 | |||
| 499c90e63a | |||
| 1a7afed062 | |||
| 8e8ea41dc4 | |||
| 33ae904d17 | |||
| 68de73d919 | |||
| 0ab6ee9c8f | |||
| 12dfc56a73 | |||
| 9bf7365a2c | |||
| 5e745cd035 | |||
| 30bf0529ed | |||
| 576ea2c3f6 | |||
| 7dd7e96cce | |||
| a1cfbbac48 | |||
| af27cded19 | |||
| 41aaa8ff70 | |||
| ddea599022 | 
					 13 changed files with 1099 additions and 424 deletions
				
			
		|  | @ -6,20 +6,22 @@ Domain ::= VisibleString | |||
| 
 | ||||
| Partnum ::= INTEGER (0..150) | ||||
| 
 | ||||
| Action ::= ENUMERATED {queryID, join, leave, stabilise, ping} | ||||
| Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad} | ||||
| 
 | ||||
| Request ::= SEQUENCE { | ||||
| 	action			Action, | ||||
| 	requestID		INTEGER (0..4294967295),	-- arbitrarily restricting to an unsigned 32bit integer | ||||
| 	receiverID		NodeID, | ||||
| 	sender			NodeState, | ||||
| 	part			Partnum,	-- part number of this message, starts at 1 | ||||
| 	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this reuest | ||||
| 	actionPayload	CHOICE { | ||||
| 		queryIDRequestPayload		QueryIDRequestPayload, | ||||
| 		joinRequestPayload			JoinRequestPayload, | ||||
| 		leaveRequestPayload		LeaveRequestPayload, | ||||
| 		stabiliseRequestPayload	StabiliseRequestPayload, | ||||
| 		pingRequestPayload			PingRequestPayload | ||||
| 		leaveRequestPayload			LeaveRequestPayload, | ||||
| 		stabiliseRequestPayload		StabiliseRequestPayload, | ||||
| 		pingRequestPayload			PingRequestPayload, | ||||
| 		loadRequestPayload			LoadRequestPayload | ||||
| 		}			OPTIONAL			-- just for symmetry reasons with response, requests without a payload have no meaning | ||||
| 	} | ||||
| 
 | ||||
|  | @ -34,11 +36,12 @@ Response ::= SEQUENCE { | |||
| 	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this response | ||||
| 	action			Action, | ||||
| 	actionPayload	CHOICE { | ||||
| 		queryIDResponsePayload	QueryIDResponsePayload, | ||||
| 		joinResponsePayload		JoinResponsePayload, | ||||
| 		queryIDResponsePayload		QueryIDResponsePayload, | ||||
| 		joinResponsePayload			JoinResponsePayload, | ||||
| 		leaveResponsePayload		LeaveResponsePayload, | ||||
| 		stabiliseResponsePayload	StabiliseResponsePayload, | ||||
| 		pingResponsePayload		PingResponsePayload | ||||
| 		pingResponsePayload			PingResponsePayload, | ||||
| 		loadResponsePayload			LoadResponsePayload | ||||
| 		}			OPTIONAL			-- no payload when just ACKing a previous request | ||||
| 	} | ||||
| 
 | ||||
|  | @ -101,5 +104,15 @@ PingRequestPayload ::= NULL		-- do not include a node/ vserver ID, so that | |||
| -- learning all active vserver IDs handled by the server at once | ||||
| PingResponsePayload ::= SEQUENCE OF NodeState | ||||
| 
 | ||||
| LoadRequestPayload ::= SEQUENCE { | ||||
| 	upperSegmentBound		NodeID | ||||
| 	} | ||||
| 
 | ||||
| LoadResponsePayload ::= SEQUENCE { | ||||
| 	loadSum					REAL, | ||||
| 	remainingLoadTarget		REAL, | ||||
| 	totalCapacity			REAL, | ||||
| 	lowerBound				NodeID | ||||
| 	} | ||||
| 
 | ||||
| END | ||||
|  |  | |||
|  | @ -91,7 +91,7 @@ executable Hash2Pub | |||
|   -- Base language which the package is written in. | ||||
|   default-language:    Haskell2010 | ||||
| 
 | ||||
|   ghc-options:         -threaded -rtsopts -with-rtsopts=-N | ||||
|   ghc-options:         -threaded | ||||
| 
 | ||||
| executable Experiment | ||||
|   -- experiment runner | ||||
|  |  | |||
|  | @ -1,7 +1,7 @@ | |||
| # Hash2Pub | ||||
| 
 | ||||
| ***This is heavily WIP and does not provide any useful functionality yet***.   | ||||
| I aim for always having the `mainline` branch in a state where it builds and tests pass. | ||||
| I aim for always having the master branch at a state where it builds and tests pass. | ||||
| 
 | ||||
| A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table. | ||||
| It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell. | ||||
|  | @ -10,8 +10,6 @@ This is the practical implementation of the concept presented in the paper [Dece | |||
| 
 | ||||
| The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`. | ||||
| 
 | ||||
| For further questions and discussins, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**. | ||||
| 
 | ||||
| ## Building | ||||
| 
 | ||||
| The project and its developent environment are built with [Nix](https://nixos.org/nix/). | ||||
|  |  | |||
|  | @ -40,7 +40,7 @@ executeSchedule :: Int  -- ^ speedup factor | |||
|                 -> IO () | ||||
| executeSchedule speedup events = do | ||||
|     -- initialise HTTP manager | ||||
|     httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 } | ||||
|     httpMan <- HTTP.newManager HTTP.defaultManagerSettings | ||||
|     forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do | ||||
|         _ <- forkIO $ | ||||
|             clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag) | ||||
|  |  | |||
							
								
								
									
										35
									
								
								app/Main.hs
									
										
									
									
									
								
							
							
						
						
									
										35
									
								
								app/Main.hs
									
										
									
									
									
								
							|  | @ -18,38 +18,20 @@ main = do | |||
|     -- ToDo: parse and pass config | ||||
|     -- probably use `tomland` for that | ||||
|     (fConf, sConf) <- readConfig | ||||
|     -- TODO: first initialise 'RealNode', then the vservers | ||||
|     -- ToDo: load persisted caches, bootstrapping nodes … | ||||
|     (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) | ||||
|     -- currently no masking is necessary, as there is nothing to clean up | ||||
|     nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode | ||||
|     -- try joining the DHT using one of the provided bootstrapping nodes | ||||
|     joinedState <- tryBootstrapJoining thisNode | ||||
|     either (\err -> do | ||||
|         -- handle unsuccessful join | ||||
| 
 | ||||
|         putStrLn $ err <> " Error joining, start listening for incoming requests anyways" | ||||
|         print =<< readTVarIO thisNode | ||||
|         -- launch thread attempting to join on new cache entries | ||||
|         _ <- forkIO $ joinOnNewEntriesThread thisNode | ||||
|         wait =<< async (fediMainThreads serverSock thisNode) | ||||
|            ) | ||||
|            (\joinedNS -> do | ||||
|         -- launch main eventloop with successfully joined state | ||||
|         putStrLn "successful join" | ||||
|         wait =<< async (fediMainThreads serverSock thisNode) | ||||
|            ) | ||||
|         joinedState | ||||
|     pure () | ||||
|     (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) | ||||
|     -- wait for all DHT threads to terminate, this keeps the main thread running | ||||
|     wait fediThreads | ||||
| 
 | ||||
| 
 | ||||
| readConfig :: IO (FediChordConf, ServiceConf) | ||||
| readConfig = do | ||||
|     confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs | ||||
|     confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs | ||||
|     -- allow starting the initial node without bootstrapping info to avoid | ||||
|     -- waiting for timeout | ||||
|     let | ||||
|         speedup = read speedupString | ||||
|         statsEvalD = 120 * 10^6 `div` speedup | ||||
|         confBootstrapNodes' = case remainingArgs of | ||||
|             bootstrapHost : bootstrapPortString : _ -> | ||||
|                 [(bootstrapHost, read bootstrapPortString)] | ||||
|  | @ -67,6 +49,11 @@ readConfig = do | |||
|           , confResponsePurgeAge = 60 / fromIntegral speedup | ||||
|           , confRequestTimeout = 5 * 10^6 `div` speedup | ||||
|           , confRequestRetries = 3 | ||||
|           , confEnableKChoices = loadBalancingEnabled /= "off" | ||||
|           , confKChoicesOverload = 0.9 | ||||
|           , confKChoicesUnderload = 0.1 | ||||
|           , confKChoicesMaxVS = 8 | ||||
|           , confKChoicesRebalanceInterval = round  (realToFrac statsEvalD * 1.1 :: Double) | ||||
|           } | ||||
|         sConf = ServiceConf | ||||
|           { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup | ||||
|  | @ -74,7 +61,7 @@ readConfig = do | |||
|           , confServiceHost = confDomainString | ||||
|           , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" | ||||
|           , confSpeedupFactor = speedup | ||||
|           , confStatsEvalDelay = 120 * 10^6 `div` speedup | ||||
|           , confStatsEvalDelay = statsEvalD | ||||
|           } | ||||
|     pure (fConf, sConf) | ||||
| 
 | ||||
|  |  | |||
|  | @ -184,6 +184,19 @@ encodePayload payload'@PingResponsePayload{} = | |||
|     Start Sequence | ||||
|   : concatMap encodeNodeState (pingNodeStates payload') | ||||
|   <> [End Sequence] | ||||
| encodePayload payload'@LoadRequestPayload{} = | ||||
|   [ Start Sequence | ||||
|   , IntVal . getNodeID $ loadSegmentUpperBound payload' | ||||
|   , End Sequence | ||||
|   ] | ||||
| encodePayload payload'@LoadResponsePayload{} = | ||||
|   [ Start Sequence | ||||
|   , Real $ loadSum payload' | ||||
|   , Real $ loadRemainingTarget payload' | ||||
|   , Real $ loadTotalCapacity payload' | ||||
|   , IntVal . getNodeID $ loadSegmentLowerBound payload' | ||||
|   , End Sequence | ||||
|   ] | ||||
| 
 | ||||
| encodeNodeState :: NodeState a => a -> [ASN1] | ||||
| encodeNodeState ns = [ | ||||
|  | @ -193,7 +206,7 @@ encodeNodeState ns = [ | |||
|   , OctetString (ipAddrAsBS $ getIpAddr ns) | ||||
|   , IntVal (toInteger . getDhtPort $ ns) | ||||
|   , IntVal (toInteger . getServicePort $ ns) | ||||
|   , IntVal (getVServerID ns) | ||||
|   , IntVal (toInteger $ getVServerID ns) | ||||
|   , End Sequence | ||||
|                      ] | ||||
| 
 | ||||
|  | @ -215,10 +228,11 @@ encodeQueryResult FORWARD{} = Enumerated 1 | |||
| encodeMessage :: FediChordMessage   -- ^ the 'FediChordMessage to be encoded | ||||
|               -> [ASN1] | ||||
| encodeMessage | ||||
|     (Request requestID sender part isFinalPart action requestPayload) = | ||||
|     (Request requestID receiverID sender part isFinalPart action requestPayload) = | ||||
|         Start Sequence | ||||
|       : (Enumerated . fromIntegral . fromEnum $ action) | ||||
|       : IntVal requestID | ||||
|       : (IntVal . getNodeID $ receiverID) | ||||
|       : encodeNodeState sender | ||||
|       <> [IntVal part | ||||
|         , Boolean isFinalPart] | ||||
|  | @ -262,18 +276,20 @@ parseMessage = do | |||
| parseRequest :: Action -> ParseASN1 FediChordMessage | ||||
| parseRequest action = do | ||||
|     requestID <- parseInteger | ||||
|     receiverID' <- fromInteger <$> parseInteger | ||||
|     sender <- parseNodeState | ||||
|     part <- parseInteger | ||||
|     isFinalPart <- parseBool | ||||
|     hasPayload <- hasNext | ||||
|     payload <- if not hasPayload then pure Nothing else Just <$> case action of | ||||
|                  QueryID   -> parseQueryIDRequest | ||||
|                  Join      -> parseJoinRequest | ||||
|                  Leave     -> parseLeaveRequest | ||||
|                  Stabilise -> parseStabiliseRequest | ||||
|                  Ping      -> parsePingRequest | ||||
|                  QueryID   -> parseQueryIDRequestPayload | ||||
|                  Join      -> parseJoinRequestPayload | ||||
|                  Leave     -> parseLeaveRequestPayload | ||||
|                  Stabilise -> parseStabiliseRequestPayload | ||||
|                  Ping      -> parsePingRequestPayload | ||||
|                  QueryLoad -> parseLoadRequestPayload | ||||
| 
 | ||||
|     pure $ Request requestID sender part isFinalPart action payload | ||||
|     pure $ Request requestID receiverID' sender part isFinalPart action payload | ||||
| 
 | ||||
| parseResponse :: Integer -> ParseASN1 FediChordMessage | ||||
| parseResponse requestID = do | ||||
|  | @ -283,11 +299,12 @@ parseResponse requestID = do | |||
|     action <- parseEnum :: ParseASN1 Action | ||||
|     hasPayload <- hasNext | ||||
|     payload <- if not hasPayload then pure Nothing else Just <$> case action of | ||||
|                  QueryID   -> parseQueryIDResponse | ||||
|                  Join      -> parseJoinResponse | ||||
|                  Leave     -> parseLeaveResponse | ||||
|                  Stabilise -> parseStabiliseResponse | ||||
|                  Ping      -> parsePingResponse | ||||
|                  QueryID   -> parseQueryIDResponsePayload | ||||
|                  Join      -> parseJoinResponsePayload | ||||
|                  Leave     -> parseLeaveResponsePayload | ||||
|                  Stabilise -> parseStabiliseResponsePayload | ||||
|                  Ping      -> parsePingResponsePayload | ||||
|                  QueryLoad -> parseLoadResponsePayload | ||||
| 
 | ||||
|     pure $ Response requestID senderID part isFinalPart action payload | ||||
| 
 | ||||
|  | @ -305,6 +322,13 @@ parseInteger = do | |||
|         IntVal parsed -> pure parsed | ||||
|         x -> throwParseError $ "Expected IntVal but got " <> show x | ||||
| 
 | ||||
| parseReal :: ParseASN1 Double | ||||
| parseReal = do | ||||
|     i <- getNext | ||||
|     case i of | ||||
|       Real parsed -> pure parsed | ||||
|       x           -> throwParseError $ "Expected Real but got " <> show x | ||||
| 
 | ||||
| parseEnum :: Enum a => ParseASN1 a | ||||
| parseEnum = do | ||||
|     e <- getNext | ||||
|  | @ -346,7 +370,7 @@ parseNodeState = onNextContainer Sequence $ do | |||
|       , domain = domain' | ||||
|       , dhtPort = dhtPort' | ||||
|       , servicePort = servicePort' | ||||
|       , vServerID = vServer' | ||||
|       , vServerID = fromInteger vServer' | ||||
|       , ipAddr = ip' | ||||
|                      } | ||||
| 
 | ||||
|  | @ -360,13 +384,13 @@ parseCacheEntry = onNextContainer Sequence $ do | |||
| parseNodeCache :: ParseASN1 [RemoteCacheEntry] | ||||
| parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry | ||||
| 
 | ||||
| parseJoinRequest :: ParseASN1 ActionPayload | ||||
| parseJoinRequest = do | ||||
| parseJoinRequestPayload :: ParseASN1 ActionPayload | ||||
| parseJoinRequestPayload = do | ||||
|     parseNull | ||||
|     pure JoinRequestPayload | ||||
| 
 | ||||
| parseJoinResponse :: ParseASN1 ActionPayload | ||||
| parseJoinResponse = onNextContainer Sequence $ do | ||||
| parseJoinResponsePayload :: ParseASN1 ActionPayload | ||||
| parseJoinResponsePayload = onNextContainer Sequence $ do | ||||
|     succ' <- onNextContainer Sequence (getMany parseNodeState) | ||||
|     pred' <- onNextContainer Sequence (getMany parseNodeState) | ||||
|     cache <- parseNodeCache | ||||
|  | @ -376,8 +400,8 @@ parseJoinResponse = onNextContainer Sequence $ do | |||
|       , joinCache = cache | ||||
|                                  } | ||||
| 
 | ||||
| parseQueryIDRequest :: ParseASN1 ActionPayload | ||||
| parseQueryIDRequest = onNextContainer Sequence $ do | ||||
| parseQueryIDRequestPayload :: ParseASN1 ActionPayload | ||||
| parseQueryIDRequestPayload = onNextContainer Sequence $ do | ||||
|     targetID <- fromInteger <$> parseInteger | ||||
|     lBestNodes <- parseInteger | ||||
|     pure $ QueryIDRequestPayload { | ||||
|  | @ -385,8 +409,8 @@ parseQueryIDRequest = onNextContainer Sequence $ do | |||
|       , queryLBestNodes = lBestNodes | ||||
|                                    } | ||||
| 
 | ||||
| parseQueryIDResponse :: ParseASN1 ActionPayload | ||||
| parseQueryIDResponse = onNextContainer Sequence $ do | ||||
| parseQueryIDResponsePayload :: ParseASN1 ActionPayload | ||||
| parseQueryIDResponsePayload = onNextContainer Sequence $ do | ||||
|     Enumerated resultType <- getNext | ||||
|     result <- case resultType of | ||||
|                   0 -> FOUND <$> parseNodeState | ||||
|  | @ -396,13 +420,13 @@ parseQueryIDResponse = onNextContainer Sequence $ do | |||
|         queryResult = result | ||||
|                            } | ||||
| 
 | ||||
| parseStabiliseRequest :: ParseASN1 ActionPayload | ||||
| parseStabiliseRequest = do | ||||
| parseStabiliseRequestPayload :: ParseASN1 ActionPayload | ||||
| parseStabiliseRequestPayload = do | ||||
|     parseNull | ||||
|     pure StabiliseRequestPayload | ||||
| 
 | ||||
| parseStabiliseResponse :: ParseASN1 ActionPayload | ||||
| parseStabiliseResponse = onNextContainer Sequence $ do | ||||
| parseStabiliseResponsePayload :: ParseASN1 ActionPayload | ||||
| parseStabiliseResponsePayload = onNextContainer Sequence $ do | ||||
|     succ' <- onNextContainer Sequence (getMany parseNodeState) | ||||
|     pred' <- onNextContainer Sequence (getMany parseNodeState) | ||||
|     pure $ StabiliseResponsePayload { | ||||
|  | @ -410,8 +434,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do | |||
|       , stabilisePredecessors = pred' | ||||
|                                       } | ||||
| 
 | ||||
| parseLeaveRequest :: ParseASN1 ActionPayload | ||||
| parseLeaveRequest = onNextContainer Sequence $ do | ||||
| parseLeaveRequestPayload :: ParseASN1 ActionPayload | ||||
| parseLeaveRequestPayload = onNextContainer Sequence $ do | ||||
|     succ' <- onNextContainer Sequence (getMany parseNodeState) | ||||
|     pred' <- onNextContainer Sequence (getMany parseNodeState) | ||||
|     doMigration <- parseBool | ||||
|  | @ -421,19 +445,40 @@ parseLeaveRequest = onNextContainer Sequence $ do | |||
|       , leaveDoMigration = doMigration | ||||
|                                       } | ||||
| 
 | ||||
| parseLeaveResponse :: ParseASN1 ActionPayload | ||||
| parseLeaveResponse = do | ||||
| parseLeaveResponsePayload :: ParseASN1 ActionPayload | ||||
| parseLeaveResponsePayload = do | ||||
|     parseNull | ||||
|     pure LeaveResponsePayload | ||||
| 
 | ||||
| parsePingRequest :: ParseASN1 ActionPayload | ||||
| parsePingRequest = do | ||||
| parsePingRequestPayload :: ParseASN1 ActionPayload | ||||
| parsePingRequestPayload = do | ||||
|     parseNull | ||||
|     pure PingRequestPayload | ||||
| 
 | ||||
| parsePingResponse :: ParseASN1 ActionPayload | ||||
| parsePingResponse = onNextContainer Sequence $ do | ||||
| parsePingResponsePayload :: ParseASN1 ActionPayload | ||||
| parsePingResponsePayload = onNextContainer Sequence $ do | ||||
|     handledNodes <- getMany parseNodeState | ||||
|     pure $ PingResponsePayload { | ||||
|         pingNodeStates = handledNodes | ||||
|                                  } | ||||
| 
 | ||||
| parseLoadRequestPayload :: ParseASN1 ActionPayload | ||||
| parseLoadRequestPayload = onNextContainer Sequence $ do | ||||
|     loadUpperBound' <- fromInteger <$> parseInteger | ||||
|     pure LoadRequestPayload | ||||
|         { loadSegmentUpperBound = loadUpperBound' | ||||
|         } | ||||
| 
 | ||||
| parseLoadResponsePayload :: ParseASN1 ActionPayload | ||||
| parseLoadResponsePayload = onNextContainer Sequence $ do | ||||
|     loadSum' <- parseReal | ||||
|     loadRemainingTarget' <- parseReal | ||||
|     loadTotalCapacity' <- parseReal | ||||
|     loadSegmentLowerBound' <- fromInteger <$> parseInteger | ||||
|     pure LoadResponsePayload | ||||
|         { loadSum = loadSum' | ||||
|         , loadRemainingTarget = loadRemainingTarget' | ||||
|         , loadTotalCapacity = loadTotalCapacity' | ||||
|         , loadSegmentLowerBound = loadSegmentLowerBound' | ||||
|         } | ||||
| 
 | ||||
|  |  | |||
|  | @ -15,6 +15,7 @@ module Hash2Pub.DHTProtocol | |||
|     , Action(..) | ||||
|     , ActionPayload(..) | ||||
|     , FediChordMessage(..) | ||||
|     , mkRequest | ||||
|     , maximumParts | ||||
|     , sendQueryIdMessages | ||||
|     , requestQueryID | ||||
|  | @ -22,6 +23,7 @@ module Hash2Pub.DHTProtocol | |||
|     , requestLeave | ||||
|     , requestPing | ||||
|     , requestStabilise | ||||
|     , requestQueryLoad | ||||
|     , lookupMessage | ||||
|     , sendRequestTo | ||||
|     , queryIdLookupLoop | ||||
|  | @ -36,7 +38,7 @@ module Hash2Pub.DHTProtocol | |||
|     , isPossibleSuccessor | ||||
|     , isPossiblePredecessor | ||||
|     , isInOwnResponsibilitySlice | ||||
|     , isJoined | ||||
|     , vsIsJoined | ||||
|     , closestCachePredecessors | ||||
|     ) | ||||
|         where | ||||
|  | @ -49,7 +51,8 @@ import           Control.Concurrent.STM.TQueue | |||
| import           Control.Concurrent.STM.TVar | ||||
| import           Control.Exception | ||||
| import           Control.Monad                  (foldM, forM, forM_, void, when) | ||||
| import           Control.Monad.Except           (MonadError (..), runExceptT) | ||||
| import           Control.Monad.Except           (MonadError (..), liftEither, | ||||
|                                                  runExceptT) | ||||
| import           Control.Monad.IO.Class         (MonadIO (..)) | ||||
| import qualified Data.ByteString                as BS | ||||
| import           Data.Either                    (rights) | ||||
|  | @ -63,6 +66,7 @@ import           Data.Maybe                     (fromJust, fromMaybe, isJust, | |||
|                                                  isNothing, mapMaybe, maybe) | ||||
| import qualified Data.Set                       as Set | ||||
| import           Data.Time.Clock.POSIX | ||||
| import           Data.Word                      (Word8) | ||||
| import           Network.Socket                 hiding (recv, recvFrom, send, | ||||
|                                                  sendTo) | ||||
| import           Network.Socket.ByteString | ||||
|  | @ -74,23 +78,27 @@ import           Hash2Pub.ASN1Coding | |||
| import           Hash2Pub.FediChordTypes        (CacheEntry (..), | ||||
|                                                  CacheEntry (..), | ||||
|                                                  FediChordConf (..), | ||||
|                                                  HasKeyID (..), | ||||
|                                                  HasKeyID (..), LoadStats (..), | ||||
|                                                  LocalNodeState (..), | ||||
|                                                  LocalNodeStateSTM, NodeCache, | ||||
|                                                  NodeID, NodeState (..), | ||||
|                                                  RealNode (..), RealNodeSTM, | ||||
|                                                  RemoteNodeState (..), | ||||
|                                                  RingEntry (..), RingMap (..), | ||||
|                                                  SegmentLoadStats (..), | ||||
|                                                  Service (..), addRMapEntry, | ||||
|                                                  addRMapEntryWith, | ||||
|                                                  cacheGetNodeStateUnvalidated, | ||||
|                                                  cacheLookup, cacheLookupPred, | ||||
|                                                  cacheLookupSucc, genNodeID, | ||||
|                                                  getKeyID, localCompare, | ||||
|                                                  getKeyID, hasValidNodeId, | ||||
|                                                  loadSliceSum, localCompare, | ||||
|                                                  rMapFromList, rMapLookupPred, | ||||
|                                                  rMapLookupSucc, | ||||
|                                                  remainingLoadTarget, | ||||
|                                                  setPredecessors, setSuccessors) | ||||
| import           Hash2Pub.ProtocolTypes | ||||
| import           Hash2Pub.RingMap | ||||
| 
 | ||||
| import           Debug.Trace                    (trace) | ||||
| 
 | ||||
|  | @ -103,7 +111,7 @@ queryLocalCache ownState nCache lBestNodes targetID | |||
|     -- as target ID falls between own ID and first predecessor, it is handled by this node | ||||
|     -- This only makes sense if the node is part of the DHT by having joined. | ||||
|     -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. | ||||
|       | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState | ||||
|       | vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState | ||||
|     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and | ||||
|     -- the closest succeeding node (like with the p initiated parallel queries | ||||
|       | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | ||||
|  | @ -227,8 +235,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g | |||
| 
 | ||||
| -- | uses the successor and predecessor list of a node as an indicator for whether a | ||||
| -- node has properly joined the DHT | ||||
| isJoined :: LocalNodeState s -> Bool | ||||
| isJoined ns = not . all null $ [successors ns, predecessors ns] | ||||
| vsIsJoined :: LocalNodeState s -> Bool | ||||
| vsIsJoined ns = not . all null $ [successors ns, predecessors ns] | ||||
| 
 | ||||
| -- | the size limit to be used when serialising messages for sending | ||||
| sendMessageSize :: Num i => i | ||||
|  | @ -237,27 +245,37 @@ sendMessageSize = 1200 | |||
| -- ====== message send and receive operations ====== | ||||
| 
 | ||||
| -- encode the response to a request that just signals successful receipt | ||||
| ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString | ||||
| ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { | ||||
| ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString | ||||
| ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response { | ||||
|     requestID = requestID req | ||||
|   , senderID = ownID | ||||
|   , senderID = receiverID req | ||||
|   , part = part req | ||||
|   , isFinalPart = False | ||||
|   , action = action req | ||||
|   , payload = Nothing | ||||
|                                                                              } | ||||
| ackRequest _ _ = Map.empty | ||||
| ackRequest _ = Map.empty | ||||
| 
 | ||||
| 
 | ||||
| -- | extract the first payload from a received message set | ||||
| extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload | ||||
| extractFirstPayload msgSet = foldr' (\msg plAcc -> | ||||
|     if isNothing plAcc && isJust (payload msg) | ||||
|        then payload msg | ||||
|        else plAcc | ||||
|                                  ) Nothing msgSet | ||||
| 
 | ||||
| 
 | ||||
| -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue | ||||
| -- the response to be sent. | ||||
| handleIncomingRequest :: Service s (RealNodeSTM s) | ||||
|                       => LocalNodeStateSTM s                     -- ^ the handling node | ||||
|                       => Word8                              -- ^ maximum number of vservers, because of decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@ | ||||
|                       -> LocalNodeStateSTM s                     -- ^ the handling node | ||||
|                       -> TQueue (BS.ByteString, SockAddr)   -- ^ send queue | ||||
|                       -> Set.Set FediChordMessage           -- ^ all parts of the request to handle | ||||
|                       -> SockAddr                           -- ^ source address of the request | ||||
|                       -> IO () | ||||
| handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do | ||||
| handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do | ||||
|     ns <- readTVarIO nsSTM | ||||
|     -- add nodestate to cache | ||||
|     now <- getPOSIXTime | ||||
|  | @ -265,19 +283,20 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do | |||
|       Nothing -> pure () | ||||
|       Just aPart -> do | ||||
|         let (SockAddrInet6 _ _ sourceIP _) = sourceAddr | ||||
|         queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns | ||||
|         queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns) | ||||
|         -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue | ||||
|         maybe (pure ()) ( | ||||
|             mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) | ||||
|                         ) | ||||
|             =<< (case action aPart of | ||||
|                 Ping -> Just <$> respondPing nsSTM msgSet | ||||
|                 Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin | ||||
|                 Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin | ||||
|                 -- ToDo: figure out what happens if not joined | ||||
|                 QueryID -> Just <$> respondQueryID nsSTM msgSet | ||||
|                 -- only when joined | ||||
|                 Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing | ||||
|                 Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing | ||||
|                 Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing | ||||
|                 Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing | ||||
|                 QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing | ||||
|             ) | ||||
|       -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. | ||||
| 
 | ||||
|  | @ -287,19 +306,18 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do | |||
|       -- | Filter out requests with spoofed node IDs by recomputing the ID using | ||||
|       -- the sender IP. | ||||
|       -- For valid (non-spoofed) sender IDs, the passed responder function is invoked. | ||||
|       dropSpoofedIDs :: HostAddress6        -- msg source address | ||||
|       dropSpoofedIDs :: Word8       -- ^ maximum number of vservers per node | ||||
|                      -> HostAddress6        -- ^ msg source address | ||||
|                      -> LocalNodeStateSTM s | ||||
|                      -> Set.Set FediChordMessage    -- message parts of the request | ||||
|                      -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString))     -- reponder function to be invoked for valid requests | ||||
|                      -> Set.Set FediChordMessage    -- ^ message parts of the request | ||||
|                      -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString))     -- ^ reponder function to be invoked for valid requests | ||||
|                      -> IO (Maybe (Map.Map Integer BS.ByteString)) | ||||
|       dropSpoofedIDs addr nsSTM' msgSet' responder = | ||||
|       dropSpoofedIDs limVs addr nsSTM' msgSet' responder = | ||||
|           let | ||||
|             aRequestPart = Set.elemAt 0 msgSet | ||||
|             senderNs = sender aRequestPart | ||||
|             givenSenderID = getNid senderNs | ||||
|             recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs) | ||||
|           in | ||||
|           if recomputedID == givenSenderID | ||||
|           if hasValidNodeId limVs senderNs addr | ||||
|              then Just <$> responder nsSTM' msgSet' | ||||
|              else pure Nothing | ||||
| 
 | ||||
|  | @ -317,19 +335,15 @@ respondQueryID nsSTM msgSet = do | |||
|     let | ||||
|         aRequestPart = Set.elemAt 0 msgSet | ||||
|         senderID = getNid . sender $ aRequestPart | ||||
|         senderPayload = foldr' (\msg plAcc -> | ||||
|             if isNothing plAcc && isJust (payload msg) | ||||
|                then payload msg | ||||
|                else plAcc | ||||
|                                ) Nothing msgSet | ||||
|     -- return only empty message serialisation if no payload was included in parts | ||||
|         senderPayload = extractFirstPayload msgSet | ||||
|         -- return only empty message serialisation if no payload was included in parts | ||||
|     maybe (pure Map.empty) (\senderPayload' -> do | ||||
|         responseMsg <- atomically $ do | ||||
|             nsSnap <- readTVar nsSTM | ||||
|             cache <- readTVar $ nodeCacheSTM nsSnap | ||||
|             let | ||||
|                 responsePayload = QueryIDResponsePayload { | ||||
|                     queryResult = if isJoined nsSnap | ||||
|                     queryResult = if vsIsJoined nsSnap | ||||
|                                      then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') | ||||
|                                      -- if not joined yet, attract responsibility for | ||||
|                                      -- all keys to make bootstrapping possible | ||||
|  | @ -422,6 +436,47 @@ respondPing nsSTM msgSet = do | |||
|                                 } | ||||
|     pure $ serialiseMessage sendMessageSize pingResponse | ||||
| 
 | ||||
| respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) | ||||
| respondQueryLoad nsSTM msgSet = do | ||||
|     nsSnap <- readTVarIO nsSTM | ||||
|     -- this message cannot be split reasonably, so just | ||||
|     -- consider the first payload | ||||
|     let | ||||
|         aRequestPart = Set.elemAt 0 msgSet | ||||
|         senderPayload = extractFirstPayload msgSet | ||||
|     responsePl <- maybe (pure Nothing) (\pl -> | ||||
|         case pl of | ||||
|           LoadRequestPayload{} -> do | ||||
|             parentNode <- readTVarIO (parentRealNode nsSnap) | ||||
|             let | ||||
|                 serv = nodeService parentNode | ||||
|                 conf = nodeConfig parentNode | ||||
|             lStats <- getServiceLoadStats serv | ||||
|             let | ||||
|                 directSucc = getNid . head . predecessors $ nsSnap | ||||
|                 handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl) | ||||
|             pure $ Just LoadResponsePayload | ||||
|                 { loadSum = handledTagSum | ||||
|                 , loadRemainingTarget = remainingLoadTarget conf lStats | ||||
|                 , loadTotalCapacity = totalCapacity lStats | ||||
|                 , loadSegmentLowerBound = directSucc | ||||
|                 } | ||||
|           _ -> pure Nothing | ||||
|                            ) | ||||
|                            senderPayload | ||||
| 
 | ||||
|     pure $ maybe | ||||
|         Map.empty | ||||
|         (\pl -> serialiseMessage sendMessageSize $ Response | ||||
|             { requestID = requestID aRequestPart | ||||
|             , senderID = getNid nsSnap | ||||
|             , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 | ||||
|             , isFinalPart = False | ||||
|             , action = QueryLoad | ||||
|             , payload = Just pl | ||||
|             } | ||||
|         ) responsePl | ||||
| 
 | ||||
| 
 | ||||
| respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) | ||||
| respondJoin nsSTM msgSet = do | ||||
|  | @ -434,7 +489,7 @@ respondJoin nsSTM msgSet = do | |||
|             senderNS = sender aRequestPart | ||||
|             -- if not joined yet, attract responsibility for | ||||
|             -- all keys to make bootstrapping possible | ||||
|             responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) | ||||
|             responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) | ||||
|             thisNodeResponsible (FOUND _)   = True | ||||
|             thisNodeResponsible (FORWARD _) = False | ||||
|         -- check whether the joining node falls into our responsibility | ||||
|  | @ -481,6 +536,21 @@ respondJoin nsSTM msgSet = do | |||
| 
 | ||||
| -- ....... request sending ....... | ||||
| 
 | ||||
| -- | defautl constructor for request messages, fills standard values like | ||||
| -- part number to avoid code repition | ||||
| mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage) | ||||
| mkRequest ns targetID action pl rid = Request | ||||
|     { requestID = rid | ||||
|     , receiverID = targetID | ||||
|     , sender = toRemoteNodeState ns | ||||
|     -- part number and final flag can be changed by ASN1 encoder to make packet | ||||
|     -- fit the MTU restrictions | ||||
|     , part = 1 | ||||
|     , isFinalPart = True | ||||
|     , action = action | ||||
|     , payload = pl | ||||
|     } | ||||
| 
 | ||||
| -- | send a join request and return the joined 'LocalNodeState' including neighbours | ||||
| requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a             -- ^ currently responsible node to be contacted | ||||
|             -> LocalNodeStateSTM s               -- ^ joining NodeState | ||||
|  | @ -492,7 +562,7 @@ requestJoin toJoinOn ownStateSTM = do | |||
|     let srcAddr = confIP nodeConf | ||||
|     bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do | ||||
|         -- extract own state for getting request information | ||||
|         responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock | ||||
|         responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock | ||||
|         (cacheInsertQ, joinedState) <- atomically $ do | ||||
|             stateSnap <- readTVar ownStateSTM | ||||
|             let | ||||
|  | @ -523,7 +593,7 @@ requestJoin toJoinOn ownStateSTM = do | |||
|             writeTVar ownStateSTM newState | ||||
|             pure (cacheInsertQ, newState) | ||||
|         -- execute the cache insertions | ||||
|         mapM_ (\f -> f joinedState) cacheInsertQ | ||||
|         mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ | ||||
|         if responses == Set.empty | ||||
|                   then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) | ||||
|            else do | ||||
|  | @ -581,14 +651,14 @@ sendQueryIdMessages :: (Integral i) | |||
|                     -> Maybe i                       -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned | ||||
|                    -> [RemoteNodeState]             -- ^ nodes to query | ||||
|                    -> IO QueryResponse -- ^ accumulated response | ||||
| sendQueryIdMessages targetID ns lParam targets = do | ||||
| sendQueryIdMessages lookupID ns lParam targets = do | ||||
| 
 | ||||
|           -- create connected sockets to all query targets and use them for request handling | ||||
| 
 | ||||
|           nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) | ||||
|           let srcAddr = confIP nodeConf | ||||
|           queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( | ||||
|               sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) | ||||
|               sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode)) | ||||
|                                                                                                                                    )) targets | ||||
|           -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 | ||||
|           -- ToDo: exception handling, maybe log them | ||||
|  | @ -605,7 +675,7 @@ sendQueryIdMessages targetID ns lParam targets = do | |||
|                              _ -> Set.empty | ||||
| 
 | ||||
|             -- forward entries to global cache | ||||
|             queueAddEntries entrySet ns | ||||
|             queueAddEntries entrySet (cacheWriteQueue ns) | ||||
|             -- return accumulated QueryResult | ||||
|             pure $ case acc of | ||||
|               -- once a FOUND as been encountered, return this as a result | ||||
|  | @ -621,13 +691,14 @@ sendQueryIdMessages targetID ns lParam targets = do | |||
| 
 | ||||
| -- | Create a QueryID message to be supplied to 'sendRequestTo' | ||||
| lookupMessage :: Integral i | ||||
|               => NodeID         -- ^ target ID | ||||
|               => NodeID         -- ^ lookup ID to be looked up | ||||
|               -> LocalNodeState s -- ^ sender node state | ||||
|               -> Maybe i        -- ^ optionally provide a different l parameter | ||||
|               -> NodeID         -- ^ target ID of message destination | ||||
|               ->  (Integer -> FediChordMessage) | ||||
| lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) | ||||
| lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID) | ||||
|   where | ||||
|     pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam } | ||||
|       pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam } | ||||
| 
 | ||||
| 
 | ||||
| -- | Send a stabilise request to provided 'RemoteNode' and, if successful, | ||||
|  | @ -638,16 +709,7 @@ requestStabilise :: LocalNodeState s      -- ^ sending node | |||
| requestStabilise ns neighbour = do | ||||
|     nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) | ||||
|     let srcAddr = confIP nodeConf | ||||
|     responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> | ||||
|         Request { | ||||
|             requestID = rid | ||||
|           , sender = toRemoteNodeState ns | ||||
|           , part = 1 | ||||
|           , isFinalPart = False | ||||
|           , action = Stabilise | ||||
|           , payload = Just StabiliseRequestPayload | ||||
|                 } | ||||
|                          ) | ||||
|     responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload)) | ||||
|                                                                                            ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||
|     either | ||||
|         -- forward IO error messages | ||||
|  | @ -660,7 +722,7 @@ requestStabilise ns neighbour = do | |||
|                                                       ) | ||||
|                                                       ([],[]) respSet | ||||
|             -- update successfully responded neighbour in cache | ||||
|             maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) | ||||
|             maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet) | ||||
|             pure $ if null responsePreds && null responseSuccs | ||||
|                       then Left "no neighbours returned" | ||||
|                       else Right (responsePreds, responseSuccs) | ||||
|  | @ -682,17 +744,12 @@ requestLeave ns doMigration target = do | |||
|       , leavePredecessors = predecessors ns | ||||
|       , leaveDoMigration = doMigration | ||||
|                                            } | ||||
|     responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> | ||||
|         Request { | ||||
|             requestID = rid | ||||
|           , sender = toRemoteNodeState ns | ||||
|           , part = 1 | ||||
|           , isFinalPart = False | ||||
|           , action = Leave | ||||
|           , payload = Just leavePayload | ||||
|                 } | ||||
|                          ) | ||||
|                                                                                            ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||
|     responses <- bracket | ||||
|         (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) | ||||
|         close | ||||
|         (fmap Right | ||||
|         . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload)) | ||||
|         ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||
|     either | ||||
|         -- forward IO error messages | ||||
|         (pure . Left) | ||||
|  | @ -708,16 +765,7 @@ requestPing ns target = do | |||
|     let srcAddr = confIP nodeConf | ||||
|     responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close | ||||
|         (\sock -> do | ||||
|             resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> | ||||
|                 Request { | ||||
|                     requestID = rid | ||||
|                   , sender = toRemoteNodeState ns | ||||
|                   , part = 1 | ||||
|                   , isFinalPart = False | ||||
|                   , action = Ping | ||||
|                   , payload = Just PingRequestPayload | ||||
|                         } | ||||
|                              ) sock | ||||
|             resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock | ||||
|             (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock | ||||
|             pure $ Right (peerAddr, resp) | ||||
|                                                                                                ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||
|  | @ -733,10 +781,9 @@ requestPing ns target = do | |||
|             -- recompute ID for each received node and mark as verified in cache | ||||
|             now <- getPOSIXTime | ||||
|             forM_ responseVss (\vs -> | ||||
|                 let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs) | ||||
|                  in if recomputedID == getNid vs | ||||
|                        then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs | ||||
|                        else pure () | ||||
|                 if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr | ||||
|                    then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs | ||||
|                    else pure () | ||||
|                               ) | ||||
|             pure $ if null responseVss | ||||
|                       then Left "no active vServer IDs returned, ignoring node" | ||||
|  | @ -744,6 +791,37 @@ requestPing ns target = do | |||
|         ) responses | ||||
| 
 | ||||
| 
 | ||||
| -- still need a particular vserver as LocalNodeState, because requests need a sender | ||||
| requestQueryLoad :: (MonadError String m, MonadIO m) | ||||
|                  => LocalNodeState s    -- ^ the local source vserver for the request | ||||
|                  -> NodeID              -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node | ||||
|                  -> RemoteNodeState     -- ^ target node to query | ||||
|                  -> m SegmentLoadStats | ||||
| requestQueryLoad ns upperIdBound target = do | ||||
|     nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) | ||||
|     let | ||||
|         srcAddr = confIP nodeConf | ||||
|         loadReqPl = LoadRequestPayload | ||||
|             { loadSegmentUpperBound = upperIdBound | ||||
|             } | ||||
|     responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close | ||||
|         (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl)) | ||||
|         ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||
|     responseMsgSet <- liftEither responses | ||||
|     -- throws an error if an exception happened | ||||
|     loadResPl <- maybe (throwError "no load response payload found") pure | ||||
|         (extractFirstPayload responseMsgSet) | ||||
|     pure SegmentLoadStats | ||||
|         { segmentLowerKeyBound = loadSegmentLowerBound loadResPl | ||||
|         , segmentUpperKeyBound = upperIdBound | ||||
|         , segmentLoad = loadSum loadResPl | ||||
|         , segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl | ||||
|         , segmentOwnerCapacity = loadTotalCapacity loadResPl | ||||
|         , segmentCurrentOwner = target | ||||
|         } | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| -- | Generic function for sending a request over a connected socket and collecting the response. | ||||
| -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. | ||||
| sendRequestTo :: Int                    -- ^ timeout in milliseconds | ||||
|  | @ -800,24 +878,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do | |||
| 
 | ||||
| -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache | ||||
| queueAddEntries :: Foldable c => c RemoteCacheEntry | ||||
|                 -> LocalNodeState s | ||||
|                 -> TQueue (NodeCache -> NodeCache) | ||||
|                 -> IO () | ||||
| queueAddEntries entries ns = do | ||||
| queueAddEntries entries cacheQ = do | ||||
|     now <- getPOSIXTime | ||||
|     forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns)  $ addCacheEntryPure now entry | ||||
|     forM_ entries $ \entry -> atomically $ writeTQueue cacheQ  $ addCacheEntryPure now entry | ||||
| 
 | ||||
| 
 | ||||
| -- | enque a list of node IDs to be deleted from the global NodeCache | ||||
| queueDeleteEntries :: Foldable c | ||||
|                    => c NodeID | ||||
|                    -> LocalNodeState s | ||||
|                    -> TQueue (NodeCache -> NodeCache) | ||||
|                    -> IO () | ||||
| queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry | ||||
| queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry | ||||
| 
 | ||||
| 
 | ||||
| -- | enque a single node ID to be deleted from the global NodeCache | ||||
| queueDeleteEntry :: NodeID | ||||
|                  -> LocalNodeState s | ||||
|                  -> TQueue (NodeCache -> NodeCache) | ||||
|                  -> IO () | ||||
| queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | ||||
| 
 | ||||
|  | @ -826,11 +904,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | |||
| -- global 'NodeCache'. | ||||
| queueUpdateVerifieds :: Foldable c | ||||
|                      => c NodeID | ||||
|                      -> LocalNodeState s | ||||
|                      -> TQueue (NodeCache -> NodeCache) | ||||
|                      -> IO () | ||||
| queueUpdateVerifieds nIds ns = do | ||||
| queueUpdateVerifieds nIds cacheQ = do | ||||
|     now <- getPOSIXTime | ||||
|     forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ | ||||
|     forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $ | ||||
|         markCacheEntryAsVerified (Just now) nid' | ||||
| 
 | ||||
| -- | retry an IO action at most *i* times until it delivers a result | ||||
|  |  | |||
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							|  | @ -7,8 +7,8 @@ | |||
| {-# LANGUAGE OverloadedStrings          #-} | ||||
| {-# LANGUAGE RankNTypes                 #-} | ||||
| 
 | ||||
| module Hash2Pub.FediChordTypes ( | ||||
|     NodeID -- abstract, but newtype constructors cannot be hidden | ||||
| module Hash2Pub.FediChordTypes | ||||
|   ( NodeID -- abstract, but newtype constructors cannot be hidden | ||||
|   , idBits | ||||
|   , getNodeID | ||||
|   , toNodeID | ||||
|  | @ -18,6 +18,13 @@ module Hash2Pub.FediChordTypes ( | |||
|   , RemoteNodeState (..) | ||||
|   , RealNode (..) | ||||
|   , RealNodeSTM | ||||
|   , VSMap | ||||
|   , LoadStats (..) | ||||
|   , emptyLoadStats | ||||
|   , remainingLoadTarget | ||||
|   , loadSliceSum | ||||
|   , addVserver | ||||
|   , SegmentLoadStats (..) | ||||
|   , setSuccessors | ||||
|   , setPredecessors | ||||
|   , NodeCache | ||||
|  | @ -51,6 +58,7 @@ module Hash2Pub.FediChordTypes ( | |||
|   , localCompare | ||||
|   , genNodeID | ||||
|   , genNodeIDBS | ||||
|   , hasValidNodeId | ||||
|   , genKeyID | ||||
|   , genKeyIDBS | ||||
|   , byteStringToUInteger | ||||
|  | @ -60,12 +68,14 @@ module Hash2Pub.FediChordTypes ( | |||
|   , DHT(..) | ||||
|   , Service(..) | ||||
|   , ServiceConf(..) | ||||
|                            ) where | ||||
|   ) where | ||||
| 
 | ||||
| import           Control.Exception | ||||
| import           Data.Foldable                 (foldr') | ||||
| import           Data.Function                 (on) | ||||
| import qualified Data.Hashable                 as Hashable | ||||
| import           Data.HashMap.Strict           (HashMap) | ||||
| import qualified Data.HashMap.Strict           as HMap | ||||
| import           Data.List                     (delete, nub, sortBy) | ||||
| import qualified Data.Map.Strict               as Map | ||||
| import           Data.Maybe                    (fromJust, fromMaybe, isJust, | ||||
|  | @ -148,17 +158,27 @@ a `localCompare` b | |||
| -- Also contains shared data and config values. | ||||
| -- TODO: more data structures for k-choices bookkeeping | ||||
| data RealNode s = RealNode | ||||
|     { vservers       :: [LocalNodeStateSTM s] | ||||
|     -- ^ references to all active versers | ||||
|     , nodeConfig     :: FediChordConf | ||||
|     { vservers              :: VSMap s | ||||
|     -- ^ map of all active VServer node IDs to their node state | ||||
|     , nodeConfig            :: FediChordConf | ||||
|     -- ^ holds the initial configuration read at program start | ||||
|     , bootstrapNodes :: [(String, PortNumber)] | ||||
|     , bootstrapNodes        :: [(String, PortNumber)] | ||||
|     -- ^ nodes to be used as bootstrapping points, new ones learned during operation | ||||
|     , lookupCacheSTM :: TVar LookupCache | ||||
|     , lookupCacheSTM        :: TVar LookupCache | ||||
|     -- ^ a global cache of looked up keys and their associated nodes | ||||
|     , nodeService    :: s (RealNodeSTM s) | ||||
|     , globalNodeCacheSTM    :: TVar NodeCache | ||||
|     -- ^ EpiChord node cache with expiry times for nodes. | ||||
|     , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache) | ||||
|     -- ^ cache updates are not written directly to the  'globalNodeCacheSTM' | ||||
|     , nodeService           :: s (RealNodeSTM s) | ||||
|     } | ||||
| 
 | ||||
| -- | insert a new vserver mapping into a node | ||||
| addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s | ||||
| addVserver (key, nstate) node = node | ||||
|     { vservers = addRMapEntry key nstate (vservers node) } | ||||
| 
 | ||||
| type VSMap s = RingMap NodeID (LocalNodeStateSTM s) | ||||
| type RealNodeSTM s = TVar (RealNode s) | ||||
| 
 | ||||
| -- | represents a node and all its important state | ||||
|  | @ -172,7 +192,7 @@ data RemoteNodeState = RemoteNodeState | |||
|     -- ^ port of the DHT itself | ||||
|     , servicePort :: PortNumber | ||||
|     -- ^ port of the service provided on top of the DHT | ||||
|     , vServerID   :: Integer | ||||
|     , vServerID   :: Word8 | ||||
|     -- ^ ID of this vserver | ||||
|     } | ||||
|     deriving (Show, Eq) | ||||
|  | @ -185,9 +205,9 @@ data LocalNodeState s = LocalNodeState | |||
|     { nodeState           :: RemoteNodeState | ||||
|     -- ^ represents common data present both in remote and local node representations | ||||
|     , nodeCacheSTM        :: TVar NodeCache | ||||
|     -- ^ EpiChord node cache with expiry times for nodes | ||||
|     -- ^ reference to the 'globalNodeCacheSTM' | ||||
|     , cacheWriteQueue     :: TQueue (NodeCache -> NodeCache) | ||||
|     -- ^ cache updates are not written directly to the  'nodeCache' but queued and | ||||
|     -- ^ reference to the 'globalCacheWriteQueue | ||||
|     , successors          :: [RemoteNodeState] -- could be a set instead as these are ordered as well | ||||
|     -- ^ successor nodes in ascending order by distance | ||||
|     , predecessors        :: [RemoteNodeState] | ||||
|  | @ -217,14 +237,14 @@ class NodeState a where | |||
|     getIpAddr :: a -> HostAddress6 | ||||
|     getDhtPort :: a -> PortNumber | ||||
|     getServicePort :: a -> PortNumber | ||||
|     getVServerID :: a -> Integer | ||||
|     getVServerID :: a -> Word8 | ||||
|     -- setters for common properties | ||||
|     setNid :: NodeID -> a -> a | ||||
|     setDomain :: String -> a -> a | ||||
|     setIpAddr :: HostAddress6 -> a -> a | ||||
|     setDhtPort :: PortNumber -> a -> a | ||||
|     setServicePort :: PortNumber -> a -> a | ||||
|     setVServerID :: Integer -> a -> a | ||||
|     setVServerID :: Word8 -> a -> a | ||||
|     toRemoteNodeState :: a -> RemoteNodeState | ||||
| 
 | ||||
| instance NodeState RemoteNodeState where | ||||
|  | @ -373,6 +393,11 @@ genNodeID :: HostAddress6       -- ^a node's IPv6 address | |||
|             -> NodeID           -- ^the generated @NodeID@ | ||||
| genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs | ||||
| 
 | ||||
| 
 | ||||
| hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool | ||||
| hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns) | ||||
| 
 | ||||
| 
 | ||||
| -- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT | ||||
| genKeyIDBS :: String            -- ^the key string | ||||
|            -> BS.ByteString     -- ^the key ID represented as a @ByteString@ | ||||
|  | @ -427,9 +452,70 @@ data FediChordConf = FediChordConf | |||
|     -- ^ how long to wait until response has arrived, in milliseconds | ||||
|     , confRequestRetries            :: Int | ||||
|     -- ^ how often re-sending a timed-out request can be retried | ||||
|     , confEnableKChoices            :: Bool | ||||
|     -- ^ whether to enable k-choices load balancing | ||||
|     , confKChoicesOverload          :: Double | ||||
|     -- ^ fraction of capacity above which a node considers itself overloaded | ||||
|     , confKChoicesUnderload         :: Double | ||||
|     -- ^ fraction of capacity below which a node considers itself underloaded | ||||
|     , confKChoicesMaxVS             :: Word8 | ||||
|     -- ^ upper limit of vserver index κ | ||||
|     , confKChoicesRebalanceInterval :: Int | ||||
|     -- ^ interval between vserver rebalance attempts | ||||
|     } | ||||
|     deriving (Show, Eq) | ||||
| 
 | ||||
| -- ====== k-choices load balancing types ====== | ||||
| 
 | ||||
| data LoadStats = LoadStats | ||||
|     { loadPerTag         :: RingMap NodeID Double | ||||
|     -- ^ map of loads for each handled tag | ||||
|     , totalCapacity      :: Double | ||||
|     -- ^ total designated capacity of the service | ||||
|     , compensatedLoadSum :: Double | ||||
|     -- ^ effective load reevant for load balancing after compensating for | ||||
|     } | ||||
|     deriving (Show, Eq) | ||||
| 
 | ||||
| -- | calculates the mismatch from the target load by taking into account the | ||||
| -- underload and overload limits | ||||
| remainingLoadTarget :: FediChordConf -> LoadStats -> Double | ||||
| remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats | ||||
|     where | ||||
|         targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 | ||||
| 
 | ||||
| 
 | ||||
| -- | calculates the sum of tag load in a contiguous slice between to keys | ||||
| loadSliceSum :: LoadStats | ||||
|              -> NodeID  -- ^ lower segment bound | ||||
|              -> NodeID  -- ^ upper segment bound | ||||
|              -> Double  -- ^ sum of all tag loads within that segment | ||||
| loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats | ||||
| 
 | ||||
| 
 | ||||
| data SegmentLoadStats = SegmentLoadStats | ||||
|     { segmentLowerKeyBound            :: NodeID | ||||
|     -- ^ segment start key | ||||
|     , segmentUpperKeyBound            :: NodeID | ||||
|     -- ^ segment end key | ||||
|     , segmentLoad                     :: Double | ||||
|     -- ^ sum of load of all keys in the segment | ||||
|     , segmentOwnerRemainingLoadTarget :: Double | ||||
|     -- ^ remaining load target of the current segment handler: | ||||
|     , segmentOwnerCapacity            :: Double | ||||
|     -- ^ total capacity of the current segment handler node, used for normalisation | ||||
|     , segmentCurrentOwner             :: RemoteNodeState | ||||
|     -- ^ the current owner of the segment that needs to be joined on | ||||
|     } | ||||
| 
 | ||||
| -- TODO: figure out a better way of initialising | ||||
| emptyLoadStats :: LoadStats | ||||
| emptyLoadStats = LoadStats | ||||
|     { loadPerTag = emptyRMap | ||||
|     , totalCapacity = 0 | ||||
|     , compensatedLoadSum = 0 | ||||
|     } | ||||
| 
 | ||||
| -- ====== Service Types ============ | ||||
| 
 | ||||
| class Service s d where | ||||
|  | @ -445,6 +531,7 @@ class Service s d where | |||
|                 -> IO (Either String ())    -- ^ success or failure | ||||
|     -- | Wait for an incoming migration from a given node to succeed, may block forever | ||||
|     waitForMigrationFrom :: s d -> NodeID -> IO () | ||||
|     getServiceLoadStats :: s d -> IO LoadStats | ||||
| 
 | ||||
| instance Hashable.Hashable NodeID where | ||||
|     hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID | ||||
|  |  | |||
|  | @ -22,7 +22,7 @@ import qualified Data.DList                as D | |||
| import           Data.Either               (lefts, rights) | ||||
| import qualified Data.HashMap.Strict       as HMap | ||||
| import qualified Data.HashSet              as HSet | ||||
| import           Data.Maybe                (fromJust, isJust) | ||||
| import           Data.Maybe                (fromJust, fromMaybe, isJust) | ||||
| import           Data.String               (fromString) | ||||
| import           Data.Text.Lazy            (Text) | ||||
| import qualified Data.Text.Lazy            as Txt | ||||
|  | @ -64,8 +64,10 @@ data PostService d = PostService | |||
|     , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) | ||||
|     , httpMan              :: HTTP.Manager | ||||
|     , statsQueue           :: TQueue StatsEvent | ||||
|     , loadStats            :: TVar RelayStats | ||||
|     -- ^ current load stats, replaced periodically | ||||
|     , relayStats           :: TVar RelayStats | ||||
|     -- ^ current relay stats, replaced periodically | ||||
|     , loadStats            :: TVar LoadStats | ||||
|     -- ^ current load values of the relay, replaced periodically and used by | ||||
|     , logFileHandle        :: Handle | ||||
|     } | ||||
|     deriving (Typeable) | ||||
|  | @ -96,7 +98,8 @@ instance DHT d => Service PostService d where | |||
|         migrationsInProgress' <- newTVarIO HMap.empty | ||||
|         httpMan' <- HTTP.newManager HTTP.defaultManagerSettings | ||||
|         statsQueue' <- newTQueueIO | ||||
|         loadStats' <- newTVarIO emptyStats | ||||
|         relayStats' <- newTVarIO emptyStats | ||||
|         loadStats' <- newTVarIO emptyLoadStats | ||||
|         loggingFile <- openFile (confLogfilePath conf) WriteMode | ||||
|         hSetBuffering loggingFile LineBuffering | ||||
|         let | ||||
|  | @ -112,6 +115,7 @@ instance DHT d => Service PostService d where | |||
|               , migrationsInProgress = migrationsInProgress' | ||||
|               , httpMan = httpMan' | ||||
|               , statsQueue = statsQueue' | ||||
|               , relayStats = relayStats' | ||||
|               , loadStats = loadStats' | ||||
|               , logFileHandle = loggingFile | ||||
|               } | ||||
|  | @ -153,6 +157,12 @@ instance DHT d => Service PostService d where | |||
|         -- block until migration finished | ||||
|         takeMVar migrationSynchroniser | ||||
| 
 | ||||
|     getServiceLoadStats = getLoadStats | ||||
| 
 | ||||
| 
 | ||||
| getLoadStats :: PostService d -> IO LoadStats | ||||
| getLoadStats serv = readTVarIO $ loadStats serv | ||||
| 
 | ||||
| 
 | ||||
| -- | return a WAI application | ||||
| postServiceApplication :: DHT d => PostService d -> Application | ||||
|  | @ -835,7 +845,12 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop | |||
|           -- persistently store in a TVar so it can be retrieved later by the DHT | ||||
|           let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv) | ||||
|               rateStats = evaluateStats timePassed summedStats | ||||
|           atomically $ writeTVar (loadStats serv) rateStats | ||||
|           currentSubscribers <- readTVarIO $ subscribers serv | ||||
|           -- translate the rate statistics to load values | ||||
|           loads <- evaluateLoadStats rateStats currentSubscribers | ||||
|           atomically $ | ||||
|               writeTVar (relayStats serv) rateStats | ||||
|               >> writeTVar (loadStats serv) loads | ||||
|           -- and now what? write a log to file | ||||
|           -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum | ||||
|           -- later: current (reported) load, target load | ||||
|  | @ -859,6 +874,33 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop | |||
|                 0 tagMap | ||||
| 
 | ||||
| 
 | ||||
| -- | calculate load values from rate statistics | ||||
| evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats | ||||
| evaluateLoadStats currentStats currentSubscribers = do | ||||
|     -- load caused by each tag: incomingPostRate * ( 1 + subscribers) | ||||
|     -- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate | ||||
|     let | ||||
|         totalCapacity' = 2.5 * postPublishRate currentStats | ||||
|     (loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do | ||||
|         numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM | ||||
|         let | ||||
|             thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats) | ||||
|             thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers) | ||||
|         pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag') | ||||
|         ) | ||||
|         (0, emptyRMap) | ||||
|         $ rMapToListWithKeys currentSubscribers | ||||
|     let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats | ||||
|     pure LoadStats | ||||
|         { loadPerTag = loadPerTag' | ||||
|         , totalCapacity = totalCapacity' | ||||
|         -- load caused by post fetches cannot be influenced by re-balancing nodes, | ||||
|         -- but still reduces the totally available capacity | ||||
|         , compensatedLoadSum = loadSum + postFetchRate currentStats | ||||
|         } | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| -- | Evaluate the accumulated statistic events: Currently mostly calculates the event | ||||
| -- rates by dividing through the collection time frame | ||||
| evaluateStats :: POSIXTime -> RelayStats -> RelayStats | ||||
|  |  | |||
|  | @ -16,10 +16,12 @@ data Action = QueryID | |||
|     | Leave | ||||
|     | Stabilise | ||||
|     | Ping | ||||
|     | QueryLoad | ||||
|     deriving (Show, Eq, Enum) | ||||
| 
 | ||||
| data FediChordMessage = Request | ||||
|     { requestID   :: Integer | ||||
|     , receiverID  :: NodeID | ||||
|     , sender      :: RemoteNodeState | ||||
|     , part        :: Integer | ||||
|     , isFinalPart :: Bool | ||||
|  | @ -57,6 +59,10 @@ data ActionPayload = QueryIDRequestPayload | |||
|     } | ||||
|     | StabiliseRequestPayload | ||||
|     | PingRequestPayload | ||||
|     | LoadRequestPayload | ||||
|     { loadSegmentUpperBound :: NodeID | ||||
|     -- ^ upper bound of segment interested in, | ||||
|     } | ||||
|     | QueryIDResponsePayload | ||||
|     { queryResult :: QueryResponse | ||||
|     } | ||||
|  | @ -73,6 +79,12 @@ data ActionPayload = QueryIDRequestPayload | |||
|     | PingResponsePayload | ||||
|     { pingNodeStates :: [RemoteNodeState] | ||||
|     } | ||||
|     | LoadResponsePayload | ||||
|     { loadSum               :: Double | ||||
|     , loadRemainingTarget   :: Double | ||||
|     , loadTotalCapacity     :: Double | ||||
|     , loadSegmentLowerBound :: NodeID | ||||
|     } | ||||
|     deriving (Show, Eq) | ||||
| 
 | ||||
| -- | global limit of parts per message used when (de)serialising messages. | ||||
|  |  | |||
|  | @ -47,6 +47,13 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where | |||
|           traversingFL acc (ProxyEntry _ Nothing) = acc | ||||
|           traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry | ||||
| 
 | ||||
| instance (Bounded k, Ord k) => Traversable (RingMap k) where | ||||
|     traverse f = fmap RingMap . traverse traversingF . getRingMap | ||||
|       where | ||||
|           traversingF (KeyEntry entry) = KeyEntry <$> f entry | ||||
|           traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing | ||||
|           traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . Just <$> traversingF entry | ||||
| 
 | ||||
| 
 | ||||
| -- | entry of a 'RingMap' that holds a value and can also | ||||
| -- wrap around the lookup direction at the edges of the name space. | ||||
|  | @ -106,6 +113,23 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on | |||
|       | isNothing (rMapLookup nid rmap') = 1 | ||||
|       | otherwise = 0 | ||||
| 
 | ||||
| 
 | ||||
| -- | whether the RingMap is empty (except for empty proxy entries) | ||||
| nullRMap :: (Num k, Bounded k, Ord k) | ||||
|          => RingMap k a | ||||
|          -> Bool | ||||
| -- basic idea: look for a predecessor starting from the upper Map bound, | ||||
| -- Nothing indicates no entry being found | ||||
| nullRMap = isNothing . rMapLookupPred maxBound | ||||
| 
 | ||||
| 
 | ||||
| -- | O(logn( Is the key a member of the RingMap? | ||||
| memberRMap :: (Bounded k, Ord k) | ||||
|            => k | ||||
|            -> RingMap k a | ||||
|            -> Bool | ||||
| memberRMap key = isJust . rMapLookup key | ||||
| 
 | ||||
| -- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ | ||||
| -- to simulate a modular ring | ||||
| lookupWrapper :: (Bounded k, Ord k, Num k) | ||||
|  | @ -198,12 +222,28 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap | |||
|     modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) | ||||
|     modifier KeyEntry {}              = Nothing | ||||
| 
 | ||||
| -- TODO: rename this to elems | ||||
| rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a] | ||||
| rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap | ||||
| 
 | ||||
| -- TODO: rename this to toList | ||||
| rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)] | ||||
| rMapToListWithKeys = Map.foldrWithKey (\k v acc -> | ||||
|     maybe acc (\val -> (k, val):acc) $ extractRingEntry v | ||||
|                                       ) | ||||
|                                       [] | ||||
|                                       . getRingMap | ||||
| 
 | ||||
| rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a | ||||
| rMapFromList = setRMapEntries | ||||
| 
 | ||||
| 
 | ||||
| -- | this just merges the underlying 'Map.Map' s and does not check whether the | ||||
| -- ProxyEntry pointers are consistent, so better only create unions of | ||||
| -- equal-pointered RingMaps | ||||
| unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a | ||||
| unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b) | ||||
| 
 | ||||
| -- | takes up to i entries from a 'RingMap' by calling a getter function on a | ||||
| -- *startAt* value and after that on the previously returned value. | ||||
| -- Stops once i entries have been taken or an entry has been encountered twice | ||||
|  |  | |||
|  | @ -7,6 +7,7 @@ import           Control.Concurrent.STM.TVar | |||
| import           Control.Exception | ||||
| import           Data.ASN1.Parse             (runParseASN1) | ||||
| import qualified Data.ByteString             as BS | ||||
| import qualified Data.HashMap.Strict         as HMap | ||||
| import qualified Data.Map.Strict             as Map | ||||
| import           Data.Maybe                  (fromJust, isJust) | ||||
| import qualified Data.Set                    as Set | ||||
|  | @ -18,6 +19,7 @@ import           Hash2Pub.ASN1Coding | |||
| import           Hash2Pub.DHTProtocol | ||||
| import           Hash2Pub.FediChord | ||||
| import           Hash2Pub.FediChordTypes | ||||
| import           Hash2Pub.RingMap | ||||
| 
 | ||||
| spec :: Spec | ||||
| spec = do | ||||
|  | @ -221,14 +223,16 @@ spec = do | |||
|                   , exampleNodeState {nid = fromInteger (-5)} | ||||
|                                  ] | ||||
|                                               } | ||||
|             requestTemplate = Request { | ||||
|                 requestID = 2342 | ||||
|               , sender = exampleNodeState | ||||
|               , part = 1 | ||||
|               , isFinalPart = True | ||||
|               , action = undefined | ||||
|               , payload = undefined | ||||
|                                       } | ||||
|             qLoadReqPayload = LoadRequestPayload | ||||
|                 { loadSegmentUpperBound = 1025 | ||||
|                 } | ||||
|             qLoadResPayload = LoadResponsePayload | ||||
|                 { loadSum = 3.141 | ||||
|                 , loadRemainingTarget = -1.337 | ||||
|                 , loadTotalCapacity = 2.21 | ||||
|                 , loadSegmentLowerBound = 12 | ||||
|                 } | ||||
| 
 | ||||
|             responseTemplate = Response { | ||||
|                 requestID = 2342 | ||||
|               , senderID = nid exampleNodeState | ||||
|  | @ -237,7 +241,7 @@ spec = do | |||
|               , action = undefined | ||||
|               , payload = undefined | ||||
|                                       } | ||||
|             requestWith a pa = requestTemplate {action = a, payload = Just pa} | ||||
|             requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342 | ||||
|             responseWith a pa = responseTemplate {action = a, payload = Just pa} | ||||
| 
 | ||||
|             encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg | ||||
|  | @ -248,17 +252,20 @@ spec = do | |||
|                                                                        } | ||||
| 
 | ||||
|         it "messages are encoded and decoded correctly from and to ASN1" $ do | ||||
|             encodeDecodeAndCheck $ requestWith QueryID qidReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith Join jReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith Leave lReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith Ping pingReqPayload | ||||
|             localNS <- exampleLocalNode | ||||
|             encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith localNS Join jReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload | ||||
|             encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload | ||||
|             encodeDecodeAndCheck $ responseWith QueryID qidResPayload1 | ||||
|             encodeDecodeAndCheck $ responseWith QueryID qidResPayload2 | ||||
|             encodeDecodeAndCheck $ responseWith Join jResPayload | ||||
|             encodeDecodeAndCheck $ responseWith Leave lResPayload | ||||
|             encodeDecodeAndCheck $ responseWith Stabilise stabResPayload | ||||
|             encodeDecodeAndCheck $ responseWith Ping pingResPayload | ||||
|             encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload | ||||
|         it "messages are encoded and decoded to ASN.1 DER properly" $ | ||||
|             deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652  $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) | ||||
|         it "messages too large for a single packet can (often) be split into multiple parts" $ do | ||||
|  | @ -297,13 +304,13 @@ exampleNodeState = RemoteNodeState { | |||
| 
 | ||||
| exampleLocalNode :: IO (LocalNodeState MockService) | ||||
| exampleLocalNode = do | ||||
|     realNode <- newTVarIO $ RealNode { | ||||
|             vservers = [] | ||||
|     realNodeSTM <- newTVarIO $ RealNode { | ||||
|             vservers = emptyRMap | ||||
|           , nodeConfig = exampleFediConf | ||||
|           , bootstrapNodes = confBootstrapNodes exampleFediConf | ||||
|           , nodeService = MockService | ||||
|                                                         } | ||||
|     nodeStateInit realNode | ||||
|     nodeStateInit realNodeSTM 0 | ||||
| 
 | ||||
| 
 | ||||
| exampleFediConf :: FediChordConf | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue