prepare sending of queryID messages
This commit is contained in:
parent
ff1e530a26
commit
2b234d65db
|
@ -11,7 +11,7 @@ import Data.Maybe (fromMaybe, mapMaybe)
|
||||||
import Data.Time.Clock.POSIX()
|
import Data.Time.Clock.POSIX()
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
import qualified Data.Map.Strict as Map()
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Bifunctor (first)
|
import Data.Bifunctor (first)
|
||||||
import Control.Exception (displayException)
|
import Control.Exception (displayException)
|
||||||
import Safe
|
import Safe
|
||||||
|
@ -79,7 +79,7 @@ chunkLength numParts totalSize = ceiling $ (realToFrac totalSize :: Double) / re
|
||||||
-- can be split into multiple parts.
|
-- can be split into multiple parts.
|
||||||
serialiseMessage :: Int -- maximum message size in bytes
|
serialiseMessage :: Int -- maximum message size in bytes
|
||||||
-> FediChordMessage -- mesage to be serialised in preparation for sending
|
-> FediChordMessage -- mesage to be serialised in preparation for sending
|
||||||
-> [BS.ByteString] -- list of ASN.1 DER encoded messages together representing
|
-> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing
|
||||||
-- the contents of the input message
|
-- the contents of the input message
|
||||||
serialiseMessage maxBytesLength msg =
|
serialiseMessage maxBytesLength msg =
|
||||||
splitPayloadUntilSmallEnough 1
|
splitPayloadUntilSmallEnough 1
|
||||||
|
@ -91,11 +91,11 @@ serialiseMessage maxBytesLength msg =
|
||||||
-- splitting
|
-- splitting
|
||||||
| numParts == maximumParts = encodedMsgs numParts
|
| numParts == maximumParts = encodedMsgs numParts
|
||||||
| otherwise = splitPayloadUntilSmallEnough $ numParts + 1
|
| otherwise = splitPayloadUntilSmallEnough $ numParts + 1
|
||||||
messageParts :: Int -> [FediChordMessage]
|
messageParts :: Int -> Map.Map Integer FediChordMessage
|
||||||
messageParts i = foldr (modifyMessage i) [] $ payloadParts i
|
messageParts i = Map.fromAscList $ foldr (modifyMessage i) [] $ payloadParts i
|
||||||
-- insert payload parts into message and adjust parts metadata
|
-- insert payload parts into message and adjust parts metadata
|
||||||
modifyMessage :: Int -> (Integer, ActionPayload) -> [FediChordMessage] -> [FediChordMessage]
|
modifyMessage :: Int -> (Integer, ActionPayload) -> [(Integer, FediChordMessage)] -> [(Integer, FediChordMessage)]
|
||||||
modifyMessage i (partNum, pl) pls = (msg {
|
modifyMessage i (partNum, pl) pls = (partNum, msg {
|
||||||
part = partNum
|
part = partNum
|
||||||
, payload = pl
|
, payload = pl
|
||||||
, parts = fromIntegral i
|
, parts = fromIntegral i
|
||||||
|
@ -104,8 +104,8 @@ serialiseMessage maxBytesLength msg =
|
||||||
payloadParts :: Int -> [(Integer, ActionPayload)]
|
payloadParts :: Int -> [(Integer, ActionPayload)]
|
||||||
payloadParts i = zip [1..] (splitPayload i actionPayload)
|
payloadParts i = zip [1..] (splitPayload i actionPayload)
|
||||||
actionPayload = payload msg
|
actionPayload = payload msg
|
||||||
encodedMsgs i = map (encodeASN1' DER . encodeMessage) $ messageParts i
|
encodedMsgs i = Map.map (encodeASN1' DER . encodeMessage) $ messageParts i
|
||||||
maxMsgLength msgs = maximum $ map BS.length msgs
|
maxMsgLength = maximum . map BS.length . Map.elems
|
||||||
|
|
||||||
-- | Deserialise a ASN.1 DER encoded bytesstring of a single 'FediChordMessage'.
|
-- | Deserialise a ASN.1 DER encoded bytesstring of a single 'FediChordMessage'.
|
||||||
deserialiseMessage :: BS.ByteString
|
deserialiseMessage :: BS.ByteString
|
||||||
|
|
|
@ -2,9 +2,10 @@
|
||||||
|
|
||||||
module Hash2Pub.DHTProtocol
|
module Hash2Pub.DHTProtocol
|
||||||
( QueryResponse (..)
|
( QueryResponse (..)
|
||||||
, incomingQuery
|
, queryLocalCache
|
||||||
, addCacheEntry
|
, addCacheEntry
|
||||||
, deleteCacheEntry
|
, deleteCacheEntry
|
||||||
|
, markCacheEntryAsVerified
|
||||||
, RemoteCacheEntry(..)
|
, RemoteCacheEntry(..)
|
||||||
, toRemoteCacheEntry
|
, toRemoteCacheEntry
|
||||||
, remoteNode_
|
, remoteNode_
|
||||||
|
@ -51,8 +52,8 @@ data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes
|
||||||
|
|
||||||
-- TODO: evaluate more fine-grained argument passing to allow granular locking
|
-- TODO: evaluate more fine-grained argument passing to allow granular locking
|
||||||
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
|
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
|
||||||
incomingQuery :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse
|
queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse
|
||||||
incomingQuery ownState nCache lBestNodes targetID
|
queryLocalCache ownState nCache lBestNodes targetID
|
||||||
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
||||||
| (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState
|
| (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState
|
||||||
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
||||||
|
@ -207,3 +208,14 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
||||||
adjustFunc (NodeEntry _ ns ts) = NodeEntry True ns $ fromMaybe ts timestamp
|
adjustFunc (NodeEntry _ ns ts) = NodeEntry True ns $ fromMaybe ts timestamp
|
||||||
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
|
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
|
||||||
adjustFunc entry = entry
|
adjustFunc entry = entry
|
||||||
|
|
||||||
|
-- | retry an IO action at most *i* times until it delivers a result
|
||||||
|
attempts :: Int -- ^ number of retries *i*
|
||||||
|
-> IO (Maybe a) -- ^ action to retry
|
||||||
|
-> IO (Maybe a) -- ^ result after at most *i* retries
|
||||||
|
attempts 0 _ = return Nothing
|
||||||
|
attempts i action = do
|
||||||
|
actionResult <- action
|
||||||
|
case actionResult of
|
||||||
|
Nothing -> attempts (i-1) action
|
||||||
|
Just res -> return res
|
||||||
|
|
|
@ -21,6 +21,7 @@ module Hash2Pub.FediChord (
|
||||||
, putSuccessors
|
, putSuccessors
|
||||||
, getPredecessors
|
, getPredecessors
|
||||||
, putPredecessors
|
, putPredecessors
|
||||||
|
, getLNumBestNodes
|
||||||
, NodeCache
|
, NodeCache
|
||||||
, CacheEntry(..)
|
, CacheEntry(..)
|
||||||
, cacheGetNodeStateUnvalidated
|
, cacheGetNodeStateUnvalidated
|
||||||
|
@ -218,6 +219,10 @@ getPredecessors = getInternals_ predecessors
|
||||||
putPredecessors :: [NodeID] -> NodeState -> NodeState
|
putPredecessors :: [NodeID] -> NodeState -> NodeState
|
||||||
putPredecessors pred' = putInternals_ (\i -> i {predecessors = pred'})
|
putPredecessors pred' = putInternals_ (\i -> i {predecessors = pred'})
|
||||||
|
|
||||||
|
-- | convenience function for extracting the @lNumBestNodes@ from a 'NodeState'
|
||||||
|
getLNumBestNodes :: NodeState -> Maybe Int
|
||||||
|
getLNumBestNodes = getInternals_ lNumBestNodes
|
||||||
|
|
||||||
type NodeCache = Map.Map NodeID CacheEntry
|
type NodeCache = Map.Map NodeID CacheEntry
|
||||||
|
|
||||||
-- |an entry of the 'nodeCache' can hold 2 different kinds of data
|
-- |an entry of the 'nodeCache' can hold 2 different kinds of data
|
||||||
|
@ -506,3 +511,14 @@ mkServerSocket ip port = do
|
||||||
setSocketOption sock IPv6Only 1
|
setSocketOption sock IPv6Only 1
|
||||||
bind sock sockAddr
|
bind sock sockAddr
|
||||||
return sock
|
return sock
|
||||||
|
|
||||||
|
-- | create a UDP datagram socket, connected to a destination.
|
||||||
|
-- The socket gets an arbitrary free local port assigned.
|
||||||
|
mkSendSocket :: String -- ^ destination hostname or IP
|
||||||
|
-> PortNumber -- ^ destination port
|
||||||
|
-> IO Socket -- ^ a socket with an arbitrary source port
|
||||||
|
mkSendSocket dest destPort = do
|
||||||
|
destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
|
||||||
|
sendSock <- socket AF_INET6 Datagram defaultProtocol
|
||||||
|
setSocketOption sendSock IPv6Only 1
|
||||||
|
return sendSock
|
||||||
|
|
|
@ -131,26 +131,26 @@ spec = do
|
||||||
cacheWith2Entries = addCacheWrapper (remoteEntryFromNow node1) =<< addCacheWrapper (remoteEntryFromNow node2) emptyCache
|
cacheWith2Entries = addCacheWrapper (remoteEntryFromNow node1) =<< addCacheWrapper (remoteEntryFromNow node2) emptyCache
|
||||||
cacheWith4Entries = addCacheWrapper (remoteEntryFromNow node3) =<< addCacheWrapper (remoteEntryFromNow node4) =<< cacheWith2Entries
|
cacheWith4Entries = addCacheWrapper (remoteEntryFromNow node3) =<< addCacheWrapper (remoteEntryFromNow node4) =<< cacheWith2Entries
|
||||||
it "works on an empty cache" $ do
|
it "works on an empty cache" $ do
|
||||||
incomingQuery exampleLocalNode emptyCache 3 (toNodeID 2^(9::Integer)+5) `shouldBe` FORWARD Set.empty
|
queryLocalCache exampleLocalNode emptyCache 3 (toNodeID 2^(9::Integer)+5) `shouldBe` FORWARD Set.empty
|
||||||
incomingQuery exampleLocalNode emptyCache 1 (toNodeID 2342) `shouldBe` FORWARD Set.empty
|
queryLocalCache exampleLocalNode emptyCache 1 (toNodeID 2342) `shouldBe` FORWARD Set.empty
|
||||||
it "works on a cache with less entries than needed" $ do
|
it "works on a cache with less entries than needed" $ do
|
||||||
c2 <- cacheWith2Entries
|
c2 <- cacheWith2Entries
|
||||||
let (FORWARD nodeset) = incomingQuery exampleLocalNode c2 4 (toNodeID 2^(9::Integer)+5)
|
let (FORWARD nodeset) = queryLocalCache exampleLocalNode c2 4 (toNodeID 2^(9::Integer)+5)
|
||||||
Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
|
Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
|
||||||
it "works on a cache with sufficient entries" $ do
|
it "works on a cache with sufficient entries" $ do
|
||||||
c4 <- cacheWith4Entries
|
c4 <- cacheWith4Entries
|
||||||
let
|
let
|
||||||
(FORWARD nodeset1) = incomingQuery exampleLocalNode c4 3 (toNodeID 2^(9::Integer)+5)
|
(FORWARD nodeset1) = queryLocalCache exampleLocalNode c4 3 (toNodeID 2^(9::Integer)+5)
|
||||||
(FORWARD nodeset2) = incomingQuery exampleLocalNode c4 1 (toNodeID 2^(9::Integer)+5)
|
(FORWARD nodeset2) = queryLocalCache exampleLocalNode c4 1 (toNodeID 2^(9::Integer)+5)
|
||||||
Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
|
Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
|
||||||
Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
|
Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
|
||||||
it "recognises the node's own responsibility" $ do
|
it "recognises the node's own responsibility" $ do
|
||||||
nC <- cacheWith4Entries
|
nC <- cacheWith4Entries
|
||||||
incomingQuery node1 nC 3 (toNodeID 2^(22::Integer)) `shouldBe` FOUND node1
|
queryLocalCache node1 nC 3 (toNodeID 2^(22::Integer)) `shouldBe` FOUND node1
|
||||||
incomingQuery node1 nC 3 nid1 `shouldBe` FOUND node1
|
queryLocalCache node1 nC 3 nid1 `shouldBe` FOUND node1
|
||||||
it "does not fail on nodes without neighbours (initial state)" $ do
|
it "does not fail on nodes without neighbours (initial state)" $ do
|
||||||
nC <- cacheWith4Entries
|
nC <- cacheWith4Entries
|
||||||
let (FORWARD nodeset) = incomingQuery exampleLocalNode nC 3 (toNodeID 11)
|
let (FORWARD nodeset) = queryLocalCache exampleLocalNode nC 3 (toNodeID 11)
|
||||||
Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
|
Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
|
||||||
|
|
||||||
describe "Messages can be encoded to and decoded from ASN.1" $ do
|
describe "Messages can be encoded to and decoded from ASN.1" $ do
|
||||||
|
|
Loading…
Reference in a new issue