Compare commits
	
		
			5 commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b46f66e2c0 | |||
| ea14ff9b09 | |||
| 9d8df6d3d8 | |||
| d7355aa04d | |||
| d8b2186016 | 
					 13 changed files with 424 additions and 1099 deletions
				
			
		| 
						 | 
					@ -6,22 +6,20 @@ Domain ::= VisibleString
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Partnum ::= INTEGER (0..150)
 | 
					Partnum ::= INTEGER (0..150)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad}
 | 
					Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Request ::= SEQUENCE {
 | 
					Request ::= SEQUENCE {
 | 
				
			||||||
	action			Action,
 | 
						action			Action,
 | 
				
			||||||
	requestID		INTEGER (0..4294967295),	-- arbitrarily restricting to an unsigned 32bit integer
 | 
						requestID		INTEGER (0..4294967295),	-- arbitrarily restricting to an unsigned 32bit integer
 | 
				
			||||||
	receiverID		NodeID,
 | 
					 | 
				
			||||||
	sender			NodeState,
 | 
						sender			NodeState,
 | 
				
			||||||
	part			Partnum,	-- part number of this message, starts at 1
 | 
						part			Partnum,	-- part number of this message, starts at 1
 | 
				
			||||||
	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this reuest
 | 
						finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this reuest
 | 
				
			||||||
	actionPayload	CHOICE {
 | 
						actionPayload	CHOICE {
 | 
				
			||||||
		queryIDRequestPayload		QueryIDRequestPayload,
 | 
							queryIDRequestPayload		QueryIDRequestPayload,
 | 
				
			||||||
		joinRequestPayload			JoinRequestPayload,
 | 
							joinRequestPayload			JoinRequestPayload,
 | 
				
			||||||
		leaveRequestPayload			LeaveRequestPayload,
 | 
							leaveRequestPayload		LeaveRequestPayload,
 | 
				
			||||||
		stabiliseRequestPayload		StabiliseRequestPayload,
 | 
							stabiliseRequestPayload	StabiliseRequestPayload,
 | 
				
			||||||
		pingRequestPayload			PingRequestPayload,
 | 
							pingRequestPayload			PingRequestPayload
 | 
				
			||||||
		loadRequestPayload			LoadRequestPayload
 | 
					 | 
				
			||||||
		}			OPTIONAL			-- just for symmetry reasons with response, requests without a payload have no meaning
 | 
							}			OPTIONAL			-- just for symmetry reasons with response, requests without a payload have no meaning
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -36,12 +34,11 @@ Response ::= SEQUENCE {
 | 
				
			||||||
	finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this response
 | 
						finalPart		BOOLEAN,	-- flag indicating this `part` to be the last of this response
 | 
				
			||||||
	action			Action,
 | 
						action			Action,
 | 
				
			||||||
	actionPayload	CHOICE {
 | 
						actionPayload	CHOICE {
 | 
				
			||||||
		queryIDResponsePayload		QueryIDResponsePayload,
 | 
							queryIDResponsePayload	QueryIDResponsePayload,
 | 
				
			||||||
		joinResponsePayload			JoinResponsePayload,
 | 
							joinResponsePayload		JoinResponsePayload,
 | 
				
			||||||
		leaveResponsePayload		LeaveResponsePayload,
 | 
							leaveResponsePayload		LeaveResponsePayload,
 | 
				
			||||||
		stabiliseResponsePayload	StabiliseResponsePayload,
 | 
							stabiliseResponsePayload	StabiliseResponsePayload,
 | 
				
			||||||
		pingResponsePayload			PingResponsePayload,
 | 
							pingResponsePayload		PingResponsePayload
 | 
				
			||||||
		loadResponsePayload			LoadResponsePayload
 | 
					 | 
				
			||||||
		}			OPTIONAL			-- no payload when just ACKing a previous request
 | 
							}			OPTIONAL			-- no payload when just ACKing a previous request
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -104,15 +101,5 @@ PingRequestPayload ::= NULL		-- do not include a node/ vserver ID, so that
 | 
				
			||||||
-- learning all active vserver IDs handled by the server at once
 | 
					-- learning all active vserver IDs handled by the server at once
 | 
				
			||||||
PingResponsePayload ::= SEQUENCE OF NodeState
 | 
					PingResponsePayload ::= SEQUENCE OF NodeState
 | 
				
			||||||
 | 
					
 | 
				
			||||||
LoadRequestPayload ::= SEQUENCE {
 | 
					 | 
				
			||||||
	upperSegmentBound		NodeID
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
LoadResponsePayload ::= SEQUENCE {
 | 
					 | 
				
			||||||
	loadSum					REAL,
 | 
					 | 
				
			||||||
	remainingLoadTarget		REAL,
 | 
					 | 
				
			||||||
	totalCapacity			REAL,
 | 
					 | 
				
			||||||
	lowerBound				NodeID
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
END
 | 
					END
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -91,7 +91,7 @@ executable Hash2Pub
 | 
				
			||||||
  -- Base language which the package is written in.
 | 
					  -- Base language which the package is written in.
 | 
				
			||||||
  default-language:    Haskell2010
 | 
					  default-language:    Haskell2010
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  ghc-options:         -threaded
 | 
					  ghc-options:         -threaded -rtsopts -with-rtsopts=-N
 | 
				
			||||||
 | 
					
 | 
				
			||||||
executable Experiment
 | 
					executable Experiment
 | 
				
			||||||
  -- experiment runner
 | 
					  -- experiment runner
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,7 +1,7 @@
 | 
				
			||||||
# Hash2Pub
 | 
					# Hash2Pub
 | 
				
			||||||
 | 
					
 | 
				
			||||||
***This is heavily WIP and does not provide any useful functionality yet***.  
 | 
					***This is heavily WIP and does not provide any useful functionality yet***.  
 | 
				
			||||||
I aim for always having the master branch at a state where it builds and tests pass.
 | 
					I aim for always having the `mainline` branch in a state where it builds and tests pass.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
 | 
					A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
 | 
				
			||||||
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
 | 
					It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
 | 
				
			||||||
| 
						 | 
					@ -10,6 +10,8 @@ This is the practical implementation of the concept presented in the paper [Dece
 | 
				
			||||||
 | 
					
 | 
				
			||||||
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
 | 
					The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					For further questions and discussins, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## Building
 | 
					## Building
 | 
				
			||||||
 | 
					
 | 
				
			||||||
The project and its developent environment are built with [Nix](https://nixos.org/nix/).
 | 
					The project and its developent environment are built with [Nix](https://nixos.org/nix/).
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -40,7 +40,7 @@ executeSchedule :: Int  -- ^ speedup factor
 | 
				
			||||||
                -> IO ()
 | 
					                -> IO ()
 | 
				
			||||||
executeSchedule speedup events = do
 | 
					executeSchedule speedup events = do
 | 
				
			||||||
    -- initialise HTTP manager
 | 
					    -- initialise HTTP manager
 | 
				
			||||||
    httpMan <- HTTP.newManager HTTP.defaultManagerSettings
 | 
					    httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 }
 | 
				
			||||||
    forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do
 | 
					    forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do
 | 
				
			||||||
        _ <- forkIO $
 | 
					        _ <- forkIO $
 | 
				
			||||||
            clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag)
 | 
					            clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										35
									
								
								app/Main.hs
									
										
									
									
									
								
							
							
						
						
									
										35
									
								
								app/Main.hs
									
										
									
									
									
								
							| 
						 | 
					@ -18,20 +18,38 @@ main = do
 | 
				
			||||||
    -- ToDo: parse and pass config
 | 
					    -- ToDo: parse and pass config
 | 
				
			||||||
    -- probably use `tomland` for that
 | 
					    -- probably use `tomland` for that
 | 
				
			||||||
    (fConf, sConf) <- readConfig
 | 
					    (fConf, sConf) <- readConfig
 | 
				
			||||||
 | 
					    -- TODO: first initialise 'RealNode', then the vservers
 | 
				
			||||||
    -- ToDo: load persisted caches, bootstrapping nodes …
 | 
					    -- ToDo: load persisted caches, bootstrapping nodes …
 | 
				
			||||||
    (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
 | 
					    (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
 | 
				
			||||||
    -- wait for all DHT threads to terminate, this keeps the main thread running
 | 
					    -- currently no masking is necessary, as there is nothing to clean up
 | 
				
			||||||
    wait fediThreads
 | 
					    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
 | 
				
			||||||
 | 
					    -- try joining the DHT using one of the provided bootstrapping nodes
 | 
				
			||||||
 | 
					    joinedState <- tryBootstrapJoining thisNode
 | 
				
			||||||
 | 
					    either (\err -> do
 | 
				
			||||||
 | 
					        -- handle unsuccessful join
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
 | 
				
			||||||
 | 
					        print =<< readTVarIO thisNode
 | 
				
			||||||
 | 
					        -- launch thread attempting to join on new cache entries
 | 
				
			||||||
 | 
					        _ <- forkIO $ joinOnNewEntriesThread thisNode
 | 
				
			||||||
 | 
					        wait =<< async (fediMainThreads serverSock thisNode)
 | 
				
			||||||
 | 
					           )
 | 
				
			||||||
 | 
					           (\joinedNS -> do
 | 
				
			||||||
 | 
					        -- launch main eventloop with successfully joined state
 | 
				
			||||||
 | 
					        putStrLn "successful join"
 | 
				
			||||||
 | 
					        wait =<< async (fediMainThreads serverSock thisNode)
 | 
				
			||||||
 | 
					           )
 | 
				
			||||||
 | 
					        joinedState
 | 
				
			||||||
 | 
					    pure ()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
readConfig :: IO (FediChordConf, ServiceConf)
 | 
					readConfig :: IO (FediChordConf, ServiceConf)
 | 
				
			||||||
readConfig = do
 | 
					readConfig = do
 | 
				
			||||||
    confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs
 | 
					    confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
 | 
				
			||||||
    -- allow starting the initial node without bootstrapping info to avoid
 | 
					    -- allow starting the initial node without bootstrapping info to avoid
 | 
				
			||||||
    -- waiting for timeout
 | 
					    -- waiting for timeout
 | 
				
			||||||
    let
 | 
					    let
 | 
				
			||||||
        speedup = read speedupString
 | 
					        speedup = read speedupString
 | 
				
			||||||
        statsEvalD = 120 * 10^6 `div` speedup
 | 
					 | 
				
			||||||
        confBootstrapNodes' = case remainingArgs of
 | 
					        confBootstrapNodes' = case remainingArgs of
 | 
				
			||||||
            bootstrapHost : bootstrapPortString : _ ->
 | 
					            bootstrapHost : bootstrapPortString : _ ->
 | 
				
			||||||
                [(bootstrapHost, read bootstrapPortString)]
 | 
					                [(bootstrapHost, read bootstrapPortString)]
 | 
				
			||||||
| 
						 | 
					@ -49,11 +67,6 @@ readConfig = do
 | 
				
			||||||
          , confResponsePurgeAge = 60 / fromIntegral speedup
 | 
					          , confResponsePurgeAge = 60 / fromIntegral speedup
 | 
				
			||||||
          , confRequestTimeout = 5 * 10^6 `div` speedup
 | 
					          , confRequestTimeout = 5 * 10^6 `div` speedup
 | 
				
			||||||
          , confRequestRetries = 3
 | 
					          , confRequestRetries = 3
 | 
				
			||||||
          , confEnableKChoices = loadBalancingEnabled /= "off"
 | 
					 | 
				
			||||||
          , confKChoicesOverload = 0.9
 | 
					 | 
				
			||||||
          , confKChoicesUnderload = 0.1
 | 
					 | 
				
			||||||
          , confKChoicesMaxVS = 8
 | 
					 | 
				
			||||||
          , confKChoicesRebalanceInterval = round  (realToFrac statsEvalD * 1.1 :: Double)
 | 
					 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        sConf = ServiceConf
 | 
					        sConf = ServiceConf
 | 
				
			||||||
          { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
 | 
					          { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
 | 
				
			||||||
| 
						 | 
					@ -61,7 +74,7 @@ readConfig = do
 | 
				
			||||||
          , confServiceHost = confDomainString
 | 
					          , confServiceHost = confDomainString
 | 
				
			||||||
          , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
 | 
					          , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
 | 
				
			||||||
          , confSpeedupFactor = speedup
 | 
					          , confSpeedupFactor = speedup
 | 
				
			||||||
          , confStatsEvalDelay = statsEvalD
 | 
					          , confStatsEvalDelay = 120 * 10^6 `div` speedup
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
    pure (fConf, sConf)
 | 
					    pure (fConf, sConf)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -184,19 +184,6 @@ encodePayload payload'@PingResponsePayload{} =
 | 
				
			||||||
    Start Sequence
 | 
					    Start Sequence
 | 
				
			||||||
  : concatMap encodeNodeState (pingNodeStates payload')
 | 
					  : concatMap encodeNodeState (pingNodeStates payload')
 | 
				
			||||||
  <> [End Sequence]
 | 
					  <> [End Sequence]
 | 
				
			||||||
encodePayload payload'@LoadRequestPayload{} =
 | 
					 | 
				
			||||||
  [ Start Sequence
 | 
					 | 
				
			||||||
  , IntVal . getNodeID $ loadSegmentUpperBound payload'
 | 
					 | 
				
			||||||
  , End Sequence
 | 
					 | 
				
			||||||
  ]
 | 
					 | 
				
			||||||
encodePayload payload'@LoadResponsePayload{} =
 | 
					 | 
				
			||||||
  [ Start Sequence
 | 
					 | 
				
			||||||
  , Real $ loadSum payload'
 | 
					 | 
				
			||||||
  , Real $ loadRemainingTarget payload'
 | 
					 | 
				
			||||||
  , Real $ loadTotalCapacity payload'
 | 
					 | 
				
			||||||
  , IntVal . getNodeID $ loadSegmentLowerBound payload'
 | 
					 | 
				
			||||||
  , End Sequence
 | 
					 | 
				
			||||||
  ]
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
encodeNodeState :: NodeState a => a -> [ASN1]
 | 
					encodeNodeState :: NodeState a => a -> [ASN1]
 | 
				
			||||||
encodeNodeState ns = [
 | 
					encodeNodeState ns = [
 | 
				
			||||||
| 
						 | 
					@ -206,7 +193,7 @@ encodeNodeState ns = [
 | 
				
			||||||
  , OctetString (ipAddrAsBS $ getIpAddr ns)
 | 
					  , OctetString (ipAddrAsBS $ getIpAddr ns)
 | 
				
			||||||
  , IntVal (toInteger . getDhtPort $ ns)
 | 
					  , IntVal (toInteger . getDhtPort $ ns)
 | 
				
			||||||
  , IntVal (toInteger . getServicePort $ ns)
 | 
					  , IntVal (toInteger . getServicePort $ ns)
 | 
				
			||||||
  , IntVal (toInteger $ getVServerID ns)
 | 
					  , IntVal (getVServerID ns)
 | 
				
			||||||
  , End Sequence
 | 
					  , End Sequence
 | 
				
			||||||
                     ]
 | 
					                     ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -228,11 +215,10 @@ encodeQueryResult FORWARD{} = Enumerated 1
 | 
				
			||||||
encodeMessage :: FediChordMessage   -- ^ the 'FediChordMessage to be encoded
 | 
					encodeMessage :: FediChordMessage   -- ^ the 'FediChordMessage to be encoded
 | 
				
			||||||
              -> [ASN1]
 | 
					              -> [ASN1]
 | 
				
			||||||
encodeMessage
 | 
					encodeMessage
 | 
				
			||||||
    (Request requestID receiverID sender part isFinalPart action requestPayload) =
 | 
					    (Request requestID sender part isFinalPart action requestPayload) =
 | 
				
			||||||
        Start Sequence
 | 
					        Start Sequence
 | 
				
			||||||
      : (Enumerated . fromIntegral . fromEnum $ action)
 | 
					      : (Enumerated . fromIntegral . fromEnum $ action)
 | 
				
			||||||
      : IntVal requestID
 | 
					      : IntVal requestID
 | 
				
			||||||
      : (IntVal . getNodeID $ receiverID)
 | 
					 | 
				
			||||||
      : encodeNodeState sender
 | 
					      : encodeNodeState sender
 | 
				
			||||||
      <> [IntVal part
 | 
					      <> [IntVal part
 | 
				
			||||||
        , Boolean isFinalPart]
 | 
					        , Boolean isFinalPart]
 | 
				
			||||||
| 
						 | 
					@ -276,20 +262,18 @@ parseMessage = do
 | 
				
			||||||
parseRequest :: Action -> ParseASN1 FediChordMessage
 | 
					parseRequest :: Action -> ParseASN1 FediChordMessage
 | 
				
			||||||
parseRequest action = do
 | 
					parseRequest action = do
 | 
				
			||||||
    requestID <- parseInteger
 | 
					    requestID <- parseInteger
 | 
				
			||||||
    receiverID' <- fromInteger <$> parseInteger
 | 
					 | 
				
			||||||
    sender <- parseNodeState
 | 
					    sender <- parseNodeState
 | 
				
			||||||
    part <- parseInteger
 | 
					    part <- parseInteger
 | 
				
			||||||
    isFinalPart <- parseBool
 | 
					    isFinalPart <- parseBool
 | 
				
			||||||
    hasPayload <- hasNext
 | 
					    hasPayload <- hasNext
 | 
				
			||||||
    payload <- if not hasPayload then pure Nothing else Just <$> case action of
 | 
					    payload <- if not hasPayload then pure Nothing else Just <$> case action of
 | 
				
			||||||
                 QueryID   -> parseQueryIDRequestPayload
 | 
					                 QueryID   -> parseQueryIDRequest
 | 
				
			||||||
                 Join      -> parseJoinRequestPayload
 | 
					                 Join      -> parseJoinRequest
 | 
				
			||||||
                 Leave     -> parseLeaveRequestPayload
 | 
					                 Leave     -> parseLeaveRequest
 | 
				
			||||||
                 Stabilise -> parseStabiliseRequestPayload
 | 
					                 Stabilise -> parseStabiliseRequest
 | 
				
			||||||
                 Ping      -> parsePingRequestPayload
 | 
					                 Ping      -> parsePingRequest
 | 
				
			||||||
                 QueryLoad -> parseLoadRequestPayload
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pure $ Request requestID receiverID' sender part isFinalPart action payload
 | 
					    pure $ Request requestID sender part isFinalPart action payload
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseResponse :: Integer -> ParseASN1 FediChordMessage
 | 
					parseResponse :: Integer -> ParseASN1 FediChordMessage
 | 
				
			||||||
parseResponse requestID = do
 | 
					parseResponse requestID = do
 | 
				
			||||||
| 
						 | 
					@ -299,12 +283,11 @@ parseResponse requestID = do
 | 
				
			||||||
    action <- parseEnum :: ParseASN1 Action
 | 
					    action <- parseEnum :: ParseASN1 Action
 | 
				
			||||||
    hasPayload <- hasNext
 | 
					    hasPayload <- hasNext
 | 
				
			||||||
    payload <- if not hasPayload then pure Nothing else Just <$> case action of
 | 
					    payload <- if not hasPayload then pure Nothing else Just <$> case action of
 | 
				
			||||||
                 QueryID   -> parseQueryIDResponsePayload
 | 
					                 QueryID   -> parseQueryIDResponse
 | 
				
			||||||
                 Join      -> parseJoinResponsePayload
 | 
					                 Join      -> parseJoinResponse
 | 
				
			||||||
                 Leave     -> parseLeaveResponsePayload
 | 
					                 Leave     -> parseLeaveResponse
 | 
				
			||||||
                 Stabilise -> parseStabiliseResponsePayload
 | 
					                 Stabilise -> parseStabiliseResponse
 | 
				
			||||||
                 Ping      -> parsePingResponsePayload
 | 
					                 Ping      -> parsePingResponse
 | 
				
			||||||
                 QueryLoad -> parseLoadResponsePayload
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pure $ Response requestID senderID part isFinalPart action payload
 | 
					    pure $ Response requestID senderID part isFinalPart action payload
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -322,13 +305,6 @@ parseInteger = do
 | 
				
			||||||
        IntVal parsed -> pure parsed
 | 
					        IntVal parsed -> pure parsed
 | 
				
			||||||
        x -> throwParseError $ "Expected IntVal but got " <> show x
 | 
					        x -> throwParseError $ "Expected IntVal but got " <> show x
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseReal :: ParseASN1 Double
 | 
					 | 
				
			||||||
parseReal = do
 | 
					 | 
				
			||||||
    i <- getNext
 | 
					 | 
				
			||||||
    case i of
 | 
					 | 
				
			||||||
      Real parsed -> pure parsed
 | 
					 | 
				
			||||||
      x           -> throwParseError $ "Expected Real but got " <> show x
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
parseEnum :: Enum a => ParseASN1 a
 | 
					parseEnum :: Enum a => ParseASN1 a
 | 
				
			||||||
parseEnum = do
 | 
					parseEnum = do
 | 
				
			||||||
    e <- getNext
 | 
					    e <- getNext
 | 
				
			||||||
| 
						 | 
					@ -370,7 +346,7 @@ parseNodeState = onNextContainer Sequence $ do
 | 
				
			||||||
      , domain = domain'
 | 
					      , domain = domain'
 | 
				
			||||||
      , dhtPort = dhtPort'
 | 
					      , dhtPort = dhtPort'
 | 
				
			||||||
      , servicePort = servicePort'
 | 
					      , servicePort = servicePort'
 | 
				
			||||||
      , vServerID = fromInteger vServer'
 | 
					      , vServerID = vServer'
 | 
				
			||||||
      , ipAddr = ip'
 | 
					      , ipAddr = ip'
 | 
				
			||||||
                     }
 | 
					                     }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -384,13 +360,13 @@ parseCacheEntry = onNextContainer Sequence $ do
 | 
				
			||||||
parseNodeCache :: ParseASN1 [RemoteCacheEntry]
 | 
					parseNodeCache :: ParseASN1 [RemoteCacheEntry]
 | 
				
			||||||
parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
 | 
					parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseJoinRequestPayload :: ParseASN1 ActionPayload
 | 
					parseJoinRequest :: ParseASN1 ActionPayload
 | 
				
			||||||
parseJoinRequestPayload = do
 | 
					parseJoinRequest = do
 | 
				
			||||||
    parseNull
 | 
					    parseNull
 | 
				
			||||||
    pure JoinRequestPayload
 | 
					    pure JoinRequestPayload
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseJoinResponsePayload :: ParseASN1 ActionPayload
 | 
					parseJoinResponse :: ParseASN1 ActionPayload
 | 
				
			||||||
parseJoinResponsePayload = onNextContainer Sequence $ do
 | 
					parseJoinResponse = onNextContainer Sequence $ do
 | 
				
			||||||
    succ' <- onNextContainer Sequence (getMany parseNodeState)
 | 
					    succ' <- onNextContainer Sequence (getMany parseNodeState)
 | 
				
			||||||
    pred' <- onNextContainer Sequence (getMany parseNodeState)
 | 
					    pred' <- onNextContainer Sequence (getMany parseNodeState)
 | 
				
			||||||
    cache <- parseNodeCache
 | 
					    cache <- parseNodeCache
 | 
				
			||||||
| 
						 | 
					@ -400,8 +376,8 @@ parseJoinResponsePayload = onNextContainer Sequence $ do
 | 
				
			||||||
      , joinCache = cache
 | 
					      , joinCache = cache
 | 
				
			||||||
                                 }
 | 
					                                 }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseQueryIDRequestPayload :: ParseASN1 ActionPayload
 | 
					parseQueryIDRequest :: ParseASN1 ActionPayload
 | 
				
			||||||
parseQueryIDRequestPayload = onNextContainer Sequence $ do
 | 
					parseQueryIDRequest = onNextContainer Sequence $ do
 | 
				
			||||||
    targetID <- fromInteger <$> parseInteger
 | 
					    targetID <- fromInteger <$> parseInteger
 | 
				
			||||||
    lBestNodes <- parseInteger
 | 
					    lBestNodes <- parseInteger
 | 
				
			||||||
    pure $ QueryIDRequestPayload {
 | 
					    pure $ QueryIDRequestPayload {
 | 
				
			||||||
| 
						 | 
					@ -409,8 +385,8 @@ parseQueryIDRequestPayload = onNextContainer Sequence $ do
 | 
				
			||||||
      , queryLBestNodes = lBestNodes
 | 
					      , queryLBestNodes = lBestNodes
 | 
				
			||||||
                                   }
 | 
					                                   }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseQueryIDResponsePayload :: ParseASN1 ActionPayload
 | 
					parseQueryIDResponse :: ParseASN1 ActionPayload
 | 
				
			||||||
parseQueryIDResponsePayload = onNextContainer Sequence $ do
 | 
					parseQueryIDResponse = onNextContainer Sequence $ do
 | 
				
			||||||
    Enumerated resultType <- getNext
 | 
					    Enumerated resultType <- getNext
 | 
				
			||||||
    result <- case resultType of
 | 
					    result <- case resultType of
 | 
				
			||||||
                  0 -> FOUND <$> parseNodeState
 | 
					                  0 -> FOUND <$> parseNodeState
 | 
				
			||||||
| 
						 | 
					@ -420,13 +396,13 @@ parseQueryIDResponsePayload = onNextContainer Sequence $ do
 | 
				
			||||||
        queryResult = result
 | 
					        queryResult = result
 | 
				
			||||||
                           }
 | 
					                           }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseStabiliseRequestPayload :: ParseASN1 ActionPayload
 | 
					parseStabiliseRequest :: ParseASN1 ActionPayload
 | 
				
			||||||
parseStabiliseRequestPayload = do
 | 
					parseStabiliseRequest = do
 | 
				
			||||||
    parseNull
 | 
					    parseNull
 | 
				
			||||||
    pure StabiliseRequestPayload
 | 
					    pure StabiliseRequestPayload
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseStabiliseResponsePayload :: ParseASN1 ActionPayload
 | 
					parseStabiliseResponse :: ParseASN1 ActionPayload
 | 
				
			||||||
parseStabiliseResponsePayload = onNextContainer Sequence $ do
 | 
					parseStabiliseResponse = onNextContainer Sequence $ do
 | 
				
			||||||
    succ' <- onNextContainer Sequence (getMany parseNodeState)
 | 
					    succ' <- onNextContainer Sequence (getMany parseNodeState)
 | 
				
			||||||
    pred' <- onNextContainer Sequence (getMany parseNodeState)
 | 
					    pred' <- onNextContainer Sequence (getMany parseNodeState)
 | 
				
			||||||
    pure $ StabiliseResponsePayload {
 | 
					    pure $ StabiliseResponsePayload {
 | 
				
			||||||
| 
						 | 
					@ -434,8 +410,8 @@ parseStabiliseResponsePayload = onNextContainer Sequence $ do
 | 
				
			||||||
      , stabilisePredecessors = pred'
 | 
					      , stabilisePredecessors = pred'
 | 
				
			||||||
                                      }
 | 
					                                      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseLeaveRequestPayload :: ParseASN1 ActionPayload
 | 
					parseLeaveRequest :: ParseASN1 ActionPayload
 | 
				
			||||||
parseLeaveRequestPayload = onNextContainer Sequence $ do
 | 
					parseLeaveRequest = onNextContainer Sequence $ do
 | 
				
			||||||
    succ' <- onNextContainer Sequence (getMany parseNodeState)
 | 
					    succ' <- onNextContainer Sequence (getMany parseNodeState)
 | 
				
			||||||
    pred' <- onNextContainer Sequence (getMany parseNodeState)
 | 
					    pred' <- onNextContainer Sequence (getMany parseNodeState)
 | 
				
			||||||
    doMigration <- parseBool
 | 
					    doMigration <- parseBool
 | 
				
			||||||
| 
						 | 
					@ -445,40 +421,19 @@ parseLeaveRequestPayload = onNextContainer Sequence $ do
 | 
				
			||||||
      , leaveDoMigration = doMigration
 | 
					      , leaveDoMigration = doMigration
 | 
				
			||||||
                                      }
 | 
					                                      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parseLeaveResponsePayload :: ParseASN1 ActionPayload
 | 
					parseLeaveResponse :: ParseASN1 ActionPayload
 | 
				
			||||||
parseLeaveResponsePayload = do
 | 
					parseLeaveResponse = do
 | 
				
			||||||
    parseNull
 | 
					    parseNull
 | 
				
			||||||
    pure LeaveResponsePayload
 | 
					    pure LeaveResponsePayload
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parsePingRequestPayload :: ParseASN1 ActionPayload
 | 
					parsePingRequest :: ParseASN1 ActionPayload
 | 
				
			||||||
parsePingRequestPayload = do
 | 
					parsePingRequest = do
 | 
				
			||||||
    parseNull
 | 
					    parseNull
 | 
				
			||||||
    pure PingRequestPayload
 | 
					    pure PingRequestPayload
 | 
				
			||||||
 | 
					
 | 
				
			||||||
parsePingResponsePayload :: ParseASN1 ActionPayload
 | 
					parsePingResponse :: ParseASN1 ActionPayload
 | 
				
			||||||
parsePingResponsePayload = onNextContainer Sequence $ do
 | 
					parsePingResponse = onNextContainer Sequence $ do
 | 
				
			||||||
    handledNodes <- getMany parseNodeState
 | 
					    handledNodes <- getMany parseNodeState
 | 
				
			||||||
    pure $ PingResponsePayload {
 | 
					    pure $ PingResponsePayload {
 | 
				
			||||||
        pingNodeStates = handledNodes
 | 
					        pingNodeStates = handledNodes
 | 
				
			||||||
                                 }
 | 
					                                 }
 | 
				
			||||||
 | 
					 | 
				
			||||||
parseLoadRequestPayload :: ParseASN1 ActionPayload
 | 
					 | 
				
			||||||
parseLoadRequestPayload = onNextContainer Sequence $ do
 | 
					 | 
				
			||||||
    loadUpperBound' <- fromInteger <$> parseInteger
 | 
					 | 
				
			||||||
    pure LoadRequestPayload
 | 
					 | 
				
			||||||
        { loadSegmentUpperBound = loadUpperBound'
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
parseLoadResponsePayload :: ParseASN1 ActionPayload
 | 
					 | 
				
			||||||
parseLoadResponsePayload = onNextContainer Sequence $ do
 | 
					 | 
				
			||||||
    loadSum' <- parseReal
 | 
					 | 
				
			||||||
    loadRemainingTarget' <- parseReal
 | 
					 | 
				
			||||||
    loadTotalCapacity' <- parseReal
 | 
					 | 
				
			||||||
    loadSegmentLowerBound' <- fromInteger <$> parseInteger
 | 
					 | 
				
			||||||
    pure LoadResponsePayload
 | 
					 | 
				
			||||||
        { loadSum = loadSum'
 | 
					 | 
				
			||||||
        , loadRemainingTarget = loadRemainingTarget'
 | 
					 | 
				
			||||||
        , loadTotalCapacity = loadTotalCapacity'
 | 
					 | 
				
			||||||
        , loadSegmentLowerBound = loadSegmentLowerBound'
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -15,7 +15,6 @@ module Hash2Pub.DHTProtocol
 | 
				
			||||||
    , Action(..)
 | 
					    , Action(..)
 | 
				
			||||||
    , ActionPayload(..)
 | 
					    , ActionPayload(..)
 | 
				
			||||||
    , FediChordMessage(..)
 | 
					    , FediChordMessage(..)
 | 
				
			||||||
    , mkRequest
 | 
					 | 
				
			||||||
    , maximumParts
 | 
					    , maximumParts
 | 
				
			||||||
    , sendQueryIdMessages
 | 
					    , sendQueryIdMessages
 | 
				
			||||||
    , requestQueryID
 | 
					    , requestQueryID
 | 
				
			||||||
| 
						 | 
					@ -23,7 +22,6 @@ module Hash2Pub.DHTProtocol
 | 
				
			||||||
    , requestLeave
 | 
					    , requestLeave
 | 
				
			||||||
    , requestPing
 | 
					    , requestPing
 | 
				
			||||||
    , requestStabilise
 | 
					    , requestStabilise
 | 
				
			||||||
    , requestQueryLoad
 | 
					 | 
				
			||||||
    , lookupMessage
 | 
					    , lookupMessage
 | 
				
			||||||
    , sendRequestTo
 | 
					    , sendRequestTo
 | 
				
			||||||
    , queryIdLookupLoop
 | 
					    , queryIdLookupLoop
 | 
				
			||||||
| 
						 | 
					@ -38,7 +36,7 @@ module Hash2Pub.DHTProtocol
 | 
				
			||||||
    , isPossibleSuccessor
 | 
					    , isPossibleSuccessor
 | 
				
			||||||
    , isPossiblePredecessor
 | 
					    , isPossiblePredecessor
 | 
				
			||||||
    , isInOwnResponsibilitySlice
 | 
					    , isInOwnResponsibilitySlice
 | 
				
			||||||
    , vsIsJoined
 | 
					    , isJoined
 | 
				
			||||||
    , closestCachePredecessors
 | 
					    , closestCachePredecessors
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
        where
 | 
					        where
 | 
				
			||||||
| 
						 | 
					@ -51,8 +49,7 @@ import           Control.Concurrent.STM.TQueue
 | 
				
			||||||
import           Control.Concurrent.STM.TVar
 | 
					import           Control.Concurrent.STM.TVar
 | 
				
			||||||
import           Control.Exception
 | 
					import           Control.Exception
 | 
				
			||||||
import           Control.Monad                  (foldM, forM, forM_, void, when)
 | 
					import           Control.Monad                  (foldM, forM, forM_, void, when)
 | 
				
			||||||
import           Control.Monad.Except           (MonadError (..), liftEither,
 | 
					import           Control.Monad.Except           (MonadError (..), runExceptT)
 | 
				
			||||||
                                                 runExceptT)
 | 
					 | 
				
			||||||
import           Control.Monad.IO.Class         (MonadIO (..))
 | 
					import           Control.Monad.IO.Class         (MonadIO (..))
 | 
				
			||||||
import qualified Data.ByteString                as BS
 | 
					import qualified Data.ByteString                as BS
 | 
				
			||||||
import           Data.Either                    (rights)
 | 
					import           Data.Either                    (rights)
 | 
				
			||||||
| 
						 | 
					@ -66,7 +63,6 @@ import           Data.Maybe                     (fromJust, fromMaybe, isJust,
 | 
				
			||||||
                                                 isNothing, mapMaybe, maybe)
 | 
					                                                 isNothing, mapMaybe, maybe)
 | 
				
			||||||
import qualified Data.Set                       as Set
 | 
					import qualified Data.Set                       as Set
 | 
				
			||||||
import           Data.Time.Clock.POSIX
 | 
					import           Data.Time.Clock.POSIX
 | 
				
			||||||
import           Data.Word                      (Word8)
 | 
					 | 
				
			||||||
import           Network.Socket                 hiding (recv, recvFrom, send,
 | 
					import           Network.Socket                 hiding (recv, recvFrom, send,
 | 
				
			||||||
                                                 sendTo)
 | 
					                                                 sendTo)
 | 
				
			||||||
import           Network.Socket.ByteString
 | 
					import           Network.Socket.ByteString
 | 
				
			||||||
| 
						 | 
					@ -78,27 +74,23 @@ import           Hash2Pub.ASN1Coding
 | 
				
			||||||
import           Hash2Pub.FediChordTypes        (CacheEntry (..),
 | 
					import           Hash2Pub.FediChordTypes        (CacheEntry (..),
 | 
				
			||||||
                                                 CacheEntry (..),
 | 
					                                                 CacheEntry (..),
 | 
				
			||||||
                                                 FediChordConf (..),
 | 
					                                                 FediChordConf (..),
 | 
				
			||||||
                                                 HasKeyID (..), LoadStats (..),
 | 
					                                                 HasKeyID (..),
 | 
				
			||||||
                                                 LocalNodeState (..),
 | 
					                                                 LocalNodeState (..),
 | 
				
			||||||
                                                 LocalNodeStateSTM, NodeCache,
 | 
					                                                 LocalNodeStateSTM, NodeCache,
 | 
				
			||||||
                                                 NodeID, NodeState (..),
 | 
					                                                 NodeID, NodeState (..),
 | 
				
			||||||
                                                 RealNode (..), RealNodeSTM,
 | 
					                                                 RealNode (..), RealNodeSTM,
 | 
				
			||||||
                                                 RemoteNodeState (..),
 | 
					                                                 RemoteNodeState (..),
 | 
				
			||||||
                                                 RingEntry (..), RingMap (..),
 | 
					                                                 RingEntry (..), RingMap (..),
 | 
				
			||||||
                                                 SegmentLoadStats (..),
 | 
					 | 
				
			||||||
                                                 Service (..), addRMapEntry,
 | 
					                                                 Service (..), addRMapEntry,
 | 
				
			||||||
                                                 addRMapEntryWith,
 | 
					                                                 addRMapEntryWith,
 | 
				
			||||||
                                                 cacheGetNodeStateUnvalidated,
 | 
					                                                 cacheGetNodeStateUnvalidated,
 | 
				
			||||||
                                                 cacheLookup, cacheLookupPred,
 | 
					                                                 cacheLookup, cacheLookupPred,
 | 
				
			||||||
                                                 cacheLookupSucc, genNodeID,
 | 
					                                                 cacheLookupSucc, genNodeID,
 | 
				
			||||||
                                                 getKeyID, hasValidNodeId,
 | 
					                                                 getKeyID, localCompare,
 | 
				
			||||||
                                                 loadSliceSum, localCompare,
 | 
					 | 
				
			||||||
                                                 rMapFromList, rMapLookupPred,
 | 
					                                                 rMapFromList, rMapLookupPred,
 | 
				
			||||||
                                                 rMapLookupSucc,
 | 
					                                                 rMapLookupSucc,
 | 
				
			||||||
                                                 remainingLoadTarget,
 | 
					 | 
				
			||||||
                                                 setPredecessors, setSuccessors)
 | 
					                                                 setPredecessors, setSuccessors)
 | 
				
			||||||
import           Hash2Pub.ProtocolTypes
 | 
					import           Hash2Pub.ProtocolTypes
 | 
				
			||||||
import           Hash2Pub.RingMap
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
import           Debug.Trace                    (trace)
 | 
					import           Debug.Trace                    (trace)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -111,7 +103,7 @@ queryLocalCache ownState nCache lBestNodes targetID
 | 
				
			||||||
    -- as target ID falls between own ID and first predecessor, it is handled by this node
 | 
					    -- as target ID falls between own ID and first predecessor, it is handled by this node
 | 
				
			||||||
    -- This only makes sense if the node is part of the DHT by having joined.
 | 
					    -- This only makes sense if the node is part of the DHT by having joined.
 | 
				
			||||||
    -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
 | 
					    -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
 | 
				
			||||||
      | vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
 | 
					      | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
 | 
				
			||||||
    -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
 | 
					    -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
 | 
				
			||||||
    -- the closest succeeding node (like with the p initiated parallel queries
 | 
					    -- the closest succeeding node (like with the p initiated parallel queries
 | 
				
			||||||
      | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
 | 
					      | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
 | 
				
			||||||
| 
						 | 
					@ -235,8 +227,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | uses the successor and predecessor list of a node as an indicator for whether a
 | 
					-- | uses the successor and predecessor list of a node as an indicator for whether a
 | 
				
			||||||
-- node has properly joined the DHT
 | 
					-- node has properly joined the DHT
 | 
				
			||||||
vsIsJoined :: LocalNodeState s -> Bool
 | 
					isJoined :: LocalNodeState s -> Bool
 | 
				
			||||||
vsIsJoined ns = not . all null $ [successors ns, predecessors ns]
 | 
					isJoined ns = not . all null $ [successors ns, predecessors ns]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | the size limit to be used when serialising messages for sending
 | 
					-- | the size limit to be used when serialising messages for sending
 | 
				
			||||||
sendMessageSize :: Num i => i
 | 
					sendMessageSize :: Num i => i
 | 
				
			||||||
| 
						 | 
					@ -245,37 +237,27 @@ sendMessageSize = 1200
 | 
				
			||||||
-- ====== message send and receive operations ======
 | 
					-- ====== message send and receive operations ======
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- encode the response to a request that just signals successful receipt
 | 
					-- encode the response to a request that just signals successful receipt
 | 
				
			||||||
ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
 | 
					ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
 | 
				
			||||||
ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
 | 
					ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
 | 
				
			||||||
    requestID = requestID req
 | 
					    requestID = requestID req
 | 
				
			||||||
  , senderID = receiverID req
 | 
					  , senderID = ownID
 | 
				
			||||||
  , part = part req
 | 
					  , part = part req
 | 
				
			||||||
  , isFinalPart = False
 | 
					  , isFinalPart = False
 | 
				
			||||||
  , action = action req
 | 
					  , action = action req
 | 
				
			||||||
  , payload = Nothing
 | 
					  , payload = Nothing
 | 
				
			||||||
                                                                             }
 | 
					                                                                             }
 | 
				
			||||||
ackRequest _ = Map.empty
 | 
					ackRequest _ _ = Map.empty
 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | extract the first payload from a received message set
 | 
					 | 
				
			||||||
extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload
 | 
					 | 
				
			||||||
extractFirstPayload msgSet = foldr' (\msg plAcc ->
 | 
					 | 
				
			||||||
    if isNothing plAcc && isJust (payload msg)
 | 
					 | 
				
			||||||
       then payload msg
 | 
					 | 
				
			||||||
       else plAcc
 | 
					 | 
				
			||||||
                                 ) Nothing msgSet
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
 | 
					-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
 | 
				
			||||||
-- the response to be sent.
 | 
					-- the response to be sent.
 | 
				
			||||||
handleIncomingRequest :: Service s (RealNodeSTM s)
 | 
					handleIncomingRequest :: Service s (RealNodeSTM s)
 | 
				
			||||||
                      => Word8                              -- ^ maximum number of vservers, because of decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@
 | 
					                      => LocalNodeStateSTM s                     -- ^ the handling node
 | 
				
			||||||
                      -> LocalNodeStateSTM s                     -- ^ the handling node
 | 
					 | 
				
			||||||
                      -> TQueue (BS.ByteString, SockAddr)   -- ^ send queue
 | 
					                      -> TQueue (BS.ByteString, SockAddr)   -- ^ send queue
 | 
				
			||||||
                      -> Set.Set FediChordMessage           -- ^ all parts of the request to handle
 | 
					                      -> Set.Set FediChordMessage           -- ^ all parts of the request to handle
 | 
				
			||||||
                      -> SockAddr                           -- ^ source address of the request
 | 
					                      -> SockAddr                           -- ^ source address of the request
 | 
				
			||||||
                      -> IO ()
 | 
					                      -> IO ()
 | 
				
			||||||
handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
 | 
					handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
 | 
				
			||||||
    ns <- readTVarIO nsSTM
 | 
					    ns <- readTVarIO nsSTM
 | 
				
			||||||
    -- add nodestate to cache
 | 
					    -- add nodestate to cache
 | 
				
			||||||
    now <- getPOSIXTime
 | 
					    now <- getPOSIXTime
 | 
				
			||||||
| 
						 | 
					@ -283,20 +265,19 @@ handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
 | 
				
			||||||
      Nothing -> pure ()
 | 
					      Nothing -> pure ()
 | 
				
			||||||
      Just aPart -> do
 | 
					      Just aPart -> do
 | 
				
			||||||
        let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
 | 
					        let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
 | 
				
			||||||
        queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
 | 
					        queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
 | 
				
			||||||
        -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
 | 
					        -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
 | 
				
			||||||
        maybe (pure ()) (
 | 
					        maybe (pure ()) (
 | 
				
			||||||
            mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
 | 
					            mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
            =<< (case action aPart of
 | 
					            =<< (case action aPart of
 | 
				
			||||||
                Ping -> Just <$> respondPing nsSTM msgSet
 | 
					                Ping -> Just <$> respondPing nsSTM msgSet
 | 
				
			||||||
                Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin
 | 
					                Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin
 | 
				
			||||||
                -- ToDo: figure out what happens if not joined
 | 
					                -- ToDo: figure out what happens if not joined
 | 
				
			||||||
                QueryID -> Just <$> respondQueryID nsSTM msgSet
 | 
					                QueryID -> Just <$> respondQueryID nsSTM msgSet
 | 
				
			||||||
                -- only when joined
 | 
					                -- only when joined
 | 
				
			||||||
                Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing
 | 
					                Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
 | 
				
			||||||
                Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing
 | 
					                Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
 | 
				
			||||||
                QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing
 | 
					 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
      -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
 | 
					      -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -306,18 +287,19 @@ handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
 | 
				
			||||||
      -- | Filter out requests with spoofed node IDs by recomputing the ID using
 | 
					      -- | Filter out requests with spoofed node IDs by recomputing the ID using
 | 
				
			||||||
      -- the sender IP.
 | 
					      -- the sender IP.
 | 
				
			||||||
      -- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
 | 
					      -- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
 | 
				
			||||||
      dropSpoofedIDs :: Word8       -- ^ maximum number of vservers per node
 | 
					      dropSpoofedIDs :: HostAddress6        -- msg source address
 | 
				
			||||||
                     -> HostAddress6        -- ^ msg source address
 | 
					 | 
				
			||||||
                     -> LocalNodeStateSTM s
 | 
					                     -> LocalNodeStateSTM s
 | 
				
			||||||
                     -> Set.Set FediChordMessage    -- ^ message parts of the request
 | 
					                     -> Set.Set FediChordMessage    -- message parts of the request
 | 
				
			||||||
                     -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString))     -- ^ reponder function to be invoked for valid requests
 | 
					                     -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString))     -- reponder function to be invoked for valid requests
 | 
				
			||||||
                     -> IO (Maybe (Map.Map Integer BS.ByteString))
 | 
					                     -> IO (Maybe (Map.Map Integer BS.ByteString))
 | 
				
			||||||
      dropSpoofedIDs limVs addr nsSTM' msgSet' responder =
 | 
					      dropSpoofedIDs addr nsSTM' msgSet' responder =
 | 
				
			||||||
          let
 | 
					          let
 | 
				
			||||||
            aRequestPart = Set.elemAt 0 msgSet
 | 
					            aRequestPart = Set.elemAt 0 msgSet
 | 
				
			||||||
            senderNs = sender aRequestPart
 | 
					            senderNs = sender aRequestPart
 | 
				
			||||||
 | 
					            givenSenderID = getNid senderNs
 | 
				
			||||||
 | 
					            recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs)
 | 
				
			||||||
          in
 | 
					          in
 | 
				
			||||||
          if hasValidNodeId limVs senderNs addr
 | 
					          if recomputedID == givenSenderID
 | 
				
			||||||
             then Just <$> responder nsSTM' msgSet'
 | 
					             then Just <$> responder nsSTM' msgSet'
 | 
				
			||||||
             else pure Nothing
 | 
					             else pure Nothing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -335,15 +317,19 @@ respondQueryID nsSTM msgSet = do
 | 
				
			||||||
    let
 | 
					    let
 | 
				
			||||||
        aRequestPart = Set.elemAt 0 msgSet
 | 
					        aRequestPart = Set.elemAt 0 msgSet
 | 
				
			||||||
        senderID = getNid . sender $ aRequestPart
 | 
					        senderID = getNid . sender $ aRequestPart
 | 
				
			||||||
        senderPayload = extractFirstPayload msgSet
 | 
					        senderPayload = foldr' (\msg plAcc ->
 | 
				
			||||||
        -- return only empty message serialisation if no payload was included in parts
 | 
					            if isNothing plAcc && isJust (payload msg)
 | 
				
			||||||
 | 
					               then payload msg
 | 
				
			||||||
 | 
					               else plAcc
 | 
				
			||||||
 | 
					                               ) Nothing msgSet
 | 
				
			||||||
 | 
					    -- return only empty message serialisation if no payload was included in parts
 | 
				
			||||||
    maybe (pure Map.empty) (\senderPayload' -> do
 | 
					    maybe (pure Map.empty) (\senderPayload' -> do
 | 
				
			||||||
        responseMsg <- atomically $ do
 | 
					        responseMsg <- atomically $ do
 | 
				
			||||||
            nsSnap <- readTVar nsSTM
 | 
					            nsSnap <- readTVar nsSTM
 | 
				
			||||||
            cache <- readTVar $ nodeCacheSTM nsSnap
 | 
					            cache <- readTVar $ nodeCacheSTM nsSnap
 | 
				
			||||||
            let
 | 
					            let
 | 
				
			||||||
                responsePayload = QueryIDResponsePayload {
 | 
					                responsePayload = QueryIDResponsePayload {
 | 
				
			||||||
                    queryResult = if vsIsJoined nsSnap
 | 
					                    queryResult = if isJoined nsSnap
 | 
				
			||||||
                                     then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
 | 
					                                     then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
 | 
				
			||||||
                                     -- if not joined yet, attract responsibility for
 | 
					                                     -- if not joined yet, attract responsibility for
 | 
				
			||||||
                                     -- all keys to make bootstrapping possible
 | 
					                                     -- all keys to make bootstrapping possible
 | 
				
			||||||
| 
						 | 
					@ -436,47 +422,6 @@ respondPing nsSTM msgSet = do
 | 
				
			||||||
                                }
 | 
					                                }
 | 
				
			||||||
    pure $ serialiseMessage sendMessageSize pingResponse
 | 
					    pure $ serialiseMessage sendMessageSize pingResponse
 | 
				
			||||||
 | 
					
 | 
				
			||||||
respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
 | 
					 | 
				
			||||||
respondQueryLoad nsSTM msgSet = do
 | 
					 | 
				
			||||||
    nsSnap <- readTVarIO nsSTM
 | 
					 | 
				
			||||||
    -- this message cannot be split reasonably, so just
 | 
					 | 
				
			||||||
    -- consider the first payload
 | 
					 | 
				
			||||||
    let
 | 
					 | 
				
			||||||
        aRequestPart = Set.elemAt 0 msgSet
 | 
					 | 
				
			||||||
        senderPayload = extractFirstPayload msgSet
 | 
					 | 
				
			||||||
    responsePl <- maybe (pure Nothing) (\pl ->
 | 
					 | 
				
			||||||
        case pl of
 | 
					 | 
				
			||||||
          LoadRequestPayload{} -> do
 | 
					 | 
				
			||||||
            parentNode <- readTVarIO (parentRealNode nsSnap)
 | 
					 | 
				
			||||||
            let
 | 
					 | 
				
			||||||
                serv = nodeService parentNode
 | 
					 | 
				
			||||||
                conf = nodeConfig parentNode
 | 
					 | 
				
			||||||
            lStats <- getServiceLoadStats serv
 | 
					 | 
				
			||||||
            let
 | 
					 | 
				
			||||||
                directSucc = getNid . head . predecessors $ nsSnap
 | 
					 | 
				
			||||||
                handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl)
 | 
					 | 
				
			||||||
            pure $ Just LoadResponsePayload
 | 
					 | 
				
			||||||
                { loadSum = handledTagSum
 | 
					 | 
				
			||||||
                , loadRemainingTarget = remainingLoadTarget conf lStats
 | 
					 | 
				
			||||||
                , loadTotalCapacity = totalCapacity lStats
 | 
					 | 
				
			||||||
                , loadSegmentLowerBound = directSucc
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
          _ -> pure Nothing
 | 
					 | 
				
			||||||
                           )
 | 
					 | 
				
			||||||
                           senderPayload
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    pure $ maybe
 | 
					 | 
				
			||||||
        Map.empty
 | 
					 | 
				
			||||||
        (\pl -> serialiseMessage sendMessageSize $ Response
 | 
					 | 
				
			||||||
            { requestID = requestID aRequestPart
 | 
					 | 
				
			||||||
            , senderID = getNid nsSnap
 | 
					 | 
				
			||||||
            , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
 | 
					 | 
				
			||||||
            , isFinalPart = False
 | 
					 | 
				
			||||||
            , action = QueryLoad
 | 
					 | 
				
			||||||
            , payload = Just pl
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        ) responsePl
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
 | 
					respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
 | 
				
			||||||
respondJoin nsSTM msgSet = do
 | 
					respondJoin nsSTM msgSet = do
 | 
				
			||||||
| 
						 | 
					@ -489,7 +434,7 @@ respondJoin nsSTM msgSet = do
 | 
				
			||||||
            senderNS = sender aRequestPart
 | 
					            senderNS = sender aRequestPart
 | 
				
			||||||
            -- if not joined yet, attract responsibility for
 | 
					            -- if not joined yet, attract responsibility for
 | 
				
			||||||
            -- all keys to make bootstrapping possible
 | 
					            -- all keys to make bootstrapping possible
 | 
				
			||||||
            responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
 | 
					            responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
 | 
				
			||||||
            thisNodeResponsible (FOUND _)   = True
 | 
					            thisNodeResponsible (FOUND _)   = True
 | 
				
			||||||
            thisNodeResponsible (FORWARD _) = False
 | 
					            thisNodeResponsible (FORWARD _) = False
 | 
				
			||||||
        -- check whether the joining node falls into our responsibility
 | 
					        -- check whether the joining node falls into our responsibility
 | 
				
			||||||
| 
						 | 
					@ -536,21 +481,6 @@ respondJoin nsSTM msgSet = do
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- ....... request sending .......
 | 
					-- ....... request sending .......
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | defautl constructor for request messages, fills standard values like
 | 
					 | 
				
			||||||
-- part number to avoid code repition
 | 
					 | 
				
			||||||
mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage)
 | 
					 | 
				
			||||||
mkRequest ns targetID action pl rid = Request
 | 
					 | 
				
			||||||
    { requestID = rid
 | 
					 | 
				
			||||||
    , receiverID = targetID
 | 
					 | 
				
			||||||
    , sender = toRemoteNodeState ns
 | 
					 | 
				
			||||||
    -- part number and final flag can be changed by ASN1 encoder to make packet
 | 
					 | 
				
			||||||
    -- fit the MTU restrictions
 | 
					 | 
				
			||||||
    , part = 1
 | 
					 | 
				
			||||||
    , isFinalPart = True
 | 
					 | 
				
			||||||
    , action = action
 | 
					 | 
				
			||||||
    , payload = pl
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | send a join request and return the joined 'LocalNodeState' including neighbours
 | 
					-- | send a join request and return the joined 'LocalNodeState' including neighbours
 | 
				
			||||||
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a             -- ^ currently responsible node to be contacted
 | 
					requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a             -- ^ currently responsible node to be contacted
 | 
				
			||||||
            -> LocalNodeStateSTM s               -- ^ joining NodeState
 | 
					            -> LocalNodeStateSTM s               -- ^ joining NodeState
 | 
				
			||||||
| 
						 | 
					@ -562,7 +492,7 @@ requestJoin toJoinOn ownStateSTM = do
 | 
				
			||||||
    let srcAddr = confIP nodeConf
 | 
					    let srcAddr = confIP nodeConf
 | 
				
			||||||
    bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
 | 
					    bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
 | 
				
			||||||
        -- extract own state for getting request information
 | 
					        -- extract own state for getting request information
 | 
				
			||||||
        responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock
 | 
					        responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
 | 
				
			||||||
        (cacheInsertQ, joinedState) <- atomically $ do
 | 
					        (cacheInsertQ, joinedState) <- atomically $ do
 | 
				
			||||||
            stateSnap <- readTVar ownStateSTM
 | 
					            stateSnap <- readTVar ownStateSTM
 | 
				
			||||||
            let
 | 
					            let
 | 
				
			||||||
| 
						 | 
					@ -593,7 +523,7 @@ requestJoin toJoinOn ownStateSTM = do
 | 
				
			||||||
            writeTVar ownStateSTM newState
 | 
					            writeTVar ownStateSTM newState
 | 
				
			||||||
            pure (cacheInsertQ, newState)
 | 
					            pure (cacheInsertQ, newState)
 | 
				
			||||||
        -- execute the cache insertions
 | 
					        -- execute the cache insertions
 | 
				
			||||||
        mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
 | 
					        mapM_ (\f -> f joinedState) cacheInsertQ
 | 
				
			||||||
        if responses == Set.empty
 | 
					        if responses == Set.empty
 | 
				
			||||||
                  then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
 | 
					                  then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
 | 
				
			||||||
           else do
 | 
					           else do
 | 
				
			||||||
| 
						 | 
					@ -651,14 +581,14 @@ sendQueryIdMessages :: (Integral i)
 | 
				
			||||||
                    -> Maybe i                       -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
 | 
					                    -> Maybe i                       -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
 | 
				
			||||||
                   -> [RemoteNodeState]             -- ^ nodes to query
 | 
					                   -> [RemoteNodeState]             -- ^ nodes to query
 | 
				
			||||||
                   -> IO QueryResponse -- ^ accumulated response
 | 
					                   -> IO QueryResponse -- ^ accumulated response
 | 
				
			||||||
sendQueryIdMessages lookupID ns lParam targets = do
 | 
					sendQueryIdMessages targetID ns lParam targets = do
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          -- create connected sockets to all query targets and use them for request handling
 | 
					          -- create connected sockets to all query targets and use them for request handling
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
 | 
					          nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
 | 
				
			||||||
          let srcAddr = confIP nodeConf
 | 
					          let srcAddr = confIP nodeConf
 | 
				
			||||||
          queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
 | 
					          queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
 | 
				
			||||||
              sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode))
 | 
					              sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
 | 
				
			||||||
                                                                                                                                   )) targets
 | 
					                                                                                                                                   )) targets
 | 
				
			||||||
          -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
 | 
					          -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
 | 
				
			||||||
          -- ToDo: exception handling, maybe log them
 | 
					          -- ToDo: exception handling, maybe log them
 | 
				
			||||||
| 
						 | 
					@ -675,7 +605,7 @@ sendQueryIdMessages lookupID ns lParam targets = do
 | 
				
			||||||
                             _ -> Set.empty
 | 
					                             _ -> Set.empty
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            -- forward entries to global cache
 | 
					            -- forward entries to global cache
 | 
				
			||||||
            queueAddEntries entrySet (cacheWriteQueue ns)
 | 
					            queueAddEntries entrySet ns
 | 
				
			||||||
            -- return accumulated QueryResult
 | 
					            -- return accumulated QueryResult
 | 
				
			||||||
            pure $ case acc of
 | 
					            pure $ case acc of
 | 
				
			||||||
              -- once a FOUND as been encountered, return this as a result
 | 
					              -- once a FOUND as been encountered, return this as a result
 | 
				
			||||||
| 
						 | 
					@ -691,14 +621,13 @@ sendQueryIdMessages lookupID ns lParam targets = do
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | Create a QueryID message to be supplied to 'sendRequestTo'
 | 
					-- | Create a QueryID message to be supplied to 'sendRequestTo'
 | 
				
			||||||
lookupMessage :: Integral i
 | 
					lookupMessage :: Integral i
 | 
				
			||||||
              => NodeID         -- ^ lookup ID to be looked up
 | 
					              => NodeID         -- ^ target ID
 | 
				
			||||||
              -> LocalNodeState s -- ^ sender node state
 | 
					              -> LocalNodeState s -- ^ sender node state
 | 
				
			||||||
              -> Maybe i        -- ^ optionally provide a different l parameter
 | 
					              -> Maybe i        -- ^ optionally provide a different l parameter
 | 
				
			||||||
              -> NodeID         -- ^ target ID of message destination
 | 
					 | 
				
			||||||
              ->  (Integer -> FediChordMessage)
 | 
					              ->  (Integer -> FediChordMessage)
 | 
				
			||||||
lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID)
 | 
					lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
 | 
				
			||||||
  where
 | 
					  where
 | 
				
			||||||
      pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam }
 | 
					    pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
 | 
					-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
 | 
				
			||||||
| 
						 | 
					@ -709,7 +638,16 @@ requestStabilise :: LocalNodeState s      -- ^ sending node
 | 
				
			||||||
requestStabilise ns neighbour = do
 | 
					requestStabilise ns neighbour = do
 | 
				
			||||||
    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
 | 
					    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
 | 
				
			||||||
    let srcAddr = confIP nodeConf
 | 
					    let srcAddr = confIP nodeConf
 | 
				
			||||||
    responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload))
 | 
					    responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
 | 
				
			||||||
 | 
					        Request {
 | 
				
			||||||
 | 
					            requestID = rid
 | 
				
			||||||
 | 
					          , sender = toRemoteNodeState ns
 | 
				
			||||||
 | 
					          , part = 1
 | 
				
			||||||
 | 
					          , isFinalPart = False
 | 
				
			||||||
 | 
					          , action = Stabilise
 | 
				
			||||||
 | 
					          , payload = Just StabiliseRequestPayload
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                         )
 | 
				
			||||||
                                                                                           ) `catch`  (\e -> pure . Left $ displayException (e :: IOException))
 | 
					                                                                                           ) `catch`  (\e -> pure . Left $ displayException (e :: IOException))
 | 
				
			||||||
    either
 | 
					    either
 | 
				
			||||||
        -- forward IO error messages
 | 
					        -- forward IO error messages
 | 
				
			||||||
| 
						 | 
					@ -722,7 +660,7 @@ requestStabilise ns neighbour = do
 | 
				
			||||||
                                                      )
 | 
					                                                      )
 | 
				
			||||||
                                                      ([],[]) respSet
 | 
					                                                      ([],[]) respSet
 | 
				
			||||||
            -- update successfully responded neighbour in cache
 | 
					            -- update successfully responded neighbour in cache
 | 
				
			||||||
            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
 | 
					            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
 | 
				
			||||||
            pure $ if null responsePreds && null responseSuccs
 | 
					            pure $ if null responsePreds && null responseSuccs
 | 
				
			||||||
                      then Left "no neighbours returned"
 | 
					                      then Left "no neighbours returned"
 | 
				
			||||||
                      else Right (responsePreds, responseSuccs)
 | 
					                      else Right (responsePreds, responseSuccs)
 | 
				
			||||||
| 
						 | 
					@ -744,12 +682,17 @@ requestLeave ns doMigration target = do
 | 
				
			||||||
      , leavePredecessors = predecessors ns
 | 
					      , leavePredecessors = predecessors ns
 | 
				
			||||||
      , leaveDoMigration = doMigration
 | 
					      , leaveDoMigration = doMigration
 | 
				
			||||||
                                           }
 | 
					                                           }
 | 
				
			||||||
    responses <- bracket
 | 
					    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
 | 
				
			||||||
        (mkSendSocket srcAddr (getDomain target) (getDhtPort target))
 | 
					        Request {
 | 
				
			||||||
        close
 | 
					            requestID = rid
 | 
				
			||||||
        (fmap Right
 | 
					          , sender = toRemoteNodeState ns
 | 
				
			||||||
        . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload))
 | 
					          , part = 1
 | 
				
			||||||
        ) `catch`  (\e -> pure . Left $ displayException (e :: IOException))
 | 
					          , isFinalPart = False
 | 
				
			||||||
 | 
					          , action = Leave
 | 
				
			||||||
 | 
					          , payload = Just leavePayload
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                         )
 | 
				
			||||||
 | 
					                                                                                           ) `catch`  (\e -> pure . Left $ displayException (e :: IOException))
 | 
				
			||||||
    either
 | 
					    either
 | 
				
			||||||
        -- forward IO error messages
 | 
					        -- forward IO error messages
 | 
				
			||||||
        (pure . Left)
 | 
					        (pure . Left)
 | 
				
			||||||
| 
						 | 
					@ -765,7 +708,16 @@ requestPing ns target = do
 | 
				
			||||||
    let srcAddr = confIP nodeConf
 | 
					    let srcAddr = confIP nodeConf
 | 
				
			||||||
    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
 | 
					    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
 | 
				
			||||||
        (\sock -> do
 | 
					        (\sock -> do
 | 
				
			||||||
            resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock
 | 
					            resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
 | 
				
			||||||
 | 
					                Request {
 | 
				
			||||||
 | 
					                    requestID = rid
 | 
				
			||||||
 | 
					                  , sender = toRemoteNodeState ns
 | 
				
			||||||
 | 
					                  , part = 1
 | 
				
			||||||
 | 
					                  , isFinalPart = False
 | 
				
			||||||
 | 
					                  , action = Ping
 | 
				
			||||||
 | 
					                  , payload = Just PingRequestPayload
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                             ) sock
 | 
				
			||||||
            (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
 | 
					            (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
 | 
				
			||||||
            pure $ Right (peerAddr, resp)
 | 
					            pure $ Right (peerAddr, resp)
 | 
				
			||||||
                                                                                               ) `catch`  (\e -> pure . Left $ displayException (e :: IOException))
 | 
					                                                                                               ) `catch`  (\e -> pure . Left $ displayException (e :: IOException))
 | 
				
			||||||
| 
						 | 
					@ -781,9 +733,10 @@ requestPing ns target = do
 | 
				
			||||||
            -- recompute ID for each received node and mark as verified in cache
 | 
					            -- recompute ID for each received node and mark as verified in cache
 | 
				
			||||||
            now <- getPOSIXTime
 | 
					            now <- getPOSIXTime
 | 
				
			||||||
            forM_ responseVss (\vs ->
 | 
					            forM_ responseVss (\vs ->
 | 
				
			||||||
                if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr
 | 
					                let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
 | 
				
			||||||
                   then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
 | 
					                 in if recomputedID == getNid vs
 | 
				
			||||||
                   else pure ()
 | 
					                       then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
 | 
				
			||||||
 | 
					                       else pure ()
 | 
				
			||||||
                              )
 | 
					                              )
 | 
				
			||||||
            pure $ if null responseVss
 | 
					            pure $ if null responseVss
 | 
				
			||||||
                      then Left "no active vServer IDs returned, ignoring node"
 | 
					                      then Left "no active vServer IDs returned, ignoring node"
 | 
				
			||||||
| 
						 | 
					@ -791,37 +744,6 @@ requestPing ns target = do
 | 
				
			||||||
        ) responses
 | 
					        ) responses
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- still need a particular vserver as LocalNodeState, because requests need a sender
 | 
					 | 
				
			||||||
requestQueryLoad :: (MonadError String m, MonadIO m)
 | 
					 | 
				
			||||||
                 => LocalNodeState s    -- ^ the local source vserver for the request
 | 
					 | 
				
			||||||
                 -> NodeID              -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node
 | 
					 | 
				
			||||||
                 -> RemoteNodeState     -- ^ target node to query
 | 
					 | 
				
			||||||
                 -> m SegmentLoadStats
 | 
					 | 
				
			||||||
requestQueryLoad ns upperIdBound target = do
 | 
					 | 
				
			||||||
    nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
 | 
					 | 
				
			||||||
    let
 | 
					 | 
				
			||||||
        srcAddr = confIP nodeConf
 | 
					 | 
				
			||||||
        loadReqPl = LoadRequestPayload
 | 
					 | 
				
			||||||
            { loadSegmentUpperBound = upperIdBound
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
    responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
 | 
					 | 
				
			||||||
        (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl))
 | 
					 | 
				
			||||||
        ) `catch`  (\e -> pure . Left $ displayException (e :: IOException))
 | 
					 | 
				
			||||||
    responseMsgSet <- liftEither responses
 | 
					 | 
				
			||||||
    -- throws an error if an exception happened
 | 
					 | 
				
			||||||
    loadResPl <- maybe (throwError "no load response payload found") pure
 | 
					 | 
				
			||||||
        (extractFirstPayload responseMsgSet)
 | 
					 | 
				
			||||||
    pure SegmentLoadStats
 | 
					 | 
				
			||||||
        { segmentLowerKeyBound = loadSegmentLowerBound loadResPl
 | 
					 | 
				
			||||||
        , segmentUpperKeyBound = upperIdBound
 | 
					 | 
				
			||||||
        , segmentLoad = loadSum loadResPl
 | 
					 | 
				
			||||||
        , segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl
 | 
					 | 
				
			||||||
        , segmentOwnerCapacity = loadTotalCapacity loadResPl
 | 
					 | 
				
			||||||
        , segmentCurrentOwner = target
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | Generic function for sending a request over a connected socket and collecting the response.
 | 
					-- | Generic function for sending a request over a connected socket and collecting the response.
 | 
				
			||||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
 | 
					-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
 | 
				
			||||||
sendRequestTo :: Int                    -- ^ timeout in milliseconds
 | 
					sendRequestTo :: Int                    -- ^ timeout in milliseconds
 | 
				
			||||||
| 
						 | 
					@ -878,24 +800,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
 | 
					-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
 | 
				
			||||||
queueAddEntries :: Foldable c => c RemoteCacheEntry
 | 
					queueAddEntries :: Foldable c => c RemoteCacheEntry
 | 
				
			||||||
                -> TQueue (NodeCache -> NodeCache)
 | 
					                -> LocalNodeState s
 | 
				
			||||||
                -> IO ()
 | 
					                -> IO ()
 | 
				
			||||||
queueAddEntries entries cacheQ = do
 | 
					queueAddEntries entries ns = do
 | 
				
			||||||
    now <- getPOSIXTime
 | 
					    now <- getPOSIXTime
 | 
				
			||||||
    forM_ entries $ \entry -> atomically $ writeTQueue cacheQ  $ addCacheEntryPure now entry
 | 
					    forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns)  $ addCacheEntryPure now entry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | enque a list of node IDs to be deleted from the global NodeCache
 | 
					-- | enque a list of node IDs to be deleted from the global NodeCache
 | 
				
			||||||
queueDeleteEntries :: Foldable c
 | 
					queueDeleteEntries :: Foldable c
 | 
				
			||||||
                   => c NodeID
 | 
					                   => c NodeID
 | 
				
			||||||
                   -> TQueue (NodeCache -> NodeCache)
 | 
					                   -> LocalNodeState s
 | 
				
			||||||
                   -> IO ()
 | 
					                   -> IO ()
 | 
				
			||||||
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
 | 
					queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | enque a single node ID to be deleted from the global NodeCache
 | 
					-- | enque a single node ID to be deleted from the global NodeCache
 | 
				
			||||||
queueDeleteEntry :: NodeID
 | 
					queueDeleteEntry :: NodeID
 | 
				
			||||||
                 -> TQueue (NodeCache -> NodeCache)
 | 
					                 -> LocalNodeState s
 | 
				
			||||||
                 -> IO ()
 | 
					                 -> IO ()
 | 
				
			||||||
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 | 
					queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -904,11 +826,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 | 
				
			||||||
-- global 'NodeCache'.
 | 
					-- global 'NodeCache'.
 | 
				
			||||||
queueUpdateVerifieds :: Foldable c
 | 
					queueUpdateVerifieds :: Foldable c
 | 
				
			||||||
                     => c NodeID
 | 
					                     => c NodeID
 | 
				
			||||||
                     -> TQueue (NodeCache -> NodeCache)
 | 
					                     -> LocalNodeState s
 | 
				
			||||||
                     -> IO ()
 | 
					                     -> IO ()
 | 
				
			||||||
queueUpdateVerifieds nIds cacheQ = do
 | 
					queueUpdateVerifieds nIds ns = do
 | 
				
			||||||
    now <- getPOSIXTime
 | 
					    now <- getPOSIXTime
 | 
				
			||||||
    forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
 | 
					    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
 | 
				
			||||||
        markCacheEntryAsVerified (Just now) nid'
 | 
					        markCacheEntryAsVerified (Just now) nid'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | retry an IO action at most *i* times until it delivers a result
 | 
					-- | retry an IO action at most *i* times until it delivers a result
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							| 
						 | 
					@ -7,8 +7,8 @@
 | 
				
			||||||
{-# LANGUAGE OverloadedStrings          #-}
 | 
					{-# LANGUAGE OverloadedStrings          #-}
 | 
				
			||||||
{-# LANGUAGE RankNTypes                 #-}
 | 
					{-# LANGUAGE RankNTypes                 #-}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
module Hash2Pub.FediChordTypes
 | 
					module Hash2Pub.FediChordTypes (
 | 
				
			||||||
  ( NodeID -- abstract, but newtype constructors cannot be hidden
 | 
					    NodeID -- abstract, but newtype constructors cannot be hidden
 | 
				
			||||||
  , idBits
 | 
					  , idBits
 | 
				
			||||||
  , getNodeID
 | 
					  , getNodeID
 | 
				
			||||||
  , toNodeID
 | 
					  , toNodeID
 | 
				
			||||||
| 
						 | 
					@ -18,13 +18,6 @@ module Hash2Pub.FediChordTypes
 | 
				
			||||||
  , RemoteNodeState (..)
 | 
					  , RemoteNodeState (..)
 | 
				
			||||||
  , RealNode (..)
 | 
					  , RealNode (..)
 | 
				
			||||||
  , RealNodeSTM
 | 
					  , RealNodeSTM
 | 
				
			||||||
  , VSMap
 | 
					 | 
				
			||||||
  , LoadStats (..)
 | 
					 | 
				
			||||||
  , emptyLoadStats
 | 
					 | 
				
			||||||
  , remainingLoadTarget
 | 
					 | 
				
			||||||
  , loadSliceSum
 | 
					 | 
				
			||||||
  , addVserver
 | 
					 | 
				
			||||||
  , SegmentLoadStats (..)
 | 
					 | 
				
			||||||
  , setSuccessors
 | 
					  , setSuccessors
 | 
				
			||||||
  , setPredecessors
 | 
					  , setPredecessors
 | 
				
			||||||
  , NodeCache
 | 
					  , NodeCache
 | 
				
			||||||
| 
						 | 
					@ -58,7 +51,6 @@ module Hash2Pub.FediChordTypes
 | 
				
			||||||
  , localCompare
 | 
					  , localCompare
 | 
				
			||||||
  , genNodeID
 | 
					  , genNodeID
 | 
				
			||||||
  , genNodeIDBS
 | 
					  , genNodeIDBS
 | 
				
			||||||
  , hasValidNodeId
 | 
					 | 
				
			||||||
  , genKeyID
 | 
					  , genKeyID
 | 
				
			||||||
  , genKeyIDBS
 | 
					  , genKeyIDBS
 | 
				
			||||||
  , byteStringToUInteger
 | 
					  , byteStringToUInteger
 | 
				
			||||||
| 
						 | 
					@ -68,14 +60,12 @@ module Hash2Pub.FediChordTypes
 | 
				
			||||||
  , DHT(..)
 | 
					  , DHT(..)
 | 
				
			||||||
  , Service(..)
 | 
					  , Service(..)
 | 
				
			||||||
  , ServiceConf(..)
 | 
					  , ServiceConf(..)
 | 
				
			||||||
  ) where
 | 
					                           ) where
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import           Control.Exception
 | 
					import           Control.Exception
 | 
				
			||||||
import           Data.Foldable                 (foldr')
 | 
					import           Data.Foldable                 (foldr')
 | 
				
			||||||
import           Data.Function                 (on)
 | 
					import           Data.Function                 (on)
 | 
				
			||||||
import qualified Data.Hashable                 as Hashable
 | 
					import qualified Data.Hashable                 as Hashable
 | 
				
			||||||
import           Data.HashMap.Strict           (HashMap)
 | 
					 | 
				
			||||||
import qualified Data.HashMap.Strict           as HMap
 | 
					 | 
				
			||||||
import           Data.List                     (delete, nub, sortBy)
 | 
					import           Data.List                     (delete, nub, sortBy)
 | 
				
			||||||
import qualified Data.Map.Strict               as Map
 | 
					import qualified Data.Map.Strict               as Map
 | 
				
			||||||
import           Data.Maybe                    (fromJust, fromMaybe, isJust,
 | 
					import           Data.Maybe                    (fromJust, fromMaybe, isJust,
 | 
				
			||||||
| 
						 | 
					@ -158,27 +148,17 @@ a `localCompare` b
 | 
				
			||||||
-- Also contains shared data and config values.
 | 
					-- Also contains shared data and config values.
 | 
				
			||||||
-- TODO: more data structures for k-choices bookkeeping
 | 
					-- TODO: more data structures for k-choices bookkeeping
 | 
				
			||||||
data RealNode s = RealNode
 | 
					data RealNode s = RealNode
 | 
				
			||||||
    { vservers              :: VSMap s
 | 
					    { vservers       :: [LocalNodeStateSTM s]
 | 
				
			||||||
    -- ^ map of all active VServer node IDs to their node state
 | 
					    -- ^ references to all active versers
 | 
				
			||||||
    , nodeConfig            :: FediChordConf
 | 
					    , nodeConfig     :: FediChordConf
 | 
				
			||||||
    -- ^ holds the initial configuration read at program start
 | 
					    -- ^ holds the initial configuration read at program start
 | 
				
			||||||
    , bootstrapNodes        :: [(String, PortNumber)]
 | 
					    , bootstrapNodes :: [(String, PortNumber)]
 | 
				
			||||||
    -- ^ nodes to be used as bootstrapping points, new ones learned during operation
 | 
					    -- ^ nodes to be used as bootstrapping points, new ones learned during operation
 | 
				
			||||||
    , lookupCacheSTM        :: TVar LookupCache
 | 
					    , lookupCacheSTM :: TVar LookupCache
 | 
				
			||||||
    -- ^ a global cache of looked up keys and their associated nodes
 | 
					    -- ^ a global cache of looked up keys and their associated nodes
 | 
				
			||||||
    , globalNodeCacheSTM    :: TVar NodeCache
 | 
					    , nodeService    :: s (RealNodeSTM s)
 | 
				
			||||||
    -- ^ EpiChord node cache with expiry times for nodes.
 | 
					 | 
				
			||||||
    , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
 | 
					 | 
				
			||||||
    -- ^ cache updates are not written directly to the  'globalNodeCacheSTM'
 | 
					 | 
				
			||||||
    , nodeService           :: s (RealNodeSTM s)
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | insert a new vserver mapping into a node
 | 
					 | 
				
			||||||
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
 | 
					 | 
				
			||||||
addVserver (key, nstate) node = node
 | 
					 | 
				
			||||||
    { vservers = addRMapEntry key nstate (vservers node) }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type VSMap s = RingMap NodeID (LocalNodeStateSTM s)
 | 
					 | 
				
			||||||
type RealNodeSTM s = TVar (RealNode s)
 | 
					type RealNodeSTM s = TVar (RealNode s)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | represents a node and all its important state
 | 
					-- | represents a node and all its important state
 | 
				
			||||||
| 
						 | 
					@ -192,7 +172,7 @@ data RemoteNodeState = RemoteNodeState
 | 
				
			||||||
    -- ^ port of the DHT itself
 | 
					    -- ^ port of the DHT itself
 | 
				
			||||||
    , servicePort :: PortNumber
 | 
					    , servicePort :: PortNumber
 | 
				
			||||||
    -- ^ port of the service provided on top of the DHT
 | 
					    -- ^ port of the service provided on top of the DHT
 | 
				
			||||||
    , vServerID   :: Word8
 | 
					    , vServerID   :: Integer
 | 
				
			||||||
    -- ^ ID of this vserver
 | 
					    -- ^ ID of this vserver
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    deriving (Show, Eq)
 | 
					    deriving (Show, Eq)
 | 
				
			||||||
| 
						 | 
					@ -205,9 +185,9 @@ data LocalNodeState s = LocalNodeState
 | 
				
			||||||
    { nodeState           :: RemoteNodeState
 | 
					    { nodeState           :: RemoteNodeState
 | 
				
			||||||
    -- ^ represents common data present both in remote and local node representations
 | 
					    -- ^ represents common data present both in remote and local node representations
 | 
				
			||||||
    , nodeCacheSTM        :: TVar NodeCache
 | 
					    , nodeCacheSTM        :: TVar NodeCache
 | 
				
			||||||
    -- ^ reference to the 'globalNodeCacheSTM'
 | 
					    -- ^ EpiChord node cache with expiry times for nodes
 | 
				
			||||||
    , cacheWriteQueue     :: TQueue (NodeCache -> NodeCache)
 | 
					    , cacheWriteQueue     :: TQueue (NodeCache -> NodeCache)
 | 
				
			||||||
    -- ^ reference to the 'globalCacheWriteQueue
 | 
					    -- ^ cache updates are not written directly to the  'nodeCache' but queued and
 | 
				
			||||||
    , successors          :: [RemoteNodeState] -- could be a set instead as these are ordered as well
 | 
					    , successors          :: [RemoteNodeState] -- could be a set instead as these are ordered as well
 | 
				
			||||||
    -- ^ successor nodes in ascending order by distance
 | 
					    -- ^ successor nodes in ascending order by distance
 | 
				
			||||||
    , predecessors        :: [RemoteNodeState]
 | 
					    , predecessors        :: [RemoteNodeState]
 | 
				
			||||||
| 
						 | 
					@ -237,14 +217,14 @@ class NodeState a where
 | 
				
			||||||
    getIpAddr :: a -> HostAddress6
 | 
					    getIpAddr :: a -> HostAddress6
 | 
				
			||||||
    getDhtPort :: a -> PortNumber
 | 
					    getDhtPort :: a -> PortNumber
 | 
				
			||||||
    getServicePort :: a -> PortNumber
 | 
					    getServicePort :: a -> PortNumber
 | 
				
			||||||
    getVServerID :: a -> Word8
 | 
					    getVServerID :: a -> Integer
 | 
				
			||||||
    -- setters for common properties
 | 
					    -- setters for common properties
 | 
				
			||||||
    setNid :: NodeID -> a -> a
 | 
					    setNid :: NodeID -> a -> a
 | 
				
			||||||
    setDomain :: String -> a -> a
 | 
					    setDomain :: String -> a -> a
 | 
				
			||||||
    setIpAddr :: HostAddress6 -> a -> a
 | 
					    setIpAddr :: HostAddress6 -> a -> a
 | 
				
			||||||
    setDhtPort :: PortNumber -> a -> a
 | 
					    setDhtPort :: PortNumber -> a -> a
 | 
				
			||||||
    setServicePort :: PortNumber -> a -> a
 | 
					    setServicePort :: PortNumber -> a -> a
 | 
				
			||||||
    setVServerID :: Word8 -> a -> a
 | 
					    setVServerID :: Integer -> a -> a
 | 
				
			||||||
    toRemoteNodeState :: a -> RemoteNodeState
 | 
					    toRemoteNodeState :: a -> RemoteNodeState
 | 
				
			||||||
 | 
					
 | 
				
			||||||
instance NodeState RemoteNodeState where
 | 
					instance NodeState RemoteNodeState where
 | 
				
			||||||
| 
						 | 
					@ -393,11 +373,6 @@ genNodeID :: HostAddress6       -- ^a node's IPv6 address
 | 
				
			||||||
            -> NodeID           -- ^the generated @NodeID@
 | 
					            -> NodeID           -- ^the generated @NodeID@
 | 
				
			||||||
genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs
 | 
					genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool
 | 
					 | 
				
			||||||
hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
 | 
					-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
 | 
				
			||||||
genKeyIDBS :: String            -- ^the key string
 | 
					genKeyIDBS :: String            -- ^the key string
 | 
				
			||||||
           -> BS.ByteString     -- ^the key ID represented as a @ByteString@
 | 
					           -> BS.ByteString     -- ^the key ID represented as a @ByteString@
 | 
				
			||||||
| 
						 | 
					@ -452,70 +427,9 @@ data FediChordConf = FediChordConf
 | 
				
			||||||
    -- ^ how long to wait until response has arrived, in milliseconds
 | 
					    -- ^ how long to wait until response has arrived, in milliseconds
 | 
				
			||||||
    , confRequestRetries            :: Int
 | 
					    , confRequestRetries            :: Int
 | 
				
			||||||
    -- ^ how often re-sending a timed-out request can be retried
 | 
					    -- ^ how often re-sending a timed-out request can be retried
 | 
				
			||||||
    , confEnableKChoices            :: Bool
 | 
					 | 
				
			||||||
    -- ^ whether to enable k-choices load balancing
 | 
					 | 
				
			||||||
    , confKChoicesOverload          :: Double
 | 
					 | 
				
			||||||
    -- ^ fraction of capacity above which a node considers itself overloaded
 | 
					 | 
				
			||||||
    , confKChoicesUnderload         :: Double
 | 
					 | 
				
			||||||
    -- ^ fraction of capacity below which a node considers itself underloaded
 | 
					 | 
				
			||||||
    , confKChoicesMaxVS             :: Word8
 | 
					 | 
				
			||||||
    -- ^ upper limit of vserver index κ
 | 
					 | 
				
			||||||
    , confKChoicesRebalanceInterval :: Int
 | 
					 | 
				
			||||||
    -- ^ interval between vserver rebalance attempts
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    deriving (Show, Eq)
 | 
					    deriving (Show, Eq)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- ====== k-choices load balancing types ======
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
data LoadStats = LoadStats
 | 
					 | 
				
			||||||
    { loadPerTag         :: RingMap NodeID Double
 | 
					 | 
				
			||||||
    -- ^ map of loads for each handled tag
 | 
					 | 
				
			||||||
    , totalCapacity      :: Double
 | 
					 | 
				
			||||||
    -- ^ total designated capacity of the service
 | 
					 | 
				
			||||||
    , compensatedLoadSum :: Double
 | 
					 | 
				
			||||||
    -- ^ effective load reevant for load balancing after compensating for
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    deriving (Show, Eq)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | calculates the mismatch from the target load by taking into account the
 | 
					 | 
				
			||||||
-- underload and overload limits
 | 
					 | 
				
			||||||
remainingLoadTarget :: FediChordConf -> LoadStats -> Double
 | 
					 | 
				
			||||||
remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
 | 
					 | 
				
			||||||
    where
 | 
					 | 
				
			||||||
        targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | calculates the sum of tag load in a contiguous slice between to keys
 | 
					 | 
				
			||||||
loadSliceSum :: LoadStats
 | 
					 | 
				
			||||||
             -> NodeID  -- ^ lower segment bound
 | 
					 | 
				
			||||||
             -> NodeID  -- ^ upper segment bound
 | 
					 | 
				
			||||||
             -> Double  -- ^ sum of all tag loads within that segment
 | 
					 | 
				
			||||||
loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
data SegmentLoadStats = SegmentLoadStats
 | 
					 | 
				
			||||||
    { segmentLowerKeyBound            :: NodeID
 | 
					 | 
				
			||||||
    -- ^ segment start key
 | 
					 | 
				
			||||||
    , segmentUpperKeyBound            :: NodeID
 | 
					 | 
				
			||||||
    -- ^ segment end key
 | 
					 | 
				
			||||||
    , segmentLoad                     :: Double
 | 
					 | 
				
			||||||
    -- ^ sum of load of all keys in the segment
 | 
					 | 
				
			||||||
    , segmentOwnerRemainingLoadTarget :: Double
 | 
					 | 
				
			||||||
    -- ^ remaining load target of the current segment handler:
 | 
					 | 
				
			||||||
    , segmentOwnerCapacity            :: Double
 | 
					 | 
				
			||||||
    -- ^ total capacity of the current segment handler node, used for normalisation
 | 
					 | 
				
			||||||
    , segmentCurrentOwner             :: RemoteNodeState
 | 
					 | 
				
			||||||
    -- ^ the current owner of the segment that needs to be joined on
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- TODO: figure out a better way of initialising
 | 
					 | 
				
			||||||
emptyLoadStats :: LoadStats
 | 
					 | 
				
			||||||
emptyLoadStats = LoadStats
 | 
					 | 
				
			||||||
    { loadPerTag = emptyRMap
 | 
					 | 
				
			||||||
    , totalCapacity = 0
 | 
					 | 
				
			||||||
    , compensatedLoadSum = 0
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- ====== Service Types ============
 | 
					-- ====== Service Types ============
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Service s d where
 | 
					class Service s d where
 | 
				
			||||||
| 
						 | 
					@ -531,7 +445,6 @@ class Service s d where
 | 
				
			||||||
                -> IO (Either String ())    -- ^ success or failure
 | 
					                -> IO (Either String ())    -- ^ success or failure
 | 
				
			||||||
    -- | Wait for an incoming migration from a given node to succeed, may block forever
 | 
					    -- | Wait for an incoming migration from a given node to succeed, may block forever
 | 
				
			||||||
    waitForMigrationFrom :: s d -> NodeID -> IO ()
 | 
					    waitForMigrationFrom :: s d -> NodeID -> IO ()
 | 
				
			||||||
    getServiceLoadStats :: s d -> IO LoadStats
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
instance Hashable.Hashable NodeID where
 | 
					instance Hashable.Hashable NodeID where
 | 
				
			||||||
    hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
 | 
					    hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -22,7 +22,7 @@ import qualified Data.DList                as D
 | 
				
			||||||
import           Data.Either               (lefts, rights)
 | 
					import           Data.Either               (lefts, rights)
 | 
				
			||||||
import qualified Data.HashMap.Strict       as HMap
 | 
					import qualified Data.HashMap.Strict       as HMap
 | 
				
			||||||
import qualified Data.HashSet              as HSet
 | 
					import qualified Data.HashSet              as HSet
 | 
				
			||||||
import           Data.Maybe                (fromJust, fromMaybe, isJust)
 | 
					import           Data.Maybe                (fromJust, isJust)
 | 
				
			||||||
import           Data.String               (fromString)
 | 
					import           Data.String               (fromString)
 | 
				
			||||||
import           Data.Text.Lazy            (Text)
 | 
					import           Data.Text.Lazy            (Text)
 | 
				
			||||||
import qualified Data.Text.Lazy            as Txt
 | 
					import qualified Data.Text.Lazy            as Txt
 | 
				
			||||||
| 
						 | 
					@ -64,10 +64,8 @@ data PostService d = PostService
 | 
				
			||||||
    , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
 | 
					    , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
 | 
				
			||||||
    , httpMan              :: HTTP.Manager
 | 
					    , httpMan              :: HTTP.Manager
 | 
				
			||||||
    , statsQueue           :: TQueue StatsEvent
 | 
					    , statsQueue           :: TQueue StatsEvent
 | 
				
			||||||
    , relayStats           :: TVar RelayStats
 | 
					    , loadStats            :: TVar RelayStats
 | 
				
			||||||
    -- ^ current relay stats, replaced periodically
 | 
					    -- ^ current load stats, replaced periodically
 | 
				
			||||||
    , loadStats            :: TVar LoadStats
 | 
					 | 
				
			||||||
    -- ^ current load values of the relay, replaced periodically and used by
 | 
					 | 
				
			||||||
    , logFileHandle        :: Handle
 | 
					    , logFileHandle        :: Handle
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    deriving (Typeable)
 | 
					    deriving (Typeable)
 | 
				
			||||||
| 
						 | 
					@ -98,8 +96,7 @@ instance DHT d => Service PostService d where
 | 
				
			||||||
        migrationsInProgress' <- newTVarIO HMap.empty
 | 
					        migrationsInProgress' <- newTVarIO HMap.empty
 | 
				
			||||||
        httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
 | 
					        httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
 | 
				
			||||||
        statsQueue' <- newTQueueIO
 | 
					        statsQueue' <- newTQueueIO
 | 
				
			||||||
        relayStats' <- newTVarIO emptyStats
 | 
					        loadStats' <- newTVarIO emptyStats
 | 
				
			||||||
        loadStats' <- newTVarIO emptyLoadStats
 | 
					 | 
				
			||||||
        loggingFile <- openFile (confLogfilePath conf) WriteMode
 | 
					        loggingFile <- openFile (confLogfilePath conf) WriteMode
 | 
				
			||||||
        hSetBuffering loggingFile LineBuffering
 | 
					        hSetBuffering loggingFile LineBuffering
 | 
				
			||||||
        let
 | 
					        let
 | 
				
			||||||
| 
						 | 
					@ -115,7 +112,6 @@ instance DHT d => Service PostService d where
 | 
				
			||||||
              , migrationsInProgress = migrationsInProgress'
 | 
					              , migrationsInProgress = migrationsInProgress'
 | 
				
			||||||
              , httpMan = httpMan'
 | 
					              , httpMan = httpMan'
 | 
				
			||||||
              , statsQueue = statsQueue'
 | 
					              , statsQueue = statsQueue'
 | 
				
			||||||
              , relayStats = relayStats'
 | 
					 | 
				
			||||||
              , loadStats = loadStats'
 | 
					              , loadStats = loadStats'
 | 
				
			||||||
              , logFileHandle = loggingFile
 | 
					              , logFileHandle = loggingFile
 | 
				
			||||||
              }
 | 
					              }
 | 
				
			||||||
| 
						 | 
					@ -157,12 +153,6 @@ instance DHT d => Service PostService d where
 | 
				
			||||||
        -- block until migration finished
 | 
					        -- block until migration finished
 | 
				
			||||||
        takeMVar migrationSynchroniser
 | 
					        takeMVar migrationSynchroniser
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    getServiceLoadStats = getLoadStats
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
getLoadStats :: PostService d -> IO LoadStats
 | 
					 | 
				
			||||||
getLoadStats serv = readTVarIO $ loadStats serv
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | return a WAI application
 | 
					-- | return a WAI application
 | 
				
			||||||
postServiceApplication :: DHT d => PostService d -> Application
 | 
					postServiceApplication :: DHT d => PostService d -> Application
 | 
				
			||||||
| 
						 | 
					@ -845,12 +835,7 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
 | 
				
			||||||
          -- persistently store in a TVar so it can be retrieved later by the DHT
 | 
					          -- persistently store in a TVar so it can be retrieved later by the DHT
 | 
				
			||||||
          let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
 | 
					          let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
 | 
				
			||||||
              rateStats = evaluateStats timePassed summedStats
 | 
					              rateStats = evaluateStats timePassed summedStats
 | 
				
			||||||
          currentSubscribers <- readTVarIO $ subscribers serv
 | 
					          atomically $ writeTVar (loadStats serv) rateStats
 | 
				
			||||||
          -- translate the rate statistics to load values
 | 
					 | 
				
			||||||
          loads <- evaluateLoadStats rateStats currentSubscribers
 | 
					 | 
				
			||||||
          atomically $
 | 
					 | 
				
			||||||
              writeTVar (relayStats serv) rateStats
 | 
					 | 
				
			||||||
              >> writeTVar (loadStats serv) loads
 | 
					 | 
				
			||||||
          -- and now what? write a log to file
 | 
					          -- and now what? write a log to file
 | 
				
			||||||
          -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
 | 
					          -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
 | 
				
			||||||
          -- later: current (reported) load, target load
 | 
					          -- later: current (reported) load, target load
 | 
				
			||||||
| 
						 | 
					@ -874,33 +859,6 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
 | 
				
			||||||
                0 tagMap
 | 
					                0 tagMap
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | calculate load values from rate statistics
 | 
					 | 
				
			||||||
evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats
 | 
					 | 
				
			||||||
evaluateLoadStats currentStats currentSubscribers = do
 | 
					 | 
				
			||||||
    -- load caused by each tag: incomingPostRate * ( 1 + subscribers)
 | 
					 | 
				
			||||||
    -- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate
 | 
					 | 
				
			||||||
    let
 | 
					 | 
				
			||||||
        totalCapacity' = 2.5 * postPublishRate currentStats
 | 
					 | 
				
			||||||
    (loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do
 | 
					 | 
				
			||||||
        numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM
 | 
					 | 
				
			||||||
        let
 | 
					 | 
				
			||||||
            thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats)
 | 
					 | 
				
			||||||
            thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers)
 | 
					 | 
				
			||||||
        pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag')
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        (0, emptyRMap)
 | 
					 | 
				
			||||||
        $ rMapToListWithKeys currentSubscribers
 | 
					 | 
				
			||||||
    let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats
 | 
					 | 
				
			||||||
    pure LoadStats
 | 
					 | 
				
			||||||
        { loadPerTag = loadPerTag'
 | 
					 | 
				
			||||||
        , totalCapacity = totalCapacity'
 | 
					 | 
				
			||||||
        -- load caused by post fetches cannot be influenced by re-balancing nodes,
 | 
					 | 
				
			||||||
        -- but still reduces the totally available capacity
 | 
					 | 
				
			||||||
        , compensatedLoadSum = loadSum + postFetchRate currentStats
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
 | 
					-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
 | 
				
			||||||
-- rates by dividing through the collection time frame
 | 
					-- rates by dividing through the collection time frame
 | 
				
			||||||
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
 | 
					evaluateStats :: POSIXTime -> RelayStats -> RelayStats
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -16,12 +16,10 @@ data Action = QueryID
 | 
				
			||||||
    | Leave
 | 
					    | Leave
 | 
				
			||||||
    | Stabilise
 | 
					    | Stabilise
 | 
				
			||||||
    | Ping
 | 
					    | Ping
 | 
				
			||||||
    | QueryLoad
 | 
					 | 
				
			||||||
    deriving (Show, Eq, Enum)
 | 
					    deriving (Show, Eq, Enum)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
data FediChordMessage = Request
 | 
					data FediChordMessage = Request
 | 
				
			||||||
    { requestID   :: Integer
 | 
					    { requestID   :: Integer
 | 
				
			||||||
    , receiverID  :: NodeID
 | 
					 | 
				
			||||||
    , sender      :: RemoteNodeState
 | 
					    , sender      :: RemoteNodeState
 | 
				
			||||||
    , part        :: Integer
 | 
					    , part        :: Integer
 | 
				
			||||||
    , isFinalPart :: Bool
 | 
					    , isFinalPart :: Bool
 | 
				
			||||||
| 
						 | 
					@ -59,10 +57,6 @@ data ActionPayload = QueryIDRequestPayload
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    | StabiliseRequestPayload
 | 
					    | StabiliseRequestPayload
 | 
				
			||||||
    | PingRequestPayload
 | 
					    | PingRequestPayload
 | 
				
			||||||
    | LoadRequestPayload
 | 
					 | 
				
			||||||
    { loadSegmentUpperBound :: NodeID
 | 
					 | 
				
			||||||
    -- ^ upper bound of segment interested in,
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    | QueryIDResponsePayload
 | 
					    | QueryIDResponsePayload
 | 
				
			||||||
    { queryResult :: QueryResponse
 | 
					    { queryResult :: QueryResponse
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					@ -79,12 +73,6 @@ data ActionPayload = QueryIDRequestPayload
 | 
				
			||||||
    | PingResponsePayload
 | 
					    | PingResponsePayload
 | 
				
			||||||
    { pingNodeStates :: [RemoteNodeState]
 | 
					    { pingNodeStates :: [RemoteNodeState]
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    | LoadResponsePayload
 | 
					 | 
				
			||||||
    { loadSum               :: Double
 | 
					 | 
				
			||||||
    , loadRemainingTarget   :: Double
 | 
					 | 
				
			||||||
    , loadTotalCapacity     :: Double
 | 
					 | 
				
			||||||
    , loadSegmentLowerBound :: NodeID
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    deriving (Show, Eq)
 | 
					    deriving (Show, Eq)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | global limit of parts per message used when (de)serialising messages.
 | 
					-- | global limit of parts per message used when (de)serialising messages.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -47,13 +47,6 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where
 | 
				
			||||||
          traversingFL acc (ProxyEntry _ Nothing) = acc
 | 
					          traversingFL acc (ProxyEntry _ Nothing) = acc
 | 
				
			||||||
          traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
 | 
					          traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
instance (Bounded k, Ord k) => Traversable (RingMap k) where
 | 
					 | 
				
			||||||
    traverse f = fmap RingMap . traverse traversingF . getRingMap
 | 
					 | 
				
			||||||
      where
 | 
					 | 
				
			||||||
          traversingF (KeyEntry entry) = KeyEntry <$> f entry
 | 
					 | 
				
			||||||
          traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing
 | 
					 | 
				
			||||||
          traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . Just <$> traversingF entry
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | entry of a 'RingMap' that holds a value and can also
 | 
					-- | entry of a 'RingMap' that holds a value and can also
 | 
				
			||||||
-- wrap around the lookup direction at the edges of the name space.
 | 
					-- wrap around the lookup direction at the edges of the name space.
 | 
				
			||||||
| 
						 | 
					@ -113,23 +106,6 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on
 | 
				
			||||||
      | isNothing (rMapLookup nid rmap') = 1
 | 
					      | isNothing (rMapLookup nid rmap') = 1
 | 
				
			||||||
      | otherwise = 0
 | 
					      | otherwise = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | whether the RingMap is empty (except for empty proxy entries)
 | 
					 | 
				
			||||||
nullRMap :: (Num k, Bounded k, Ord k)
 | 
					 | 
				
			||||||
         => RingMap k a
 | 
					 | 
				
			||||||
         -> Bool
 | 
					 | 
				
			||||||
-- basic idea: look for a predecessor starting from the upper Map bound,
 | 
					 | 
				
			||||||
-- Nothing indicates no entry being found
 | 
					 | 
				
			||||||
nullRMap = isNothing . rMapLookupPred maxBound
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | O(logn( Is the key a member of the RingMap?
 | 
					 | 
				
			||||||
memberRMap :: (Bounded k, Ord k)
 | 
					 | 
				
			||||||
           => k
 | 
					 | 
				
			||||||
           -> RingMap k a
 | 
					 | 
				
			||||||
           -> Bool
 | 
					 | 
				
			||||||
memberRMap key = isJust . rMapLookup key
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
 | 
					-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
 | 
				
			||||||
-- to simulate a modular ring
 | 
					-- to simulate a modular ring
 | 
				
			||||||
lookupWrapper :: (Bounded k, Ord k, Num k)
 | 
					lookupWrapper :: (Bounded k, Ord k, Num k)
 | 
				
			||||||
| 
						 | 
					@ -222,28 +198,12 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
 | 
				
			||||||
    modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
 | 
					    modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
 | 
				
			||||||
    modifier KeyEntry {}              = Nothing
 | 
					    modifier KeyEntry {}              = Nothing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- TODO: rename this to elems
 | 
					 | 
				
			||||||
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
 | 
					rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
 | 
				
			||||||
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
 | 
					rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- TODO: rename this to toList
 | 
					 | 
				
			||||||
rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)]
 | 
					 | 
				
			||||||
rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
 | 
					 | 
				
			||||||
    maybe acc (\val -> (k, val):acc) $ extractRingEntry v
 | 
					 | 
				
			||||||
                                      )
 | 
					 | 
				
			||||||
                                      []
 | 
					 | 
				
			||||||
                                      . getRingMap
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
 | 
					rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
 | 
				
			||||||
rMapFromList = setRMapEntries
 | 
					rMapFromList = setRMapEntries
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | this just merges the underlying 'Map.Map' s and does not check whether the
 | 
					 | 
				
			||||||
-- ProxyEntry pointers are consistent, so better only create unions of
 | 
					 | 
				
			||||||
-- equal-pointered RingMaps
 | 
					 | 
				
			||||||
unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a
 | 
					 | 
				
			||||||
unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
 | 
					-- | takes up to i entries from a 'RingMap' by calling a getter function on a
 | 
				
			||||||
-- *startAt* value and after that on the previously returned value.
 | 
					-- *startAt* value and after that on the previously returned value.
 | 
				
			||||||
-- Stops once i entries have been taken or an entry has been encountered twice
 | 
					-- Stops once i entries have been taken or an entry has been encountered twice
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -7,7 +7,6 @@ import           Control.Concurrent.STM.TVar
 | 
				
			||||||
import           Control.Exception
 | 
					import           Control.Exception
 | 
				
			||||||
import           Data.ASN1.Parse             (runParseASN1)
 | 
					import           Data.ASN1.Parse             (runParseASN1)
 | 
				
			||||||
import qualified Data.ByteString             as BS
 | 
					import qualified Data.ByteString             as BS
 | 
				
			||||||
import qualified Data.HashMap.Strict         as HMap
 | 
					 | 
				
			||||||
import qualified Data.Map.Strict             as Map
 | 
					import qualified Data.Map.Strict             as Map
 | 
				
			||||||
import           Data.Maybe                  (fromJust, isJust)
 | 
					import           Data.Maybe                  (fromJust, isJust)
 | 
				
			||||||
import qualified Data.Set                    as Set
 | 
					import qualified Data.Set                    as Set
 | 
				
			||||||
| 
						 | 
					@ -19,7 +18,6 @@ import           Hash2Pub.ASN1Coding
 | 
				
			||||||
import           Hash2Pub.DHTProtocol
 | 
					import           Hash2Pub.DHTProtocol
 | 
				
			||||||
import           Hash2Pub.FediChord
 | 
					import           Hash2Pub.FediChord
 | 
				
			||||||
import           Hash2Pub.FediChordTypes
 | 
					import           Hash2Pub.FediChordTypes
 | 
				
			||||||
import           Hash2Pub.RingMap
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
spec :: Spec
 | 
					spec :: Spec
 | 
				
			||||||
spec = do
 | 
					spec = do
 | 
				
			||||||
| 
						 | 
					@ -223,16 +221,14 @@ spec = do
 | 
				
			||||||
                  , exampleNodeState {nid = fromInteger (-5)}
 | 
					                  , exampleNodeState {nid = fromInteger (-5)}
 | 
				
			||||||
                                 ]
 | 
					                                 ]
 | 
				
			||||||
                                              }
 | 
					                                              }
 | 
				
			||||||
            qLoadReqPayload = LoadRequestPayload
 | 
					            requestTemplate = Request {
 | 
				
			||||||
                { loadSegmentUpperBound = 1025
 | 
					                requestID = 2342
 | 
				
			||||||
                }
 | 
					              , sender = exampleNodeState
 | 
				
			||||||
            qLoadResPayload = LoadResponsePayload
 | 
					              , part = 1
 | 
				
			||||||
                { loadSum = 3.141
 | 
					              , isFinalPart = True
 | 
				
			||||||
                , loadRemainingTarget = -1.337
 | 
					              , action = undefined
 | 
				
			||||||
                , loadTotalCapacity = 2.21
 | 
					              , payload = undefined
 | 
				
			||||||
                , loadSegmentLowerBound = 12
 | 
					                                      }
 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            responseTemplate = Response {
 | 
					            responseTemplate = Response {
 | 
				
			||||||
                requestID = 2342
 | 
					                requestID = 2342
 | 
				
			||||||
              , senderID = nid exampleNodeState
 | 
					              , senderID = nid exampleNodeState
 | 
				
			||||||
| 
						 | 
					@ -241,7 +237,7 @@ spec = do
 | 
				
			||||||
              , action = undefined
 | 
					              , action = undefined
 | 
				
			||||||
              , payload = undefined
 | 
					              , payload = undefined
 | 
				
			||||||
                                      }
 | 
					                                      }
 | 
				
			||||||
            requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342
 | 
					            requestWith a pa = requestTemplate {action = a, payload = Just pa}
 | 
				
			||||||
            responseWith a pa = responseTemplate {action = a, payload = Just pa}
 | 
					            responseWith a pa = responseTemplate {action = a, payload = Just pa}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
 | 
					            encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
 | 
				
			||||||
| 
						 | 
					@ -252,20 +248,17 @@ spec = do
 | 
				
			||||||
                                                                       }
 | 
					                                                                       }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        it "messages are encoded and decoded correctly from and to ASN1" $ do
 | 
					        it "messages are encoded and decoded correctly from and to ASN1" $ do
 | 
				
			||||||
            localNS <- exampleLocalNode
 | 
					            encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload
 | 
					            encodeDecodeAndCheck $ requestWith Join jReqPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ requestWith localNS Join jReqPayload
 | 
					            encodeDecodeAndCheck $ requestWith Leave lReqPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload
 | 
					            encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload
 | 
					            encodeDecodeAndCheck $ requestWith Ping pingReqPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload
 | 
					 | 
				
			||||||
            encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload
 | 
					 | 
				
			||||||
            encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
 | 
					            encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
 | 
				
			||||||
            encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
 | 
					            encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
 | 
				
			||||||
            encodeDecodeAndCheck $ responseWith Join jResPayload
 | 
					            encodeDecodeAndCheck $ responseWith Join jResPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ responseWith Leave lResPayload
 | 
					            encodeDecodeAndCheck $ responseWith Leave lResPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
 | 
					            encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ responseWith Ping pingResPayload
 | 
					            encodeDecodeAndCheck $ responseWith Ping pingResPayload
 | 
				
			||||||
            encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload
 | 
					 | 
				
			||||||
        it "messages are encoded and decoded to ASN.1 DER properly" $
 | 
					        it "messages are encoded and decoded to ASN.1 DER properly" $
 | 
				
			||||||
            deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652  $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
 | 
					            deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652  $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
 | 
				
			||||||
        it "messages too large for a single packet can (often) be split into multiple parts" $ do
 | 
					        it "messages too large for a single packet can (often) be split into multiple parts" $ do
 | 
				
			||||||
| 
						 | 
					@ -304,13 +297,13 @@ exampleNodeState = RemoteNodeState {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exampleLocalNode :: IO (LocalNodeState MockService)
 | 
					exampleLocalNode :: IO (LocalNodeState MockService)
 | 
				
			||||||
exampleLocalNode = do
 | 
					exampleLocalNode = do
 | 
				
			||||||
    realNodeSTM <- newTVarIO $ RealNode {
 | 
					    realNode <- newTVarIO $ RealNode {
 | 
				
			||||||
            vservers = emptyRMap
 | 
					            vservers = []
 | 
				
			||||||
          , nodeConfig = exampleFediConf
 | 
					          , nodeConfig = exampleFediConf
 | 
				
			||||||
          , bootstrapNodes = confBootstrapNodes exampleFediConf
 | 
					          , bootstrapNodes = confBootstrapNodes exampleFediConf
 | 
				
			||||||
          , nodeService = MockService
 | 
					          , nodeService = MockService
 | 
				
			||||||
                                                        }
 | 
					                                                        }
 | 
				
			||||||
    nodeStateInit realNodeSTM 0
 | 
					    nodeStateInit realNode
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
exampleFediConf :: FediChordConf
 | 
					exampleFediConf :: FediChordConf
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue