Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,6 @@ api_service_port=17070
### Task Runtime Configuration
####################

# The number of concurrent threads for the source task.
# Effective mode: on every start
# Data type: int
task_source_parallelism_num=4

# The number of concurrent threads for the process task.
# Effective mode: on every start
# Data type: int
task_process_parallelism_num=4

# The number of concurrent threads for the sink task.
# Effective mode: on every start
# Data type: int
task_sink_parallelism_num=4

# The ring buffer size for the processor task.
# Effective mode: on every start
# Data type: int
task_processor_ring_buffer_size=1024

# The ring buffer size for the sink task.
# Effective mode: on every start
# Data type: int
task_sink_ring_buffer_size=1024

# Database file location of task
# Effective mode: on every start
# Data type: string
Expand All @@ -63,7 +38,7 @@ task_database_file_path=system/database/task.db
# Proactively triggers the interval for batch deliveries
# Effective mode: on every start
# Data type: long
executor_cron_heartbeat_event_interval_seconds=20
executor_cron_heartbeat_event_interval_seconds=5

# Task progress storage interval
# Effective mode: on every start
Expand Down Expand Up @@ -93,67 +68,13 @@ plugin_database_file_path=system/database/plugin.db
### Pipe Configuration
####################

# The total bytes that all pipe sinks can transfer per second.
# When given a value less than or equal to 0, it means no limit.
# default value is -1, which means no limit.
# Effective mode: on every start
# Data type: double
pipe_all_sinks_rate_limit_bytes_per_second=-1

# Rate limiter configuration interval in milliseconds for hot reloading
# Effective mode: on every start
# Data type: int
rate_limiter_hot_reload_check_interval_ms=1000

# Maximum number of retry attempts for operations
# Effective mode: on every start
# Data type: int
max_retry_times=5

# Used for connection of IoTDB native clients
# Bind with rpc_address
# Effective mode: on every start
# Data type: int
rpc_port=6667

# Used for connection of IoTDB native clients(Session)
# Could set 127.0.0.1(for local test) or ipv4 address
# Effective mode: on every start
# Data type: String
rpc_address=0.0.0.0

# Buffer size for reading files in pipe connector (8MB default)
# Effective mode: on every start
# Data type: int
pipe_connector_read_file_buffer_size=8388608

# Timeout duration for pipe connector data transfer in milliseconds
# Effective mode: on every start
# Data type: int
pipe_connector_transfer_timeout_ms=900000

# Maximum allowed frame size for Thrift communication
# Effective mode: on every start
# Data type: int
thrift_frame_max_size=536870912

# Enable/disable thrift compression for pipe connector RPC
# Effective mode: on every start
# Data type: boolean
is_pipe_connector_rpc_thrift_compression_enabled=false

# Use this value to set timestamp precision as "ms", "us" or "ns".
# Once the precision has been set, it can not be changed.
# Effective mode: on every start
# Data type: string
timestamp_precision=ms

# Memory allocation ratio for pipe leader cache management
# The maximum number of tablets that can be in a batch
# Effective mode: on every start
# Data type: float
pipe_leader_cache_memory_usage_percentage=0.1

# Enable/disable reference tracking for pipe events
# Effective mode: on every start
# Data type: boolean
pipe_event_reference_tracking_enabled=true
# Data type: int
pipe_max_allowed_event_count_in_tablet_batch=100
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,7 @@

package org.apache.iotdb.collector.config;

import org.apache.iotdb.rpc.RpcUtils;

import java.util.concurrent.TimeUnit;

public class PipeRuntimeOptions extends Options {
public static final Option<Double> PIPE_ALL_SINK_RATE_LIMIT_BYTES_PER_SECOND =
new Option<Double>("pipe_all_sinks_rate_limit_bytes_per_second", -1d) {
@Override
public void setValue(final String valueString) {
value = Double.parseDouble(valueString);
}
};

public static final Option<Integer> RATE_LIMITER_HOT_RELOAD_CHECK_INTERVAL_MS =
new Option<Integer>("rate_limiter_hot_reload_check_interval_ms", 1000) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Integer> MAX_RETRY_TIMES =
new Option<Integer>("max_retry_times", 5) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Integer> RPC_PORT =
new Option<Integer>("rpc_port", 6667) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<String> RPC_ADDRESS =
new Option<String>("rpc_address", "0.0.0.0") {
@Override
public void setValue(final String valueString) {
value = valueString;
}
};

public static final Option<Integer> PIPE_CONNECTOR_READ_FILE_BUFFER_SIZE =
new Option<Integer>("pipe_connector_read_file_buffer_size", 8388608) {
Expand All @@ -88,8 +45,8 @@ public void setValue(final String valueString) {
}
};

public static final Option<Boolean> IS_PIPE_CONNECTOR_RPC_THRIFT_COMPRESSION_ENABLED =
new Option<Boolean>("is_pipe_connector_rpc_thrift_compression_enabled", false) {
public static final Option<Boolean> PIPE_CONNECTOR_RPC_THRIFT_COMPRESSION_ENABLED =
new Option<Boolean>("pipe_connector_rpc_thrift_compression_enabled", false) {
@Override
public void setValue(final String valueString) {
value = Boolean.parseBoolean(valueString);
Expand Down Expand Up @@ -120,156 +77,87 @@ public void setValue(String valueString) {
}
};

public static final Option<Boolean> PIPE_EVENT_REFERENCE_TRACKING_ENABLED =
new Option<Boolean>("pipe_event_reference_tracking_enabled", true) {
@Override
public void setValue(String valueString) {
value = Boolean.parseBoolean(valueString);
}
};

public static volatile Option<Long> PIPE_CHECK_MEMORY_ENOUGH_INTERVAL_MS =
new Option<Long>("pipe_check_memory_enough_interval_ms", 10L) {
@Override
public void setValue(final String valueString) {
value = Long.parseLong(valueString);
}
};

public static final Option<Boolean> PIPE_CONNECTOR_READ_FILE_BUFFER_MEMORY_CONTROL =
new Option<Boolean>("pipe_connector_read_file_buffer_memory_control", false) {
@Override
public void setValue(final String valueString) {
value = Boolean.parseBoolean(valueString);
}
};

public static final Option<Integer> PIPE_SUBTASK_EXECUTOR_MAX_THREAD_NUM =
new Option<Integer>(
"pipe_subtask_executor_max_thread_num",
Math.max(5, Runtime.getRuntime().availableProcessors() / 2)) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Integer> PIPE_DATA_STRUCTURE_TABLET_SIZE_IN_BYTES =
new Option<Integer>("pipe_data_structure_tablet_size_in_bytes", 2097152) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Integer> PIPE_DATA_STRUCTURE_TABLET_ROW_SIZE =
new Option<Integer>("pipe_data_structure_tablet_row_size", 2048) {
public static final Option<Integer> PIPE_CONNECTOR_HANDSHAKE_TIMEOUT_MS =
new Option<Integer>("pipe_connector_handshake_timeout_ms", 10 * 1000) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Boolean> PIPE_AIR_GAP_RECEIVER_ENABLED =
new Option<Boolean>("pipe_air_gap_receiver_enabled", true) {

@Override
public void setValue(final String valueString) {
value = Boolean.parseBoolean(valueString);
}
};

public static final Option<Integer> PIPE_AIR_GAP_RECEIVER_PORT =
new Option<Integer>("pipe_air_gap_receiver_port", 9780) {
public static final Option<Integer> DATA_NODE_ID =
new Option<Integer>("data_node_id", -1) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Long> PIPE_SUBTASK_EXECUTOR_PENDING_QUEUE_MAX_BLOCKING_TIME_MS =
new Option<Long>("pipe_subtask_executor_pending_queue_max_blocking_time_ms", 1000L) {
@Override
public void setValue(final String valueString) {
value = Long.parseLong(valueString);
}
};

public static final Option<Boolean> TIMESTAMP_PRECISION_CHECK_ENABLED =
new Option<Boolean>("timestamp_precision_check_enabled", true) {
public static final Option<Boolean> PIPE_MEMORY_MANAGEMENT_ENABLED =
new Option<Boolean>("pipe_memory_management_enabled", true) {
@Override
public void setValue(final String valueString) {
public void setValue(String valueString) {
value = Boolean.parseBoolean(valueString);
}
};

public static final Option<Integer> DN_CONNECTION_TIMEOUT_IN_MS =
new Option<Integer>("dn_connection_timeout_in_ms", (int) TimeUnit.SECONDS.toMillis(60)) {
public static final Option<Integer> PIPE_MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES =
new Option<Integer>("pipe_memory_allocate_min_size_in_bytes", 32) {
@Override
public void setValue(final String valueString) {
public void setValue(String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Boolean> IS_RPC_THRIFT_COMPRESSION_ENABLED =
new Option<Boolean>("is_rpc_thrift_compression_enabled", false) {
public static final Option<Double> PIPE_TOTAL_FLOATING_MEMORY_PROPORTION =
new Option<Double>("pipe_total_floating_memory_proportion", 0.2) {
@Override
public void setValue(final String valueString) {
value = Boolean.parseBoolean(valueString);
public void setValue(String valueString) {
value = Double.parseDouble(valueString);
}
};

public static final Option<Integer> PIPE_CONNECTOR_REQUEST_SLICE_THRESHOLD_BYTES =
new Option<Integer>(
"pipe_connector_request_slice_threshold_bytes",
(int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8)) {
public static final Option<Integer> PIPE_MAX_ALLOWED_EVENT_COUNT_IN_TABLET_BATCH =
new Option<Integer>("pipe_max_allowed_event_count_in_tablet_batch", 100) {
@Override
public void setValue(final String valueString) {
public void setValue(String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Integer> PIPE_CONNECTOR_HANDSHAKE_TIMEOUT_MS =
new Option<Integer>("pipe_connector_handshake_timeout_ms", 10 * 1000) {
public static final Option<Integer> PIPE_MEMORY_ALLOCATE_MAX_RETRIES =
new Option<Integer>("pipe_memory_allocate_max_retries", 10) {
@Override
public void setValue(final String valueString) {
public void setValue(String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Boolean> PIPE_CONNECTOR_RPC_THRIFT_COMPRESSION_ENABLED =
new Option<Boolean>("pipe_connector_rpc_thrift_compression_enabled", false) {
@Override
public void setValue(final String valueString) {
value = Boolean.parseBoolean(valueString);
}
};

public static final Option<Integer> PIPE_ASYNC_CONNECTOR_SELECTOR_NUMBER =
new Option<Integer>(
"pipe_async_connector_selector_number",
Math.max(4, Runtime.getRuntime().availableProcessors() / 2)) {
public static final Option<Integer> PIPE_MEMORY_ALLOCATE_RETRY_INTERVAL_MS =
new Option<Integer>("pipe_memory_allocate_retry_interval_ms", 50) {
@Override
public void setValue(final String valueString) {
public void setValue(String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Integer> PIPE_ASYNC_CONNECTOR_MAX_CLIENT_NUMBER =
new Option<Integer>(
"pipe_async_connector_max_client_number",
Math.max(16, Runtime.getRuntime().availableProcessors() / 2)) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};
public static final Option<Double>
PIPE_DATA_STRUCTURE_TABLET_MEMORY_BLOCK_ALLOCATION_REJECT_THRESHOLD =
new Option<Double>(
"pipe_data_structure_tablet_memory_block_allocation_reject_threshold", 0.4) {
@Override
public void setValue(String valueString) {
value = Double.parseDouble(valueString);
}
};

public static volatile Option<Integer> DATA_NODE_ID =
new Option<Integer>("data_node_id", -1) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};
public static final Option<Double>
PIPE_DATA_STRUCTURE_TS_FILE_MEMORY_BLOCK_ALLOCATION_REJECT_THRESHOLD =
new Option<Double>(
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold", 0.4) {
@Override
public void setValue(String valueString) {
value = Double.parseDouble(valueString);
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;

public class PluginRuntimeOptions extends Options {

public static final Option<String> PLUGIN_LIB_DIR =
new Option<String>("plugin_lib_dir", "system" + File.separator + "plugin") {
@Override
Expand Down
Loading