diff --git a/README.md b/README.md
index 8be3329..4bc2ea2 100644
--- a/README.md
+++ b/README.md
@@ -1,23 +1,140 @@
-# geoserverx
-A GeoServer REST API client influenced by HTTPX
+# GeoServerX
------------
+A modern, powerful GeoServer REST API client influenced by HTTPX, offering Sync, Async, and CLI capabilities.
-`geoserverx` allows `Sync`, `Async` as well as `CLI` Capabilities to talk to your GeoServer REST APIs which makes it ideal to be used in software development project which relies on content of GeoServer.
+[![Downloads](https://pepy.tech/badge/geoserverx)](https://pepy.tech/project/geoserverx)
+[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
+[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
+[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
+[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
+[![Built with Material for MkDocs](https://img.shields.io/badge/Material_for_MkDocs-526CFE?style=for-the-badge&logo=MaterialForMkDocs&logoColor=white)](https://squidfunk.github.io/mkdocs-material/)
-Here is a simplistic view of how geoserverx works under the hood
-![architecture](/docs/assets/images/arch.png "architecture")
+## Features
+- 🚀 Synchronous and Asynchronous API support
+- 🖥️ Command Line Interface (CLI)
+- 🔄 Complete GeoServer REST API coverage
+- 🏗️ Modern Pythonic interface
+- 📦 Easy to install and use
+- 📝 Type hints for better development experience
+## Installation
+```bash
+pip install geoserverx
+```
+## Quick Start
-## Contribution guide
+### Synchronous Usage
-### Feature/Bug request
-Please open new issue if you want to report any bug or send feature request
+```python
+from geoserverx._sync.gsx import SyncGeoServerX
-### Running on local
-`geoserverx` is built with [poetry](https://python-poetry.org/) and it uses following dependencies and dev-dependencies
+# Initialize with default GeoServer settings
+# Default: URL=http://localhost:8080/geoserver/rest, username=admin, password=geoserver
+geo_server = SyncGeoServerX()
-![layout](/docs/assets/images/layout.png "layout")
\ No newline at end of file
+# Custom initialization
+geo_server = SyncGeoServerX(
+ url="http://your-server:8080/geoserver/rest",
+ username="your_username",
+ password="your_password"
+)
+
+# Get all workspaces
+workspaces = geo_server.get_all_workspaces()
+
+# Access workspace information
+print(workspaces) # Print Workspaces Model
+print(workspaces.workspaces) # Print detailed list of workspaces
+print(workspaces.workspaces.workspace[0].name) # Print first workspace name
+```
+
+### Asynchronous Usage
+
+```python
+from geoserverx._async.gsx import AsyncGeoServerX
+import asyncio
+
+async def main():
+ geo_server = AsyncGeoServerX()
+ workspaces = await geo_server.get_all_workspaces()
+ print(workspaces)
+
+asyncio.run(main())
+```
+
+### CLI Usage
+
+```bash
+# Get help
+geoserverx --help
+
+# List workspaces
+geoserverx workspaces list
+
+# Add a new workspace
+geoserverx workspaces create --name my_workspace
+```
+
+## Documentation
+
+Detailed documentation is available through [Material for MkDocs](https://geobeyond.github.io/geoserverx/)
+
+## Development Setup
+
+Install Poetry (dependency management tool):
+
+```bash
+ curl -sSL https://install.python-poetry.org | python3 -
+```
+
+```bash
+# Clone the repository:
+git clone https://github.com/geobeyond/geoserverx.git
+cd geoserverx
+```
+
+```bash
+# Install dependencies:
+poetry install
+```
+
+```bash
+
+# Activate virtual environment:
+poetry shell
+```
+
+## Contributing
+
+We welcome contributions! Here's how you can help:
+
+- Check for open issues or create a new one to discuss new features or bugs.
+- Fork the repo and create a new branch for your contribution.
+- Write your code and tests.
+- Send a pull request.
+
+## Development Guidelines
+
+- Follow [Black](https://black.readthedocs.io/en/stable/) code style
+- Use [isort](https://pycqa.github.io/isort/index.html) ([black profile](https://pycqa.github.io/isort/docs/configuration/black_compatibility.html)) for import sorting
+- Use [Ruff](https://docs.astral.sh/ruff/) for linting
+- Ensure all tests pass
+- Add tests for new features
+- Update documentation as needed
+
+## Bug Reports and Feature Requests
+
+Please use the GitHub issue tracker to report bugs or request features.
+
+## License
+
+This project is licensed under the MIT License - see the LICENSE file for details.
+
+## Acknowledgments
+
+- HTTPX for inspiration
+- GeoServer community
+- All contributors who have helped shape this project
diff --git a/docs/pages/async/workspace.md b/docs/pages/async/workspace.md
index 54f45c4..4b7347b 100644
--- a/docs/pages/async/workspace.md
+++ b/docs/pages/async/workspace.md
@@ -1,8 +1,12 @@
-# Workspaces
+# Workspaces
-`geoserverx` allows users to access all/one workspace from GeoServer, along with ability to add new workspaces.
+`geoserverx` allows users to access all/one workspace from GeoServer, along with ability to do CRUD operations on workspaces.
+
+!!! get "Get started"
+To start using `geoserverx` in Sync mode, create a new instance of `AsyncGeoServerX` Class, read more about it [here](https://geobeyond.github.io/geoserverx/pages/async/)
## Get all workspaces
+
This command fetches all workspaces available in GeoServer. No parameters are required to be passed.
```py
@@ -11,21 +15,57 @@ await client.get_all_workspaces()
```
## Get single workspace
-This command fetches workspace with paramter as name of it from GeoServer.
+
+Fetches workspace details.
+
```Python
# Get workspace with name `cite`
await client.get_workspace('cite')
```
## Create workspace
-This command allows user to create new workspace.
+
+This command allows user to create new workspace.
Creating new workspace requires following parameters
-* Name `str` : To define Name of the workspace
-* default `bool` : To define whether to keep workspace as default or not
-* Isolated `bool` : To define whether to keep workspace Isolated or not
-
+| Parameter | Required | Default value | Data type | Description |
+| --------- | ----------------------------- | ------------- | --------- | ------------------------------------------------------ |
+| Name | :white_check_mark: | | `str` | To define Name of the workspace |
+| default | :negative_squared_cross_mark: | `False` | `bool` | To define whether to keep workspace as default or not |
+| isolated | :negative_squared_cross_mark: | `False` | `bool` | To define whether to keep workspace as Isolated or not |
+
```Python
#Create new workspace with name `my_wrkspc` , make it Default and Isolated
await client.create_workspace(name='my_wrkspc',default=True,Isolated=True)
```
+
+## Delete workspace
+
+This command allows user to delete workspace.
+
+| Parameter | Required | Default value | Data type | Description |
+| --------- | ----------------------------- | ------------- | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
+| workspace | :white_check_mark: | | `str` | Name of the workspace |
+| recurse | :negative_squared_cross_mark: | `False` | `bool` | This parameter recursively deletes all layers referenced by the specified workspace, including data stores, coverage stores, feature types, and so on |
+
+```Python
+#Delete workspace with name `my_wrkspc`.
+await client.delete_workspace(workspace='my_wrkspc',recurse=True)
+```
+
+## Update workspace
+
+This command allows user to update existing workspace.
+Updating workspace requires following parameters
+
+| Parameter | Required | Data type | Description |
+| --------- | ------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------ |
+| name | :white_check_mark: | `str` | Name of the workspace to be updated |
+| update | :white_check_mark: | [`UpdateWorkspaceInfo`](https://github.com/geobeyond/geoserverx/blob/b7757c9f0130864b06c40c2faa17afc841fc705f/src/geoserverx/models/workspace.py#L42) | To define body of the update request |
+
+```Python
+#Updating workspace with name `my_wrkspc` , make is Isolated and rename it to `my_new_wrkspc`
+from geoserverx.models.workspace import UpdateWorkspaceInfo
+
+await client.update_workspace(name='my_wrkspc',update=UpdateWorkspaceInfo(name='my_new_wrkspc',isolated=True))
+```
diff --git a/docs/pages/cli/workspace.md b/docs/pages/cli/workspace.md
index e0e8c75..4c6362a 100644
--- a/docs/pages/cli/workspace.md
+++ b/docs/pages/cli/workspace.md
@@ -5,8 +5,6 @@
!!! get "Get started"
To start using `geoserverx` using command line, activate the Environment where package is installed and use `gsx` command
-
-
## Paramters for all workspaces command
@@ -26,6 +24,7 @@ Options:
--username TEXT Geoserver username [default: admin]
--help Show this message and exit.
```
+
As listed above, `workspaces` command accepts four parameters.
@@ -40,12 +39,14 @@ All these parameters have default value setup which will work for local default
## Get all workspaces
+
```console
$ gsx workspaces
{"workspaces": {"workspace": [{"name": "cesium", "href":
"http://127.0.0.1:8080/geoserver/rest/workspaces/cesium.json"}]}}
```
+
## Get all workspaces of hosted GeoServer
@@ -56,6 +57,7 @@ $ gsx workspaces --url http://locahost:8080/geoserver/rest --password myPassword
{"workspaces": {"workspace": [{"name": "giz", "href":
"http://locahost:8080/geoserver/rest/workspaces/giz.json"}]}}
```
+
@@ -78,16 +80,17 @@ Options:
--username TEXT Geoserver username [default: admin]
--help Show this message and exit.
```
+
As listed above, `workspace` accepts `workspace` parameter as the name of workspace
-
## Get single workspaces
+
```console
-$ gsx workspace --workspace cesium
+$ gsx workspace cesium
{"workspace": {"name": "cesium", "isolated": false, "dateCreated": "2023-02-13
06:43:28.793 UTC", "dataStores":
"http://127.0.0.1:8080/geoserver/rest/workspaces/cesium/datastores.json",
@@ -96,10 +99,10 @@ $ gsx workspace --workspace cesium
"wmsStores": "http://127.0.0.1:8080/geoserver/rest/workspaces/cesium/wmsstores.json",
"wmtsStores": "http://127.0.0.1:8080/geoserver/rest/workspaces/cesium/wmtsstores.json"}}
```
-
+
-## Paramters for create workspace command
+## Parameters for create workspace command
@@ -120,6 +123,7 @@ Options:
--username TEXT Geoserver username [default: admin]
--help Show this message and exit.
```
+
As listed above, `create-workspace` command accepts parameters as follows
@@ -128,12 +132,95 @@ As listed above, `create-workspace` command accepts parameters as follows
* --default/--no-default - To keep workspace either default or not
* --isolated/--no-isolated - To keep workspace either isolated or not
-
## Create single workspaces
```console
-$ gsx create-workspace --workspace mydefaultws --default
+$ gsx create-workspace mydefaultws --default
code=201 response='Data added successfully'
```
-
\ No newline at end of file
+
+
+## Parameters for delete workspace command
+
+
+
+```console
+$ gsx delete-workspace --help
+Usage: gsx delete-workspace [OPTIONS]
+
+ Delete workspace in the Geoserver
+
+Arguments:
+ WORKSPACE [required]
+
+Options:
+ --request [sync|async] [default: requestEnum._sync]
+ --recurse / --no-recurse Delete all stores,layers,styles,etc. [default:
+ no-recurse]
+ --url TEXT Geoserver REST URL [default:
+ http://127.0.0.1:8080/geoserver/rest/]
+ --password TEXT Geoserver Password [default: geoserver]
+ --username TEXT Geoserver username [default: admin]
+ --help Show this message and exit.
+```
+
+
+
+As listed above, `delete-workspace` command accepts parameters as follows
+
+* --current_name - name of workspace
+* --recurse / --no-recurse - This parameter recursively deletes all layers referenced by the specified workspace, including data stores, coverage stores, feature types, and so on
+
+## Delete single workspaces
+
+
+```console
+gsx delete-workspace my_wrkspace --recurse
+{"code":200,"response":"Executed successfully"}
+```
+
+
+
+## Parameters for update workspace command
+
+
+
+```console
+$ gsx update-workspace --help
+Usage: gsx update-workspace [OPTIONS]
+
+ Add workspace in the Geoserver
+
+Arguments:
+ CURRENT_NAME [required]
+
+Options:
+ --request [sync|async] [default: requestEnum._sync]
+ --new-name TEXT New Workspace name
+ --isolated / --no-isolated Make workspace isolated? [default: no-isolated]
+ --url TEXT Geoserver REST URL [default:
+ http://127.0.0.1:8080/geoserver/rest/]
+ --password TEXT Geoserver Password [default: geoserver]
+ --username TEXT Geoserver username [default: admin]
+ --help Show this message and exit.
+```
+
+
+
+As listed above, `update-workspace` command accepts parameters as follows
+
+* --current-name - name of current workspace
+* --new-name - name of new workspace
+* --isolated/--no-isolated - To keep workspace either isolated or not
+
+## Update single workspaces
+
+
+
+```console
+gsx update-workspace d --new-name duster
+{"code":200,"response":"Executed successfully"}
+```
+
+
diff --git a/docs/pages/sync/workspace.md b/docs/pages/sync/workspace.md
index 667f821..e2b6568 100644
--- a/docs/pages/sync/workspace.md
+++ b/docs/pages/sync/workspace.md
@@ -1,8 +1,12 @@
-# Workspaces
+# Workspaces
-`geoserverx` allows users to access all/one workspace from GeoServer, along with ability to add new workspaces.
+`geoserverx` allows users to access all/one workspace from GeoServer, along with ability to do CRUD operations on workspaces.
+
+!!! get "Get started"
+To start using `geoserverx` in Sync mode, create a new instance of `SyncGeoServerX` Class, read more about it [here](https://geobeyond.github.io/geoserverx/pages/sync/)
## Get all workspaces
+
This command fetches all workspaces available in GeoServer. No paramters are required to be passed.
```Python
@@ -11,21 +15,55 @@ client.get_all_workspaces()
```
## Get single workspace
-This command fetches workspace with paramter as name of it from GeoServer.
+
+This command fetches workspace with parameter as name of it from GeoServer.
+
```Python
# Get workspace with name `cite`
client.get_workspace('cite')
```
## Create workspace
-This command allows user to create new workspace.
-Creating new workspace requires following parameters
-* Name `str` : To define Name of the workspace
-* default `bool` : To define whether to keep workspace as default or not
-* Isolated `bool` : To define whether to keep workspace Isolated or not
-
+This command allows user to create new workspace.
+
+| Parameter | Required | Default value | Data type | Description |
+| --------- | ----------------------------- | ------------- | --------- | ----------------------------------------------------- |
+| Name | :white_check_mark: | | `str` | To define Name of the workspace |
+| default | :negative_squared_cross_mark: | `False` | `bool` | To define whether to keep workspace as default or not |
+| isolated | :negative_squared_cross_mark: | `False` | `bool` | To define whether to keep workspace as default or not |
+
```Python
#Create new workspace with name `my_wrkspc` , make it Default and Isolated
client.create_workspace(name='my_wrkspc',default=True,Isolated=True)
```
+
+## Delete workspace
+
+This command allows user to delete workspace.
+
+| Parameter | Required | Default value | Data type | Description |
+| --------- | ----------------------------- | ------------- | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
+| workspace | :white_check_mark: | | `str` | Name of the workspace |
+| recurse | :negative_squared_cross_mark: | `False` | `bool` | This parameter recursively deletes all layers referenced by the specified workspace, including data stores, coverage stores, feature types, and so on |
+
+```Python
+#Delete workspace with name `my_wrkspc`.
+client.delete_workspace(workspace='my_wrkspc',recurse=True)
+```
+
+## Update workspace
+
+This command allows user to update existing workspace.
+
+| Parameter | Required | Data type | Description |
+| --------- | ------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------ |
+| name | :white_check_mark: | `str` | Name of the workspace to be updated |
+| update | :white_check_mark: | [`UpdateWorkspaceInfo`](https://github.com/geobeyond/geoserverx/blob/b7757c9f0130864b06c40c2faa17afc841fc705f/src/geoserverx/models/workspace.py#L42) | To define body of the update request |
+
+```Python
+#Updating workspace with name `my_wrkspc` , make is Isolated and rename it to `my_new_wrkspc`
+from geoserverx.models.workspace import UpdateWorkspaceInfo
+
+client.update_workspace(name='my_wrkspc',update=UpdateWorkspaceInfo(name='my_new_wrkspc',isolated=True))
+```
diff --git a/mkdocs.yml b/mkdocs.yml
index 153eac3..ef6b764 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -49,6 +49,10 @@ markdown_extensions:
- footnotes # notes bottom of page
- attr_list # used to size images
- md_in_html # used to size images
+ - tables
+ - pymdownx.emoji:
+ emoji_index: !!python/name:material.extensions.emoji.twemoji
+ emoji_generator: !!python/name:material.extensions.emoji.to_svg
- pymdownx.tabbed:
alternate_style: true
diff --git a/src/geoserverx/_async/gsx.py b/src/geoserverx/_async/gsx.py
index c8a7251..683d964 100644
--- a/src/geoserverx/_async/gsx.py
+++ b/src/geoserverx/_async/gsx.py
@@ -20,6 +20,8 @@
from geoserverx.models.workspace import (
NewWorkspace,
NewWorkspaceInfo,
+ UpdateWorkspace,
+ UpdateWorkspaceInfo,
WorkspaceModel,
WorkspacesModel,
)
@@ -46,7 +48,7 @@ class AsyncGeoServerX:
username: str = "admin"
password: str = "geoserver"
url: str = "http://127.0.0.1:8080/geoserver/rest/"
- head = {"Content-Type": "application/json"}
+ headers = {"Content-Type": "application/json"}
def __post_init__(self):
if not self.username and not self.password and not self.url:
@@ -98,10 +100,10 @@ def response_recognise(self, r) -> GSResponse:
# check if certain module/plugin exists in geoserver
async def check_modules(self, name) -> Union[bool, GSResponse]:
- Client = self.http_client
+ client = self.http_client
try:
- response = await Client.get("about/status.json")
- response.raise_for_status() # Raises an HTTPError for bad responses (4xx and 5xx)
+ response = await client.get("about/status.json")
+ response.raise_for_status() # Raises an HTTPError for bad response (4xx and 5xx)
# Extract and check the modules
modules = [
@@ -123,107 +125,110 @@ async def check_modules(self, name) -> Union[bool, GSResponse]:
# Handle Module not found exception
return GSResponse(code=412, response=str(e))
- # Get all workspaces
async def get_all_workspaces(self) -> Union[WorkspacesModel, GSResponse]:
- Client = self.http_client
- responses = await Client.get("workspaces")
- if responses.status_code == 200:
- return WorkspacesModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get("workspaces")
+ if response.status_code == 200:
+ return WorkspacesModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get specific workspaces
async def get_workspace(self, workspace: str) -> Union[WorkspaceModel, GSResponse]:
- Client = self.http_client
- responses = await Client.get(f"workspaces/{workspace}")
- if responses.status_code == 200:
- return WorkspaceModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get(f"workspaces/{workspace}")
+ if response.status_code == 200:
+ return WorkspaceModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
+
+ async def delete_workspace(
+ self, workspace: str, recurse: bool = False
+ ) -> GSResponse:
+ client = self.http_client
+ response = await client.delete(
+ f"workspaces/{workspace}", params={"recurse": recurse}
+ )
+ return self.response_recognise(response.status_code)
- # Create workspace
async def create_workspace(
self, name: str, default: bool = False, Isolated: bool = False
) -> GSResponse:
- Client = self.http_client
+ client = self.http_client
payload: NewWorkspace = NewWorkspace(
workspace=NewWorkspaceInfo(name=name, isolated=Isolated)
)
- responses = await Client.post(
- f"workspaces?default={default}",
+ response = await client.post(
+ "workspaces",
data=payload.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
+ params={"default": default},
+ )
+ return self.response_recognise(response.status_code)
+
+ async def update_workspace(
+ self, name: str, update: UpdateWorkspaceInfo
+ ) -> GSResponse:
+ client = self.http_client
+ update_ws = UpdateWorkspace(workspace=update)
+ response = await client.put(
+ f"workspaces/{name}.json",
+ data=update_ws.model_dump_json(exclude_none=True),
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get vector stores in specific workspaces
async def get_vector_stores_in_workspaces(self, workspace: str) -> DataStoresModel:
- Client = self.http_client
- responses = await Client.get(f"workspaces/{workspace}/datastores")
- if responses.status_code == 200:
- return DataStoresModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get(f"workspaces/{workspace}/datastores")
+ if response.status_code == 200:
+ return DataStoresModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get raster stores in specific workspaces
async def get_raster_stores_in_workspaces(
self, workspace: str
) -> CoveragesStoresModel:
- Client = self.http_client
- responses = await Client.get(f"workspaces/{workspace}/coveragestores")
- if responses.status_code == 200:
- return CoveragesStoresModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get(f"workspaces/{workspace}/coveragestores")
+ if response.status_code == 200:
+ return CoveragesStoresModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get vector store information in specific workspaces
async def get_vector_store(self, workspace: str, store: str) -> DataStoreModel:
url = f"workspaces/{workspace}/datastores/{store}.json"
- Client = self.http_client
- responses = await Client.get(url)
- if responses.status_code == 200:
- return DataStoreModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get(url)
+ if response.status_code == 200:
+ return DataStoreModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get raster store information in specific workspaces
async def get_raster_store(self, workspace: str, store: str) -> CoveragesStoreModel:
url = f"workspaces/{workspace}/coveragestores/{store}.json"
- Client = self.http_client
- responses = await Client.get(url)
- if responses.status_code == 200:
- return CoveragesStoreModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get(url)
+ if response.status_code == 200:
+ return CoveragesStoreModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get all styles in GS
async def get_all_styles(self) -> AllStylesModel:
- Client = self.http_client
- responses = await Client.get("styles")
- if responses.status_code == 200:
- return AllStylesModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get("styles")
+ if response.status_code == 200:
+ return AllStylesModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get specific style in GS
async def get_style(self, style: str) -> StyleModel:
- Client = self.http_client
- responses = await Client.get(f"styles/{style}.json")
- if responses.status_code == 200:
- return StyleModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get(f"styles/{style}.json")
+ if response.status_code == 200:
+ return StyleModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Add postgres db
async def create_pg_store(
self,
name: str,
@@ -247,14 +252,13 @@ async def create_pg_store(
).model_dump(exclude_none=True),
)
)
- Client = self.http_client
- responses = await Client.post(
+ client = self.http_client
+ response = await client.post(
f"workspaces/{workspace}/datastores/",
data=payload.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
async def create_file_store(self, workspace: str, store: str, file, service_type):
service: AddDataStoreProtocol = CreateFileStore()
@@ -275,8 +279,8 @@ async def create_file_store(self, workspace: str, store: str, file, service_type
)
else:
raise ValueError(f"Service type {service_type} not supported")
- responses = await service.addFile(self.http_client, workspace, store)
- return self.response_recognise(responses)
+ response = await service.addFile(self.http_client, workspace, store)
+ return self.response_recognise(response)
if service_type == "shapefile":
service = ShapefileStore(
@@ -293,54 +297,46 @@ async def create_file_store(self, workspace: str, store: str, file, service_type
raise ValueError(f"Service type {service_type} not supported")
await service.addFile(self.http_client, workspace, store)
- # Get all layers
async def get_all_layers(
self, workspace: Optional[str] = None
) -> Union[LayersModel, GSResponse]:
- Client = self.http_client
+ client = self.http_client
if workspace:
- responses = await Client.get(f"/workspaces/{workspace}/layers")
+ response = await client.get(f"/workspaces/{workspace}/layers")
else:
- responses = await Client.get("layers")
- if responses.status_code == 200:
- return LayersModel.model_validate(responses.json())
+ response = await client.get("layers")
+ if response.status_code == 200:
+ return LayersModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get specific layer
async def get_layer(
self, workspace: str, layer: str
) -> Union[LayerModel, GSResponse]:
- Client = self.http_client
- responses = await Client.get(f"layers/{workspace}:{layer}")
- if responses.status_code == 200:
- return LayerModel.model_validate(responses.json())
+ client = self.http_client
+ response = await client.get(f"layers/{workspace}:{layer}")
+ if response.status_code == 200:
+ return LayerModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Delete specific layer
async def delete_layer(self, workspace: str, layer: str) -> GSResponse:
- Client = self.http_client
- responses = await Client.delete(f"layers/{workspace}:{layer}")
- results = self.response_recognise(responses.status_code)
- return results
+ client = self.http_client
+ response = await client.delete(f"layers/{workspace}:{layer}")
+ return self.response_recognise(response.status_code)
- # Get all layer groups
async def get_all_layer_groups(
self, workspace: Optional[str] = None
) -> Union[LayerGroupsModel, GSResponse]:
- Client = self.http_client
+ client = self.http_client
if workspace:
- responses = await Client.get(f"workspaces/{workspace}/layergroups")
+ response = await client.get(f"workspaces/{workspace}/layergroups")
else:
- responses = await Client.get("layergroups")
- if responses.status_code == 200:
- return LayerGroupsModel.model_validate(responses.json())
+ response = await client.get("layergroups")
+ if response.status_code == 200:
+ return LayerGroupsModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
# Get system status info
async def system_status(self) -> Union[MetricsDataModel, GSResponse]:
@@ -357,39 +353,35 @@ async def system_status(self) -> Union[MetricsDataModel, GSResponse]:
# Get all geofence rules
async def get_all_geofence_rules(self) -> Union[RulesResponse, GSResponse]:
- Client = self.http_client
+ client = self.http_client
# Check if the geofence plugin exists
module_check = await self.check_modules("geofence")
# If the module check fails, return the GSResponse directly
if isinstance(module_check, GSResponse):
return module_check
- responses = await Client.get(
+ response = await client.get(
"geofence/rules/", headers={"Accept": "application/json"}
)
- if responses.status_code == 200:
- return RulesResponse.model_validate(responses.json())
+ if response.status_code == 200:
+ return RulesResponse.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get geofence rule by id
async def get_geofence_rule(self, id: int) -> Union[Rule, GSResponse]:
- Client = self.http_client
+ client = self.http_client
# Check if the geofence plugin exists
module_check = await self.check_modules("geofence")
# If the module check fails, return the GSResponse directly
if isinstance(module_check, GSResponse):
return module_check
- responses = await Client.get(
+ response = await client.get(
f"geofence/rules/id/{id}", headers={"Accept": "application/json"}
)
- if responses.status_code == 200:
- return Rule.model_validate(responses.json())
+ if response.status_code == 200:
+ return Rule.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Create geofence on geoserver
async def create_geofence(self, rule: Rule) -> GSResponse:
PostingRule = NewRule(Rule=rule)
# Check if the geofence plugin exists
@@ -397,11 +389,10 @@ async def create_geofence(self, rule: Rule) -> GSResponse:
# If the module check fails, return the GSResponse directly
if isinstance(module_check, GSResponse):
return module_check
- Client = self.http_client
- responses = await Client.post(
+ client = self.http_client
+ response = await client.post(
"geofence/rules",
content=PostingRule.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
diff --git a/src/geoserverx/_sync/gsx.py b/src/geoserverx/_sync/gsx.py
index cab8631..6fe78b3 100644
--- a/src/geoserverx/_sync/gsx.py
+++ b/src/geoserverx/_sync/gsx.py
@@ -23,6 +23,8 @@
from geoserverx.models.workspace import (
NewWorkspace,
NewWorkspaceInfo,
+ UpdateWorkspace,
+ UpdateWorkspaceInfo,
WorkspaceModel,
WorkspacesModel,
)
@@ -49,7 +51,7 @@ class SyncGeoServerX:
username: str = "admin"
password: str = "geoserver"
url: str = "http://127.0.0.1:8080/geoserver/rest/"
- head = {"Content-Type": "application/json"}
+ headers = {"Content-Type": "application/json"}
def __post_init__(self):
if not self.username and not self.password and not self.url:
@@ -113,10 +115,10 @@ def inner_function(*args, **kwargs):
# check if certain module/plugin exists in geoserver
@exception_handler
def check_modules(self, name) -> Union[bool, GSResponse]:
- Client = self.http_client
+ client = self.http_client
try:
- response = Client.get("about/status.json")
- response.raise_for_status() # Raises an HTTPError for bad responses (4xx and 5xx)
+ response = client.get("about/status.json")
+ response.raise_for_status() # Raises an HTTPError for bad response (4xx and 5xx)
# Extract and check the modules
modules = [
@@ -138,29 +140,30 @@ def check_modules(self, name) -> Union[bool, GSResponse]:
# Handle Module not found exception
return GSResponse(code=412, response=str(e))
- # Get all workspaces
@exception_handler
def get_all_workspaces(self) -> Union[WorkspacesModel, GSResponse]:
- Client = self.http_client
- responses = Client.get("workspaces")
- if responses.status_code == 200:
- return WorkspacesModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get("workspaces")
+ if response.status_code == 200:
+ return WorkspacesModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get specific workspaces
@exception_handler
def get_workspace(self, workspace: str) -> Union[WorkspaceModel, GSResponse]:
- Client = self.http_client
- responses = Client.get(f"workspaces/{workspace}")
- if responses.status_code == 200:
- return WorkspaceModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get(f"workspaces/{workspace}")
+ if response.status_code == 200:
+ return WorkspaceModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
+
+ @exception_handler
+ def delete_workspace(self, workspace: str, recurse: bool = False) -> GSResponse:
+ client = self.http_client
+ response = client.delete(f"workspaces/{workspace}", params={"recurse": recurse})
+ return self.response_recognise(response.status_code)
- # Create workspace on geoserver
@exception_handler
def create_workspace(
self, name: str, default: bool = False, Isolated: bool = False
@@ -168,125 +171,118 @@ def create_workspace(
payload: NewWorkspace = NewWorkspace(
workspace=NewWorkspaceInfo(name=name, isolated=Isolated)
)
- Client = self.http_client
- responses = Client.post(
- f"workspaces?default={default}",
+ client = self.http_client
+ response = client.post(
+ "workspaces",
content=payload.model_dump_json(),
- headers=self.head,
+ params={"default": default},
+ headers=self.headers,
+ )
+ return self.response_recognise(response.status_code)
+
+ @exception_handler
+ def update_workspace(self, name: str, update: UpdateWorkspaceInfo) -> GSResponse:
+ client = self.http_client
+ update_ws = UpdateWorkspace(workspace=update)
+ response = client.put(
+ f"workspaces/{name}.json",
+ data=update_ws.model_dump_json(exclude_none=True),
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get vector stores in specific workspaces
@exception_handler
def get_vector_stores_in_workspaces(self, workspace: str) -> DataStoresModel:
- Client = self.http_client
- responses = Client.get(f"workspaces/{workspace}/datastores")
- if responses.status_code == 200:
- return DataStoresModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get(f"workspaces/{workspace}/datastores")
+ if response.status_code == 200:
+ return DataStoresModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get raster stores in specific workspaces
@exception_handler
def get_raster_stores_in_workspaces(self, workspace: str) -> CoveragesStoresModel:
- Client = self.http_client
- responses = Client.get(f"workspaces/{workspace}/coveragestores")
- if responses.status_code == 200:
- return CoveragesStoresModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get(f"workspaces/{workspace}/coveragestores")
+ if response.status_code == 200:
+ return CoveragesStoresModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get vector store information in specific workspaces
@exception_handler
def get_vector_store(self, workspace: str, store: str) -> DataStoreModel:
url = f"workspaces/{workspace}/datastores/{store}.json"
- Client = self.http_client
- responses = Client.get(url)
- if responses.status_code == 200:
- return DataStoreModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get(url)
+ if response.status_code == 200:
+ return DataStoreModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # create vector store in specific workspaces
@exception_handler
def create_vector_store(self, workspace: str, store: DataStoresModel) -> GSResponse:
- Client = self.http_client
- responses = Client.post(
+ client = self.http_client
+ response = client.post(
f"workspaces/{workspace}/datastores",
content=store.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get raster store information in specific workspaces
@exception_handler
def get_raster_store(self, workspace: str, store: str) -> CoveragesStoreModel:
url = f"workspaces/{workspace}/coveragestores/{store}.json"
- Client = self.http_client
- responses = Client.get(url)
- if responses.status_code == 200:
- return CoveragesStoreModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get(url)
+ if response.status_code == 200:
+ return CoveragesStoreModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get raster store information in specific workspaces
@exception_handler
def create_raster_store(
self, workspace: str, store: CoveragesStoreModel
) -> GSResponse:
- Client = self.http_client
- responses = Client.post(
+ client = self.http_client
+ response = client.post(
f"workspaces/{workspace}/coveragestores",
content=store.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # delete store in specific workspaces
@exception_handler
def delete_store(
self, workspace: str, store: str, type: str
) -> GSResponse: # TODO : add enum for type
- Client = self.http_client
+ client = self.http_client
if type == "raster":
- responses = Client.delete(
- f"/workspaces/{workspace}/coveragestores/{store}", headers=self.head
+ response = client.delete(
+ f"/workspaces/{workspace}/coveragestores/{store}", headers=self.headers
)
elif type == "vector":
- responses = Client.delete(
- f"/workspaces/{workspace}/datastores/{store}", headers=self.head
+ response = client.delete(
+ f"/workspaces/{workspace}/datastores/{store}", headers=self.headers
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get all styles in GS
@exception_handler
def get_all_styles(self) -> AllStylesModel:
- Client = self.http_client
- responses = Client.get("styles")
- if responses.status_code == 200:
- return AllStylesModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get("styles")
+ if response.status_code == 200:
+ return AllStylesModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get specific style in GS
@exception_handler
def get_style(self, style: str) -> StyleModel:
- Client = self.http_client
- responses = Client.get(f"styles/{style}.json")
- if responses.status_code == 200:
- return StyleModel.model_validate(responses.json())
+ client = self.http_client
+ response = client.get(f"styles/{style}.json")
+ if response.status_code == 200:
+ return StyleModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
@exception_handler
def create_file_store(
@@ -304,10 +300,9 @@ def create_file_store(
)
else:
raise ValueError(f"Service type {service_type} not supported")
- responses = service.addFile(self.http_client, workspace, store)
- return self.response_recognise(responses)
+ response = service.addFile(self.http_client, workspace, store)
+ return self.response_recognise(response)
- # Create workspace
@exception_handler
def create_pg_store(
self,
@@ -332,73 +327,67 @@ def create_pg_store(
).model_dump(exclude_none=True),
)
)
- Client = self.http_client
- responses = Client.post(
+ client = self.http_client
+ response = client.post(
f"workspaces/{workspace}/datastores/",
data=payload.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get all layers
@exception_handler
def get_all_layers(
self, workspace: Optional[str] = None
) -> Union[LayersModel, GSResponse]:
- Client = self.http_client
+ client = self.http_client
if workspace:
- responses = Client.get(f"/workspaces/{workspace}/layers")
+ response = client.get(f"/workspaces/{workspace}/layers")
else:
- responses = Client.get("layers")
- if responses.status_code == 200:
- return LayersModel.model_validate(responses.json())
+ response = client.get("layers")
+ if response.status_code == 200:
+ return LayersModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
@exception_handler
def get_vector_layer(
self, workspace: str, store: str, layer: str
) -> Union[FeatureTypesModel, GSResponse]:
- Client = self.http_client
- responses = Client.get(
+ client = self.http_client
+ response = client.get(
f"/workspaces/{workspace}/datastores/{store}/featuretypes/{layer}.json"
)
- if responses.status_code == 200:
+ if response.status_code == 200:
try:
- return FeatureTypesModel.parse_obj(responses.json())
+ return FeatureTypesModel.parse_obj(response.json())
except ValidationError as validation_error:
print("Pydantic Validation Error:")
print(validation_error)
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
@exception_handler
def get_raster_layer(
self, workspace: str, store: str, layer: str
) -> Union[CoverageModel, GSResponse]:
- Client = self.http_client
- responses = Client.get(
+ client = self.http_client
+ response = client.get(
f"/workspaces/{workspace}/coveragestores/{store}/coverages/{layer}.json"
)
- if responses.status_code == 200:
- return CoverageModel.parse_obj(responses.json())
+ if response.status_code == 200:
+ return CoverageModel.parse_obj(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get specific layer
@exception_handler
def get_layer(
self, workspace: str, layer: str, detail: bool = False
) -> Union[LayerModel, FeatureTypesModel, GSResponse]:
- Client = self.http_client
- responses = Client.get(f"layers/{workspace}:{layer}")
- if responses.status_code == 200:
+ client = self.http_client
+ response = client.get(f"layers/{workspace}:{layer}")
+ if response.status_code == 200:
if detail:
- res = responses.json()
+ res = response.json()
if res["layer"]["type"] == "VECTOR":
result = self.get_vector_layer(
workspace,
@@ -412,58 +401,51 @@ def get_layer(
)
return CoverageModel.parse_obj(result.dict())
else:
- return LayerModel.parse_obj(responses.json())
+ return LayerModel.parse_obj(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
@exception_handler
def create_vector_layer(
self, workspace: str, layer: FeatureTypesModel
) -> GSResponse:
- Client = self.http_client
- responses = Client.post(
+ client = self.http_client
+ response = client.post(
f"/workspaces/{workspace}/featuretypes",
data=layer.model_dump(by_alias=True, exclude_none=True),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
@exception_handler
def create_raster_layer(self, workspace: str, layer: CoverageModel) -> GSResponse:
- Client = self.http_client
- responses = Client.post(
+ client = self.http_client
+ response = client.post(
f"/workspaces/{workspace}/coverages",
data=layer.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Delete specific layer
@exception_handler
def delete_layer(self, workspace: str, layer: str) -> GSResponse:
- Client = self.http_client
- responses = Client.delete(f"layers/{workspace}:{layer}")
- results = self.response_recognise(responses.status_code)
- return results
+ client = self.http_client
+ response = client.delete(f"layers/{workspace}:{layer}")
+ return self.response_recognise(response.status_code)
- # Get all layer groups
@exception_handler
def get_all_layer_groups(
self, workspace: Optional[str] = None
) -> Union[LayerGroupsModel, GSResponse]:
- Client = self.http_client
+ client = self.http_client
if workspace:
- responses = Client.get(f"workspaces/{workspace}/layergroups")
+ response = client.get(f"workspaces/{workspace}/layergroups")
else:
- responses = Client.get("layergroups")
- if responses.status_code == 200:
- return LayerGroupsModel.model_validate(responses.json())
+ response = client.get("layergroups")
+ if response.status_code == 200:
+ return LayerGroupsModel.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
# Get system status info
@exception_handler
@@ -482,41 +464,35 @@ def system_status(self) -> Union[MetricsDataModel, GSResponse]:
# Get all geofence rules
@exception_handler
def get_all_geofence_rules(self) -> Union[RulesResponse, GSResponse]:
- Client = self.http_client
+ client = self.http_client
# Check if the geofence plugin exists
module_check = self.check_modules("geofence")
# If the module check fails, return the GSResponse directly
if isinstance(module_check, GSResponse):
return module_check
# Make the HTTP request to fetch geofence rules
- responses = Client.get(
- "geofence/rules/", headers={"Accept": "application/json"}
- )
- if responses.status_code == 200:
- return RulesResponse.model_validate(responses.json())
+ response = client.get("geofence/rules/", headers={"Accept": "application/json"})
+ if response.status_code == 200:
+ return RulesResponse.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Get geofence rule by id
@exception_handler
def get_geofence_rule(self, id: int) -> Union[GetRule, GSResponse]:
- Client = self.http_client
+ client = self.http_client
# Check if the geofence plugin exists
module_check = self.check_modules("geofence")
# If the module check fails, return the GSResponse directly
if isinstance(module_check, GSResponse):
return module_check
- responses = Client.get(
+ response = client.get(
f"geofence/rules/id/{id}", headers={"Accept": "application/json"}
)
- if responses.status_code == 200:
- return Rule.model_validate(responses.json())
+ if response.status_code == 200:
+ return Rule.model_validate(response.json())
else:
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
- # Create geofence on geoserver
@exception_handler
def create_geofence(self, rule: Rule) -> GSResponse:
PostingRule = NewRule(Rule=rule)
@@ -525,11 +501,10 @@ def create_geofence(self, rule: Rule) -> GSResponse:
# If the module check fails, return the GSResponse directly
if isinstance(module_check, GSResponse):
return module_check
- Client = self.http_client
- responses = Client.post(
+ client = self.http_client
+ response = client.post(
"geofence/rules",
content=PostingRule.model_dump_json(),
- headers=self.head,
+ headers=self.headers,
)
- results = self.response_recognise(responses.status_code)
- return results
+ return self.response_recognise(response.status_code)
diff --git a/src/geoserverx/cli/cli.py b/src/geoserverx/cli/cli.py
index b4c5fa8..169dca2 100644
--- a/src/geoserverx/cli/cli.py
+++ b/src/geoserverx/cli/cli.py
@@ -1,13 +1,22 @@
-import json
from enum import Enum
from pathlib import Path
+from typing import Optional
import typer
from rich import print
+from rich.console import Console
+from rich.table import Table
from geoserverx._sync.gsx import SyncGeoServerX
+from geoserverx.models.workspace import UpdateWorkspaceInfo
app = typer.Typer()
+console = Console()
+
+
+class OutputFormats(str, Enum):
+ json = "json"
+ table = "table"
@app.callback()
@@ -29,7 +38,6 @@ class vectorFileEnum(str, Enum):
gpkg = "gpkg"
-# get all workspaces
@SyncGeoServerX.exception_handler
@app.command(help="Get all workspaces in the Geoserver")
def workspaces(
@@ -39,53 +47,102 @@ def workspaces(
),
password: str = typer.Option("geoserver", help="Geoserver Password"),
username: str = typer.Option("admin", help="Geoserver username"),
+ output: OutputFormats = typer.Option(OutputFormats.table),
):
"""
Get all workspaces in the Geoserver
+ looks like - gsx workspaces --url --username --password
"""
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
- result = client.get_all_workspaces().model_dump_json()
+ result = client.get_all_workspaces()
if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
print(result)
+ else:
+ if output == "json":
+ print(result.model_dump_json(indent=2))
+ else:
+ try:
+ table = Table("Name", "Link")
+ for workspace in result.workspaces.workspace:
+ table.add_row(workspace.name, workspace.href)
+ console.print(table)
+ except AttributeError:
+ print(result.response)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# get workspace
@SyncGeoServerX.exception_handler
@app.command(help="Get workspace in the Geoserver")
def workspace(
+ workspace: str,
request: requestEnum = requestEnum._sync,
- workspace: str = typer.Option(..., help="Workspace name"),
url: str = typer.Option(
"http://127.0.0.1:8080/geoserver/rest/", help="Geoserver REST URL"
),
password: str = typer.Option("geoserver", help="Geoserver Password"),
username: str = typer.Option("admin", help="Geoserver username"),
+ output: OutputFormats = typer.Option(OutputFormats.table),
):
"""
Get workspace in the Geoserver
+ looks like - gsx workspace --url --username --password
"""
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
- result = client.get_workspace(workspace).model_dump_json()
+ result = client.get_workspace(workspace)
if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
print(result)
+ else:
+ if output == "json":
+ print(result.model_dump_json(indent=2))
+ else:
+ try:
+ table = Table("Column", "Value")
+ table.add_row("name", result.workspace.name)
+ table.add_row("isolated", str(result.workspace.isolated))
+ table.add_row("dateCreated", result.workspace.dateCreated)
+ table.add_row("dataStores", result.workspace.dataStores)
+ table.add_row("coverageStores", result.workspace.coverageStores)
+ table.add_row("wmsStores", result.workspace.wmsStores)
+ table.add_row("wmtsStores", result.workspace.wmtsStores)
+ console.print(table)
+ except AttributeError:
+ print(result.response)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
+
+
+@SyncGeoServerX.exception_handler
+@app.command(help="Delete workspace in the Geoserver")
+def delete_workspace(
+ workspace: str,
+ request: requestEnum = requestEnum._sync,
+ recurse: bool = typer.Option(False, help="Delete all stores,layers,styles,etc."),
+ url: str = typer.Option(
+ "http://127.0.0.1:8080/geoserver/rest/", help="Geoserver REST URL"
+ ),
+ password: str = typer.Option("geoserver", help="Geoserver Password"),
+ username: str = typer.Option("admin", help="Geoserver username"),
+):
+ """
+ Delete workspace in the Geoserver
+ looks like - gsx delete-workspace --recurse/--no-recurse --url --username --password
+ """
+ if request.value == "sync":
+ client = SyncGeoServerX(username, password, url)
+ result = client.delete_workspace(workspace, recurse)
+ print(result.response)
+ else:
+ print("Async support will be shortly")
-# create workspace
@SyncGeoServerX.exception_handler
@app.command(help="Add workspace in the Geoserver")
def create_workspace(
+ workspace: str,
request: requestEnum = requestEnum._sync,
- workspace: str = typer.Option(..., help="Workspace name"),
default: bool = typer.Option(False, help="Make workspace default?"),
isolated: bool = typer.Option(False, help="Make workspace isolated?"),
url: str = typer.Option(
@@ -96,21 +153,46 @@ def create_workspace(
):
"""
Add workspace in the Geoserver
- looks like - gsx create-workspace --workspace --default/--no-default --isolated/--no-isolated --username --password
+ looks like - gsx create-workspace --default/--no-default --isolated/--no-isolated --url --username --password
"""
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
- result = client.create_workspace(workspace, default, isolated).model_dump_json()
- if json.loads(result)["code"] == 201:
- typer.secho(result, fg=typer.colors.GREEN)
- else:
- typer.secho(result, fg=typer.colors.RED)
+ result = client.create_workspace(workspace, default, isolated)
+ print(result.response)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
+
+
+@SyncGeoServerX.exception_handler
+@app.command(help="Add workspace in the Geoserver")
+def update_workspace(
+ current_name: str,
+ request: requestEnum = requestEnum._sync,
+ new_name: Optional[str] = typer.Option(None, help="New Workspace name"),
+ isolated: Optional[bool] = typer.Option(False, help="Make workspace isolated?"),
+ url: str = typer.Option(
+ "http://127.0.0.1:8080/geoserver/rest/", help="Geoserver REST URL"
+ ),
+ password: str = typer.Option("geoserver", help="Geoserver Password"),
+ username: str = typer.Option("admin", help="Geoserver username"),
+):
+ """
+ Update existing workspace in the Geoserver
+ looks like - gsx update-workspace --new-name --isolated/--no-isolated --username --password
+ """
+ if request.value == "sync":
+ client = SyncGeoServerX(username, password, url)
+ result = client.update_workspace(
+ current_name,
+ UpdateWorkspaceInfo(name=new_name, isolated=isolated),
+ )
+ print(result.response)
+
+ else:
+ print("Async support will be shortly")
-# Get vector stores in specific workspaces
@SyncGeoServerX.exception_handler
@app.command(help="Get vector stores in specific workspaces")
def vector_st_wp(
@@ -128,15 +210,11 @@ def vector_st_wp(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_vector_stores_in_workspaces(workspace).model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# Get raster stores in specific workspaces
@SyncGeoServerX.exception_handler
@app.command(help="Get raster stores in specific workspaces")
def raster_st_wp(
@@ -154,15 +232,11 @@ def raster_st_wp(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_raster_stores_in_workspaces(workspace).model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# Get vector store information in specific workspaces
@SyncGeoServerX.exception_handler
@app.command(help="Get vector store information in specific workspaces")
def vector_store(
@@ -181,15 +255,11 @@ def vector_store(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_vector_store(workspace, store).model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# Get raster store information in specific workspaces
@SyncGeoServerX.exception_handler
@app.command(help="Get raster store information in specific workspaces")
def raster_store(
@@ -208,15 +278,11 @@ def raster_store(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_raster_store(workspace, store).model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# Get all styles in Geoserver
@SyncGeoServerX.exception_handler
@app.command(help="Get all styles in Geoserver")
def styles(
@@ -233,15 +299,11 @@ def styles(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_all_styles().model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# Get style in Geoserver
@SyncGeoServerX.exception_handler
@app.command(help="Get style in Geoserver")
def style(
@@ -259,15 +321,11 @@ def style(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_style(style).model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# Create Vector Layer in Geoserver
@SyncGeoServerX.exception_handler
@app.command(help="Create Vector Layer in Geoserver")
def create_file(
@@ -292,17 +350,13 @@ def create_file(
result = client.create_file_store(
workspace, store, files.read(), service_type
)
- if result.code == 201:
- typer.secho(result, fg=typer.colors.GREEN)
- else:
- typer.secho(result, fg=typer.colors.RED)
+ print(result)
except Exception:
- typer.secho("File path is incorrect", fg=typer.colors.YELLOW)
+ print("File path is incorrect")
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# Create PostgreSQL store in Geoserver
@SyncGeoServerX.exception_handler
@app.command(help="Create PostgreSQL store in Geoserver")
def create_pg_store(
@@ -334,15 +388,11 @@ def create_pg_store(
password=dbpwd,
database=dbname,
)
- if result.code == 201:
- typer.secho(result, fg=typer.colors.GREEN)
- else:
- typer.secho(result, fg=typer.colors.RED)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# get all layers
@SyncGeoServerX.exception_handler
@app.command(help="Get all layers in the Geoserver")
def layers(
@@ -360,15 +410,11 @@ def layers(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_all_layers(workspace).model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# get layer
@SyncGeoServerX.exception_handler
@app.command(help="Get layer in the Geoserver")
def layer(
@@ -388,15 +434,11 @@ def layer(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_layer(workspace, layer, detail).model_dump_json()
- if "code" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# get layer groups
@SyncGeoServerX.exception_handler
@app.command(help="Get layer groups in the Geoserver")
def layer_groups(
@@ -414,12 +456,9 @@ def layer_groups(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_all_layer_groups(workspace).json()
- if "layerGroups" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
# Get system status info
@@ -465,15 +504,11 @@ def geofence_rules(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_all_geofence_rules().model_dump_json()
- if "rules" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
-# get geofence rule
@SyncGeoServerX.exception_handler
@app.command(help="Get geofence rule in the Geoserver")
def geofence_rule(
@@ -491,9 +526,6 @@ def geofence_rule(
if request.value == "sync":
client = SyncGeoServerX(username, password, url)
result = client.get_geofence_rule(id).model_dump_json()
- if "rule" in result:
- typer.secho(result, fg=typer.colors.RED)
- else:
- print(result)
+ print(result)
else:
- typer.echo("Async support will be shortly")
+ print("Async support will be shortly")
diff --git a/src/geoserverx/models/workspace.py b/src/geoserverx/models/workspace.py
index 9edd5de..6900d73 100644
--- a/src/geoserverx/models/workspace.py
+++ b/src/geoserverx/models/workspace.py
@@ -19,7 +19,7 @@ class WorkspacesModel(BaseModel):
class SingleWorkspace(BaseModel):
name: str = ...
isolated: bool = ...
- dateCreated: Optional[str]
+ dateCreated: Optional[str] = None
dataStores: str = ...
coverageStores: str = ...
wmsStores: str = ...
@@ -37,3 +37,12 @@ class NewWorkspaceInfo(BaseModel):
class NewWorkspace(BaseModel):
workspace: NewWorkspaceInfo = ...
+
+
+class UpdateWorkspaceInfo(BaseModel):
+ name: Optional[str] = None
+ isolated: Optional[bool] = None
+
+
+class UpdateWorkspace(BaseModel):
+ workspace: UpdateWorkspaceInfo = ...
diff --git a/tests/_async/test_gsx.py b/tests/_async/test_gsx.py
index b48d3b8..661fc15 100644
--- a/tests/_async/test_gsx.py
+++ b/tests/_async/test_gsx.py
@@ -7,6 +7,7 @@
from geoserverx._async.gsx import AsyncGeoServerX, GeoServerXAuth, GeoServerXError
from geoserverx.models.geofence import Rule
+from geoserverx.models.workspace import UpdateWorkspaceInfo
baseUrl = "http://127.0.0.1:8080/geoserver/rest/"
@@ -28,17 +29,6 @@ async def test_error():
assert True
-@pytest.mark.asyncio
-async def test_get_all_workspaces_validation(
- create_a_client, respx_mock, bad_workspaces_connection
-):
- respx_mock.get(f"{baseUrl}workspaces").mock(
- return_value=httpx.Response(404, json=bad_workspaces_connection)
- )
- response = await create_a_client.get_all_workspaces()
- assert response.code == 404
-
-
@pytest.mark.asyncio
async def test_get_all_workspaces_success(
create_a_client, respx_mock, good_workspaces_connection
@@ -60,14 +50,26 @@ async def test_get_all_workspaces_NetworkError(create_a_client, respx_mock):
# Test - get_workspace
@pytest_mark.anyio
+@pytest.mark.parametrize(
+ "workspace_name,status_code,response_data,expected_response",
+ [
+ ("sfsf", 404, {"error": "not found"}, "Result not found"),
+ ],
+)
async def test_get_workspace_validation(
- create_a_client, bad_workspace_connection, respx_mock
+ create_a_client,
+ respx_mock,
+ workspace_name,
+ status_code,
+ response_data,
+ expected_response,
):
- respx_mock.get(f"{baseUrl}workspaces/sfsf").mock(
- return_value=httpx.Response(404, json=bad_workspace_connection)
+ respx_mock.get(f"{baseUrl}workspaces/{workspace_name}").mock(
+ return_value=httpx.Response(status_code, json=response_data)
)
- response = await create_a_client.get_workspace("sfsf")
- assert response.response == "Result not found"
+
+ response = await create_a_client.get_workspace(workspace_name)
+ assert response.response == expected_response
@pytest_mark.anyio
@@ -89,6 +91,65 @@ async def test_get_workspace_ConnectError(create_a_client, respx_mock):
assert response.response == "Error in connecting to Geoserver"
+# Test - update_workspace
+@pytest_mark.anyio
+@pytest.mark.parametrize(
+ "status_code,response_data,expected_response",
+ [
+ (404, {"error": "not found"}, "Result not found"),
+ ],
+)
+async def test_update_workspace_validation(
+ create_a_client,
+ respx_mock,
+ status_code,
+ response_data,
+ expected_response,
+):
+ respx_mock.put(f"{baseUrl}workspaces/tiger.json").mock(
+ return_value=httpx.Response(status_code, json=response_data)
+ )
+ response = await create_a_client.update_workspace(
+ "tiger", UpdateWorkspaceInfo(isolated=True)
+ )
+ assert response.response == expected_response
+
+
+@pytest_mark.anyio
+@pytest_mark.parametrize(
+ "workspace_info,status_code,response_data",
+ [
+ (
+ UpdateWorkspaceInfo(isolated=True),
+ 200,
+ {"workspace": {"isolated": True}},
+ )
+ ],
+)
+async def test_update_workspace_success(
+ create_a_client,
+ respx_mock,
+ workspace_info,
+ status_code,
+ response_data,
+):
+ respx_mock.put(f"{baseUrl}workspaces/tiger.json").mock(
+ return_value=httpx.Response(status_code, json=response_data)
+ )
+ response = await create_a_client.update_workspace("tiger", workspace_info)
+ assert response.code == status_code
+
+
+@pytest_mark.anyio
+async def test_update_workspace_ConnectError(create_a_client, respx_mock):
+ respx.put(f"{baseUrl}workspaces/tiger.json").mock(side_effect=httpx.ConnectError)
+ with pytest.raises(httpx.ConnectError):
+ response = await create_a_client.update_workspace(
+ "tiger", UpdateWorkspaceInfo(isolated=True)
+ )
+ assert response.response == "Error in connecting to Geoserver"
+
+
# Test - get_vector_stores_in_workspaces
@pytest_mark.anyio
async def test_get_vector_stores_in_workspaces_validation(
@@ -292,7 +353,7 @@ async def test_get_style_ConnectError(create_a_client, respx_mock):
async def test_create_workspace_validation(
create_a_client, invalid_new_workspace_connection, respx_mock
):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
return_value=httpx.Response(404, json=invalid_new_workspace_connection)
)
response = await create_a_client.create_workspace("pydad", False, True)
@@ -303,7 +364,7 @@ async def test_create_workspace_validation(
async def test_create_workspace_success(
create_a_client, good_new_workspace_connection, respx_mock
):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
return_value=httpx.Response(201, json=good_new_workspace_connection)
)
response = await create_a_client.create_workspace("pydad", False, True)
@@ -312,7 +373,7 @@ async def test_create_workspace_success(
@pytest_mark.anyio
async def test_create_workspace_ConnectError(create_a_client, respx_mock):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
side_effect=httpx.ConnectError
)
with pytest.raises(httpx.ConnectError):
diff --git a/tests/_sync/test_gsx.py b/tests/_sync/test_gsx.py
index 783dc0c..f39fbdd 100644
--- a/tests/_sync/test_gsx.py
+++ b/tests/_sync/test_gsx.py
@@ -4,6 +4,7 @@
from geoserverx._sync.gsx import GeoServerXAuth, GeoServerXError, SyncGeoServerX
from geoserverx.models.geofence import Rule
+from geoserverx.models.workspace import UpdateWorkspaceInfo
baseUrl = "http://127.0.0.1:8080/geoserver/rest/"
@@ -24,17 +25,6 @@ def test_error():
assert True
-# Test - get_all_workspaces
-def test_get_all_workspaces_validation(
- client: SyncGeoServerX, bad_workspaces_connection, respx_mock
-):
- respx_mock.get(f"{baseUrl}workspaces").mock(
- return_value=httpx.Response(404, json=bad_workspaces_connection)
- )
- response = client.get_all_workspaces()
- assert response.code == 404
-
-
def test_get_all_workspaces_success(
client: SyncGeoServerX, good_workspaces_connection, respx_mock
):
@@ -52,14 +42,26 @@ def test_get_all_workspaces_NetworkError(client: SyncGeoServerX, respx_mock):
# Test - get_workspace
+@pytest_mark.parametrize(
+ "workspace_name,status_code,response_data,expected_response",
+ [
+ ("sfsf", 404, {"error": "not found"}, "Result not found"),
+ ],
+)
def test_get_workspace_validation(
- client: SyncGeoServerX, bad_workspace_connection, respx_mock
+ client: SyncGeoServerX,
+ respx_mock,
+ workspace_name,
+ status_code,
+ response_data,
+ expected_response,
):
- respx_mock.get(f"{baseUrl}workspaces/sfsf").mock(
- return_value=httpx.Response(404, json=bad_workspace_connection)
+ respx_mock.get(f"{baseUrl}workspaces/{workspace_name}").mock(
+ return_value=httpx.Response(status_code, json=response_data)
)
- response = client.get_workspace("sfsf")
- assert response.response == "Result not found"
+
+ response = client.get_workspace(workspace_name)
+ assert response.response == expected_response
def test_get_workspace_success(
@@ -78,6 +80,64 @@ def test_get_workspace_ConnectError(client: SyncGeoServerX, respx_mock):
assert response.response == "Error in connecting to Geoserver"
+# Test - update_workspace
+@pytest_mark.parametrize(
+ "workspace_name,status_code,response_data,expected_response",
+ [
+ ("tiger", 404, {"error": "not found"}, "Result not found"),
+ ],
+)
+def test_update_workspace_validation(
+ client: SyncGeoServerX,
+ respx_mock,
+ workspace_name,
+ status_code,
+ response_data,
+ expected_response,
+):
+ respx_mock.put(f"{baseUrl}workspaces/{workspace_name}.json").mock(
+ return_value=httpx.Response(status_code, json=response_data)
+ )
+ response = client.update_workspace(
+ workspace_name, UpdateWorkspaceInfo(isolated=True)
+ )
+ assert response.response == expected_response
+
+
+@pytest_mark.parametrize(
+ "workspace_name,workspace_info,status_code,response_data",
+ [
+ (
+ "tiger",
+ UpdateWorkspaceInfo(isolated=True),
+ 200,
+ {"workspace": {"isolated": True}},
+ )
+ ],
+)
+def test_update_workspace_success(
+ client: SyncGeoServerX,
+ respx_mock,
+ workspace_name,
+ workspace_info,
+ status_code,
+ response_data,
+):
+ respx_mock.put(f"{baseUrl}workspaces/{workspace_name}.json").mock(
+ return_value=httpx.Response(status_code, json=response_data)
+ )
+ response = client.update_workspace(workspace_name, workspace_info)
+ assert response.code == status_code
+
+
+def test_update_workspace_ConnectError(client: SyncGeoServerX, respx_mock):
+ respx_mock.put(f"{baseUrl}workspaces/tiger.json").mock(
+ side_effect=httpx.ConnectError
+ )
+ response = client.update_workspace("tiger", UpdateWorkspaceInfo(isolated=True))
+ assert response.response == "Error in connecting to Geoserver"
+
+
# Test - get_vector_stores_in_workspaces
def test_get_vector_stores_in_workspaces_validation(
client: SyncGeoServerX, invalid_datastores_model_connection, respx_mock
@@ -256,7 +316,7 @@ def test_get_style_ConnectError(client: SyncGeoServerX, respx_mock):
def test_create_workspace_validation(
client: SyncGeoServerX, invalid_new_workspace_connection, respx_mock
):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
return_value=httpx.Response(404, json=invalid_new_workspace_connection)
)
response = client.create_workspace("pydad", False, True)
@@ -266,7 +326,7 @@ def test_create_workspace_validation(
def test_create_workspace_success(
client: SyncGeoServerX, good_new_workspace_connection, respx_mock
):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
return_value=httpx.Response(201, json=good_new_workspace_connection)
)
response = client.create_workspace("pydad", False, True)
@@ -274,7 +334,7 @@ def test_create_workspace_success(
def test_create_workspace_ConnectError(client: SyncGeoServerX, respx_mock):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
side_effect=httpx.ConnectError
)
response = client.create_workspace("pydad", False, True)
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 1e22a09..ac21f3a 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -1,4 +1,5 @@
import httpx
+import pytest
from typer.testing import CliRunner
from geoserverx.cli.cli import app
@@ -8,49 +9,121 @@
baseUrl = "http://127.0.0.1:8080/geoserver/rest/"
-# Test - get_all_workspaces
-def test_get_all_workspaces_validation(bad_workspaces_connection, respx_mock):
+# Test - workspaces
+def test_get_all_workspaces_success(respx_mock):
+ """Test getting all workspaces"""
+ # Test data
+ workspace_response = {
+ "workspaces": {
+ "workspace": [
+ {
+ "name": "aa",
+ "href": "http://127.0.0.1:8080/geoserver/rest/workspaces/aa.json",
+ },
+ {
+ "name": "aaba",
+ "href": "http://127.0.0.1:8080/geoserver/rest/workspaces/aaba.json",
+ },
+ ]
+ }
+ }
+ # Mock the response
respx_mock.get(f"{baseUrl}workspaces").mock(
- return_value=httpx.Response(404, json=bad_workspaces_connection)
+ return_value=httpx.Response(200, json=workspace_response)
)
- result = runner.invoke(app, ["workspaces"])
- assert "404" in result.stdout
-
-
-def test_get_all_workspaces_success(good_workspaces_connection, respx_mock):
- respx_mock.get(f"{baseUrl}workspaces").mock(
- return_value=httpx.Response(200, json=good_workspaces_connection)
- )
- result = runner.invoke(app, ["workspaces"])
- assert "pydad" in result.stdout
-
-
-def test_get_all_workspaces_NetworkError(respx_mock):
- respx_mock.get(f"{baseUrl}workspaces").mock(side_effect=httpx.ConnectError)
- result = runner.invoke(app, ["workspaces"])
- assert "Error in connecting to Geoserver" in result.stdout
+ # Invoke the command
+ result = runner.invoke(app, ["workspaces", "--output", "json"])
+ # Assertions
+ assert result.exit_code == 0
+ for workspace in workspace_response["workspaces"]["workspace"]:
+ assert workspace["name"] in result.output
# Test - get_workspace
-def test_get_workspace_validation(bad_workspace_connection, respx_mock):
- respx_mock.get(f"{baseUrl}workspaces/sfsf").mock(
- return_value=httpx.Response(404, json=bad_workspace_connection)
+@pytest.mark.parametrize(
+ "workspace_name,status_code,response_data,expected_response",
+ [
+ (
+ "sfsf",
+ 404,
+ {"code": 404, "response": "Result not found"},
+ "Result not found",
+ ),
+ ],
+)
+def test_get_workspace_validation(
+ workspace_name, status_code, response_data, expected_response, respx_mock
+):
+ respx_mock.get(f"{baseUrl}workspaces/{workspace_name}").mock(
+ return_value=httpx.Response(status_code, json=response_data)
)
- result = runner.invoke(app, ["workspace", "--workspace", "sfsf"])
- assert "Result not found" in result.stdout
+ result = runner.invoke(app, ["workspace", "sfsf"])
+ assert expected_response in result.stdout
def test_get_workspace_success(good_workspace_connection, respx_mock):
respx_mock.get(f"{baseUrl}workspaces/pydad").mock(
return_value=httpx.Response(200, json=good_workspace_connection)
)
- result = runner.invoke(app, ["workspace", "--workspace", "pydad"])
+ result = runner.invoke(app, ["workspace", "pydad"])
assert "pydad" in result.stdout
def test_get_workspace_ConnectError(respx_mock):
respx_mock.get(f"{baseUrl}workspaces/pydad").mock(side_effect=httpx.ConnectError)
- result = runner.invoke(app, ["workspace", "--workspace", "pydad"])
+ result = runner.invoke(app, ["workspace", "pydad"])
+ assert "Error in connecting to Geoserver" in result.stdout
+
+
+# Test - update_workspace
+@pytest.mark.parametrize(
+ "workspace_name,status_code,response_data,expected_response",
+ [
+ (
+ "tiger",
+ 404,
+ {"code": 404, "response": "Result not found"},
+ "Result not found",
+ ),
+ ],
+)
+def test_update_workspace_validation(
+ respx_mock, workspace_name, status_code, response_data, expected_response
+):
+ respx_mock.put(f"{baseUrl}workspaces/{workspace_name}.json").mock(
+ return_value=httpx.Response(status_code, json=response_data)
+ )
+ result = runner.invoke(app, ["update-workspace", workspace_name, "--isolated"])
+ assert expected_response in result.stdout
+
+
+@pytest.mark.parametrize(
+ "workspace_name,workspace_info,status_code,response_data",
+ [
+ (
+ "tiger",
+ "--isolated",
+ 200,
+ "Executed successfully",
+ )
+ ],
+)
+def test_update_workspace_success(
+ respx_mock, workspace_name, workspace_info, status_code, response_data
+):
+ respx_mock.put(f"{baseUrl}workspaces/{workspace_name}.json").mock(
+ return_value=httpx.Response(status_code, json=response_data)
+ )
+ print(workspace_info)
+ result = runner.invoke(app, ["update-workspace", workspace_name, workspace_info])
+ assert response_data in result.stdout
+
+
+def test_update_workspace_ConnectError(respx_mock):
+ respx_mock.put(f"{baseUrl}workspaces/tiger.json").mock(
+ side_effect=httpx.ConnectError
+ )
+ result = runner.invoke(app, ["update-workspace", "tiger", "--isolated"])
assert "Error in connecting to Geoserver" in result.stdout
@@ -224,32 +297,32 @@ def test_get_style_ConnectError(respx_mock):
# Test - create_workspace
def test_create_workspace_validation(invalid_new_workspace_connection, respx_mock):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
return_value=httpx.Response(404, json=invalid_new_workspace_connection)
)
result = runner.invoke(
app,
- ["create-workspace", "--workspace", "burg", "--no-default", "--no-isolated"],
+ ["create-workspace", "burg", "--no-default", "--no-isolated"],
)
assert "Result not found" in result.stdout
def test_create_workspace_success(good_new_workspace_connection, respx_mock):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
return_value=httpx.Response(201, json=good_new_workspace_connection)
)
result = runner.invoke(
- app, ["create-workspace", "--workspace", "pydad", "--no-default", "--isolated"]
+ app, ["create-workspace", "pydad", "--no-default", "--isolated"]
)
assert "Data added successfully" in result.stdout
def test_create_workspace_ConnectError(respx_mock):
- respx_mock.post(f"{baseUrl}workspaces?default=False").mock(
+ respx_mock.post(f"{baseUrl}workspaces", params={"default": False}).mock(
side_effect=httpx.ConnectError
)
result = runner.invoke(
- app, ["create-workspace", "--workspace", "pydad", "--no-default", "--isolated"]
+ app, ["create-workspace", "pydad", "--no-default", "--isolated"]
)
assert "Error in connecting to Geoserver" in result.stdout
diff --git a/tests/conftest.py b/tests/conftest.py
index fef7ee2..7051a62 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -24,12 +24,6 @@ def good_workspaces_connection() -> dict:
return item
-@pytest.fixture
-def bad_workspaces_connection() -> dict:
- item = {"code": 502}
- return item
-
-
@pytest.fixture
def good_workspace_connection() -> dict:
item = {
@@ -47,17 +41,11 @@ def good_workspace_connection() -> dict:
@pytest.fixture
-def bad_workspace_connection() -> dict:
+def invalid_update_workspace_connection() -> dict:
item = {"code": 404, "response": "Result not found"}
return item
-@pytest.fixture
-def networkbad_workspace_connection() -> dict:
- item = {"code": 503, "response": "Geoserver unavailable"}
- return item
-
-
@pytest.fixture
def good_datastore_in_bulk_connection() -> dict:
item = {"name": "just", "href": "https://www.linkedin.com/notifications/"}