Compare commits
4 commits
6699237243
...
4e359775ec
Author | SHA1 | Date | |
---|---|---|---|
|
4e359775ec | ||
|
2c827ea326 | ||
|
3892dc91aa | ||
|
7c87a578d3 |
|
@ -27,7 +27,8 @@ Request ::= SEQUENCE {
|
||||||
-- request and response instead of explicit flag
|
-- request and response instead of explicit flag
|
||||||
|
|
||||||
Response ::= SEQUENCE {
|
Response ::= SEQUENCE {
|
||||||
responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
-- requestID of the request responding to
|
||||||
|
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
||||||
senderID NodeID,
|
senderID NodeID,
|
||||||
part Partnum,
|
part Partnum,
|
||||||
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
|
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
|
||||||
|
|
|
@ -86,7 +86,7 @@ executable Hash2Pub
|
||||||
other-extensions: GeneralizedNewtypeDeriving
|
other-extensions: GeneralizedNewtypeDeriving
|
||||||
|
|
||||||
-- Directories containing source files.
|
-- Directories containing source files.
|
||||||
hs-source-dirs: src/Hash2Pub
|
hs-source-dirs: app
|
||||||
|
|
||||||
-- Base language which the package is written in.
|
-- Base language which the package is written in.
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
|
|
|
@ -226,9 +226,9 @@ encodeMessage
|
||||||
<> maybe [] encodePayload requestPayload
|
<> maybe [] encodePayload requestPayload
|
||||||
<> [End Sequence]
|
<> [End Sequence]
|
||||||
encodeMessage
|
encodeMessage
|
||||||
(Response responseTo senderID part isFinalPart action responsePayload) = [
|
(Response requestID senderID part isFinalPart action responsePayload) = [
|
||||||
Start Sequence
|
Start Sequence
|
||||||
, IntVal responseTo
|
, IntVal requestID
|
||||||
, IntVal . getNodeID $ senderID
|
, IntVal . getNodeID $ senderID
|
||||||
, IntVal part
|
, IntVal part
|
||||||
, Boolean isFinalPart
|
, Boolean isFinalPart
|
||||||
|
@ -277,7 +277,7 @@ parseRequest action = do
|
||||||
pure $ Request requestID sender part isFinalPart action payload
|
pure $ Request requestID sender part isFinalPart action payload
|
||||||
|
|
||||||
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
||||||
parseResponse responseTo = do
|
parseResponse requestID = do
|
||||||
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
|
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
|
||||||
part <- parseInteger
|
part <- parseInteger
|
||||||
isFinalPart <- parseBool
|
isFinalPart <- parseBool
|
||||||
|
@ -290,7 +290,7 @@ parseResponse responseTo = do
|
||||||
Stabilise -> parseStabiliseResponse
|
Stabilise -> parseStabiliseResponse
|
||||||
Ping -> parsePingResponse
|
Ping -> parsePingResponse
|
||||||
|
|
||||||
pure $ Response responseTo senderID part isFinalPart action payload
|
pure $ Response requestID senderID part isFinalPart action payload
|
||||||
|
|
||||||
parseBool :: ParseASN1 Bool
|
parseBool :: ParseASN1 Bool
|
||||||
parseBool = do
|
parseBool = do
|
||||||
|
|
|
@ -161,13 +161,14 @@ sendMessageSize = 1200
|
||||||
-- encode the response to a request that just signals successful receipt
|
-- encode the response to a request that just signals successful receipt
|
||||||
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
|
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
|
||||||
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||||
responseTo = requestID req
|
requestID = requestID req
|
||||||
, senderID = ownID
|
, senderID = ownID
|
||||||
, part = part req
|
, part = part req
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
, action = action req
|
, action = action req
|
||||||
, payload = Nothing
|
, payload = Nothing
|
||||||
}
|
}
|
||||||
|
ackRequest _ _ = Map.empty
|
||||||
|
|
||||||
|
|
||||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||||
|
@ -212,6 +213,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||||
-- | execute a key ID lookup on local cache and respond with the result
|
-- | execute a key ID lookup on local cache and respond with the result
|
||||||
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||||
respondQueryID nsSTM msgSet = do
|
respondQueryID nsSTM msgSet = do
|
||||||
|
putStrLn "responding to a QueryID request"
|
||||||
-- this message cannot be split reasonably, so just
|
-- this message cannot be split reasonably, so just
|
||||||
-- consider the first payload
|
-- consider the first payload
|
||||||
let
|
let
|
||||||
|
@ -232,7 +234,7 @@ respondQueryID nsSTM msgSet = do
|
||||||
queryResult = queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
|
queryResult = queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
|
||||||
}
|
}
|
||||||
queryResponseMsg = Response {
|
queryResponseMsg = Response {
|
||||||
responseTo = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
, senderID = getNid nsSnap
|
, senderID = getNid nsSnap
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
|
@ -266,7 +268,7 @@ respondLeave nsSTM msgSet = do
|
||||||
. setSuccessors (delete senderID $ requestSuccs <> successors nsSnap) $ nsSnap
|
. setSuccessors (delete senderID $ requestSuccs <> successors nsSnap) $ nsSnap
|
||||||
-- TODO: handle handover of key data
|
-- TODO: handle handover of key data
|
||||||
let leaveResponse = Response {
|
let leaveResponse = Response {
|
||||||
responseTo = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
, senderID = getNid nsSnap
|
, senderID = getNid nsSnap
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
|
@ -287,7 +289,7 @@ respondStabilise nsSTM msgSet = do
|
||||||
, stabilisePredecessors = predecessors nsSnap
|
, stabilisePredecessors = predecessors nsSnap
|
||||||
}
|
}
|
||||||
stabiliseResponse = Response {
|
stabiliseResponse = Response {
|
||||||
responseTo = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
, senderID = getNid nsSnap
|
, senderID = getNid nsSnap
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
|
@ -307,7 +309,7 @@ respondPing nsSTM msgSet = do
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
aRequestPart = Set.elemAt 0 msgSet
|
||||||
responsePayload = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] }
|
responsePayload = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] }
|
||||||
pingResponse = Response {
|
pingResponse = Response {
|
||||||
responseTo = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
, senderID = getNid nsSnap
|
, senderID = getNid nsSnap
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
|
@ -343,7 +345,7 @@ respondJoin nsSTM msgSet = do
|
||||||
, joinCache = toRemoteCache cache
|
, joinCache = toRemoteCache cache
|
||||||
}
|
}
|
||||||
joinResponse = Response {
|
joinResponse = Response {
|
||||||
responseTo = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
, senderID = getNid joinedNS
|
, senderID = getNid joinedNS
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
|
@ -354,7 +356,7 @@ respondJoin nsSTM msgSet = do
|
||||||
pure joinResponse
|
pure joinResponse
|
||||||
-- otherwise respond with empty payload
|
-- otherwise respond with empty payload
|
||||||
else pure Response {
|
else pure Response {
|
||||||
responseTo = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
, senderID = getNid nsSnap
|
, senderID = getNid nsSnap
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
|
@ -486,7 +488,10 @@ sendRequestTo :: Int -- ^ timeout in seconds
|
||||||
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
-- give the message a random request ID
|
-- give the message a random request ID
|
||||||
randomID <- randomRIO (0, 2^32-1)
|
randomID <- randomRIO (0, 2^32-1)
|
||||||
let requests = serialiseMessage sendMessageSize $ msgIncomplete randomID
|
let
|
||||||
|
msgComplete = msgIncomplete randomID
|
||||||
|
requests = serialiseMessage sendMessageSize msgComplete
|
||||||
|
putStrLn $ "sending request message " <> show msgComplete
|
||||||
-- create a queue for passing received response messages back, even after a timeout
|
-- create a queue for passing received response messages back, even after a timeout
|
||||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||||
-- start sendAndAck with timeout
|
-- start sendAndAck with timeout
|
||||||
|
|
|
@ -128,6 +128,7 @@ fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeSta
|
||||||
fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
||||||
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
||||||
bracket (mkSendSocket joinHost joinPort) close (\sock -> do
|
bracket (mkSendSocket joinHost joinPort) close (\sock -> do
|
||||||
|
putStrLn "BootstrapJoin"
|
||||||
-- 1. get routed to placement of own ID until FOUND:
|
-- 1. get routed to placement of own ID until FOUND:
|
||||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
|
@ -145,6 +146,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
||||||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||||
)
|
)
|
||||||
initCache bootstrapResponse
|
initCache bootstrapResponse
|
||||||
|
putStrLn "initialised bootstrap cache"
|
||||||
fediChordJoin bootstrapCache nsSTM
|
fediChordJoin bootstrapCache nsSTM
|
||||||
)
|
)
|
||||||
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
|
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
|
||||||
|
|
|
@ -30,7 +30,7 @@ data FediChordMessage = Request
|
||||||
, payload :: Maybe ActionPayload
|
, payload :: Maybe ActionPayload
|
||||||
}
|
}
|
||||||
| Response
|
| Response
|
||||||
{ responseTo :: Integer
|
{ requestID :: Integer
|
||||||
, senderID :: NodeID
|
, senderID :: NodeID
|
||||||
, part :: Integer
|
, part :: Integer
|
||||||
, isFinalPart :: Bool
|
, isFinalPart :: Bool
|
||||||
|
@ -40,8 +40,12 @@ data FediChordMessage = Request
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
instance Ord FediChordMessage where
|
instance Ord FediChordMessage where
|
||||||
compare a b | requestID a == requestID b = part a `compare` part b
|
compare a@Request{} b@Request{} | requestID a == requestID b = part a `compare` part b
|
||||||
| otherwise = requestID a `compare` requestID b
|
| otherwise = requestID a `compare` requestID b
|
||||||
|
compare a@Response{} b@Response{} | requestID a == requestID b = part a `compare` part b
|
||||||
|
| otherwise = requestID a `compare` requestID b
|
||||||
|
-- comparing different constructor types always yields "not equal"
|
||||||
|
compare _ _ = LT
|
||||||
|
|
||||||
data ActionPayload = QueryIDRequestPayload
|
data ActionPayload = QueryIDRequestPayload
|
||||||
{ queryTargetID :: NodeID
|
{ queryTargetID :: NodeID
|
||||||
|
|
|
@ -201,7 +201,7 @@ spec = do
|
||||||
, payload = undefined
|
, payload = undefined
|
||||||
}
|
}
|
||||||
responseTemplate = Response {
|
responseTemplate = Response {
|
||||||
responseTo = 2342
|
requestID = 2342
|
||||||
, senderID = nid exampleNodeState
|
, senderID = nid exampleNodeState
|
||||||
, part = 1
|
, part = 1
|
||||||
, isFinalPart = True
|
, isFinalPart = True
|
||||||
|
|
Loading…
Reference in a new issue