package org.apache.beam.runners.direct;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TaggedPValue;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.class */
public final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements TransformEvaluatorFactory {
    // Cache of cleanup tasks keyed by (applied transform, key, window). It is built with weak
    // values and a CleanupSchedulingLoader, so asking for a missing entry creates and schedules
    // a new state-clearing task; a task removes its own entry once it has run.
    private final LoadingCache<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> cleanupRegistry;
    // Factory producing the underlying ParDo evaluators; cleanup() is forwarded to it.
    private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$AppliedPTransformOutputKeyAndWindow.class */
    /**
     * Value type combining an applied stateful ParDo, a structural key and a window. It is the key
     * type of the cleanup registry, so one cleanup task exists per distinct combination.
     *
     * <p>The accessors were package-private before decompilation; they are kept public here so the
     * decompiled sources stay mutually consistent.
     */
    public static abstract class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
        /**
         * Creates a new instance backed by the
         * {@code AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow}
         * implementation class.
         *
         * @param appliedPTransform the applied stateful ParDo
         * @param structuralKey the key the state belongs to
         * @param boundedWindow the window the state belongs to
         * @return the combined value
         */
        static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> appliedPTransform, StructuralKey<K> structuralKey, BoundedWindow boundedWindow) {
            return new AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow(appliedPTransform, structuralKey, boundedWindow);
        }

        /** Returns the applied stateful ParDo transform. */
        public abstract AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> getTransform();

        /** Returns the structural key. */
        public abstract StructuralKey<K> getKey();

        /** Returns the window. */
        public abstract BoundedWindow getWindow();
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$CleanupSchedulingLoader.class */
    /**
     * Cache loader for the cleanup registry. Loading an entry builds a task that clears every state
     * cell declared by the DoFn in the entry's window, registers that task with the evaluation
     * context to run after the window expires, and returns it as the cached value.
     */
    private class CleanupSchedulingLoader extends CacheLoader<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> {
        private final EvaluationContext evaluationContext;

        /**
         * @param evaluationContext context used to resolve step names and step contexts and to
         *     schedule the cleanup callbacks
         */
        public CleanupSchedulingLoader(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        /**
         * Builds and schedules the cleanup task for one (transform, key, window) combination.
         *
         * @param appliedPTransformOutputKeyAndWindow the registry key being loaded
         * @return the task that was scheduled; running it clears the declared state and removes
         *     the registry entry
         */
        @Override // org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader
        public Runnable load(final AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> appliedPTransformOutputKeyAndWindow) {
            final AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> transform = appliedPTransformOutputKeyAndWindow.getTransform();
            String stepName = this.evaluationContext.getStepName(transform);

            // Index the transform's outputs by tag so the main output can be looked up.
            HashMap<Object, Object> outputsByTag = new HashMap<Object, Object>();
            for (TaggedPValue output : transform.getOutputs()) {
                outputsByTag.put(output.getTag(), output.getValue());
            }
            ParDoMultiOverrideFactory.StatefulParDo statefulParDo = (ParDoMultiOverrideFactory.StatefulParDo) transform.getTransform();
            PCollection<?> mainOutput = (PCollection<?>) outputsByTag.get(statefulParDo.getUnderlyingParDo().getMainOutputTag());
            WindowingStrategy<?, ?> windowingStrategy = mainOutput.getWindowingStrategy();

            BoundedWindow window = appliedPTransformOutputKeyAndWindow.getWindow();
            final DoFn fn = statefulParDo.getUnderlyingParDo().getFn();
            final DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
            final DirectExecutionContext.DirectStepContext stepContext = this.evaluationContext.getExecutionContext(transform, appliedPTransformOutputKeyAndWindow.getKey()).getOrCreateStepContext(stepName, stepName);
            final StateNamespace windowNamespace = StateNamespaces.window(windowingStrategy.getWindowFn().windowCoder(), window);

            Runnable cleanup = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    for (DoFnSignature.StateDeclaration declaration : signature.stateDeclarations().values()) {
                        try {
                            // The StateSpec is read reflectively from the DoFn instance's field.
                            stepContext.m4stateInternals()
                                    .state(windowNamespace, StateTags.tagForSpec(declaration.id(), (StateSpec) declaration.field().get(fn)))
                                    .clear();
                        } catch (IllegalAccessException e) {
                            throw new RuntimeException(String.format("Error accessing %s for %s", StateSpec.class.getName(), fn.getClass().getName()), e);
                        }
                    }
                    // The task has done its work; drop its registry entry.
                    StatefulParDoEvaluatorFactory.this.cleanupRegistry.invalidate(appliedPTransformOutputKeyAndWindow);
                }
            };
            this.evaluationContext.scheduleAfterWindowExpiration(transform, window, windowingStrategy, cleanup);
            return cleanup;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.class */
    public static class StatefulParDoEvaluator<K, InputT> implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
        private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;

        public StatefulParDoEvaluator(DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> doFnLifecycleManagerRemovingTransformEvaluator) {
            this.delegateEvaluator = doFnLifecycleManagerRemovingTransformEvaluator;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> windowedValue) throws Exception {
            BoundedWindow boundedWindow = (BoundedWindow) Iterables.getOnlyElement(windowedValue.getWindows());
            Iterator it = ((KeyedWorkItem) windowedValue.getValue()).elementsIterable().iterator();
            while (it.hasNext()) {
                this.delegateEvaluator.processElement((WindowedValue) it.next());
            }
            Iterator it2 = ((KeyedWorkItem) windowedValue.getValue()).timersIterable().iterator();
            while (it2.hasNext()) {
                this.delegateEvaluator.onTimer((TimerInternals.TimerData) it2.next(), boundedWindow);
            }
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
            TransformResult<KV<K, InputT>> finishBundle = this.delegateEvaluator.finishBundle();
            StepTransformResult.Builder<InputT> addOutput = StepTransformResult.withHold(finishBundle.getTransform(), finishBundle.getWatermarkHold()).withTimerUpdate(finishBundle.getTimerUpdate()).withState(finishBundle.getState()).withAggregatorChanges(finishBundle.getAggregatorChanges()).withMetricUpdates(finishBundle.getLogicalMetricUpdates()).addOutput(Lists.newArrayList(finishBundle.getOutputBundles()));
            for (WindowedValue<KV<K, InputT>> windowedValue : finishBundle.getUnprocessedElements()) {
                addOutput.addUnprocessedElements(windowedValue.withValue(KeyedWorkItems.elementsWorkItem(((KV) windowedValue.getValue()).getKey(), Collections.singleton(windowedValue))));
            }
            return addOutput.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
        this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
        this.cleanupRegistry = (LoadingCache<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable>) CacheBuilder.newBuilder().weakValues().build(new CleanupSchedulingLoader(evaluationContext));
    }

    /**
     * Creates an evaluator for the given application and input bundle.
     *
     * <p>The interface only supplies wildcard-typed arguments, so they are narrowed here with
     * unchecked casts before being passed on; the caller is trusted to route only stateful ParDo
     * applications and their keyed-work-item bundles to this factory.
     *
     * @param appliedPTransform the applied stateful ParDo
     * @param committedBundle the input bundle of keyed work items
     * @return the evaluator for the bundle
     * @throws Exception if the evaluator cannot be created
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle) throws Exception {
        TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createEvaluator((AppliedPTransform) appliedPTransform, (DirectRunner.CommittedBundle) committedBundle);
        return evaluator;
    }

    /**
     * Forwards cleanup to the delegate ParDo evaluator factory. The cleanup registry itself is not
     * touched here.
     *
     * @throws Exception if the delegate's cleanup fails
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() throws Exception {
        delegateFactory.cleanup();
    }

    /**
     * Builds the evaluator for one input bundle of a stateful ParDo.
     *
     * <p>If the DoFn declares any state, a cleanup task is first requested from the registry for
     * every window of every element in the bundle; the registry creates and schedules tasks that
     * do not exist yet. The actual evaluation is then delegated to an evaluator obtained from the
     * delegate factory.
     *
     * @param appliedPTransform the applied stateful ParDo
     * @param committedBundle the input bundle of keyed work items
     * @return an evaluator wrapping the delegate ParDo evaluator
     * @throws Exception if a cleanup task cannot be loaded or the delegate evaluator cannot be
     *     created
     */
    private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> appliedPTransform, DirectRunner.CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> committedBundle) throws Exception {
        ParDoMultiOverrideFactory.StatefulParDo statefulParDo = (ParDoMultiOverrideFactory.StatefulParDo) appliedPTransform.getTransform();
        DoFn<KV<K, InputT>, OutputT> fn = statefulParDo.getUnderlyingParDo().getFn();

        boolean declaresState = !DoFnSignatures.getSignature(fn.getClass()).stateDeclarations().isEmpty();
        if (declaresState) {
            for (WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> element : committedBundle.getElements()) {
                for (BoundedWindow window : element.getWindows()) {
                    // Only the loading side effect matters; the returned task is not used here.
                    this.cleanupRegistry.get(AppliedPTransformOutputKeyAndWindow.create(appliedPTransform, committedBundle.getKey(), window));
                }
            }
        }

        return new StatefulParDoEvaluator(
                this.delegateFactory.createEvaluator(
                        appliedPTransform,
                        committedBundle.getKey(),
                        fn,
                        statefulParDo.getUnderlyingParDo().getSideInputs(),
                        statefulParDo.getUnderlyingParDo().getMainOutputTag(),
                        statefulParDo.getUnderlyingParDo().getSideOutputTags().getAll()));
    }
}
