diff --git a/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts b/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts new file mode 100644 index 0000000000..89e9869784 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("iggy.java-library-conventions") +} + +dependencies { + // Iggy SDK - use local project when building within Iggy repository + api(project(":iggy")) + + // Apache Pinot dependencies (provided - not bundled with connector) + compileOnly(libs.pinot.spi) + + // Serialization support - use Jackson 2.x for Pinot compatibility + implementation(libs.jackson2.databind) { + exclude(group = "tools.jackson.core") + } + + // Apache Commons + implementation(libs.commons.lang3) + + // Logging + compileOnly(libs.slf4j.api) + + // Testing + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.testing) + testImplementation(libs.pinot.spi) // Need Pinot SPI for tests + testRuntimeOnly(libs.slf4j.simple) +} + +// Task to copy runtime dependencies for Docker deployment (flattened into libs directory) +tasks.register("copyDependencies") { + from(configurations.runtimeClasspath) + into(layout.buildDirectory.dir("libs")) +} + +// Make jar task depend on copyDependencies +tasks.named("jar") { + finalizedBy("copyDependencies") +} + +publishing { + publications { + named("maven") { + artifactId = "pinot-connector" + + pom { + name = "Apache Iggy - Pinot Connector" + description = "Apache Iggy connector plugin for Apache Pinot stream ingestion" + } + } + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json b/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json new file mode 100644 index 0000000000..d7ff8496c1 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json @@ -0,0 +1,31 @@ +{ + "schemaName": "test_events", + "dimensionFieldSpecs": [ + { + "name": "userId", + "dataType": "STRING" + }, + { + "name": "eventType", + "dataType": "STRING" + }, + { + "name": "deviceType", + "dataType": "STRING" + } + ], + "metricFieldSpecs": [ + { + "name": "duration", + "dataType": "LONG" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "timestamp", + "dataType": "LONG", + "format": "1:MILLISECONDS:EPOCH", + "granularity": "1:MILLISECONDS" + } + ] +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json b/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json new file mode 100644 index 0000000000..f6af853653 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json @@ -0,0 +1,42 @@ +{ + "tableName": "test_events", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "timestamp", + "timeType": "MILLISECONDS", + "replication": "1", + "schemaName": "test_events" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "streamConfigs": { + "streamType": "iggy", + "stream.iggy.topic.name": "test-events", + "stream.iggy.consumer.type": "lowlevel", + "stream.iggy.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory", + "realtime.segment.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory", + "stream.iggy.decoder.class.name": "org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder", + + "stream.iggy.host": "iggy", + "stream.iggy.port": "8090", + "stream.iggy.username": "iggy", + "stream.iggy.password": "iggy", + + "stream.iggy.stream.id": "test-stream", + "stream.iggy.topic.id": "test-events", + "stream.iggy.consumer.group": "pinot-integration-test", + + "stream.iggy.poll.batch.size": "100", + "stream.iggy.connection.pool.size": "4", + "stream.iggy.consumer.prop.auto.offset.reset": "smallest", + + "realtime.segment.flush.threshold.rows": "1000", + "realtime.segment.flush.threshold.time": "600000" + } + }, + "metadata": {} +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml b/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml new file mode 100644 index 0000000000..889338fd2e --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +services: + # Apache Iggy Server (from official Apache repo) + iggy: + image: apache/iggy:latest + container_name: iggy-server + ports: + - "8090:8090" # TCP + - "3000:3000" # HTTP + - "8080:8080" # QUIC + environment: + - IGGY_SYSTEM_LOGGING_LEVEL=info + - IGGY_TCP_ADDRESS=0.0.0.0:8090 + - IGGY_HTTP_ENABLED=true + - IGGY_HTTP_ADDRESS=0.0.0.0:3000 + - IGGY_QUIC_ADDRESS=0.0.0.0:8080 + - IGGY_ROOT_USERNAME=iggy + - IGGY_ROOT_PASSWORD=iggy + cap_add: + - SYS_NICE + security_opt: + - seccomp:unconfined + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3000/"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + networks: + - iggy-pinot-network + + # Zookeeper (required by Pinot) + zookeeper: + image: zookeeper:3.9 + container_name: pinot-zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + networks: + - iggy-pinot-network + + # Apache Pinot Controller + pinot-controller: + image: apachepinot/pinot:latest + container_name: pinot-controller + command: "StartController -zkAddress zookeeper:2181" + ports: + - "9000:9000" + environment: + JAVA_OPTS: "-Xms1G -Xmx2G -XX:+UseG1GC" + depends_on: + - zookeeper + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s + volumes: + - ./build/libs:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector + - ../../java-sdk/build/libs/iggy-0.6.0.jar:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector/iggy-0.6.0.jar + - ./deployment:/opt/pinot/deployment + networks: + - iggy-pinot-network + + # Apache Pinot Broker + pinot-broker: + image: apachepinot/pinot:latest + container_name: pinot-broker + command: "StartBroker -zkAddress zookeeper:2181" + ports: + - "8099:8099" + environment: + JAVA_OPTS: "-Xms512M -Xmx1G -XX:+UseG1GC" + depends_on: + - pinot-controller + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8099/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s + networks: + - iggy-pinot-network + + # Apache Pinot Server + pinot-server: + image: apachepinot/pinot:latest + container_name: pinot-server + command: "StartServer -zkAddress zookeeper:2181" + ports: + - "8098:8098" + - "8097:8097" + environment: + JAVA_OPTS: "-Xms1G -Xmx2G -XX:+UseG1GC -Dplugins.include=iggy-connector" + depends_on: + - pinot-broker + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8097/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s + volumes: + - ./build/libs:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector + - ../../java-sdk/build/libs/iggy-0.6.0.jar:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector/iggy-0.6.0.jar + - ./deployment:/opt/pinot/deployment + networks: + - iggy-pinot-network + +networks: + iggy-pinot-network: + driver: bridge diff --git a/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json b/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json new file mode 100644 index 0000000000..df0cbcfaa0 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json @@ -0,0 +1,47 @@ +{ + "description": "Sample messages to send to Iggy for testing Pinot ingestion", + "messages": [ + { + "userId": "user_12345", + "sessionId": "session_abc123", + "eventType": "page_view", + "pageUrl": "/products/laptop", + "deviceType": "desktop", + "browser": "Chrome", + "country": "USA", + "city": "San Francisco", + "duration": 45000, + "pageLoadTime": 1200, + "scrollDepth": 75, + "eventTimestamp": 1701234567890 + }, + { + "userId": "user_67890", + "sessionId": "session_def456", + "eventType": "click", + "pageUrl": "/checkout", + "deviceType": "mobile", + "browser": "Safari", + "country": "UK", + "city": "London", + "duration": 2000, + "pageLoadTime": 800, + "scrollDepth": 100, + "eventTimestamp": 1701234570000 + }, + { + "userId": "user_12345", + "sessionId": "session_abc123", + "eventType": "purchase", + "pageUrl": "/confirmation", + "deviceType": "desktop", + "browser": "Chrome", + "country": "USA", + "city": "San Francisco", + "duration": 10000, + "pageLoadTime": 950, + "scrollDepth": 50, + "eventTimestamp": 1701234580000 + } + ] +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json b/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json new file mode 100644 index 0000000000..77b226091c --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json @@ -0,0 +1,59 @@ +{ + "schemaName": "user_events", + "dimensionFieldSpecs": [ + { + "name": "userId", + "dataType": "STRING" + }, + { + "name": "sessionId", + "dataType": "STRING" + }, + { + "name": "eventType", + "dataType": "STRING" + }, + { + "name": "pageUrl", + "dataType": "STRING" + }, + { + "name": "deviceType", + "dataType": "STRING" + }, + { + "name": "browser", + "dataType": "STRING" + }, + { + "name": "country", + "dataType": "STRING" + }, + { + "name": "city", + "dataType": "STRING" + } + ], + "metricFieldSpecs": [ + { + "name": "duration", + "dataType": "LONG" + }, + { + "name": "pageLoadTime", + "dataType": "INT" + }, + { + "name": "scrollDepth", + "dataType": "INT" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "eventTimestamp", + "dataType": "LONG", + "format": "1:MILLISECONDS:EPOCH", + "granularity": "1:MILLISECONDS" + } + ] +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json b/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json new file mode 100644 index 0000000000..a6b16ad956 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json @@ -0,0 +1,43 @@ +{ + "tableName": "user_events_realtime", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "eventTimestamp", + "timeType": "MILLISECONDS", + "replication": "1", + "schemaName": "user_events" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "streamConfigs": { + "streamType": "iggy", + "stream.iggy.consumer.type": "lowlevel", + "stream.iggy.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory", + "stream.iggy.decoder.class.name": "org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder", + + "stream.iggy.host": "localhost", + "stream.iggy.port": "8090", + "stream.iggy.username": "iggy", + "stream.iggy.password": "iggy", + "stream.iggy.enable.tls": "false", + + "stream.iggy.stream.id": "analytics", + "stream.iggy.topic.id": "user-events", + "stream.iggy.consumer.group": "pinot-realtime-consumer", + + "stream.iggy.poll.batch.size": "1000", + "stream.iggy.connection.pool.size": "8", + + "realtime.segment.flush.threshold.rows": "50000", + "realtime.segment.flush.threshold.time": "3600000", + "realtime.segment.flush.threshold.segment.size": "100M" + } + }, + "metadata": { + "customConfigs": {} + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh b/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh new file mode 100755 index 0000000000..4e42a37bfe --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh @@ -0,0 +1,234 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +# Colors for output +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +NC='\033[0m' # No Color + +echo -e "${GREEN}=====================================${NC}" +echo -e "${GREEN}Iggy-Pinot Integration Test${NC}" +echo -e "${GREEN}=====================================${NC}" + +# Navigate to connector directory +cd "$(dirname "$0")" + +# Step 1: Build JARs +echo -e "\n${YELLOW}Step 1: Building JARs...${NC}" +cd ../../ +gradle :iggy-connector-pinot:jar :iggy:jar +cd external-processors/iggy-connector-pinot +echo -e "${GREEN}✓ JARs built successfully${NC}" + +# Step 2: Start Docker environment +echo -e "\n${YELLOW}Step 2: Starting Docker environment...${NC}" +docker-compose down -v +docker-compose up -d +echo -e "${GREEN}✓ Docker containers starting${NC}" + +# Step 3: Wait for services to be healthy +echo -e "\n${YELLOW}Step 3: Waiting for services to be healthy...${NC}" + +echo -n "Waiting for Iggy... " +for i in {1..30}; do + if curl --connect-timeout 3 --max-time 5 -s http://localhost:3000/ > /dev/null 2>&1; then + echo -e "${GREEN}✓${NC}" + break + fi + sleep 2 + echo -n "." +done + +echo -n "Waiting for Pinot Controller... " +for i in {1..60}; do + if curl --connect-timeout 3 --max-time 5 -s http://localhost:9000/health > /dev/null 2>&1; then + echo -e "${GREEN}✓${NC}" + break + fi + sleep 2 + echo -n "." +done + +echo -n "Waiting for Pinot Broker... " +for i in {1..60}; do + if curl --connect-timeout 3 --max-time 5 -s http://localhost:8099/health > /dev/null 2>&1; then + echo -e "${GREEN}✓${NC}" + break + fi + sleep 2 + echo -n "." +done + +echo -n "Waiting for Pinot Server... " +for i in {1..60}; do + if curl --connect-timeout 3 --max-time 5 -s http://localhost:8097/health > /dev/null 2>&1; then + echo -e "${GREEN}✓${NC}" + break + fi + sleep 2 + echo -n "." +done + +sleep 5 # Extra time for services to stabilize + +# Step 4: Login to Iggy and create stream/topic +echo -e "\n${YELLOW}Step 4: Logging in to Iggy and creating stream/topic...${NC}" + +# Login and get JWT token +TOKEN=$(curl -s -X POST "http://localhost:3000/users/login" \ + -H "Content-Type: application/json" \ + -d '{"username": "iggy", "password": "iggy"}' | jq -r '.access_token.token') + +if [ -z "$TOKEN" ] || [ "$TOKEN" = "null" ]; then + echo -e "${RED}✗ Failed to get authentication token${NC}" + exit 1 +fi + +echo -e "${GREEN}✓ Authenticated${NC}" + +# Create stream +curl -s -X POST "http://localhost:3000/streams" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"stream_id": 1, "name": "test-stream"}' \ + && echo -e "${GREEN}✓ Stream created${NC}" || echo -e "${RED}✗ Stream creation failed (may already exist)${NC}" + +# Create topic +TOPIC_RESPONSE=$(curl -s -X POST "http://localhost:3000/streams/test-stream/topics" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"topic_id": 1, "name": "test-events", "partitions_count": 2, "compression_algorithm": "none", "message_expiry": 0, "max_topic_size": 0}') + +if echo "$TOPIC_RESPONSE" | grep -q '"id"'; then + echo -e "${GREEN}✓ Topic created${NC}" +else + echo -e "${RED}✗ Topic creation failed: $TOPIC_RESPONSE${NC}" + exit 1 +fi + +# Create consumer group (topic-scoped, not stream-scoped) +curl -s -X POST "http://localhost:3000/streams/test-stream/topics/test-events/consumer-groups" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"name": "pinot-integration-test"}' \ + && echo -e "${GREEN}✓ Consumer group created${NC}" || echo -e "${YELLOW}Note: Consumer group may already exist${NC}" + +# Step 5: Create Pinot schema +echo -e "\n${YELLOW}Step 5: Creating Pinot schema...${NC}" +curl -X POST "http://localhost:9000/schemas" \ + -H "Content-Type: application/json" \ + -d @deployment/schema.json \ + && echo -e "${GREEN}✓ Schema created${NC}" || echo -e "${RED}✗ Schema creation failed${NC}" + +# Step 6: Create Pinot table +echo -e "\n${YELLOW}Step 6: Creating Pinot realtime table...${NC}" +TABLE_RESPONSE=$(curl -s -X POST "http://localhost:9000/tables" \ + -H "Content-Type: application/json" \ + -d @deployment/table.json) + +if echo "$TABLE_RESPONSE" | grep -q '"status":"Table test_events_REALTIME succesfully added"'; then + echo -e "${GREEN}✓ Table created${NC}" +elif echo "$TABLE_RESPONSE" | grep -q '"code":500'; then + echo -e "${RED}✗ Table creation failed${NC}" + echo "$TABLE_RESPONSE" | jq '.' + exit 1 +else + echo -e "${GREEN}✓ Table created${NC}" +fi + +sleep 5 # Let table initialize + +# Step 7: Send test messages to Iggy +echo -e "\n${YELLOW}Step 7: Sending test messages to Iggy...${NC}" + +# Partition value for partition 0 (4-byte little-endian, base64 encoded) +PARTITION_VALUE=$(printf '\x00\x00\x00\x00' | base64) + +for i in {1..10}; do + TIMESTAMP=$(($(date +%s) * 1000)) + MESSAGE=$(cat < /dev/null 2>&1 + echo -e "${GREEN}✓ Message $i sent${NC}" + sleep 1 +done + +# Step 8: Wait for ingestion +echo -e "\n${YELLOW}Step 8: Waiting for Pinot to ingest messages...${NC}" +sleep 15 + +# Step 9: Query Pinot and verify data +echo -e "\n${YELLOW}Step 9: Querying Pinot for ingested data...${NC}" + +QUERY_RESULT=$(curl -s -X POST "http://localhost:8099/query/sql" \ + -H "Content-Type: application/json" \ + -d '{"sql": "SELECT COUNT(*) FROM test_events_REALTIME"}') + +echo "Query Result:" +echo "$QUERY_RESULT" | jq '.' + +# Extract count from result +COUNT=$(echo "$QUERY_RESULT" | jq -r '.resultTable.rows[0][0]' 2>/dev/null || echo "0") + +if [ "$COUNT" -gt "0" ]; then + echo -e "\n${GREEN}=====================================${NC}" + echo -e "${GREEN}✓ Integration Test PASSED!${NC}" + echo -e "${GREEN}Successfully ingested $COUNT messages${NC}" + echo -e "${GREEN}=====================================${NC}" + + # Show sample data + echo -e "\n${YELLOW}Sample data:${NC}" + curl -s -X POST "http://localhost:8099/query/sql" \ + -H "Content-Type: application/json" \ + -d '{"sql": "SELECT * FROM test_events_REALTIME LIMIT 5"}' | jq '.' + + EXIT_CODE=0 +else + echo -e "\n${RED}=====================================${NC}" + echo -e "${RED}✗ Integration Test FAILED!${NC}" + echo -e "${RED}No messages ingested${NC}" + echo -e "${RED}=====================================${NC}" + + # Show logs for debugging + echo -e "\n${YELLOW}Pinot Server logs:${NC}" + docker logs pinot-server --tail 50 + + EXIT_CODE=1 +fi + +# Cleanup option +echo -e "\n${YELLOW}To stop the environment: docker-compose down -v${NC}" +echo -e "${YELLOW}To view logs: docker-compose logs -f${NC}" + +exit $EXIT_CODE diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java new file mode 100644 index 0000000000..28ef8c55d3 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.config; + +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.StreamConfig; + +import java.util.Map; + +/** + * Configuration class for Iggy stream ingestion in Pinot. + * Extracts and validates Iggy-specific properties from Pinot's streamConfigs. + * + *

Configuration properties (with prefix "stream.iggy."): + *

    + *
  • host - Iggy server hostname (required)
  • + *
  • port - Iggy server TCP port (default: 8090)
  • + *
  • username - Authentication username (default: "iggy")
  • + *
  • password - Authentication password (default: "iggy")
  • + *
  • stream.id - Iggy stream identifier (required)
  • + *
  • topic.id - Iggy topic identifier (required)
  • + *
  • consumer.group - Consumer group name (required)
  • + *
  • connection.pool.size - TCP connection pool size (default: 4)
  • + *
  • poll.batch.size - Number of messages to fetch per poll (default: 100)
  • + *
  • enable.tls - Enable TLS encryption (default: false)
  • + *
+ */ +public class IggyStreamConfig { + + private static final String IGGY_PREFIX = "stream.iggy."; + + // Connection properties + private static final String HOST_KEY = IGGY_PREFIX + "host"; + private static final String PORT_KEY = IGGY_PREFIX + "port"; + private static final String USERNAME_KEY = IGGY_PREFIX + "username"; + private static final String PASSWORD_KEY = IGGY_PREFIX + "password"; + private static final String ENABLE_TLS_KEY = IGGY_PREFIX + "enable.tls"; + private static final String CONNECTION_POOL_SIZE_KEY = IGGY_PREFIX + "connection.pool.size"; + + // Stream/Topic properties + private static final String STREAM_ID_KEY = IGGY_PREFIX + "stream.id"; + private static final String TOPIC_ID_KEY = IGGY_PREFIX + "topic.id"; + + // Consumer properties + private static final String CONSUMER_GROUP_KEY = IGGY_PREFIX + "consumer.group"; + private static final String POLL_BATCH_SIZE_KEY = IGGY_PREFIX + "poll.batch.size"; + + // Default values + private static final int DEFAULT_PORT = 8090; + private static final String DEFAULT_USERNAME = "iggy"; + private static final String DEFAULT_PASSWORD = "iggy"; + private static final boolean DEFAULT_ENABLE_TLS = false; + private static final int DEFAULT_CONNECTION_POOL_SIZE = 4; + private static final int DEFAULT_POLL_BATCH_SIZE = 100; + + private final StreamConfig streamConfig; + private final Map props; + + /** + * Creates a new Iggy stream configuration from Pinot's StreamConfig. + * + * @param streamConfig Pinot stream configuration + */ + public IggyStreamConfig(StreamConfig streamConfig) { + this.streamConfig = streamConfig; + this.props = streamConfig.getStreamConfigsMap(); + validate(); + } + + /** + * Validates required configuration properties. + * + * @throws IllegalArgumentException if required properties are missing + */ + private void validate() { + requireProperty(HOST_KEY, "Iggy server host is required"); + requireProperty(STREAM_ID_KEY, "Iggy stream ID is required"); + requireProperty(TOPIC_ID_KEY, "Iggy topic ID is required"); + requireProperty(CONSUMER_GROUP_KEY, "Iggy consumer group is required"); + } + + private void requireProperty(String key, String errorMessage) { + if (!props.containsKey(key) + || props.get(key) == null + || props.get(key).trim().isEmpty()) { + throw new IllegalArgumentException(errorMessage + " (property: " + key + ")"); + } + } + + public String getHost() { + return props.get(HOST_KEY); + } + + public int getPort() { + String portStr = props.get(PORT_KEY); + return portStr != null ? Integer.parseInt(portStr) : DEFAULT_PORT; + } + + public String getUsername() { + return props.getOrDefault(USERNAME_KEY, DEFAULT_USERNAME); + } + + public String getPassword() { + return props.getOrDefault(PASSWORD_KEY, DEFAULT_PASSWORD); + } + + public boolean isEnableTls() { + String tlsStr = props.get(ENABLE_TLS_KEY); + return tlsStr != null ? Boolean.parseBoolean(tlsStr) : DEFAULT_ENABLE_TLS; + } + + public int getConnectionPoolSize() { + String poolSizeStr = props.get(CONNECTION_POOL_SIZE_KEY); + return poolSizeStr != null ? Integer.parseInt(poolSizeStr) : DEFAULT_CONNECTION_POOL_SIZE; + } + + public String getStreamId() { + return props.get(STREAM_ID_KEY); + } + + public String getTopicId() { + return props.get(TOPIC_ID_KEY); + } + + public String getConsumerGroup() { + return props.get(CONSUMER_GROUP_KEY); + } + + public int getPollBatchSize() { + String batchSizeStr = props.get(POLL_BATCH_SIZE_KEY); + return batchSizeStr != null ? Integer.parseInt(batchSizeStr) : DEFAULT_POLL_BATCH_SIZE; + } + + /** + * Gets the offset specification from Pinot's consumer config. + * + * @return offset criteria + */ + public OffsetCriteria getOffsetCriteria() { + return streamConfig.getOffsetCriteria(); + } + + /** + * Gets the Pinot table name for this stream. + * + * @return table name with type suffix + */ + public String getTableNameWithType() { + return streamConfig.getTableNameWithType(); + } + + /** + * Creates server address in format "host:port". + * + * @return server address string + */ + public String getServerAddress() { + return getHost() + ":" + getPort(); + } + + @Override + public String toString() { + return "IggyStreamConfig{" + + "host='" + + getHost() + + '\'' + + ", port=" + + getPort() + + ", username='" + + getUsername() + + '\'' + + ", streamId='" + + getStreamId() + + '\'' + + ", topicId='" + + getTopicId() + + '\'' + + ", consumerGroup='" + + getConsumerGroup() + + '\'' + + ", enableTls=" + + isEnableTls() + + ", connectionPoolSize=" + + getConnectionPoolSize() + + ", pollBatchSize=" + + getPollBatchSize() + + '}'; + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java new file mode 100644 index 0000000000..8c10f4f600 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.connector.pinot.metadata.IggyStreamMetadataProvider; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.PartitionLevelConsumer; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamMetadataProvider; + +/** + * Factory for creating Iggy stream consumers and metadata providers. + * This is the main entry point for Pinot's stream ingestion framework to interact with Iggy. + * + *

Configuration in Pinot table config: + *

{@code
+ * "streamConfigs": {
+ *   "streamType": "iggy",
+ *   "stream.iggy.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ *   "stream.iggy.host": "localhost",
+ *   "stream.iggy.port": "8090",
+ *   "stream.iggy.username": "iggy",
+ *   "stream.iggy.password": "iggy",
+ *   "stream.iggy.stream.id": "my-stream",
+ *   "stream.iggy.topic.id": "my-topic",
+ *   "stream.iggy.consumer.group": "pinot-consumer-group",
+ *   "stream.iggy.poll.batch.size": "100"
+ * }
+ * }
+ */ +public class IggyConsumerFactory extends StreamConsumerFactory { + + private StreamConfig streamConfig; + + @Override + public void init(StreamConfig streamConfig) { + this.streamConfig = streamConfig; + } + + /** + * Creates a partition-level consumer for reading from a specific Iggy partition. + * Pinot calls this method for each partition that needs to be consumed. + * + * @param clientId unique identifier for this consumer instance + * @param partitionGroupConsumptionStatus consumption status containing partition group ID and offset info + * @return a new partition consumer instance + */ + @Override + public PartitionGroupConsumer createPartitionGroupConsumer( + String clientId, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) { + IggyStreamConfig iggyConfig = new IggyStreamConfig(this.streamConfig); + int partitionGroupId = partitionGroupConsumptionStatus.getPartitionGroupId(); + return new IggyPartitionGroupConsumer(clientId, iggyConfig, partitionGroupId); + } + + /** + * Creates a partition-level consumer (newer Pinot API). + * Wraps the partition group consumer for compatibility. + * + * @param clientId unique identifier for this consumer instance + * @param partition partition identifier + * @return a new partition consumer instance + */ + @Override + public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) { + IggyStreamConfig iggyConfig = new IggyStreamConfig(this.streamConfig); + IggyPartitionGroupConsumer groupConsumer = new IggyPartitionGroupConsumer(clientId, iggyConfig, partition); + return new IggyPartitionLevelConsumer(groupConsumer); + } + + /** + * Creates a metadata provider for querying stream information. + * Used by Pinot to discover partitions and check offset positions. + * + * @param clientId unique identifier for this metadata provider instance + * @param groupId partition group identifier + * @return a new metadata provider instance + */ + @Override + public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int groupId) { + IggyStreamConfig iggyConfig = new IggyStreamConfig(this.streamConfig); + return new IggyStreamMetadataProvider(clientId, iggyConfig, groupId); + } + + /** + * Creates a metadata provider for the entire stream (all partitions). + * + * @param clientId unique identifier for this metadata provider instance + * @return a new metadata provider instance + */ + @Override + public StreamMetadataProvider createStreamMetadataProvider(String clientId) { + IggyStreamConfig iggyConfig = new IggyStreamConfig(this.streamConfig); + return new IggyStreamMetadataProvider(clientId, iggyConfig); + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java new file mode 100644 index 0000000000..5e59ba6a89 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageMetadata; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; + +import java.util.List; + +/** + * Implementation of Pinot's MessageBatch for Iggy messages. + * Wraps a list of messages with their offsets for consumption by Pinot. + */ +public class IggyMessageBatch implements MessageBatch { + + private final List messages; + + /** + * Creates a new message batch. + * + * @param messages list of messages with offsets + */ + public IggyMessageBatch(List messages) { + this.messages = messages; + } + + @Override + public int getMessageCount() { + return messages.size(); + } + + @Override + public byte[] getMessageAtIndex(int index) { + return messages.get(index).message; + } + + @Override + public int getMessageOffsetAtIndex(int index) { + return index; + } + + @Override + public int getMessageLengthAtIndex(int index) { + return messages.get(index).message.length; + } + + @Override + public long getNextStreamMessageOffsetAtIndex(int index) { + if (index >= 0 && index < messages.size()) { + return messages.get(index).offset.getOffset(); + } + return 0; + } + + @Override + public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) { + if (index >= 0 && index < messages.size()) { + return messages.get(index).offset; + } + return null; + } + + @Override + public StreamMessage getStreamMessage(int index) { + IggyMessageAndOffset messageAndOffset = messages.get(index); + + // Calculate next offset (current + 1) + long currentOffset = messageAndOffset.offset.getOffset(); + IggyStreamPartitionMsgOffset nextOffset = new IggyStreamPartitionMsgOffset(currentOffset + 1); + + // Create metadata with offset information + StreamMessageMetadata metadata = new StreamMessageMetadata.Builder() + .setRecordIngestionTimeMs(System.currentTimeMillis()) + .setOffset(messageAndOffset.offset, nextOffset) + .build(); + + // Create and return StreamMessage + return new StreamMessage<>(null, messageAndOffset.message, messageAndOffset.message.length, metadata); + } + + @Override + public StreamPartitionMsgOffset getOffsetOfNextBatch() { + if (messages.isEmpty()) { + return new IggyStreamPartitionMsgOffset(0); + } + // Return the offset after the last message + long lastOffset = messages.get(messages.size() - 1).offset.getOffset(); + return new IggyStreamPartitionMsgOffset(lastOffset + 1); + } + + /** + * Container for an Iggy message and its offset. + */ + public static class IggyMessageAndOffset { + private final byte[] message; + private final IggyStreamPartitionMsgOffset offset; + + public IggyMessageAndOffset(byte[] message, IggyStreamPartitionMsgOffset offset) { + this.message = message; + this.offset = offset; + } + + public byte[] getMessage() { + return message; + } + + public IggyStreamPartitionMsgOffset getOffset() { + return offset; + } + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java new file mode 100644 index 0000000000..37ba72fc02 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.identifier.ConsumerId; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.PolledMessages; +import org.apache.iggy.message.PollingStrategy; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Partition-level consumer implementation for Iggy streams. + * Reads messages from a single Iggy partition using the AsyncIggyTcpClient. + * + *

This consumer manages: + *

    + *
  • TCP connection to Iggy server
  • + *
  • Single consumer mode (not consumer groups)
  • + *
  • Message polling with explicit offset tracking
  • + *
  • Offset management controlled by Pinot
  • + *
+ */ +public class IggyPartitionGroupConsumer implements PartitionGroupConsumer { + + private static final Logger log = LoggerFactory.getLogger(IggyPartitionGroupConsumer.class); + + private final IggyStreamConfig config; + private final int partitionId; + + private AsyncIggyTcpClient asyncClient; + private StreamId streamId; + private TopicId topicId; + private Consumer consumer; + private long currentOffset; + + /** + * Creates a new partition consumer. + * + * @param clientId unique identifier for this consumer + * @param config Iggy stream configuration + * @param partitionId the partition to consume from + */ + public IggyPartitionGroupConsumer(String clientId, IggyStreamConfig config, int partitionId) { + this.config = config; + this.partitionId = partitionId; + this.currentOffset = 0; + + log.info( + "Created IggyPartitionGroupConsumer: clientId={}, partition={}, config={}", + clientId, + partitionId, + config); + } + + /** + * Fetches the next batch of messages from the Iggy partition. + * This method is called repeatedly by Pinot to poll for new messages. + * + * @param startOffset the offset to start consuming from (may be null) + * @param timeoutMillis timeout for the fetch operation + * @return batch of messages, or empty batch if no messages available + */ + @Override + public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMillis) { + try { + ensureConnected(); + + // No need to join consumer group when using single consumer + + // Determine starting offset + long fetchOffset = determineStartOffset(startOffset); + log.debug("Fetching messages from partition {} at offset {}", partitionId, fetchOffset); + + // Poll messages from Iggy + PolledMessages polledMessages = pollMessages(fetchOffset); + log.debug( + "Polled {} messages from partition {}", + polledMessages.messages().size(), + partitionId); + + // Convert to Pinot MessageBatch + MessageBatch batch = convertToMessageBatch(polledMessages); + return batch; + + } catch (RuntimeException e) { + log.error("Error fetching messages from partition {}: {}", partitionId, e.getMessage(), e); + return new IggyMessageBatch(new ArrayList<>()); + } + } + + /** + * Ensures TCP connection to Iggy server is established. + */ + private void ensureConnected() { + if (asyncClient == null) { + log.info("Connecting to Iggy server: {}", config.getServerAddress()); + + asyncClient = AsyncIggyTcpClient.builder() + .host(config.getHost()) + .port(config.getPort()) + .credentials(config.getUsername(), config.getPassword()) + .connectionPoolSize(config.getConnectionPoolSize()) + .build(); + + // Connect and authenticate + asyncClient.connect().join(); + + // Parse stream and topic IDs + streamId = parseStreamId(config.getStreamId()); + topicId = parseTopicId(config.getTopicId()); + // Use single consumer instead of consumer group for explicit offset control + consumer = Consumer.of(ConsumerId.of(Long.valueOf(partitionId))); + + log.info("Connected to Iggy server successfully"); + } + } + + /** + * Determines the starting offset for polling. + */ + private long determineStartOffset(StreamPartitionMsgOffset startOffset) { + if (startOffset != null && startOffset instanceof IggyStreamPartitionMsgOffset) { + IggyStreamPartitionMsgOffset iggyOffset = (IggyStreamPartitionMsgOffset) startOffset; + currentOffset = iggyOffset.getOffset(); + log.debug("Using provided start offset: {}", currentOffset); + return currentOffset; + } + + // Use current tracked offset when no explicit offset provided + log.debug("Using current tracked offset for partition {}: {}", partitionId, currentOffset); + return currentOffset; + } + + /** + * Polls messages from Iggy using TCP client. + */ + private PolledMessages pollMessages(long fetchOffset) { + try { + Optional partition = Optional.of((long) partitionId); + + // Use explicit offset strategy to fetch from the offset Pinot requested + PollingStrategy strategy = PollingStrategy.offset(java.math.BigInteger.valueOf(fetchOffset)); + + log.debug( + "Polling messages: partition={}, offset={}, batchSize={}", + partitionId, + fetchOffset, + config.getPollBatchSize()); + + // Poll with auto-commit disabled (we'll manage offsets via Pinot) + PolledMessages polledMessages = asyncClient + .messages() + .pollMessagesAsync( + streamId, + topicId, + partition, + consumer, + strategy, + Long.valueOf(config.getPollBatchSize()), + false) + .join(); + + log.debug( + "Polled {} messages from partition {}, currentOffset={}", + polledMessages.messages().size(), + partitionId, + polledMessages.currentOffset()); + + // Update current offset only if we got messages + if (!polledMessages.messages().isEmpty() && polledMessages.currentOffset() != null) { + currentOffset = polledMessages.currentOffset().longValue() + 1; + } + + return polledMessages; + + } catch (RuntimeException e) { + log.error("Error polling messages: {}", e.getMessage(), e); + throw new RuntimeException("Failed to poll messages", e); + } + } + + /** + * Converts Iggy PolledMessages to Pinot MessageBatch. + */ + private MessageBatch convertToMessageBatch(PolledMessages polledMessages) { + List messages = new ArrayList<>(); + + for (Message message : polledMessages.messages()) { + long offset = message.header().offset().longValue(); + byte[] payload = message.payload(); + + IggyStreamPartitionMsgOffset msgOffset = new IggyStreamPartitionMsgOffset(offset); + messages.add(new IggyMessageBatch.IggyMessageAndOffset(payload, msgOffset)); + } + + return new IggyMessageBatch(messages); + } + + /** + * Parses stream ID from string (supports both numeric and named streams). + */ + private StreamId parseStreamId(String streamIdStr) { + try { + return StreamId.of(Long.parseLong(streamIdStr)); + } catch (NumberFormatException e) { + return StreamId.of(streamIdStr); + } + } + + /** + * Parses topic ID from string (supports both numeric and named topics). + */ + private TopicId parseTopicId(String topicIdStr) { + try { + return TopicId.of(Long.parseLong(topicIdStr)); + } catch (NumberFormatException e) { + return TopicId.of(topicIdStr); + } + } + + @Override + public void close() { + if (asyncClient != null) { + try { + log.info("Closing Iggy consumer for partition {}", partitionId); + asyncClient.close().join(); + log.info("Iggy consumer closed successfully"); + } catch (RuntimeException e) { + log.error("Error closing Iggy client: {}", e.getMessage(), e); + } finally { + asyncClient = null; + } + } + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java new file mode 100644 index 0000000000..5fa5b911e9 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionLevelConsumer; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; + +import java.util.concurrent.TimeoutException; + +/** + * Wrapper for IggyPartitionGroupConsumer to implement PartitionLevelConsumer interface. + * Delegates all operations to the underlying partition group consumer. + */ +public class IggyPartitionLevelConsumer implements PartitionLevelConsumer { + + private final IggyPartitionGroupConsumer delegate; + + public IggyPartitionLevelConsumer(IggyPartitionGroupConsumer delegate) { + this.delegate = delegate; + } + + @Override + public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs) throws TimeoutException { + return delegate.fetchMessages(startOffset, timeoutMs); + } + + @Override + public void close() { + delegate.close(); + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java new file mode 100644 index 0000000000..6b7dce8bf9 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; + +/** + * Represents an offset in an Iggy stream partition. + * Iggy uses monotonically increasing long values for offsets. + */ +public class IggyStreamPartitionMsgOffset implements StreamPartitionMsgOffset { + + private final long offset; + + /** + * Creates a new offset wrapper. + * + * @param offset the Iggy offset value + */ + public IggyStreamPartitionMsgOffset(long offset) { + this.offset = offset; + } + + public long getOffset() { + return offset; + } + + @Override + public int compareTo(StreamPartitionMsgOffset other) { + if (other instanceof IggyStreamPartitionMsgOffset) { + IggyStreamPartitionMsgOffset otherOffset = (IggyStreamPartitionMsgOffset) other; + return Long.compare(this.offset, otherOffset.offset); + } else if (other instanceof LongMsgOffset) { + // Handle comparison with Pinot's LongMsgOffset + LongMsgOffset longOffset = (LongMsgOffset) other; + return Long.compare(this.offset, longOffset.getOffset()); + } + throw new IllegalArgumentException("Cannot compare with incompatible offset type: " + other.getClass()); + } + + @Override + public String toString() { + return String.valueOf(offset); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IggyStreamPartitionMsgOffset that = (IggyStreamPartitionMsgOffset) o; + return offset == that.offset; + } + + @Override + public int hashCode() { + return Long.hashCode(offset); + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java new file mode 100644 index 0000000000..1a619f3484 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.decoder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageDecoder; + +import java.util.Map; +import java.util.Set; + +/** + * JSON message decoder for Iggy streams. + * Decodes JSON-formatted messages from Iggy into Pinot GenericRow format. + * + *

Configuration in Pinot table config: + *

{@code
+ * "streamConfigs": {
+ *   "stream.iggy.decoder.class.name": "org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder"
+ * }
+ * }
+ */ +public class IggyJsonMessageDecoder implements StreamMessageDecoder { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** + * Initializes the decoder with configuration. + * Can be used to set up custom deserialization if needed. + * + * @param props decoder properties from streamConfigs + * @param fieldsToRead set of fields to read from messages + * @param topicName topic name + * @throws Exception if initialization fails + */ + @Override + public void init(Map props, Set fieldsToRead, String topicName) throws Exception { + // No special initialization needed for basic JSON decoding + } + + /** + * Decodes a JSON message payload into a GenericRow. + * + * @param payload raw byte array containing JSON + * @return GenericRow with decoded fields + */ + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + try { + @SuppressWarnings("unchecked") + Map jsonMap = OBJECT_MAPPER.readValue(payload, Map.class); + + for (Map.Entry entry : jsonMap.entrySet()) { + destination.putValue(entry.getKey(), entry.getValue()); + } + + return destination; + + } catch (java.io.IOException e) { + throw new RuntimeException("Failed to decode JSON message", e); + } + } + + /** + * Decodes a JSON message and returns the specified field values. + * + * @param payload raw byte array containing JSON + * @param offset offset in the payload to start decoding + * @param length length of the message to decode + * @param destination destination GenericRow to populate + * @return GenericRow with requested fields + */ + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + // Create a new byte array for the specified range + byte[] messageBytes = new byte[length]; + System.arraycopy(payload, offset, messageBytes, 0, length); + return decode(messageBytes, destination); + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java new file mode 100644 index 0000000000..36ef0aa6e1 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.metadata; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.partition.Partition; +import org.apache.iggy.topic.TopicDetails; +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +/** + * Metadata provider for Iggy streams. + * Provides information about partitions, offsets, and message counts. + * + *

This provider connects to Iggy via TCP to query: + *

    + *
  • Number of partitions in a topic
  • + *
  • Oldest available offset per partition
  • + *
  • Latest offset per partition
  • + *
  • Message counts
  • + *
+ */ +public class IggyStreamMetadataProvider implements StreamMetadataProvider { + + private static final Logger log = LoggerFactory.getLogger(IggyStreamMetadataProvider.class); + + private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache + + private final IggyStreamConfig config; + private final Integer partitionId; // null for stream-level, non-null for partition-level + + private AsyncIggyTcpClient asyncClient; + private StreamId streamId; + private TopicId topicId; + private TopicDetails cachedTopicDetails; + private long lastDetailsRefresh; + + /** + * Creates a stream-level metadata provider (all partitions). + * + * @param clientId unique identifier + * @param config Iggy stream configuration + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config) { + this(clientId, config, null); + } + + /** + * Creates a partition-level metadata provider. + * + * @param clientId unique identifier + * @param config Iggy stream configuration + * @param partitionId specific partition ID + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config, Integer partitionId) { + this.config = config; + this.partitionId = partitionId; + + log.info( + "Created IggyStreamMetadataProvider: clientId={}, partitionId={}, config={}", + clientId, + partitionId, + config); + } + + /** + * Retrieves the number of partitions and their metadata. + * Called by Pinot to discover available partitions in the stream. + * + * @param timeoutMillis timeout for the operation + * @return number of partitions in the topic + */ + @Override + public int fetchPartitionCount(long timeoutMillis) { + try { + ensureConnected(); + TopicDetails topicDetails = fetchTopicDetails(); + int partitionCount = topicDetails.partitionsCount().intValue(); + log.info("Found {} partitions for topic {}", partitionCount, config.getTopicId()); + return partitionCount; + } catch (RuntimeException e) { + log.error("Error fetching partition count: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition count", e); + } + } + + /** + * Fetches the current offset for consumption. + * For Iggy, we rely on consumer group state, so this returns the earliest offset. + * + * @param offsetCriteria offset criteria (earliest, latest, etc.) + * @param timeoutMillis timeout for the operation + * @return current offset for the partition + */ + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { + try { + ensureConnected(); + + if (partitionId == null) { + throw new IllegalStateException("Partition ID must be set for offset queries"); + } + + Partition partition = getPartitionInfo(partitionId); + + // Handle offset criteria + if (offsetCriteria != null && offsetCriteria.isSmallest()) { + // Return earliest available offset (0 for Iggy) + return new IggyStreamPartitionMsgOffset(0); + } else if (offsetCriteria != null && offsetCriteria.isLargest()) { + // Return latest offset based on messages count + long latestOffset = partition.messagesCount().longValue(); + return new IggyStreamPartitionMsgOffset(latestOffset); + } else { + // Default to consumer group managed offset (start from 0) + return new IggyStreamPartitionMsgOffset(0); + } + + } catch (RuntimeException e) { + log.error("Error fetching partition offset: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition offset", e); + } + } + + /** + * Ensures TCP connection to Iggy server is established. + */ + private void ensureConnected() { + if (asyncClient == null) { + log.info("Connecting to Iggy server: {}", config.getServerAddress()); + + asyncClient = AsyncIggyTcpClient.builder() + .host(config.getHost()) + .port(config.getPort()) + .credentials(config.getUsername(), config.getPassword()) + .connectionPoolSize(config.getConnectionPoolSize()) + .build(); + + // Connect and authenticate + asyncClient.connect().join(); + + // Parse stream and topic IDs + streamId = parseStreamId(config.getStreamId()); + topicId = parseTopicId(config.getTopicId()); + + log.info("Connected to Iggy server successfully"); + } + } + + /** + * Fetches topic details with caching. + */ + private TopicDetails fetchTopicDetails() { + long now = System.currentTimeMillis(); + if (cachedTopicDetails == null || (now - lastDetailsRefresh) > DETAILS_CACHE_MS) { + try { + Optional details = + asyncClient.topics().getTopicAsync(streamId, topicId).join(); + cachedTopicDetails = + details.orElseThrow(() -> new RuntimeException("Topic not found: " + config.getTopicId())); + lastDetailsRefresh = now; + } catch (RuntimeException e) { + log.error("Error fetching topic details: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch topic details", e); + } + } + return cachedTopicDetails; + } + + /** + * Gets information for a specific partition. + */ + private Partition getPartitionInfo(int partitionId) { + TopicDetails details = fetchTopicDetails(); + return details.partitions().stream() + .filter(p -> p.id().intValue() == partitionId) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Partition " + partitionId + " not found")); + } + + /** + * Parses stream ID from string (supports both numeric and named streams). + */ + private StreamId parseStreamId(String streamIdStr) { + try { + return StreamId.of(Long.parseLong(streamIdStr)); + } catch (NumberFormatException e) { + return StreamId.of(streamIdStr); + } + } + + /** + * Parses topic ID from string (supports both numeric and named topics). + */ + private TopicId parseTopicId(String topicIdStr) { + try { + return TopicId.of(Long.parseLong(topicIdStr)); + } catch (NumberFormatException e) { + return TopicId.of(topicIdStr); + } + } + + @Override + public void close() { + if (asyncClient != null) { + try { + log.info("Closing Iggy metadata provider"); + asyncClient.close().join(); + log.info("Iggy metadata provider closed successfully"); + } catch (RuntimeException e) { + log.error("Error closing Iggy client: {}", e.getMessage(), e); + } finally { + asyncClient = null; + } + } + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory new file mode 100644 index 0000000000..2a72c4d890 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties new file mode 100644 index 0000000000..98cae47c36 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Iggy Stream Connector Plugin for Apache Pinot +# This file is required for Pinot 1.3.0+ plugin discovery + +# Plugin name +pluginName=iggy-connector + +# Plugin version +pluginVersion=0.6.0 + +# StreamConsumerFactory class +stream.iggy.consumer.factory.class=org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory + +# MessageDecoder class +stream.iggy.decoder.class=org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java new file mode 100644 index 0000000000..43f0ac7d27 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.config; + +import org.apache.pinot.spi.stream.StreamConfig; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class IggyStreamConfigTest { + + @Test + void testValidConfiguration() { + Map props = createValidConfig(); + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + IggyStreamConfig config = new IggyStreamConfig(streamConfig); + + assertEquals("localhost", config.getHost()); + assertEquals(8090, config.getPort()); + assertEquals("iggy", config.getUsername()); + assertEquals("iggy", config.getPassword()); + assertEquals("analytics", config.getStreamId()); + assertEquals("events", config.getTopicId()); + assertEquals("test-consumer-group", config.getConsumerGroup()); + assertEquals(100, config.getPollBatchSize()); + assertEquals(4, config.getConnectionPoolSize()); + assertFalse(config.isEnableTls()); + } + + @Test + void testCustomConfiguration() { + Map props = createValidConfig(); + props.put("stream.iggy.port", "9090"); + props.put("stream.iggy.username", "custom-user"); + props.put("stream.iggy.password", "custom-pass"); + props.put("stream.iggy.poll.batch.size", "500"); + props.put("stream.iggy.connection.pool.size", "8"); + props.put("stream.iggy.enable.tls", "true"); + + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + IggyStreamConfig config = new IggyStreamConfig(streamConfig); + + assertEquals(9090, config.getPort()); + assertEquals("custom-user", config.getUsername()); + assertEquals("custom-pass", config.getPassword()); + assertEquals(500, config.getPollBatchSize()); + assertEquals(8, config.getConnectionPoolSize()); + assertTrue(config.isEnableTls()); + } + + @Test + void testMissingHostThrowsException() { + Map props = createValidConfig(); + props.remove("stream.iggy.host"); + + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> new IggyStreamConfig(streamConfig)); + + assertTrue(exception.getMessage().contains("host")); + } + + @Test + void testMissingStreamIdThrowsException() { + Map props = createValidConfig(); + props.remove("stream.iggy.stream.id"); + + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> new IggyStreamConfig(streamConfig)); + + assertTrue(exception.getMessage().contains("stream ID")); + } + + @Test + void testMissingTopicIdThrowsException() { + Map props = createValidConfig(); + props.remove("stream.iggy.topic.id"); + + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> new IggyStreamConfig(streamConfig)); + + assertTrue(exception.getMessage().contains("topic ID")); + } + + @Test + void testMissingConsumerGroupThrowsException() { + Map props = createValidConfig(); + props.remove("stream.iggy.consumer.group"); + + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> new IggyStreamConfig(streamConfig)); + + assertTrue(exception.getMessage().contains("consumer group")); + } + + @Test + void testServerAddress() { + Map props = createValidConfig(); + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + IggyStreamConfig config = new IggyStreamConfig(streamConfig); + + assertEquals("localhost:8090", config.getServerAddress()); + } + + @Test + void testTableNameWithType() { + Map props = createValidConfig(); + StreamConfig streamConfig = new StreamConfig("events_REALTIME", props); + IggyStreamConfig config = new IggyStreamConfig(streamConfig); + + assertEquals("events_REALTIME", config.getTableNameWithType()); + } + + @Test + void testToString() { + Map props = createValidConfig(); + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + IggyStreamConfig config = new IggyStreamConfig(streamConfig); + + String str = config.toString(); + assertTrue(str.contains("localhost")); + assertTrue(str.contains("8090")); + assertTrue(str.contains("analytics")); + assertTrue(str.contains("events")); + assertTrue(str.contains("test-consumer-group")); + } + + @Test + void testNumericStreamAndTopicIds() { + Map props = createValidConfig(); + props.put("stream.iggy.stream.id", "123"); + props.put("stream.iggy.topic.id", "456"); + + StreamConfig streamConfig = new StreamConfig("test_table_REALTIME", props); + IggyStreamConfig config = new IggyStreamConfig(streamConfig); + + assertEquals("123", config.getStreamId()); + assertEquals("456", config.getTopicId()); + } + + private Map createValidConfig() { + Map props = new HashMap<>(); + props.put("streamType", "iggy"); // Required by Pinot StreamConfig + props.put("stream.iggy.topic.name", "events"); // Required by Pinot StreamConfig + props.put("stream.iggy.consumer.type", "lowlevel"); // Required by Pinot + props.put( + "stream.iggy.consumer.factory.class.name", + "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory"); + props.put("stream.iggy.decoder.class.name", "org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder"); + + props.put("stream.iggy.host", "localhost"); + props.put("stream.iggy.port", "8090"); + props.put("stream.iggy.username", "iggy"); + props.put("stream.iggy.password", "iggy"); + props.put("stream.iggy.stream.id", "analytics"); + props.put("stream.iggy.topic.id", "events"); + props.put("stream.iggy.consumer.group", "test-consumer-group"); + props.put("stream.iggy.poll.batch.size", "100"); + props.put("stream.iggy.connection.pool.size", "4"); + props.put("stream.iggy.enable.tls", "false"); + return props; + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java new file mode 100644 index 0000000000..aca0be844a --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +class IggyMessageBatchTest { + + @Test + void testEmptyBatch() { + IggyMessageBatch batch = new IggyMessageBatch(new ArrayList<>()); + assertEquals(0, batch.getMessageCount()); + } + + @Test + void testSingleMessage() { + List messages = new ArrayList<>(); + byte[] payload = "test message".getBytes(StandardCharsets.UTF_8); + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(100L); + messages.add(new IggyMessageBatch.IggyMessageAndOffset(payload, offset)); + + IggyMessageBatch batch = new IggyMessageBatch(messages); + + assertEquals(1, batch.getMessageCount()); + assertArrayEquals(payload, batch.getMessageAtIndex(0)); + assertEquals(payload.length, batch.getMessageLengthAtIndex(0)); + assertEquals(100L, batch.getNextStreamMessageOffsetAtIndex(0)); + assertEquals(offset, batch.getNextStreamPartitionMsgOffsetAtIndex(0)); + } + + @Test + void testMultipleMessages() { + List messages = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + byte[] payload = ("message-" + i).getBytes(StandardCharsets.UTF_8); + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(i * 100L); + messages.add(new IggyMessageBatch.IggyMessageAndOffset(payload, offset)); + } + + IggyMessageBatch batch = new IggyMessageBatch(messages); + + assertEquals(10, batch.getMessageCount()); + + for (int i = 0; i < 10; i++) { + byte[] expectedPayload = ("message-" + i).getBytes(StandardCharsets.UTF_8); + assertArrayEquals(expectedPayload, batch.getMessageAtIndex(i)); + assertEquals(expectedPayload.length, batch.getMessageLengthAtIndex(i)); + assertEquals(i * 100L, batch.getNextStreamMessageOffsetAtIndex(i)); + assertEquals(i, batch.getMessageOffsetAtIndex(i)); + } + } + + @Test + void testMessageAndOffsetWrapper() { + byte[] payload = "test".getBytes(StandardCharsets.UTF_8); + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(123L); + + IggyMessageBatch.IggyMessageAndOffset wrapper = new IggyMessageBatch.IggyMessageAndOffset(payload, offset); + + assertArrayEquals(payload, wrapper.getMessage()); + assertEquals(offset, wrapper.getOffset()); + assertEquals(123L, wrapper.getOffset().getOffset()); + } + + @Test + void testNullOffsetAtInvalidIndex() { + List messages = new ArrayList<>(); + byte[] payload = "test".getBytes(StandardCharsets.UTF_8); + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(100L); + messages.add(new IggyMessageBatch.IggyMessageAndOffset(payload, offset)); + + IggyMessageBatch batch = new IggyMessageBatch(messages); + + assertNull(batch.getNextStreamPartitionMsgOffsetAtIndex(-1)); + assertNull(batch.getNextStreamPartitionMsgOffsetAtIndex(10)); + assertEquals(0, batch.getNextStreamMessageOffsetAtIndex(-1)); + assertEquals(0, batch.getNextStreamMessageOffsetAtIndex(10)); + } + + @Test + void testLargeMessageBatch() { + List messages = new ArrayList<>(); + + // Create 1000 messages + for (int i = 0; i < 1000; i++) { + byte[] payload = new byte[1024]; // 1KB per message + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(i); + messages.add(new IggyMessageBatch.IggyMessageAndOffset(payload, offset)); + } + + IggyMessageBatch batch = new IggyMessageBatch(messages); + + assertEquals(1000, batch.getMessageCount()); + assertEquals(1024, batch.getMessageLengthAtIndex(0)); + assertEquals(1024, batch.getMessageLengthAtIndex(999)); + } +} diff --git a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java new file mode 100644 index 0000000000..d3841b7844 --- /dev/null +++ b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class IggyStreamPartitionMsgOffsetTest { + + @Test + void testOffsetCreation() { + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(100L); + assertEquals(100L, offset.getOffset()); + } + + @Test + void testCompareTo() { + IggyStreamPartitionMsgOffset offset1 = new IggyStreamPartitionMsgOffset(100L); + IggyStreamPartitionMsgOffset offset2 = new IggyStreamPartitionMsgOffset(200L); + IggyStreamPartitionMsgOffset offset3 = new IggyStreamPartitionMsgOffset(100L); + + assertTrue(offset1.compareTo(offset2) < 0); + assertTrue(offset2.compareTo(offset1) > 0); + assertEquals(0, offset1.compareTo(offset3)); + } + + @Test + void testEquals() { + IggyStreamPartitionMsgOffset offset1 = new IggyStreamPartitionMsgOffset(100L); + IggyStreamPartitionMsgOffset offset2 = new IggyStreamPartitionMsgOffset(100L); + IggyStreamPartitionMsgOffset offset3 = new IggyStreamPartitionMsgOffset(200L); + + assertEquals(offset1, offset2); + assertNotEquals(offset1, offset3); + assertEquals(offset1, offset1); + assertNotEquals(offset1, null); + assertNotEquals(offset1, "string"); + } + + @Test + void testHashCode() { + IggyStreamPartitionMsgOffset offset1 = new IggyStreamPartitionMsgOffset(100L); + IggyStreamPartitionMsgOffset offset2 = new IggyStreamPartitionMsgOffset(100L); + IggyStreamPartitionMsgOffset offset3 = new IggyStreamPartitionMsgOffset(200L); + + assertEquals(offset1.hashCode(), offset2.hashCode()); + assertNotEquals(offset1.hashCode(), offset3.hashCode()); + } + + @Test + void testToString() { + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(12345L); + assertEquals("12345", offset.toString()); + } + + @Test + void testZeroOffset() { + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(0L); + assertEquals(0L, offset.getOffset()); + assertEquals("0", offset.toString()); + } + + @Test + void testLargeOffset() { + long largeOffset = Long.MAX_VALUE - 1; + IggyStreamPartitionMsgOffset offset = new IggyStreamPartitionMsgOffset(largeOffset); + assertEquals(largeOffset, offset.getOffset()); + assertEquals(String.valueOf(largeOffset), offset.toString()); + } +} diff --git a/foreign/java/gradle/libs.versions.toml b/foreign/java/gradle/libs.versions.toml index 63cd5b3bda..dd3e8408cb 100644 --- a/foreign/java/gradle/libs.versions.toml +++ b/foreign/java/gradle/libs.versions.toml @@ -19,8 +19,12 @@ # Flink flink = "2.1.1" +# Pinot +pinot = "1.4.0" + # Jackson jackson = "3.0.2" +jackson2 = "2.18.2" # Apache Commons commons-lang3 = "3.20.0" @@ -58,6 +62,10 @@ checkstyle = "12.1.2" [libraries] # Jackson jackson-databind = { module = "tools.jackson.core:jackson-databind", version.ref = "jackson" } +jackson2-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson2" } + +# Pinot +pinot-spi = { module = "org.apache.pinot:pinot-spi", version.ref = "pinot" } # Apache HTTP Client httpclient5 = { module = "org.apache.httpcomponents.client5:httpclient5", version.ref = "httpclient5" } diff --git a/foreign/java/settings.gradle.kts b/foreign/java/settings.gradle.kts index 050119ed8d..477beb85d0 100644 --- a/foreign/java/settings.gradle.kts +++ b/foreign/java/settings.gradle.kts @@ -28,3 +28,6 @@ project(":iggy-connector-library").projectDir = file("external-processors/iggy-c include("iggy-flink-examples") project(":iggy-flink-examples").projectDir = file("external-processors/iggy-connector-flink/iggy-flink-examples") + +include("iggy-connector-pinot") +project(":iggy-connector-pinot").projectDir = file("external-processors/iggy-connector-pinot")