forked from schmittlauch/Hash2Pub
		
	make all delays configurable and scale them according to a speedup factor
This commit is contained in:
		
							parent
							
								
									20050654bc
								
							
						
					
					
						commit
						4f08d33d2e
					
				
					 3 changed files with 43 additions and 38 deletions
				
			
		
							
								
								
									
										26
									
								
								app/Main.hs
									
										
									
									
									
								
							
							
						
						
									
										26
									
								
								app/Main.hs
									
										
									
									
									
								
							|  | @ -45,27 +45,31 @@ main = do | ||||||
| 
 | 
 | ||||||
| readConfig :: IO (FediChordConf, ServiceConf) | readConfig :: IO (FediChordConf, ServiceConf) | ||||||
| readConfig = do | readConfig = do | ||||||
|     confDomainString : ipString : portString : servicePortString : speedup : remainingArgs <- getArgs |     confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs | ||||||
|     -- allow starting the initial node without bootstrapping info to avoid |     -- allow starting the initial node without bootstrapping info to avoid | ||||||
|     -- waiting for timeout |     -- waiting for timeout | ||||||
|     let |     let | ||||||
|  |         speedup = read speedupString | ||||||
|         confBootstrapNodes' = case remainingArgs of |         confBootstrapNodes' = case remainingArgs of | ||||||
|             bootstrapHost : bootstrapPortString : _ -> |             bootstrapHost : bootstrapPortString : _ -> | ||||||
|                 [(bootstrapHost, read bootstrapPortString)] |                 [(bootstrapHost, read bootstrapPortString)] | ||||||
|             _ -> [] |             _ -> [] | ||||||
|         fConf = FediChordConf { |         fConf = FediChordConf { | ||||||
|         confDomain = confDomainString |             confDomain = confDomainString | ||||||
|       , confIP = toHostAddress6 . read $ ipString |           , confIP = toHostAddress6 . read $ ipString | ||||||
|       , confDhtPort = read portString |           , confDhtPort = read portString | ||||||
|       , confBootstrapNodes = confBootstrapNodes' |           , confBootstrapNodes = confBootstrapNodes' | ||||||
|       --, confStabiliseInterval = 60 |           , confStabiliseInterval = 60 * 10^6 | ||||||
|       , confBootstrapSamplingInterval = 180 |           , confBootstrapSamplingInterval = 180 * 10^6 `div` speedup | ||||||
|       , confMaxLookupCacheAge = 300 |           , confMaxLookupCacheAge = 300 / fromIntegral speedup | ||||||
|  |           , confJoinAttemptsInterval = 60 * 10^6 `div` speedup | ||||||
|  |           , confMaxNodeCacheAge = 600 / fromIntegral speedup | ||||||
|  |           , confResponsePurgeAge = 60 / fromIntegral speedup | ||||||
|                            } |                            } | ||||||
|         sConf = ServiceConf { |         sConf = ServiceConf { | ||||||
|         confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer) |             confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup | ||||||
|                             , confServicePort = read servicePortString |           , confServicePort = read servicePortString | ||||||
|                             , confServiceHost = confDomainString |           , confServiceHost = confDomainString | ||||||
|                             } |                             } | ||||||
|     pure (fConf, sConf) |     pure (fConf, sConf) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -199,7 +199,7 @@ convergenceSampleThread nsSTM = forever $ do | ||||||
|     -- unjoined node: try joining through all bootstrapping nodes |     -- unjoined node: try joining through all bootstrapping nodes | ||||||
|     else tryBootstrapJoining nsSTM >> pure () |     else tryBootstrapJoining nsSTM >> pure () | ||||||
|     let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode |     let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode | ||||||
|     threadDelay $ delaySecs * 10^6 |     threadDelay delaySecs | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. | -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. | ||||||
|  | @ -310,12 +310,13 @@ joinOnNewEntriesThread nsSTM = loop | ||||||
|   where |   where | ||||||
|     loop = do |     loop = do | ||||||
|         nsSnap <- readTVarIO nsSTM |         nsSnap <- readTVarIO nsSTM | ||||||
|         (lookupResult, cache) <- atomically $ do |         (lookupResult, parentNode) <- atomically $ do | ||||||
|             cache <- readTVar $ nodeCacheSTM nsSnap |             cache <- readTVar $ nodeCacheSTM nsSnap | ||||||
|  |             parentNode <- readTVar $ parentRealNode nsSnap | ||||||
|             case queryLocalCache nsSnap cache 1 (getNid nsSnap) of |             case queryLocalCache nsSnap cache 1 (getNid nsSnap) of | ||||||
|               -- empty cache, block until cache changes and then retry |               -- empty cache, block until cache changes and then retry | ||||||
|               (FORWARD s) | Set.null s -> retry |               (FORWARD s) | Set.null s -> retry | ||||||
|               result                   -> pure (result, cache) |               result                   -> pure (result, parentNode) | ||||||
|         case lookupResult of |         case lookupResult of | ||||||
|           -- already joined |           -- already joined | ||||||
|           FOUND _ -> |           FOUND _ -> | ||||||
|  | @ -325,8 +326,7 @@ joinOnNewEntriesThread nsSTM = loop | ||||||
|               joinResult <- runExceptT $ fediChordVserverJoin nsSTM |               joinResult <- runExceptT $ fediChordVserverJoin nsSTM | ||||||
|               either |               either | ||||||
|                 -- on join failure, sleep and retry |                 -- on join failure, sleep and retry | ||||||
|                 -- TODO: make delay configurable |                 (const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop) | ||||||
|                 (const $ threadDelay (30 * 10^6) >> loop) |  | ||||||
|                 (const $ pure ()) |                 (const $ pure ()) | ||||||
|                 joinResult |                 joinResult | ||||||
| 
 | 
 | ||||||
|  | @ -341,20 +341,16 @@ nodeCacheWriter nsSTM = | ||||||
|         modifyTVar' (nodeCacheSTM ns) cacheModifier |         modifyTVar' (nodeCacheSTM ns) cacheModifier | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- TODO: make max entry age configurable |  | ||||||
| maxEntryAge :: POSIXTime |  | ||||||
| maxEntryAge = 600 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| -- | Periodically iterate through cache, clean up expired entries and verify unverified ones | -- | Periodically iterate through cache, clean up expired entries and verify unverified ones | ||||||
| nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () | nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () | ||||||
| nodeCacheVerifyThread nsSTM = forever $ do | nodeCacheVerifyThread nsSTM = forever $ do | ||||||
|     putStrLn "cache verify run: begin" |     putStrLn "cache verify run: begin" | ||||||
|     -- get cache |     -- get cache | ||||||
|     (ns, cache) <- atomically $ do |     (ns, cache, maxEntryAge) <- atomically $ do | ||||||
|         ns <- readTVar nsSTM |         ns <- readTVar nsSTM | ||||||
|         cache <- readTVar $ nodeCacheSTM ns |         cache <- readTVar $ nodeCacheSTM ns | ||||||
|         pure (ns, cache) |         maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns) | ||||||
|  |         pure (ns, cache, maxEntryAge) | ||||||
|     -- iterate entries: |     -- iterate entries: | ||||||
|     -- for avoiding too many time syscalls, get current time before iterating. |     -- for avoiding too many time syscalls, get current time before iterating. | ||||||
|     now <- getPOSIXTime |     now <- getPOSIXTime | ||||||
|  | @ -402,7 +398,7 @@ nodeCacheVerifyThread nsSTM = forever $ do | ||||||
|                                                              ) |                                                              ) | ||||||
| 
 | 
 | ||||||
|     putStrLn "cache verify run: end" |     putStrLn "cache verify run: end" | ||||||
|     threadDelay $ 10^6 * round maxEntryAge `div` 20 |     threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6    -- convert from pico to milliseconds | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | Checks the invariant of at least @jEntries@ per cache slice. | -- | Checks the invariant of at least @jEntries@ per cache slice. | ||||||
|  | @ -548,8 +544,8 @@ stabiliseThread nsSTM = forever $ do | ||||||
|                     newPredecessor |                     newPredecessor | ||||||
| 
 | 
 | ||||||
|     putStrLn "stabilise run: end" |     putStrLn "stabilise run: end" | ||||||
|     -- TODO: make delay configurable |     stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs) | ||||||
|     threadDelay (60 * 10^6) |     threadDelay stabiliseDelay | ||||||
|   where |   where | ||||||
|     -- | send a stabilise request to the n-th neighbour |     -- | send a stabilise request to the n-th neighbour | ||||||
|     -- (specified by the provided getter function) and on failure retry |     -- (specified by the provided getter function) and on failure retry | ||||||
|  | @ -636,19 +632,15 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry | ||||||
| data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) | data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) | ||||||
|                   POSIXTime |                   POSIXTime | ||||||
| 
 | 
 | ||||||
| -- TODO: make purge age configurable |  | ||||||
| -- | periodically clean up old request parts |  | ||||||
| responsePurgeAge :: POSIXTime |  | ||||||
| responsePurgeAge = 60 -- seconds |  | ||||||
| 
 | 
 | ||||||
| requestMapPurge :: MVar RequestMap -> IO () | requestMapPurge :: POSIXTime -> MVar RequestMap -> IO () | ||||||
| requestMapPurge mapVar = forever $ do | requestMapPurge purgeAge mapVar = forever $ do | ||||||
|     rMapState <- takeMVar mapVar |     rMapState <- takeMVar mapVar | ||||||
|     now <- getPOSIXTime |     now <- getPOSIXTime | ||||||
|     putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts)  -> |     putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts)  -> | ||||||
|         now - ts < responsePurgeAge |         now - ts < purgeAge | ||||||
|                                 ) rMapState |                                 ) rMapState | ||||||
|     threadDelay $ round responsePurgeAge * 2 * 10^6 |     threadDelay $ (fromEnum purgeAge * 2) `div` 10^6 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | Wait for messages, deserialise them, manage parts and acknowledgement status, | -- | Wait for messages, deserialise them, manage parts and acknowledgement status, | ||||||
|  | @ -663,12 +655,13 @@ fediMessageHandler sendQ recvQ nsSTM = do | ||||||
|     -- not change. |     -- not change. | ||||||
|     -- Other functions are passed the nsSTM reference and thus can get the latest state. |     -- Other functions are passed the nsSTM reference and thus can get the latest state. | ||||||
|     nsSnap <- readTVarIO nsSTM |     nsSnap <- readTVarIO nsSTM | ||||||
|  |     nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap) | ||||||
|     -- handling multipart messages: |     -- handling multipart messages: | ||||||
|     -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. |     -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. | ||||||
|     requestMap <- newMVar (Map.empty :: RequestMap) |     requestMap <- newMVar (Map.empty :: RequestMap) | ||||||
|     -- run receive loop and requestMapPurge concurrently, so that an exception makes |     -- run receive loop and requestMapPurge concurrently, so that an exception makes | ||||||
|     -- both of them fail |     -- both of them fail | ||||||
|     concurrently_ (requestMapPurge requestMap) $ forever $ do |     concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do | ||||||
|         -- wait for incoming messages |         -- wait for incoming messages | ||||||
|         (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ |         (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ | ||||||
|         let aMsg = deserialiseMessage rawMsg |         let aMsg = deserialiseMessage rawMsg | ||||||
|  | @ -807,4 +800,4 @@ lookupCacheCleanup nodeSTM = do | ||||||
|                 now - ts < confMaxLookupCacheAge (nodeConfig node) |                 now - ts < confMaxLookupCacheAge (nodeConfig node) | ||||||
|                        ) |                        ) | ||||||
|                                                        ) |                                                        ) | ||||||
|         threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5) |         threadDelay $ fromEnum (2 * confMaxLookupCacheAge (nodeConfig node)) `div` 10^6 | ||||||
|  |  | ||||||
|  | @ -411,10 +411,18 @@ data FediChordConf = FediChordConf | ||||||
|     -- ^ listening port for the FediChord DHT |     -- ^ listening port for the FediChord DHT | ||||||
|     , confBootstrapNodes            :: [(String, PortNumber)] |     , confBootstrapNodes            :: [(String, PortNumber)] | ||||||
|     -- ^ list of potential bootstrapping nodes |     -- ^ list of potential bootstrapping nodes | ||||||
|  |     , confStabiliseInterval         :: Int | ||||||
|  |     -- ^ pause between stabilise runs, in milliseconds | ||||||
|     , confBootstrapSamplingInterval :: Int |     , confBootstrapSamplingInterval :: Int | ||||||
|     -- ^ pause between sampling the own ID through bootstrap nodes, in seconds |     -- ^ pause between sampling the own ID through bootstrap nodes, in milliseconds | ||||||
|     , confMaxLookupCacheAge         :: POSIXTime |     , confMaxLookupCacheAge         :: POSIXTime | ||||||
|     -- ^ maximum age of lookup cache entries in seconds |     -- ^ maximum age of key lookup cache entries in seconds | ||||||
|  |     , confJoinAttemptsInterval      :: Int | ||||||
|  |     -- ^ interval between join attempts on newly learned nodes, in milliseconds | ||||||
|  |     , confMaxNodeCacheAge           :: POSIXTime | ||||||
|  |     -- ^ maximum age of entries in the node cache, in milliseconds | ||||||
|  |     , confResponsePurgeAge          :: POSIXTime | ||||||
|  |     -- ^ maximum age of message parts in response part cache, in seconds | ||||||
|     } |     } | ||||||
|     deriving (Show, Eq) |     deriving (Show, Eq) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue