extract queryID message sending function so it can be used for first join
This commit is contained in:
		
							parent
							
								
									16769d1395
								
							
						
					
					
						commit
						d5841d13fd
					
				
					 1 changed files with 12 additions and 3 deletions
				
			
		|  | @ -12,6 +12,8 @@ module Hash2Pub.DHTProtocol | ||||||
|     , ActionPayload(..) |     , ActionPayload(..) | ||||||
|     , FediChordMessage(..) |     , FediChordMessage(..) | ||||||
|     , maximumParts |     , maximumParts | ||||||
|  |     , sendQueryIdMessage | ||||||
|  |     , requestQueryID | ||||||
|     ) |     ) | ||||||
|         where |         where | ||||||
| 
 | 
 | ||||||
|  | @ -152,7 +154,7 @@ requestQueryID ns targetID = do | ||||||
|               -- create connected sockets to all query targets |               -- create connected sockets to all query targets | ||||||
|               sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet |               sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet | ||||||
|               -- ToDo: make attempts and timeout configurable |               -- ToDo: make attempts and timeout configurable | ||||||
|               queryThreads <- mapM (async . sendRequestTo 5000 3 (lookupMessage targetID)) sockets |               queryThreads <- mapM (async . sendQueryIdMessage targetID ns) sockets | ||||||
|               -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 |               -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 | ||||||
|               -- ToDo: exception handling, maybe log them |               -- ToDo: exception handling, maybe log them | ||||||
|               responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads |               responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads | ||||||
|  | @ -179,7 +181,14 @@ requestQueryID ns targetID = do | ||||||
|               -- if no FOUND, recursively call lookup again |               -- if no FOUND, recursively call lookup again | ||||||
|               maybe (lookupLoop newLCache) pure foundResp |               maybe (lookupLoop newLCache) pure foundResp | ||||||
| 
 | 
 | ||||||
|     lookupMessage targetID rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID) | 
 | ||||||
|  | sendQueryIdMessage :: NodeID                        -- ^ target key ID to look up | ||||||
|  |                    -> LocalNodeState                -- ^ node state of the node doing the query | ||||||
|  |                    -> Socket                        -- ^ connected socket to use for sending | ||||||
|  |                    -> IO (Set.Set FediChordMessage) -- ^ responses | ||||||
|  | sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns) | ||||||
|  |     where | ||||||
|  |         lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID) | ||||||
|         pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } |         pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } | ||||||
| 
 | 
 | ||||||
| -- | Generic function for sending a request over a connected socket and collecting the response. | -- | Generic function for sending a request over a connected socket and collecting the response. | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue