catch and handle more join errors
This commit is contained in:
		
							parent
							
								
									6ff765c63e
								
							
						
					
					
						commit
						b4ecf8b0aa
					
				
					 3 changed files with 8 additions and 7 deletions
				
			
		|  | @ -23,10 +23,10 @@ module Hash2Pub.DHTProtocol | ||||||
|         where |         where | ||||||
| 
 | 
 | ||||||
| import           Control.Concurrent.Async | import           Control.Concurrent.Async | ||||||
| import           Control.Exception |  | ||||||
| import           Control.Concurrent.STM | import           Control.Concurrent.STM | ||||||
| import           Control.Concurrent.STM.TBQueue | import           Control.Concurrent.STM.TBQueue | ||||||
| import           Control.Concurrent.STM.TQueue | import           Control.Concurrent.STM.TQueue | ||||||
|  | import           Control.Exception | ||||||
| import           Control.Monad                  (foldM, forM, forM_) | import           Control.Monad                  (foldM, forM, forM_) | ||||||
| import qualified Data.ByteString                as BS | import qualified Data.ByteString                as BS | ||||||
| import           Data.Either                    (rights) | import           Data.Either                    (rights) | ||||||
|  | @ -146,7 +146,7 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc | ||||||
| -- | send a join request and return the joined 'LocalNodeState' including neighbours | -- | send a join request and return the joined 'LocalNodeState' including neighbours | ||||||
| requestJoin :: NodeState a => a             -- ^ currently responsible node to be contacted | requestJoin :: NodeState a => a             -- ^ currently responsible node to be contacted | ||||||
|             -> LocalNodeState               -- ^ joining NodeState |             -> LocalNodeState               -- ^ joining NodeState | ||||||
|             -> IO (Maybe LocalNodeState)    -- ^ node after join with all its new information |             -> IO (Either String LocalNodeState)    -- ^ node after join with all its new information | ||||||
| requestJoin toJoinOn ownState = | requestJoin toJoinOn ownState = | ||||||
|     bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do |     bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do | ||||||
|         responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock |         responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock | ||||||
|  | @ -166,10 +166,11 @@ requestJoin toJoinOn ownState = | ||||||
|             (setPredecessors [] . setSuccessors [] $ ownState) |             (setPredecessors [] . setSuccessors [] $ ownState) | ||||||
|             responses |             responses | ||||||
|         if responses == Set.empty |         if responses == Set.empty | ||||||
|            then pure Nothing |            then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) | ||||||
|             -- sort successors and predecessors |             -- sort successors and predecessors | ||||||
|            else pure . Just . setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted |            else pure . Right . setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted | ||||||
|                                                                             ) |                                                                             ) | ||||||
|  |         `catch` (\e -> pure . Left $ displayException (e :: IOException)) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. | -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. | ||||||
|  |  | ||||||
|  | @ -154,8 +154,8 @@ fediChordJoin cacheSnapshot ns = do | ||||||
|     -- 2. then send a join to the currently responsible node |     -- 2. then send a join to the currently responsible node | ||||||
|     joinResult <- requestJoin currentlyResponsible ns |     joinResult <- requestJoin currentlyResponsible ns | ||||||
|     case joinResult of |     case joinResult of | ||||||
|       Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible |       Left err       -> pure . Left $ "Error joining on " <> err | ||||||
|       Just joinedNS -> pure . Right $ joinedNS |       Right joinedNS -> pure . Right $ joinedNS | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | cache updater thread that waits for incoming NodeCache update instructions on | -- | cache updater thread that waits for incoming NodeCache update instructions on | ||||||
|  |  | ||||||
|  | @ -2,9 +2,9 @@ module Main where | ||||||
| 
 | 
 | ||||||
| import           Control.Concurrent | import           Control.Concurrent | ||||||
| import           Control.Exception | import           Control.Exception | ||||||
|  | import           Data.Either | ||||||
| import           Data.IP            (IPv6, toHostAddress6) | import           Data.IP            (IPv6, toHostAddress6) | ||||||
| import           System.Environment | import           System.Environment | ||||||
| import           Data.Either |  | ||||||
| 
 | 
 | ||||||
| import           Hash2Pub.FediChord | import           Hash2Pub.FediChord | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue