Compare commits
4 commits
6699237243
...
4e359775ec
Author | SHA1 | Date | |
---|---|---|---|
|
4e359775ec | ||
|
2c827ea326 | ||
|
3892dc91aa | ||
|
7c87a578d3 |
|
@ -27,7 +27,8 @@ Request ::= SEQUENCE {
|
|||
-- request and response instead of explicit flag
|
||||
|
||||
Response ::= SEQUENCE {
|
||||
responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
||||
-- requestID of the request responding to
|
||||
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
||||
senderID NodeID,
|
||||
part Partnum,
|
||||
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
|
||||
|
|
|
@ -86,7 +86,7 @@ executable Hash2Pub
|
|||
other-extensions: GeneralizedNewtypeDeriving
|
||||
|
||||
-- Directories containing source files.
|
||||
hs-source-dirs: src/Hash2Pub
|
||||
hs-source-dirs: app
|
||||
|
||||
-- Base language which the package is written in.
|
||||
default-language: Haskell2010
|
||||
|
|
|
@ -226,9 +226,9 @@ encodeMessage
|
|||
<> maybe [] encodePayload requestPayload
|
||||
<> [End Sequence]
|
||||
encodeMessage
|
||||
(Response responseTo senderID part isFinalPart action responsePayload) = [
|
||||
(Response requestID senderID part isFinalPart action responsePayload) = [
|
||||
Start Sequence
|
||||
, IntVal responseTo
|
||||
, IntVal requestID
|
||||
, IntVal . getNodeID $ senderID
|
||||
, IntVal part
|
||||
, Boolean isFinalPart
|
||||
|
@ -277,7 +277,7 @@ parseRequest action = do
|
|||
pure $ Request requestID sender part isFinalPart action payload
|
||||
|
||||
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
||||
parseResponse responseTo = do
|
||||
parseResponse requestID = do
|
||||
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
|
||||
part <- parseInteger
|
||||
isFinalPart <- parseBool
|
||||
|
@ -290,7 +290,7 @@ parseResponse responseTo = do
|
|||
Stabilise -> parseStabiliseResponse
|
||||
Ping -> parsePingResponse
|
||||
|
||||
pure $ Response responseTo senderID part isFinalPart action payload
|
||||
pure $ Response requestID senderID part isFinalPart action payload
|
||||
|
||||
parseBool :: ParseASN1 Bool
|
||||
parseBool = do
|
||||
|
|
|
@ -161,13 +161,14 @@ sendMessageSize = 1200
|
|||
-- encode the response to a request that just signals successful receipt
|
||||
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
|
||||
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||
responseTo = requestID req
|
||||
requestID = requestID req
|
||||
, senderID = ownID
|
||||
, part = part req
|
||||
, isFinalPart = False
|
||||
, action = action req
|
||||
, payload = Nothing
|
||||
}
|
||||
ackRequest _ _ = Map.empty
|
||||
|
||||
|
||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||
|
@ -212,6 +213,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
|||
-- | execute a key ID lookup on local cache and respond with the result
|
||||
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||
respondQueryID nsSTM msgSet = do
|
||||
putStrLn "responding to a QueryID request"
|
||||
-- this message cannot be split reasonably, so just
|
||||
-- consider the first payload
|
||||
let
|
||||
|
@ -232,7 +234,7 @@ respondQueryID nsSTM msgSet = do
|
|||
queryResult = queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
|
||||
}
|
||||
queryResponseMsg = Response {
|
||||
responseTo = requestID aRequestPart
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
|
@ -266,7 +268,7 @@ respondLeave nsSTM msgSet = do
|
|||
. setSuccessors (delete senderID $ requestSuccs <> successors nsSnap) $ nsSnap
|
||||
-- TODO: handle handover of key data
|
||||
let leaveResponse = Response {
|
||||
responseTo = requestID aRequestPart
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
|
@ -287,7 +289,7 @@ respondStabilise nsSTM msgSet = do
|
|||
, stabilisePredecessors = predecessors nsSnap
|
||||
}
|
||||
stabiliseResponse = Response {
|
||||
responseTo = requestID aRequestPart
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
|
@ -307,7 +309,7 @@ respondPing nsSTM msgSet = do
|
|||
aRequestPart = Set.elemAt 0 msgSet
|
||||
responsePayload = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] }
|
||||
pingResponse = Response {
|
||||
responseTo = requestID aRequestPart
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
|
@ -343,7 +345,7 @@ respondJoin nsSTM msgSet = do
|
|||
, joinCache = toRemoteCache cache
|
||||
}
|
||||
joinResponse = Response {
|
||||
responseTo = requestID aRequestPart
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid joinedNS
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
|
@ -354,7 +356,7 @@ respondJoin nsSTM msgSet = do
|
|||
pure joinResponse
|
||||
-- otherwise respond with empty payload
|
||||
else pure Response {
|
||||
responseTo = requestID aRequestPart
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
|
@ -486,7 +488,10 @@ sendRequestTo :: Int -- ^ timeout in seconds
|
|||
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||
-- give the message a random request ID
|
||||
randomID <- randomRIO (0, 2^32-1)
|
||||
let requests = serialiseMessage sendMessageSize $ msgIncomplete randomID
|
||||
let
|
||||
msgComplete = msgIncomplete randomID
|
||||
requests = serialiseMessage sendMessageSize msgComplete
|
||||
putStrLn $ "sending request message " <> show msgComplete
|
||||
-- create a queue for passing received response messages back, even after a timeout
|
||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||
-- start sendAndAck with timeout
|
||||
|
|
|
@ -128,6 +128,7 @@ fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeSta
|
|||
fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
||||
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
||||
bracket (mkSendSocket joinHost joinPort) close (\sock -> do
|
||||
putStrLn "BootstrapJoin"
|
||||
-- 1. get routed to placement of own ID until FOUND:
|
||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||
ns <- readTVarIO nsSTM
|
||||
|
@ -145,6 +146,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
|||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||
)
|
||||
initCache bootstrapResponse
|
||||
putStrLn "initialised bootstrap cache"
|
||||
fediChordJoin bootstrapCache nsSTM
|
||||
)
|
||||
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
|
||||
|
|
|
@ -30,7 +30,7 @@ data FediChordMessage = Request
|
|||
, payload :: Maybe ActionPayload
|
||||
}
|
||||
| Response
|
||||
{ responseTo :: Integer
|
||||
{ requestID :: Integer
|
||||
, senderID :: NodeID
|
||||
, part :: Integer
|
||||
, isFinalPart :: Bool
|
||||
|
@ -40,8 +40,12 @@ data FediChordMessage = Request
|
|||
deriving (Show, Eq)
|
||||
|
||||
instance Ord FediChordMessage where
|
||||
compare a b | requestID a == requestID b = part a `compare` part b
|
||||
compare a@Request{} b@Request{} | requestID a == requestID b = part a `compare` part b
|
||||
| otherwise = requestID a `compare` requestID b
|
||||
compare a@Response{} b@Response{} | requestID a == requestID b = part a `compare` part b
|
||||
| otherwise = requestID a `compare` requestID b
|
||||
-- comparing different constructor types always yields "not equal"
|
||||
compare _ _ = LT
|
||||
|
||||
data ActionPayload = QueryIDRequestPayload
|
||||
{ queryTargetID :: NodeID
|
||||
|
|
|
@ -201,7 +201,7 @@ spec = do
|
|||
, payload = undefined
|
||||
}
|
||||
responseTemplate = Response {
|
||||
responseTo = 2342
|
||||
requestID = 2342
|
||||
, senderID = nid exampleNodeState
|
||||
, part = 1
|
||||
, isFinalPart = True
|
||||
|
|
Loading…
Reference in a new issue