Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>, \<topic\>, are popula
| | expired-preferred-host-requests | Number of expired resource-requests-for -preferred-host received by the cluster manager. |
| | expired-any-host-requests | Number of expired resource-requests-for -any-host received by the cluster manager. |
| | host-affinity-match-pct | Percentage of non-expired preferred host requests. This measures the % of resource-requests for which host-affinity provided the preferred host. |
| | \<containerId\>-failure-count | Number of times a container identified by containerId has failed |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we decided to use "processorId" for 0,1,2..

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that lingo is used internally in code as the naming conventions for javadocs, this is public-facing metrics page where we do not need to have context between processorId and containerId


| **Group** | **Metric name** | **Meaning** |
| --- | --- | --- |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
Expand Down Expand Up @@ -182,7 +183,7 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
this.jobConfig = new JobConfig(clusterManagerConfig);

this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();

this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(jobConfig, state, registry);
this.clusterResourceManager = resourceManager;
this.containerManager = containerManager;
this.diagnosticsManager = Option.empty();
Expand Down Expand Up @@ -236,6 +237,12 @@ public void start() {
Map<String, String> processorToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
containerAllocator.requestResources(processorToHostMapping);

// Initialize the per processor failure count to be 0
processorToHostMapping.keySet().forEach(processorId -> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment below on how/why this information isnt really in "state"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replied there

state.perProcessorFailureCount.put(processorId, new AtomicInteger(0));
containerProcessManagerMetrics.registerProcessorFailureCountMetric(processorId);
});

// Start container allocator thread
LOG.info("Starting the container allocator thread");
allocatorThread.start();
Expand Down Expand Up @@ -472,6 +479,9 @@ void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, St
LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus);
Instant now = Instant.now();
state.failedContainers.incrementAndGet();
if (state.perProcessorFailureCount.get(processorId) != null) {
state.perProcessorFailureCount.get(processorId).incrementAndGet();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else {
Log.error("Unknown/orphan container") ??
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is the helper to and is invoked from onResourceCompleted(...) which does the check for processorId to be legit, remeber that we also get redundant notifications so we cannot declare a container orphan / unknown, we need more testing to deem callback senarios as orphans and that work is beyond the scope of this change

state.jobHealthy.set(false);

state.neededProcessors.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED }
*/
public final ConcurrentHashMap<String, SamzaResourceStatus> failedProcessors = new ConcurrentHashMap<>(0);


/**
* Map of the Samza processor ID to the count of failed attempts
* Modified by AMRMCallbackThread
*/
public final ConcurrentMap<String, AtomicInteger> perProcessorFailureCount = new ConcurrentHashMap<>(0);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this information is only useful for metric-emission, does it need to be stored in "state" ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct, we can directly wire metrics registry in ContainerManager, ContainerAllocator and instantiate new guage and counters in the code but all metrics related to AM are under this ContainerProcessManagerMetrics class which holds MetricsRegistry and SamzaApplicationState, so once does not need to wire MetricsRegistry individually to each AM class ContainerManager, ContainerAllocator. This is the justification for maintained this state variable to wire metrics, I feel this approach is cleaner

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check SamzaApplicationState most of the state there is just used for metric emissions

/**
* Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,11 @@ class ContainerProcessManagerMetrics(val config: Config,

val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)


val mPerContainerFailureCount = collection.mutable.Map[String, Gauge[Int]]()
def registerProcessorFailureCountMetric(containerId: String) {
mPerContainerFailureCount.put(containerId, newGauge("container_" + containerId + "-failure-count", () => state.perProcessorFailureCount.get(containerId).get()))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class TestContainerProcessManager {
};
private Config config = new MapConfig(configVals);
private ContainerPlacementMetadataStore containerPlacementMetadataStore;
private CoordinatorStreamStore coordinatorStreamStore;
private ContainerProcessManager cpm;

private Config getConfig() {
Map<String, String> map = new HashMap<>();
Expand Down Expand Up @@ -129,7 +131,7 @@ private JobModelManager getJobModelManagerWithoutHostAffinity(int containerCount
public void setup() throws Exception {
server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
coordinatorStreamStore.init();
containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
containerPlacementMetadataStore.start();
Expand All @@ -154,8 +156,7 @@ public void testContainerProcessManager() throws Exception {
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ContainerManager containerManager =
new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false);
ContainerProcessManager cpm =
buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
cpm = buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());

ContainerAllocator allocator =
(ContainerAllocator) getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm);
Expand Down Expand Up @@ -200,8 +201,7 @@ public void testOnInit() throws Exception {
new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());

MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
Expand Down Expand Up @@ -231,7 +231,7 @@ public void run() {
assertEquals(1, state.neededProcessors.get());
assertEquals(1, allocator.requestedContainers);

cpm.stop();

}

@Test
Expand All @@ -242,8 +242,7 @@ public void testOnShutdown() throws Exception {
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
cpm.start();

Thread allocatorThread = (Thread) getPrivateFieldFromCpm("allocatorThread", cpm).get(cpm);
Expand Down Expand Up @@ -274,8 +273,7 @@ public void testCpmShouldStopWhenContainersFinish() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -322,8 +320,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception
state,
containerManager);

ContainerProcessManager cpm = spy(
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -381,7 +378,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception
assertTrue(cpm.shouldShutdown());
assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);

cpm.stop();

}

/**
Expand Down Expand Up @@ -421,8 +418,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod
state,
containerManager);

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -475,7 +471,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod
assertEquals(false, cpm.getJobFailureCriteriaMet());
assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());

cpm.stop();

}

@Test
Expand Down Expand Up @@ -507,8 +503,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown
state,
containerManager);

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -596,7 +591,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());

cpm.stop();

}

@Test
Expand All @@ -617,8 +612,7 @@ public void testInvalidNotificationsAreIgnored() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// Start the task clusterResourceManager
cpm.start();
Expand Down Expand Up @@ -693,8 +687,7 @@ public void testAllBufferedResourcesAreUtilized() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator)));

cpm.start();
assertFalse(cpm.shouldShutdown());
Expand Down Expand Up @@ -758,8 +751,7 @@ public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// Start the task manager
cpm.start();
Expand Down Expand Up @@ -833,8 +825,7 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// Start the task clusterResourceManager
cpm.start();
Expand Down Expand Up @@ -906,13 +897,16 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception {
assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());

cpm.stop();
}

@After
public void teardown() {
if (cpm != null) {
cpm.stop();
}
server.stop();
containerPlacementMetadataStore.stop();
coordinatorStreamStore.close();
}

private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
Expand Down