From 7b15110e62e5e7d94b57ad5e9b16e9ffbe5d4f58 Mon Sep 17 00:00:00 2001 From: Dave Eargle Date: Fri, 1 Oct 2021 15:55:21 -0600 Subject: [PATCH 1/4] initial commit First pass at extracting just the API changes from #524 --- psiturk/api/__init__.py | 188 +++++++++++++++++++++++++++++++++++- psiturk/models.py | 13 +++ psiturk/psiturk_statuses.py | 14 +++ 3 files changed, 210 insertions(+), 5 deletions(-) diff --git a/psiturk/api/__init__.py b/psiturk/api/__init__.py index bc90d6d24..82aef589e 100644 --- a/psiturk/api/__init__.py +++ b/psiturk/api/__init__.py @@ -1,10 +1,11 @@ from __future__ import generator_stop -from flask import Blueprint, jsonify, make_response, request, current_app as app +from flask import Blueprint, jsonify, make_response, request, session, current_app as app, send_file from flask.json import JSONEncoder from flask_restful import Api, Resource from psiturk.dashboard import login_required -from psiturk.services_manager import psiturk_services_manager as services_manager -from psiturk.models import Participant, Campaign +from psiturk.services_manager import SESSION_SERVICES_MANAGER_MODE_KEY, \ + psiturk_services_manager as services_manager +from psiturk.models import Participant, Campaign, Hit from psiturk.experiment import app from psiturk.psiturk_exceptions import * from psiturk.amt_services_wrapper import WrapperResponse @@ -15,6 +16,9 @@ from apscheduler.triggers.base import BaseTrigger import datetime import pytz +import json +from io import BytesIO +import zipfile from pytz.tzinfo import BaseTzInfo api_blueprint = Blueprint('api', __name__, url_prefix='/api') @@ -118,13 +122,134 @@ def get(self, assignment_id=None): class AssignmentsAction(Resource): + + + # GET: For downloading data + def get(self, action): + """ + """ + + # ACTION: Download data + # assignment: the assignment id for which to download data + if action == 'datadownload': + assignmentid = request.args.get('assignmentid') + fields = ['question_data', 'trial_data', 'event_data'] + participant = Participant.query.filter(Participant.assignmentid == assignmentid).first() + mem_file = BytesIO() + with zipfile.ZipFile(mem_file, 'w', compression = zipfile.ZIP_DEFLATED) as zf: + write_data_to_zip(participant, fields, zf) + mem_file.seek(0) + return send_file(mem_file, attachment_filename = participant.workerid + '_data.zip', as_attachment=True) + + def post(self, action=None): - data = request.json - if action == 'approve_all': + """ + """ + + # ACTION: Approve + # assignments: a list of assignment ids to approve + # all_studies: approve in mturk even if not in local db? + if action == 'approve': + assignments = request.json['assignments'] + all_studies = request.json['all_studies'] + _return = [] + for assignment in assignments: + try: + response = services_manager.amt_services_wrapper \ + .approve_assignment_by_assignment_id(assignment, all_studies) + _return.append({ + "assignment": assignment, + "success": response.status == "success", + "message": str(response)}) + except Exception as e: + _return.append({ + "assignment": assignment, + "success": False, + "message": str(e)}) + return _return + + # ACTION: Reject + # assignments: a list of assignment ids to reject + # all_studies: reject in mturk even if not in local db? + elif action == 'reject': + assignments = request.json['assignments'] + all_studies = request.json['all_studies'] + _return = [] + for assignment in assignments: + try: + response = services_manager.amt_services_wrapper \ + .reject_assignment(assignment, all_studies) + _return.append({ + "assignment": assignment, + "success": response.status == 'success', + "message": str(response)}) + except Exception as e: + _return.append({ + "assignment": assignment, + "success": False, + "message": str(e)}) + return _return + + # ACTION: Bonus + # assignments: a list of assignment ids to bonus + # all_studies: bonus in mturk even if not in local db? + # amount: a float value to bonus, or "auto" for auto-bonusing from local + # reason: a string reason to send to the worker + elif action == 'bonus': + assignments = request.json['assignments'] + all_studies = request.json['all_studies'] + amount = request.json['amount'] + reason = request.json['reason'] + _return = [] + for assignment in assignments: + try: + resp = services_manager.amt_services_wrapper \ + .bonus_assignment_for_assignment_id( + assignment, amount, reason, all_studies) + _return.append({ + "assignment": assignment, + "success": resp.status == 'success', + "message": str(resp)}) + except Exception as e: + _return.append({ + "assignment": assignment, + "success": False, + "message": str(e)}) + return _return + + # ACTION: Data + # assignments: a list of assignment ids to retrieve data for + elif action == 'data': + assignments = request.json['assignments'] + _return = {} + for assignment_id in assignments: + p = Participant.query.filter_by(assignmentid=assignment_id).first() + q_data = json.loads(p.datastring)["questiondata"] + e_data = json.loads(p.datastring)["eventdata"] + t_data = json.loads(p.datastring)["data"] + jsonData = { + 'question_data': [{ + 'questionname': q, + 'response': json.dumps(q_data[q])} for q in q_data], + 'event_data': [{ + 'eventtype': e['eventtype'], + 'interval': e['interval'], + 'value': e['value'], + 'timestamp': e['timestamp']} for e in e_data], + 'trial_data': [{ + 'current_trial': t['current_trial'], + 'dateTime': t['dateTime'], + 'trialdata': json.dumps(t['trialdata'])} for t in t_data] + } + _return[assignment_id] = jsonData + return _return + + elif action == 'approve_all': response = services_manager.amt_services_wrapper.approve_all_assignments() if not response.success: raise response.exception return response.data['results'] + elif action == 'bonus_all': if 'reason' not in data or not data['reason']: raise APIException(message='bonus reason is missing!') @@ -134,6 +259,7 @@ def post(self, action=None): if not response.success: raise response.exception return response.data['results'], 201 + else: raise APIException(message='action `{}` not recognized!'.format(action)) @@ -278,19 +404,71 @@ def post(self): raise APIException(message='task name `{}` not recognized!'.format(data['name'])) +class WorkerList(Resource): + # POST: Returns the full list of workers from the local database + # codeversion: the codeversion for which to retrieve workers for + def post(self): + worker_ids = request.json['worker_ids'] + codeversion = request.json['codeversion'] + query = Participant.query + if len(worker_ids) > 0: + query = query.filter(Participant.workerid.in_(worker_ids)) + if codeversion: + query = query.filter(Participant.codeversion == codeversion) + _return = query.all() + return [p.toAPIData() for p in _return] + + +# --------------------------- DATA WRITING HELPERS --------------------------- # + +# Writes data to an open zipfile +def write_data_to_zip(participant, fields, zf, prefix=''): + for field in fields: + output = get_datafile(participant, field) + zf.writestr(prefix + field + '.csv', output) + +def get_datafile(participant, datatype): + contents = { + "trial_data": { + "function": lambda p: p.get_trial_data(), + "headerline": "uniqueid,currenttrial,time,trialData\n" + }, + "event_data": { + "function": lambda p: p.get_event_data(), + "headerline": "uniqueid,eventtype,interval,value,time\n" + }, + "question_data": { + "function": lambda p: p.get_question_data(), + "headerline": "uniqueid,questionname,response\n" + }, + } + ret = contents[datatype]["headerline"] + contents[datatype]["function"](participant) + return ret + + +# ------------------------------ RESOURCE ADDING ----------------------------- # + +# Services Manager api.add_resource(ServicesManager, '/services_manager', '/services_manager/') +# Assignments api.add_resource(AssignmentList, '/assignments', '/assignments/') api.add_resource(AssignmentsAction, '/assignments/action/') +# Hits api.add_resource(Hits, '/hit/') api.add_resource(HitList, '/hits/', '/hits/') api.add_resource(HitsAction, '/hits/action/') +# Campaigns api.add_resource(CampaignList, '/campaigns', '/campaigns/') api.add_resource(Campaigns, '/campaigns/') +# Tasks api.add_resource(TaskList, '/tasks', '/tasks/') api.add_resource(Tasks, '/tasks/') +# Workers +api.add_resource(WorkerList, '/workers', '/workers/') + api.init_app(api_blueprint) diff --git a/psiturk/models.py b/psiturk/models.py index f6707d96a..e7f2e5aa3 100644 --- a/psiturk/models.py +++ b/psiturk/models.py @@ -7,6 +7,7 @@ from sqlalchemy.orm import validates, deferred from sqlalchemy.ext.declarative import declarative_base from psiturk.db import db_session +from psiturk.psiturk_statuses import PSITURK_STATUS_CODES from .psiturk_config import PsiturkConfig from typing import List from itertools import groupby @@ -84,6 +85,18 @@ def __repr__(self): self.status, self.codeversion) + def toAPIData(self): + return { + 'hitId': self.hitid, + 'assignmentId': self.assignmentid, + 'workerId': self.workerid, + 'submit_time': self.endhit, + 'accept_time': self.beginhit, + 'status': PSITURK_STATUS_CODES[self.status], + 'codeversion': self.codeversion, + 'bonus': self.bonus + } + def get_trial_data(self): try: trialdata = json.loads(self.datastring)["data"] diff --git a/psiturk/psiturk_statuses.py b/psiturk/psiturk_statuses.py index ad656b0f2..fcf1bec43 100644 --- a/psiturk/psiturk_statuses.py +++ b/psiturk/psiturk_statuses.py @@ -7,3 +7,17 @@ CREDITED = 5 QUITEARLY = 6 BONUSED = 7 +REJECTED = 8 + +# Back-reference +PSITURK_STATUS_CODES = [ + 'Not Accepted', + 'Allocated', + 'Started', + 'Completed', + 'Submitted', + 'Credited', + 'Quit Early', + 'Bonused', + 'Rejected' +] From 4d98862abee818abcc29cbacc771fce1eb46c1e3 Mon Sep 17 00:00:00 2001 From: Dave Eargle Date: Tue, 5 Oct 2021 00:54:42 -0600 Subject: [PATCH 2/4] fix WorkerList WorkerList shouldn't accept a list of worker ids to filter on -- create a separate resource Worker to get a single worker --- psiturk/api/__init__.py | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/psiturk/api/__init__.py b/psiturk/api/__init__.py index 82aef589e..5580f42c2 100644 --- a/psiturk/api/__init__.py +++ b/psiturk/api/__init__.py @@ -264,7 +264,7 @@ def post(self, action=None): raise APIException(message='action `{}` not recognized!'.format(action)) -class Hits(Resource): +class HitResource(Resource): def patch(self, hit_id): data = request.json if 'is_expired' in data and data['is_expired']: @@ -325,7 +325,7 @@ def get(self, action=None): raise APIException(message='action `{}` not recognized!'.format(action)) -class Campaigns(Resource): +class CampaignResource(Resource): def get(self, campaign_id): campaign = Campaign.query.filter(Campaign.id == campaign_id).one() return campaign @@ -370,7 +370,7 @@ def post(self): -class Tasks(Resource): +class TaskResource(Resource): def delete(self, task_id): app.apscheduler.remove_job(str(task_id)) return '', 204 @@ -404,15 +404,23 @@ def post(self): raise APIException(message='task name `{}` not recognized!'.format(data['name'])) +class WorkerResource(Resource): + def get(self, worker_id): + p = Participant.query.filter(Participant.workerid == worker_id).one() + return p.toAPIData() + + class WorkerList(Resource): - # POST: Returns the full list of workers from the local database - # codeversion: the codeversion for which to retrieve workers for - def post(self): - worker_ids = request.json['worker_ids'] - codeversion = request.json['codeversion'] + + def get(self, codeversion=None): + ''' + Returns workers from the local database + + codeversion: + the codeversion on which to filter retrieved workers + ''' + query = Participant.query - if len(worker_ids) > 0: - query = query.filter(Participant.workerid.in_(worker_ids)) if codeversion: query = query.filter(Participant.codeversion == codeversion) _return = query.all() @@ -456,19 +464,20 @@ def get_datafile(participant, datatype): api.add_resource(AssignmentsAction, '/assignments/action/') # Hits -api.add_resource(Hits, '/hit/') -api.add_resource(HitList, '/hits/', '/hits/') +api.add_resource(HitResource, '/hit/') +api.add_resource(HitList, '/hits/', '/hits/status/') api.add_resource(HitsAction, '/hits/action/') # Campaigns +api.add_resource(CampaignResource, '/campaign/') api.add_resource(CampaignList, '/campaigns', '/campaigns/') -api.add_resource(Campaigns, '/campaigns/') # Tasks +api.add_resource(TaskResource, '/task/') api.add_resource(TaskList, '/tasks', '/tasks/') -api.add_resource(Tasks, '/tasks/') # Workers -api.add_resource(WorkerList, '/workers', '/workers/') +api.add_resource(WorkerList, '/workers', '/workers/', '/workers/codeversion/') +api.add_resource(WorkerResource, '/worker/') api.init_app(api_blueprint) From 1c5c0edd1a5f935805aa2b5281f27e4d8f61c59b Mon Sep 17 00:00:00 2001 From: Dave Eargle Date: Tue, 5 Oct 2021 00:56:47 -0600 Subject: [PATCH 3/4] remove unused session imports --- psiturk/api/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/psiturk/api/__init__.py b/psiturk/api/__init__.py index 5580f42c2..ff3d39b14 100644 --- a/psiturk/api/__init__.py +++ b/psiturk/api/__init__.py @@ -1,10 +1,9 @@ from __future__ import generator_stop -from flask import Blueprint, jsonify, make_response, request, session, current_app as app, send_file +from flask import Blueprint, jsonify, make_response, request, current_app as app, send_file from flask.json import JSONEncoder from flask_restful import Api, Resource from psiturk.dashboard import login_required -from psiturk.services_manager import SESSION_SERVICES_MANAGER_MODE_KEY, \ - psiturk_services_manager as services_manager +from psiturk.services_manager import psiturk_services_manager as services_manager from psiturk.models import Participant, Campaign, Hit from psiturk.experiment import app from psiturk.psiturk_exceptions import * From 0bc175a37fa30c60e0972d0490bc7b7bc7317ffb Mon Sep 17 00:00:00 2001 From: Dave Eargle Date: Tue, 5 Oct 2021 01:16:51 -0600 Subject: [PATCH 4/4] switch to request.args instead of url routed --- psiturk/api/__init__.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/psiturk/api/__init__.py b/psiturk/api/__init__.py index ff3d39b14..c2a6fadf0 100644 --- a/psiturk/api/__init__.py +++ b/psiturk/api/__init__.py @@ -285,7 +285,8 @@ def delete(self, hit_id): class HitList(Resource): - def get(self, status=None): + def get(self): + status = request.args.get('status') if status == 'active': hits = services_manager.amt_services_wrapper.get_active_hits().data else: @@ -411,14 +412,14 @@ def get(self, worker_id): class WorkerList(Resource): - def get(self, codeversion=None): + def get(self): ''' Returns workers from the local database codeversion: the codeversion on which to filter retrieved workers ''' - + codeversion = request.args.get('codeversion') query = Participant.query if codeversion: query = query.filter(Participant.codeversion == codeversion) @@ -464,7 +465,7 @@ def get_datafile(participant, datatype): # Hits api.add_resource(HitResource, '/hit/') -api.add_resource(HitList, '/hits/', '/hits/status/') +api.add_resource(HitList, '/hits', '/hits/') api.add_resource(HitsAction, '/hits/action/') # Campaigns