aboutsummaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/scripts/lib/inpage-provider.js59
-rw-r--r--app/scripts/lib/obj-multiplex.js48
-rw-r--r--app/scripts/lib/port-stream.js16
-rw-r--r--app/scripts/lib/stream-utils.js24
-rw-r--r--app/scripts/metamask-controller.js80
5 files changed, 94 insertions, 133 deletions
diff --git a/app/scripts/lib/inpage-provider.js b/app/scripts/lib/inpage-provider.js
index c63af06dc..db46e4f17 100644
--- a/app/scripts/lib/inpage-provider.js
+++ b/app/scripts/lib/inpage-provider.js
@@ -1,8 +1,9 @@
-const pipe = require('pump')
-const StreamProvider = require('web3-stream-provider')
+const pump = require('pump')
+const RpcEngine = require('json-rpc-engine')
+const createIdRemapMiddleware = require('json-rpc-engine/src/idRemapMiddleware')
+const createStreamMiddleware = require('json-rpc-middleware-stream')
const LocalStorageStore = require('obs-store')
const ObjectMultiplex = require('./obj-multiplex')
-const createRandomId = require('./random-id')
module.exports = MetamaskInpageProvider
@@ -11,7 +12,7 @@ function MetamaskInpageProvider (connectionStream) {
// setup connectionStream multiplexing
var multiStream = self.multiStream = ObjectMultiplex()
- pipe(
+ pump(
connectionStream,
multiStream,
connectionStream,
@@ -20,7 +21,7 @@ function MetamaskInpageProvider (connectionStream) {
// subscribe to metamask public config (one-way)
self.publicConfigStore = new LocalStorageStore({ storageKey: 'MetaMask-Config' })
- pipe(
+ pump(
multiStream.createStream('publicConfig'),
self.publicConfigStore,
(err) => logStreamDisconnectWarning('MetaMask PublicConfigStore', err)
@@ -30,39 +31,19 @@ function MetamaskInpageProvider (connectionStream) {
multiStream.ignoreStream('phishing')
// connect to async provider
- const asyncProvider = self.asyncProvider = new StreamProvider()
- pipe(
- asyncProvider,
+ const streamMiddleware = createStreamMiddleware()
+ pump(
+ streamMiddleware.stream,
multiStream.createStream('provider'),
- asyncProvider,
+ streamMiddleware.stream,
(err) => logStreamDisconnectWarning('MetaMask RpcProvider', err)
)
- // start and stop polling to unblock first block lock
-
- self.idMap = {}
- // handle sendAsync requests via asyncProvider
- self.sendAsync = function (payload, cb) {
- // rewrite request ids
- var request = eachJsonMessage(payload, (_message) => {
- const message = Object.assign({}, _message)
- const newId = createRandomId()
- self.idMap[newId] = message.id
- message.id = newId
- return message
- })
- // forward to asyncProvider
- asyncProvider.sendAsync(request, function (err, res) {
- if (err) return cb(err)
- // transform messages to original ids
- eachJsonMessage(res, (message) => {
- var oldId = self.idMap[message.id]
- delete self.idMap[message.id]
- message.id = oldId
- return message
- })
- cb(null, res)
- })
- }
+ // handle sendAsync requests via dapp-side rpc engine
+ const engine = new RpcEngine()
+ engine.push(createIdRemapMiddleware())
+ engine.push(streamMiddleware)
+
+ self.sendAsync = engine.handle.bind(engine)
}
MetamaskInpageProvider.prototype.send = function (payload) {
@@ -122,14 +103,6 @@ MetamaskInpageProvider.prototype.isMetaMask = true
// util
-function eachJsonMessage (payload, transformFn) {
- if (Array.isArray(payload)) {
- return payload.map(transformFn)
- } else {
- return transformFn(payload)
- }
-}
-
function logStreamDisconnectWarning (remoteLabel, err) {
let warningMsg = `MetamaskInpageProvider - lost connection to ${remoteLabel}`
if (err) warningMsg += '\n' + err.stack
diff --git a/app/scripts/lib/obj-multiplex.js b/app/scripts/lib/obj-multiplex.js
deleted file mode 100644
index 0034febe0..000000000
--- a/app/scripts/lib/obj-multiplex.js
+++ /dev/null
@@ -1,48 +0,0 @@
-const through = require('through2')
-
-module.exports = ObjectMultiplex
-
-function ObjectMultiplex (opts) {
- opts = opts || {}
- // create multiplexer
- const mx = through.obj(function (chunk, enc, cb) {
- const name = chunk.name
- const data = chunk.data
- if (!name) {
- console.warn(`ObjectMultiplex - Malformed chunk without name "${chunk}"`)
- return cb()
- }
- const substream = mx.streams[name]
- if (!substream) {
- console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`)
- } else {
- if (substream.push) substream.push(data)
- }
- return cb()
- })
- mx.streams = {}
- // create substreams
- mx.createStream = function (name) {
- const substream = mx.streams[name] = through.obj(function (chunk, enc, cb) {
- mx.push({
- name: name,
- data: chunk,
- })
- return cb()
- })
- mx.on('end', function () {
- return substream.emit('end')
- })
- if (opts.error) {
- mx.on('error', function () {
- return substream.emit('error')
- })
- }
- return substream
- }
- // ignore streams (dont display orphaned data warning)
- mx.ignoreStream = function (name) {
- mx.streams[name] = true
- }
- return mx
-}
diff --git a/app/scripts/lib/port-stream.js b/app/scripts/lib/port-stream.js
index 607a9c9ed..648d88087 100644
--- a/app/scripts/lib/port-stream.js
+++ b/app/scripts/lib/port-stream.js
@@ -1,5 +1,6 @@
const Duplex = require('readable-stream').Duplex
const inherits = require('util').inherits
+const noop = function(){}
module.exports = PortDuplexStream
@@ -20,20 +21,14 @@ PortDuplexStream.prototype._onMessage = function (msg) {
if (Buffer.isBuffer(msg)) {
delete msg._isBuffer
var data = new Buffer(msg)
- // console.log('PortDuplexStream - saw message as buffer', data)
this.push(data)
} else {
- // console.log('PortDuplexStream - saw message', msg)
this.push(msg)
}
}
PortDuplexStream.prototype._onDisconnect = function () {
- try {
- this.push(null)
- } catch (err) {
- this.emit('error', err)
- }
+ this.destroy()
}
// stream plumbing
@@ -45,19 +40,12 @@ PortDuplexStream.prototype._write = function (msg, encoding, cb) {
if (Buffer.isBuffer(msg)) {
var data = msg.toJSON()
data._isBuffer = true
- // console.log('PortDuplexStream - sent message as buffer', data)
this._port.postMessage(data)
} else {
- // console.log('PortDuplexStream - sent message', msg)
this._port.postMessage(msg)
}
} catch (err) {
- // console.error(err)
return cb(new Error('PortDuplexStream - disconnected'))
}
cb()
}
-
-// util
-
-function noop () {}
diff --git a/app/scripts/lib/stream-utils.js b/app/scripts/lib/stream-utils.js
index ba79990cc..8bb0b4f3c 100644
--- a/app/scripts/lib/stream-utils.js
+++ b/app/scripts/lib/stream-utils.js
@@ -1,6 +1,6 @@
const Through = require('through2')
-const endOfStream = require('end-of-stream')
-const ObjectMultiplex = require('./obj-multiplex')
+const ObjectMultiplex = require('obj-multiplex')
+const pump = require('pump')
module.exports = {
jsonParseStream: jsonParseStream,
@@ -23,14 +23,14 @@ function jsonStringifyStream () {
}
function setupMultiplex (connectionStream) {
- var mx = ObjectMultiplex()
- connectionStream.pipe(mx).pipe(connectionStream)
- endOfStream(mx, function (err) {
- if (err) console.error(err)
- })
- endOfStream(connectionStream, function (err) {
- if (err) console.error(err)
- mx.destroy()
- })
- return mx
+ const mux = new ObjectMultiplex()
+ pump(
+ connectionStream,
+ mux,
+ connectionStream,
+ (err) => {
+ if (err) console.error(err)
+ }
+ )
+ return mux
}
diff --git a/app/scripts/metamask-controller.js b/app/scripts/metamask-controller.js
index a007d6fc5..735fc4af0 100644
--- a/app/scripts/metamask-controller.js
+++ b/app/scripts/metamask-controller.js
@@ -1,12 +1,14 @@
const EventEmitter = require('events')
const extend = require('xtend')
const promiseToCallback = require('promise-to-callback')
-const pipe = require('pump')
+const pump = require('pump')
const Dnode = require('dnode')
const ObservableStore = require('obs-store')
const EthStore = require('./lib/eth-store')
const EthQuery = require('eth-query')
-const streamIntoProvider = require('web3-stream-provider/handler')
+const RpcEngine = require('json-rpc-engine')
+const createEngineStream = require('json-rpc-middleware-stream/engineStream')
+const createFilterMiddleware = require('eth-json-rpc-filters')
const setupMultiplex = require('./lib/stream-utils.js').setupMultiplex
const KeyringController = require('./keyring-controller')
const NetworkController = require('./controllers/network')
@@ -77,12 +79,13 @@ module.exports = class MetamaskController extends EventEmitter {
// rpc provider
this.provider = this.initializeProvider()
+ this.blockTracker = this.provider
// eth data query tools
this.ethQuery = new EthQuery(this.provider)
this.ethStore = new EthStore({
provider: this.provider,
- blockTracker: this.provider,
+ blockTracker: this.blockTracker,
})
// key mgmt
@@ -109,7 +112,7 @@ module.exports = class MetamaskController extends EventEmitter {
getNetwork: this.networkController.getNetworkState.bind(this),
signTransaction: this.keyringController.signTransaction.bind(this.keyringController),
provider: this.provider,
- blockTracker: this.provider,
+ blockTracker: this.blockTracker,
ethQuery: this.ethQuery,
ethStore: this.ethStore,
})
@@ -366,7 +369,14 @@ module.exports = class MetamaskController extends EventEmitter {
setupControllerConnection (outStream) {
const api = this.getApi()
const dnode = Dnode(api)
- outStream.pipe(dnode).pipe(outStream)
+ pump(
+ outStream,
+ dnode,
+ outStream,
+ (err) => {
+ if (err) console.error(err)
+ }
+ )
dnode.on('remote', (remote) => {
// push updates to popup
const sendUpdate = remote.sendUpdate.bind(remote)
@@ -375,26 +385,64 @@ module.exports = class MetamaskController extends EventEmitter {
}
setupProviderConnection (outStream, originDomain) {
- streamIntoProvider(outStream, this.provider, onRequest, onResponse)
+ // setup json rpc engine stack
+ const engine = new RpcEngine()
+ engine.push(originMiddleware)
+ engine.push(loggerMiddleware)
+ engine.push(createFilterMiddleware({
+ provider: this.provider,
+ blockTracker: this.blockTracker,
+ }))
+ engine.push(createProviderMiddleware({ provider: this.provider }))
+
+ // setup connection
+ const providerStream = createEngineStream({ engine })
+ pump(
+ outStream,
+ providerStream,
+ outStream,
+ (err) => {
+ if (err) console.error(err)
+ }
+ )
+
// append dapp origin domain to request
- function onRequest (request) {
- request.origin = originDomain
+ function originMiddleware (req, res, next, end) {
+ req.origin = originDomain
+ next()
}
+
// log rpc activity
- function onResponse (err, request, response) {
- if (err) return console.error(err)
- if (response.error) {
- console.error('Error in RPC response:\n', response)
+ function loggerMiddleware (req, res, next, end) {
+ next((cb) => {
+ if (res.error) {
+ console.error('Error in RPC response:\n', res)
+ }
+ if (req.isMetamaskInternal) return
+ log.info(`RPC (${originDomain}):`, req, '->', res)
+ cb()
+ })
+ }
+
+ // forward requests to provider
+ function createProviderMiddleware({ provider }) {
+ return (req, res, next, end) => {
+ provider.sendAsync(req, (err, _res) => {
+ if (err) return end(err)
+ res.result = _res.result
+ end()
+ })
}
- if (request.isMetamaskInternal) return
- log.info(`RPC (${originDomain}):`, request, '->', response)
}
}
setupPublicConfig (outStream) {
- pipe(
+ pump(
this.publicConfigStore,
- outStream
+ outStream,
+ (err) => {
+ if (err) console.error(err)
+ }
)
}