Skip to content

Commit

Permalink
Encode hashes as bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPenner committed Jan 23, 2025
1 parent b4adcb6 commit fea3688
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 66 deletions.
103 changes: 60 additions & 43 deletions unison-cli/src/Unison/Share/SyncV2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.List qualified as C
import Data.Conduit.Zlib qualified as C
import Data.Graph qualified as Graph
import Data.Map qualified as Map
import Data.Set qualified as Set
import Data.Text.IO qualified as Text
import Servant.Conduit ()
import System.Console.Regions qualified as Console.Regions
Expand All @@ -37,17 +35,17 @@ import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash qualified as Hash
import Unison.Hash32 (Hash32)
import Unison.Hash32 qualified as Hash32
import Unison.Prelude
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
import Unison.Share.Sync.Types
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.Common (causalHashToHash32, hash32ToCausalHash, tempEntityToEntity)
import Unison.Sync.Common (causalHashToHash32, entityToTempEntity, hash32ToCausalHash, tempEntityToEntity)
import Unison.Sync.Common qualified as Sync
import Unison.Sync.EntityValidation qualified as EV
import Unison.Sync.Types qualified as Share
import Unison.Sync.Types (entityHashes_)
import Unison.Sync.Types qualified as Sync
import Unison.SyncV2.Types (CBORBytes)
import Unison.SyncV2.Types (BytesEntity, CBORBytes)
import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
Expand All @@ -65,7 +63,7 @@ batchSize = 5000
------------------------------------------------------------------------------------------------------------------------
-- Download entities

validateAndSave :: Bool -> (Codebase.Codebase IO v a) -> [(Hash32, TempEntity)] -> StreamM ()
validateAndSave :: Bool -> (Codebase.Codebase IO v a) -> [(Hash32, BytesEntity)] -> StreamM ()
validateAndSave shouldValidate codebase entities = do
let validateEntities =
runExceptT $ when shouldValidate (batchValidateEntities entities)
Expand All @@ -74,7 +72,7 @@ validateAndSave shouldValidate codebase entities = do
ExceptT . liftIO $ IO.withAsync validateEntities \validationTask -> do
Timing.time "Inserting entities" $ Codebase.runTransactionExceptT codebase do
for_ entities \(hash, entity) -> do
void . lift $ Q.saveTempEntityInMain v2HashHandle hash entity
void . lift $ Q.saveTempEntityInMain v2HashHandle hash (bytesToTemp entity)
lift (Sqlite.unsafeIO (IO.wait validationTask)) >>= \case
Left err -> throwError err
Right _ -> pure ()
Expand All @@ -88,8 +86,8 @@ syncUnsortedStream ::
syncUnsortedStream shouldValidate codebase stream = do
Debug.debugLogM Debug.Temp $ "Syncing unsorted stream"
allResults <- C.runConduit $ stream C..| C.sinkList
allEntities <- ExceptT $ Timing.time "Unpacking chunks" $ liftIO $ Codebase.runTransactionExceptT codebase $ do unpackChunks allResults
let sortedEntities = sortDependencyFirst allEntities
sortedEntities <- ExceptT $ Timing.time "Unpacking chunks" $ liftIO $ Codebase.runTransactionExceptT codebase $ do unpackChunks allResults
-- let sortedEntities = sortDependencyFirst allEntities
validateAndSave shouldValidate codebase sortedEntities

-- | Syncs a stream which sends entities which are already sorted in dependency order.
Expand All @@ -106,44 +104,46 @@ syncSortedStream shouldValidate codebase stream = do
validateAndSave shouldValidate codebase (catMaybes entityBatch)
C.runConduit $ stream C..| C.chunksOf batchSize C..| handler

unpackChunk :: SyncV2.EntityChunk -> ExceptT SyncErr Sqlite.Transaction (Maybe (Hash32, TempEntity))
unpackChunk :: SyncV2.EntityChunk -> ExceptT SyncErr Sqlite.Transaction (Maybe (Hash32, BytesEntity))
unpackChunk = \case
SyncV2.EntityChunk {hash, entityCBOR = entityBytes} -> do
-- Only want entities we don't already have
lift (Q.entityLocation hash) >>= \case
lift (Q.entityLocation $ hash32FromBS hash) >>= \case
Just Q.EntityInMainStorage -> pure Nothing
_ -> do
(Just . (hash,)) <$> unpackEntity entityBytes
(Just . (hash32FromBS hash,)) <$> unpackEntity entityBytes
where
unpackEntity :: (CBORBytes TempEntity) -> ExceptT SyncErr Sqlite.Transaction TempEntity
unpackEntity :: (CBORBytes BytesEntity) -> ExceptT SyncErr Sqlite.Transaction BytesEntity
unpackEntity entityBytes = do
case CBOR.deserialiseOrFailCBORBytes entityBytes of
Left err -> do throwError $ (SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure err)
Right entity -> pure entity

unpackChunks :: [SyncV2.EntityChunk] -> ExceptT SyncErr Sqlite.Transaction [(Hash32, TempEntity)]
unpackChunks :: [SyncV2.EntityChunk] -> ExceptT SyncErr Sqlite.Transaction [(Hash32, BytesEntity)]
unpackChunks xs = do
for xs unpackChunk
<&> catMaybes

batchValidateEntities :: [(Hash32, TempEntity)] -> ExceptT SyncErr IO ()
batchValidateEntities entities = do
mismatches <- fmap catMaybes $ liftIO $ IO.pooledForConcurrently entities \(hash, entity) -> do
IO.evaluate $ EV.validateTempEntity hash entity
for_ mismatches \case
err@(Share.EntityHashMismatch et (Share.HashMismatchForEntity {supplied, computed})) ->
let expectedMismatches = case et of
Share.TermComponentType -> expectedComponentHashMismatches
Share.DeclComponentType -> expectedComponentHashMismatches
Share.CausalType -> expectedCausalHashMismatches
_ -> mempty
in case Map.lookup supplied expectedMismatches of
Just expected
| expected == computed -> pure ()
_ -> do
throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err
err -> do
throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err
batchValidateEntities :: [(Hash32, BytesEntity)] -> ExceptT SyncErr IO ()
batchValidateEntities _entities = undefined

-- -- mismatches <- fmap catMaybes $ liftIO $ IO.pooledForConcurrently entities \(hash, entity) -> do
-- pure Nothing
-- -- IO.evaluate $ EV.validateTempEntity hash entity
-- for_ mismatches \case
-- err@(Share.EntityHashMismatch et (Share.HashMismatchForEntity {supplied, computed})) ->
-- let expectedMismatches = case et of
-- Share.TermComponentType -> expectedComponentHashMismatches
-- Share.DeclComponentType -> expectedComponentHashMismatches
-- Share.CausalType -> expectedCausalHashMismatches
-- _ -> mempty
-- in case Map.lookup supplied expectedMismatches of
-- Just expected
-- | expected == computed -> pure ()
-- _ -> do
-- throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err
-- err -> do
-- throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err

streamIntoCodebase :: Bool -> Codebase.Codebase IO v a -> SyncV2.StreamInitInfo -> Stream () SyncV2.EntityChunk -> StreamM ()
streamIntoCodebase shouldValidate codebase SyncV2.StreamInitInfo {version, entitySorting, numEntities = numEntities} stream = ExceptT do
Expand Down Expand Up @@ -172,11 +172,12 @@ afterSyncChecks codebase hash = do
isJust <$> (Codebase.runTransaction codebase $ Q.loadCausalByCausalHash expectedHash)

-- | Topologically sort entities based on their dependencies.
sortDependencyFirst :: [(Hash32, TempEntity)] -> [(Hash32, TempEntity)]
sortDependencyFirst entities = do
let adjList = entities <&> \(hash32, entity) -> ((hash32, entity), hash32, Set.toList $ Share.entityDependencies (tempEntityToEntity entity))
(graph, vertexInfo, _vertexForKey) = Graph.graphFromEdges adjList
in Graph.reverseTopSort graph <&> \v -> (view _1 $ vertexInfo v)
_sortDependencyFirst :: [(Hash32, BytesEntity)] -> [(Hash32, BytesEntity)]
_sortDependencyFirst entities = entities

-- let adjList = entities <&> \(hash32, entity) -> ((hash32, entity), hash32, Set.toList $ Share.entityDependencies (tempEntityToEntity entity))
-- (graph, vertexInfo, _vertexForKey) = Graph.graphFromEdges adjList
-- in Graph.reverseTopSort graph <&> \v -> (view _1 $ vertexInfo v)

syncFromFile ::
Bool ->
Expand Down Expand Up @@ -236,10 +237,10 @@ withEntityStream conn rootHash mayBranchRef callback = do
entities
& fmap (Sync.entityToTempEntity id)
& Map.toList
& sortDependencyFirst
-- & sortDependencyFirst
& ( fmap \(hash, entity) ->
let entityCBOR = (CBOR.serialiseCBORBytes entity)
in SyncV2.EntityC (SyncV2.EntityChunk {hash, entityCBOR})
let entityCBOR = (CBOR.serialiseCBORBytes $ tempToBytes entity)
in SyncV2.EntityC (SyncV2.EntityChunk {hash = hash32ToBS hash, entityCBOR})
)
& (initialChunk :)
let stream = C.yieldMany contents
Expand All @@ -256,7 +257,7 @@ syncToFile codebase rootHash mayBranchRef destFilePath = do
C.runResourceT $
withEntityStream conn rootHash mayBranchRef \mayTotal stream -> do
withStreamProgressCallback (Just mayTotal) \countC -> runExceptT do
C.runConduit $ stream C..| countC C..| C.map (BL.toStrict . CBOR.serialise) C..| C.transPipe liftIO C.gzip C..| C.sinkFile destFilePath
C.runConduit $ stream C..| countC C..| C.map (BL.toStrict . CBOR.serialise) C..| C.sinkFile destFilePath

-- | Collect all dependencies of a given causal hash.
depsForCausal :: CausalHash -> (Int -> IO ()) -> Sqlite.Transaction (Map Hash32 (Sync.Entity Text Hash32 Hash32))
Expand Down Expand Up @@ -393,3 +394,19 @@ withEntityLoadingCallback action = do
<> " entities...\n\n"
toIO $ action $ \i -> do
liftIO $ IO.atomically (IO.modifyTVar' counterVar (+ i))

hash32ToBS :: Hash32 -> ByteString
hash32ToBS = Hash.toByteString . Hash32.toHash

hash32FromBS :: ByteString -> Hash32
hash32FromBS = Hash32.fromHash . Hash.fromByteString

tempToBytes :: TempEntity -> BytesEntity
tempToBytes = entityToTempEntity id . mapEntity . tempEntityToEntity
where
mapEntity = over entityHashes_ hash32ToBS

bytesToTemp :: BytesEntity -> TempEntity
bytesToTemp = entityToTempEntity id . mapEntity . tempEntityToEntity
where
mapEntity = over entityHashes_ hash32FromBS
54 changes: 54 additions & 0 deletions unison-share-api/src/Unison/Server/Orphans.hs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,60 @@ instance Serialise TempEntity where
parents <- CBOR.decodeVector
pure $ Entity.C (SqliteCausal.SyncCausalFormat {valueHash, parents})

instance Serialise (Entity.SyncEntity' Text Hash32 ByteString ByteString ByteString ByteString ByteString) where
encode = \case
Entity.TC (TermFormat.SyncTerm (TermFormat.SyncLocallyIndexedComponent elements)) ->
CBOR.encode TermComponentTag
<> CBOR.encodeVector (coerce @(Vector (LocalIds.LocalIds' Text ByteString, ByteString)) @(Vector (ComponentBody Text ByteString)) elements)
Entity.DC (DeclFormat.SyncDecl (DeclFormat.SyncLocallyIndexedComponent elements)) ->
CBOR.encode DeclComponentTag
<> CBOR.encodeVector (coerce @(Vector (LocalIds.LocalIds' Text ByteString, ByteString)) @(Vector (ComponentBody Text ByteString)) elements)
Entity.P (PatchFormat.SyncDiff {}) -> error "Serializing Diffs are not supported"
Entity.P (PatchFormat.SyncFull (PatchFormat.LocalIds {patchTextLookup, patchHashLookup, patchDefnLookup}) bytes) ->
CBOR.encode PatchTag
<> CBOR.encodeVector patchTextLookup
<> CBOR.encodeVector patchHashLookup
<> CBOR.encodeVector patchDefnLookup
<> CBOR.encodeBytes bytes
Entity.N (BranchFormat.SyncDiff {}) -> error "Serializing Diffs are not supported"
Entity.N (BranchFormat.SyncFull (BranchFormat.LocalIds {branchTextLookup, branchDefnLookup, branchPatchLookup, branchChildLookup}) (BranchFormat.LocalBranchBytes bytes)) ->
CBOR.encode NamespaceTag
<> CBOR.encodeVector branchTextLookup
<> CBOR.encodeVector branchDefnLookup
<> CBOR.encodeVector branchPatchLookup
<> CBOR.encodeVector branchChildLookup
<> CBOR.encodeBytes bytes
Entity.C (SqliteCausal.SyncCausalFormat {valueHash, parents}) ->
CBOR.encode CausalTag
<> CBOR.encode valueHash
<> CBOR.encodeVector parents

decode = do
CBOR.decode >>= \case
TermComponentTag -> do
elements <- coerce @(Vector (ComponentBody Text ByteString)) @(Vector (LocalIds.LocalIds' Text ByteString, ByteString)) <$> CBOR.decodeVector
pure $ Entity.TC (TermFormat.SyncTerm (TermFormat.SyncLocallyIndexedComponent elements))
DeclComponentTag -> do
elements <- coerce @(Vector (ComponentBody Text ByteString)) @(Vector (LocalIds.LocalIds' Text ByteString, ByteString)) <$> CBOR.decodeVector
pure $ Entity.DC (DeclFormat.SyncDecl (DeclFormat.SyncLocallyIndexedComponent elements))
PatchTag -> do
patchTextLookup <- CBOR.decodeVector
patchHashLookup <- CBOR.decodeVector
patchDefnLookup <- CBOR.decodeVector
bytes <- CBOR.decodeBytes
pure $ Entity.P (PatchFormat.SyncFull (PatchFormat.LocalIds {patchTextLookup, patchHashLookup, patchDefnLookup}) bytes)
NamespaceTag -> do
branchTextLookup <- CBOR.decodeVector
branchDefnLookup <- CBOR.decodeVector
branchPatchLookup <- CBOR.decodeVector
branchChildLookup <- CBOR.decodeVector
bytes <- CBOR.decodeBytes
pure $ Entity.N (BranchFormat.SyncFull (BranchFormat.LocalIds {branchTextLookup, branchDefnLookup, branchPatchLookup, branchChildLookup}) (BranchFormat.LocalBranchBytes bytes))
CausalTag -> do
valueHash <- CBOR.decode
parents <- CBOR.decodeVector
pure $ Entity.C (SqliteCausal.SyncCausalFormat {valueHash, parents})

encodeVectorWith :: (a -> CBOR.Encoding) -> Vector.Vector a -> CBOR.Encoding
encodeVectorWith f xs =
CBOR.encodeListLen (fromIntegral $ Vector.length xs)
Expand Down
28 changes: 14 additions & 14 deletions unison-share-api/src/Unison/Sync/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import U.Codebase.Sqlite.Causal qualified as Causal
import U.Codebase.Sqlite.Decl.Format qualified as DeclFormat
import U.Codebase.Sqlite.Entity qualified as Entity
import U.Codebase.Sqlite.LocalIds
import U.Codebase.Sqlite.Patch.Format qualified as Patch
import U.Codebase.Sqlite.Patch.Format qualified as PatchFormat
import U.Codebase.Sqlite.Queries qualified as Q
import U.Codebase.Sqlite.TempEntity (TempEntity)
import U.Codebase.Sqlite.TempEntity qualified as Sqlite
import U.Codebase.Sqlite.TempEntity qualified as TempEntity
import U.Codebase.Sqlite.Term.Format qualified as TermFormat
import Unison.Hash32 (Hash32)
import Unison.Hash32 qualified as Hash32
Expand All @@ -50,7 +48,8 @@ hash32ToCausalHash =

-- | Convert an entity that came over the wire from Unison Share into an equivalent type that we can store in the
-- `temp_entity` table.
entityToTempEntity :: forall hash. (hash -> Hash32) -> Share.Entity Text Hash32 hash -> TempEntity
-- entityToTempEntity :: forall hash hash'. (hash -> hash') -> Share.Entity Text Hash32 hash -> Entity.SyncEntity' Text _ _ _ _ _ _
entityToTempEntity :: (a -> branchh) -> Share.Entity t hash a -> Entity.SyncEntity' t hash branchh branchh branchh branchh branchh
entityToTempEntity toHash32 = \case
Share.TC (Share.TermComponent terms) ->
terms
Expand Down Expand Up @@ -91,19 +90,19 @@ entityToTempEntity toHash32 = \case
parents = Vector.fromList (map toHash32 (Set.toList parents))
}
where
mungeLocalIds :: Share.LocalIds Text hash -> TempEntity.TempLocalIds
-- mungeLocalIds :: Share.LocalIds Text hash -> TempEntity.TempLocalIds
mungeLocalIds Share.LocalIds {texts, hashes} =
LocalIds
{ textLookup = Vector.fromList texts,
defnLookup = Vector.map toHash32 (Vector.fromList hashes)
}

mungeNamespaceLocalIds ::
[Text] ->
[hash] ->
[hash] ->
[(hash, hash)] ->
TempEntity.TempNamespaceLocalIds
-- mungeNamespaceLocalIds ::
-- [Text] ->
-- [hash] ->
-- [hash] ->
-- [(hash, hash)] ->
-- TempEntity.TempNamespaceLocalIds
mungeNamespaceLocalIds textLookup defnLookup patchLookup childLookup =
NamespaceFormat.LocalIds
{ branchTextLookup = Vector.fromList textLookup,
Expand All @@ -112,15 +111,16 @@ entityToTempEntity toHash32 = \case
branchChildLookup = Vector.fromList (map (\(x, y) -> (toHash32 x, toHash32 y)) childLookup)
}

mungePatchLocalIds :: [Text] -> [Hash32] -> [hash] -> TempEntity.TempPatchLocalIds
-- mungePatchLocalIds :: [Text] -> [_] -> [hash] -> Patch.PatchLocalIds' Text _ hash'
mungePatchLocalIds textLookup oldHashLookup newHashLookup =
PatchFormat.LocalIds
{ patchTextLookup = Vector.fromList textLookup,
patchHashLookup = Vector.fromList oldHashLookup,
patchDefnLookup = Vector.fromList (map toHash32 newHashLookup)
}

tempEntityToEntity :: Sqlite.TempEntity -> Share.Entity Text Hash32 Hash32
-- tempEntityToEntity :: Sqlite.TempEntity -> Share.Entity Text Hash32 Hash32
tempEntityToEntity :: (Ord h) => Entity.SyncEntity' text oldHash h h h h h -> Share.Entity text oldHash h
tempEntityToEntity = \case
Entity.TC (TermFormat.SyncTerm (TermFormat.SyncLocallyIndexedComponent terms)) ->
terms
Expand Down Expand Up @@ -196,7 +196,7 @@ tempEntityToEntity = \case
parents = Set.fromList (Vector.toList parents)
}
where
mungeLocalIds :: LocalIds' Text Hash32 -> Share.LocalIds Text Hash32
-- mungeLocalIds :: LocalIds' Text Hash32 -> Share.LocalIds Text Hash32
mungeLocalIds LocalIds {textLookup, defnLookup} =
Share.LocalIds
{ texts = Vector.toList textLookup,
Expand Down
16 changes: 7 additions & 9 deletions unison-share-api/src/Unison/SyncV2/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,23 @@ module Unison.SyncV2.Types
PullError (..),
EntitySorting (..),
Version (..),
BytesEntity,
)
where

import Codec.CBOR.Encoding qualified as CBOR
import Codec.Serialise (Serialise (..))
import Codec.Serialise qualified as CBOR
import Codec.Serialise.Decoding qualified as CBOR
import Control.Exception (Exception)
import Data.Aeson (FromJSON (..), ToJSON (..), object, withObject, (.:), (.=))
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Set (Set)
import Data.Text (Text)
import Data.Text qualified as Text
import Data.Word (Word16, Word64)
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.TempEntity (TempEntity)
import U.Codebase.Sqlite.Entity (SyncEntity')
import Unison.Core.Project (ProjectAndBranch (..), ProjectBranchName, ProjectName)
import Unison.Debug qualified as Debug
import Unison.Hash32 (Hash32)
import Unison.Prelude (From (..))
import Unison.Prelude
import Unison.Server.Orphans ()
import Unison.Share.API.Hash (HashJWT)
import Unison.Sync.Types qualified as SyncV1
Expand Down Expand Up @@ -214,9 +210,11 @@ instance Serialise StreamInitInfo where
rootBranchRef <- optionalDecodeMapKey "br" m
pure StreamInitInfo {version, entitySorting, numEntities, rootCausalHash, rootBranchRef}

type BytesEntity = SyncEntity' Text Hash32 ByteString ByteString ByteString ByteString ByteString

data EntityChunk = EntityChunk
{ hash :: Hash32,
entityCBOR :: CBORBytes TempEntity
{ hash :: ByteString,
entityCBOR :: CBORBytes BytesEntity
}
deriving (Show, Eq, Ord)

Expand Down

0 comments on commit fea3688

Please sign in to comment.