Skip to content

FutureStub cancelled in server call #8053

@humiao8sz

Description

@humiao8sz

What version of gRPC-Java are you using?

1.34.1

What is your environment?

Windows8 jdk1.8.0_202

I have two server ,and they both have same two grpc method(sayOne,sayTwo) ,
when server1 call the server2's sayOne method with FutureStub,
and in server2's sayOne method,it call server1's sayTwo method with FutureStub,
the StreamException happen.
Server1 has the service

	public static ManagedChannel channel = ManagedChannelBuilder.forTarget("192.168.10.1:50052")
		.idleTimeout(60, TimeUnit.MINUTES).usePlaintext().build();

	@Override
	public void sayOne(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
		System.out.println("sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne ==== ");
		
		HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
		GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
		futureStub.sayTwo(request);
		
		HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
		responseObserver.onNext(reply);
		responseObserver.onCompleted();
	}


	@Override
	public void sayTwo(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
		System.out.println("sayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwo");
				
		HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
		responseObserver.onNext(reply);
		responseObserver.onCompleted();
	}

Server2 has the service

	public static ManagedChannel channel = ManagedChannelBuilder.forTarget("192.168.10.2:50051")
		.idleTimeout(60, TimeUnit.MINUTES).usePlaintext().build();
		
	@Override
	public void sayOne(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
		System.out.println("sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne ==== ");
		
		HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
		GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
		futureStub.sayTwo(request);
		
		HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
		responseObserver.onNext(reply);
		responseObserver.onCompleted();
	}


	@Override
	public void sayTwo(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
		System.out.println("sayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwo");
				
		HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
		responseObserver.onNext(reply);
		responseObserver.onCompleted();
	}	

run the code in server 1

private void start() throws IOException {
	/* The port on which the server should run */
	int port = 50051;
	server = ServerBuilder.forPort(port)
			.addService(new GreeterImpl())
			//.executor(GreeterImpl.threadPool)
			.build().start();
	System.out.println("Server started, listening on " + port);
	Runtime.getRuntime().addShutdownHook(new Thread() {
		@Override
		public void run() {
			// Use stderr here since the logger may have been reset by its JVM shutdown hook.
			System.err.println("*** shutting down gRPC server since JVM is shutting down");
			try{
				HelloWorldServer2.this.stop();
			}catch(InterruptedException e){
				e.printStackTrace(System.err);
			}
			System.err.println("*** server shut down");
		}
	});
}

/**
 * Main launches the server from the command line.
 */
public static void main(String[] args) throws IOException, InterruptedException {
	final HelloWorldServer2 server = new HelloWorldServer2();
	server.start();
	new Thread(new Runnable() {
		public void run() {
			while(true){
				HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
				GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
				futureStub.sayOne(request);
				try{
					Thread.sleep(10L);
				}catch(InterruptedException e){
					e.printStackTrace();
				}
				System.out.println("11111111111");
			}
		}
	}).start();
}

Then

警告: Stream Error
io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Received DATA frame for an unknown stream 183
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.shouldIgnoreHeadersOrDataFrame(DefaultHttp2ConnectionDecoder.java:596)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:239)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions