Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSS-5833 Ranger group management #21

Merged
merged 16 commits into from
Oct 24, 2023
Merged
7 changes: 7 additions & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
JAVA_ENV = {"JAVA_HOME": "/opt/java/openjdk"}
RANGER_POLICY_PATH = "/etc/ranger"

# UNIX literals
UNIX_TYPE_MAPPING = {
"user": "passwd",
"group": "group",
"membership": "group",
}

# Connector literal
CONNECTOR_FIELDS = {
"accumlo": {
Expand Down
247 changes: 199 additions & 48 deletions src/relations/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import logging

import yaml
from ops import framework
from ops.model import BlockedStatus
from ops.pebble import ExecError
Expand All @@ -15,9 +16,10 @@
RANGER_PLUGIN_VERSION,
RANGER_POLICY_PATH,
TRINO_PORTS,
UNIX_TYPE_MAPPING,
)
from log import log_event_handler
from utils import render
from utils import handle_exec_error, render

logger = logging.getLogger(__name__)

Expand All @@ -26,8 +28,8 @@ class PolicyRelationHandler(framework.Object):
"""Client for trino policy relations.

Attributes:
plugin_version: the version of the Ranger plugin
ranger_plugin_path: the path of the unpacked ranger plugin
plugin_version: The version of the Ranger plugin
ranger_plugin_path: The path of the unpacked ranger plugin
"""

plugin_version = RANGER_PLUGIN_VERSION["path"]
Expand All @@ -38,7 +40,7 @@ def __init__(self, charm, relation_name="policy"):

Args:
charm: The charm to attach the hooks to.
relation_name: the name of the relation defaults to policy.
relation_name: The name of the relation defaults to policy.
"""
super().__init__(charm, "policy")
self.charm = charm
Expand All @@ -63,7 +65,7 @@ def _on_relation_created(self, event):
"""Handle policy relation created.

Args:
event: relation created event.
event: The relation created event.
"""
if not self.charm.unit.is_leader():
return
Expand All @@ -78,7 +80,10 @@ def _on_relation_changed(self, event):
"""Handle policy relation changed.

Args:
event: relation changed event.
event: Relation changed event.

Raises:
ExecError: When failure to enable Ranger plugin.
"""
if not self.charm.unit.is_leader():
return
Expand All @@ -101,22 +106,39 @@ def _on_relation_changed(self, event):
container, policy_manager_url, policy_relation
)
self._enable_plugin(container)
logger.info("Ranger plugin is enabled.")
except ExecError as err:
logger.error(err)
self.charm.unit.status = BlockedStatus(
"Failed to enable Ranger plugin."
)
return
raise ExecError(f"Unable to enable Ranger plugin: {err}") from err

users_and_groups = event.relation.data[event.app].get(
"user-group-configuration", None
)
if users_and_groups:
try:
self._synchronize(users_and_groups, container)
except ExecError:
logger.exception("Failed to synchronize groups:")
event.defer()
except Exception:
logger.exception(
"An exception occurred while sychronizing Ranger groups:"
)
self.charm.unit.status = BlockedStatus(
"Failed to synchronize Ranger groups."
)
self.charm._restart_trino(container)

def _prepare_service(self, event):
"""Prepare service to be created in Ranger.

Args:
event: relation created event
event: Relation created event

Returns:
service: service values to be set in relation databag
service: Service values to be set in relation databag
"""
host = self.charm.config["application-name"]
port = TRINO_PORTS["HTTP"]
Expand All @@ -139,7 +161,10 @@ def _on_relation_broken(self, event):
"""Handle policy relation broken.

Args:
event: relation broken event.
event: Relation broken event.

Raises:
ExecError: When failure to disable Ranger plugin.
"""
if not self.charm.unit.is_leader():
return
Expand All @@ -151,7 +176,11 @@ def _on_relation_broken(self, event):
if not container.exists(self.ranger_plugin_path):
return

self._disable_ranger_plugin(container)
try:
self._disable_ranger_plugin(container)
logger.info("Ranger plugin disabled successfully")
except ExecError as err:
raise ExecError(f"Unable to disable Ranger plugin: {err}") from err

if container.exists(RANGER_POLICY_PATH):
container.remove_path(RANGER_POLICY_PATH, recursive=True)
Expand All @@ -161,38 +190,29 @@ def _on_relation_broken(self, event):

self.charm._restart_trino(container)

@handle_exec_error
def _enable_plugin(self, container):
"""Enable ranger plugin.

Args:
container: application container

Raises:
ExecError: in case unable to enable trino plugin
container: The application container
"""
command = [
"bash",
"enable-trino-plugin.sh",
]
try:
container.exec(
command,
working_dir=self.ranger_plugin_path,
environment=JAVA_ENV,
).wait_output()
logger.info("Ranger plugin enabled successfully")
except ExecError as err:
logger.error(err.stdout)
raise
container.exec(
command,
working_dir=self.ranger_plugin_path,
environment=JAVA_ENV,
).wait_output()

@handle_exec_error
def _unpack_plugin(self, container):
"""Unpack ranger plugin tar.

Args:
container: application container

Raises:
ExecError: in case unable to enable trino plugin
"""
if container.exists(self.ranger_plugin_path):
return
Expand All @@ -204,12 +224,7 @@ def _unpack_plugin(self, container):
"xf",
f"ranger-{tar_version}-trino-plugin.tar.gz",
]
try:
container.exec(command, working_dir="/root").wait_output()
logger.info("Ranger plugin unpacked successfully")
except ExecError as err:
logger.error(err.stdout)
raise
container.exec(command, working_dir="/root").wait_output()

def _configure_plugin_properties(
self, container, policy_manager_url, policy_relation
Expand All @@ -235,26 +250,162 @@ def _configure_plugin_properties(
permissions=0o744,
)

@handle_exec_error
def _synchronize(self, config, container):
"""Handle synchronization of Ranger users, groups and group membership.

Args:
config: String of user and group configuration from Ranger relation.
container: Trino application container.
"""
data = yaml.safe_load(config)
self._sync(container, data["users"], "user")
AmberCharitos marked this conversation as resolved.
Show resolved Hide resolved
self._sync(container, data["groups"], "group")
self._sync(container, data["memberships"], "membership")
logger.info("User synchronization successful!")

@handle_exec_error
def _sync(self, container, apply_objects, member_type):
"""Synchronize Unix users and groups.

Args:
container: The container to run the command in.
apply_objects: The users and group mappings to be applied to Trino.
member_type: The type of Unix object to create, either "user" or "group".
"""
# get existing values
existing = self._get_unix(container, member_type)

# get values to apply
apply = self._transfornm_apply_values(apply_objects, member_type)

# create memnbers
to_create = [item for item in apply if item not in existing]
self._create_members(container, member_type, to_create)

# delete memberships
if member_type == "membership":
to_delete = [item for item in existing if item not in apply]
self._delete_memberships(container, to_delete)

@handle_exec_error
def _get_unix(self, container, member_type):
"""Get a list of Unix users or groups from the specified container.

Args:
container: The container to run the command in.
member_type: The Unix object to retrieve, either "user", "group" or "membership".

Returns:
values: Either a list of usernames/groups or a list of (group, user) tuples.
"""
member_type_mapping = UNIX_TYPE_MAPPING
command = ["getent", member_type_mapping[member_type]]

out = container.exec(command).wait_output()

# Split the output to rows.
rows = out[0].strip().split("\n")
if member_type == "membership":
AmberCharitos marked this conversation as resolved.
Show resolved Hide resolved
# Create a list of (group, user) tuples.
members = [(row.split(":")[0], row.split(":")[3]) for row in rows]
values = []
for group, users in members:
values += [
(group, user.strip())
for user in users.split(",")
if user.strip()
]
else:
# Split the output to rows and create a list of user or group values.
values = [row.split(":")[0] for row in rows]
return values

def _transfornm_apply_values(self, data, member_type):
"""Get list of users, groups or memberships to apply from configuration file.

Args:
data: User, group or membership data.
member_type: One of "user", "group" or "membership".

Returns:
List of users, groups or memberships to apply.
"""
if member_type in ["user", "group"]:
return [member["name"] for member in data]

membership_tuples = [
(membership["groupname"], user)
for membership in data
for user in membership["users"]
]
return membership_tuples

@handle_exec_error
def _create_members(self, container, member_type, to_create):
"""Create Unix users, groups or memberships.

Args:
container: The container to run the command in.
member_type: The Unix object to retrieve, either "user", "group" or "membership".
to_create: List of users, groups or memberships to create.
"""
for member in to_create:
logger.debug(f"Attempting to create {member_type}: {member}")

if member_type == "group":
command = [f"{member_type}add", member]
if member_type == "user":
command = [f"{member_type}add", "-c", "ranger", member]
elif member_type == "membership":
command = ["usermod", "-aG", member[0], member[1]]

container.exec(command).wait_output()

@handle_exec_error
def _delete_memberships(self, container, to_delete):
"""Delete Unix group memberships.

Args:
container: The container to run the command in.
to_delete: List of memberships to delete.
"""
for membership in to_delete:
user_info = self._get_user_gecos(container, membership[1])
AmberCharitos marked this conversation as resolved.
Show resolved Hide resolved
if "ranger" in user_info:
logger.debug(f"Attempting to delete membership {membership}")
container.exec(
["deluser", membership[1], membership[0]]
).wait_output()

@handle_exec_error
def _get_user_gecos(self, container, username):
"""Get the Gecos information for a specific user.

Args:
container: The container to run the command in.
username: The username for which to retrieve the Gecos information.

Returns:
user_info: The Gecos information for the user.
"""
out = container.exec(["getent", "passwd", username]).wait_output()
user_info = out[0].strip().split(":")[4]
return user_info

@handle_exec_error
def _disable_ranger_plugin(self, container):
"""Disable ranger plugin.

Args:
container: application container

Raises:
ExecError: in case unable to enable trino plugin
"""
command = [
"bash",
"disable-trino-plugin.sh",
]
try:
container.exec(
command,
working_dir=self.ranger_plugin_path,
environment=JAVA_ENV,
).wait_output()
logger.info("Ranger plugin disabled successfully")
except ExecError as err:
logger.error(err.stdout)
raise
container.exec(
command,
working_dir=self.ranger_plugin_path,
environment=JAVA_ENV,
).wait_output()
Loading