跳到主要内容

gRPC

gRPC 是一种高性能, 开源的远程过程调用(RPC)框架, 最初由 Google 开发. 它使用 HTTP/2 作为传输协议, 并采用 Protocol Buffers(protobuf)作为接口描述语言和数据序列化格式. gRPC 支持多种编程语言, 可以实现跨语言的服务调用. 其特点包括双向流, 负载均衡, 认证和超时控制等, 适用于微服务架构下的高效通信

模块(package)

安装 protobuf 插件, 模块会生成 gRPC 的 protobuf 定义和对应的 Stub 接口供客户端(client)使用

安装 protobuf 插件和 gRPC 依赖

user-package/build.gradle
buildscript {
repositories {
jcenter()
}
}

plugins {
id 'java'
id "org.graphoenix" version "0.1.1"
id 'com.google.protobuf' version '0.9.1'
}

classes {
dependsOn generateGraphQLSource
dependsOn generateProtobufV3
}

// gRPC 配置
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.21.7'
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.52.1'
}
reactor {
artifact = 'com.salesforce.servicelibs:reactor-grpc:1.2.3'
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
reactor {}
}
}
}

// gRPC 目录配置
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/java'
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/reactor'
}
}
}

dependencies {
// gRPC 依赖
runtimeOnly 'io.grpc:grpc-netty-shaded:1.52.1'
implementation 'io.grpc:grpc-protobuf:1.52.1'
implementation 'io.grpc:grpc-stub:1.52.1'
implementation 'com.salesforce.servicelibs:reactor-grpc-stub:1.2.3'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+

// protobuf 依赖
protobuf 'org.graphoenix:graphoenix-core:0.1.3'

// ...
}

生成 DTO 和 protobuf

./gradlew :user-package:build
protobuf目录
|-- order-microservices
|-- user-package 用户模块
|-- build.gradle
|-- src
|-- main
|-- java
| |-- demo.gp.user
| |-- package-info.java
|-- proto
| |-- demo.gp.user
| |-- enums.proto 枚举类型
| |-- input_objects.proto 输入类型
| |-- interfaces.proto 接口类型
| |-- objects.proto 对象类型
| |-- query.proto 查询服务
| |-- mutation.proto 变更服务
|-- resources
|-- graphql
|-- user.gql 定义用户相关类型

Graphoenix 根据 GraphQL 生成对应的 protobuf 定义文件


服务端(server)

安装 graphoenix-grpc-server 模块

安装 graphoenix-grpc-server 模块, Graphoenix 将根据模块(package)中的定义自动实现 gRPC 服务接口

user-app/build.gradle
dependencies {
// gRPC 服务
implementation 'org.graphoenix:graphoenix-grpc-server:0.1.4'

annotationProcessor 'org.graphoenix:graphoenix-grpc-server:0.1.4'

// ...
}

启动 gRPC 服务

Run/Debug user-app/src/main/java/demo/gp/user/App.java


客户端(client)

引用需要调用的 gRPC 模块(package)

order-package/build.gradle
dependencies {
implementation project(':user-package')

// ...
}

查询

使用 QueryServiceGrpc.QueryServiceBlockingStub 接口中与 Query 对象中同名的方法查询, 使用 setSelectionSet 方法来设置查询字段

import demo.gp.user.dto.enumType.grpc.UserType;
import demo.gp.user.grpc.*;
import io.graphoenix.core.dto.enumType.grpc.Operator;
import io.graphoenix.core.dto.inputObjectType.grpc.StringExpression;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

public class UserGrpcTest {

// gRPC server 地址
private static final String userGrpcAddress = "localhost:50053";
private static final ManagedChannel userManagedChannel = ManagedChannelBuilder.forTarget(userGrpcAddress).usePlaintext().build();
private static final QueryServiceGrpc.QueryServiceBlockingStub queryServiceStub = QueryServiceGrpc.newBlockingStub(userManagedChannel);

@Test
void queryUserTest() {
QueryUserRequest queryUserRequest = QueryUserRequest.newBuilder()
.setSelectionSet("{name userType}")
.setName(
StringExpression.newBuilder()
.setOpr(Operator.EQ_OPERATOR)
.setVal("Alice")
.build()
)
.build();
QueryUserResponse response = queryServiceStub.user(queryUserRequest);

assertAll(
() -> assertEquals(response.getUser().getName(), "Alice"),
() -> assertEquals(response.getUser().getUserType(), UserType.VIP_USER_TYPE),
() -> assertEquals(response.getUser().getEmail(), "")
);
}
}

方法等同于如下 GraphQL 查询

query {
user(name: { opr: EQ, val: "Alice" }) {
name
userType
}
}

变更

使用 MutationServiceGrpc.MutationServiceBlockingStub 接口中与 Mutation 对象中同名的方法变更, 使用 setSelectionSet 方法来设置查询字段

import demo.gp.user.dto.enumType.grpc.UserType;
import demo.gp.user.grpc.*;
import io.graphoenix.core.dto.enumType.grpc.Operator;
import io.graphoenix.core.dto.inputObjectType.grpc.StringExpression;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

public class UserGrpcTest {

// gRPC server 地址
private static final String userGrpcAddress = "localhost:50053";
private static final ManagedChannel userManagedChannel = ManagedChannelBuilder.forTarget(userGrpcAddress).usePlaintext().build();
private static final MutationServiceGrpc.MutationServiceBlockingStub mutationServiceBlockingStub = MutationServiceGrpc.newBlockingStub(userManagedChannel);

@Test
void MutationUserTest() {
MutationUserRequest mutationUserRequest = MutationUserRequest.newBuilder()
.setName("Uma")
.setEmail("uma@example.com")
.setUserType(UserType.VIP_USER_TYPE)
.build();
MutationUserResponse response = mutationServiceBlockingStub.user(mutationUserRequest);

assertAll(
() -> assertEquals(response.getUser().getName(), "Uma"),
() -> assertEquals(response.getUser().getEmail(), "uma@example.com"),
() -> assertEquals(response.getUser().getUserType(), UserType.VIP_USER_TYPE)
);
}
}

方法等同于如下 GraphQL 变更

mutaion {
user(name: "Uma", email: "uma@example.com", userType: VIP) {
id
name
email
userType
}
}

响应式

Graphoenix 支持 Reactive gRPC, 响应式 Stub 以 Reactor 开头命名

import demo.gp.user.dto.enumType.grpc.UserType;
import demo.gp.user.grpc.*;
import io.graphoenix.core.dto.enumType.grpc.Operator;
import io.graphoenix.core.dto.inputObjectType.grpc.StringExpression;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.*;

public class UserGrpcTest {

// gRPC server 地址
private static final String userGrpcAddress = "localhost:50053";
private static final ManagedChannel userManagedChannel = ManagedChannelBuilder.forTarget(userGrpcAddress).usePlaintext().build();
// 响应式 Stub
private static final ReactorQueryServiceGrpc.ReactorQueryServiceStub reactorQueryServiceStub = ReactorQueryServiceGrpc.newReactorStub(userManagedChannel);

@Test
void reactorQueryUserTest() {
QueryUserRequest queryUserRequest = QueryUserRequest.newBuilder()
.setName(
StringExpression.newBuilder()
.setOpr(Operator.EQ_OPERATOR)
.setVal("Bob")
.build()
)
.build();
Mono<QueryUserResponse> responseMono = reactorQueryServiceStub.user(queryUserRequest);

StepVerifier.create(responseMono)
.assertNext(response ->
assertAll(
() -> assertEquals(response.getUser().getName(), "Bob"),
() -> assertEquals(response.getUser().getEmail(), "bob@example.com"),
() -> assertEquals(response.getUser().getUserType(), UserType.REGULAR_USER_TYPE)
)
)
.expectComplete()
.verify();
}
}

负载均衡

Graphoenix 提供 PackageNameResolverProvider 支持 gRPC 负载均衡, 使用 package://package.name 作为模块地址, 提供 pick_firstround_robin 两种策略

import io.grpc.ManagedChannel;
import io.grpc.NameResolverRegistry;
import io.graphoenix.grpc.client.resolver.PackageNameResolverProvider;

public class UserGrpcTest {

// private static final String userGrpcAddress = "localhost:50053";
// private static final ManagedChannel userManagedChannel = ManagedChannelBuilder.forTarget(userGrpcAddress).usePlaintext().build();
// gRPC server 地址
private static final String userGrpcAddress = "package://demo.gp.user";
private static final ManagedChannel userManagedChannel = ManagedChannelBuilder.forTarget(userGrpcAddress).defaultLoadBalancingPolicy("round_robin").usePlaintext().build()
private static final QueryServiceGrpc.QueryServiceBlockingStub queryServiceStub = QueryServiceGrpc.newBlockingStub(userManagedChannel);

@Inject
public UserGrpcTest(PackageNameResolverProvider packageNameResolverProvider) {
// 注册地址工厂
NameResolverRegistry.getDefaultRegistry().register(packageNameResolverProvider);
}
}

本节示例

https://github.com/doukai/order-microservices/tree/main/order-app/src/test/java/demo/gp/order/test/grpc