Compare commits

...

3 commits

21 changed files with 66 additions and 1 deletions

View file

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
ghc-options: -Wall ghc-options: -Wall

17
Readme.md Normal file
View file

@ -0,0 +1,17 @@
# Hash2Pub
***This is heavily WIP and does not provide any useful functionality yet***.
I aim for always having the master branch at a state where it builds and tests pass.
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
This is the practical implementation of the concept presented in the paper [Decentralised Hashtag Search and Subscription for Federated Social Networks](https://git.orlives.de/schmittlauch/paper_hashtag_federation). A 30 minutes [introduction talk](https://conf.tube/videos/watch/340eb706-28c0-4a43-9364-700297ca96cb) is available as well.
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
## Building
The project and its developent environment are built with [Nix](https://nixos.org/nix/).
The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.

View file

@ -23,6 +23,8 @@ import qualified Data.Map as Map
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Network.Socket hiding (send, sendTo, recv, recvFrom) import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString import Network.Socket.ByteString
import System.Timeout
import Control.Monad.State.Strict
import Hash2Pub.FediChord import Hash2Pub.FediChord
( NodeID ( NodeID
@ -210,6 +212,52 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
adjustFunc entry = entry adjustFunc entry = entry
-- ====== message send and receive operations ======
requestQueryID :: NodeState -> NodeID -> IO NodeState
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do
cacheSnapshot <- readIORef $ getNodeCacheRef ns
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> return thisNode
FORWARD nodeSet ->
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
responses = mapM
sendRequestTo :: Int -- ^ timeout in seconds
-> Int -- ^ number of retries
-> FediChordMessage -- ^ the message to be sent
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeout attempts msg sock = do
let requests = serialiseMessage 1200 msg
-- ToDo: make attempts and timeout configurable
attempts 3 . timeout 5000 $ do
where
-- state reingeben: state = noch nicht geackte messages, result = responses
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
sendAndAck sock = do
remainingSends <- get
sendMany sock $ Map.elems remainingSends
-- timeout pro receive socket, danach catMaybes
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen
replicateM
-- idea: send all parts at once
-- Set/ Map with unacked parts
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
-- how to manage individual retries? nested "attempts"
-- | retry an IO action at most *i* times until it delivers a result -- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i* attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry -> IO (Maybe a) -- ^ action to retry