A JSON-RPC client library for asyncio

Project Description

This is a compact, simple JSON-RPC client implementation for asyncio Python code. It is a fork of jsonrpc-requests (https://github.com/gciotta/jsonrpc-requests).

Main Features

  • Python 3.4 & 3.5 compatible
  • Supports nested namespaces (e.g. app.users.getUsers())
  • 100% test coverage
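
To illustrate what nested namespaces mean in practice, the sketch below shows one way attribute chaining can be turned into a dotted JSON-RPC method name. MethodProxy is a hypothetical stand-in written for this example; it is not the library's Server class and makes no network calls.

```python
class MethodProxy:
    """Hypothetical stand-in that records a dotted JSON-RPC method name."""

    def __init__(self, name=None):
        self._name = name

    def __getattr__(self, attr):
        # Only reached for attributes that do not exist on the instance.
        # Private names are rejected so copy/pickle helpers do not recurse.
        if attr.startswith('_'):
            raise AttributeError(attr)
        dotted = attr if self._name is None else self._name + '.' + attr
        return MethodProxy(dotted)

    def __call__(self, *args, **kwargs):
        # A real client would send a request here; the sketch returns the name.
        return self._name


server = MethodProxy()
method_name = server.app.users.getUsers()
print(method_name)  # app.users.getUsers
```

Each attribute access returns a new proxy, so the full path is only resolved when the final object is called.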

Usage

Execute remote JSON-RPC functions

import asyncio
from jsonrpc_async import Server

@asyncio.coroutine
def routine():
    server = Server('http://localhost:8080')
    try:
        yield from server.foo(1, 2)
        yield from server.foo(bar=1, baz=2)
        yield from server.foo({'foo': 'bar'})
        yield from server.foo.bar(baz=1, qux=2)
    finally:
        yield from server.session.close()

asyncio.get_event_loop().run_until_complete(routine())
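
For orientation, JSON-RPC 2.0 carries parameters either by position (an array) or by name (an object). The sketch below builds the request bodies that calls like foo(1, 2) and foo(bar=1, baz=2) would plausibly correspond to. build_request and the fixed id of 1 are assumptions made for this example, not part of the library, and how the library treats a single dict argument is not covered here.

```python
import json


def build_request(method, *args, **kwargs):
    """Hypothetical helper: build a JSON-RPC 2.0 request as a dict."""
    if args and kwargs:
        # The specification allows an array or an object, not a mix of both.
        raise ValueError('use positional or keyword parameters, not both')
    request = {'jsonrpc': '2.0', 'method': method, 'id': 1}  # id is a placeholder
    if args:
        request['params'] = list(args)
    elif kwargs:
        request['params'] = kwargs
    return request


positional = build_request('foo', 1, 2)
named = build_request('foo', bar=1, baz=2)

print(json.dumps(positional, sort_keys=True))
# {"id": 1, "jsonrpc": "2.0", "method": "foo", "params": [1, 2]}
print(json.dumps(named, sort_keys=True))
# {"id": 1, "jsonrpc": "2.0", "method": "foo", "params": {"bar": 1, "baz": 2}}
```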

A notification

import asyncio
from jsonrpc_async import Server

@asyncio.coroutine
def routine():
    server = Server('http://localhost:8080')
    try:
        yield from server.foo(bar=1, _notification=True)
    finally:
        yield from server.session.close()

asyncio.get_event_loop().run_until_complete(routine())
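
In JSON-RPC 2.0, a notification is a request without an "id" member, and the server does not reply to it. The sketch below shows that difference at the message level. build_message and its id counter are hypothetical names used only for illustration; they are not the library's internals.

```python
import itertools

_ids = itertools.count(1)  # simple source of request ids for this sketch


def build_message(method, params=None, notification=False):
    """Hypothetical helper: build a JSON-RPC 2.0 call or notification."""
    message = {'jsonrpc': '2.0', 'method': method}
    if params is not None:
        message['params'] = params
    if not notification:
        # Only regular calls carry an id, which the response echoes back.
        message['id'] = next(_ids)
    return message


call = build_message('foo', {'bar': 1})
notice = build_message('foo', {'bar': 1}, notification=True)

print('id' in call)    # True
print('id' in notice)  # False
```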

Pass through arguments to aiohttp (see also aiohttp documentation)

import asyncio
import aiohttp
from jsonrpc_async import Server

@asyncio.coroutine
def routine():
    server = Server(
        'http://localhost:8080',
        auth=aiohttp.BasicAuth('user', 'pass'),
        headers={'x-test2': 'true'})
    try:
        yield from server.foo()
    finally:
        yield from server.session.close()

asyncio.get_event_loop().run_until_complete(routine())

Pass through aiohttp exceptions

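import asyncio
import aiohttp
# Assumption: TransportError is exported by jsonrpc_async; adjust the import
# if your installed release exposes it from a different module.
from jsonrpc_async import Server, TransportError

@asyncio.coroutine
def routine():
    server = Server('http://unknown-host')
    try:
        yield from server.foo()
    except TransportError as transport_error:
        print(transport_error.args[1]) # this will hold an aiohttp exception instance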
    finally:
        yield from server.session.close()

asyncio.get_event_loop().run_until_complete(routine())
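
The example above reads the underlying exception from args[1]. The sketch below shows the general pattern that makes this possible: a wrapper exception raised with a message and the original exception as its two arguments. The TransportError class here is a local stand-in so the snippet runs without the library, ConnectionError stands in for an aiohttp exception, and the message text is invented for illustration.

```python
class TransportError(Exception):
    """Local stand-in for the library's exception, defined so this runs alone."""


def fetch():
    try:
        # Stands in for a failure raised by the HTTP layer.
        raise ConnectionError('cannot reach unknown-host')
    except ConnectionError as cause:
        # Passing the cause as the second argument stores it in args[1].
        raise TransportError('Error calling method foo', cause)


try:
    fetch()
except TransportError as err:
    message, cause = err.args
    print(message)               # Error calling method foo
    print(type(cause).__name__)  # ConnectionError
```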

Tests

Install the Python tox package and run tox; it will test this package against several versions of Python.

Credits

@gciotta for creating the base project jsonrpc-requests.

@mbroadst for providing full support for nested method calls, JSON-RPC RFC compliance and other improvements.

@vaab for providing API and test improvements and better RFC compliance.

Release History

0.1
