Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/integ-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ on:
branches:
- develop
- main

merge_group:

permissions:
contents: read

Expand Down
20 changes: 20 additions & 0 deletions .github/workflows/validate-branch-into-main.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: Validate PR Branch into Main

on:
pull_request:
branches:
- main

jobs:
validate-pr-branch:
runs-on: ubuntu-latest
steps:
- name: Check source branch
run: |
SOURCE_BRANCH="${{ github.head_ref }}"
if [[ "$SOURCE_BRANCH" != "develop" ]]; then
echo "Error: Only pull requests from develop branch are allowed into main"
echo "Current source branch ($SOURCE_BRANCH)."
exit 1
fi
echo "Source branch is develop - merge allowed"
Comment on lines +10 to +20

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {}

Copilot Autofix

AI 6 days ago

In general, this problem is fixed by explicitly defining a permissions: block for the workflow or for individual jobs, granting only the minimal scopes required. For a job that only reads context information (like github.head_ref) and doesn’t interact with repository contents, PRs, or issues, the least‑privilege configuration is to set contents: read at the workflow or job level (or even permissions: {} if no GitHub API access is needed, but contents: read is a common safe baseline).

For this specific file, the simplest non‑breaking fix is to add a root‑level permissions: block that applies to all jobs. The workflow only reads metadata and performs shell logic, so we can restrict the token to read‑only repository contents. Concretely, in .github/workflows/validate-branch-into-main.yaml, insert:

permissions:
  contents: read

between the name: section and the on: block (e.g., after line 2), leaving all job and step definitions unchanged. No imports or additional methods are needed since this is just a YAML configuration change and does not affect the workflow’s behavior.

Suggested changeset 1
.github/workflows/validate-branch-into-main.yaml

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/.github/workflows/validate-branch-into-main.yaml b/.github/workflows/validate-branch-into-main.yaml
--- a/.github/workflows/validate-branch-into-main.yaml
+++ b/.github/workflows/validate-branch-into-main.yaml
@@ -1,5 +1,8 @@
 name: Validate PR Branch into Main 
 
+permissions:
+  contents: read
+
 on:
   pull_request:
     branches:
EOF
@@ -1,5 +1,8 @@
name: Validate PR Branch into Main

permissions:
contents: read

on:
pull_request:
branches:
Copilot is powered by AI and may make mistakes. Always verify output.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,4 @@ See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more inform

## License

This project is licensed under the Apache-2.0 License.
This project is licensed under the Apache-2.0 License.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func GetInitRequestMessage(fileUtil utils.FileUtil, args []string) (intmodel.Ini
XrayTracingMode: intmodel.XRayTracingModePassThrough,
CurrentWorkingDir: cwd,
RuntimeBinaryCommand: cmd,
AvailabilityZoneId: "",
AmiId: "",

AvailabilityZoneId: "use1-az1",
AmiId: "",
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Test_getInitRequestMessage(t *testing.T) {
XrayTracingMode: intmodel.XRayTracingModePassThrough,
CurrentWorkingDir: "REPLACE",
RuntimeBinaryCommand: []string{"/path/to/bootstrap"},
AvailabilityZoneId: "",
AvailabilityZoneId: "use1-az1",
AmiId: "",
},
},
Expand Down Expand Up @@ -116,7 +116,7 @@ func Test_getInitRequestMessage(t *testing.T) {
XrayTracingMode: intmodel.XRayTracingModePassThrough,
CurrentWorkingDir: "/var/task",
RuntimeBinaryCommand: []string{"/custom/bootstrap", "custom_handler"},
AvailabilityZoneId: "",
AvailabilityZoneId: "use1-az1",
AmiId: "",
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"os"
"time"

"github.com/google/uuid"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lmds"

rieinvoke "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/invoke"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/telemetry"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/interop"
Expand Down Expand Up @@ -47,8 +50,9 @@ func Run(supv supvmodel.ProcessSupervisor, args []string, fileUtil utils.FileUti
responderFactoryFunc := func(_ context.Context, invokeReq interop.InvokeRequest) invoke.InvokeResponseSender {
return rieinvoke.NewResponder(invokeReq)
}
invokeRouter := invoke.NewInvokeRouter(rapid.MaxIdleRuntimesQueueSize, eventsAPI, responderFactoryFunc, timeout.NewRecentCache())
invokeRouter := invoke.NewInvokeRouter(rapid.RuntimePoolSize, eventsAPI, responderFactoryFunc, timeout.NewRecentCache())

metadataToken := uuid.NewString()
deps := rapid.Dependencies{
EventsAPI: eventsAPI,
LogsEgressAPI: telemetry.NewLogsEgress(telemetryAPIRelay, os.Stdout),
Expand All @@ -57,9 +61,10 @@ func Run(supv supvmodel.ProcessSupervisor, args []string, fileUtil utils.FileUti
RuntimeAPIAddrPort: runtimeAPIAddr,
FileUtils: fileUtil,
InvokeRouter: invokeRouter,
MetadataService: lmds.NewService(metadataToken),
}

raptorApp, err := raptor.StartApp(deps, "", noOpLogger{})
raptorApp, err := raptor.StartApp(deps, "", metadataToken, noOpLogger{})
if err != nil {
return nil, nil, nil, fmt.Errorf("could not start runtime api server: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# RIE Telemetry Package

The RIE (Runtime Interface Emulator) telemetry package provides Telemetry API.

## Architecture Overview

```
┌─────────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ EventsAPI │ │ LogsEgress │ │ SubscriptionAPI │
│ │ │ │ │ │
│ • Platform │ │ • Runtime logs │ │ • Subscription │
│ events │ │ • Extension │ │ management │
│ • Lifecycle │ │ logs │ │ • Schema │
│ events │ │ • Log capture │ │ validation │
└─────────┬───────┘ └─────────┬───────┘ └──────────┬───────┘
│ │ │
└──────────────┬───────────────────────────────┘
┌────▼────┐
│ Relay │
│ │
│ Event │
│ Broker │
└────┬────┘
┌──────────────┼──────────────┐
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│Subscriber │ │Subscriber │ │Subscriber │
│ A │ │ B │ │ C │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│TCP Client │ │HTTP Client│ │TCP Client │
└───────────┘ └───────────┘ └───────────┘
```

## Core Components

### 1. EventsAPI (`events_api.go`)
**Responsibility**: Platform event generation and distribution

The EventsAPI serves as the primary interface for generating and broadcasting AWS Lambda platform events. It implements the `EventsAPI` interface and handles various lifecycle events including initialization, invocation, and error reporting.

### 2. LogsEgress (`logs_egress.go`)
**Responsibility**: Log capture and forwarding

The LogsEgress component implements the `StdLogsEgressAPI` interface to capture stdout/stderr from both runtime and extensions, forwarding them to telemetry subscribers while maintaining original console output.

### 3. Relay (`relay.go`)
**Responsibility**: Event broadcasting and subscriber management

The Relay acts as a central event broker, managing subscribers and broadcasting events to all registered telemetry consumers.

### 4. SubscriptionAPI (`subscription_api.go`)
**Responsibility**: Subscription management and validation

The SubscriptionAPI handles telemetry subscription requests, validates them against JSON schemas, and manages the subscription lifecycle.

## Internal Components

### 1. Subscriber (`internal/subscriber.go`)
**Responsibility**: Event batching and delivery

Each subscriber represents a telemetry consumer and manages efficient event delivery through batching and asynchronous processing.

### 2. Client (`internal/client.go`)
**Responsibility**: Protocol-specific event delivery

The client abstraction provides protocol-specific implementations for delivering events to telemetry consumers.

### 3. Batch (`internal/batch.go`)
**Responsibility**: Event collection and timing

The batch component manages collections of events with size and time-based flushing logic.

### 4. Types (`internal/types.go`)
**Responsibility**: Type definitions and constants

Centralized type definitions for protocols, event categories, and configuration structures.

## Event Flow

### 1. Subscription Flow
```
Extension/Agent → SubscriptionAPI → Schema Validation → Subscriber Creation → Relay Registration
```

### 2. Event Flow
```
Event Source → EventsAPI → Relay → Subscribers → Batching → Client
```

### 3. Log Flow
```
Runtime/Extension → LogsEgress → Console Output + Relay → Subscribers → Batching → Client
```
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestSubscriber(t *testing.T) {

agentName := fmt.Sprintf("test-name-%d", rand.Uint32())
sub := NewSubscriber(agentName, map[EventCategory]struct{}{CategoryPlatform: {}}, BufferingConfig{MaxItems: 2, MaxBytes: math.MaxInt, Timeout: math.MaxInt}, client, logsDroppedEventAPI)
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
assert.Equal(t, agentName, sub.AgentName())

sub.Flush(context.Background())
Expand All @@ -43,19 +43,23 @@ func TestSubscriber(t *testing.T) {
client.On("send", mock.Anything, mock.Anything).Return(nil)
sub.SendAsync(event, CategoryPlatform)

time.Sleep(100 * time.Millisecond)

require.Eventually(t, func() bool {
return client.AssertNumberOfCalls(t, "send", 1)
}, time.Second, 10*time.Millisecond)
}, 2*time.Second, 10*time.Millisecond)

sub.SendAsync(event, CategoryPlatform)

time.Sleep(100 * time.Millisecond)
assert.Eventually(
t,
func() bool {

sub.Flush(context.Background())
return client.AssertNumberOfCalls(t, "send", 2)
},
time.Second,
2*time.Second,
10*time.Millisecond,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/telemetry/internal"
Expand Down Expand Up @@ -53,17 +54,31 @@ func TestLogsEgress(t *testing.T) {
require.NotNil(t, stderr)

line := []byte("test\n")
relay.On("broadcast", "test", tt.expectedCategory, tt.expectedCategory).Twice()

done := make(chan struct{}, 2)

relay.
On("broadcast", "test", tt.expectedCategory, tt.expectedCategory).
Twice().
Run(func(args mock.Arguments) {
done <- struct{}{}
})

n, err := stdout.Write(line)
assert.NoError(t, err)
assert.Len(t, line, n)
n, err = stderr.Write(line)
assert.NoError(t, err)
assert.Len(t, line, n)

assert.Eventually(t, func() bool {
return relay.AssertNumberOfCalls(t, "broadcast", 2)
}, 1*time.Second, 10*time.Millisecond)
for i := 0; i < 2; i++ {
select {
case <-done:

case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for broadcast calls")
}
}
})
}
}
12 changes: 12 additions & 0 deletions internal/lambda-managed-instances/aws-lambda-rie/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package main

import (
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/run"
)

func main() {
run.Run()
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (_m *MockInvokeMetrics) SendMetrics(_a0 model.AppError) error {
return r0
}

func (_m *MockInvokeMetrics) SetReservationUsed(wasReserved bool) {
_m.Called(wasReserved)
}

func (_m *MockInvokeMetrics) TriggerGetRequest() {
_m.Called()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package interop

import mock "github.com/stretchr/testify/mock"

type MockReserveIdleRuntimeRequest struct {
mock.Mock
}

func (_m *MockReserveIdleRuntimeRequest) InvokeID() string {
ret := _m.Called()

if len(ret) == 0 {
panic("no return value specified for InvokeID")
}

var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}

return r0
}

func NewMockReserveIdleRuntimeRequest(t interface {
mock.TestingT
Cleanup(func())
}) *MockReserveIdleRuntimeRequest {
mock := &MockReserveIdleRuntimeRequest{}
mock.Mock.Test(t)

t.Cleanup(func() { mock.AssertExpectations(t) })

return mock
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package interop

import mock "github.com/stretchr/testify/mock"

type MockReserveIdleRuntimeResponse struct {
mock.Mock
}

func (_m *MockReserveIdleRuntimeResponse) reserveIdleRuntimeResponse() {
_m.Called()
}

func NewMockReserveIdleRuntimeResponse(t interface {
mock.TestingT
Cleanup(func())
}) *MockReserveIdleRuntimeResponse {
mock := &MockReserveIdleRuntimeResponse{}
mock.Mock.Test(t)

t.Cleanup(func() { mock.AssertExpectations(t) })

return mock
}
Loading
Loading