Skip to content
This repository has been archived by the owner on May 4, 2023. It is now read-only.

Commit

Permalink
Adding isolate-dag command
Browse files Browse the repository at this point in the history
This command allows us to easily create a slimmer version of a large DBT
project, which can then be executed by FishTown's DBT without it needing
to parse the whole of your project, i.e. you can get the speed
improvement of DDBT, but the compatibility of DBT.

This commit also moves the build status updates from Stdout to Stderr
  • Loading branch information
DomBlack committed Jan 21, 2021
1 parent f357f43 commit e610232
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ddbt version 0.2.1
- `ddbt watch` will act like `run`, followed by `test`. DDBT will then watch your file system for any changes and automatically rerun those parts of the DAG and affected downstream tests or failing tests.
- `ddbt watch --skip-run` is the same as watch, but will skip the initial run (preventing you having to wait for all the models to run) before running the tests and starting to watch your file system.
- `ddbt completion zsh` will generate a shell completion script zsh (or bash if you pass that as argument). Detailed steps to set up the completion script can be found in `ddbt completion --help`
- `ddbt isolate-dag` will create a temporary directory and symlink in all files needed for the given _model_filter_ such that Fishtown's DBT could be run against it without having to be run against every model in your data warehouse

### Global Arguments
- `--models model_filter` _or_ `-m model_filter`: Instead of running for every model in your project, DDBT will only execute against the requested models. See filters below for what is accepted for `my_model`
Expand Down
174 changes: 174 additions & 0 deletions cmd/isolateDAG.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package cmd

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/spf13/cobra"

"ddbt/config"
"ddbt/fs"
"ddbt/utils"
)

// init wires the isolate-dag command into the CLI: it adds isolateDAG as a
// sub-command of rootCmd and passes it to addModelsFlag.
// NOTE(review): addModelsFlag is defined in cmd/run.go; it presumably
// registers the shared `--models` / `-m` filter flag — confirm there.
func init() {
rootCmd.AddCommand(isolateDAG)
addModelsFlag(isolateDAG)
}

// isolateDAG is the `isolate-dag` sub-command. It compiles the project, builds
// the graph for the current ModelFilter, extends that graph with the tests
// referencing it and the macros it uses, and then hands the result to
// isolateGraph.
var isolateDAG = &cobra.Command{
	Use:   "isolate-dag",
	Short: "Creates a symlinked copy of the selected models, which can be then passed to Fishtown's DBT",
	Run: func(_ *cobra.Command, _ []string) {
		// Only the file system is needed; the second return value is discarded.
		projectFS, _ := compileAllModels()

		// Start from the graph for the requested filter...
		dag := buildGraph(projectFS, ModelFilter)
		// ...then add any tests which reference that graph.
		dag.AddReferencingTests()

		macroErr := dag.AddAllUsedMacros()
		if macroErr != nil {
			fmt.Printf("❌ Unable to get all used macros: %s\n", macroErr)
			os.Exit(1)
		}

		isolateGraph(dag)
	},
}

// isolateGraph builds an isolated copy of the given graph inside a new
// temporary directory and prints that directory's path to stdout.
//
// The isolated directory contains:
//   - symlinks to a fixed list of project-level files/folders (plus the model
//     groups file when one is configured),
//   - a symlink for every file in the graph, and for its sibling `.yml` schema
//     when the file has a schema and that `.yml` exists on disk,
//   - an empty placeholder file for every upstream model which is referenced by
//     the graph but is not part of it.
//
// Any failure is reported with fmt.Printf and terminates the process with exit
// code 1.
func isolateGraph(graph *fs.Graph) {
	pb := utils.NewProgressBar("🔪 Isolating DAG", graph.Len())
	defer pb.Stop()

	// Create a temporary directory to stick the isolated models in
	isolationDir, err := ioutil.TempDir(os.TempDir(), "isolated-dag-")
	if err != nil {
		fmt.Printf("❌ Unable to create temporarily directory for DAG isolation: %s\n", err)
		os.Exit(1)
	}

	// Get the current working directory; project paths are resolved against it
	cwd, err := os.Getwd()
	if err != nil {
		fmt.Printf("❌ Unable to get working directory: %s\n", err)
		os.Exit(1)
	}

	// symLink links cwd/pathInProject to isolationDir/pathInProject, creating
	// any missing parent folders inside the isolated directory first.
	symLink := func(pathInProject string) error {
		fullOrgPath := filepath.Join(cwd, pathInProject)
		symlinkedPath := filepath.Join(isolationDir, pathInProject)

		// Create the folder in the isolated dir if needed
		if err := os.MkdirAll(filepath.Dir(symlinkedPath), os.ModePerm); err != nil {
			return err
		}

		// Symlink the file in there
		return os.Symlink(fullOrgPath, symlinkedPath)
	}

	// touch creates a blank file at isolationDir/pathInProject which DBT can
	// read; a file which already exists there is left untouched.
	touch := func(pathInProject string) error {
		symlinkedPath := filepath.Join(isolationDir, pathInProject)

		// Create the folder in the isolated dir if needed
		if err := os.MkdirAll(filepath.Dir(symlinkedPath), os.ModePerm); err != nil {
			return err
		}

		// If the file doesn't exist create it with no contents
		if _, err := os.Stat(symlinkedPath); os.IsNotExist(err) {
			file, err := os.Create(symlinkedPath)
			if err != nil {
				return err
			}
			return file.Close()
		}

		return nil
	}

	projectFiles := []string{
		"dbt_project.yml",
		"ddbt_config.yml",
		"profiles",
		"debug",
		"docs",
		"dbt_modules",
	}

	// If we have a model groups file bring that too
	if config.GlobalCfg.ModelGroupsFile != "" {
		projectFiles = append(projectFiles, config.GlobalCfg.ModelGroupsFile)
	}

	for _, file := range projectFiles {
		// A "does not exist" error is tolerated here; anything else is fatal
		if err := symLink(file); err != nil && !os.IsNotExist(err) {
			pb.Stop()
			fmt.Printf("❌ Unable to isolate project file `%s`: %s\n", file, err)
			os.Exit(1)
		}
	}

	err = graph.Execute(func(file *fs.File) error {
		// Symlink the file from the DAG into the isolated folder
		if err := symLink(file.Path); err != nil {
			pb.Stop()
			fmt.Printf("❌ Unable to isolate %s `%s`: %s\n", file.Type, file.Name, err)
			return err
		}

		// Symlink the schema if it exists
		schemaFile := strings.TrimSuffix(file.Path, filepath.Ext(file.Path)) + ".yml"
		if _, err := os.Stat(schemaFile); file.Schema != nil && err == nil {
			if err := symLink(schemaFile); err != nil {
				pb.Stop()
				fmt.Printf("❌ Unable to isolate schema for %s `%s`: %s\n", file.Type, file.Name, err)
				return err
			}
		}

		// Ensure upstream models are handled
		for _, upstream := range file.Upstreams() {
			if graph.Contains(upstream) {
				continue
			}

			switch upstream.Type {
			case fs.ModelFile:
				// Models outside of the DAG but referenced by it need to exist for DBT to be able to run on this DAG
				// even if we run with the upstream command
				if err := touch(upstream.Path); err != nil {
					pb.Stop()
					fmt.Printf("❌ Unable to touch %s `%s`: %s\n", upstream.Type, upstream.Name, err)
					return err
				}

			default:
				// Anything other than a model which is being used _should_ already be in the graph
				pb.Stop()
				fmt.Printf("❌ Unexpected Upstream %s `%s`\n", upstream.Type, upstream.Name)

				// Build a fresh error: no err is declared in this scope, so a bare
				// `return err` would return the enclosing function's (nil) err and
				// the failure would be reported as success.
				return fmt.Errorf("unexpected upstream %s `%s`", upstream.Type, upstream.Name)
			}
		}

		pb.Increment()
		return nil
	}, config.NumberThreads(), pb)

	if err != nil {
		os.Exit(1)
	}

	pb.Stop()

	// The isolated directory path is the command's only stdout output on success
	fmt.Print(isolationDir)
}
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func addModelsFlag(cmd *cobra.Command) {
}

func compileAllModels() (*fs.FileSystem, *compiler.GlobalContext) {
fmt.Printf("ℹ️ Building for %s profile\n", config.GlobalCfg.Target.Name)
_, _ = fmt.Fprintf(os.Stderr, "ℹ️ Building for %s profile\n", config.GlobalCfg.Target.Name)

// Read the models on the file system
fileSystem, err := fs.ReadFileSystem(os.Stdout)
fileSystem, err := fs.ReadFileSystem(os.Stderr)
if err != nil {
fmt.Printf("❌ Unable to read filesystem: %s\n", err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/showDAG.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func init() {
rootCmd.AddCommand(showDAG)
showDAG.Flags().StringVarP(&ModelFilter, "models", "m", "", "Select which model(s) to run")
addModelsFlag(showDAG)
}

var showDAG = &cobra.Command{
Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type Config struct {
Target *Target

// Custom behaviour which allows us to override the target information on a per folder basis within `/models/`
ModelGroups map[string]*Target
ModelGroups map[string]*Target
ModelGroupsFile string

// seedConfig holds the seed (global) configurations
seedConfig map[string]*SeedConfig
Expand Down Expand Up @@ -119,6 +120,7 @@ func Read(targetProfile string, upstreamProfile string, threads int, strExecutor
}

GlobalCfg.ModelGroups = modelGroups
GlobalCfg.ModelGroupsFile = appConfig.ModelGroupsFile
}

if settings, found := project.Models[project.Name]; found {
Expand Down
13 changes: 13 additions & 0 deletions fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,19 @@ func (f *File) Downstreams() []*File {
return downstreams
}

// Upstreams returns a newly allocated slice holding every upstream file
// recorded for f. The file's mutex is held while the upstream set is read;
// the order of the returned slice follows map iteration and is not stable.
func (f *File) Upstreams() []*File {
	f.Mutex.Lock()
	defer f.Mutex.Unlock()

	result := make([]*File, len(f.upstreams))

	next := 0
	for parent := range f.upstreams {
		result[next] = parent
		next++
	}

	return result
}

func (f *File) MaskAsDynamicSQL() {
f.Mutex.Lock()
defer f.Mutex.Unlock()
Expand Down
45 changes: 45 additions & 0 deletions fs/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,46 @@ func (g *Graph) addDownstreamModels(file *File, visited map[*File]struct{}) {
}
}

// AddAllUsedMacros walks every file currently in the graph and, via
// addUpstreamMacros, links in the macro files they use (following macros used
// by other macros as well).
//
// It returns an error if any visited file ends up with a circular dependency
// on itself, and nil otherwise.
func (g *Graph) AddAllUsedMacros() error {
	visited := make(map[*File]struct{})

	for file := range g.nodes {
		g.addUpstreamMacros(file, visited)
	}

	// Check every visited file for a circular dependency on itself
	for file := range visited {
		node := g.getNodeFor(file)

		if node.upstreamContains(node) {
			return fmt.Errorf("%s has a circular dependency on itself", node.file.Name)
		}
	}

	return nil
}

// addUpstreamMacros links every macro upstream of file into the graph as an
// upstream of file's node, then recurses into each of those macros.
//
// visited records the files already processed so that each file is handled
// once; a file is marked before its upstreams are walked.
func (g *Graph) addUpstreamMacros(file *File, visited map[*File]struct{}) {
	if _, seen := visited[file]; seen {
		return
	}
	visited[file] = struct{}{}

	current := g.getNodeFor(file)

	// The file's mutex stays held for the whole walk, including the recursion
	file.Mutex.Lock()
	defer file.Mutex.Unlock()

	for dep := range file.upstreams {
		// Only macros are of interest here; other upstream types are skipped
		if dep.Type != MacroFile {
			continue
		}

		g.edge(g.getNodeFor(dep), current)
		g.addUpstreamMacros(dep, visited)
	}
}

// Find all tests which reference the models in the existing graph
// and add them to the graph
//
Expand Down Expand Up @@ -404,3 +444,8 @@ func (g *Graph) NumberNodesNeedRerunning() int {

return count
}

// Contains reports whether file has an entry in the graph's node set.
func (g *Graph) Contains(file *File) bool {
	_, present := g.nodes[file]

	return present
}
2 changes: 1 addition & 1 deletion utils/ProgressBar.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewProgressBar(label string, numberItems int) *ProgressBar {
label: label,
completedItems: 0,
numberItems: uint32(numberItems),
output: os.Stdout,
output: os.Stderr,
startTime: time.Now(),
lastIncremented: time.Now(),

Expand Down
2 changes: 1 addition & 1 deletion utils/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package utils

const DdbtVersion = "0.2.1"
const DdbtVersion = "0.3.0"

0 comments on commit e610232

Please sign in to comment.