diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index e3aa4c1..bf4d856 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -55,7 +55,7 @@ library import: deps -- Modules exported by the library. - exposed-modules: Hash2Pub.FediChord, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding + exposed-modules: Hash2Pub.FediChord, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes -- Modules included in this library but not exported. other-modules: Hash2Pub.Utils diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index faa653f..d11073d 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -18,7 +18,7 @@ import Safe import Hash2Pub.FediChord import Hash2Pub.Utils -import Hash2Pub.DHTProtocol +import Hash2Pub.ProtocolTypes import Debug.Trace @@ -77,6 +77,9 @@ chunkLength numParts totalSize = ceiling $ (realToFrac totalSize :: Double) / re -- The number of parts per message is limited to 150 for DOS protection reasons. -- The returned byte strings might exceed the desired maximum length, as only the payload (and not all of them) -- can be split into multiple parts. +-- +-- The return type is a Map from part number to encoded part, to be able to acknowledge +-- an encoded part without having to decode its number. serialiseMessage :: Int -- maximum message size in bytes -> FediChordMessage -- mesage to be serialised in preparation for sending -> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 4688d39..1f3fabc 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -20,11 +20,15 @@ module Hash2Pub.DHTProtocol import Data.Maybe (maybe, fromMaybe) import qualified Data.Set as Set import qualified Data.Map as Map +import qualified Data.ByteString as BS import Data.Time.Clock.POSIX import Network.Socket hiding (send, sendTo, recv, recvFrom) import Network.Socket.ByteString import System.Timeout import Control.Monad.State.Strict +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue +import Control.Concurrent.STM.TBQueue import Hash2Pub.FediChord ( NodeID @@ -42,17 +46,13 @@ import Hash2Pub.FediChord , localCompare ) +import Hash2Pub.ASN1Coding +import Hash2Pub.ProtocolTypes + import Debug.Trace (trace) -- === queries === -data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache. - -- whole cache entry is returned for making - -- the entry time stamp available to the - -- protocol serialiser - | FOUND NodeState -- ^node is the responsible node for queried ID - deriving (Show, Eq) - -- TODO: evaluate more fine-grained argument passing to allow granular locking -- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse @@ -82,87 +82,6 @@ queryLocalCache ownState nCache lBestNodes targetID Nothing -> Set.empty Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns) --- === protocol serialisation data types - -data Action = - QueryID - | Join - | Leave - | Stabilise - | Ping - deriving (Show, Eq, Enum) - -data FediChordMessage = - Request { - requestID :: Integer - , sender :: NodeState - , parts :: Integer - , part :: Integer - -- ^ part starts at 0 - , action :: Action - , payload :: Maybe ActionPayload - } - | Response { - responseTo :: Integer - , senderID :: NodeID - , parts :: Integer - , part :: Integer - , action :: Action - , payload :: Maybe ActionPayload - } deriving (Show, Eq) - -data ActionPayload = - QueryIDRequestPayload { - queryTargetID :: NodeID - , queryLBestNodes :: Integer - } - | JoinRequestPayload - | LeaveRequestPayload { - leaveSuccessors :: [NodeID] - , leavePredecessors :: [NodeID] - } - | StabiliseRequestPayload - | PingRequestPayload - | QueryIDResponsePayload { - queryResult :: QueryResponse - } - | JoinResponsePayload { - joinSuccessors :: [NodeID] - , joinPredecessors :: [NodeID] - , joinCache :: [RemoteCacheEntry] - } - | LeaveResponsePayload - | StabiliseResponsePayload { - stabiliseSuccessors :: [NodeID] - , stabilisePredecessors :: [NodeID] - } - | PingResponsePayload { - pingNodeStates :: [NodeState] - } - deriving (Show, Eq) - --- | global limit of parts per message used when (de)serialising messages. --- Used to limit the impact of DOS attempts with partial messages. -maximumParts :: Num a => a -maximumParts = 150 - --- | dedicated data type for cache entries sent to or received from the network, --- as these have to be considered as unvalidated. Also helps with separation of trust. -data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime - deriving (Show, Eq) - -instance Ord RemoteCacheEntry where - (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 - -toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry -toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts -toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry -toRemoteCacheEntry _ = Nothing - --- helper function for use in tests -remoteNode_ :: RemoteCacheEntry -> NodeState -remoteNode_ (RemoteCacheEntry ns _) = ns - -- cache operations -- | update or insert a 'RemoteCacheEntry' into the cache, diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs new file mode 100644 index 0000000..4271174 --- /dev/null +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -0,0 +1,94 @@ +module Hash2Pub.ProtocolTypes where + +import qualified Data.Set as Set +import Data.Time.Clock.POSIX (POSIXTime) + +import Hash2Pub.FediChord + +data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache. + -- whole cache entry is returned for making + -- the entry time stamp available to the + -- protocol serialiser + | FOUND NodeState -- ^node is the responsible node for queried ID + deriving (Show, Eq) + +-- === protocol serialisation data types + +data Action = + QueryID + | Join + | Leave + | Stabilise + | Ping + deriving (Show, Eq, Enum) + +data FediChordMessage = + Request { + requestID :: Integer + , sender :: NodeState + , parts :: Integer + , part :: Integer + -- ^ part starts at 0 + , action :: Action + , payload :: Maybe ActionPayload + } + | Response { + responseTo :: Integer + , senderID :: NodeID + , parts :: Integer + , part :: Integer + , action :: Action + , payload :: Maybe ActionPayload + } deriving (Show, Eq) + +data ActionPayload = + QueryIDRequestPayload { + queryTargetID :: NodeID + , queryLBestNodes :: Integer + } + | JoinRequestPayload + | LeaveRequestPayload { + leaveSuccessors :: [NodeID] + , leavePredecessors :: [NodeID] + } + | StabiliseRequestPayload + | PingRequestPayload + | QueryIDResponsePayload { + queryResult :: QueryResponse + } + | JoinResponsePayload { + joinSuccessors :: [NodeID] + , joinPredecessors :: [NodeID] + , joinCache :: [RemoteCacheEntry] + } + | LeaveResponsePayload + | StabiliseResponsePayload { + stabiliseSuccessors :: [NodeID] + , stabilisePredecessors :: [NodeID] + } + | PingResponsePayload { + pingNodeStates :: [NodeState] + } + deriving (Show, Eq) + +-- | global limit of parts per message used when (de)serialising messages. +-- Used to limit the impact of DOS attempts with partial messages. +maximumParts :: Num a => a +maximumParts = 150 + +-- | dedicated data type for cache entries sent to or received from the network, +-- as these have to be considered as unvalidated. Also helps with separation of trust. +data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime + deriving (Show, Eq) + +instance Ord RemoteCacheEntry where + (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 + +toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry +toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts +toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry +toRemoteCacheEntry _ = Nothing + +-- helper function for use in tests +remoteNode_ :: RemoteCacheEntry -> NodeState +remoteNode_ (RemoteCacheEntry ns _) = ns