
11.18. Subscription

When an event occurs on the blockchain, everyone subscribed to that event is notified.

[Tip]
Note that https://docs.web3j.io/filters.html contains this statement:

Note: filters are not supported on Infura.

Infura does not currently support filters.

11.18.1. Receive all new blocks as they are added to the blockchain

			
// false: deliver blocks with transaction hashes only, not full transaction objects
Subscription subscription = web3j.blockObservable(false).subscribe(block -> {
    ...
});
			
			

11.18.2. Receive all new transactions as they are added to the blockchain

			
Subscription subscription = web3j.transactionObservable().subscribe(tx -> {
    ...
});
			
			

11.18.3. Receive all pending transactions

			
Subscription subscription = web3j.pendingTransactionObservable().subscribe(tx -> {
    ...
});
			
			

Test environment: macOS. Start Ethereum Wallet first, then run the program below, then go back to Ethereum Wallet and make a transfer.

			
package cn.netkiller.example.ethereum.subscription;

import org.web3j.protocol.Web3j;
import org.web3j.protocol.ipc.UnixIpcService;

import rx.Subscription;

public class PendingTest {

	public static void main(String[] args) {
		System.out.println("Subscription Starting...");
		// Connect to the local geth node through its IPC socket
		Web3j web3 = Web3j.build(new UnixIpcService("/Users/neo/Library/Ethereum/geth.ipc"));
		// Each emitted item is a transaction, not a block
		Subscription subscription = web3.pendingTransactionObservable().subscribe(tx -> {
			// System.out.println(tx.toString());
			System.out.println("block hash: " + tx.getBlockHash());
		});
		// subscription.unsubscribe();
	}

}
			
			
			

Output

			
block hash: 0x74f7dd053dadcf01599dc85d4abf60662695e78ce7531335f44dc03f49dee326
			
			

11.18.4. Replay blocks up to the latest one, then receive new blocks

			
Subscription subscription = web3j.catchUpToLatestAndSubscribeToNewBlocksObservable(
        <startBlockNumber>, <fullTxObjects>)
        .subscribe(block -> {
            ...
});

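Author: ChainBoard链博科技
Link: https://www.jianshu.com/p/c7c5556a436b
Source: Jianshu (简书)
Copyright belongs to the author. For commercial reprints, contact the author for authorization; for non-commercial reprints, credit the source.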
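
The replay-then-follow idea can be illustrated without a node. The following is a minimal sketch in plain Java, not web3j's implementation: it replays block numbers from a start block up to the chain head and re-reads the head after each pass, so blocks that arrive during the replay are not skipped. The class `CatchUpSketch`, the method `catchUp`, and the simulated chain head are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

public class CatchUpSketch {

    /**
     * Replays block numbers from start up to the chain head. The head is
     * re-read after every pass because it may advance while we replay.
     * Returns the first block number that has NOT been delivered yet,
     * which is where a live subscription would take over.
     */
    public static long catchUp(long start, LongSupplier latestBlock, LongConsumer onBlock) {
        long next = start;
        long head = latestBlock.getAsLong();
        while (next <= head) {
            for (; next <= head; next++) {
                onBlock.accept(next);
            }
            head = latestBlock.getAsLong(); // the head may have moved meanwhile
        }
        return next;
    }

    public static void main(String[] args) {
        long[] head = {5};                 // hypothetical chain head
        List<Long> seen = new ArrayList<>();
        long next = catchUp(3, () -> head[0], n -> {
            seen.add(n);
            if (n == 4) {
                head[0] = 7;               // simulate two blocks mined during the replay
            }
        });
        System.out.println(seen + " next=" + next); // prints "[3, 4, 5, 6, 7] next=8"
    }
}
```

Returning the next undelivered block number keeps the hand-over to live notifications free of gaps and duplicates, which is the property the combined observable is meant to give you.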
			
			

11.18.5. Topic filters

			
EthFilter filter = new EthFilter(DefaultBlockParameterName.EARLIEST,
        DefaultBlockParameterName.LATEST, <contract-address>)
             [.addSingleTopic(...) | .addOptionalTopics(..., ...) | ...];

web3j.ethLogObservable(filter).subscribe(log -> {
    ...
});		
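
To make the topic options more concrete, here is a minimal sketch of positional topic matching in plain Java, with no web3j dependency. It assumes the matching rules commonly described for Ethereum log filters: each filter position is either a wildcard or a list of accepted values, and a log matches when every constrained position matches. The class `TopicFilterSketch`, the method `matches`, and the short topic strings are hypothetical; real topics are 32-byte hex values, and the rule that a filter longer than the log's topic list never matches is an assumption modeled on common client behavior.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TopicFilterSketch {

    /**
     * Returns true if the log's topics satisfy the positional filter.
     * A null filter entry is a wildcard; a list entry accepts any of its values.
     */
    public static boolean matches(List<List<String>> filter, List<String> logTopics) {
        if (filter.size() > logTopics.size()) {
            return false; // the filter constrains a position the log does not have
        }
        for (int i = 0; i < filter.size(); i++) {
            List<String> accepted = filter.get(i);
            if (accepted == null) {
                continue; // wildcard: any topic is fine at this position
            }
            if (!accepted.contains(logTopics.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<String>> filter = Arrays.asList(
                Collections.singletonList("0xEVENT"),  // one required topic, like addSingleTopic
                null,                                  // wildcard position
                Arrays.asList("0xALICE", "0xBOB"));    // alternatives, like addOptionalTopics

        System.out.println(matches(filter, Arrays.asList("0xEVENT", "0xCAROL", "0xBOB")));  // true
        System.out.println(matches(filter, Arrays.asList("0xEVENT", "0xCAROL", "0xDAVE"))); // false
        System.out.println(matches(filter, Collections.singletonList("0xEVENT")));          // false
    }
}
```

In a real filter the first topic is normally the hash of the event signature, and the later positions hold the indexed event parameters.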
			
			

11.18.6. Unsubscribing from Subscriptions

			
subscription.unsubscribe();
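
The effect of unsubscribing can be shown with a small stand-in for an event stream. This is a sketch in plain Java under stated assumptions, not RxJava or web3j code: `UnsubscribeSketch`, `EventSource`, `subscribe`, and `emit` are hypothetical names, and the returned `Runnable` plays the role of the subscription handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class UnsubscribeSketch {

    /** Hypothetical stand-in for an event source such as a stream of new blocks. */
    public static class EventSource<T> {
        private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

        /** Registers a listener and returns a handle that removes it again. */
        public Runnable subscribe(Consumer<T> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        /** Delivers an event to every listener that is still registered. */
        public void emit(T event) {
            listeners.forEach(l -> l.accept(event));
        }
    }

    public static void main(String[] args) {
        EventSource<String> blocks = new EventSource<>();
        List<String> received = new ArrayList<>();

        Runnable unsubscribe = blocks.subscribe(received::add);
        blocks.emit("block-1");   // delivered
        unsubscribe.run();        // plays the role of subscription.unsubscribe()
        blocks.emit("block-2");   // not delivered: the listener is gone

        System.out.println(received); // prints "[block-1]"
    }
}
```

A long-running program would typically keep the handle and release it on shutdown, so that no callbacks fire after the surrounding resources are closed.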
			
			

11.18.7. Combined example

			
// subscription, txSubscription, pendingSubscription and latestSubscription
// are assumed to be rx.Subscription fields declared elsewhere.
Web3j web3 = Web3j.build(new HttpService("http://127.0.0.1:8080"));
// Query the node for its client version; send() throws IOException
System.out.println("Connected to Ethereum client version: "
        + web3.web3ClientVersion().send().getWeb3ClientVersion());
// New blocks, without full transaction objects
subscription = web3.blockObservable(false)
        .subscribe(block -> {
            System.out.println("observation block:" + block.getRawResponse());
        });
// Transactions included in new blocks
txSubscription = web3.transactionObservable().subscribe(tx -> {
    System.out.println("txSubscription hash:" + tx.getHash() + ":::::address:" + tx.getTo());
    // Contract-creation transactions have no recipient, so skip them
    if (tx.getTo() != null && !tx.getTo().isEmpty()) {
        System.out.println("getBlockNumber:" + tx.getBlockNumber().longValue());
        System.out.println("getValue:" + tx.getValue());
        System.out.println("getTo:" + tx.getTo());
        System.out.println("getFrom:" + tx.getFrom());
        System.out.println("getHash:" + tx.getHash());
    }
});
// Transactions that are still pending
pendingSubscription = web3.pendingTransactionObservable().subscribe(tx -> {
    System.out.println("pending hash:" + tx.getHash() + ":::::address:" + tx.getTo());
});
// Replay transactions from block 2576860 up to the latest block
latestSubscription = web3.catchUpToLatestTransactionObservable(new DefaultBlockParameterNumber(2576860)).subscribe(tx ->
        System.out.println("*+observer+getblock" + tx.getBlockNumber())
);