Compare commits

..

No commits in common. "fea9660f80084c4c6ec31ea8500d0592ab272507" and "61818c58a9429422538e3ae7786cc28595818017" have entirely different histories.

4 changed files with 11 additions and 51 deletions

View file

@ -4,5 +4,5 @@
- error: { lhs: return, rhs: pure } - error: { lhs: return, rhs: pure }
- ignore: {name: ["Avoid lambda using `infix`", "Use lambda-case"]} - ignore: {name: "Avoid lambda using `infix`"}

View file

@ -4,7 +4,6 @@ module Hash2Pub.DHTProtocol
, addCacheEntry , addCacheEntry
, addCacheEntryPure , addCacheEntryPure
, deleteCacheEntry , deleteCacheEntry
, deserialiseMessage
, markCacheEntryAsVerified , markCacheEntryAsVerified
, RemoteCacheEntry(..) , RemoteCacheEntry(..)
, toRemoteCacheEntry , toRemoteCacheEntry
@ -20,7 +19,6 @@ module Hash2Pub.DHTProtocol
, resolve , resolve
, mkSendSocket , mkSendSocket
, mkServerSocket , mkServerSocket
, handleIncomingRequest
) )
where where
@ -33,7 +31,6 @@ import Control.Monad (foldM, forM, forM_)
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights) import Data.Either (rights)
import Data.Foldable (foldl', foldr') import Data.Foldable (foldl', foldr')
import Data.Functor.Identity
import Data.IORef import Data.IORef
import Data.IP (IPv6, fromHostAddress6, import Data.IP (IPv6, fromHostAddress6,
toHostAddress6) toHostAddress6)
@ -144,33 +141,8 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
adjustFunc entry = entry adjustFunc entry = entry
-- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT
isJoined_ :: LocalNodeState -> Bool
isJoined_ ns = not . all null $ [successors ns, predecessors ns]
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
handleIncomingRequest :: LocalNodeState -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> FediChordMessage -- ^ request to handle
-> SockAddr -- ^ source address of the request
-> IO ()
handleIncomingRequest ns sendQ msg sourceAddr = do
-- add nodestate to cache
now <- getPOSIXTime
queueAddEntries (Identity . RemoteCacheEntry (sender msg) $ now) ns
-- distinguish on whether and how to respond
-- create and enqueue ACK
-- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on
-- PLACEHOLDER
pure ()
-- ....... response sending .......
-- ....... request sending .......
-- | send a join request and return the joined 'LocalNodeState' including neighbours -- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
-> LocalNodeState -- ^ joining NodeState -> LocalNodeState -- ^ joining NodeState

View file

@ -53,12 +53,10 @@ import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust, mapMaybe) import Data.Maybe (fromMaybe, isJust, mapMaybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send, import Network.Socket hiding (recv, recvFrom, send, sendTo)
sendTo)
import Network.Socket.ByteString import Network.Socket.ByteString
-- for hashing and ID conversion -- for hashing and ID conversion
import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
@ -215,22 +213,12 @@ fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> IO () -> IO ()
fediMessageHandler sendQ recvQ ns = forever $ do fediMessageHandler sendQ recvQ ns = forever $ do
-- wait for incoming messages -- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ -- newMsg <- deserialiseMessage <$> recvFrom sock
let aMsg = deserialiseMessage rawMsg -- either (\_ ->
-- handling multipart messages: -- -- ignore invalid messages
-- So far I handle the effects of each message part immedeiately, before making sure that and whether all parts have been received, based on the idea that even incomplete information is beneficial and handled idempotent. -- pure ()
-- If this turns out not to be the case, request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. -- )
either (\_ -> -- (\aMsg ->
-- drop invalid messages -- case aMsg of
pure () -- aRequest@Request{} -> handleRequest
)
(\validMsg ->
case validMsg of
aRequest@Request{} -> forkIO (handleIncomingRequest ns sendQ aRequest sourceAddr) >> pure ()
-- Responses should never arrive on the main server port, as they are always
-- responses to requests sent from dedicated sockets on another port
_ -> pure ()
)
aMsg
pure () pure ()

View file

@ -4,7 +4,7 @@ import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Exception import Control.Exception
import Data.Either import Data.Either
import Data.IP (IPv6, toHostAddress6) import Data.IP (IPv6, toHostAddress6)
import System.Environment import System.Environment
import Hash2Pub.FediChord import Hash2Pub.FediChord