diff --git a/lib/stream-chat/channel_batch_updater.rb b/lib/stream-chat/channel_batch_updater.rb new file mode 100644 index 0000000..bfecf04 --- /dev/null +++ b/lib/stream-chat/channel_batch_updater.rb @@ -0,0 +1,214 @@ +# typed: strict +# frozen_string_literal: true + +require 'stream-chat/client' +require 'stream-chat/stream_response' +require 'stream-chat/types' + +module StreamChat + class ChannelBatchUpdater + extend T::Sig + + sig { params(client: StreamChat::Client).void } + def initialize(client) + @client = client + end + + # Member operations + + # addMembers - Add members to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T.any(T::Array[String], T::Array[StringKeyHash])] Members to add + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T.any(T::Array[String], T::Array[StringKeyHash])).returns(StreamChat::StreamResponse) } + def add_members(filter, members) + @client.update_channels_batch( + { + operation: 'addMembers', + filter: filter, + members: members + } + ) + end + + # removeMembers - Remove members from channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[StringKeyHash]] Members to remove (each with user_id key) + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } + def remove_members(filter, members) + @client.update_channels_batch( + { + operation: 'removeMembers', + filter: filter, + members: members + } + ) + end + + # inviteMembers - Invite members to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T.any(T::Array[String], T::Array[StringKeyHash])] Members to invite + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T.any(T::Array[String], T::Array[StringKeyHash])).returns(StreamChat::StreamResponse) } + def invite_members(filter, members) + @client.update_channels_batch( + { + operation: 'invites', + filter: filter, + members: members + } + ) + end + + # addModerators - Add moderators to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[String]] Member IDs to promote to moderator + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + def add_moderators(filter, members) + @client.update_channels_batch( + { + operation: 'addModerators', + filter: filter, + members: members + } + ) + end + + # demoteModerators - Remove moderator role from members in channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[String]] Member IDs to demote + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + def demote_moderators(filter, members) + @client.update_channels_batch( + { + operation: 'demoteModerators', + filter: filter, + members: members + } + ) + end + + # assignRoles - Assign roles to members in channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[StringKeyHash]] Members with role assignments + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } + def assign_roles(filter, members) + @client.update_channels_batch( + { + operation: 'assignRoles', + filter: filter, + members: members + } + ) + end + + # Visibility operations + + # hide - Hide channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } + def hide(filter) + @client.update_channels_batch( + { + operation: 'hide', + filter: filter + } + ) + end + + # show - Show channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } + def show(filter) + @client.update_channels_batch( + { + operation: 'show', + filter: filter + } + ) + end + + # archive - Archive channels matching the filter for specified members + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[StringKeyHash]] Members to archive channels for (each with user_id key) + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } + def archive(filter, members) + @client.update_channels_batch( + { + operation: 'archive', + filter: filter, + members: members + } + ) + end + + # unarchive - Unarchive channels matching the filter for specified members + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[StringKeyHash]] Members to unarchive channels for (each with user_id key) + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } + def unarchive(filter, members) + @client.update_channels_batch( + { + operation: 'unarchive', + filter: filter, + members: members + } + ) + end + + # Data operations + + # updateData - Update data on channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param data [StringKeyHash] Data to update + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, data: StringKeyHash).returns(StreamChat::StreamResponse) } + def update_data(filter, data) + @client.update_channels_batch( + { + operation: 'updateData', + filter: filter, + data: data + } + ) + end + + # addFilterTags - Add filter tags to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param tags [T::Array[String]] Tags to add + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, tags: T::Array[String]).returns(StreamChat::StreamResponse) } + def add_filter_tags(filter, tags) + @client.update_channels_batch( + { + operation: 'addFilterTags', + filter: filter, + filter_tags_update: tags + } + ) + end + + # removeFilterTags - Remove filter tags from channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param tags [T::Array[String]] Tags to remove + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, tags: T::Array[String]).returns(StreamChat::StreamResponse) } + def remove_filter_tags(filter, tags) + @client.update_channels_batch( + { + operation: 'removeFilterTags', + filter: filter, + filter_tags_update: tags + } + ) + end + end +end diff --git a/lib/stream-chat/client.rb b/lib/stream-chat/client.rb index 642872a..494e38f 100644 --- a/lib/stream-chat/client.rb +++ b/lib/stream-chat/client.rb @@ -17,6 +17,7 @@ require 'stream-chat/util' require 'stream-chat/types' require 'stream-chat/moderation' +require 'stream-chat/channel_batch_updater' module StreamChat DEFAULT_BLOCKLIST = 'profanity_en_2020_v1' @@ -1170,6 +1171,21 @@ def mark_delivered(data = nil, user_id: nil) post('channels/delivered', data: data || {}, params: { user_id: user_id }) end + # Update channels in batch. + # @param payload [StringKeyHash] Payload containing operation, filter, and optional members/data/filter_tags_update + # @return [StreamChat::StreamResponse] API response + sig { params(payload: StringKeyHash).returns(StreamChat::StreamResponse) } + def update_channels_batch(payload) + put('channels/batch', data: payload) + end + + # Returns a ChannelBatchUpdater instance for batch channel operations. + # @return [StreamChat::ChannelBatchUpdater] A ChannelBatchUpdater instance + sig { returns(StreamChat::ChannelBatchUpdater) } + def channel_batch_updater + ChannelBatchUpdater.new(self) + end + private sig { returns(T::Hash[String, String]) } diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb new file mode 100644 index 0000000..9f1c611 --- /dev/null +++ b/spec/channel_batch_updater_spec.rb @@ -0,0 +1,249 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'stream-chat' + +describe StreamChat::ChannelBatchUpdater do + def loop_times(times) + loop do + begin + yield() + return + rescue StandardError, RSpec::Expectations::ExpectationNotMetError + raise if times.zero? + end + + sleep(1) + times -= 1 + end + end + + def rate_limit_error?(task) + result = task['result'] + return false unless result.is_a?(Hash) + + description = result['description'] + return false unless description.is_a?(String) + + description.downcase.include?('rate limit') + end + + def fetch_task_with_retry(task_id, attempt) + @client.get_task(task_id) + rescue StandardError => e + raise e if attempt >= 10 + + sleep(1) + nil + end + + def wait_for_task(task_id, timeout_seconds: 120) + sleep(2) # Initial delay + + timeout_seconds.times do |i| + task = fetch_task_with_retry(task_id, i) + next if task.nil? + + expect(task['task_id']).to eq(task_id) + + case task['status'] + when 'waiting', 'pending', 'running' + sleep(1) + when 'completed' + return task + when 'failed' + # If result is empty, continue polling (matches Go behavior) + result = task['result'] + if result.nil? || (result.is_a?(Hash) && result.empty?) + sleep(2) + elsif rate_limit_error?(task) + sleep(2) + else + raise "Task failed with result: #{task['result']}" + end + end + end + + raise "Task did not complete within #{timeout_seconds} seconds" + end + + before(:all) do + @client = StreamChat::Client.from_env + @created_users = [] + end + + before(:each) do + @random_users = [{ id: SecureRandom.uuid, name: 'user1' }, { id: SecureRandom.uuid, name: 'user2' }] + @random_user = { id: SecureRandom.uuid } + + users_to_insert = [@random_users[0], @random_users[1], @random_user] + + @created_users.push(*users_to_insert.map { |u| u[:id] }) + @client.upsert_users(users_to_insert) + + @channel1 = @client.channel('messaging', channel_id: SecureRandom.uuid, data: { test: true }) + @channel1.create(@random_user[:id]) + + @channel2 = @client.channel('messaging', channel_id: SecureRandom.uuid, data: { test: true }) + @channel2.create(@random_user[:id]) + end + + after(:each) do + @channel1&.delete + rescue StreamChat::StreamAPIException + # Ignore if channel already deleted + ensure + begin + @channel2&.delete + rescue StreamChat::StreamAPIException + # Ignore if channel already deleted + end + end + + after(:all) do + curr_idx = 0 + batch_size = 25 + + slice = @created_users.slice(0, batch_size) + + while !slice.nil? && !slice.empty? + @client.delete_users(slice, user: StreamChat::HARD_DELETE, messages: StreamChat::HARD_DELETE) + + curr_idx += batch_size + slice = @created_users.slice(curr_idx, batch_size) + end + end + + describe 'Client#update_channels_batch' do + it 'returns error if options is empty' do + expect { @client.update_channels_batch({}) }.to raise_error(StreamChat::StreamAPIException) + end + + it 'batch updates channels with valid options' do + response = @client.update_channels_batch( + { + operation: 'addMembers', + filter: { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + members: [{ 'user_id' => @random_users[0][:id] }] + } + ) + + expect(response['task_id']).not_to be_empty + end + end + + describe 'ChannelBatchUpdater#add_members' do + it 'adds members to channels matching filter' do + updater = @client.channel_batch_updater + + member_ids = @random_users.map { |u| u[:id] } + members = member_ids.map { |id| { 'user_id' => id } } + response = updater.add_members( + { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + members + ) + + expect(response['task_id']).not_to be_empty + task_id = response['task_id'] + + wait_for_task(task_id) + + # Verify members were added + loop_times(120) do + ch1_state = @channel1.query + ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + + member_ids.each do |member_id| + expect(ch1_member_ids).to include(member_id) + end + end + end + end + + describe 'ChannelBatchUpdater#remove_members' do + it 'removes members from channels matching filter' do + # First add both users as members to both channels + members_to_add = @random_users.map { |u| u[:id] } + @channel1.add_members(members_to_add) + @channel2.add_members(members_to_add) + + # Verify members were added + loop_times(60) do + ch1_state = @channel1.query + expect(ch1_state['members'].length).to eq(2) + + ch2_state = @channel2.query + expect(ch2_state['members'].length).to eq(2) + end + + # Verify member IDs match + ch1_state = @channel1.query + ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + expect(ch1_member_ids).to match_array(members_to_add) + + ch2_state = @channel2.query + ch2_member_ids = ch2_state['members'].map { |m| m['user_id'] } + expect(ch2_member_ids).to match_array(members_to_add) + + # Now remove one member using batch updater + updater = @client.channel_batch_updater + member_to_remove = members_to_add[0] + + response = updater.remove_members( + { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + [{ 'user_id' => member_to_remove }] + ) + + expect(response['task_id']).not_to be_empty + task_id = response['task_id'] + + wait_for_task(task_id) + + # Verify member was removed + loop_times(120) do + ch1_state = @channel1.query + ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + + expect(ch1_member_ids).not_to include(member_to_remove) + end + end + end + + describe 'ChannelBatchUpdater#archive' do + it 'archives channels for specified members' do + # First add both users as members to both channels + members_to_add = @random_users.map { |u| u[:id] } + @channel1.add_members(members_to_add) + @channel2.add_members(members_to_add) + + # Wait for members to be added + loop_times(60) do + ch1_state = @channel1.query + expect(ch1_state['members'].length).to eq(2) + end + + # Archive channels for one member + updater = @client.channel_batch_updater + member_to_archive = members_to_add[0] + + response = updater.archive( + { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + [{ 'user_id' => member_to_archive }] + ) + + expect(response['task_id']).not_to be_empty + task_id = response['task_id'] + + wait_for_task(task_id) + + # Verify archived_at is set for the member + loop_times(120) do + ch1_state = @channel1.query + member = ch1_state['members'].find { |m| m['user_id'] == member_to_archive } + + expect(member).not_to be_nil + expect(member['archived_at']).not_to be_nil + end + end + end +end