d149e5b8 |
#
# Copyright (c) 2011 Edward Langley
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# Neither the name of the project's author nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#
|
7d338113 |
import functools
|
d149e5b8 |
from twisted.trial import unittest
import StringIO
import mock
import jsonrpc.server
import jsonrpc.jsonutil
from twisted.web.test.test_web import DummyRequest
|
cc11cbe4 |
from twisted.internet.defer import succeed, DeferredList
|
d149e5b8 |
from twisted.web.static import server
def _render(resource, request):
    """Render ``resource`` for ``request`` and return a Deferred.

    Three outcomes of ``resource.render(request)`` are handled:

    * a ``str`` -- the body is written to the request, the request is
      finished, and an already-fired Deferred is returned;
    * ``server.NOT_DONE_YET`` -- the resource finishes the request itself;
      an already-fired Deferred is returned if that has happened, otherwise
      whatever ``request.notifyFinish()`` returns;
    * anything else -- ``ValueError`` is raised.
    """
    result = resource.render(request)
    if isinstance(result, str):
        request.write(result)
        # Synchronous response: the request must be finished explicitly
        # here, nothing else will do it.
        request.finish()
        return succeed(None)
    elif result is server.NOT_DONE_YET:
        if request.finished:
            return succeed(None)
        else:
            return request.notifyFinish()
    else:
        raise ValueError("Unexpected return value: %r" % (result,))
class SimpleEventHandler(jsonrpc.server.ServerEvents):
    """Minimal event handler exposing only ``echo`` and ``add``."""

    def log(self, result, request, error=False):
        # Logging is deliberately a no-op for these tests.
        return None

    def findmethod(self, method, *_, **__):
        # Only the two whitelisted names resolve to a bound method; any
        # other name falls through and yields None.
        exposed = set(['echo', 'add'])
        if method not in exposed:
            return None
        return getattr(self, method)

    def add(self, a, b):
        # Plain ``+``: raises for operands that cannot be added together.
        total = a + b
        return total

    def echo(self, v):
        # Hand the argument back unchanged.
        return v
|
7d338113 |
def TestResource(setup):
    """Build a decorator that combines ``setup`` with a result callback.

    ``setup(self, request, resource)`` is called with a freshly built
    request and resource.  It may modify them in place and return ``None``,
    or return a ``(request, resource)`` pair that replaces them.

    The returned decorator takes the callback ``tests`` and produces the
    actual test method.  That method renders the resource via ``_render``
    and attaches ``tests`` to the resulting Deferred, so ``tests`` is
    called as ``tests(result, self, request, resource)``.  The test method
    carries ``setup``'s name and docstring (``functools.wraps``).
    """
    def _inner1(tests):
        @functools.wraps(setup)
        def _run(self):
            rpc = jsonrpc.server.JSON_RPC()
            rpc.customize(SimpleEventHandler)

            fake_request = DummyRequest([''])
            fake_request.getCookie = mock.Mock()

            # A non-None return value from setup swaps in its own pair.
            override = setup(self, fake_request, rpc)
            if override is not None:
                fake_request, rpc = override

            rendering = _render(rpc, fake_request)
            return rendering.addCallback(tests, self, fake_request, rpc)
        return _run
    return _inner1
|
3ae840e0 |
|
cc11cbe4 |
|
7d338113 |
class TestJSONRPCServer(unittest.TestCase):
    """Render JSON-RPC payloads through the resource and check the output.

    Each test is a pair of functions.  The first, decorated with
    ``@TestResource``, is the setup: it receives ``(self, request,
    resource)`` and fills in ``request.content``.  The second, decorated
    with that setup, is the callback: it receives ``(ignored, self,
    request, resource)`` after rendering and holds the assertions.

    The callback has to be bound to a ``test*`` name.  Otherwise the
    ``test*`` attribute stays bound to the intermediate decorator and the
    assertions are never executed.  Setups shared by several callbacks are
    given a leading underscore so they are not collected themselves.
    """

    def setUp(self):
        self.id_ = 'an_id'
        self.param = "some data"

    # -- event handler hooks -------------------------------------------

    @TestResource
    def test_eventhandler(self, request, resource):
        # Wrap the handler so calls pass through but are recorded.
        resource.eventhandler = mock.Mock(wraps=resource.eventhandler)
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": "echo", "id": "%s"}' % (jsonrpc.jsonutil.encode([self.param]), self.id_))
        return request, resource

    @test_eventhandler
    def test_eventhandler(ignored, self, request, resource):
        self.assertTrue(resource.eventhandler.processcontent.called)
        self.assertTrue(resource.eventhandler.defer_with_rpcrequest.called)
        self.assertTrue(resource.eventhandler.callmethod.called)
        self.assertTrue(resource.eventhandler.defer.called)
        self.assertTrue(resource.eventhandler.getresponsecode.called)
        self.assertTrue(resource.eventhandler.log.called)

    # -- request id handling -------------------------------------------

    @TestResource
    def test_requestid0(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": "echo", "id": "%s"}' % (jsonrpc.jsonutil.encode([self.param]), self.id_))
        return request, resource

    @test_requestid0
    def test_requestid0(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        self.assertEqual(data["id"], self.id_)

    @TestResource
    def test_requestid1(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": "echo", "id": 1}' % jsonrpc.jsonutil.encode([self.param]))
        return request, resource

    @test_requestid1
    def test_requestid1(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        self.assertEqual(data["id"], 1)

    @TestResource
    def test_requestid2(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": "echo", "id": []}' % jsonrpc.jsonutil.encode([self.param]))
        return request, resource

    @test_requestid2
    def test_requestid2(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        # A list id must not be echoed back in the response.
        self.assertNotEqual(data["id"], [])

    @TestResource
    def test_requestid3(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": "echo", "id": {}}' % jsonrpc.jsonutil.encode([self.param]))
        return request, resource

    @test_requestid3
    def test_requestid3(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        # An object id must not be echoed back in the response.
        self.assertNotEqual(data["id"], {})

    # -- malformed / invalid requests ----------------------------------

    @TestResource
    def test_invalid_data(self, request, resource):
        # Deliberately unbalanced JSON.
        request.content = StringIO.StringIO(' {"v": %s}, "method": "echo"}' % (jsonrpc.jsonutil.encode(self.param)))
        return request, resource

    @test_invalid_data
    def test_invalid_data(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        self.assertEqual(data, {"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error."}, "id": None})

    @TestResource
    def test_wrongversion(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.1", "params": %s, "method": "echo", "id": "%s"}' % (jsonrpc.jsonutil.encode([self.param]), self.id_))

    @test_wrongversion
    def test_wrongversion(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(data, {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request."}, "id": self.id_})

    @TestResource
    def test_invalidmethodname(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": 0, "id": "%s"}' % (jsonrpc.jsonutil.encode([self.param]), self.id_))

    @test_invalidmethodname
    def test_invalidmethodname(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(data, {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request."}, "id": self.id_})

    @TestResource
    def test_missingmethod(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": "non_existent", "id": "%s"}' % (jsonrpc.jsonutil.encode([self.param]), self.id_))

    @test_missingmethod
    def test_missingmethod(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(data, {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Procedure not found."}, "id": self.id_})

    # -- successful calls ----------------------------------------------

    @TestResource
    def test_simplecall(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": %s, "method": "echo", "id": "%s"}' % (jsonrpc.jsonutil.encode([self.param]), self.id_))

    @test_simplecall
    def test_simplecall(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        self.assertEqual(data['id'], self.id_)
        self.assertEqual(data['result'], self.param)

    @TestResource
    def test_notify(self, request, resource):
        # No "id" member in the payload.
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": {"v": %s}, "method": "echo"}' % (jsonrpc.jsonutil.encode(self.param)))

    @test_notify
    def test_notify(ignored, self, request, resource):
        self.assertEqual(len(request.written), 0)

    @TestResource
    def _test_kwcall(self, request, resource):
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": {"v": %s}, "method": "echo", "id": "%s"}' % (jsonrpc.jsonutil.encode(self.param), self.id_))

    @_test_kwcall
    def test_kwcall_id(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        self.assertEqual(data['id'], self.id_)

    @_test_kwcall
    def test_kwcall_result(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])

        self.assertEqual(data['result'], self.param)

    @TestResource
    def test_err(self, request, resource):
        # add(1, "sss") cannot succeed, so an error response is expected.
        request.content = StringIO.StringIO('{"jsonrpc": "2.0", "params": [1, "sss"], "method": "add", "id": "%s"}' % self.id_)
        return request, resource

    @test_err
    def test_err(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)
        data = jsonrpc.jsonutil.decode(request.written[0])

        self.assertEqual(data['id'], self.id_)
        self.assertTrue(data.get('error', False))

    # -- batch calls ---------------------------------------------------

    @TestResource
    def _test_batchcall(self, request, resource):
        request.content = StringIO.StringIO(
            '[{"jsonrpc": "2.0", "params": [1, 2], "method": "add", "id": "1"},'
            '{"jsonrpc": "2.0", "params": {"a": 3, "b": 2}, "method": "add", "id": "2"}]'
        )
        return request, resource

    @_test_batchcall
    def test_batchcall1(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)

    @_test_batchcall
    def test_batchcall2(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(len(data), 2)

    @_test_batchcall
    def test_batchcall3(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(set(x['result'] for x in data), set([3,5]))

    @_test_batchcall
    def test_batchcall4(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        # set("12") is the set of the two one-character ids "1" and "2".
        self.assertEqual(set(x['id'] for x in data), set("12"))

    @_test_batchcall
    def test_batchcall5(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertFalse(any(x.get('error', False) for x in data))

    @TestResource
    def _test_batchcall_1err(self, request, resource):
        # Second call adds "3" + 2, which cannot succeed.
        request.content = StringIO.StringIO(
            '[{"jsonrpc": "2.0", "params": [1, 2], "method": "add", "id": "1"},'
            '{"jsonrpc": "2.0", "params": {"a": "3", "b": 2}, "method": "add", "id": "2"}]'
        )
        return request, resource

    @_test_batchcall_1err
    def test_batchcall_1err_1(ignored, self, request, resource):
        self.assertEqual(len(request.written), 1)

    @_test_batchcall_1err
    def test_batchcall_1err_2(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(len(data), 2)

    @_test_batchcall_1err
    def test_batchcall_1err_3(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(set(x['id'] for x in data), set("12"))

    @_test_batchcall_1err
    def test_batchcall_1err_4(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(set(x.get('result', False) for x in data), set([3,False]))

    @_test_batchcall_1err
    def test_batchcall_1err_5(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        # Count the responses carrying a truthy "error" member without
        # relying on filter() returning a list.
        self.assertEqual(sum(1 for x in data if x.get('error')), 1)

    @TestResource
    def _test_batchcall_emptylist(self, request, resource):
        request.content = StringIO.StringIO('[]')

    @_test_batchcall_emptylist
    def test_batchcall_emptylist(ignored, self, request, resource):
        data = jsonrpc.jsonutil.decode(request.written[0])
        self.assertEqual(data, {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request."}, "id": None})
|
d149e5b8 |
|