diff --git a/.github/workflows/rabbitmq-oci.yaml b/.github/workflows/rabbitmq-oci.yaml
index 6c746459..31335cab 100644
--- a/.github/workflows/rabbitmq-oci.yaml
+++ b/.github/workflows/rabbitmq-oci.yaml
@@ -104,10 +104,10 @@ jobs:
           path: ${{ steps.resolve-artifact-path.outputs.ARTIFACT_PATH }}
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v1
+        uses: docker/setup-buildx-action@v3
       - name: Cache Docker layers
-        uses: actions/cache@v2
+        uses: actions/cache@v3
         with:
           path: /tmp/.buildx-cache
           key: ${{ runner.os }}-${{ matrix.image_tag_suffix }}-buildx-${{ github.event.pull_request.head.sha || github.sha }}
@@ -125,7 +125,7 @@ jobs:
       - name: Login to DockerHub
         if: steps.authorized.outputs.PUSH == 'true'
-        uses: docker/login-action@v1
+        uses: docker/login-action@v3
         with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -160,7 +160,7 @@ jobs:
           echo "::set-output name=TAG_4::${TAG_4}"
       - name: Build and push
-        uses: docker/build-push-action@v2
+        uses: docker/build-push-action@v5
        with:
          context: rabbitmq-server/packaging/docker-image
          pull: true
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 4ca19ad8..daa6bc3c 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -53,7 +53,9 @@
          make_counter/1]).
 -export([dump_init/1,
+         dump_init_idx/1,
          dump_chunk/1,
+         dump_index/1,
          dump_crc_check/1]).
 %% for testing
 -export([
@@ -597,10 +599,12 @@ init(#{dir := Dir,
             %% the empty log case
             {ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE),
             {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
-            %% TODO: do we potentially need to truncate the segment
-            %% here too?
-            {ok, _} = file:position(SegFd, eof),
-            {ok, _} = file:position(IdxFd, eof),
+            {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE),
+            %% the segment could potentially have trailing data here so we'll
+            %% do a truncate just in case.
The index would have been truncated
+            %% earlier
+            ok = file:truncate(SegFd),
+            {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE),
             osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
             #?MODULE{cfg = Cfg,
                      mode =
@@ -3029,6 +3033,29 @@ dump_init(File) ->
     {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE),
     Fd.
 
+dump_init_idx(File) ->
+    {ok, Fd} = file:open(File, [raw, binary, read]),
+    {ok, <<"OSII", _V:4/binary>> } = file:read(Fd, ?IDX_HEADER_SIZE),
+    Fd.
+
+dump_index(Fd) ->
+    case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
+        {ok,
+         <<ChunkId:64/unsigned,
+           Timestamp:64/signed,
+           Epoch:64/unsigned,
+           FilePos:32/unsigned,
+           ChType:8/unsigned>>} ->
+            #{chunk_id => ChunkId,
+              timestamp => Timestamp,
+              epoch => Epoch,
+              file_pos => FilePos,
+              type => ChType};
+        Err ->
+            Err
+    end.
+
+
 dump_chunk(Fd) ->
     {ok, Pos} = file:position(Fd, cur),
diff --git a/test/osiris_SUITE.erl b/test/osiris_SUITE.erl
index bd30301f..8af2571b 100644
--- a/test/osiris_SUITE.erl
+++ b/test/osiris_SUITE.erl
@@ -31,6 +31,7 @@ all() ->
 all_tests() ->
     [single_node_write,
+     single_node_write_sub_batch_restart,
      single_node_uncorrelated_write,
      cluster_write_replication_plain,
      cluster_write_replication_tls,
@@ -178,6 +179,49 @@ single_node_write(Config) ->
     ?assertEqual(42, osiris:fetch_writer_seq(Leader, Wid)),
     ok.
 
+single_node_write_sub_batch_restart(Config) ->
+    Name = ?config(cluster_name, Config),
+    Dir = ?config(priv_dir, Config),
+    Conf0 =
+        #{name => Name,
+          reference => Name,
+          epoch => 1,
+          leader_node => node(),
+          replica_nodes => [],
+          tracking_max_writers => 255,
+          dir => Dir},
+    SDir = filename:join(Dir, Name),
+    {ok, #{leader_pid := Leader}} = osiris:start_cluster(Conf0),
+    Entries = [simple(<<"abcdefghikjlmn">>) || _ <- lists:seq(1, 11)],
+    Batch = {batch, 11, 0, iolist_size(Entries), Entries},
+    ok = osiris:write(Leader, undefined, 42, Batch),
+    receive
+        {osiris_written, _Name, _WriterId, [42]} ->
+            ok = osiris_writer:stop(Conf0),
+            %% simulate data loss of first chunk, only header remains
+            truncate(filename:join(SDir, "00000000000000000000.segment"), 56),
+            {ok, Leader1} = osiris_writer:start(Conf0#{epoch => 3}),
+            osiris_writer:read_tracking(Leader1),
+            ok = osiris:write(Leader1, undefined, 43, Batch),
+            receive
+                {osiris_written, _, _, [43]} ->
+                    ok = osiris_writer:stop(Conf0),
+                    {ok, Leader2} = osiris_writer:start(Conf0#{epoch => 4}),
+                    %% read tracking to ensure write is actually running ok
+                    #{} = osiris_writer:read_tracking(Leader2),
+                    ok
+            after 2000 ->
+                flush(),
+                exit(osiris_written_timeout)
+            end,
+            timer:sleep(1000),
+            ok
+    after 2000 ->
+        flush(),
+        exit(osiris_written_timeout)
+    end,
+    ok.
+
 single_node_uncorrelated_write(Config) ->
     Name = ?config(cluster_name, Config),
     Conf0 =
@@ -2113,3 +2157,9 @@ wildcard(Wc) when is_list(Wc) ->
 wildcard(Wc) ->
     wildcard(unicode:characters_to_list(Wc)).
 
+truncate(File, Sz) ->
+    {ok, Fd} = file:open(File, [raw, binary, read, write]),
+    {ok, _} = file:position(Fd, Sz),
+    ok = file:truncate(Fd),
+    ok = file:close(Fd),
+    ok.