Commit 0e15ac0c authored by Noe Nieto's avatar Noe Nieto 💬
Browse files

Merge remote-tracking branch 'origin/liberty/host/middleware#88'

parents 2d63350c a4d49ea9
Pipeline #50466 failed with stage
......@@ -5,57 +5,12 @@ from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from captcha.fields import CaptchaField
import requests
from purist.models import User
from middleware.common import get_woo_connection
from .models import get_jwt_token, get_wc_userid
shop_hostname = urlparse(settings.WOO_URL).netloc
def get_jwt_token(user, password):
jwt_url = f'{settings.WOO_URL}/wp-json/jwt-auth/v1/token'
with requests.Session() as s:
r1 = s.post(
jwt_url, data={'username': user, 'password': password},
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
if r1.status_code == 200:
jwt_token = r1.json()['token']
r2 = s.post(f'{jwt_url}/validate', headers={'Authorization': f'Bearer {jwt_token}'})
if r2.status_code == 200 and r2.json()['code'] == 'jwt_auth_valid_token':
return jwt_token
return None
def get_wc_userid(jwt_token):
with requests.Session() as s:
r = s.get(
f'{settings.WOO_URL}/wp-json/wp/v2/users/me?_fields[]=id',
headers={'Authorization': f'Bearer {jwt_token}'}
)
if r.status_code == 200:
usr_info = r.json()
return usr_info.get('id', None)
return None
def update_wc_username(new_username, jwt_token):
"""Returns True for success, False for failure"""
with requests.Session() as s:
r = s.put(
f'{settings.WOO_URL}/wp-json/wp/v2/users/me',
data={
'username': new_username,
'name': new_username,
'nickname': new_username,
'slug': new_username
},
headers={'Authorization': f'Bearer {jwt_token}'}
)
return r.status_code == 200 and r.json()['foo'] == 'whatever'
return False
class WC_UpgradeForm(forms.Form):
wc_email = forms.EmailField(
......@@ -102,9 +57,9 @@ class WC_UpgradeForm(forms.Form):
shop_url = settings.WOO_URL
if {'wc_email', 'wc_password'} <= data.keys():
jwt_token = get_jwt_token(data['wc_email'], data['wc_password'])
userid = get_wc_userid(jwt_token)
self.wc_user_id = userid
self.wc_jwt_token = get_jwt_token(data['wc_email'], data['wc_password'])
self.wc_user_id = userid = get_wc_userid(self.wc_jwt_token)
if not userid:
self.add_error('wc_email', 'This field is required')
self.add_error('wc_password', 'This field is required')
......
from django.db import models
from django.conf import settings
# Create your models here.
import requests
from middleware.common import get_woo_connection
def get_jwt_token(user, password):
jwt_url = f'{settings.WOO_URL}/wp-json/jwt-auth/v1/token'
with requests.Session() as s:
r1 = s.post(
jwt_url, data={'username': user, 'password': password},
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
if r1.status_code == 200:
jwt_token = r1.json()['token']
r2 = s.post(f'{jwt_url}/validate', headers={'Authorization': f'Bearer {jwt_token}'})
if r2.status_code == 200 and r2.json()['code'] == 'jwt_auth_valid_token':
return jwt_token
return None
def get_wc_userid(jwt_token):
with requests.Session() as s:
r = s.get(
f'{settings.WOO_URL}/wp-json/wp/v2/users/me?_fields[]=id',
headers={'Authorization': f'Bearer {jwt_token}'}
)
if r.status_code == 200:
usr_info = r.json()
return usr_info.get('id', None)
return None
def update_wc_username(wc_userid, username, email):
"""
Returns True for success, False for failure
Note: WP can't update username and email using wp-json/wp/v2/users/me, so
we need to use
"""
wc_api = get_woo_connection()
wc_api.version = 'wp/v2'
res = wc_api.post(
endpoint=f'users/{wc_userid}',
data={
'ldh_username': username,
'name': username,
'nickname': username,
'slug': username
}
)
return res
import json
import functools
from django.conf import settings
from django.core import management
......@@ -112,13 +113,24 @@ class SaneLDAPTestCase(BaseLDAPTestCase):
management.call_command('makemigrations', dry_run=True)
def print_call_arguments(func):
def log_call(*args, **kwargs):
if len(args) == 3:
_, split_result, request = args
else:
split_result, request = args
print(f'[{request.method}]\t{split_result.netloc}\t{split_result.path} | {func.__name__}()')
return func(*args, **kwargs)
return log_call
class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
directory = dict([groups, foogroup, bargroup, wizgroup, people, foouser])
@print_call_arguments
@urlmatch(method='POST', path='/wp-json/jwt-auth/v1/token')
def post_jwt_token(self, split_result, request):
self._api_hits += 1
print(f'[{request.method}]\t{split_result.netloc}\t{split_result.path} | post_jwt_token()')
return {
'status_code': 200,
'content': json.dumps({
......@@ -129,10 +141,10 @@ class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
}),
}
@print_call_arguments
@urlmatch(method='POST', path='/wp-json/jwt-auth/v1/token')
def post_jwt_token_fail(self, split_result, request):
self._api_hits += 1
print(f'[{request.method}]\t{split_result.netloc}\t{split_result.path} | post_jwt_token_fail()')
return {
'status_code': 403,
'content': json.dumps({
......@@ -144,10 +156,10 @@ class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
}),
}
@print_call_arguments
@urlmatch(method='POST', path='/wp-json/jwt-auth/v1/token/validate')
def post_jwt_token_validate(self, split_result, request):
self._api_hits += 1
print(f'[{request.method}]\t{split_result.netloc}\t{split_result.path} | post_jwt_token_validate()')
return {
'status_code': 200,
'content': json.dumps({
......@@ -156,10 +168,10 @@ class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
}),
}
@print_call_arguments
@urlmatch(method='GET', path='/wp-json/wp/v2/users/me')
def get_wp_me(self, split_result, request):
self._api_hits += 1
print(f'[{request.method}]\t{split_result.netloc}\t{split_result.path} | get_wp_me()')
return {
'status_code': 200,
'content': json.dumps({'id': 97})
......@@ -167,7 +179,7 @@ class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
def test_form_valid(self):
mock_fns = (
self.post_jwt_token_validate, self.post_jwt_token, self.get_wp_me
self.post_jwt_token_validate, self.post_jwt_token, self.get_wp_me,
)
with HTTMock(*mock_fns):
form = WC_UpgradeForm(data={
......@@ -195,7 +207,7 @@ class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
self.assertFalse(form.is_valid())
self.assertEqual(self._api_hits, 0)
mock_fns = (self.post_jwt_token_fail,)
mock_fns = (self.post_jwt_token_fail, )
with HTTMock(*mock_fns):
form = WC_UpgradeForm(data={
'wc_email': 'fulano@example.com',
......@@ -213,7 +225,7 @@ class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
self.assertEqual(resp.status_code, 200)
self.assertContains(resp, 'Shop address')
self.assertContains(resp, 'Shop password')
self.assertContains(resp, 'Your new Awesome Site user name')
self.assertContains(resp, 'Your new Title user name')
self.assertContains(resp, 'Please solve this math problem')
def test_form_post_success(self):
......@@ -229,46 +241,51 @@ class UpgradeFromWoocommerceTestCase(BaseLDAPTestCase, BaseWooCommerceTestCase):
]
self._password_updated = False
@print_call_arguments
@urlmatch(method='GET', path='/wp-json/wc/v1/customers')
def wc_get_customers(urlsplit, request):
"""
Woocommerce says: "I only have one user"
"""
self._api_hits += 1
print(f'[{request.method}]\t{urlsplit.netloc}\t{urlsplit.path} | wc_get_customers()')
return {
'status_code': 200,
'content': json.dumps(self._users_on_wc),
}
@print_call_arguments
@urlmatch(path='/wp-json/wc/v1/customers')
def wc_update_password(urlsplit, request):
"""
The customer's password is updated when the user model is being created
"""
self._api_hits += 1
print(f'[{request.method}]\t{urlsplit.netloc}\t{urlsplit.path} | wc_update_password()')
self._password_updated = True
return {
'status_code': 200,
'content': '{}'
}
@all_requests
def wc_match_all(urlsplit, request):
@print_call_arguments
@urlmatch(method='PUT', path='/wp-json/wp/v2/users/me')
def wc_update_username(urlsplit, request):
"""
Just say "OK" to anything else
The customer's password is updated when the user model is being created
"""
self._api_hits += 1
print(f'[{request.method}]\t{urlsplit.netloc}\t{urlsplit.path} | wc_match_all()')
return {'status_code': 200}
self._password_updated = True
return {
'status_code': 200,
'content': '{}'
}
mock_fns = (
self.post_jwt_token_validate,
self.post_jwt_token,
self.post_jwt_token,
self.get_wp_me,
wc_get_customers,
wc_update_password,
wc_update_username,
)
with HTTMock(*mock_fns):
# ### Given there is an existing account
......
......@@ -5,7 +5,7 @@ from django.views.generic.edit import FormView
from purist.models import AccountType, User
from .forms import WC_UpgradeForm
from .models import update_wc_username
class WC_UpgradeView(FormView):
template_name = 'registration/registration_form.html'
......@@ -21,4 +21,11 @@ class WC_UpgradeView(FormView):
)
user.account_type = AccountType.BASIC
user.save()
update_wc_username(
wc_userid=form.wc_user_id,
username=form.cleaned_data['l1_username'],
email=form.cleaned_data['wc_email'],
jwt_token=form.wc_jwt_token
)
return redirect(self.success_url)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment