Compare commits
	
		
			38 commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| e79ba52e00 | |||
| 4aa4667a1d | |||
| 6aebd982f8 | |||
| 048a6ce391 | |||
| 8bd4e04bcd | |||
| 0cb4b6815c | |||
| b111515178 | |||
| ecb127e6af | |||
| 5ed8a28fde | |||
| bb0fb0919a | |||
| b2b4fe3dd8 | |||
| c208aeceaa | |||
| 0ee8f0dc43 | |||
| 21ecf9b041 | |||
| 9a61c186e3 | |||
| 578cc362b9 | |||
| 1a0de55b8c | |||
| 7a87d86c32 | |||
| 3b6d129bfc | |||
| 62da66aade | |||
| 13c5b385b1 | |||
| 1ed0281417 | |||
| 499c90e63a | |||
| 1a7afed062 | |||
| 8e8ea41dc4 | |||
| 33ae904d17 | |||
| 68de73d919 | |||
| 0ab6ee9c8f | |||
| 12dfc56a73 | |||
| 9bf7365a2c | |||
| 5e745cd035 | |||
| 30bf0529ed | |||
| 576ea2c3f6 | |||
| 7dd7e96cce | |||
| a1cfbbac48 | |||
| af27cded19 | |||
| 41aaa8ff70 | |||
| ddea599022 | 
					 13 changed files with 1099 additions and 424 deletions
				
			
		|  | @ -6,20 +6,22 @@ Domain ::= VisibleString | ||||||
| 
 | 
 | ||||||
| Partnum ::= INTEGER (0..150) | Partnum ::= INTEGER (0..150) | ||||||
| 
 | 
 | ||||||
| Action ::= ENUMERATED {queryID, join, leave, stabilise, ping} | Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad} | ||||||
| 
 | 
 | ||||||
| Request ::= SEQUENCE { | Request ::= SEQUENCE { | ||||||
| 	action			Action, | 	action			Action, | ||||||
| 	requestID		INTEGER (0..4294967295),	-- arbitrarily restricting to an unsigned 32bit integer | 	requestID		INTEGER (0..4294967295),	-- arbitrarily restricting to an unsigned 32bit integer | ||||||
|  | 	receiverID		NodeID, | ||||||
| 	sender			NodeState, | 	sender			NodeState, | ||||||
| 	part			Partnum,	-- part number of this message, starts at 1 | 	part			Partnum,	-- part number of this message, starts at 1 | ||||||
| 	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this reuest | 	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this reuest | ||||||
| 	actionPayload	CHOICE { | 	actionPayload	CHOICE { | ||||||
| 		queryIDRequestPayload		QueryIDRequestPayload, | 		queryIDRequestPayload		QueryIDRequestPayload, | ||||||
| 		joinRequestPayload			JoinRequestPayload, | 		joinRequestPayload			JoinRequestPayload, | ||||||
| 		leaveRequestPayload		LeaveRequestPayload, | 		leaveRequestPayload			LeaveRequestPayload, | ||||||
| 		stabiliseRequestPayload	StabiliseRequestPayload, | 		stabiliseRequestPayload		StabiliseRequestPayload, | ||||||
| 		pingRequestPayload			PingRequestPayload | 		pingRequestPayload			PingRequestPayload, | ||||||
|  | 		loadRequestPayload			LoadRequestPayload | ||||||
| 		}			OPTIONAL			-- just for symmetry reasons with response, requests without a payload have no meaning | 		}			OPTIONAL			-- just for symmetry reasons with response, requests without a payload have no meaning | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -34,11 +36,12 @@ Response ::= SEQUENCE { | ||||||
| 	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this response | 	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this response | ||||||
| 	action			Action, | 	action			Action, | ||||||
| 	actionPayload	CHOICE { | 	actionPayload	CHOICE { | ||||||
| 		queryIDResponsePayload	QueryIDResponsePayload, | 		queryIDResponsePayload		QueryIDResponsePayload, | ||||||
| 		joinResponsePayload		JoinResponsePayload, | 		joinResponsePayload			JoinResponsePayload, | ||||||
| 		leaveResponsePayload		LeaveResponsePayload, | 		leaveResponsePayload		LeaveResponsePayload, | ||||||
| 		stabiliseResponsePayload	StabiliseResponsePayload, | 		stabiliseResponsePayload	StabiliseResponsePayload, | ||||||
| 		pingResponsePayload		PingResponsePayload | 		pingResponsePayload			PingResponsePayload, | ||||||
|  | 		loadResponsePayload			LoadResponsePayload | ||||||
| 		}			OPTIONAL			-- no payload when just ACKing a previous request | 		}			OPTIONAL			-- no payload when just ACKing a previous request | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -101,5 +104,15 @@ PingRequestPayload ::= NULL		-- do not include a node/ vserver ID, so that | ||||||
| -- learning all active vserver IDs handled by the server at once | -- learning all active vserver IDs handled by the server at once | ||||||
| PingResponsePayload ::= SEQUENCE OF NodeState | PingResponsePayload ::= SEQUENCE OF NodeState | ||||||
| 
 | 
 | ||||||
|  | LoadRequestPayload ::= SEQUENCE { | ||||||
|  | 	upperSegmentBound		NodeID | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | LoadResponsePayload ::= SEQUENCE { | ||||||
|  | 	loadSum					REAL, | ||||||
|  | 	remainingLoadTarget		REAL, | ||||||
|  | 	totalCapacity			REAL, | ||||||
|  | 	lowerBound				NodeID | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| END | END | ||||||
|  |  | ||||||
|  | @ -91,7 +91,7 @@ executable Hash2Pub | ||||||
|   -- Base language which the package is written in. |   -- Base language which the package is written in. | ||||||
|   default-language:    Haskell2010 |   default-language:    Haskell2010 | ||||||
| 
 | 
 | ||||||
|   ghc-options:         -threaded -rtsopts -with-rtsopts=-N |   ghc-options:         -threaded | ||||||
| 
 | 
 | ||||||
| executable Experiment | executable Experiment | ||||||
|   -- experiment runner |   -- experiment runner | ||||||
|  |  | ||||||
|  | @ -1,7 +1,7 @@ | ||||||
| # Hash2Pub | # Hash2Pub | ||||||
| 
 | 
 | ||||||
| ***This is heavily WIP and does not provide any useful functionality yet***.   | ***This is heavily WIP and does not provide any useful functionality yet***.   | ||||||
| I aim for always having the `mainline` branch in a state where it builds and tests pass. | I aim for always having the master branch at a state where it builds and tests pass. | ||||||
| 
 | 
 | ||||||
| A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table. | A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table. | ||||||
| It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell. | It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell. | ||||||
|  | @ -10,8 +10,6 @@ This is the practical implementation of the concept presented in the paper [Dece | ||||||
| 
 | 
 | ||||||
| The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`. | The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`. | ||||||
| 
 | 
 | ||||||
| For further questions and discussins, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**. |  | ||||||
| 
 |  | ||||||
| ## Building | ## Building | ||||||
| 
 | 
 | ||||||
| The project and its developent environment are built with [Nix](https://nixos.org/nix/). | The project and its developent environment are built with [Nix](https://nixos.org/nix/). | ||||||
|  |  | ||||||
|  | @ -40,7 +40,7 @@ executeSchedule :: Int  -- ^ speedup factor | ||||||
|                 -> IO () |                 -> IO () | ||||||
| executeSchedule speedup events = do | executeSchedule speedup events = do | ||||||
|     -- initialise HTTP manager |     -- initialise HTTP manager | ||||||
|     httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 } |     httpMan <- HTTP.newManager HTTP.defaultManagerSettings | ||||||
|     forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do |     forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do | ||||||
|         _ <- forkIO $ |         _ <- forkIO $ | ||||||
|             clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag) |             clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag) | ||||||
|  |  | ||||||
							
								
								
									
										35
									
								
								app/Main.hs
									
										
									
									
									
								
							
							
						
						
									
										35
									
								
								app/Main.hs
									
										
									
									
									
								
							|  | @ -18,38 +18,20 @@ main = do | ||||||
|     -- ToDo: parse and pass config |     -- ToDo: parse and pass config | ||||||
|     -- probably use `tomland` for that |     -- probably use `tomland` for that | ||||||
|     (fConf, sConf) <- readConfig |     (fConf, sConf) <- readConfig | ||||||
|     -- TODO: first initialise 'RealNode', then the vservers |  | ||||||
|     -- ToDo: load persisted caches, bootstrapping nodes … |     -- ToDo: load persisted caches, bootstrapping nodes … | ||||||
|     (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) |     (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) | ||||||
|     -- currently no masking is necessary, as there is nothing to clean up |     -- wait for all DHT threads to terminate, this keeps the main thread running | ||||||
|     nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode |     wait fediThreads | ||||||
|     -- try joining the DHT using one of the provided bootstrapping nodes |  | ||||||
|     joinedState <- tryBootstrapJoining thisNode |  | ||||||
|     either (\err -> do |  | ||||||
|         -- handle unsuccessful join |  | ||||||
| 
 |  | ||||||
|         putStrLn $ err <> " Error joining, start listening for incoming requests anyways" |  | ||||||
|         print =<< readTVarIO thisNode |  | ||||||
|         -- launch thread attempting to join on new cache entries |  | ||||||
|         _ <- forkIO $ joinOnNewEntriesThread thisNode |  | ||||||
|         wait =<< async (fediMainThreads serverSock thisNode) |  | ||||||
|            ) |  | ||||||
|            (\joinedNS -> do |  | ||||||
|         -- launch main eventloop with successfully joined state |  | ||||||
|         putStrLn "successful join" |  | ||||||
|         wait =<< async (fediMainThreads serverSock thisNode) |  | ||||||
|            ) |  | ||||||
|         joinedState |  | ||||||
|     pure () |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| readConfig :: IO (FediChordConf, ServiceConf) | readConfig :: IO (FediChordConf, ServiceConf) | ||||||
| readConfig = do | readConfig = do | ||||||
|     confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs |     confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs | ||||||
|     -- allow starting the initial node without bootstrapping info to avoid |     -- allow starting the initial node without bootstrapping info to avoid | ||||||
|     -- waiting for timeout |     -- waiting for timeout | ||||||
|     let |     let | ||||||
|         speedup = read speedupString |         speedup = read speedupString | ||||||
|  |         statsEvalD = 120 * 10^6 `div` speedup | ||||||
|         confBootstrapNodes' = case remainingArgs of |         confBootstrapNodes' = case remainingArgs of | ||||||
|             bootstrapHost : bootstrapPortString : _ -> |             bootstrapHost : bootstrapPortString : _ -> | ||||||
|                 [(bootstrapHost, read bootstrapPortString)] |                 [(bootstrapHost, read bootstrapPortString)] | ||||||
|  | @ -67,6 +49,11 @@ readConfig = do | ||||||
|           , confResponsePurgeAge = 60 / fromIntegral speedup |           , confResponsePurgeAge = 60 / fromIntegral speedup | ||||||
|           , confRequestTimeout = 5 * 10^6 `div` speedup |           , confRequestTimeout = 5 * 10^6 `div` speedup | ||||||
|           , confRequestRetries = 3 |           , confRequestRetries = 3 | ||||||
|  |           , confEnableKChoices = loadBalancingEnabled /= "off" | ||||||
|  |           , confKChoicesOverload = 0.9 | ||||||
|  |           , confKChoicesUnderload = 0.1 | ||||||
|  |           , confKChoicesMaxVS = 8 | ||||||
|  |           , confKChoicesRebalanceInterval = round  (realToFrac statsEvalD * 1.1 :: Double) | ||||||
|           } |           } | ||||||
|         sConf = ServiceConf |         sConf = ServiceConf | ||||||
|           { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup |           { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup | ||||||
|  | @ -74,7 +61,7 @@ readConfig = do | ||||||
|           , confServiceHost = confDomainString |           , confServiceHost = confDomainString | ||||||
|           , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" |           , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" | ||||||
|           , confSpeedupFactor = speedup |           , confSpeedupFactor = speedup | ||||||
|           , confStatsEvalDelay = 120 * 10^6 `div` speedup |           , confStatsEvalDelay = statsEvalD | ||||||
|           } |           } | ||||||
|     pure (fConf, sConf) |     pure (fConf, sConf) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -184,6 +184,19 @@ encodePayload payload'@PingResponsePayload{} = | ||||||
|     Start Sequence |     Start Sequence | ||||||
|   : concatMap encodeNodeState (pingNodeStates payload') |   : concatMap encodeNodeState (pingNodeStates payload') | ||||||
|   <> [End Sequence] |   <> [End Sequence] | ||||||
|  | encodePayload payload'@LoadRequestPayload{} = | ||||||
|  |   [ Start Sequence | ||||||
|  |   , IntVal . getNodeID $ loadSegmentUpperBound payload' | ||||||
|  |   , End Sequence | ||||||
|  |   ] | ||||||
|  | encodePayload payload'@LoadResponsePayload{} = | ||||||
|  |   [ Start Sequence | ||||||
|  |   , Real $ loadSum payload' | ||||||
|  |   , Real $ loadRemainingTarget payload' | ||||||
|  |   , Real $ loadTotalCapacity payload' | ||||||
|  |   , IntVal . getNodeID $ loadSegmentLowerBound payload' | ||||||
|  |   , End Sequence | ||||||
|  |   ] | ||||||
| 
 | 
 | ||||||
| encodeNodeState :: NodeState a => a -> [ASN1] | encodeNodeState :: NodeState a => a -> [ASN1] | ||||||
| encodeNodeState ns = [ | encodeNodeState ns = [ | ||||||
|  | @ -193,7 +206,7 @@ encodeNodeState ns = [ | ||||||
|   , OctetString (ipAddrAsBS $ getIpAddr ns) |   , OctetString (ipAddrAsBS $ getIpAddr ns) | ||||||
|   , IntVal (toInteger . getDhtPort $ ns) |   , IntVal (toInteger . getDhtPort $ ns) | ||||||
|   , IntVal (toInteger . getServicePort $ ns) |   , IntVal (toInteger . getServicePort $ ns) | ||||||
|   , IntVal (getVServerID ns) |   , IntVal (toInteger $ getVServerID ns) | ||||||
|   , End Sequence |   , End Sequence | ||||||
|                      ] |                      ] | ||||||
| 
 | 
 | ||||||
|  | @ -215,10 +228,11 @@ encodeQueryResult FORWARD{} = Enumerated 1 | ||||||
| encodeMessage :: FediChordMessage   -- ^ the 'FediChordMessage to be encoded | encodeMessage :: FediChordMessage   -- ^ the 'FediChordMessage to be encoded | ||||||
|               -> [ASN1] |               -> [ASN1] | ||||||
| encodeMessage | encodeMessage | ||||||
|     (Request requestID sender part isFinalPart action requestPayload) = |     (Request requestID receiverID sender part isFinalPart action requestPayload) = | ||||||
|         Start Sequence |         Start Sequence | ||||||
|       : (Enumerated . fromIntegral . fromEnum $ action) |       : (Enumerated . fromIntegral . fromEnum $ action) | ||||||
|       : IntVal requestID |       : IntVal requestID | ||||||
|  |       : (IntVal . getNodeID $ receiverID) | ||||||
|       : encodeNodeState sender |       : encodeNodeState sender | ||||||
|       <> [IntVal part |       <> [IntVal part | ||||||
|         , Boolean isFinalPart] |         , Boolean isFinalPart] | ||||||
|  | @ -262,18 +276,20 @@ parseMessage = do | ||||||
| parseRequest :: Action -> ParseASN1 FediChordMessage | parseRequest :: Action -> ParseASN1 FediChordMessage | ||||||
| parseRequest action = do | parseRequest action = do | ||||||
|     requestID <- parseInteger |     requestID <- parseInteger | ||||||
|  |     receiverID' <- fromInteger <$> parseInteger | ||||||
|     sender <- parseNodeState |     sender <- parseNodeState | ||||||
|     part <- parseInteger |     part <- parseInteger | ||||||
|     isFinalPart <- parseBool |     isFinalPart <- parseBool | ||||||
|     hasPayload <- hasNext |     hasPayload <- hasNext | ||||||
|     payload <- if not hasPayload then pure Nothing else Just <$> case action of |     payload <- if not hasPayload then pure Nothing else Just <$> case action of | ||||||
|                  QueryID   -> parseQueryIDRequest |                  QueryID   -> parseQueryIDRequestPayload | ||||||
|                  Join      -> parseJoinRequest |                  Join      -> parseJoinRequestPayload | ||||||
|                  Leave     -> parseLeaveRequest |                  Leave     -> parseLeaveRequestPayload | ||||||
|                  Stabilise -> parseStabiliseRequest |                  Stabilise -> parseStabiliseRequestPayload | ||||||
|                  Ping      -> parsePingRequest |                  Ping      -> parsePingRequestPayload | ||||||
|  |                  QueryLoad -> parseLoadRequestPayload | ||||||
| 
 | 
 | ||||||
|     pure $ Request requestID sender part isFinalPart action payload |     pure $ Request requestID receiverID' sender part isFinalPart action payload | ||||||
| 
 | 
 | ||||||
| parseResponse :: Integer -> ParseASN1 FediChordMessage | parseResponse :: Integer -> ParseASN1 FediChordMessage | ||||||
| parseResponse requestID = do | parseResponse requestID = do | ||||||
|  | @ -283,11 +299,12 @@ parseResponse requestID = do | ||||||
|     action <- parseEnum :: ParseASN1 Action |     action <- parseEnum :: ParseASN1 Action | ||||||
|     hasPayload <- hasNext |     hasPayload <- hasNext | ||||||
|     payload <- if not hasPayload then pure Nothing else Just <$> case action of |     payload <- if not hasPayload then pure Nothing else Just <$> case action of | ||||||
|                  QueryID   -> parseQueryIDResponse |                  QueryID   -> parseQueryIDResponsePayload | ||||||
|                  Join      -> parseJoinResponse |                  Join      -> parseJoinResponsePayload | ||||||
|                  Leave     -> parseLeaveResponse |                  Leave     -> parseLeaveResponsePayload | ||||||
|                  Stabilise -> parseStabiliseResponse |                  Stabilise -> parseStabiliseResponsePayload | ||||||
|                  Ping      -> parsePingResponse |                  Ping      -> parsePingResponsePayload | ||||||
|  |                  QueryLoad -> parseLoadResponsePayload | ||||||
| 
 | 
 | ||||||
|     pure $ Response requestID senderID part isFinalPart action payload |     pure $ Response requestID senderID part isFinalPart action payload | ||||||
| 
 | 
 | ||||||
|  | @ -305,6 +322,13 @@ parseInteger = do | ||||||
|         IntVal parsed -> pure parsed |         IntVal parsed -> pure parsed | ||||||
|         x -> throwParseError $ "Expected IntVal but got " <> show x |         x -> throwParseError $ "Expected IntVal but got " <> show x | ||||||
| 
 | 
 | ||||||
|  | parseReal :: ParseASN1 Double | ||||||
|  | parseReal = do | ||||||
|  |     i <- getNext | ||||||
|  |     case i of | ||||||
|  |       Real parsed -> pure parsed | ||||||
|  |       x           -> throwParseError $ "Expected Real but got " <> show x | ||||||
|  | 
 | ||||||
| parseEnum :: Enum a => ParseASN1 a | parseEnum :: Enum a => ParseASN1 a | ||||||
| parseEnum = do | parseEnum = do | ||||||
|     e <- getNext |     e <- getNext | ||||||
|  | @ -346,7 +370,7 @@ parseNodeState = onNextContainer Sequence $ do | ||||||
|       , domain = domain' |       , domain = domain' | ||||||
|       , dhtPort = dhtPort' |       , dhtPort = dhtPort' | ||||||
|       , servicePort = servicePort' |       , servicePort = servicePort' | ||||||
|       , vServerID = vServer' |       , vServerID = fromInteger vServer' | ||||||
|       , ipAddr = ip' |       , ipAddr = ip' | ||||||
|                      } |                      } | ||||||
| 
 | 
 | ||||||
|  | @ -360,13 +384,13 @@ parseCacheEntry = onNextContainer Sequence $ do | ||||||
| parseNodeCache :: ParseASN1 [RemoteCacheEntry] | parseNodeCache :: ParseASN1 [RemoteCacheEntry] | ||||||
| parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry | parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry | ||||||
| 
 | 
 | ||||||
| parseJoinRequest :: ParseASN1 ActionPayload | parseJoinRequestPayload :: ParseASN1 ActionPayload | ||||||
| parseJoinRequest = do | parseJoinRequestPayload = do | ||||||
|     parseNull |     parseNull | ||||||
|     pure JoinRequestPayload |     pure JoinRequestPayload | ||||||
| 
 | 
 | ||||||
| parseJoinResponse :: ParseASN1 ActionPayload | parseJoinResponsePayload :: ParseASN1 ActionPayload | ||||||
| parseJoinResponse = onNextContainer Sequence $ do | parseJoinResponsePayload = onNextContainer Sequence $ do | ||||||
|     succ' <- onNextContainer Sequence (getMany parseNodeState) |     succ' <- onNextContainer Sequence (getMany parseNodeState) | ||||||
|     pred' <- onNextContainer Sequence (getMany parseNodeState) |     pred' <- onNextContainer Sequence (getMany parseNodeState) | ||||||
|     cache <- parseNodeCache |     cache <- parseNodeCache | ||||||
|  | @ -376,8 +400,8 @@ parseJoinResponse = onNextContainer Sequence $ do | ||||||
|       , joinCache = cache |       , joinCache = cache | ||||||
|                                  } |                                  } | ||||||
| 
 | 
 | ||||||
| parseQueryIDRequest :: ParseASN1 ActionPayload | parseQueryIDRequestPayload :: ParseASN1 ActionPayload | ||||||
| parseQueryIDRequest = onNextContainer Sequence $ do | parseQueryIDRequestPayload = onNextContainer Sequence $ do | ||||||
|     targetID <- fromInteger <$> parseInteger |     targetID <- fromInteger <$> parseInteger | ||||||
|     lBestNodes <- parseInteger |     lBestNodes <- parseInteger | ||||||
|     pure $ QueryIDRequestPayload { |     pure $ QueryIDRequestPayload { | ||||||
|  | @ -385,8 +409,8 @@ parseQueryIDRequest = onNextContainer Sequence $ do | ||||||
|       , queryLBestNodes = lBestNodes |       , queryLBestNodes = lBestNodes | ||||||
|                                    } |                                    } | ||||||
| 
 | 
 | ||||||
| parseQueryIDResponse :: ParseASN1 ActionPayload | parseQueryIDResponsePayload :: ParseASN1 ActionPayload | ||||||
| parseQueryIDResponse = onNextContainer Sequence $ do | parseQueryIDResponsePayload = onNextContainer Sequence $ do | ||||||
|     Enumerated resultType <- getNext |     Enumerated resultType <- getNext | ||||||
|     result <- case resultType of |     result <- case resultType of | ||||||
|                   0 -> FOUND <$> parseNodeState |                   0 -> FOUND <$> parseNodeState | ||||||
|  | @ -396,13 +420,13 @@ parseQueryIDResponse = onNextContainer Sequence $ do | ||||||
|         queryResult = result |         queryResult = result | ||||||
|                            } |                            } | ||||||
| 
 | 
 | ||||||
| parseStabiliseRequest :: ParseASN1 ActionPayload | parseStabiliseRequestPayload :: ParseASN1 ActionPayload | ||||||
| parseStabiliseRequest = do | parseStabiliseRequestPayload = do | ||||||
|     parseNull |     parseNull | ||||||
|     pure StabiliseRequestPayload |     pure StabiliseRequestPayload | ||||||
| 
 | 
 | ||||||
| parseStabiliseResponse :: ParseASN1 ActionPayload | parseStabiliseResponsePayload :: ParseASN1 ActionPayload | ||||||
| parseStabiliseResponse = onNextContainer Sequence $ do | parseStabiliseResponsePayload = onNextContainer Sequence $ do | ||||||
|     succ' <- onNextContainer Sequence (getMany parseNodeState) |     succ' <- onNextContainer Sequence (getMany parseNodeState) | ||||||
|     pred' <- onNextContainer Sequence (getMany parseNodeState) |     pred' <- onNextContainer Sequence (getMany parseNodeState) | ||||||
|     pure $ StabiliseResponsePayload { |     pure $ StabiliseResponsePayload { | ||||||
|  | @ -410,8 +434,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do | ||||||
|       , stabilisePredecessors = pred' |       , stabilisePredecessors = pred' | ||||||
|                                       } |                                       } | ||||||
| 
 | 
 | ||||||
| parseLeaveRequest :: ParseASN1 ActionPayload | parseLeaveRequestPayload :: ParseASN1 ActionPayload | ||||||
| parseLeaveRequest = onNextContainer Sequence $ do | parseLeaveRequestPayload = onNextContainer Sequence $ do | ||||||
|     succ' <- onNextContainer Sequence (getMany parseNodeState) |     succ' <- onNextContainer Sequence (getMany parseNodeState) | ||||||
|     pred' <- onNextContainer Sequence (getMany parseNodeState) |     pred' <- onNextContainer Sequence (getMany parseNodeState) | ||||||
|     doMigration <- parseBool |     doMigration <- parseBool | ||||||
|  | @ -421,19 +445,40 @@ parseLeaveRequest = onNextContainer Sequence $ do | ||||||
|       , leaveDoMigration = doMigration |       , leaveDoMigration = doMigration | ||||||
|                                       } |                                       } | ||||||
| 
 | 
 | ||||||
| parseLeaveResponse :: ParseASN1 ActionPayload | parseLeaveResponsePayload :: ParseASN1 ActionPayload | ||||||
| parseLeaveResponse = do | parseLeaveResponsePayload = do | ||||||
|     parseNull |     parseNull | ||||||
|     pure LeaveResponsePayload |     pure LeaveResponsePayload | ||||||
| 
 | 
 | ||||||
| parsePingRequest :: ParseASN1 ActionPayload | parsePingRequestPayload :: ParseASN1 ActionPayload | ||||||
| parsePingRequest = do | parsePingRequestPayload = do | ||||||
|     parseNull |     parseNull | ||||||
|     pure PingRequestPayload |     pure PingRequestPayload | ||||||
| 
 | 
 | ||||||
| parsePingResponse :: ParseASN1 ActionPayload | parsePingResponsePayload :: ParseASN1 ActionPayload | ||||||
| parsePingResponse = onNextContainer Sequence $ do | parsePingResponsePayload = onNextContainer Sequence $ do | ||||||
|     handledNodes <- getMany parseNodeState |     handledNodes <- getMany parseNodeState | ||||||
|     pure $ PingResponsePayload { |     pure $ PingResponsePayload { | ||||||
|         pingNodeStates = handledNodes |         pingNodeStates = handledNodes | ||||||
|                                  } |                                  } | ||||||
|  | 
 | ||||||
|  | parseLoadRequestPayload :: ParseASN1 ActionPayload | ||||||
|  | parseLoadRequestPayload = onNextContainer Sequence $ do | ||||||
|  |     loadUpperBound' <- fromInteger <$> parseInteger | ||||||
|  |     pure LoadRequestPayload | ||||||
|  |         { loadSegmentUpperBound = loadUpperBound' | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | parseLoadResponsePayload :: ParseASN1 ActionPayload | ||||||
|  | parseLoadResponsePayload = onNextContainer Sequence $ do | ||||||
|  |     loadSum' <- parseReal | ||||||
|  |     loadRemainingTarget' <- parseReal | ||||||
|  |     loadTotalCapacity' <- parseReal | ||||||
|  |     loadSegmentLowerBound' <- fromInteger <$> parseInteger | ||||||
|  |     pure LoadResponsePayload | ||||||
|  |         { loadSum = loadSum' | ||||||
|  |         , loadRemainingTarget = loadRemainingTarget' | ||||||
|  |         , loadTotalCapacity = loadTotalCapacity' | ||||||
|  |         , loadSegmentLowerBound = loadSegmentLowerBound' | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |  | ||||||
|  | @ -15,6 +15,7 @@ module Hash2Pub.DHTProtocol | ||||||
|     , Action(..) |     , Action(..) | ||||||
|     , ActionPayload(..) |     , ActionPayload(..) | ||||||
|     , FediChordMessage(..) |     , FediChordMessage(..) | ||||||
|  |     , mkRequest | ||||||
|     , maximumParts |     , maximumParts | ||||||
|     , sendQueryIdMessages |     , sendQueryIdMessages | ||||||
|     , requestQueryID |     , requestQueryID | ||||||
|  | @ -22,6 +23,7 @@ module Hash2Pub.DHTProtocol | ||||||
|     , requestLeave |     , requestLeave | ||||||
|     , requestPing |     , requestPing | ||||||
|     , requestStabilise |     , requestStabilise | ||||||
|  |     , requestQueryLoad | ||||||
|     , lookupMessage |     , lookupMessage | ||||||
|     , sendRequestTo |     , sendRequestTo | ||||||
|     , queryIdLookupLoop |     , queryIdLookupLoop | ||||||
|  | @ -36,7 +38,7 @@ module Hash2Pub.DHTProtocol | ||||||
|     , isPossibleSuccessor |     , isPossibleSuccessor | ||||||
|     , isPossiblePredecessor |     , isPossiblePredecessor | ||||||
|     , isInOwnResponsibilitySlice |     , isInOwnResponsibilitySlice | ||||||
|     , isJoined |     , vsIsJoined | ||||||
|     , closestCachePredecessors |     , closestCachePredecessors | ||||||
|     ) |     ) | ||||||
|         where |         where | ||||||
|  | @ -49,7 +51,8 @@ import           Control.Concurrent.STM.TQueue | ||||||
| import           Control.Concurrent.STM.TVar | import           Control.Concurrent.STM.TVar | ||||||
| import           Control.Exception | import           Control.Exception | ||||||
| import           Control.Monad                  (foldM, forM, forM_, void, when) | import           Control.Monad                  (foldM, forM, forM_, void, when) | ||||||
| import           Control.Monad.Except           (MonadError (..), runExceptT) | import           Control.Monad.Except           (MonadError (..), liftEither, | ||||||
|  |                                                  runExceptT) | ||||||
| import           Control.Monad.IO.Class         (MonadIO (..)) | import           Control.Monad.IO.Class         (MonadIO (..)) | ||||||
| import qualified Data.ByteString                as BS | import qualified Data.ByteString                as BS | ||||||
| import           Data.Either                    (rights) | import           Data.Either                    (rights) | ||||||
|  | @ -63,6 +66,7 @@ import           Data.Maybe                     (fromJust, fromMaybe, isJust, | ||||||
|                                                  isNothing, mapMaybe, maybe) |                                                  isNothing, mapMaybe, maybe) | ||||||
| import qualified Data.Set                       as Set | import qualified Data.Set                       as Set | ||||||
| import           Data.Time.Clock.POSIX | import           Data.Time.Clock.POSIX | ||||||
|  | import           Data.Word                      (Word8) | ||||||
| import           Network.Socket                 hiding (recv, recvFrom, send, | import           Network.Socket                 hiding (recv, recvFrom, send, | ||||||
|                                                  sendTo) |                                                  sendTo) | ||||||
| import           Network.Socket.ByteString | import           Network.Socket.ByteString | ||||||
|  | @ -74,23 +78,27 @@ import           Hash2Pub.ASN1Coding | ||||||
| import           Hash2Pub.FediChordTypes        (CacheEntry (..), | import           Hash2Pub.FediChordTypes        (CacheEntry (..), | ||||||
|                                                  CacheEntry (..), |                                                  CacheEntry (..), | ||||||
|                                                  FediChordConf (..), |                                                  FediChordConf (..), | ||||||
|                                                  HasKeyID (..), |                                                  HasKeyID (..), LoadStats (..), | ||||||
|                                                  LocalNodeState (..), |                                                  LocalNodeState (..), | ||||||
|                                                  LocalNodeStateSTM, NodeCache, |                                                  LocalNodeStateSTM, NodeCache, | ||||||
|                                                  NodeID, NodeState (..), |                                                  NodeID, NodeState (..), | ||||||
|                                                  RealNode (..), RealNodeSTM, |                                                  RealNode (..), RealNodeSTM, | ||||||
|                                                  RemoteNodeState (..), |                                                  RemoteNodeState (..), | ||||||
|                                                  RingEntry (..), RingMap (..), |                                                  RingEntry (..), RingMap (..), | ||||||
|  |                                                  SegmentLoadStats (..), | ||||||
|                                                  Service (..), addRMapEntry, |                                                  Service (..), addRMapEntry, | ||||||
|                                                  addRMapEntryWith, |                                                  addRMapEntryWith, | ||||||
|                                                  cacheGetNodeStateUnvalidated, |                                                  cacheGetNodeStateUnvalidated, | ||||||
|                                                  cacheLookup, cacheLookupPred, |                                                  cacheLookup, cacheLookupPred, | ||||||
|                                                  cacheLookupSucc, genNodeID, |                                                  cacheLookupSucc, genNodeID, | ||||||
|                                                  getKeyID, localCompare, |                                                  getKeyID, hasValidNodeId, | ||||||
|  |                                                  loadSliceSum, localCompare, | ||||||
|                                                  rMapFromList, rMapLookupPred, |                                                  rMapFromList, rMapLookupPred, | ||||||
|                                                  rMapLookupSucc, |                                                  rMapLookupSucc, | ||||||
|  |                                                  remainingLoadTarget, | ||||||
|                                                  setPredecessors, setSuccessors) |                                                  setPredecessors, setSuccessors) | ||||||
| import           Hash2Pub.ProtocolTypes | import           Hash2Pub.ProtocolTypes | ||||||
|  | import           Hash2Pub.RingMap | ||||||
| 
 | 
 | ||||||
| import           Debug.Trace                    (trace) | import           Debug.Trace                    (trace) | ||||||
| 
 | 
 | ||||||
|  | @ -103,7 +111,7 @@ queryLocalCache ownState nCache lBestNodes targetID | ||||||
|     -- as target ID falls between own ID and first predecessor, it is handled by this node |     -- as target ID falls between own ID and first predecessor, it is handled by this node | ||||||
|     -- This only makes sense if the node is part of the DHT by having joined. |     -- This only makes sense if the node is part of the DHT by having joined. | ||||||
|     -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. |     -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. | ||||||
|       | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState |       | vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState | ||||||
|     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and |     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and | ||||||
|     -- the closest succeeding node (like with the p initiated parallel queries |     -- the closest succeeding node (like with the p initiated parallel queries | ||||||
|       | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache |       | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | ||||||
|  | @ -227,8 +235,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g | ||||||
| 
 | 
 | ||||||
| -- | uses the successor and predecessor list of a node as an indicator for whether a | -- | uses the successor and predecessor list of a node as an indicator for whether a | ||||||
| -- node has properly joined the DHT | -- node has properly joined the DHT | ||||||
| isJoined :: LocalNodeState s -> Bool | vsIsJoined :: LocalNodeState s -> Bool | ||||||
| isJoined ns = not . all null $ [successors ns, predecessors ns] | vsIsJoined ns = not . all null $ [successors ns, predecessors ns] | ||||||
| 
 | 
 | ||||||
| -- | the size limit to be used when serialising messages for sending | -- | the size limit to be used when serialising messages for sending | ||||||
| sendMessageSize :: Num i => i | sendMessageSize :: Num i => i | ||||||
|  | @ -237,27 +245,37 @@ sendMessageSize = 1200 | ||||||
| -- ====== message send and receive operations ====== | -- ====== message send and receive operations ====== | ||||||
| 
 | 
 | ||||||
| -- encode the response to a request that just signals successful receipt | -- encode the response to a request that just signals successful receipt | ||||||
| ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString | ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString | ||||||
| ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { | ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response { | ||||||
|     requestID = requestID req |     requestID = requestID req | ||||||
|   , senderID = ownID |   , senderID = receiverID req | ||||||
|   , part = part req |   , part = part req | ||||||
|   , isFinalPart = False |   , isFinalPart = False | ||||||
|   , action = action req |   , action = action req | ||||||
|   , payload = Nothing |   , payload = Nothing | ||||||
|                                                                              } |                                                                              } | ||||||
| ackRequest _ _ = Map.empty | ackRequest _ = Map.empty | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | -- | extract the first payload from a received message set | ||||||
|  | extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload | ||||||
|  | extractFirstPayload msgSet = foldr' (\msg plAcc -> | ||||||
|  |     if isNothing plAcc && isJust (payload msg) | ||||||
|  |        then payload msg | ||||||
|  |        else plAcc | ||||||
|  |                                  ) Nothing msgSet | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue | -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue | ||||||
| -- the response to be sent. | -- the response to be sent. | ||||||
| handleIncomingRequest :: Service s (RealNodeSTM s) | handleIncomingRequest :: Service s (RealNodeSTM s) | ||||||
|                       => LocalNodeStateSTM s                     -- ^ the handling node |                       => Word8                              -- ^ maximum number of vservers, because of decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@ | ||||||
|  |                       -> LocalNodeStateSTM s                     -- ^ the handling node | ||||||
|                       -> TQueue (BS.ByteString, SockAddr)   -- ^ send queue |                       -> TQueue (BS.ByteString, SockAddr)   -- ^ send queue | ||||||
|                       -> Set.Set FediChordMessage           -- ^ all parts of the request to handle |                       -> Set.Set FediChordMessage           -- ^ all parts of the request to handle | ||||||
|                       -> SockAddr                           -- ^ source address of the request |                       -> SockAddr                           -- ^ source address of the request | ||||||
|                       -> IO () |                       -> IO () | ||||||
| handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do | handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do | ||||||
|     ns <- readTVarIO nsSTM |     ns <- readTVarIO nsSTM | ||||||
|     -- add nodestate to cache |     -- add nodestate to cache | ||||||
|     now <- getPOSIXTime |     now <- getPOSIXTime | ||||||
|  | @ -265,19 +283,20 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do | ||||||
|       Nothing -> pure () |       Nothing -> pure () | ||||||
|       Just aPart -> do |       Just aPart -> do | ||||||
|         let (SockAddrInet6 _ _ sourceIP _) = sourceAddr |         let (SockAddrInet6 _ _ sourceIP _) = sourceAddr | ||||||
|         queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns |         queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns) | ||||||
|         -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue |         -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue | ||||||
|         maybe (pure ()) ( |         maybe (pure ()) ( | ||||||
|             mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) |             mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) | ||||||
|                         ) |                         ) | ||||||
|             =<< (case action aPart of |             =<< (case action aPart of | ||||||
|                 Ping -> Just <$> respondPing nsSTM msgSet |                 Ping -> Just <$> respondPing nsSTM msgSet | ||||||
|                 Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin |                 Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin | ||||||
|                 -- ToDo: figure out what happens if not joined |                 -- ToDo: figure out what happens if not joined | ||||||
|                 QueryID -> Just <$> respondQueryID nsSTM msgSet |                 QueryID -> Just <$> respondQueryID nsSTM msgSet | ||||||
|                 -- only when joined |                 -- only when joined | ||||||
|                 Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing |                 Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing | ||||||
|                 Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing |                 Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing | ||||||
|  |                 QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing | ||||||
|             ) |             ) | ||||||
|       -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. |       -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. | ||||||
| 
 | 
 | ||||||
|  | @ -287,19 +306,18 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do | ||||||
|       -- | Filter out requests with spoofed node IDs by recomputing the ID using |       -- | Filter out requests with spoofed node IDs by recomputing the ID using | ||||||
|       -- the sender IP. |       -- the sender IP. | ||||||
|       -- For valid (non-spoofed) sender IDs, the passed responder function is invoked. |       -- For valid (non-spoofed) sender IDs, the passed responder function is invoked. | ||||||
|       dropSpoofedIDs :: HostAddress6        -- msg source address |       dropSpoofedIDs :: Word8       -- ^ maximum number of vservers per node | ||||||
|  |                      -> HostAddress6        -- ^ msg source address | ||||||
|                      -> LocalNodeStateSTM s |                      -> LocalNodeStateSTM s | ||||||
|                      -> Set.Set FediChordMessage    -- message parts of the request |                      -> Set.Set FediChordMessage    -- ^ message parts of the request | ||||||
|                      -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString))     -- reponder function to be invoked for valid requests |                      -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString))     -- ^ reponder function to be invoked for valid requests | ||||||
|                      -> IO (Maybe (Map.Map Integer BS.ByteString)) |                      -> IO (Maybe (Map.Map Integer BS.ByteString)) | ||||||
|       dropSpoofedIDs addr nsSTM' msgSet' responder = |       dropSpoofedIDs limVs addr nsSTM' msgSet' responder = | ||||||
|           let |           let | ||||||
|             aRequestPart = Set.elemAt 0 msgSet |             aRequestPart = Set.elemAt 0 msgSet | ||||||
|             senderNs = sender aRequestPart |             senderNs = sender aRequestPart | ||||||
|             givenSenderID = getNid senderNs |  | ||||||
|             recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs) |  | ||||||
|           in |           in | ||||||
|           if recomputedID == givenSenderID |           if hasValidNodeId limVs senderNs addr | ||||||
|              then Just <$> responder nsSTM' msgSet' |              then Just <$> responder nsSTM' msgSet' | ||||||
|              else pure Nothing |              else pure Nothing | ||||||
| 
 | 
 | ||||||
|  | @ -317,19 +335,15 @@ respondQueryID nsSTM msgSet = do | ||||||
|     let |     let | ||||||
|         aRequestPart = Set.elemAt 0 msgSet |         aRequestPart = Set.elemAt 0 msgSet | ||||||
|         senderID = getNid . sender $ aRequestPart |         senderID = getNid . sender $ aRequestPart | ||||||
|         senderPayload = foldr' (\msg plAcc -> |         senderPayload = extractFirstPayload msgSet | ||||||
|             if isNothing plAcc && isJust (payload msg) |         -- return only empty message serialisation if no payload was included in parts | ||||||
|                then payload msg |  | ||||||
|                else plAcc |  | ||||||
|                                ) Nothing msgSet |  | ||||||
|     -- return only empty message serialisation if no payload was included in parts |  | ||||||
|     maybe (pure Map.empty) (\senderPayload' -> do |     maybe (pure Map.empty) (\senderPayload' -> do | ||||||
|         responseMsg <- atomically $ do |         responseMsg <- atomically $ do | ||||||
|             nsSnap <- readTVar nsSTM |             nsSnap <- readTVar nsSTM | ||||||
|             cache <- readTVar $ nodeCacheSTM nsSnap |             cache <- readTVar $ nodeCacheSTM nsSnap | ||||||
|             let |             let | ||||||
|                 responsePayload = QueryIDResponsePayload { |                 responsePayload = QueryIDResponsePayload { | ||||||
|                     queryResult = if isJoined nsSnap |                     queryResult = if vsIsJoined nsSnap | ||||||
|                                      then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') |                                      then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') | ||||||
|                                      -- if not joined yet, attract responsibility for |                                      -- if not joined yet, attract responsibility for | ||||||
|                                      -- all keys to make bootstrapping possible |                                      -- all keys to make bootstrapping possible | ||||||
|  | @ -422,6 +436,47 @@ respondPing nsSTM msgSet = do | ||||||
|                                 } |                                 } | ||||||
|     pure $ serialiseMessage sendMessageSize pingResponse |     pure $ serialiseMessage sendMessageSize pingResponse | ||||||
| 
 | 
 | ||||||
|  | respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) | ||||||
|  | respondQueryLoad nsSTM msgSet = do | ||||||
|  |     nsSnap <- readTVarIO nsSTM | ||||||
|  |     -- this message cannot be split reasonably, so just | ||||||
|  |     -- consider the first payload | ||||||
|  |     let | ||||||
|  |         aRequestPart = Set.elemAt 0 msgSet | ||||||
|  |         senderPayload = extractFirstPayload msgSet | ||||||
|  |     responsePl <- maybe (pure Nothing) (\pl -> | ||||||
|  |         case pl of | ||||||
|  |           LoadRequestPayload{} -> do | ||||||
|  |             parentNode <- readTVarIO (parentRealNode nsSnap) | ||||||
|  |             let | ||||||
|  |                 serv = nodeService parentNode | ||||||
|  |                 conf = nodeConfig parentNode | ||||||
|  |             lStats <- getServiceLoadStats serv | ||||||
|  |             let | ||||||
|  |                 directSucc = getNid . head . predecessors $ nsSnap | ||||||
|  |                 handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl) | ||||||
|  |             pure $ Just LoadResponsePayload | ||||||
|  |                 { loadSum = handledTagSum | ||||||
|  |                 , loadRemainingTarget = remainingLoadTarget conf lStats | ||||||
|  |                 , loadTotalCapacity = totalCapacity lStats | ||||||
|  |                 , loadSegmentLowerBound = directSucc | ||||||
|  |                 } | ||||||
|  |           _ -> pure Nothing | ||||||
|  |                            ) | ||||||
|  |                            senderPayload | ||||||
|  | 
 | ||||||
|  |     pure $ maybe | ||||||
|  |         Map.empty | ||||||
|  |         (\pl -> serialiseMessage sendMessageSize $ Response | ||||||
|  |             { requestID = requestID aRequestPart | ||||||
|  |             , senderID = getNid nsSnap | ||||||
|  |             , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 | ||||||
|  |             , isFinalPart = False | ||||||
|  |             , action = QueryLoad | ||||||
|  |             , payload = Just pl | ||||||
|  |             } | ||||||
|  |         ) responsePl | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) | respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) | ||||||
| respondJoin nsSTM msgSet = do | respondJoin nsSTM msgSet = do | ||||||
|  | @ -434,7 +489,7 @@ respondJoin nsSTM msgSet = do | ||||||
|             senderNS = sender aRequestPart |             senderNS = sender aRequestPart | ||||||
|             -- if not joined yet, attract responsibility for |             -- if not joined yet, attract responsibility for | ||||||
|             -- all keys to make bootstrapping possible |             -- all keys to make bootstrapping possible | ||||||
|             responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) |             responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) | ||||||
|             thisNodeResponsible (FOUND _)   = True |             thisNodeResponsible (FOUND _)   = True | ||||||
|             thisNodeResponsible (FORWARD _) = False |             thisNodeResponsible (FORWARD _) = False | ||||||
|         -- check whether the joining node falls into our responsibility |         -- check whether the joining node falls into our responsibility | ||||||
|  | @ -481,6 +536,21 @@ respondJoin nsSTM msgSet = do | ||||||
| 
 | 
 | ||||||
| -- ....... request sending ....... | -- ....... request sending ....... | ||||||
| 
 | 
 | ||||||
|  | -- | defautl constructor for request messages, fills standard values like | ||||||
|  | -- part number to avoid code repition | ||||||
|  | mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage) | ||||||
|  | mkRequest ns targetID action pl rid = Request | ||||||
|  |     { requestID = rid | ||||||
|  |     , receiverID = targetID | ||||||
|  |     , sender = toRemoteNodeState ns | ||||||
|  |     -- part number and final flag can be changed by ASN1 encoder to make packet | ||||||
|  |     -- fit the MTU restrictions | ||||||
|  |     , part = 1 | ||||||
|  |     , isFinalPart = True | ||||||
|  |     , action = action | ||||||
|  |     , payload = pl | ||||||
|  |     } | ||||||
|  | 
 | ||||||
| -- | send a join request and return the joined 'LocalNodeState' including neighbours | -- | send a join request and return the joined 'LocalNodeState' including neighbours | ||||||
| requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a             -- ^ currently responsible node to be contacted | requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a             -- ^ currently responsible node to be contacted | ||||||
|             -> LocalNodeStateSTM s               -- ^ joining NodeState |             -> LocalNodeStateSTM s               -- ^ joining NodeState | ||||||
|  | @ -492,7 +562,7 @@ requestJoin toJoinOn ownStateSTM = do | ||||||
|     let srcAddr = confIP nodeConf |     let srcAddr = confIP nodeConf | ||||||
|     bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do |     bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do | ||||||
|         -- extract own state for getting request information |         -- extract own state for getting request information | ||||||
|         responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock |         responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock | ||||||
|         (cacheInsertQ, joinedState) <- atomically $ do |         (cacheInsertQ, joinedState) <- atomically $ do | ||||||
|             stateSnap <- readTVar ownStateSTM |             stateSnap <- readTVar ownStateSTM | ||||||
|             let |             let | ||||||
|  | @ -523,7 +593,7 @@ requestJoin toJoinOn ownStateSTM = do | ||||||
|             writeTVar ownStateSTM newState |             writeTVar ownStateSTM newState | ||||||
|             pure (cacheInsertQ, newState) |             pure (cacheInsertQ, newState) | ||||||
|         -- execute the cache insertions |         -- execute the cache insertions | ||||||
|         mapM_ (\f -> f joinedState) cacheInsertQ |         mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ | ||||||
|         if responses == Set.empty |         if responses == Set.empty | ||||||
|                   then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) |                   then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) | ||||||
|            else do |            else do | ||||||
|  | @ -581,14 +651,14 @@ sendQueryIdMessages :: (Integral i) | ||||||
|                     -> Maybe i                       -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned |                     -> Maybe i                       -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned | ||||||
|                    -> [RemoteNodeState]             -- ^ nodes to query |                    -> [RemoteNodeState]             -- ^ nodes to query | ||||||
|                    -> IO QueryResponse -- ^ accumulated response |                    -> IO QueryResponse -- ^ accumulated response | ||||||
| sendQueryIdMessages targetID ns lParam targets = do | sendQueryIdMessages lookupID ns lParam targets = do | ||||||
| 
 | 
 | ||||||
|           -- create connected sockets to all query targets and use them for request handling |           -- create connected sockets to all query targets and use them for request handling | ||||||
| 
 | 
 | ||||||
|           nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) |           nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) | ||||||
|           let srcAddr = confIP nodeConf |           let srcAddr = confIP nodeConf | ||||||
|           queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( |           queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( | ||||||
|               sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) |               sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode)) | ||||||
|                                                                                                                                    )) targets |                                                                                                                                    )) targets | ||||||
|           -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 |           -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 | ||||||
|           -- ToDo: exception handling, maybe log them |           -- ToDo: exception handling, maybe log them | ||||||
|  | @ -605,7 +675,7 @@ sendQueryIdMessages targetID ns lParam targets = do | ||||||
|                              _ -> Set.empty |                              _ -> Set.empty | ||||||
| 
 | 
 | ||||||
|             -- forward entries to global cache |             -- forward entries to global cache | ||||||
|             queueAddEntries entrySet ns |             queueAddEntries entrySet (cacheWriteQueue ns) | ||||||
|             -- return accumulated QueryResult |             -- return accumulated QueryResult | ||||||
|             pure $ case acc of |             pure $ case acc of | ||||||
|               -- once a FOUND as been encountered, return this as a result |               -- once a FOUND as been encountered, return this as a result | ||||||
|  | @ -621,13 +691,14 @@ sendQueryIdMessages targetID ns lParam targets = do | ||||||
| 
 | 
 | ||||||
| -- | Create a QueryID message to be supplied to 'sendRequestTo' | -- | Create a QueryID message to be supplied to 'sendRequestTo' | ||||||
| lookupMessage :: Integral i | lookupMessage :: Integral i | ||||||
|               => NodeID         -- ^ target ID |               => NodeID         -- ^ lookup ID to be looked up | ||||||
|               -> LocalNodeState s -- ^ sender node state |               -> LocalNodeState s -- ^ sender node state | ||||||
|               -> Maybe i        -- ^ optionally provide a different l parameter |               -> Maybe i        -- ^ optionally provide a different l parameter | ||||||
|  |               -> NodeID         -- ^ target ID of message destination | ||||||
|               ->  (Integer -> FediChordMessage) |               ->  (Integer -> FediChordMessage) | ||||||
| lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) | lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID) | ||||||
|   where |   where | ||||||
|     pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam } |       pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | Send a stabilise request to provided 'RemoteNode' and, if successful, | -- | Send a stabilise request to provided 'RemoteNode' and, if successful, | ||||||
|  | @ -638,16 +709,7 @@ requestStabilise :: LocalNodeState s      -- ^ sending node | ||||||
| requestStabilise ns neighbour = do | requestStabilise ns neighbour = do | ||||||
|     nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) |     nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) | ||||||
|     let srcAddr = confIP nodeConf |     let srcAddr = confIP nodeConf | ||||||
|     responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> |     responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload)) | ||||||
|         Request { |  | ||||||
|             requestID = rid |  | ||||||
|           , sender = toRemoteNodeState ns |  | ||||||
|           , part = 1 |  | ||||||
|           , isFinalPart = False |  | ||||||
|           , action = Stabilise |  | ||||||
|           , payload = Just StabiliseRequestPayload |  | ||||||
|                 } |  | ||||||
|                          ) |  | ||||||
|                                                                                            ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) |                                                                                            ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||||
|     either |     either | ||||||
|         -- forward IO error messages |         -- forward IO error messages | ||||||
|  | @ -660,7 +722,7 @@ requestStabilise ns neighbour = do | ||||||
|                                                       ) |                                                       ) | ||||||
|                                                       ([],[]) respSet |                                                       ([],[]) respSet | ||||||
|             -- update successfully responded neighbour in cache |             -- update successfully responded neighbour in cache | ||||||
|             maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) |             maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet) | ||||||
|             pure $ if null responsePreds && null responseSuccs |             pure $ if null responsePreds && null responseSuccs | ||||||
|                       then Left "no neighbours returned" |                       then Left "no neighbours returned" | ||||||
|                       else Right (responsePreds, responseSuccs) |                       else Right (responsePreds, responseSuccs) | ||||||
|  | @ -682,17 +744,12 @@ requestLeave ns doMigration target = do | ||||||
|       , leavePredecessors = predecessors ns |       , leavePredecessors = predecessors ns | ||||||
|       , leaveDoMigration = doMigration |       , leaveDoMigration = doMigration | ||||||
|                                            } |                                            } | ||||||
|     responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> |     responses <- bracket | ||||||
|         Request { |         (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) | ||||||
|             requestID = rid |         close | ||||||
|           , sender = toRemoteNodeState ns |         (fmap Right | ||||||
|           , part = 1 |         . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload)) | ||||||
|           , isFinalPart = False |         ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||||
|           , action = Leave |  | ||||||
|           , payload = Just leavePayload |  | ||||||
|                 } |  | ||||||
|                          ) |  | ||||||
|                                                                                            ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) |  | ||||||
|     either |     either | ||||||
|         -- forward IO error messages |         -- forward IO error messages | ||||||
|         (pure . Left) |         (pure . Left) | ||||||
|  | @ -708,16 +765,7 @@ requestPing ns target = do | ||||||
|     let srcAddr = confIP nodeConf |     let srcAddr = confIP nodeConf | ||||||
|     responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close |     responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close | ||||||
|         (\sock -> do |         (\sock -> do | ||||||
|             resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> |             resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock | ||||||
|                 Request { |  | ||||||
|                     requestID = rid |  | ||||||
|                   , sender = toRemoteNodeState ns |  | ||||||
|                   , part = 1 |  | ||||||
|                   , isFinalPart = False |  | ||||||
|                   , action = Ping |  | ||||||
|                   , payload = Just PingRequestPayload |  | ||||||
|                         } |  | ||||||
|                              ) sock |  | ||||||
|             (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock |             (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock | ||||||
|             pure $ Right (peerAddr, resp) |             pure $ Right (peerAddr, resp) | ||||||
|                                                                                                ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) |                                                                                                ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||||
|  | @ -733,10 +781,9 @@ requestPing ns target = do | ||||||
|             -- recompute ID for each received node and mark as verified in cache |             -- recompute ID for each received node and mark as verified in cache | ||||||
|             now <- getPOSIXTime |             now <- getPOSIXTime | ||||||
|             forM_ responseVss (\vs -> |             forM_ responseVss (\vs -> | ||||||
|                 let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs) |                 if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr | ||||||
|                  in if recomputedID == getNid vs |                    then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs | ||||||
|                        then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs |                    else pure () | ||||||
|                        else pure () |  | ||||||
|                               ) |                               ) | ||||||
|             pure $ if null responseVss |             pure $ if null responseVss | ||||||
|                       then Left "no active vServer IDs returned, ignoring node" |                       then Left "no active vServer IDs returned, ignoring node" | ||||||
|  | @ -744,6 +791,37 @@ requestPing ns target = do | ||||||
|         ) responses |         ) responses | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | -- still need a particular vserver as LocalNodeState, because requests need a sender | ||||||
|  | requestQueryLoad :: (MonadError String m, MonadIO m) | ||||||
|  |                  => LocalNodeState s    -- ^ the local source vserver for the request | ||||||
|  |                  -> NodeID              -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node | ||||||
|  |                  -> RemoteNodeState     -- ^ target node to query | ||||||
|  |                  -> m SegmentLoadStats | ||||||
|  | requestQueryLoad ns upperIdBound target = do | ||||||
|  |     nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) | ||||||
|  |     let | ||||||
|  |         srcAddr = confIP nodeConf | ||||||
|  |         loadReqPl = LoadRequestPayload | ||||||
|  |             { loadSegmentUpperBound = upperIdBound | ||||||
|  |             } | ||||||
|  |     responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close | ||||||
|  |         (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl)) | ||||||
|  |         ) `catch`  (\e -> pure . Left $ displayException (e :: IOException)) | ||||||
|  |     responseMsgSet <- liftEither responses | ||||||
|  |     -- throws an error if an exception happened | ||||||
|  |     loadResPl <- maybe (throwError "no load response payload found") pure | ||||||
|  |         (extractFirstPayload responseMsgSet) | ||||||
|  |     pure SegmentLoadStats | ||||||
|  |         { segmentLowerKeyBound = loadSegmentLowerBound loadResPl | ||||||
|  |         , segmentUpperKeyBound = upperIdBound | ||||||
|  |         , segmentLoad = loadSum loadResPl | ||||||
|  |         , segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl | ||||||
|  |         , segmentOwnerCapacity = loadTotalCapacity loadResPl | ||||||
|  |         , segmentCurrentOwner = target | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| -- | Generic function for sending a request over a connected socket and collecting the response. | -- | Generic function for sending a request over a connected socket and collecting the response. | ||||||
| -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. | -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. | ||||||
| sendRequestTo :: Int                    -- ^ timeout in milliseconds | sendRequestTo :: Int                    -- ^ timeout in milliseconds | ||||||
|  | @ -800,24 +878,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do | ||||||
| 
 | 
 | ||||||
| -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache | -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache | ||||||
| queueAddEntries :: Foldable c => c RemoteCacheEntry | queueAddEntries :: Foldable c => c RemoteCacheEntry | ||||||
|                 -> LocalNodeState s |                 -> TQueue (NodeCache -> NodeCache) | ||||||
|                 -> IO () |                 -> IO () | ||||||
| queueAddEntries entries ns = do | queueAddEntries entries cacheQ = do | ||||||
|     now <- getPOSIXTime |     now <- getPOSIXTime | ||||||
|     forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns)  $ addCacheEntryPure now entry |     forM_ entries $ \entry -> atomically $ writeTQueue cacheQ  $ addCacheEntryPure now entry | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | enque a list of node IDs to be deleted from the global NodeCache | -- | enque a list of node IDs to be deleted from the global NodeCache | ||||||
| queueDeleteEntries :: Foldable c | queueDeleteEntries :: Foldable c | ||||||
|                    => c NodeID |                    => c NodeID | ||||||
|                    -> LocalNodeState s |                    -> TQueue (NodeCache -> NodeCache) | ||||||
|                    -> IO () |                    -> IO () | ||||||
| queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry | queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | enque a single node ID to be deleted from the global NodeCache | -- | enque a single node ID to be deleted from the global NodeCache | ||||||
| queueDeleteEntry :: NodeID | queueDeleteEntry :: NodeID | ||||||
|                  -> LocalNodeState s |                  -> TQueue (NodeCache -> NodeCache) | ||||||
|                  -> IO () |                  -> IO () | ||||||
| queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | ||||||
| 
 | 
 | ||||||
|  | @ -826,11 +904,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | ||||||
| -- global 'NodeCache'. | -- global 'NodeCache'. | ||||||
| queueUpdateVerifieds :: Foldable c | queueUpdateVerifieds :: Foldable c | ||||||
|                      => c NodeID |                      => c NodeID | ||||||
|                      -> LocalNodeState s |                      -> TQueue (NodeCache -> NodeCache) | ||||||
|                      -> IO () |                      -> IO () | ||||||
| queueUpdateVerifieds nIds ns = do | queueUpdateVerifieds nIds cacheQ = do | ||||||
|     now <- getPOSIXTime |     now <- getPOSIXTime | ||||||
|     forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ |     forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $ | ||||||
|         markCacheEntryAsVerified (Just now) nid' |         markCacheEntryAsVerified (Just now) nid' | ||||||
| 
 | 
 | ||||||
| -- | retry an IO action at most *i* times until it delivers a result | -- | retry an IO action at most *i* times until it delivers a result | ||||||
|  |  | ||||||
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							|  | @ -7,8 +7,8 @@ | ||||||
| {-# LANGUAGE OverloadedStrings          #-} | {-# LANGUAGE OverloadedStrings          #-} | ||||||
| {-# LANGUAGE RankNTypes                 #-} | {-# LANGUAGE RankNTypes                 #-} | ||||||
| 
 | 
 | ||||||
| module Hash2Pub.FediChordTypes ( | module Hash2Pub.FediChordTypes | ||||||
|     NodeID -- abstract, but newtype constructors cannot be hidden |   ( NodeID -- abstract, but newtype constructors cannot be hidden | ||||||
|   , idBits |   , idBits | ||||||
|   , getNodeID |   , getNodeID | ||||||
|   , toNodeID |   , toNodeID | ||||||
|  | @ -18,6 +18,13 @@ module Hash2Pub.FediChordTypes ( | ||||||
|   , RemoteNodeState (..) |   , RemoteNodeState (..) | ||||||
|   , RealNode (..) |   , RealNode (..) | ||||||
|   , RealNodeSTM |   , RealNodeSTM | ||||||
|  |   , VSMap | ||||||
|  |   , LoadStats (..) | ||||||
|  |   , emptyLoadStats | ||||||
|  |   , remainingLoadTarget | ||||||
|  |   , loadSliceSum | ||||||
|  |   , addVserver | ||||||
|  |   , SegmentLoadStats (..) | ||||||
|   , setSuccessors |   , setSuccessors | ||||||
|   , setPredecessors |   , setPredecessors | ||||||
|   , NodeCache |   , NodeCache | ||||||
|  | @ -51,6 +58,7 @@ module Hash2Pub.FediChordTypes ( | ||||||
|   , localCompare |   , localCompare | ||||||
|   , genNodeID |   , genNodeID | ||||||
|   , genNodeIDBS |   , genNodeIDBS | ||||||
|  |   , hasValidNodeId | ||||||
|   , genKeyID |   , genKeyID | ||||||
|   , genKeyIDBS |   , genKeyIDBS | ||||||
|   , byteStringToUInteger |   , byteStringToUInteger | ||||||
|  | @ -60,12 +68,14 @@ module Hash2Pub.FediChordTypes ( | ||||||
|   , DHT(..) |   , DHT(..) | ||||||
|   , Service(..) |   , Service(..) | ||||||
|   , ServiceConf(..) |   , ServiceConf(..) | ||||||
|                            ) where |   ) where | ||||||
| 
 | 
 | ||||||
| import           Control.Exception | import           Control.Exception | ||||||
| import           Data.Foldable                 (foldr') | import           Data.Foldable                 (foldr') | ||||||
| import           Data.Function                 (on) | import           Data.Function                 (on) | ||||||
| import qualified Data.Hashable                 as Hashable | import qualified Data.Hashable                 as Hashable | ||||||
|  | import           Data.HashMap.Strict           (HashMap) | ||||||
|  | import qualified Data.HashMap.Strict           as HMap | ||||||
| import           Data.List                     (delete, nub, sortBy) | import           Data.List                     (delete, nub, sortBy) | ||||||
| import qualified Data.Map.Strict               as Map | import qualified Data.Map.Strict               as Map | ||||||
| import           Data.Maybe                    (fromJust, fromMaybe, isJust, | import           Data.Maybe                    (fromJust, fromMaybe, isJust, | ||||||
|  | @ -148,17 +158,27 @@ a `localCompare` b | ||||||
| -- Also contains shared data and config values. | -- Also contains shared data and config values. | ||||||
| -- TODO: more data structures for k-choices bookkeeping | -- TODO: more data structures for k-choices bookkeeping | ||||||
| data RealNode s = RealNode | data RealNode s = RealNode | ||||||
|     { vservers       :: [LocalNodeStateSTM s] |     { vservers              :: VSMap s | ||||||
|     -- ^ references to all active versers |     -- ^ map of all active VServer node IDs to their node state | ||||||
|     , nodeConfig     :: FediChordConf |     , nodeConfig            :: FediChordConf | ||||||
|     -- ^ holds the initial configuration read at program start |     -- ^ holds the initial configuration read at program start | ||||||
|     , bootstrapNodes :: [(String, PortNumber)] |     , bootstrapNodes        :: [(String, PortNumber)] | ||||||
|     -- ^ nodes to be used as bootstrapping points, new ones learned during operation |     -- ^ nodes to be used as bootstrapping points, new ones learned during operation | ||||||
|     , lookupCacheSTM :: TVar LookupCache |     , lookupCacheSTM        :: TVar LookupCache | ||||||
|     -- ^ a global cache of looked up keys and their associated nodes |     -- ^ a global cache of looked up keys and their associated nodes | ||||||
|     , nodeService    :: s (RealNodeSTM s) |     , globalNodeCacheSTM    :: TVar NodeCache | ||||||
|  |     -- ^ EpiChord node cache with expiry times for nodes. | ||||||
|  |     , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache) | ||||||
|  |     -- ^ cache updates are not written directly to the  'globalNodeCacheSTM' | ||||||
|  |     , nodeService           :: s (RealNodeSTM s) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | -- | insert a new vserver mapping into a node | ||||||
|  | addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s | ||||||
|  | addVserver (key, nstate) node = node | ||||||
|  |     { vservers = addRMapEntry key nstate (vservers node) } | ||||||
|  | 
 | ||||||
|  | type VSMap s = RingMap NodeID (LocalNodeStateSTM s) | ||||||
| type RealNodeSTM s = TVar (RealNode s) | type RealNodeSTM s = TVar (RealNode s) | ||||||
| 
 | 
 | ||||||
| -- | represents a node and all its important state | -- | represents a node and all its important state | ||||||
|  | @ -172,7 +192,7 @@ data RemoteNodeState = RemoteNodeState | ||||||
|     -- ^ port of the DHT itself |     -- ^ port of the DHT itself | ||||||
|     , servicePort :: PortNumber |     , servicePort :: PortNumber | ||||||
|     -- ^ port of the service provided on top of the DHT |     -- ^ port of the service provided on top of the DHT | ||||||
|     , vServerID   :: Integer |     , vServerID   :: Word8 | ||||||
|     -- ^ ID of this vserver |     -- ^ ID of this vserver | ||||||
|     } |     } | ||||||
|     deriving (Show, Eq) |     deriving (Show, Eq) | ||||||
|  | @ -185,9 +205,9 @@ data LocalNodeState s = LocalNodeState | ||||||
|     { nodeState           :: RemoteNodeState |     { nodeState           :: RemoteNodeState | ||||||
|     -- ^ represents common data present both in remote and local node representations |     -- ^ represents common data present both in remote and local node representations | ||||||
|     , nodeCacheSTM        :: TVar NodeCache |     , nodeCacheSTM        :: TVar NodeCache | ||||||
|     -- ^ EpiChord node cache with expiry times for nodes |     -- ^ reference to the 'globalNodeCacheSTM' | ||||||
|     , cacheWriteQueue     :: TQueue (NodeCache -> NodeCache) |     , cacheWriteQueue     :: TQueue (NodeCache -> NodeCache) | ||||||
|     -- ^ cache updates are not written directly to the  'nodeCache' but queued and |     -- ^ reference to the 'globalCacheWriteQueue | ||||||
|     , successors          :: [RemoteNodeState] -- could be a set instead as these are ordered as well |     , successors          :: [RemoteNodeState] -- could be a set instead as these are ordered as well | ||||||
|     -- ^ successor nodes in ascending order by distance |     -- ^ successor nodes in ascending order by distance | ||||||
|     , predecessors        :: [RemoteNodeState] |     , predecessors        :: [RemoteNodeState] | ||||||
|  | @ -217,14 +237,14 @@ class NodeState a where | ||||||
|     getIpAddr :: a -> HostAddress6 |     getIpAddr :: a -> HostAddress6 | ||||||
|     getDhtPort :: a -> PortNumber |     getDhtPort :: a -> PortNumber | ||||||
|     getServicePort :: a -> PortNumber |     getServicePort :: a -> PortNumber | ||||||
|     getVServerID :: a -> Integer |     getVServerID :: a -> Word8 | ||||||
|     -- setters for common properties |     -- setters for common properties | ||||||
|     setNid :: NodeID -> a -> a |     setNid :: NodeID -> a -> a | ||||||
|     setDomain :: String -> a -> a |     setDomain :: String -> a -> a | ||||||
|     setIpAddr :: HostAddress6 -> a -> a |     setIpAddr :: HostAddress6 -> a -> a | ||||||
|     setDhtPort :: PortNumber -> a -> a |     setDhtPort :: PortNumber -> a -> a | ||||||
|     setServicePort :: PortNumber -> a -> a |     setServicePort :: PortNumber -> a -> a | ||||||
|     setVServerID :: Integer -> a -> a |     setVServerID :: Word8 -> a -> a | ||||||
|     toRemoteNodeState :: a -> RemoteNodeState |     toRemoteNodeState :: a -> RemoteNodeState | ||||||
| 
 | 
 | ||||||
| instance NodeState RemoteNodeState where | instance NodeState RemoteNodeState where | ||||||
|  | @ -373,6 +393,11 @@ genNodeID :: HostAddress6       -- ^a node's IPv6 address | ||||||
|             -> NodeID           -- ^the generated @NodeID@ |             -> NodeID           -- ^the generated @NodeID@ | ||||||
| genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs | genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool | ||||||
|  | hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| -- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT | -- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT | ||||||
| genKeyIDBS :: String            -- ^the key string | genKeyIDBS :: String            -- ^the key string | ||||||
|            -> BS.ByteString     -- ^the key ID represented as a @ByteString@ |            -> BS.ByteString     -- ^the key ID represented as a @ByteString@ | ||||||
|  | @ -427,9 +452,70 @@ data FediChordConf = FediChordConf | ||||||
|     -- ^ how long to wait until response has arrived, in milliseconds |     -- ^ how long to wait until response has arrived, in milliseconds | ||||||
|     , confRequestRetries            :: Int |     , confRequestRetries            :: Int | ||||||
|     -- ^ how often re-sending a timed-out request can be retried |     -- ^ how often re-sending a timed-out request can be retried | ||||||
|  |     , confEnableKChoices            :: Bool | ||||||
|  |     -- ^ whether to enable k-choices load balancing | ||||||
|  |     , confKChoicesOverload          :: Double | ||||||
|  |     -- ^ fraction of capacity above which a node considers itself overloaded | ||||||
|  |     , confKChoicesUnderload         :: Double | ||||||
|  |     -- ^ fraction of capacity below which a node considers itself underloaded | ||||||
|  |     , confKChoicesMaxVS             :: Word8 | ||||||
|  |     -- ^ upper limit of vserver index κ | ||||||
|  |     , confKChoicesRebalanceInterval :: Int | ||||||
|  |     -- ^ interval between vserver rebalance attempts | ||||||
|     } |     } | ||||||
|     deriving (Show, Eq) |     deriving (Show, Eq) | ||||||
| 
 | 
 | ||||||
|  | -- ====== k-choices load balancing types ====== | ||||||
|  | 
 | ||||||
|  | data LoadStats = LoadStats | ||||||
|  |     { loadPerTag         :: RingMap NodeID Double | ||||||
|  |     -- ^ map of loads for each handled tag | ||||||
|  |     , totalCapacity      :: Double | ||||||
|  |     -- ^ total designated capacity of the service | ||||||
|  |     , compensatedLoadSum :: Double | ||||||
|  |     -- ^ effective load reevant for load balancing after compensating for | ||||||
|  |     } | ||||||
|  |     deriving (Show, Eq) | ||||||
|  | 
 | ||||||
|  | -- | calculates the mismatch from the target load by taking into account the | ||||||
|  | -- underload and overload limits | ||||||
|  | remainingLoadTarget :: FediChordConf -> LoadStats -> Double | ||||||
|  | remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats | ||||||
|  |     where | ||||||
|  |         targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | -- | calculates the sum of tag load in a contiguous slice between to keys | ||||||
|  | loadSliceSum :: LoadStats | ||||||
|  |              -> NodeID  -- ^ lower segment bound | ||||||
|  |              -> NodeID  -- ^ upper segment bound | ||||||
|  |              -> Double  -- ^ sum of all tag loads within that segment | ||||||
|  | loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | data SegmentLoadStats = SegmentLoadStats | ||||||
|  |     { segmentLowerKeyBound            :: NodeID | ||||||
|  |     -- ^ segment start key | ||||||
|  |     , segmentUpperKeyBound            :: NodeID | ||||||
|  |     -- ^ segment end key | ||||||
|  |     , segmentLoad                     :: Double | ||||||
|  |     -- ^ sum of load of all keys in the segment | ||||||
|  |     , segmentOwnerRemainingLoadTarget :: Double | ||||||
|  |     -- ^ remaining load target of the current segment handler: | ||||||
|  |     , segmentOwnerCapacity            :: Double | ||||||
|  |     -- ^ total capacity of the current segment handler node, used for normalisation | ||||||
|  |     , segmentCurrentOwner             :: RemoteNodeState | ||||||
|  |     -- ^ the current owner of the segment that needs to be joined on | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | -- TODO: figure out a better way of initialising | ||||||
|  | emptyLoadStats :: LoadStats | ||||||
|  | emptyLoadStats = LoadStats | ||||||
|  |     { loadPerTag = emptyRMap | ||||||
|  |     , totalCapacity = 0 | ||||||
|  |     , compensatedLoadSum = 0 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
| -- ====== Service Types ============ | -- ====== Service Types ============ | ||||||
| 
 | 
 | ||||||
| class Service s d where | class Service s d where | ||||||
|  | @ -445,6 +531,7 @@ class Service s d where | ||||||
|                 -> IO (Either String ())    -- ^ success or failure |                 -> IO (Either String ())    -- ^ success or failure | ||||||
|     -- | Wait for an incoming migration from a given node to succeed, may block forever |     -- | Wait for an incoming migration from a given node to succeed, may block forever | ||||||
|     waitForMigrationFrom :: s d -> NodeID -> IO () |     waitForMigrationFrom :: s d -> NodeID -> IO () | ||||||
|  |     getServiceLoadStats :: s d -> IO LoadStats | ||||||
| 
 | 
 | ||||||
| instance Hashable.Hashable NodeID where | instance Hashable.Hashable NodeID where | ||||||
|     hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID |     hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID | ||||||
|  |  | ||||||
|  | @ -22,7 +22,7 @@ import qualified Data.DList                as D | ||||||
| import           Data.Either               (lefts, rights) | import           Data.Either               (lefts, rights) | ||||||
| import qualified Data.HashMap.Strict       as HMap | import qualified Data.HashMap.Strict       as HMap | ||||||
| import qualified Data.HashSet              as HSet | import qualified Data.HashSet              as HSet | ||||||
| import           Data.Maybe                (fromJust, isJust) | import           Data.Maybe                (fromJust, fromMaybe, isJust) | ||||||
| import           Data.String               (fromString) | import           Data.String               (fromString) | ||||||
| import           Data.Text.Lazy            (Text) | import           Data.Text.Lazy            (Text) | ||||||
| import qualified Data.Text.Lazy            as Txt | import qualified Data.Text.Lazy            as Txt | ||||||
|  | @ -64,8 +64,10 @@ data PostService d = PostService | ||||||
|     , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) |     , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) | ||||||
|     , httpMan              :: HTTP.Manager |     , httpMan              :: HTTP.Manager | ||||||
|     , statsQueue           :: TQueue StatsEvent |     , statsQueue           :: TQueue StatsEvent | ||||||
|     , loadStats            :: TVar RelayStats |     , relayStats           :: TVar RelayStats | ||||||
|     -- ^ current load stats, replaced periodically |     -- ^ current relay stats, replaced periodically | ||||||
|  |     , loadStats            :: TVar LoadStats | ||||||
|  |     -- ^ current load values of the relay, replaced periodically and used by | ||||||
|     , logFileHandle        :: Handle |     , logFileHandle        :: Handle | ||||||
|     } |     } | ||||||
|     deriving (Typeable) |     deriving (Typeable) | ||||||
|  | @ -96,7 +98,8 @@ instance DHT d => Service PostService d where | ||||||
|         migrationsInProgress' <- newTVarIO HMap.empty |         migrationsInProgress' <- newTVarIO HMap.empty | ||||||
|         httpMan' <- HTTP.newManager HTTP.defaultManagerSettings |         httpMan' <- HTTP.newManager HTTP.defaultManagerSettings | ||||||
|         statsQueue' <- newTQueueIO |         statsQueue' <- newTQueueIO | ||||||
|         loadStats' <- newTVarIO emptyStats |         relayStats' <- newTVarIO emptyStats | ||||||
|  |         loadStats' <- newTVarIO emptyLoadStats | ||||||
|         loggingFile <- openFile (confLogfilePath conf) WriteMode |         loggingFile <- openFile (confLogfilePath conf) WriteMode | ||||||
|         hSetBuffering loggingFile LineBuffering |         hSetBuffering loggingFile LineBuffering | ||||||
|         let |         let | ||||||
|  | @ -112,6 +115,7 @@ instance DHT d => Service PostService d where | ||||||
|               , migrationsInProgress = migrationsInProgress' |               , migrationsInProgress = migrationsInProgress' | ||||||
|               , httpMan = httpMan' |               , httpMan = httpMan' | ||||||
|               , statsQueue = statsQueue' |               , statsQueue = statsQueue' | ||||||
|  |               , relayStats = relayStats' | ||||||
|               , loadStats = loadStats' |               , loadStats = loadStats' | ||||||
|               , logFileHandle = loggingFile |               , logFileHandle = loggingFile | ||||||
|               } |               } | ||||||
|  | @ -153,6 +157,12 @@ instance DHT d => Service PostService d where | ||||||
|         -- block until migration finished |         -- block until migration finished | ||||||
|         takeMVar migrationSynchroniser |         takeMVar migrationSynchroniser | ||||||
| 
 | 
 | ||||||
|  |     getServiceLoadStats = getLoadStats | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | getLoadStats :: PostService d -> IO LoadStats | ||||||
|  | getLoadStats serv = readTVarIO $ loadStats serv | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| -- | return a WAI application | -- | return a WAI application | ||||||
| postServiceApplication :: DHT d => PostService d -> Application | postServiceApplication :: DHT d => PostService d -> Application | ||||||
|  | @ -835,7 +845,12 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop | ||||||
|           -- persistently store in a TVar so it can be retrieved later by the DHT |           -- persistently store in a TVar so it can be retrieved later by the DHT | ||||||
|           let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv) |           let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv) | ||||||
|               rateStats = evaluateStats timePassed summedStats |               rateStats = evaluateStats timePassed summedStats | ||||||
|           atomically $ writeTVar (loadStats serv) rateStats |           currentSubscribers <- readTVarIO $ subscribers serv | ||||||
|  |           -- translate the rate statistics to load values | ||||||
|  |           loads <- evaluateLoadStats rateStats currentSubscribers | ||||||
|  |           atomically $ | ||||||
|  |               writeTVar (relayStats serv) rateStats | ||||||
|  |               >> writeTVar (loadStats serv) loads | ||||||
|           -- and now what? write a log to file |           -- and now what? write a log to file | ||||||
|           -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum |           -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum | ||||||
|           -- later: current (reported) load, target load |           -- later: current (reported) load, target load | ||||||
|  | @ -859,6 +874,33 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop | ||||||
|                 0 tagMap |                 0 tagMap | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | -- | calculate load values from rate statistics | ||||||
|  | evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats | ||||||
|  | evaluateLoadStats currentStats currentSubscribers = do | ||||||
|  |     -- load caused by each tag: incomingPostRate * ( 1 + subscribers) | ||||||
|  |     -- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate | ||||||
|  |     let | ||||||
|  |         totalCapacity' = 2.5 * postPublishRate currentStats | ||||||
|  |     (loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do | ||||||
|  |         numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM | ||||||
|  |         let | ||||||
|  |             thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats) | ||||||
|  |             thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers) | ||||||
|  |         pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag') | ||||||
|  |         ) | ||||||
|  |         (0, emptyRMap) | ||||||
|  |         $ rMapToListWithKeys currentSubscribers | ||||||
|  |     let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats | ||||||
|  |     pure LoadStats | ||||||
|  |         { loadPerTag = loadPerTag' | ||||||
|  |         , totalCapacity = totalCapacity' | ||||||
|  |         -- load caused by post fetches cannot be influenced by re-balancing nodes, | ||||||
|  |         -- but still reduces the totally available capacity | ||||||
|  |         , compensatedLoadSum = loadSum + postFetchRate currentStats | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| -- | Evaluate the accumulated statistic events: Currently mostly calculates the event | -- | Evaluate the accumulated statistic events: Currently mostly calculates the event | ||||||
| -- rates by dividing through the collection time frame | -- rates by dividing through the collection time frame | ||||||
| evaluateStats :: POSIXTime -> RelayStats -> RelayStats | evaluateStats :: POSIXTime -> RelayStats -> RelayStats | ||||||
|  |  | ||||||
|  | @ -16,10 +16,12 @@ data Action = QueryID | ||||||
|     | Leave |     | Leave | ||||||
|     | Stabilise |     | Stabilise | ||||||
|     | Ping |     | Ping | ||||||
|  |     | QueryLoad | ||||||
|     deriving (Show, Eq, Enum) |     deriving (Show, Eq, Enum) | ||||||
| 
 | 
 | ||||||
| data FediChordMessage = Request | data FediChordMessage = Request | ||||||
|     { requestID   :: Integer |     { requestID   :: Integer | ||||||
|  |     , receiverID  :: NodeID | ||||||
|     , sender      :: RemoteNodeState |     , sender      :: RemoteNodeState | ||||||
|     , part        :: Integer |     , part        :: Integer | ||||||
|     , isFinalPart :: Bool |     , isFinalPart :: Bool | ||||||
|  | @ -57,6 +59,10 @@ data ActionPayload = QueryIDRequestPayload | ||||||
|     } |     } | ||||||
|     | StabiliseRequestPayload |     | StabiliseRequestPayload | ||||||
|     | PingRequestPayload |     | PingRequestPayload | ||||||
|  |     | LoadRequestPayload | ||||||
|  |     { loadSegmentUpperBound :: NodeID | ||||||
|  |     -- ^ upper bound of segment interested in, | ||||||
|  |     } | ||||||
|     | QueryIDResponsePayload |     | QueryIDResponsePayload | ||||||
|     { queryResult :: QueryResponse |     { queryResult :: QueryResponse | ||||||
|     } |     } | ||||||
|  | @ -73,6 +79,12 @@ data ActionPayload = QueryIDRequestPayload | ||||||
|     | PingResponsePayload |     | PingResponsePayload | ||||||
|     { pingNodeStates :: [RemoteNodeState] |     { pingNodeStates :: [RemoteNodeState] | ||||||
|     } |     } | ||||||
|  |     | LoadResponsePayload | ||||||
|  |     { loadSum               :: Double | ||||||
|  |     , loadRemainingTarget   :: Double | ||||||
|  |     , loadTotalCapacity     :: Double | ||||||
|  |     , loadSegmentLowerBound :: NodeID | ||||||
|  |     } | ||||||
|     deriving (Show, Eq) |     deriving (Show, Eq) | ||||||
| 
 | 
 | ||||||
| -- | global limit of parts per message used when (de)serialising messages. | -- | global limit of parts per message used when (de)serialising messages. | ||||||
|  |  | ||||||
|  | @ -47,6 +47,13 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where | ||||||
|           traversingFL acc (ProxyEntry _ Nothing) = acc |           traversingFL acc (ProxyEntry _ Nothing) = acc | ||||||
|           traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry |           traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry | ||||||
| 
 | 
 | ||||||
|  | instance (Bounded k, Ord k) => Traversable (RingMap k) where | ||||||
|  |     traverse f = fmap RingMap . traverse traversingF . getRingMap | ||||||
|  |       where | ||||||
|  |           traversingF (KeyEntry entry) = KeyEntry <$> f entry | ||||||
|  |           traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing | ||||||
|  |           traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . Just <$> traversingF entry | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| -- | entry of a 'RingMap' that holds a value and can also | -- | entry of a 'RingMap' that holds a value and can also | ||||||
| -- wrap around the lookup direction at the edges of the name space. | -- wrap around the lookup direction at the edges of the name space. | ||||||
|  | @ -106,6 +113,23 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on | ||||||
|       | isNothing (rMapLookup nid rmap') = 1 |       | isNothing (rMapLookup nid rmap') = 1 | ||||||
|       | otherwise = 0 |       | otherwise = 0 | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | -- | whether the RingMap is empty (except for empty proxy entries) | ||||||
|  | nullRMap :: (Num k, Bounded k, Ord k) | ||||||
|  |          => RingMap k a | ||||||
|  |          -> Bool | ||||||
|  | -- basic idea: look for a predecessor starting from the upper Map bound, | ||||||
|  | -- Nothing indicates no entry being found | ||||||
|  | nullRMap = isNothing . rMapLookupPred maxBound | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | -- | O(logn( Is the key a member of the RingMap? | ||||||
|  | memberRMap :: (Bounded k, Ord k) | ||||||
|  |            => k | ||||||
|  |            -> RingMap k a | ||||||
|  |            -> Bool | ||||||
|  | memberRMap key = isJust . rMapLookup key | ||||||
|  | 
 | ||||||
| -- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ | -- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ | ||||||
| -- to simulate a modular ring | -- to simulate a modular ring | ||||||
| lookupWrapper :: (Bounded k, Ord k, Num k) | lookupWrapper :: (Bounded k, Ord k, Num k) | ||||||
|  | @ -198,12 +222,28 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap | ||||||
|     modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) |     modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) | ||||||
|     modifier KeyEntry {}              = Nothing |     modifier KeyEntry {}              = Nothing | ||||||
| 
 | 
 | ||||||
|  | -- TODO: rename this to elems | ||||||
| rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a] | rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a] | ||||||
| rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap | rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap | ||||||
| 
 | 
 | ||||||
|  | -- TODO: rename this to toList | ||||||
|  | rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)] | ||||||
|  | rMapToListWithKeys = Map.foldrWithKey (\k v acc -> | ||||||
|  |     maybe acc (\val -> (k, val):acc) $ extractRingEntry v | ||||||
|  |                                       ) | ||||||
|  |                                       [] | ||||||
|  |                                       . getRingMap | ||||||
|  | 
 | ||||||
| rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a | rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a | ||||||
| rMapFromList = setRMapEntries | rMapFromList = setRMapEntries | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | -- | this just merges the underlying 'Map.Map' s and does not check whether the | ||||||
|  | -- ProxyEntry pointers are consistent, so better only create unions of | ||||||
|  | -- equal-pointered RingMaps | ||||||
|  | unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a | ||||||
|  | unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b) | ||||||
|  | 
 | ||||||
| -- | takes up to i entries from a 'RingMap' by calling a getter function on a | -- | takes up to i entries from a 'RingMap' by calling a getter function on a | ||||||
| -- *startAt* value and after that on the previously returned value. | -- *startAt* value and after that on the previously returned value. | ||||||
| -- Stops once i entries have been taken or an entry has been encountered twice | -- Stops once i entries have been taken or an entry has been encountered twice | ||||||
|  |  | ||||||
|  | @ -7,6 +7,7 @@ import           Control.Concurrent.STM.TVar | ||||||
| import           Control.Exception | import           Control.Exception | ||||||
| import           Data.ASN1.Parse             (runParseASN1) | import           Data.ASN1.Parse             (runParseASN1) | ||||||
| import qualified Data.ByteString             as BS | import qualified Data.ByteString             as BS | ||||||
|  | import qualified Data.HashMap.Strict         as HMap | ||||||
| import qualified Data.Map.Strict             as Map | import qualified Data.Map.Strict             as Map | ||||||
| import           Data.Maybe                  (fromJust, isJust) | import           Data.Maybe                  (fromJust, isJust) | ||||||
| import qualified Data.Set                    as Set | import qualified Data.Set                    as Set | ||||||
|  | @ -18,6 +19,7 @@ import           Hash2Pub.ASN1Coding | ||||||
| import           Hash2Pub.DHTProtocol | import           Hash2Pub.DHTProtocol | ||||||
| import           Hash2Pub.FediChord | import           Hash2Pub.FediChord | ||||||
| import           Hash2Pub.FediChordTypes | import           Hash2Pub.FediChordTypes | ||||||
|  | import           Hash2Pub.RingMap | ||||||
| 
 | 
 | ||||||
| spec :: Spec | spec :: Spec | ||||||
| spec = do | spec = do | ||||||
|  | @ -221,14 +223,16 @@ spec = do | ||||||
|                   , exampleNodeState {nid = fromInteger (-5)} |                   , exampleNodeState {nid = fromInteger (-5)} | ||||||
|                                  ] |                                  ] | ||||||
|                                               } |                                               } | ||||||
|             requestTemplate = Request { |             qLoadReqPayload = LoadRequestPayload | ||||||
|                 requestID = 2342 |                 { loadSegmentUpperBound = 1025 | ||||||
|               , sender = exampleNodeState |                 } | ||||||
|               , part = 1 |             qLoadResPayload = LoadResponsePayload | ||||||
|               , isFinalPart = True |                 { loadSum = 3.141 | ||||||
|               , action = undefined |                 , loadRemainingTarget = -1.337 | ||||||
|               , payload = undefined |                 , loadTotalCapacity = 2.21 | ||||||
|                                       } |                 , loadSegmentLowerBound = 12 | ||||||
|  |                 } | ||||||
|  | 
 | ||||||
|             responseTemplate = Response { |             responseTemplate = Response { | ||||||
|                 requestID = 2342 |                 requestID = 2342 | ||||||
|               , senderID = nid exampleNodeState |               , senderID = nid exampleNodeState | ||||||
|  | @ -237,7 +241,7 @@ spec = do | ||||||
|               , action = undefined |               , action = undefined | ||||||
|               , payload = undefined |               , payload = undefined | ||||||
|                                       } |                                       } | ||||||
|             requestWith a pa = requestTemplate {action = a, payload = Just pa} |             requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342 | ||||||
|             responseWith a pa = responseTemplate {action = a, payload = Just pa} |             responseWith a pa = responseTemplate {action = a, payload = Just pa} | ||||||
| 
 | 
 | ||||||
|             encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg |             encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg | ||||||
|  | @ -248,17 +252,20 @@ spec = do | ||||||
|                                                                        } |                                                                        } | ||||||
| 
 | 
 | ||||||
|         it "messages are encoded and decoded correctly from and to ASN1" $ do |         it "messages are encoded and decoded correctly from and to ASN1" $ do | ||||||
|             encodeDecodeAndCheck $ requestWith QueryID qidReqPayload |             localNS <- exampleLocalNode | ||||||
|             encodeDecodeAndCheck $ requestWith Join jReqPayload |             encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload | ||||||
|             encodeDecodeAndCheck $ requestWith Leave lReqPayload |             encodeDecodeAndCheck $ requestWith localNS Join jReqPayload | ||||||
|             encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload |             encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload | ||||||
|             encodeDecodeAndCheck $ requestWith Ping pingReqPayload |             encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload | ||||||
|  |             encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload | ||||||
|  |             encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload | ||||||
|             encodeDecodeAndCheck $ responseWith QueryID qidResPayload1 |             encodeDecodeAndCheck $ responseWith QueryID qidResPayload1 | ||||||
|             encodeDecodeAndCheck $ responseWith QueryID qidResPayload2 |             encodeDecodeAndCheck $ responseWith QueryID qidResPayload2 | ||||||
|             encodeDecodeAndCheck $ responseWith Join jResPayload |             encodeDecodeAndCheck $ responseWith Join jResPayload | ||||||
|             encodeDecodeAndCheck $ responseWith Leave lResPayload |             encodeDecodeAndCheck $ responseWith Leave lResPayload | ||||||
|             encodeDecodeAndCheck $ responseWith Stabilise stabResPayload |             encodeDecodeAndCheck $ responseWith Stabilise stabResPayload | ||||||
|             encodeDecodeAndCheck $ responseWith Ping pingResPayload |             encodeDecodeAndCheck $ responseWith Ping pingResPayload | ||||||
|  |             encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload | ||||||
|         it "messages are encoded and decoded to ASN.1 DER properly" $ |         it "messages are encoded and decoded to ASN.1 DER properly" $ | ||||||
|             deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652  $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) |             deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652  $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) | ||||||
|         it "messages too large for a single packet can (often) be split into multiple parts" $ do |         it "messages too large for a single packet can (often) be split into multiple parts" $ do | ||||||
|  | @ -297,13 +304,13 @@ exampleNodeState = RemoteNodeState { | ||||||
| 
 | 
 | ||||||
| exampleLocalNode :: IO (LocalNodeState MockService) | exampleLocalNode :: IO (LocalNodeState MockService) | ||||||
| exampleLocalNode = do | exampleLocalNode = do | ||||||
|     realNode <- newTVarIO $ RealNode { |     realNodeSTM <- newTVarIO $ RealNode { | ||||||
|             vservers = [] |             vservers = emptyRMap | ||||||
|           , nodeConfig = exampleFediConf |           , nodeConfig = exampleFediConf | ||||||
|           , bootstrapNodes = confBootstrapNodes exampleFediConf |           , bootstrapNodes = confBootstrapNodes exampleFediConf | ||||||
|           , nodeService = MockService |           , nodeService = MockService | ||||||
|                                                         } |                                                         } | ||||||
|     nodeStateInit realNode |     nodeStateInit realNodeSTM 0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| exampleFediConf :: FediChordConf | exampleFediConf :: FediChordConf | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue