gRPC Define your service using Protocol Buffers, a powerful binary serialization toolset and language
gRPC是基于Protobuf开发的RPC框架,简化了protobuf的开发,提供了服务端和客户端网络交互这一块的代码。
Demo 照着 https://grpc.io/docs/quickstart/java.html 测试一下官方的Demo。
记得要把Update a gRPC service
部分做了。
gRPC整合Gradle与代码生成 https://github.com/grpc/grpc-java 这个是gRPC-java项目,先引入gRPC的依赖。
1 2 3 compile 'io.grpc:grpc-netty:1.4.0' compile 'io.grpc:grpc-protobuf:1.4.0' compile 'io.grpc:grpc-stub:1.4.0'
然后配置gradle的grpc插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 apply plugin: 'java' apply plugin: 'com.google.protobuf' buildscript { repositories { mavenCentral() } dependencies { classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.1' } } protobuf { protoc { artifact = "com.google.protobuf:protoc:3.2.0" } plugins { grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.4.0' } } generateProtoTasks { all()*.plugins { grpc {} } } }
后面直接用gradle的任务就可以生成代码了。
gRPC提供了3种传输层的实现
https://github.com/google/protobuf-gradle-plugin
The Gradle plugin that compiles Protocol Buffer (aka. Protobuf) definition files (*.proto) in your project. There are two pieces of its job:
It assembles the Protobuf Compiler (protoc) command line and use it to generate Java source files out of your proto files.
It adds the generated Java source files to the input of the corresponding Java compilation unit (sourceSet in a Java project; variant in an Android project), so that they can be compiled along with your Java sources.
实战 配置好后,进行一个演示
在src/main/proto
新建一个文件Student.proto
gradle插件默认从src/main/proto
找proto源文件进行代码生成,这里 有提到,而且这个路径的配置是可以修改的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 syntax = "proto3"; package com.sail.proto; option java_package = "com.sail.proto"; option java_outer_classname = "StudentProto"; option java_multiple_files = true; service StudentService { rpc GetRealNameByUsername(MyRequest) returns (MyResponse) {} rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {} rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {} rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {} } message MyRequest { string username = 1; } message MyResponse { string realname = 2; } message StudentRequest { int32 age = 1; } message StudentResponse { string name = 1; int32 age = 2; string city = 3; } message StudentResponseList { repeated StudentResponse studentResponse = 1; } message StreamRequest { string request_info = 1; } message StreamResponse { string response_info = 1; }
然后执行gradle generateProto
,生成的代码默认是放在/build
目录下,我们手动拷贝到src/main/java
。
实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 package com.sail.grpc;import com.sail.proto.*;import io.grpc.stub.StreamObserver;import java.util.UUID;public class StudentServiceImpl extends StudentServiceGrpc .StudentServiceImplBase { @Override public void getRealNameByUsername (MyRequest request, StreamObserver<MyResponse> responseObserver) { System.out.println("接收到客户端信息: " + request.getUsername()); responseObserver.onNext(MyResponse.newBuilder().setRealname("张三" ).build()); responseObserver.onCompleted(); } @Override public void getStudentsByAge (StudentRequest request, StreamObserver<StudentResponse> responseObserver) { System.out.println("接收到客户端信息:" + request.getAge()); responseObserver.onNext(StudentResponse.newBuilder().setName("张三" ).setAge(20 ).setCity("北京" ).build()); responseObserver.onNext(StudentResponse.newBuilder().setName("李四" ).setAge(30 ).setCity("天津" ).build()); responseObserver.onNext(StudentResponse.newBuilder().setName("王五" ).setAge(40 ).setCity("成都" ).build()); responseObserver.onNext(StudentResponse.newBuilder().setName("赵六" ).setAge(50 ).setCity("深圳" ).build()); responseObserver.onCompleted(); } @Override public StreamObserver<StudentRequest> getStudentsWrapperByAges (StreamObserver<StudentResponseList> responseObserver) { return new StreamObserver<StudentRequest>() { @Override public void onNext (StudentRequest value) { System.out.println("onNext: " + value.getAge()); } @Override public void onError (Throwable t) { System.out.println(t.getMessage()); } @Override public void onCompleted () { StudentResponse studentResponse = StudentResponse.newBuilder().setName("张三" ).setAge(20 ).setCity("西安" ).build(); StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("李四" ).setAge(30 ).setCity("成都" ).build(); StudentResponseList studentResponseList = StudentResponseList.newBuilder() .addStudentResponse(studentResponse).addStudentResponse(studentResponse2).build(); responseObserver.onNext(studentResponseList); responseObserver.onCompleted(); } }; } @Override public StreamObserver<StreamRequest> biTalk (StreamObserver<StreamResponse> responseObserver) { return new StreamObserver<StreamRequest>() { @Override public void onNext (StreamRequest value) { System.out.println(value.getRequestInfo()); responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build()); } @Override public void onError (Throwable t) { System.out.println(t.getMessage()); } @Override public void onCompleted () { responseObserver.onCompleted(); } }; } }
服务器端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package com.sail.grpc;import io.grpc.Server;import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer { private Server server; private void start () throws IOException { this .server = ServerBuilder.forPort(8899 ).addService(new StudentServiceImpl()).build().start(); System.out.println("server started!" ); Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("关闭jvm" ); GrpcServer.this .stop(); })); System.out.println("执行到这里" ); } private void stop () { if (server != null ) { this .server.shutdown(); } } private void awaitTermination () throws InterruptedException { if (server != null ) { this .server.awaitTermination(); } } public static void main (String[] args) throws InterruptedException, IOException { GrpcServer grpcServer = new GrpcServer(); grpcServer.start(); grpcServer.awaitTermination(); } }
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 package com.sail.grpc;import com.sail.proto.*;import io.grpc.ManagedChannel;import io.grpc.ManagedChannelBuilder;import io.grpc.stub.StreamObserver;import java.time.LocalDateTime;import java.util.Iterator;public class GrpcClient { public static void main (String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost" , 8899 ) .usePlaintext(true ).build(); StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel); StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc.newStub(managedChannel); MyResponse myResponse = blockingStub.getRealNameByUsername(MyRequest.newBuilder().setUsername("zhangsan" ).build()); System.out.println(myResponse.getRealname()); System.out.println("----------------" ); Iterator<StudentResponse> iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20 ).build()); while (iter.hasNext()) { StudentResponse studentResponse = iter.next(); System.out.println(studentResponse.getName() + ", " + studentResponse.getAge() + ", " + studentResponse.getCity()); } System.out.println("----------------" ); StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() { @Override public void onNext (StudentResponseList value) { value.getStudentResponseList().forEach(studentResponse -> { System.out.println(studentResponse.getName() + ", " + studentResponse.getAge() + ", " + studentResponse.getCity()); System.out.println("*******" ); }); } @Override public void onError (Throwable t) { System.out.println(t.getMessage()); } @Override public void onCompleted () { System.out.println("completed!" ); } }; StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentsWrapperByAges(studentResponseListStreamObserver); studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20 ).build()); studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30 ).build()); studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40 ).build()); studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50 ).build()); studentRequestStreamObserver.onCompleted(); try { Thread.sleep(50000 ); } catch (InterruptedException e) { e.printStackTrace(); } StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() { @Override public void onNext (StreamResponse value) { System.out.println(value.getResponseInfo()); } @Override public void onError (Throwable t) { System.out.println(t.getMessage()); } @Override public void onCompleted () { System.out.println("onCompleted!" ); } }); for (int i = 0 ; i < 10 ; i++) { requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }