Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSH tunneling #44

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,6 @@ ENV/
*.txt

/venv--*
*config.json
*catalog.json
.vscode/
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ here.
| `persist_empty_tables` | `["boolean", "null"]` | `False` | Whether the Target should create tables which have no records present in Remote. |
| `default_column_length` | `["integer", "null"]` | `1000` | All columns with the VARCHAR(CHARACTER VARYING) type will be have this length.Range: 1-65535. |
| `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. |
| `target_s3` | `["object"]` | `N/A` | See `S3` below |
| `target_s3` | `["object"]` | `N/A` | See `S3` below
| `use_ssh_tunnel` | `["boolean", "null"]` | `False` | Include `true` in your config to make the database connection through an SSH Tunnel
| `ssh_jump_server` | `["string", "null"]` | `N/A` | The hostname of your jump server, the one you want to tunnel through to get to the Redshift server
| `ssh_jump_server_port` | `["integer", "null"]` | `22` | The port of your jump server, usually 22
| `ssh_private_key_path` | `["string", "null"]` | `N/A` | The local path to the SSH private key file
| `ssh_private_key_password` | `["string", "null"]` | `N/A` | The passphrase to your SSH private key file. Optional if your key file has no passphrase set
| `ssh_username` | `["string", "null"]` | `N/A` | The username to use to connect to the jump server

#### S3 Config.json

Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
install_requires=[
'boto3>=1.9.205,<1.10.0',
'singer-target-postgres==0.2.4',
'urllib3==1.25.9'
'urllib3==1.25.9',
'sshtunnel==0.1.5'
],
setup_requires=[
"pytest-runner"
Expand Down
107 changes: 72 additions & 35 deletions target_redshift/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import psycopg2
import singer
from singer import utils
import sshtunnel
from target_postgres import target_tools
from target_postgres.postgres import MillisLoggingConnection

Expand All @@ -9,47 +10,83 @@

LOGGER = singer.get_logger()

REQUIRED_CONFIG_KEYS = [
'redshift_host',
'redshift_database',
'redshift_username',
'redshift_password',
'target_s3'
]
def required_config_keys(use_ssh_tunnel=False):
keys = [
'redshift_host',
'redshift_database',
'redshift_username',
'redshift_password',
'target_s3'
]
if use_ssh_tunnel:
keys += [
'ssh_jump_server',
'ssh_jump_server_port',
'ssh_private_key_path',
'ssh_username'
]
return keys



def main(config, input_stream=None):
with psycopg2.connect(
connection_factory=MillisLoggingConnection,
host=config.get('redshift_host'),
port=config.get('redshift_port', 5439),
dbname=config.get('redshift_database'),
user=config.get('redshift_username'),
password=config.get('redshift_password')
) as connection:
s3_config = config.get('target_s3')
s3 = S3(s3_config.get('aws_access_key_id'),
s3_config.get('aws_secret_access_key'),
s3_config.get('bucket'),
s3_config.get('key_prefix'),
aws_session_token=s3_config.get('aws_session_token'))

redshift_target = RedshiftTarget(
connection,
s3,
redshift_schema=config.get('redshift_schema', 'public'),
logging_level=config.get('logging_level'),
default_column_length=config.get('default_column_length', 1000),
persist_empty_tables=config.get('persist_empty_tables')
)

if input_stream:
target_tools.stream_to_target(input_stream, redshift_target, config=config)
tunnel = None
try:
LOGGER.info(config)
if bool(config.get('use_ssh_tunnel')) == True:
LOGGER.info(f"use_ssh_tunnel is set to true; connecting to {config['redshift_host']}:{config['redshift_port']} via {config['ssh_jump_server']}:{config['ssh_jump_server_port']}")
tunnel = sshtunnel.open_tunnel(
(config['ssh_jump_server'], int(config['ssh_jump_server_port'])),
ssh_username=config['ssh_username'],
ssh_pkey=config['ssh_private_key_path'],
ssh_private_key_password=config['ssh_private_key_password'] if 'ssh_private_key_password' in config else None,
remote_bind_address=(config['redshift_host'], int(config['redshift_port']))
)
tunnel.start()
config['redshift_host'] = '127.0.0.1' # rewrite the config to go through the tunnel
config['redshift_port'] = tunnel.local_bind_port
else:
target_tools.main(redshift_target)
LOGGER.debug(f"use_ssh_tunnel is not set or is false; connecting directly to {config['redshift_host']}:{config['redshift_port']}")

with psycopg2.connect(
connection_factory=MillisLoggingConnection,
host=config.get('redshift_host'),
port=config.get('redshift_port', 5439),
dbname=config.get('redshift_database'),
user=config.get('redshift_username'),
password=config.get('redshift_password')
) as connection:
s3_config = config.get('target_s3')
s3 = S3(s3_config.get('aws_access_key_id'),
s3_config.get('aws_secret_access_key'),
s3_config.get('bucket'),
s3_config.get('key_prefix'),
aws_session_token=s3_config.get('aws_session_token'))

redshift_target = RedshiftTarget(
connection,
s3,
redshift_schema=config.get('redshift_schema', 'public'),
logging_level=config.get('logging_level'),
default_column_length=config.get('default_column_length', 1000),
persist_empty_tables=config.get('persist_empty_tables')
)

if input_stream:
target_tools.stream_to_target(input_stream, redshift_target, config=config)
else:
target_tools.main(redshift_target)

finally:
if tunnel is not None:
tunnel.stop()



def cli():
args = utils.parse_args(REQUIRED_CONFIG_KEYS)
args = utils.parse_args(required_config_keys())
if bool(args.config.get('use_ssh_tunnel')) == True:
args = utils.parse_args(required_config_keys(True))


main(args.config)